Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix inconsistent persitence of cron workflow objects #4639

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 68 additions & 14 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"sort"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned"
Expand All @@ -29,17 +28,20 @@ type cronWfOperationCtx struct {
// CronWorkflow is the CronWorkflow to be run
name string
cronWf *v1alpha1.CronWorkflow
origCronWf *v1alpha1.CronWorkflow
wfClientset versioned.Interface
wfClient typed.WorkflowInterface
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
updated bool
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx {
return &cronWfOperationCtx{
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
origCronWf: cronWorkflow.DeepCopy(),
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
Expand All @@ -48,6 +50,7 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
"namespace": cronWorkflow.ObjectMeta.Namespace,
}),
metrics: metrics,
updated: false,
}
}

Expand Down Expand Up @@ -80,6 +83,7 @@ func (woc *cronWfOperationCtx) Run() {
woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(wf, runWf))
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now()}
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSubmissionError)
woc.updated = true
}

func (woc *cronWfOperationCtx) validateCronWorkflow() error {
Expand All @@ -90,6 +94,7 @@ func (woc *cronWfOperationCtx) validateCronWorkflow() error {
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
} else {
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSpecError)
woc.updated = true
}
return err
}
Expand All @@ -108,22 +113,69 @@ func getWorkflowObjectReference(wf *v1alpha1.Workflow, runWf *v1alpha1.Workflow)
}

func (woc *cronWfOperationCtx) persistUpdate() {
data, err := json.Marshal(map[string]interface{}{"status": woc.cronWf.Status})
if err != nil {
woc.log.WithError(err).Error("failed to marshall cron workflow status data")
if !woc.updated {
return
}
err = wait.ExponentialBackoff(retry.DefaultBackoff, func() (bool, error) {
cronWf, err := woc.cronWfIf.Patch(woc.cronWf.Name, types.MergePatchType, data)
if err != nil {
return false, err

cronWf, err := woc.cronWfIf.Update(woc.cronWf)
if err != nil {
if !errors.IsConflict(err) {
woc.log.WithError(err).Error("failed to update CronWorkflow")
return
}
woc.cronWf = cronWf
return true, nil
})
var reapplyErr error
cronWf, reapplyErr = woc.reapplyUpdate()
if reapplyErr != nil {
woc.log.WithError(reapplyErr).WithField("original error", err).Error("failed to update CronWorkflow after reapply attempt")
return
}
}
woc.cronWf = cronWf
}

func (woc *cronWfOperationCtx) reapplyUpdate() (*v1alpha1.CronWorkflow, error) {
if woc.origCronWf.ResourceVersion != woc.cronWf.ResourceVersion {
return nil, fmt.Errorf("cannot re-apply cron workflow update with mismatched resource versions")
}
orig, err := json.Marshal(woc.origCronWf)
if err != nil {
woc.log.WithError(err).Error("failed to data cron workflow")
return
return nil, err
}
curr, err := json.Marshal(woc.cronWf)
if err != nil {
return nil, err
}
patch, err := jsonpatch.CreateMergePatch(orig, curr)
if err != nil {
return nil, err
}
attempts := 0
for {
currCronWf, err := woc.cronWfIf.Get(woc.name, v1.GetOptions{})
if err != nil {
return nil, err
}
currCronWfBytes, err := json.Marshal(currCronWf)
if err != nil {
return nil, err
}
newCronWfBytes, err := jsonpatch.MergePatch(currCronWfBytes, patch)
if err != nil {
return nil, err
}
var newCronWf v1alpha1.CronWorkflow
err = json.Unmarshal(newCronWfBytes, &newCronWf)
if err != nil {
return nil, err
}
cronWf, err := woc.cronWfIf.Update(&newCronWf)
if err == nil {
return cronWf, nil
}
attempts++
if attempts == 5 {
return nil, fmt.Errorf("ran out of retries when trying to reapply update: %s", err)
}
}
}

Expand Down Expand Up @@ -249,6 +301,7 @@ func (woc *cronWfOperationCtx) removeFromActiveList(uid types.UID) {
}
}
woc.cronWf.Status.Active = newActive
woc.updated = true
}

func (woc *cronWfOperationCtx) enforceHistoryLimit(workflows []v1alpha1.Workflow) error {
Expand Down Expand Up @@ -320,4 +373,5 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(conditionType v1alpha1.Co
Status: v1.ConditionTrue,
})
woc.metrics.CronWorkflowSubmissionError()
woc.updated = true
}
29 changes: 29 additions & 0 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,32 @@ func TestSpecError(t *testing.T) {
assert.Equal(t, v1alpha1.ConditionTypeSpecError, submissionErrorCond.Type)
assert.Contains(t, submissionErrorCond.Message, "cron schedule is malformed: end of range (12737123) above maximum (12): 12737123")
}

func TestReapplyUpdate(t *testing.T) {
cronWf := v1alpha1.CronWorkflow{
ObjectMeta: v1.ObjectMeta{Name: "my-wf"},
Spec: v1alpha1.CronWorkflowSpec{Schedule: "* * * * *"},
}

cs := fake.NewSimpleClientset(&cronWf)
testMetrics := metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
woc := &cronWfOperationCtx{
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
origCronWf: cronWf.DeepCopy(),
name: cronWf.Name,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
}

cronWf.Spec.Schedule = "1 * * * *"
_, err := woc.reapplyUpdate()
if assert.NoError(t, err) {
updatedCronWf, err := woc.cronWfIf.Get("my-wf", v1.GetOptions{})
if assert.NoError(t, err) {
assert.Equal(t, "1 * * * *", updatedCronWf.Spec.Schedule)
}
}
}