Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Use deterministic name for cron workflow children #4638

Merged
merged 11 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions workflow/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ import (
)

func ConvertCronWorkflowToWorkflow(cronWf *wfv1.CronWorkflow) *wfv1.Workflow {
wf := toWorkflow(cronWf.TypeMeta, cronWf.ObjectMeta, cronWf.Spec.WorkflowSpec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this covered by existing tests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is. I added an extra test as well

wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}
meta := metav1.ObjectMeta{
GenerateName: cronWf.Name + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
return toWorkflow(*cronWf, meta)
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
func ConvertCronWorkflowToWorkflowWithName(cronWf *wfv1.CronWorkflow, name string) *wfv1.Workflow {
meta := metav1.ObjectMeta{
Name: name,
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))
return wf
return toWorkflow(*cronWf, meta)
}

func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *metav1.ObjectMeta, clusterScope bool) *wfv1.Workflow {
Expand Down Expand Up @@ -58,23 +57,34 @@ func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *meta
return wf
}

func toWorkflow(typeMeta metav1.TypeMeta, objectMeta metav1.ObjectMeta, spec wfv1.WorkflowSpec) *wfv1.Workflow {
func toWorkflow(cronWf wfv1.CronWorkflow, objectMeta metav1.ObjectMeta) *wfv1.Workflow {
wf := &wfv1.Workflow{
TypeMeta: metav1.TypeMeta{
Kind: workflow.WorkflowKind,
APIVersion: typeMeta.APIVersion,
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: objectMeta.GetName() + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
APIVersion: cronWf.TypeMeta.APIVersion,
},
Spec: spec,
ObjectMeta: objectMeta,
Spec: cronWf.Spec.WorkflowSpec,
}

if instanceId, ok := objectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
if instanceId, ok := cronWf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
wf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID] = instanceId
}

wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(&cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))

return wf
}
33 changes: 24 additions & 9 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
}

func (woc *cronWfOperationCtx) Run() {
woc.run(inferScheduledRuntime())
}

func (woc *cronWfOperationCtx) run(scheduledRuntime time.Time) {
defer woc.persistUpdate()

woc.log.Infof("Running %s", woc.name)
Expand All @@ -69,7 +73,7 @@ func (woc *cronWfOperationCtx) Run() {
return
}

wf := common.ConvertCronWorkflowToWorkflow(woc.cronWf)
wf := common.ConvertCronWorkflowToWorkflowWithName(woc.cronWf, getChildWorkflowName(woc.cronWf.Name, scheduledRuntime))

runWf, err := util.SubmitWorkflow(woc.wfClient, woc.wfClientset, woc.cronWf.Namespace, wf, &v1alpha1.SubmitOpts{})
if err != nil {
Expand Down Expand Up @@ -169,40 +173,40 @@ func (woc *cronWfOperationCtx) terminateOutstandingWorkflows() error {
}

func (woc *cronWfOperationCtx) runOutstandingWorkflows() (bool, error) {
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
simster7 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return false, err
}
if proceed {
woc.Run()
woc.run(missedExecutionTime)
return true, nil
}
return false, nil
}

func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, time.Time, error) {
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
if woc.cronWf.Status.LastScheduledTime != nil {
var now time.Time
var cronSchedule cron.Schedule
if woc.cronWf.Spec.Timezone != "" {
loc, err := time.LoadLocation(woc.cronWf.Spec.Timezone)
if err != nil {
return false, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
return false, time.Time{}, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
}
now = time.Now().In(loc)

cronScheduleString := "CRON_TZ=" + woc.cronWf.Spec.Timezone + " " + woc.cronWf.Spec.Schedule
cronSchedule, err = cron.ParseStandard(cronScheduleString)
if err != nil {
return false, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
return false, time.Time{}, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
}
} else {
var err error
now = time.Now()
cronSchedule, err = cron.ParseStandard(woc.cronWf.Spec.Schedule)
if err != nil {
return false, err
return false, time.Time{}, err
}
}

Expand All @@ -219,11 +223,11 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
// If StartingDeadlineSeconds is not set, or we are still within the deadline window, run the Workflow
if woc.cronWf.Spec.StartingDeadlineSeconds == nil || *woc.cronWf.Spec.StartingDeadlineSeconds == 0 || now.Before(missedExecutionTime.Add(time.Duration(*woc.cronWf.Spec.StartingDeadlineSeconds)*time.Second)) {
woc.log.Infof("%s missed an execution at %s and is within StartingDeadline", woc.cronWf.Name, missedExecutionTime.Format("Mon Jan _2 15:04:05 2006"))
return true, nil
return true, missedExecutionTime, nil
}
}
}
return false, nil
return false, time.Time{}, nil
}

func (woc *cronWfOperationCtx) reconcileActiveWfs(workflows []v1alpha1.Workflow) error {
Expand Down Expand Up @@ -321,3 +325,14 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(conditionType v1alpha1.Co
})
woc.metrics.CronWorkflowSubmissionError()
}

func inferScheduledRuntime() time.Time {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
// Infer scheduled runtime by getting current time and zeroing out current seconds and nanoseconds
// This works because the finest possible scheduled runtime is a minute
now := time.Now().UTC()
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
}

func getChildWorkflowName(cronWorkflowName string, scheduledRuntime time.Time) string {
return fmt.Sprintf("%s-%d", cronWorkflowName, scheduledRuntime.Unix())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

truncate time to 1 minute?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function should accept scheduledRuntime as given and not modify it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you truncate there?

}
14 changes: 10 additions & 4 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledRuntime
assert.Equal(t, inferScheduledRuntime().Unix(), missedExecutionTime.Unix())
assert.True(t, proceed)

// StartingDeadlineSeconds is not after the current second, so cron should not be run
Expand All @@ -89,8 +91,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.Equal(t, time.Time{}, missedExecutionTime)
assert.False(t, proceed)

// Run the same test in a different timezone
Expand All @@ -110,8 +113,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledRuntime
assert.Equal(t, inferScheduledRuntime().Unix(), missedExecutionTime.Unix())
assert.True(t, proceed)

// StartingDeadlineSeconds is not after the current second, so cron should not be run
Expand All @@ -121,8 +126,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
proceed, missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.Equal(t, time.Time{}, missedExecutionTime)
assert.False(t, proceed)
}

Expand Down