Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Use deterministic name for cron workflow children #4638

Merged
merged 11 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 33 additions & 23 deletions workflow/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ import (
)

func ConvertCronWorkflowToWorkflow(cronWf *wfv1.CronWorkflow) *wfv1.Workflow {
wf := toWorkflow(cronWf.TypeMeta, cronWf.ObjectMeta, cronWf.Spec.WorkflowSpec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this covered by existing tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is. I added an extra test as well

wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}
meta := metav1.ObjectMeta{
GenerateName: cronWf.Name + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
return toWorkflow(*cronWf, meta)
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
func ConvertCronWorkflowToWorkflowWithName(cronWf *wfv1.CronWorkflow, name string) *wfv1.Workflow {
meta := metav1.ObjectMeta{
Name: name,
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))
return wf
return toWorkflow(*cronWf, meta)
}

func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *metav1.ObjectMeta, clusterScope bool) *wfv1.Workflow {
Expand Down Expand Up @@ -58,23 +57,34 @@ func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *meta
return wf
}

func toWorkflow(typeMeta metav1.TypeMeta, objectMeta metav1.ObjectMeta, spec wfv1.WorkflowSpec) *wfv1.Workflow {
func toWorkflow(cronWf wfv1.CronWorkflow, objectMeta metav1.ObjectMeta) *wfv1.Workflow {
wf := &wfv1.Workflow{
TypeMeta: metav1.TypeMeta{
Kind: workflow.WorkflowKind,
APIVersion: typeMeta.APIVersion,
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: objectMeta.GetName() + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
APIVersion: cronWf.TypeMeta.APIVersion,
},
Spec: spec,
ObjectMeta: objectMeta,
Spec: cronWf.Spec.WorkflowSpec,
}

if instanceId, ok := objectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
if instanceId, ok := cronWf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
wf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID] = instanceId
}

wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(&cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))

return wf
}
21 changes: 19 additions & 2 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,22 @@ func (cc *Controller) Run(ctx context.Context) {
defer cc.cron.Stop()

go cc.cronWfInformer.Informer().Run(ctx.Done())
go wait.Until(cc.syncAll, 10*time.Second, ctx.Done())
go func() {
// To minimize syncAll and scheduled crons stepping over each other, ensure that syncAll runs every 10 seconds
// starting on a x5 second mark (e.g., 15, 35, etc.). Since crons are guaranteed to run on a 00 second mark, this
// makes sure that there is always a 5 second time separation between syncAll operations and any scheduled cron
_, _, sec := time.Now().Clock()
singleSec := sec % 10
var toWait time.Duration
if singleSec <= 5 {
toWait = time.Duration(5-singleSec) * time.Second
alexec marked this conversation as resolved.
Show resolved Hide resolved
} else {
toWait = time.Duration(15-singleSec) * time.Second
}
time.Sleep(toWait)

go wait.NonSlidingUntil(cc.syncAll, 10*time.Second, ctx.Done())
}()

for i := 0; i < cronWorkflowWorkers; i++ {
go wait.Until(cc.runCronWorker, time.Second, ctx.Done())
Expand Down Expand Up @@ -170,12 +185,14 @@ func (cc *Controller) processNextCronItem() bool {
cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule
}

err = cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
lastScheduledTimeFunc, err := cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
if err != nil {
logCtx.WithError(err).Error("could not schedule CronWorkflow")
return true
}

cronWorkflowOperationCtx.scheduledTimeFunc = lastScheduledTimeFunc

logCtx.Infof("CronWorkflow %s added", key.(string))

return true
Expand Down
13 changes: 10 additions & 3 deletions workflow/cron/cron_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/robfig/cron/v3"
)
Expand All @@ -16,6 +17,8 @@ type cronFacade struct {
entryIDs map[string]cron.EntryID
}

// ScheduledTimeFunc reports the scheduled time to attribute to a cron-triggered
// run. AddJob returns one that reads the Prev field of the job's cron entry.
type ScheduledTimeFunc func() time.Time

func newCronFacade() *cronFacade {
return &cronFacade{
cron: cron.New(),
Expand All @@ -42,15 +45,19 @@ func (f *cronFacade) Delete(key string) {
delete(f.entryIDs, key)
}

func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) error {
func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) (ScheduledTimeFunc, error) {
f.mu.Lock()
defer f.mu.Unlock()
entryID, err := f.cron.AddJob(schedule, cwoc)
if err != nil {
return err
return nil, err
}
f.entryIDs[key] = entryID
return nil

// Return a function to return the last scheduled time
return func() time.Time {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
return f.cron.Entry(entryID).Prev
}, nil
}

func (f *cronFacade) Load(key string) (*cronWfOperationCtx, error) {
Expand Down
54 changes: 44 additions & 10 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type cronWfOperationCtx struct {
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
// scheduledTimeFunc returns the most recent scheduled run time as of the moment it is called
simster7 marked this conversation as resolved.
Show resolved Hide resolved
scheduledTimeFunc ScheduledTimeFunc
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx {
Expand All @@ -48,10 +50,20 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
"namespace": cronWorkflow.ObjectMeta.Namespace,
}),
metrics: metrics,
// inferScheduledTime returns an inferred scheduled time based on the current time and only works if it is called
// within 59 seconds of the scheduled time. Here it acts as a placeholder until it is replaced by a similar
// function that returns the last scheduled time deterministically from the cron engine. Since we are only able
// to generate the latter function after the job is scheduled, there is a tiny chance that the job is run before
// the deterministic function is supplanted. If that happens, we use the infer function as the next-best thing
scheduledTimeFunc: inferScheduledTime,
}
}

// Run starts one execution of the CronWorkflow, using the time reported by
// scheduledTimeFunc as the scheduled runtime passed to run.
// NOTE(review): presumably this is the method the cron engine invokes for a
// job registered through AddJob — confirm against the cron.Job interface.
func (woc *cronWfOperationCtx) Run() {
woc.run(woc.scheduledTimeFunc())
}

func (woc *cronWfOperationCtx) run(scheduledRuntime time.Time) {
defer woc.persistUpdate()

woc.log.Infof("Running %s", woc.name)
Expand All @@ -69,16 +81,25 @@ func (woc *cronWfOperationCtx) Run() {
return
}

wf := common.ConvertCronWorkflowToWorkflow(woc.cronWf)
wf := common.ConvertCronWorkflowToWorkflowWithName(woc.cronWf, getChildWorkflowName(woc.cronWf.Name, scheduledRuntime))

runWf, err := util.SubmitWorkflow(woc.wfClient, woc.wfClientset, woc.cronWf.Namespace, wf, &v1alpha1.SubmitOpts{})
if err != nil {
if errors.IsAlreadyExists(err) {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
// The scheduled workflow already exists, likely indicating that there is a corrupted LastScheduledTime field.
// If the intended scheduledRuntime is later than the present value in LastScheduledTime, then replace it
if scheduledRuntime.After(woc.cronWf.Status.LastScheduledTime.Time) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is a conflict error, then our time should be the same surely? This code does not get run?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, I think after the overwriting stale LastScheduledRun issue is fixed in #4659 this check will be moot. Not sure I would remove it though as it can only fix a corrupted state if it encounters one. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to write tests and then bug fix and maintain all new code - so add less code if it achieves the same outcome

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will remove this then

woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: scheduledRuntime}
}
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSubmissionError, fmt.Sprintf("Workflow scheduled for %s already exists", scheduledRuntime))
return
}
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSubmissionError, fmt.Sprintf("Failed to submit Workflow: %s", err))
return
}

woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(wf, runWf))
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now()}
woc.cronWf.Status.LastScheduledTime = &runWf.CreationTimestamp
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSubmissionError)
}

Expand Down Expand Up @@ -169,40 +190,40 @@ func (woc *cronWfOperationCtx) terminateOutstandingWorkflows() error {
}

func (woc *cronWfOperationCtx) runOutstandingWorkflows() (bool, error) {
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
simster7 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}
if proceed {
woc.Run()
woc.run(missedExecutionTime)
return true, nil
}
return false, nil
}

func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, time.Time, error) {
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
if woc.cronWf.Status.LastScheduledTime != nil {
var now time.Time
var cronSchedule cron.Schedule
if woc.cronWf.Spec.Timezone != "" {
loc, err := time.LoadLocation(woc.cronWf.Spec.Timezone)
if err != nil {
return false, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
return false, time.Time{}, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
}
now = time.Now().In(loc)

cronScheduleString := "CRON_TZ=" + woc.cronWf.Spec.Timezone + " " + woc.cronWf.Spec.Schedule
cronSchedule, err = cron.ParseStandard(cronScheduleString)
if err != nil {
return false, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
return false, time.Time{}, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
}
} else {
var err error
now = time.Now()
cronSchedule, err = cron.ParseStandard(woc.cronWf.Spec.Schedule)
if err != nil {
return false, err
return false, time.Time{}, err
}
}

Expand All @@ -219,11 +240,11 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
// If StartingDeadlineSeconds is not set, or we are still within the deadline window, run the Workflow
if woc.cronWf.Spec.StartingDeadlineSeconds == nil || *woc.cronWf.Spec.StartingDeadlineSeconds == 0 || now.Before(missedExecutionTime.Add(time.Duration(*woc.cronWf.Spec.StartingDeadlineSeconds)*time.Second)) {
woc.log.Infof("%s missed an execution at %s and is within StartingDeadline", woc.cronWf.Name, missedExecutionTime.Format("Mon Jan _2 15:04:05 2006"))
return true, nil
return true, missedExecutionTime, nil
}
}
}
return false, nil
return false, time.Time{}, nil
}

func (woc *cronWfOperationCtx) reconcileActiveWfs(workflows []v1alpha1.Workflow) error {
Expand Down Expand Up @@ -321,3 +342,16 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(conditionType v1alpha1.Co
})
woc.metrics.CronWorkflowSubmissionError()
}

func inferScheduledTime() time.Time {
// Infer scheduled runtime by getting current time and zeroing out current seconds and nanoseconds
// This works because the finest possible scheduled runtime is a minute. It is unlikely to ever be used, since this
// function is quickly supplanted by a deterministic function from the cron engine.
log.Infof("inferred scheduled time")
alexec marked this conversation as resolved.
Show resolved Hide resolved
now := time.Now().UTC()
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
}

func getChildWorkflowName(cronWorkflowName string, scheduledRuntime time.Time) string {
return fmt.Sprintf("%s-%d", cronWorkflowName, scheduledRuntime.Unix())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

truncate time to 1 minute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should accept scheduledRuntime as given and not modify it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you truncate there?

}
27 changes: 17 additions & 10 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())
assert.True(t, proceed)

// StartingDeadlineSeconds is not after the current second, so cron should not be run
Expand All @@ -89,8 +91,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.Equal(t, time.Time{}, missedExecutionTime)
assert.False(t, proceed)

// Run the same test in a different timezone
Expand All @@ -110,8 +113,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())
assert.True(t, proceed)

// StartingDeadlineSeconds is not after the current second, so cron should not be run
Expand All @@ -121,8 +126,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.Equal(t, time.Time{}, missedExecutionTime)
assert.False(t, proceed)
}

Expand Down Expand Up @@ -170,12 +176,13 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
cs := fake.NewSimpleClientset()
testMetrics := metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
woc := &cronWfOperationCtx{
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
scheduledTimeFunc: inferScheduledTime,
}
woc.Run()

Expand Down