diff --git a/cmd/schedule.go b/cmd/schedule.go new file mode 100644 index 0000000..c4e13d2 --- /dev/null +++ b/cmd/schedule.go @@ -0,0 +1,52 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cmd + +import ( + "github.com/aurora-scheduler/australis/internal" + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(scheduleCmd) + +} + +var scheduleCmd = &cobra.Command{ + Use: "schedule", + Short: "Schedule a cron job on Aurora scheduler", + Run: scheduleCron, + Args: cobra.ExactArgs(1), +} + +func scheduleCron(cmd *cobra.Command, args []string) { + job, err := internal.UnmarshalJob(args[0]) + if err != nil { + log.Fatalln(err) + } + + if err := job.ValidateCron(); err != nil { + log.Fatal(err) + } + + auroraJob, err := job.ToRealis() + if err != nil { + log.Fatalln(err) + } + + if err := client.ScheduleCronJob(auroraJob); err != nil { + log.Fatal("unable to schedule job: ", err) + } +} diff --git a/docs/australis.md b/docs/australis.md index 29b1784..1374c86 100644 --- a/docs/australis.md +++ b/docs/australis.md @@ -34,6 +34,7 @@ A light-weight command line client for use with Apache Aurora built using goreal * [australis restart](australis_restart.md) - Restart an Aurora Job. * [australis resume](australis_resume.md) - Resume a Job update * [australis rollback](australis_rollback.md) - Rollback an operation such as an Update +* [australis schedule](australis_schedule.md) - Schedule a cron job on Aurora scheduler * [australis set](australis_set.md) - Set a value in the Aurora Scheduler. * [australis start](australis_start.md) - Start a service, maintenance on a host (DRAIN), a snapshot, an update, or a backup. * [australis stop](australis_stop.md) - Stop a service or maintenance on a host (DRAIN). diff --git a/docs/australis_schedule.md b/docs/australis_schedule.md new file mode 100644 index 0000000..0218243 --- /dev/null +++ b/docs/australis_schedule.md @@ -0,0 +1,39 @@ +## australis schedule + +Schedule a cron job on Aurora scheduler + +### Synopsis + +Schedule a cron job on Aurora scheduler + +``` +australis schedule [flags] +``` + +### Options + +``` + -h, --help help for schedule +``` + +### Options inherited from parent commands + +``` + -a, --caCertsPath string Path where CA certificates can be found. + -c, --clientCert string Client certificate to use to connect to Aurora. + -k, --clientKey string Client key to use to connect to Aurora. + --config string Config file to use. (default "/etc/aurora/australis.yml") + -l, --logLevel string Set logging level [panic fatal error warning info debug trace]. (default "info") + -p, --password string Password to use for API authentication + -s, --scheduler_addr string Aurora Scheduler's address. + -i, --skipCertVerification Skip CA certificate hostname verification. + --toJSON Print output in JSON format. + -u, --username string Username to use for API authentication + -z, --zookeeper string Zookeeper node(s) where Aurora stores information. (comma separated list) +``` + +### SEE ALSO + +* [australis](australis.md) - australis is a client for Apache Aurora + +###### Auto generated by spf13/cobra on 7-May-2020 diff --git a/internal/converter.go b/internal/converter.go deleted file mode 100644 index a9204f7..0000000 --- a/internal/converter.go +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package internal - -import ( - "errors" - "fmt" - "strings" - - realis "github.com/aurora-scheduler/gorealis/v2" -) - -func (j *Job) ToRealis() (*realis.AuroraJob, error) { - - auroraJob := realis.NewJob(). - Environment(j.Environment). - Role(j.Role). - Name(j.Name). - CPU(j.CPU). - RAM(j.RAM). - Disk(j.Disk). - IsService(j.Service). - InstanceCount(j.Instances). - MaxFailure(j.MaxFailures) - - // Adding URIs. - for _, uri := range j.URIs { - auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI) - } - - // Adding Metadata. - for key, value := range j.Metadata { - auroraJob.AddLabel(key, value) - } - - // If thermos jobs processes are provided, use them - if len(j.Thermos) > 0 { - thermosExec := realis.ThermosExecutor{} - for _, process := range j.Thermos { - thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd)) - } - auroraJob.ThermosExecutor(thermosExec) - } else if j.Executor.Name != "" { - // Non-Thermos executor - if j.Executor.Name == "" { - return nil, errors.New("no executor name provided") - } - - auroraJob.ExecutorName(j.Executor.Name) - auroraJob.ExecutorData(j.Executor.Data) - } else if j.Container != nil { - if j.Container.Docker == nil { - return nil, errors.New("no container specified") - } - - if j.Container.Docker.Tag != "" && !strings.ContainsRune(j.Container.Docker.Name, ':') { - j.Container.Docker.Name += ":" + j.Container.Docker.Tag - } - auroraJob.Container(realis.NewDockerContainer().Image(j.Container.Docker.Name)) - - } - - return auroraJob, nil -} - -func (u *UpdateJob) ToRealis() (*realis.JobUpdate, error) { - - jobConfig, err := u.JobConfig.ToRealis() - if err != nil { - return nil, fmt.Errorf("invalid job configuration %w", err) - } - - update := realis.JobUpdateFromAuroraTask(jobConfig.AuroraTask()) - - update.MaxPerInstanceFailures(u.UpdateSettings.MaxPerInstanceFailures). - MaxFailedInstances(u.UpdateSettings.MaxFailedInstances). - WatchTime(u.UpdateSettings.MinTimeInRunning). - RollbackOnFail(u.UpdateSettings.RollbackOnFailure). - PulseIntervalTimeout(u.UpdateSettings.PulseTimeout). - SlaAware(u.UpdateSettings.SLAAware). - InstanceCount(u.UpdateSettings.InstanceCount) - - strategy := u.UpdateSettings.Strategy - switch { - case strategy.VariableBatch != nil: - update.VariableBatchStrategy(strategy.VariableBatch.AutoPause, strategy.VariableBatch.GroupSizes...) - case strategy.Batch != nil: - update.BatchUpdateStrategy(strategy.Batch.AutoPause,strategy.Batch.GroupSize) - case strategy.Queue != nil: - update.QueueUpdateStrategy(strategy.Queue.GroupSize) - default: - update.QueueUpdateStrategy(1) - } - - for _,r := range u.UpdateSettings.InstanceRanges { - update.AddInstanceRange(r.First, r.Last) - } - - return update, nil - - -} diff --git a/internal/job.go b/internal/job.go new file mode 100644 index 0000000..dcbf771 --- /dev/null +++ b/internal/job.go @@ -0,0 +1,180 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package internal + +import ( + "errors" + "strings" + + realis "github.com/aurora-scheduler/gorealis/v2" + "github.com/aurora-scheduler/gorealis/v2/gen-go/apache/aurora" +) + +type URI struct { + URI string `yaml:"uri"` + Extract bool `yaml:"extract"` + Cache bool `yaml:"cache"` +} + +type Executor struct { + Name string `yaml:"name"` + Data string `yaml:"data"` +} + +type ThermosProcess struct { + Name string `yaml:"name"` + Cmd string `yaml:"cmd"` +} + +type DockerContainer struct { + Name string `yaml:"name"` + Tag string `yaml:"tag"` +} + +type Container struct { + Docker *DockerContainer `yaml:"docker"` +} + +type Job struct { + Environment string `yaml:"environment"` + Role string `yaml:"role"` + Name string `yaml:"name"` + CPU float64 `yaml:"cpu"` + RAM int64 `yaml:"ram"` + Disk int64 `yaml:"disk"` + Executor Executor `yaml:"executor"` + Instances int32 `yaml:"instances"` + MaxFailures int32 `yaml:"maxFailures"` + URIs []URI `yaml:"uris"` + Metadata map[string]string `yaml:"labels"` + Service bool `yaml:"service"` + Thermos []ThermosProcess `yaml:",flow,omitempty"` + Container *Container `yaml:"container,omitempty"` + CronSchedule *string `yaml:"cronSchedule,omitempty"` + CronCollisionPolicy *string `yaml:"cronCollisionPolicy,omitempty"` +} + +func (j *Job) ToRealis() (*realis.AuroraJob, error) { + auroraJob := realis.NewJob(). + Environment(j.Environment). + Role(j.Role). + Name(j.Name). + CPU(j.CPU). + RAM(j.RAM). + Disk(j.Disk). + IsService(j.Service). + InstanceCount(j.Instances). + MaxFailure(j.MaxFailures) + + if j.CronSchedule != nil { + auroraJob.CronSchedule(*j.CronSchedule) + } + + if j.CronCollisionPolicy != nil { + // Ignoring error because we have already checked for it in the validate function + policy, _ := aurora.CronCollisionPolicyFromString(*j.CronCollisionPolicy) + auroraJob.CronCollisionPolicy(policy) + } + + // Adding URIs. + for _, uri := range j.URIs { + auroraJob.AddURIs(uri.Extract, uri.Cache, uri.URI) + } + + // Adding Metadata. + for key, value := range j.Metadata { + auroraJob.AddLabel(key, value) + } + + // If thermos jobs processes are provided, use them + if len(j.Thermos) > 0 { + thermosExec := realis.ThermosExecutor{} + for _, process := range j.Thermos { + thermosExec.AddProcess(realis.NewThermosProcess(process.Name, process.Cmd)) + } + auroraJob.ThermosExecutor(thermosExec) + } else if j.Executor.Name != "" { + // Non-Thermos executor + if j.Executor.Name == "" { + return nil, errors.New("no executor name provided") + } + + auroraJob.ExecutorName(j.Executor.Name) + auroraJob.ExecutorData(j.Executor.Data) + } else if j.Container != nil { + if j.Container.Docker == nil { + return nil, errors.New("no container specified") + } + + if j.Container.Docker.Tag != "" && !strings.ContainsRune(j.Container.Docker.Name, ':') { + j.Container.Docker.Name += ":" + j.Container.Docker.Tag + } + auroraJob.Container(realis.NewDockerContainer().Image(j.Container.Docker.Name)) + + } + + return auroraJob, nil +} + +func (j *Job) Validate() error { + if j.Name == "" { + return errors.New("job name not specified") + } + + if j.Role == "" { + return errors.New("job role not specified") + } + + if j.Environment == "" { + return errors.New("job environment not specified") + } + + if j.Instances <= 0 { + return errors.New("number of instances in job cannot be less than or equal to 0") + } + + if j.CPU <= 0.0 { + return errors.New("CPU must be greater than 0") + } + + if j.RAM <= 0 { + return errors.New("RAM must be greater than 0") + } + + if j.Disk <= 0 { + return errors.New("disk must be greater than 0") + } + + if len(j.Thermos) == 0 && j.Executor.Name == "" && j.Container == nil { + return errors.New("task does not contain a thermos definition, a custom executor name, or a container to launch") + } + return nil +} +func (j *Job) ValidateCron() error { + if j.CronSchedule == nil { + return errors.New("cron schedule must be set") + } + + if j.CronCollisionPolicy != nil { + if _, err := aurora.CronCollisionPolicyFromString(*j.CronCollisionPolicy); err != nil { + return err + } + } else { + killExisting := aurora.CronCollisionPolicy_KILL_EXISTING.String() + j.CronCollisionPolicy = &killExisting + } + + return nil +} diff --git a/internal/types.go b/internal/types.go deleted file mode 100644 index 16cca88..0000000 --- a/internal/types.go +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package internal - -import ( - "time" -) - -type URI struct { - URI string `yaml:"uri"` - Extract bool `yaml:"extract"` - Cache bool `yaml:"cache"` -} - -type Executor struct { - Name string `yaml:"name"` - Data string `yaml:"data"` -} - -type ThermosProcess struct { - Name string `yaml:"name"` - Cmd string `yaml:"cmd"` -} - -type DockerContainer struct { - Name string `yaml:"name"` - Tag string `yaml:"tag"` -} - -type Container struct { - Docker *DockerContainer `yaml:"docker"` -} - -type Job struct { - Environment string `yaml:"environment"` - Role string `yaml:"role"` - Name string `yaml:"name"` - CPU float64 `yaml:"cpu"` - RAM int64 `yaml:"ram"` - Disk int64 `yaml:"disk"` - Executor Executor `yaml:"executor"` - Instances int32 `yaml:"instances"` - MaxFailures int32 `yaml:"maxFailures"` - URIs []URI `yaml:"uris"` - Metadata map[string]string `yaml:"labels"` - Service bool `yaml:"service"` - Thermos []ThermosProcess `yaml:",flow,omitempty"` - Container *Container `yaml:"container,omitempty"` -} -type InstanceRange struct { - First int32 `yaml:"first"` - Last int32 `yaml:"last"` -} - -type VariableBatchStrategy struct { - GroupSizes []int32 `yaml:"groupSizes"` - AutoPause bool `yaml:"autoPause"` -} - -type BatchStrategy struct { - GroupSize int32 `yaml:"groupSize"` - AutoPause bool `yaml:"autoPause"` -} - -type QueueStrategy struct { - GroupSize int32 `yaml:"groupSize"` -} - -type UpdateStrategy struct { - VariableBatch *VariableBatchStrategy `yaml:"variableBatch"` - Batch *BatchStrategy `yaml:"batch"` - Queue *QueueStrategy `yaml:"queue"` -} -type UpdateSettings struct { - MaxPerInstanceFailures int32 `yaml:"maxPerInstanceFailures"` - MaxFailedInstances int32 `yaml:"maxFailedInstances"` - MinTimeInRunning time.Duration `yaml:"minTimeRunning"` - RollbackOnFailure bool `yaml:"rollbackOnFailure"` - InstanceRanges []InstanceRange `yaml:"instanceRanges"` - InstanceCount int32 `yaml:"instanceCount"` - PulseTimeout time.Duration `yaml:"pulseTimeout"` - SLAAware bool `yaml:"slaAware"` - Strategy UpdateStrategy `yaml:"strategy"` -} - -type UpdateJob struct { - JobConfig Job `yaml:"jobConfig"` - UpdateSettings UpdateSettings `yaml:"updateSettings"` -} diff --git a/internal/updateJob.go b/internal/updateJob.go new file mode 100644 index 0000000..32cafd3 --- /dev/null +++ b/internal/updateJob.go @@ -0,0 +1,128 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package internal + +import ( + "errors" + "fmt" + "time" + + realis "github.com/aurora-scheduler/gorealis/v2" +) + +type InstanceRange struct { + First int32 `yaml:"first"` + Last int32 `yaml:"last"` +} + +type VariableBatchStrategy struct { + GroupSizes []int32 `yaml:"groupSizes"` + AutoPause bool `yaml:"autoPause"` +} + +type BatchStrategy struct { + GroupSize int32 `yaml:"groupSize"` + AutoPause bool `yaml:"autoPause"` +} + +type QueueStrategy struct { + GroupSize int32 `yaml:"groupSize"` +} + +type UpdateStrategy struct { + VariableBatch *VariableBatchStrategy `yaml:"variableBatch"` + Batch *BatchStrategy `yaml:"batch"` + Queue *QueueStrategy `yaml:"queue"` +} +type UpdateSettings struct { + MaxPerInstanceFailures int32 `yaml:"maxPerInstanceFailures"` + MaxFailedInstances int32 `yaml:"maxFailedInstances"` + MinTimeInRunning time.Duration `yaml:"minTimeRunning"` + RollbackOnFailure bool `yaml:"rollbackOnFailure"` + InstanceRanges []InstanceRange `yaml:"instanceRanges"` + InstanceCount int32 `yaml:"instanceCount"` + PulseTimeout time.Duration `yaml:"pulseTimeout"` + SLAAware bool `yaml:"slaAware"` + Strategy UpdateStrategy `yaml:"strategy"` +} + +func (u *UpdateSettings) Validate() error { + if u.InstanceCount <= 0 { + return errors.New("instance count must be larger than 0") + } + + if u.Strategy.VariableBatch != nil { + if len(u.Strategy.VariableBatch.GroupSizes) == 0 { + return errors.New("variable batch strategy must specify at least one batch size") + } + for _, batch := range u.Strategy.VariableBatch.GroupSizes { + if batch <= 0 { + return errors.New("all groups in a variable batch strategy must be larger than 0") + } + } + } else if u.Strategy.Batch != nil { + if u.Strategy.Batch.GroupSize <= 0 { + return errors.New("batch strategy must specify a group larger than 0") + } + } else if u.Strategy.Queue != nil { + if u.Strategy.Queue.GroupSize <= 0 { + return errors.New("queue strategy must specify a group larger than 0") + } + } else { + log.Info("No strategy set, falling back on queue strategy with a group size 1") + } + return nil +} + +type UpdateJob struct { + JobConfig Job `yaml:"jobConfig"` + UpdateSettings UpdateSettings `yaml:"updateSettings"` +} + +func (u *UpdateJob) ToRealis() (*realis.JobUpdate, error) { + jobConfig, err := u.JobConfig.ToRealis() + if err != nil { + return nil, fmt.Errorf("invalid job configuration %w", err) + } + + update := realis.JobUpdateFromAuroraTask(jobConfig.AuroraTask()) + + update.MaxPerInstanceFailures(u.UpdateSettings.MaxPerInstanceFailures). + MaxFailedInstances(u.UpdateSettings.MaxFailedInstances). + WatchTime(u.UpdateSettings.MinTimeInRunning). + RollbackOnFail(u.UpdateSettings.RollbackOnFailure). + PulseIntervalTimeout(u.UpdateSettings.PulseTimeout). + SlaAware(u.UpdateSettings.SLAAware). + InstanceCount(u.UpdateSettings.InstanceCount) + + strategy := u.UpdateSettings.Strategy + switch { + case strategy.VariableBatch != nil: + update.VariableBatchStrategy(strategy.VariableBatch.AutoPause, strategy.VariableBatch.GroupSizes...) + case strategy.Batch != nil: + update.BatchUpdateStrategy(strategy.Batch.AutoPause, strategy.Batch.GroupSize) + case strategy.Queue != nil: + update.QueueUpdateStrategy(strategy.Queue.GroupSize) + default: + update.QueueUpdateStrategy(1) + } + + for _, r := range u.UpdateSettings.InstanceRanges { + update.AddInstanceRange(r.First, r.Last) + } + + return update, nil + +} diff --git a/internal/util.go b/internal/util.go index aad41a7..cc81cdb 100644 --- a/internal/util.go +++ b/internal/util.go @@ -118,41 +118,6 @@ func UnmarshalJob(filename string) (Job, error) { return job, nil } -func (j *Job) Validate() error { - if j.Name == "" { - return errors.New("job name not specified") - } - - if j.Role == "" { - return errors.New("job role not specified") - } - - if j.Environment == "" { - return errors.New("job environment not specified") - } - - if j.Instances <= 0 { - return errors.New("number of instances in job cannot be less than or equal to 0") - } - - if j.CPU <= 0.0 { - return errors.New("CPU must be greater than 0") - } - - if j.RAM <= 0 { - return errors.New("RAM must be greater than 0") - } - - if j.Disk <= 0 { - return errors.New("Disk must be greater than 0") - } - - if len(j.Thermos) == 0 && j.Executor.Name == "" && j.Container == nil { - return errors.New("task does not contain a thermos definition, a custom executor name, or a container to launch") - } - return nil -} - func UnmarshalUpdate(filename string) (UpdateJob, error) { updateJob := UpdateJob{} @@ -174,31 +139,3 @@ func UnmarshalUpdate(filename string) (UpdateJob, error) { return updateJob, nil } - -func (u *UpdateSettings) Validate() error { - if u.InstanceCount <= 0 { - return errors.New("instance count must be larger than 0") - } - - if u.Strategy.VariableBatch != nil { - if len(u.Strategy.VariableBatch.GroupSizes) == 0 { - return errors.New("variable batch strategy must specify at least one batch size") - } - for _, batch := range u.Strategy.VariableBatch.GroupSizes { - if batch <= 0 { - return errors.New("all groups in a variable batch strategy must be larger than 0") - } - } - } else if u.Strategy.Batch != nil { - if u.Strategy.Batch.GroupSize <= 0 { - return errors.New("batch strategy must specify a group larger than 0") - } - } else if u.Strategy.Queue != nil { - if u.Strategy.Queue.GroupSize <= 0 { - return errors.New("queue strategy must specify a group larger than 0") - } - } else { - log.Info("No strategy set, falling back on queue strategy with a group size 1") - } - return nil -} diff --git a/test/hello_world_cron.yaml b/test/hello_world_cron.yaml new file mode 100644 index 0000000..ff43c08 --- /dev/null +++ b/test/hello_world_cron.yaml @@ -0,0 +1,28 @@ +--- +environment: "prod" +role: "vagrant" +name: "hello_world" +cpu: 0.09 +ram: 64 +disk: 128 +instances: 1 +cronSchedule: "*/1 * * * *" +cronCollisionPolicy: "CANCEL_NEW" +thermos: + - name: "bootstrap" + cmd: "echo bootstrapping" + - name: "hello_gorealis" + cmd: "echo hello world from gorealis; sleep 10;" +updateSettings: + maxPerInstanceFailures: 1 + maxFailedInstances: 1 + minTimeInRunning: 1m + rollbackOnFailure: true + instanceRanges: + - start: 1 + end: 4 + blockIfNoPulseAfter: 1m + slaAware: false + strategy: + name: Batch + groupSize: 2 \ No newline at end of file