Skip to content

Commit

Permalink
feat(controller): Use deterministic name for cron workflow children (#…
Browse files Browse the repository at this point in the history
…4638)

Signed-off-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
simster7 committed Dec 10, 2020
1 parent 3a4e974 commit 50210fc
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 52 deletions.
56 changes: 33 additions & 23 deletions workflow/common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ import (
)

func ConvertCronWorkflowToWorkflow(cronWf *wfv1.CronWorkflow) *wfv1.Workflow {
wf := toWorkflow(cronWf.TypeMeta, cronWf.ObjectMeta, cronWf.Spec.WorkflowSpec)
wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}
meta := metav1.ObjectMeta{
GenerateName: cronWf.Name + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
return toWorkflow(*cronWf, meta)
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
func ConvertCronWorkflowToWorkflowWithName(cronWf *wfv1.CronWorkflow, name string) *wfv1.Workflow {
meta := metav1.ObjectMeta{
Name: name,
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))
return wf
return toWorkflow(*cronWf, meta)
}

func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *metav1.ObjectMeta, clusterScope bool) *wfv1.Workflow {
Expand Down Expand Up @@ -58,23 +57,34 @@ func NewWorkflowFromWorkflowTemplate(templateName string, workflowMetadata *meta
return wf
}

func toWorkflow(typeMeta metav1.TypeMeta, objectMeta metav1.ObjectMeta, spec wfv1.WorkflowSpec) *wfv1.Workflow {
func toWorkflow(cronWf wfv1.CronWorkflow, objectMeta metav1.ObjectMeta) *wfv1.Workflow {
wf := &wfv1.Workflow{
TypeMeta: metav1.TypeMeta{
Kind: workflow.WorkflowKind,
APIVersion: typeMeta.APIVersion,
},
ObjectMeta: metav1.ObjectMeta{
GenerateName: objectMeta.GetName() + "-",
Labels: make(map[string]string),
Annotations: make(map[string]string),
APIVersion: cronWf.TypeMeta.APIVersion,
},
Spec: spec,
ObjectMeta: objectMeta,
Spec: cronWf.Spec.WorkflowSpec,
}

if instanceId, ok := objectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
if instanceId, ok := cronWf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID]; ok {
wf.ObjectMeta.GetLabels()[LabelKeyControllerInstanceID] = instanceId
}

wf.Labels[LabelKeyCronWorkflow] = cronWf.Name
if cronWf.Spec.WorkflowMetadata != nil {
for key, label := range cronWf.Spec.WorkflowMetadata.Labels {
wf.Labels[key] = label
}

if len(cronWf.Spec.WorkflowMetadata.Annotations) > 0 {
wf.Annotations = make(map[string]string)
for key, annotation := range cronWf.Spec.WorkflowMetadata.Annotations {
wf.Annotations[key] = annotation
}
}
}
wf.SetOwnerReferences(append(wf.GetOwnerReferences(), *metav1.NewControllerRef(&cronWf, wfv1.SchemeGroupVersion.WithKind(workflow.CronWorkflowKind))))

return wf
}
5 changes: 5 additions & 0 deletions workflow/common/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ spec:
if assert.Contains(t, wf.GetLabels(), LabelKeyControllerInstanceID) {
assert.Equal(t, wf.GetLabels()[LabelKeyControllerInstanceID], "test-controller")
}

err = yaml.Unmarshal([]byte(cronWfInstanceIdString), &cronWf)
assert.NoError(t, err)
wf = ConvertCronWorkflowToWorkflowWithName(&cronWf, "test-name")
assert.Equal(t, "test-name", wf.Name)
}

const workflowTmpl = `
Expand Down
4 changes: 3 additions & 1 deletion workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func (cc *Controller) processNextCronItem() bool {
cronSchedule = "CRON_TZ=" + cronWf.Spec.Timezone + " " + cronSchedule
}

err = cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
lastScheduledTimeFunc, err := cc.cron.AddJob(key.(string), cronSchedule, cronWorkflowOperationCtx)
if err != nil {
logCtx.WithError(err).Error("could not schedule CronWorkflow")
return true
}

cronWorkflowOperationCtx.scheduledTimeFunc = lastScheduledTimeFunc

logCtx.Infof("CronWorkflow %s added", key.(string))

return true
Expand Down
13 changes: 10 additions & 3 deletions workflow/cron/cron_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/robfig/cron/v3"
)
Expand All @@ -16,6 +17,8 @@ type cronFacade struct {
entryIDs map[string]cron.EntryID
}

// ScheduledTimeFunc is a callback that, when called, reports the time at which a cron job was last scheduled to run.
type ScheduledTimeFunc func() time.Time

func newCronFacade() *cronFacade {
return &cronFacade{
cron: cron.New(),
Expand All @@ -42,15 +45,19 @@ func (f *cronFacade) Delete(key string) {
delete(f.entryIDs, key)
}

func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) error {
func (f *cronFacade) AddJob(key, schedule string, cwoc *cronWfOperationCtx) (ScheduledTimeFunc, error) {
f.mu.Lock()
defer f.mu.Unlock()
entryID, err := f.cron.AddJob(schedule, cwoc)
if err != nil {
return err
return nil, err
}
f.entryIDs[key] = entryID
return nil

// Return a function to return the last scheduled time
return func() time.Time {
return f.cron.Entry(entryID).Prev
}, nil
}

func (f *cronFacade) Load(key string) (*cronWfOperationCtx, error) {
Expand Down
49 changes: 38 additions & 11 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type cronWfOperationCtx struct {
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
// scheduledTimeFunc returns the last scheduled time when it is called
scheduledTimeFunc ScheduledTimeFunc
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx {
Expand All @@ -49,10 +51,20 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
"namespace": cronWorkflow.ObjectMeta.Namespace,
}),
metrics: metrics,
// inferScheduledTime returns an inferred scheduled time based on the current time and only works if it is called
// within 59 seconds of the scheduled time. Here it acts as a placeholder until it is replaced by a similar
// function that returns the last scheduled time deterministically from the cron engine. Since we are only able
// to generate the latter function after the job is scheduled, there is a tiny chance that the job is run before
// the deterministic function supplants the placeholder. If that happens, we use the infer function as the next-best thing
scheduledTimeFunc: inferScheduledTime,
}
}

// Run asks scheduledTimeFunc for the scheduled time of this execution and passes it to run, which does the actual
// work. The operation context is handed to the cron engine as the job, so this is presumably the method the engine
// calls on each tick.
func (woc *cronWfOperationCtx) Run() {
	scheduledRuntime := woc.scheduledTimeFunc()
	woc.run(scheduledRuntime)
}

func (woc *cronWfOperationCtx) run(scheduledRuntime time.Time) {
defer woc.persistUpdate()

woc.log.Infof("Running %s", woc.name)
Expand All @@ -70,7 +82,7 @@ func (woc *cronWfOperationCtx) Run() {
return
}

wf := common.ConvertCronWorkflowToWorkflow(woc.cronWf)
wf := common.ConvertCronWorkflowToWorkflowWithName(woc.cronWf, getChildWorkflowName(woc.cronWf.Name, scheduledRuntime))

runWf, err := util.SubmitWorkflow(woc.wfClient, woc.wfClientset, woc.cronWf.Namespace, wf, &v1alpha1.SubmitOpts{})
if err != nil {
Expand All @@ -79,7 +91,7 @@ func (woc *cronWfOperationCtx) Run() {
}

woc.cronWf.Status.Active = append(woc.cronWf.Status.Active, getWorkflowObjectReference(wf, runWf))
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: time.Now()}
woc.cronWf.Status.LastScheduledTime = &v1.Time{Time: scheduledRuntime}
woc.cronWf.Status.Conditions.RemoveCondition(v1alpha1.ConditionTypeSubmissionError)
}

Expand Down Expand Up @@ -181,40 +193,40 @@ func (woc *cronWfOperationCtx) terminateOutstandingWorkflows() error {
}

func (woc *cronWfOperationCtx) runOutstandingWorkflows() (bool, error) {
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
if err != nil {
return false, err
}
if proceed {
woc.Run()
if !missedExecutionTime.IsZero() {
woc.run(missedExecutionTime)
return true, nil
}
return false, nil
}

func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (time.Time, error) {
// If this CronWorkflow has been run before, check if we have missed any scheduled executions
if woc.cronWf.Status.LastScheduledTime != nil {
var now time.Time
var cronSchedule cron.Schedule
if woc.cronWf.Spec.Timezone != "" {
loc, err := time.LoadLocation(woc.cronWf.Spec.Timezone)
if err != nil {
return false, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
return time.Time{}, fmt.Errorf("invalid timezone '%s': %s", woc.cronWf.Spec.Timezone, err)
}
now = time.Now().In(loc)

cronScheduleString := "CRON_TZ=" + woc.cronWf.Spec.Timezone + " " + woc.cronWf.Spec.Schedule
cronSchedule, err = cron.ParseStandard(cronScheduleString)
if err != nil {
return false, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
return time.Time{}, fmt.Errorf("unable to form timezone schedule '%s': %s", cronScheduleString, err)
}
} else {
var err error
now = time.Now()
cronSchedule, err = cron.ParseStandard(woc.cronWf.Spec.Schedule)
if err != nil {
return false, err
return time.Time{}, err
}
}

Expand All @@ -231,11 +243,11 @@ func (woc *cronWfOperationCtx) shouldOutstandingWorkflowsBeRun() (bool, error) {
// If StartingDeadlineSeconds is not set, or we are still within the deadline window, run the Workflow
if woc.cronWf.Spec.StartingDeadlineSeconds == nil || *woc.cronWf.Spec.StartingDeadlineSeconds == 0 || now.Before(missedExecutionTime.Add(time.Duration(*woc.cronWf.Spec.StartingDeadlineSeconds)*time.Second)) {
woc.log.Infof("%s missed an execution at %s and is within StartingDeadline", woc.cronWf.Name, missedExecutionTime.Format("Mon Jan _2 15:04:05 2006"))
return true, nil
return missedExecutionTime, nil
}
}
}
return false, nil
return time.Time{}, nil
}

func (woc *cronWfOperationCtx) reconcileActiveWfs(workflows []v1alpha1.Workflow) error {
Expand Down Expand Up @@ -344,3 +356,18 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(conditionType v1alpha1.Co
})
woc.metrics.CronWorkflowSubmissionError()
}

// inferScheduledTime guesses the scheduled time of the current run by rounding the current UTC time down to the start
// of its minute, logs the result, and returns it. The guess is only correct when called within the same minute as the
// real scheduled time.
func inferScheduledTime() time.Time {
	// A minute is the finest granularity a schedule can have, so dropping the seconds and nanoseconds of "now" yields
	// the scheduled instant. This is a fallback that is expected to be used rarely, since it is quickly replaced by a
	// deterministic function from the cron engine.
	minuteStart := time.Now().UTC().Truncate(time.Minute)

	log.Infof("inferred scheduled time: %s", minuteStart)
	return minuteStart
}

// getChildWorkflowName builds the name for the Workflow created for one scheduled run: the CronWorkflow name, a dash,
// and the scheduled time expressed as Unix seconds. The same name and scheduled time always produce the same result.
func getChildWorkflowName(cronWorkflowName string, scheduledRuntime time.Time) string {
	epochSeconds := scheduledRuntime.Unix()
	return fmt.Sprintf("%s-%d", cronWorkflowName, epochSeconds)
}
31 changes: 17 additions & 14 deletions workflow/cron/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err := woc.shouldOutstandingWorkflowsBeRun()
missedExecutionTime, err := woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.True(t, proceed)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())

// StartingDeadlineSeconds is not after the current second, so cron should not be run
startingDeadlineSeconds = int64(25)
Expand All @@ -89,9 +90,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.False(t, proceed)
assert.True(t, missedExecutionTime.IsZero())

// Run the same test in a different timezone

Expand All @@ -110,9 +111,10 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.True(t, proceed)
// The missedExecutionTime should be the last complete minute mark, which we can get with inferScheduledTime
assert.Equal(t, inferScheduledTime().Unix(), missedExecutionTime.Unix())

// StartingDeadlineSeconds is not after the current second, so cron should not be run
startingDeadlineSeconds = int64(25)
Expand All @@ -121,9 +123,9 @@ func TestRunOutstandingWorkflows(t *testing.T) {
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
}
proceed, err = woc.shouldOutstandingWorkflowsBeRun()
missedExecutionTime, err = woc.shouldOutstandingWorkflowsBeRun()
assert.NoError(t, err)
assert.False(t, proceed)
assert.True(t, missedExecutionTime.IsZero())
}

type fakeLister struct {
Expand Down Expand Up @@ -170,12 +172,13 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
cs := fake.NewSimpleClientset()
testMetrics := metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
woc := &cronWfOperationCtx{
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: testMetrics,
scheduledTimeFunc: inferScheduledTime,
}
woc.Run()

Expand Down

0 comments on commit 50210fc

Please sign in to comment.