Skip to content

Commit

Permalink
feat: allow multiple schedules in a cron workflow (#12616)
Browse files Browse the repository at this point in the history
Signed-off-by: eduardodbr <eduardodbr@hotmail.com>
  • Loading branch information
eduardodbr committed Feb 19, 2024
1 parent d7db55c commit 986b069
Show file tree
Hide file tree
Showing 30 changed files with 1,380 additions and 764 deletions.
7 changes: 7 additions & 0 deletions api/jsonschema/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/argo/commands/cron/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func getCronWorkflowGet(cwf *wfv1.CronWorkflow) string {
out += fmt.Sprintf(fmtStr, "Name:", cwf.ObjectMeta.Name)
out += fmt.Sprintf(fmtStr, "Namespace:", cwf.ObjectMeta.Namespace)
out += fmt.Sprintf(fmtStr, "Created:", humanize.Timestamp(cwf.ObjectMeta.CreationTimestamp.Time))
out += fmt.Sprintf(fmtStr, "Schedule:", cwf.Spec.Schedule)
out += fmt.Sprintf(fmtStr, "Schedule:", cwf.Spec.GetScheduleString())
out += fmt.Sprintf(fmtStr, "Suspended:", cwf.Spec.Suspend)
if cwf.Spec.Timezone != "" {
out += fmt.Sprintf(fmtStr, "Timezone:", cwf.Spec.Timezone)
Expand Down
33 changes: 33 additions & 0 deletions cmd/argo/commands/cron/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,36 @@ func TestNextRuntime(t *testing.T) {
assert.Greater(t, next.Unix(), time.Now().Unix())
}
}

var cronMultipleSchedules = `
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
creationTimestamp: "2020-05-19T16:47:25Z"
generation: 98
name: wonderful-tiger
namespace: argo
resourceVersion: "465179"
selfLink: /apis/argoproj.io/v1alpha1/namespaces/argo/cronworkflows/wonderful-tiger
uid: c4ea2e84-ec58-4638-bf1d-5d543e7cc86a
spec:
schedules:
- '* * * * *'
- '*/2 * * * *'
workflowSpec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: argoproj/argosay:v2
command: [/argosay]
`

func TestNextRuntimeWithMultipleSchedules(t *testing.T) {
var cronWf = v1alpha1.MustUnmarshalCronWorkflow(cronMultipleSchedules)
next, err := GetNextRuntime(cronWf)
if assert.NoError(t, err) {
assert.LessOrEqual(t, next.Unix(), time.Now().Add(1*time.Minute).Unix())
assert.Greater(t, next.Unix(), time.Now().Unix())
}
}
2 changes: 1 addition & 1 deletion cmd/argo/commands/cron/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func printTable(wfList []wfv1.CronWorkflow, listArgs *listFlags) {
} else {
cleanNextScheduledTime = "N/A"
}
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%t", cwf.ObjectMeta.Name, humanize.RelativeDurationShort(cwf.ObjectMeta.CreationTimestamp.Time, time.Now()), cleanLastScheduledTime, cleanNextScheduledTime, cwf.Spec.Schedule, cwf.Spec.Timezone, cwf.Spec.Suspend)
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%t", cwf.ObjectMeta.Name, humanize.RelativeDurationShort(cwf.ObjectMeta.CreationTimestamp.Time, time.Now()), cleanLastScheduledTime, cleanNextScheduledTime, cwf.Spec.GetScheduleString(), cwf.Spec.Timezone, cwf.Spec.Suspend)
_, _ = fmt.Fprintf(w, "\n")
}
_ = w.Flush()
Expand Down
17 changes: 13 additions & 4 deletions cmd/argo/commands/cron/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,18 @@ import (
// GetNextRuntime returns the next time the workflow should run in local time. It assumes the workflow-controller is in
// UTC, but nevertheless returns the time in the local timezone.
func GetNextRuntime(cwf *v1alpha1.CronWorkflow) (time.Time, error) {
cronSchedule, err := cron.ParseStandard(cwf.Spec.GetScheduleString())
if err != nil {
return time.Time{}, err
var nextRunTime time.Time
now := time.Now().UTC()
for _, schedule := range cwf.Spec.GetSchedulesWithTimezone() {
cronSchedule, err := cron.ParseStandard(schedule)
if err != nil {
return time.Time{}, err
}
next := cronSchedule.Next(now).Local()
if nextRunTime.IsZero() || next.Before(nextRunTime) {
nextRunTime = next
}
}
return cronSchedule.Next(time.Now().UTC()).Local(), nil

return nextRunTime, nil
}
1 change: 1 addition & 0 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,7 @@ CronWorkflowSpec is the specification of a CronWorkflow
|`concurrencyPolicy`|`string`|ConcurrencyPolicy is the K8s-style concurrency policy that will be used|
|`failedJobsHistoryLimit`|`integer`|FailedJobsHistoryLimit is the number of failed jobs to be kept at a time|
|`schedule`|`string`|Schedule is a schedule to run the Workflow in Cron format|
|`schedules`|`Array< string >`|Schedules is a list of schedules to run the Workflow in Cron format|
|`startingDeadlineSeconds`|`integer`|StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its original scheduled time if it is missed.|
|`stopStrategy`|[`StopStrategy`](#stopstrategy)|StopStrategy defines if the cron workflow will stop being triggered once a certain condition has been reached, involving a number of runs of the workflow|
|`successfulJobsHistoryLimit`|`integer`|SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time|
Expand Down
4 changes: 4 additions & 0 deletions manifests/base/crds/full/argoproj.io_cronworkflows.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/api-rules/violation_exceptions.list
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,ContainerNode,Dependencies
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,ContainerSetTemplate,Containers
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,ContainerSetTemplate,VolumeMounts
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,CronWorkflowSpec,Schedules
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,CronWorkflowStatus,Active
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,DAGTask,Dependencies
API rule violation: list_type_missing,github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1,DAGTask,WithItems
Expand Down
70 changes: 69 additions & 1 deletion pkg/apis/workflow/v1alpha1/cron_workflow_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package v1alpha1

import (
"strings"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -61,6 +63,8 @@ type CronWorkflowSpec struct {
WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
// StopStrategy defines if the cron workflow will stop being triggered once a certain condition has been reached, involving a number of runs of the workflow
StopStrategy *StopStrategy `json:"stopStrategy,omitempty" protobuf:"bytes,10,opt,name=stopStrategy"`
// Schedules is a list of schedules to run the Workflow in Cron format
Schedules []string `json:"schedules,omitempty" protobuf:"bytes,11,opt,name=schedules"`
}

// StopStrategy defines if the cron workflow will stop being triggered once a certain condition has been reached, involving a number of runs of the workflow
Expand Down Expand Up @@ -107,12 +111,76 @@ func (c *CronWorkflow) SetSchedule(schedule string) {
c.Annotations[annotationKeyLatestSchedule] = schedule
}

func (c *CronWorkflow) SetSchedules(schedules []string) {
if c.Annotations == nil {
c.Annotations = map[string]string{}
}
var scheduleString strings.Builder
for i, schedule := range schedules {
scheduleString.WriteString(schedule)
if i != len(schedules)-1 {
scheduleString.WriteString(",")
}
}
c.Annotations[annotationKeyLatestSchedule] = scheduleString.String()
}

func (c *CronWorkflow) GetLatestSchedule() string {
return c.Annotations[annotationKeyLatestSchedule]
}

// GetScheduleString returns the schedule expression with timezone, if available. If multiple
// expressions are configured it returns a comma separated list of cron expressions
func (c *CronWorkflowSpec) GetScheduleString() string {
scheduleString := c.Schedule
var scheduleString string
if c.Schedule != "" {
scheduleString = c.withTimezone(c.Schedule)
} else {
var sb strings.Builder
for i, schedule := range c.Schedules {
sb.WriteString(c.withTimezone(schedule))
if i != len(c.Schedules)-1 {
sb.WriteString(",")
}
}
scheduleString = sb.String()
}
return scheduleString
}

// GetSchedulesWithTimezone returns all schedules configured for the CronWorkflow with a timezone. It handles
// both Spec.Schedules and Spec.Schedule for backwards compatibility
func (c *CronWorkflowSpec) GetSchedulesWithTimezone() []string {
return c.getSchedules(true)
}

// GetSchedules returns all schedules configured for the CronWorkflow. It handles both Spec.Schedules
// and Spec.Schedule for backwards compatibility
func (c *CronWorkflowSpec) GetSchedules() []string {
return c.getSchedules(false)
}

func (c *CronWorkflowSpec) getSchedules(withTimezone bool) []string {
var schedules []string
if c.Schedule != "" {
schedule := c.Schedule
if withTimezone {
schedule = c.withTimezone(c.Schedule)
}
schedules = append(schedules, schedule)
} else {
schedules = make([]string, len(c.Schedules))
for i, schedule := range c.Schedules {
if withTimezone {
schedule = c.withTimezone(schedule)
}
schedules[i] = c.withTimezone(schedule)
}
}
return schedules
}

func (c *CronWorkflowSpec) withTimezone(scheduleString string) string {
if c.Timezone != "" {
scheduleString = "CRON_TZ=" + c.Timezone + " " + scheduleString
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/workflow/v1alpha1/cron_workflow_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ func TestCronWorkflowSpec_GetScheduleString(t *testing.T) {

cwfSpec.Timezone = "America/Los_Angeles"
assert.Equal(t, "CRON_TZ=America/Los_Angeles * * * * *", cwfSpec.GetScheduleString())
cwfSpec = CronWorkflowSpec{
Timezone: "",
Schedules: []string{"* * * * *", "0 * * * *"},
}
assert.Equal(t, "* * * * *,0 * * * *", cwfSpec.GetScheduleString())

cwfSpec.Timezone = "America/Los_Angeles"
assert.Equal(t, "CRON_TZ=America/Los_Angeles * * * * *,CRON_TZ=America/Los_Angeles 0 * * * *", cwfSpec.GetScheduleString())
}

0 comments on commit 986b069

Please sign in to comment.