diff --git a/docs/docs/100-reference/01-command-line/acorn.md b/docs/docs/100-reference/01-command-line/acorn.md index c2c83ba33..35993fff7 100644 --- a/docs/docs/100-reference/01-command-line/acorn.md +++ b/docs/docs/100-reference/01-command-line/acorn.md @@ -38,6 +38,7 @@ acorn [flags] * [acorn image](acorn_image.md) - Manage images * [acorn info](acorn_info.md) - Info about acorn installation * [acorn install](acorn_install.md) - Install and configure acorn in the cluster +* [acorn job](acorn_job.md) - Manage jobs * [acorn login](acorn_login.md) - Add registry credentials * [acorn logout](acorn_logout.md) - Remove registry credentials * [acorn logs](acorn_logs.md) - Log all workloads from an app diff --git a/docs/docs/100-reference/01-command-line/acorn_job.md b/docs/docs/100-reference/01-command-line/acorn_job.md new file mode 100644 index 000000000..76257f323 --- /dev/null +++ b/docs/docs/100-reference/01-command-line/acorn_job.md @@ -0,0 +1,40 @@ +--- +title: "acorn job" +--- +## acorn job + +Manage jobs + +``` +acorn job [flags] [ACORN_NAME|JOB_NAME...] +``` + +### Examples + +``` + +acorn jobs +``` + +### Options + +``` + -h, --help help for job + -o, --output string Output format (json, yaml, {{gotemplate}}) + -q, --quiet Output only names +``` + +### Options inherited from parent commands + +``` + --debug Enable debug logging + --debug-level int Debug log level (valid 0-9) (default 7) + --kubeconfig string Explicitly use kubeconfig file, overriding the default context + -j, --project string Project to work in +``` + +### SEE ALSO + +* [acorn](acorn.md) - +* [acorn job restart](acorn_job_restart.md) - Restart a job + diff --git a/docs/docs/100-reference/01-command-line/acorn_job_restart.md b/docs/docs/100-reference/01-command-line/acorn_job_restart.md new file mode 100644 index 000000000..777919a3d --- /dev/null +++ b/docs/docs/100-reference/01-command-line/acorn_job_restart.md @@ -0,0 +1,39 @@ +--- +title: "acorn job restart" +--- +## acorn job restart + +Restart a job + +``` +acorn job restart [JOB_NAME...] [flags] +``` + +### Examples + +``` + +acorn job restart app-name.job-name +``` + +### Options + +``` + -h, --help help for restart +``` + +### Options inherited from parent commands + +``` + --debug Enable debug logging + --debug-level int Debug log level (valid 0-9) (default 7) + --kubeconfig string Explicitly use kubeconfig file, overriding the default context + -o, --output string Output format (json, yaml, {{gotemplate}}) + -j, --project string Project to work in + -q, --quiet Output only names +``` + +### SEE ALSO + +* [acorn job](acorn_job.md) - Manage jobs + diff --git a/go.mod b/go.mod index 118f95f22..3c702280d 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/pterm/pterm v0.12.49 github.com/rancher/wrangler v1.0.2 + github.com/robfig/cron/v3 v3.0.1 github.com/sigstore/cosign/v2 v2.0.2 github.com/sigstore/sigstore v1.6.4 github.com/sirupsen/logrus v1.9.2 diff --git a/go.sum b/go.sum index 3a0e74665..d6df20fad 100644 --- a/go.sum +++ b/go.sum @@ -772,6 +772,8 @@ github.com/rancher/lasso v0.0.0-20221227210133-6ea88ca2fbcc h1:29VHrInLV4qSevvcv github.com/rancher/lasso v0.0.0-20221227210133-6ea88ca2fbcc/go.mod h1:dEfC9eFQigj95lv/JQ8K5e7+qQCacWs1aIA6nLxKzT8= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/integration/client/client.go b/integration/client/client.go index 50a5d3fef..61e514a7e 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -5,6 +5,7 @@ import ( "github.com/acorn-io/runtime/integration/helper" "github.com/acorn-io/runtime/pkg/client" + "github.com/stretchr/testify/require" ) func NewImageWithSidecar(t *testing.T, namespace string) string { @@ -20,6 +21,18 @@ func NewImageWithSidecar(t *testing.T, namespace string) string { return image.ID } +func NewImageWithJobs(t *testing.T, namespace string) string { + t.Helper() + + c := helper.BuilderClient(t, namespace) + image, err := c.AcornImageBuild(helper.GetCTX(t), "../testdata/job/Acornfile", &client.AcornImageBuildOptions{ + Cwd: "../testdata/job", + }) + require.NoError(t, err) + + return image.ID +} + func NewImage2(t *testing.T, namespace string) string { t.Helper() diff --git a/integration/client/jobs/jobs_test.go b/integration/client/jobs/jobs_test.go new file mode 100644 index 000000000..f68781e15 --- /dev/null +++ b/integration/client/jobs/jobs_test.go @@ -0,0 +1,141 @@ +package containers + +import ( + "strings" + "testing" + + client2 "github.com/acorn-io/runtime/integration/client" + "github.com/acorn-io/runtime/integration/helper" + apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1" + "github.com/acorn-io/runtime/pkg/client" + kclient "github.com/acorn-io/runtime/pkg/k8sclient" + "github.com/acorn-io/runtime/pkg/publicname" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestJobList(t *testing.T) { + helper.StartController(t) + restConfig := helper.StartAPI(t) + + ctx := helper.GetCTX(t) + lclient, err := kclient.New(restConfig) + require.NoError(t, err) + + kclient := helper.MustReturn(kclient.Default) + project := helper.TempProject(t, kclient) + + c, err := client.New(restConfig, "", project.Name) + require.NoError(t, err) + + imageID := client2.NewImageWithJobs(t, project.Name) + app, err := c.AppRun(ctx, imageID, nil) + require.NoError(t, err) + + app = helper.WaitForObject(t, lclient.Watch, &apiv1.AppList{}, app, func(app *apiv1.App) bool { + return app.Status.AppStatus.Jobs["job"].CompletionTime != nil + }) + + jobs, err := c.JobList(ctx, nil) + require.NoError(t, err) + + require.Len(t, jobs, 2) + for _, job := range jobs { + assert.Truef(t, strings.HasPrefix(job.Name, app.Name+"."), "not prefix %s %s", job.Name, app.Name) + assert.Equal(t, app.Namespace, job.Namespace) + } +} + +func TestJobGet(t *testing.T) { + helper.StartController(t) + restConfig := helper.StartAPI(t) + + ctx := helper.GetCTX(t) + lclient, err := kclient.New(restConfig) + require.NoError(t, err) + + kclient := helper.MustReturn(kclient.Default) + project := helper.TempProject(t, kclient) + + c, err := client.New(restConfig, "", project.Name) + require.NoError(t, err) + + imageID := client2.NewImageWithJobs(t, project.Name) + app, err := c.AppRun(ctx, imageID, nil) + require.NoError(t, err) + + helper.WaitForObject(t, lclient.Watch, &apiv1.AppList{}, app, func(app *apiv1.App) bool { + return app.Status.AppStatus.Jobs["job"].CompletionTime != nil + }) + + jobs, err := c.JobList(ctx, nil) + require.NoError(t, err) + + // Determine which job is the cronjob and which is the job + require.Len(t, jobs, 2) + jobFromList, cronjobFromList := jobs[0], jobs[1] + if cronjobFromList.Spec.Schedule == "" { + jobFromList = jobs[1] + cronjobFromList = jobs[0] + } + + // Check that the job without a schedule is correct + job, err := c.JobGet(ctx, jobFromList.Name) + require.NoError(t, err) + + assert.Nil(t, jobFromList.Status.NextRun) + assert.Equal(t, jobFromList.Name, job.Name) + assert.Equal(t, jobFromList.Namespace, job.Namespace) + assert.Equal(t, jobFromList.UID, job.UID) + + // Check that the cronjob is correct + cronjob, err := c.JobGet(ctx, cronjobFromList.Name) + require.NoError(t, err) + + assert.Equal(t, cronjobFromList.Name, cronjob.Name) + assert.Equal(t, cronjobFromList.Namespace, cronjob.Namespace) + assert.Equal(t, cronjobFromList.UID, cronjob.UID) +} + +func TestJobRestart(t *testing.T) { + helper.StartController(t) + restConfig := helper.StartAPI(t) + + ctx := helper.GetCTX(t) + lclient, err := kclient.New(restConfig) + require.NoError(t, err) + + kclient := helper.MustReturn(kclient.Default) + project := helper.TempProject(t, kclient) + + c, err := client.New(restConfig, "", project.Name) + require.NoError(t, err) + + imageID := client2.NewImageWithJobs(t, project.Name) + app, err := c.AppRun(ctx, imageID, nil) + require.NoError(t, err) + + // Wait for the Job to initially complete + var firstCompletion *metav1.Time + helper.WaitForObject(t, lclient.Watch, &apiv1.AppList{}, app, func(app *apiv1.App) bool { + firstCompletion = app.Status.AppStatus.Jobs["job"].CompletionTime + return app.Status.Namespace != "" && app.Status.AppStatus.Jobs["job"].CompletionTime != nil + }) + + require.NoError(t, c.JobRestart(ctx, publicname.ForChild(app, "job"))) + + // Wait for the Job to complete again by checking for a difference in the completion time + helper.WaitForObject(t, lclient.Watch, &apiv1.AppList{}, app, func(app *apiv1.App) bool { + secondCompletion := app.Status.AppStatus.Jobs["job"].CompletionTime + return app.Status.Namespace != "" && !firstCompletion.Equal(secondCompletion) + }) + + require.NoError(t, c.JobRestart(ctx, publicname.ForChild(app, "cronjob"))) + + // Wait for the CronJob to complete once, which means it has been restarted since the job + // is scheduled to never run + helper.WaitForObject(t, lclient.Watch, &apiv1.AppList{}, app, func(app *apiv1.App) bool { + return app.Status.AppStatus.Jobs["cronjob"].LastRun != nil + }) +} diff --git a/integration/client/testdata/job/Acornfile b/integration/client/testdata/job/Acornfile new file mode 100644 index 000000000..a0f7e1c8b --- /dev/null +++ b/integration/client/testdata/job/Acornfile @@ -0,0 +1,9 @@ +jobs: { + job: { + image:"ghcr.io/acorn-io/images-mirror/alpine:latest" + } + cronjob: { + image:"ghcr.io/acorn-io/images-mirror/alpine:latest" + schedule: "0 0 31 2 *" // February 31st, never runs + } +} \ No newline at end of file diff --git a/pkg/apis/api.acorn.io/v1/scheme.go b/pkg/apis/api.acorn.io/v1/scheme.go index 6e48ab4c7..e3928b452 100644 --- a/pkg/apis/api.acorn.io/v1/scheme.go +++ b/pkg/apis/api.acorn.io/v1/scheme.go @@ -53,6 +53,9 @@ func AddToSchemeWithGV(scheme *runtime.Scheme, schemeGroupVersion schema.GroupVe &ContainerReplicaList{}, &ContainerReplicaExecOptions{}, &ContainerReplicaPortForwardOptions{}, + &Job{}, + &JobRestart{}, + &JobList{}, &Secret{}, &SecretList{}, &Service{}, diff --git a/pkg/apis/api.acorn.io/v1/types.go b/pkg/apis/api.acorn.io/v1/types.go index 643d4fe47..5b156b12f 100644 --- a/pkg/apis/api.acorn.io/v1/types.go +++ b/pkg/apis/api.acorn.io/v1/types.go @@ -96,6 +96,43 @@ type ContainerReplicaStatus struct { Started *bool `json:"started,omitempty"` } +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type Job struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Spec JobSpec `json:"spec,omitempty"` + Status v1.JobStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type JobList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []Job `json:"items"` +} + +type JobSpec struct { + AppName string `json:"appName,omitempty"` + JobName string `json:"jobName,omitempty"` + Schedule string `json:"schedule,omitempty"` +} + +type JobColumns struct { + State string `json:"state,omitempty"` + App string `json:"app,omitempty"` + NextRun *metav1.Time `json:"nextRun,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type JobRestart struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` +} + // EnsureRegion checks or sets the region of a ContainerReplica. // If a ContainerReplica's region is unset, EnsureRegion sets it to the given region and returns true. // Otherwise, it returns true if and only if the ContainerReplica belongs to the given region. diff --git a/pkg/apis/api.acorn.io/v1/zz_generated.deepcopy.go b/pkg/apis/api.acorn.io/v1/zz_generated.deepcopy.go index 0edfd3fdd..a334f88eb 100644 --- a/pkg/apis/api.acorn.io/v1/zz_generated.deepcopy.go +++ b/pkg/apis/api.acorn.io/v1/zz_generated.deepcopy.go @@ -1631,6 +1631,124 @@ func (in *InfoSpec) DeepCopy() *InfoSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Job) DeepCopyInto(out *Job) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Job. +func (in *Job) DeepCopy() *Job { + if in == nil { + return nil + } + out := new(Job) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Job) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobColumns) DeepCopyInto(out *JobColumns) { + *out = *in + if in.NextRun != nil { + in, out := &in.NextRun, &out.NextRun + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobColumns. +func (in *JobColumns) DeepCopy() *JobColumns { + if in == nil { + return nil + } + out := new(JobColumns) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobList) DeepCopyInto(out *JobList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Job, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobList. +func (in *JobList) DeepCopy() *JobList { + if in == nil { + return nil + } + out := new(JobList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *JobList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobRestart) DeepCopyInto(out *JobRestart) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobRestart. +func (in *JobRestart) DeepCopy() *JobRestart { + if in == nil { + return nil + } + out := new(JobRestart) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *JobRestart) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobSpec) DeepCopyInto(out *JobSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobSpec. +func (in *JobSpec) DeepCopy() *JobSpec { + if in == nil { + return nil + } + out := new(JobSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LogMessage) DeepCopyInto(out *LogMessage) { *out = *in diff --git a/pkg/apis/internal.acorn.io/v1/appstatus.go b/pkg/apis/internal.acorn.io/v1/appstatus.go index a285117b5..2f39395e5 100644 --- a/pkg/apis/internal.acorn.io/v1/appstatus.go +++ b/pkg/apis/internal.acorn.io/v1/appstatus.go @@ -2,6 +2,8 @@ package v1 import ( "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type AppStatus struct { @@ -145,6 +147,14 @@ func (in ContainerStatus) GetCommonStatus() CommonStatus { type JobStatus struct { CommonStatus `json:",inline"` + Schedule string `json:"schedule,omitempty"` + JobName string `json:"name,omitempty"` + JobNamespace string `json:"namespace,omitempty"` + CreationTime *metav1.Time `json:"creationTime,omitempty"` + StartTime *metav1.Time `json:"startTime,omitempty"` + CompletionTime *metav1.Time `json:"completionTime,omitempty"` + LastRun *metav1.Time `json:"lastRun,omitempty"` + NextRun *metav1.Time `json:"nextRun,omitempty"` RunningCount int `json:"runningCount,omitempty"` ErrorCount int `json:"errorCount,omitempty"` CreateEventSucceeded bool `json:"createEventSucceeded,omitempty"` diff --git a/pkg/apis/internal.acorn.io/v1/zz_generated.deepcopy.go b/pkg/apis/internal.acorn.io/v1/zz_generated.deepcopy.go index a32c78db5..2fe6855ce 100644 --- a/pkg/apis/internal.acorn.io/v1/zz_generated.deepcopy.go +++ b/pkg/apis/internal.acorn.io/v1/zz_generated.deepcopy.go @@ -2001,6 +2001,26 @@ func (in *ImagesData) DeepCopy() *ImagesData { func (in *JobStatus) DeepCopyInto(out *JobStatus) { *out = *in in.CommonStatus.DeepCopyInto(&out.CommonStatus) + if in.CreationTime != nil { + in, out := &in.CreationTime, &out.CreationTime + *out = (*in).DeepCopy() + } + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + *out = (*in).DeepCopy() + } + if in.CompletionTime != nil { + in, out := &in.CompletionTime, &out.CompletionTime + *out = (*in).DeepCopy() + } + if in.LastRun != nil { + in, out := &in.LastRun, &out.LastRun + *out = (*in).DeepCopy() + } + if in.NextRun != nil { + in, out := &in.NextRun, &out.NextRun + *out = (*in).DeepCopy() + } if in.Dependencies != nil { in, out := &in.Dependencies, &out.Dependencies *out = make(map[string]DependencyStatus, len(*in)) diff --git a/pkg/cli/acorn.go b/pkg/cli/acorn.go index 1ffb76f19..285e34d04 100644 --- a/pkg/cli/acorn.go +++ b/pkg/cli/acorn.go @@ -39,6 +39,7 @@ func New() *cobra.Command { NewBuildServer(cmdContext), NewCheck(cmdContext), NewContainer(cmdContext), + NewJob(cmdContext), NewController(cmdContext), NewCredential(cmdContext), NewDev(cmdContext), diff --git a/pkg/cli/builder/table/funcs.go b/pkg/cli/builder/table/funcs.go index 541ede2a1..e6e39e909 100644 --- a/pkg/cli/builder/table/funcs.go +++ b/pkg/cli/builder/table/funcs.go @@ -21,6 +21,9 @@ import ( var ( FuncMap = map[string]any{ "ago": FormatCreated, + "until": FormatUntil, + "lastRun": FormatLastRun, + "nextRun": FormatNextRun, "json": FormatJSON, "jsoncompact": FormatJSONCompact, "yaml": FormatYAML, @@ -114,6 +117,24 @@ func FormatCreated(data metav1.Time) string { return duration.HumanDuration(time.Now().UTC().Sub(data.Time)) + " ago" } +func FormatUntil(data metav1.Time) string { + return duration.HumanDuration(time.Until(data.Time.UTC())) + " from now" +} + +func FormatNextRun(data *metav1.Time) string { + if data == nil { + return "N/A" + } + return FormatUntil(*data) +} + +func FormatLastRun(data *metav1.Time) string { + if data == nil { + return "N/A" + } + return FormatCreated(*data) +} + func FormatJSON(data any) (string, error) { bytes, err := json.MarshalIndent(cleanFields(data), "", " ") return string(bytes) + "\n", err diff --git a/pkg/cli/completion.go b/pkg/cli/completion.go index 34985a6a2..218e2dd07 100644 --- a/pkg/cli/completion.go +++ b/pkg/cli/completion.go @@ -127,6 +127,22 @@ func containersCompletion(ctx context.Context, c client.Client, toComplete strin return result, nil } +func jobsCompletion(ctx context.Context, c client.Client, toComplete string) ([]string, error) { + var result []string + jobs, err := c.JobList(ctx, nil) + if err != nil { + return nil, err + } + + for _, job := range jobs { + if strings.HasPrefix(job.Name, toComplete) { + result = append(result, job.Name) + } + } + + return result, nil +} + // acornContainerCompletion will complete the `-c` flag for various commands like exec. It must look at all apps and // then for all containers on status.appSpec.Containers to produce a list of possibilities. func acornContainerCompletion(ctx context.Context, c client.Client, toComplete string) ([]string, error) { diff --git a/pkg/cli/jobs.go b/pkg/cli/jobs.go new file mode 100644 index 000000000..843000d35 --- /dev/null +++ b/pkg/cli/jobs.go @@ -0,0 +1,65 @@ +package cli + +import ( + cli "github.com/acorn-io/runtime/pkg/cli/builder" + "github.com/acorn-io/runtime/pkg/cli/builder/table" + "github.com/acorn-io/runtime/pkg/publicname" + "github.com/acorn-io/runtime/pkg/tables" + "github.com/spf13/cobra" +) + +func NewJob(c CommandContext) *cobra.Command { + cmd := cli.Command(&Job{client: c.ClientFactory}, cobra.Command{ + Use: "job [flags] [ACORN_NAME|JOB_NAME...]", + Aliases: []string{"jobs"}, + Example: ` +acorn jobs`, + SilenceUsage: true, + Short: "Manage jobs", + ValidArgsFunction: newCompletion(c.ClientFactory, jobsCompletion).complete, + }) + cmd.AddCommand(NewJobRestart(c)) + return cmd +} + +type Job struct { + Quiet bool `usage:"Output only names" short:"q"` + Output string `usage:"Output format (json, yaml, {{gotemplate}})" short:"o"` + client ClientFactory +} + +func (a *Job) Run(cmd *cobra.Command, args []string) error { + c, err := a.client.CreateDefault() + if err != nil { + return err + } + + out := table.NewWriter(tables.Job, a.Quiet, a.Output) + + jobs, err := c.JobList(cmd.Context(), nil) + if err != nil { + return err + } + + // Build a map of args to use instead of a slice for faster lookups. + argsMap := map[string]bool{} + for _, arg := range args { + argsMap[arg] = true + } + + printed := map[string]bool{} + for _, job := range jobs { + appName, _ := publicname.Split(job.Name) + + // If args were passed and this job doesn't match any args or has already been printed, skip it. + matchesArg := argsMap[appName] || argsMap[job.Name] + if len(args) != 0 && (!matchesArg || printed[job.Name]) { + continue + } + + printed[job.Name] = true + out.Write(&job) + } + + return out.Err() +} diff --git a/pkg/cli/jobs_restart.go b/pkg/cli/jobs_restart.go new file mode 100644 index 000000000..2840d55e5 --- /dev/null +++ b/pkg/cli/jobs_restart.go @@ -0,0 +1,42 @@ +package cli + +import ( + "fmt" + + cli "github.com/acorn-io/runtime/pkg/cli/builder" + "github.com/spf13/cobra" +) + +func NewJobRestart(c CommandContext) *cobra.Command { + cd := &JobRestart{client: c.ClientFactory} + cmd := cli.Command(cd, cobra.Command{ + Use: "restart [JOB_NAME...]", + Example: ` +acorn job restart app-name.job-name`, + SilenceUsage: true, + Short: "Restart a job", + Aliases: []string{"rs"}, + ValidArgsFunction: newCompletion(c.ClientFactory, jobsCompletion).complete, + }) + return cmd +} + +type JobRestart struct { + client ClientFactory +} + +func (a *JobRestart) Run(cmd *cobra.Command, args []string) error { + c, err := a.client.CreateDefault() + if err != nil { + return err + } + + for _, job := range args { + err := c.JobRestart(cmd.Context(), job) + if err != nil { + return fmt.Errorf("restarting %s: %w", job, err) + } + } + + return nil +} diff --git a/pkg/cli/testdata/MockClient.go b/pkg/cli/testdata/MockClient.go index 5e7629e9c..26025fdb0 100644 --- a/pkg/cli/testdata/MockClient.go +++ b/pkg/cli/testdata/MockClient.go @@ -39,6 +39,8 @@ type MockClientFactory struct { AppItem *apiv1.App ContainerList []apiv1.ContainerReplica ContainerItem *apiv1.ContainerReplica + JobList []apiv1.Job + JobItem *apiv1.Job CredentialList []apiv1.Credential CredentialItem *apiv1.Credential VolumeList []apiv1.Volume @@ -67,6 +69,7 @@ func (dc *MockClientFactory) CreateDefault() (client.Client, error) { return &MockClient{ Apps: dc.AppList, Containers: dc.ContainerList, + Jobs: dc.JobList, Credentials: dc.CredentialList, Volumes: dc.VolumeList, Secrets: dc.SecretList, @@ -75,6 +78,7 @@ func (dc *MockClientFactory) CreateDefault() (client.Client, error) { VolumeClasses: dc.VolumeClassList, AppItem: dc.AppItem, ContainerItem: dc.ContainerItem, + JobItem: dc.JobItem, CredentialItem: dc.CredentialItem, VolumeItem: dc.VolumeItem, SecretItem: dc.SecretItem, @@ -99,6 +103,8 @@ type MockClient struct { AppItem *apiv1.App Containers []apiv1.ContainerReplica ContainerItem *apiv1.ContainerReplica + Jobs []apiv1.Job + JobItem *apiv1.Job Credentials []apiv1.Credential CredentialItem *apiv1.Credential Volumes []apiv1.Volume @@ -510,6 +516,50 @@ func (m *MockClient) ContainerReplicaPortForward(ctx context.Context, name strin return nil, nil } +func (m *MockClient) JobList(ctx context.Context, opts *client.JobListOptions) ([]apiv1.Job, error) { + if m.Jobs != nil { + if opts == nil { + return m.Jobs, nil + } + // Do the filtering to make testing simpler + result := make([]apiv1.Job, 0, len(m.Jobs)) + for _, c := range m.Jobs { + if c.Spec.AppName == opts.App { + result = append(result, c) + } + } + return result, nil + } + return []apiv1.Job{{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{Name: "found.job"}, + Spec: apiv1.JobSpec{AppName: "found"}, + Status: v1.JobStatus{}, + }}, nil +} + +func (m *MockClient) JobGet(ctx context.Context, name string) (*apiv1.Job, error) { + if m.JobItem != nil { + return m.JobItem, nil + } + switch name { + case "dne": + return nil, fmt.Errorf("error: job %s does not exist", name) + case "found", "found.job": + return &apiv1.Job{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{Name: "found.job"}, + Spec: apiv1.JobSpec{AppName: "found"}, + Status: v1.JobStatus{}, + }, nil + } + return nil, nil +} + +func (m *MockClient) JobRestart(ctx context.Context, name string) error { + return nil +} + func (m *MockClient) VolumeList(ctx context.Context) ([]apiv1.Volume, error) { if m.Volumes != nil { return m.Volumes, nil diff --git a/pkg/cli/testdata/acorn/acorn_test_info.txt b/pkg/cli/testdata/acorn/acorn_test_info.txt index 822fb7419..e2bb6ee29 100644 --- a/pkg/cli/testdata/acorn/acorn_test_info.txt +++ b/pkg/cli/testdata/acorn/acorn_test_info.txt @@ -19,6 +19,7 @@ Available Commands: image Manage images info Info about acorn installation install Install and configure acorn in the cluster + job Manage jobs login Add registry credentials logout Remove registry credentials logs Log all workloads from an app diff --git a/pkg/client/client.go b/pkg/client/client.go index 5dcc23758..3e501fee0 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -240,6 +240,10 @@ type Client interface { ContainerReplicaExec(ctx context.Context, name string, args []string, tty bool, opts *ContainerReplicaExecOptions) (*term.ExecIO, error) ContainerReplicaPortForward(ctx context.Context, name string, port int) (PortForwardDialer, error) + JobList(ctx context.Context, opts *JobListOptions) ([]apiv1.Job, error) + JobGet(ctx context.Context, name string) (*apiv1.Job, error) + JobRestart(ctx context.Context, name string) error + VolumeList(ctx context.Context) ([]apiv1.Volume, error) VolumeGet(ctx context.Context, name string) (*apiv1.Volume, error) VolumeDelete(ctx context.Context, name string) (*apiv1.Volume, error) @@ -358,6 +362,10 @@ type KubeProxyAddressOptions struct { Region string `json:"region,omitempty"` } +type JobListOptions struct { + App string `json:"app,omitempty"` +} + type EventStreamOptions struct { Tail int `json:"tail,omitempty"` Follow bool `json:"follow,omitempty"` diff --git a/pkg/client/deferred.go b/pkg/client/deferred.go index 4f2da8014..70e2a3afb 100644 --- a/pkg/client/deferred.go +++ b/pkg/client/deferred.go @@ -234,6 +234,27 @@ func (d *DeferredClient) ContainerReplicaPortForward(ctx context.Context, contai return d.Client.ContainerReplicaPortForward(ctx, containerName, port) } +func (d *DeferredClient) JobList(ctx context.Context, opts *JobListOptions) ([]apiv1.Job, error) { + if err := d.create(); err != nil { + return nil, err + } + return d.Client.JobList(ctx, opts) +} + +func (d *DeferredClient) JobGet(ctx context.Context, name string) (*apiv1.Job, error) { + if err := d.create(); err != nil { + return nil, err + } + return d.Client.JobGet(ctx, name) +} + +func (d *DeferredClient) JobRestart(ctx context.Context, name string) error { + if err := d.create(); err != nil { + return err + } + return d.Client.JobRestart(ctx, name) +} + func (d *DeferredClient) VolumeList(ctx context.Context) ([]apiv1.Volume, error) { if err := d.create(); err != nil { return nil, err diff --git a/pkg/client/jobs.go b/pkg/client/jobs.go new file mode 100644 index 000000000..4972ced62 --- /dev/null +++ b/pkg/client/jobs.go @@ -0,0 +1,51 @@ +package client + +import ( + "context" + "sort" + + apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + kclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +func (c *DefaultClient) JobGet(ctx context.Context, name string) (*apiv1.Job, error) { + job := &apiv1.Job{} + return job, c.Client.Get(ctx, kclient.ObjectKey{ + Name: name, + Namespace: c.Namespace, + }, job) +} + +func (c *DefaultClient) JobList(ctx context.Context, opts *JobListOptions) ([]apiv1.Job, error) { + result, listOptions := &apiv1.JobList{}, &kclient.ListOptions{Namespace: c.Namespace} + + if opts != nil && opts.App != "" { + listOptions.FieldSelector = fields.SelectorFromSet(map[string]string{"metadata.name": opts.App}) + } + + if err := c.Client.List(ctx, result, listOptions); err != nil { + return nil, err + } + + sort.Slice(result.Items, func(i, j int) bool { + if result.Items[i].CreationTimestamp.Time == result.Items[j].CreationTimestamp.Time { + return result.Items[i].Name < result.Items[j].Name + } + return result.Items[i].CreationTimestamp.After(result.Items[j].CreationTimestamp.Time) + }) + + return result.Items, nil +} + +func (c *DefaultClient) JobRestart(ctx context.Context, name string) error { + rs := &apiv1.JobRestart{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: c.Namespace}} + + return c.RESTClient.Post(). + Namespace(c.Namespace). + Resource("jobs"). + Name(name). + SubResource("restart"). + Body(rs).Do(ctx).Error() +} diff --git a/pkg/client/multi.go b/pkg/client/multi.go index 4739f6ba0..82f2d3625 100644 --- a/pkg/client/multi.go +++ b/pkg/client/multi.go @@ -339,6 +339,31 @@ func (m *MultiClient) ContainerReplicaPortForward(ctx context.Context, name stri return dialer, err } +func (m *MultiClient) JobList(ctx context.Context, opts *JobListOptions) ([]apiv1.Job, error) { + if opts != nil && opts.App != "" { + return onOneList(ctx, m.Factory, opts.App, func(name string, c Client) ([]apiv1.Job, error) { + opts.App = name + return c.JobList(ctx, opts) + }) + } + return aggregate(ctx, m.Factory, func(c Client) ([]apiv1.Job, error) { + return c.JobList(ctx, opts) + }) +} + +func (m *MultiClient) JobGet(ctx context.Context, name string) (*apiv1.Job, error) { + return onOne(ctx, m.Factory, name, func(name string, c Client) (*apiv1.Job, error) { + return c.JobGet(ctx, name) + }) +} + +func (m *MultiClient) JobRestart(ctx context.Context, name string) error { + _, err := onOne(ctx, m.Factory, name, func(name string, c Client) (*apiv1.App, error) { + return &apiv1.App{}, c.JobRestart(ctx, name) + }) + return err +} + func (m *MultiClient) VolumeList(ctx context.Context) ([]apiv1.Volume, error) { return aggregate(ctx, m.Factory, func(c Client) ([]apiv1.Volume, error) { return c.VolumeList(ctx) diff --git a/pkg/controller/appstatus/jobs.go b/pkg/controller/appstatus/jobs.go index ed2cd01a2..353b99d45 100644 --- a/pkg/controller/appstatus/jobs.go +++ b/pkg/controller/appstatus/jobs.go @@ -8,8 +8,11 @@ import ( v1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1" "github.com/acorn-io/runtime/pkg/labels" "github.com/acorn-io/runtime/pkg/ports" + "github.com/acorn-io/z" + cronv3 "github.com/robfig/cron/v3" batchv1 "k8s.io/api/batch/v1" apierror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func (a *appStatusRenderer) readJobs() error { @@ -39,8 +42,11 @@ func (a *appStatusRenderer) readJobs() error { c.TransitioningMessages = append(c.TransitioningMessages, summary.TransitioningMessages...) c.ErrorMessages = append(c.ErrorMessages, summary.ErrorMessages...) c.RunningCount = summary.RunningCount + c.JobName = jobName + c.JobNamespace = a.app.Status.Namespace if c.Skipped { + c.CreationTime = &a.app.CreationTimestamp c.State = "completed" c.Ready = true c.UpToDate = true @@ -62,17 +68,39 @@ func (a *appStatusRenderer) readJobs() error { } else if err != nil { return err } else { + c.CreationTime = &cronJob.CreationTimestamp + c.LastRun = cronJob.Status.LastScheduleTime + c.CompletionTime = cronJob.Status.LastSuccessfulTime + c.Schedule = cronJob.Spec.Schedule c.Defined = true c.UpToDate = cronJob.Annotations[labels.AcornAppGeneration] == strconv.Itoa(int(a.app.Generation)) - c.RunningCount = len(cronJob.Status.Active) + for _, nj := range cronJob.Status.Active { + nestedJob := &batchv1.Job{} + err := a.c.Get(a.ctx, router.Key(nj.Namespace, nj.Name), nestedJob) + if err != nil { + return err + } + c.RunningCount += int(nestedJob.Status.Active) + c.ErrorCount += int(nestedJob.Status.Failed) + } + if cronJob.Status.LastSuccessfulTime != nil { c.CreateEventSucceeded = true c.Ready = c.UpToDate } + + nextRun, err := nextRun(c.Schedule, cronJob.CreationTimestamp, cronJob.Status.LastScheduleTime) + if err != nil { + return err + } + c.NextRun = nextRun } } else if err != nil { return err } else { + c.CreationTime = &job.CreationTimestamp + c.CompletionTime = job.Status.CompletionTime + c.LastRun = job.Status.StartTime c.Defined = true c.UpToDate = job.Annotations[labels.AcornAppGeneration] == strconv.Itoa(int(a.app.Generation)) if job.Status.Succeeded > 0 { @@ -125,6 +153,8 @@ func (a *appStatusRenderer) readJobs() error { c.State = "failing" } else if c.RunningCount > 0 { c.State = "running" + } else { + c.State = "pending" } } else if c.Defined { if len(c.ErrorMessages) > 0 { @@ -180,3 +210,19 @@ func (a *appStatusRenderer) isJobReady(jobName string) (ready bool, err error) { return true, nil } + +// nextRun uses the cron expression library used by k8s to determine the next run time of a cronjob. +func nextRun(expression string, creation metav1.Time, last *metav1.Time) (*metav1.Time, error) { + schedule, err := cronv3.ParseStandard(expression) + if err != nil { + return nil, err + } + + if last == nil { + last = &creation + } + + return z.Pointer( + metav1.NewTime(schedule.Next(last.Time)), + ), nil +} diff --git a/pkg/install/role.yaml b/pkg/install/role.yaml index a496d3894..a57d4aa3d 100644 --- a/pkg/install/role.yaml +++ b/pkg/install/role.yaml @@ -52,6 +52,7 @@ rules: resources: - jobs - cronjobs + - cronjobs/status - verbs: ["*"] apiGroups: ["apps"] resources: diff --git a/pkg/mocks/mock_client.go b/pkg/mocks/mock_client.go index 6864ebdc0..c7608c738 100644 --- a/pkg/mocks/mock_client.go +++ b/pkg/mocks/mock_client.go @@ -690,6 +690,50 @@ func (mr *MockClientMockRecorder) Info(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockClient)(nil).Info), arg0) } +// JobGet mocks base method. +func (m *MockClient) JobGet(arg0 context.Context, arg1 string) (*v1.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "JobGet", arg0, arg1) + ret0, _ := ret[0].(*v1.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// JobGet indicates an expected call of JobGet. +func (mr *MockClientMockRecorder) JobGet(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobGet", reflect.TypeOf((*MockClient)(nil).JobGet), arg0, arg1) +} + +// JobList mocks base method. +func (m *MockClient) JobList(arg0 context.Context, arg1 *client.JobListOptions) ([]v1.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "JobList", arg0, arg1) + ret0, _ := ret[0].([]v1.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// JobList indicates an expected call of JobList. +func (mr *MockClientMockRecorder) JobList(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobList", reflect.TypeOf((*MockClient)(nil).JobList), arg0, arg1) +} + +// JobRestart mocks base method. +func (m *MockClient) JobRestart(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "JobRestart", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// JobRestart indicates an expected call of JobRestart. +func (mr *MockClientMockRecorder) JobRestart(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "JobRestart", reflect.TypeOf((*MockClient)(nil).JobRestart), arg0, arg1) +} + // KubeConfig mocks base method. func (m *MockClient) KubeConfig(arg0 context.Context, arg1 *client.KubeProxyAddressOptions) ([]byte, error) { m.ctrl.T.Helper() diff --git a/pkg/openapi/generated/openapi_generated.go b/pkg/openapi/generated/openapi_generated.go index dcf2253bf..031572f96 100644 --- a/pkg/openapi/generated/openapi_generated.go +++ b/pkg/openapi/generated/openapi_generated.go @@ -70,6 +70,11 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.Info": schema_pkg_apis_apiacornio_v1_Info(ref), "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.InfoList": schema_pkg_apis_apiacornio_v1_InfoList(ref), "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.InfoSpec": schema_pkg_apis_apiacornio_v1_InfoSpec(ref), + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.Job": schema_pkg_apis_apiacornio_v1_Job(ref), + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobColumns": schema_pkg_apis_apiacornio_v1_JobColumns(ref), + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobList": schema_pkg_apis_apiacornio_v1_JobList(ref), + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobRestart": schema_pkg_apis_apiacornio_v1_JobRestart(ref), + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobSpec": schema_pkg_apis_apiacornio_v1_JobSpec(ref), "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.LogMessage": schema_pkg_apis_apiacornio_v1_LogMessage(ref), "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.LogOptions": schema_pkg_apis_apiacornio_v1_LogOptions(ref), "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.NestedImage": schema_pkg_apis_apiacornio_v1_NestedImage(ref), @@ -4302,6 +4307,195 @@ func schema_pkg_apis_apiacornio_v1_InfoSpec(ref common.ReferenceCallback) common } } +func schema_pkg_apis_apiacornio_v1_Job(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "spec": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobSpec"), + }, + }, + "status": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.JobStatus"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.JobSpec", "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.JobStatus", "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_pkg_apis_apiacornio_v1_JobColumns(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "state": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "app": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "nextRun": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + } +} + +func schema_pkg_apis_apiacornio_v1_JobList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.Job"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1.Job", "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"}, + } +} + +func schema_pkg_apis_apiacornio_v1_JobRestart(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"}, + } +} + +func schema_pkg_apis_apiacornio_v1_JobSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "appName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "jobName": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "schedule": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + } +} + func schema_pkg_apis_apiacornio_v1_LogMessage(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -9269,6 +9463,49 @@ func schema_pkg_apis_internalacornio_v1_JobStatus(ref common.ReferenceCallback) }, }, }, + "schedule": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "name": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "namespace": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + "creationTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "startTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "completionTime": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "lastRun": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "nextRun": { + SchemaProps: spec.SchemaProps{ + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, "runningCount": { SchemaProps: spec.SchemaProps{ Type: []string{"integer"}, @@ -9324,7 +9561,7 @@ func schema_pkg_apis_internalacornio_v1_JobStatus(ref common.ReferenceCallback) }, }, Dependencies: []string{ - "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.DependencyStatus", "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.ExpressionError"}, + "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.DependencyStatus", "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1.ExpressionError", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, } } diff --git a/pkg/server/registry/apigroups/acorn/apigroup.go b/pkg/server/registry/apigroups/acorn/apigroup.go index ddc797ee7..0edba445f 100644 --- a/pkg/server/registry/apigroups/acorn/apigroup.go +++ b/pkg/server/registry/apigroups/acorn/apigroup.go @@ -18,6 +18,7 @@ import ( "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/imageallowrules" "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/images" "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/info" + "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/jobs" "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/projects" "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/regions" "github.com/acorn-io/runtime/pkg/server/registry/apigroups/acorn/secrets" @@ -107,6 +108,8 @@ func Stores(c kclient.WithWatch, cfg, localCfg *clientgo.Config) (map[string]res "regions": regions.NewStorage(c), "imageallowrules": imageallowrules.NewStorage(c), "events": events.NewStorage(c), + "jobs": jobs.NewStorage(c), + "jobs/restart": jobs.NewRestart(c), } return stores, nil diff --git a/pkg/server/registry/apigroups/acorn/jobs/restart.go b/pkg/server/registry/apigroups/acorn/jobs/restart.go new file mode 100644 index 000000000..ba403b8fb --- /dev/null +++ b/pkg/server/registry/apigroups/acorn/jobs/restart.go @@ -0,0 +1,108 @@ +package jobs + +import ( + "context" + + "github.com/acorn-io/mink/pkg/stores" + "github.com/acorn-io/mink/pkg/types" + "github.com/acorn-io/mink/pkg/validator" + apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1" + kclient "github.com/acorn-io/runtime/pkg/k8sclient" + "github.com/acorn-io/z" + batchv1 "k8s.io/api/batch/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewRestart(c client.WithWatch) rest.Storage { + return stores.NewBuilder(c.Scheme(), &apiv1.JobRestart{}). + WithCreate(&restartStrategy{client: c}). + WithValidateName(validator.NoValidation). + Build() +} + +type restartStrategy struct { + client client.WithWatch +} + +func (s *restartStrategy) New() types.Object { + return &apiv1.JobRestart{} +} + +func (s *restartStrategy) Create(ctx context.Context, obj types.Object) (types.Object, error) { + ri, _ := request.RequestInfoFrom(ctx) + + if ri.Namespace == "" || ri.Name == "" { + return obj, nil + } + + // Find the job to restart by looking it up with the name and namespace from the request info. + job := &apiv1.Job{} + err := s.client.Get(ctx, kclient.ObjectKey{Namespace: ri.Namespace, Name: ri.Name}, job) + if err != nil { + return nil, err + } + + key := kclient.ObjectKey{Namespace: job.Status.JobNamespace, Name: job.Status.JobName} + + cronToRestart := &batchv1.CronJob{} + if err = s.client.Get(ctx, key, cronToRestart); err == nil { + return obj, s.restartCronJob(ctx, cronToRestart) + } + + if !apierrors.IsNotFound(err) { + return obj, err + } + + jobToRestart := &batchv1.Job{} + if err = s.client.Get(ctx, key, jobToRestart); err != nil { + return obj, err + } + + // Delete the Job and set the propagation policy to foreground so that the dependent resources (Pods) + // created by the Job are also deleted. + opts := &client.DeleteOptions{PropagationPolicy: z.Pointer(metav1.DeletePropagationForeground)} + return obj, s.client.Delete(ctx, jobToRestart, opts) +} + +func (s *restartStrategy) restartCronJob(ctx context.Context, cron *batchv1.CronJob) error { + // Find all active jobs and delete them to make way for new ones. + for _, jobRef := range cron.Status.Active { + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{Name: jobRef.Name, Namespace: jobRef.Namespace}} + + // Delete the Job and set the propagation policy to foreground so that the dependent resources (Pods) + // created by the Job are also deleted. + opts := &client.DeleteOptions{PropagationPolicy: z.Pointer(metav1.DeletePropagationForeground)} + if err := s.client.Delete(ctx, job, opts); err != nil && !apierrors.IsNotFound(err) { + return err + } + } + + // Create a new job from the cron's template to "restart" it. This is necessary because + // the cronjob controller will not create a new job until the next scheduled run. By creating + // it here we can ensure that the job is restarted immediately. + template := cron.Spec.JobTemplate + spec := template.Spec + spec.TTLSecondsAfterFinished = new(int32) // Want to delete the job immediately after it finishes. + newJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: cron.Name + "-", + Namespace: cron.Namespace, + Labels: template.Labels, + Annotations: template.Annotations, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cron, batchv1.SchemeGroupVersion.WithKind("CronJob")), + }, + }, + Spec: spec, + } + if err := s.client.Create(ctx, newJob); err != nil { + return err + } + + cron.Status.LastScheduleTime = &metav1.Time{Time: newJob.CreationTimestamp.Time} + return s.client.Status().Update(ctx, cron) +} diff --git a/pkg/server/registry/apigroups/acorn/jobs/storage.go b/pkg/server/registry/apigroups/acorn/jobs/storage.go new file mode 100644 index 000000000..d8299b0d9 --- /dev/null +++ b/pkg/server/registry/apigroups/acorn/jobs/storage.go @@ -0,0 +1,19 @@ +package jobs + +import ( + "github.com/acorn-io/mink/pkg/stores" + apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1" + "github.com/acorn-io/runtime/pkg/tables" + "k8s.io/apiserver/pkg/registry/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func NewStorage(c client.WithWatch) rest.Storage { + strategy := NewStrategy(c) + + return stores.NewBuilder(c.Scheme(), &apiv1.Job{}). + WithGet(strategy). + WithList(strategy). + WithTableConverter(tables.JobConverter). + Build() +} diff --git a/pkg/server/registry/apigroups/acorn/jobs/strategy.go b/pkg/server/registry/apigroups/acorn/jobs/strategy.go new file mode 100644 index 000000000..3236d4891 --- /dev/null +++ b/pkg/server/registry/apigroups/acorn/jobs/strategy.go @@ -0,0 +1,87 @@ +package jobs + +import ( + "context" + + "github.com/acorn-io/mink/pkg/strategy" + "github.com/acorn-io/mink/pkg/types" + apiv1 "github.com/acorn-io/runtime/pkg/apis/api.acorn.io/v1" + internalapiv1 "github.com/acorn-io/runtime/pkg/apis/internal.acorn.io/v1" + "github.com/acorn-io/runtime/pkg/publicname" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/storage" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type Strategy struct { + client client.WithWatch +} + +func NewStrategy(c client.WithWatch) *Strategy { + return &Strategy{client: c} +} + +func (s *Strategy) NewList() types.ObjectList { + return &apiv1.JobList{} +} + +func (s *Strategy) New() types.Object { + return &apiv1.Job{} +} + +func (s *Strategy) List(ctx context.Context, namespace string, options storage.ListOptions) (types.ObjectList, error) { + apps := &apiv1.AppList{} + if err := s.client.List(ctx, apps, strategy.ToListOpts(namespace, options)); err != nil { + return nil, err + } + + acornJobs := apiv1.JobList{} + for _, app := range apps.Items { + for _, jobStatus := range app.Status.AppStatus.Jobs { + acornJobs.Items = append(acornJobs.Items, jobStatusToJob(app.Namespace, app, jobStatus)) + } + } + + return &acornJobs, nil +} + +func (s *Strategy) Get(ctx context.Context, namespace, name string) (types.Object, error) { + list, err := s.List(ctx, namespace, storage.ListOptions{}) + if err != nil { + return nil, err + } + + for _, job := range list.(*apiv1.JobList).Items { + if job.Name == name { + return &job, nil + } + } + + return nil, apierrors.NewNotFound(schema.GroupResource{ + Group: apiv1.SchemeGroupVersion.Group, + Resource: "job", + }, name) +} + +func jobStatusToJob(namespace string, app apiv1.App, jobStatus internalapiv1.JobStatus) apiv1.Job { + creationTime := jobStatus.CreationTime + if creationTime == nil { + creationTime = &metav1.Time{} + } + + return apiv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: publicname.ForChild(&app, jobStatus.JobName), + Namespace: namespace, + CreationTimestamp: *creationTime, + }, + Spec: apiv1.JobSpec{ + JobName: jobStatus.JobName, + AppName: app.Name, + Schedule: jobStatus.Schedule, + }, + Status: jobStatus, + } +} diff --git a/pkg/tables/tables.go b/pkg/tables/tables.go index 5e0d36dbd..54a4db7b7 100644 --- a/pkg/tables/tables.go +++ b/pkg/tables/tables.go @@ -81,6 +81,15 @@ var ( } ContainerConverter = MustConverter(Container) + Job = [][]string{ + {"Name", "{{ . | name }}"}, + {"State", "Status.State"}, + {"Last Run", "{{lastRun .Status.LastRun }}"}, + {"Next Run", "{{nextRun .Status.NextRun }}"}, + {"Created", "{{ago .CreationTimestamp}}"}, + } + JobConverter = MustConverter(Job) + CredentialClient = [][]string{ {"Server", "ServerAddress"}, {"Username", "Username"},