diff --git a/.github/workflows/chart-lint.yml b/.github/workflows/chart-lint.yml index b14f9425..78d550bf 100644 --- a/.github/workflows/chart-lint.yml +++ b/.github/workflows/chart-lint.yml @@ -13,7 +13,12 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Determine Go version from go.mod + run: echo "GO_VERSION=$(grep "go 1." go.mod | cut -d " " -f 2)" >> $GITHUB_ENV + - uses: actions/setup-go@v3 + with: + go-version: ${{ env.GO_VERSION }} - uses: actions/cache@v3 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6d2b748a..c9c3724a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,4 +37,4 @@ jobs: CC_TEST_REPORTER_ID: 273b145c3b441d2abc054951c66b99a6d55f9eb4a25ddacd0731e14fcc921dc2 with: coverageLocations: cover.out:gocov - prefix: github.com/${{ github.event.repository.name }} + prefix: github.com/${{ github.repository }} diff --git a/base_command.go b/base_command.go deleted file mode 100644 index c9e62aa2..00000000 --- a/base_command.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "context" - - "github.com/ccremer/clustercode/api/v1alpha1" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/apimachinery/pkg/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -type commandContext struct { - context.Context - - kubeconfig *rest.Config - kube client.Client - scheme *runtime.Scheme -} - -var createClientFn = func(ctx *commandContext) error { - kube, err := client.New(ctx.kubeconfig, client.Options{Scheme: ctx.scheme}) - ctx.kube = kube - return err -} - -var registerSchemesFn = func(ctx *commandContext) error { - ctx.scheme = runtime.NewScheme() - b := &runtime.SchemeBuilder{} - b.Register( - clientgoscheme.AddToScheme, - batchv1.AddToScheme, - v1alpha1.AddToScheme, - ) - return b.AddToScheme(ctx.scheme) -} - -var loadKubeConfigFn = func(ctx *commandContext) error { - clientConfig, err := ctrl.GetConfig() - ctx.kubeconfig = clientConfig - return err -} diff --git a/charts/clustercode/Makefile b/charts/clustercode/Makefile index 6e84bcdb..8dbc0203 100644 --- a/charts/clustercode/Makefile +++ b/charts/clustercode/Makefile @@ -14,6 +14,7 @@ else sed := sed -i endif +.PHONY: $(rbac_gen_tgt) $(rbac_gen_tgt): @cp $(rbac_gen_src) $@ @yq -i e '.metadata.name="{{ include \"clustercode.fullname\" . 
}}-manager", del(.metadata.creationTimestamp)' $@ diff --git a/charts/clustercode/templates/clusterrole.yaml b/charts/clustercode/templates/clusterrole.yaml index 157725eb..1128a37e 100644 --- a/charts/clustercode/templates/clusterrole.yaml +++ b/charts/clustercode/templates/clusterrole.yaml @@ -11,8 +11,8 @@ rules: - apiGroups: - clustercode.github.io resources: - - clustercodeplans - - clustercodetasks + - blueprints + - tasks verbs: - get - list @@ -20,10 +20,10 @@ rules: - apiGroups: - clustercode.github.io resources: - - clustercodeplans/finalizers - - clustercodeplans/status - - clustercodetasks/finalizers - - clustercodetasks/status + - blueprints/finalizers + - blueprints/status + - tasks/finalizers + - tasks/status verbs: - get --- @@ -39,8 +39,8 @@ rules: - apiGroups: - clustercode.github.io resources: - - clustercodeplans - - clustercodetasks + - blueprints + - tasks verbs: - create - delete @@ -52,12 +52,22 @@ rules: - apiGroups: - clustercode.github.io resources: - - clustercodeplans/finalizers - - clustercodeplans/status - - clustercodetasks/finalizers - - clustercodetasks/status + - blueprints/finalizers + - blueprints/status + - tasks/finalizers + - tasks/status verbs: - get - patch - update + - apiGroups: + - "" + resources: + - configmaps + verbs: + - get + - list + - watch + - create + - update {{- end }} diff --git a/charts/clustercode/templates/operator-clusterrole.yaml b/charts/clustercode/templates/operator-clusterrole.yaml index 583392c2..0d2e4692 100644 --- a/charts/clustercode/templates/operator-clusterrole.yaml +++ b/charts/clustercode/templates/operator-clusterrole.yaml @@ -53,6 +53,7 @@ rules: - apiGroups: - clustercode.github.io resources: + - blueprints/finalizers - blueprints/status verbs: - get @@ -73,6 +74,7 @@ rules: - apiGroups: - clustercode.github.io resources: + - tasks/finalizers - tasks/status verbs: - get @@ -108,6 +110,7 @@ rules: - delete - get - list + - watch - apiGroups: - rbac.authorization.k8s.io resources: @@ -118,3 +121,4 @@ rules: - delete - get - list + - watch diff --git a/cleanup_command.go b/cleanup_command.go index 56fad5b3..03088c51 100644 --- a/cleanup_command.go +++ b/cleanup_command.go @@ -1,82 +1,26 @@ package main import ( - "os" - "path/filepath" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/ccremer/clustercode/pkg/operator/controllers" + "github.com/ccremer/clustercode/pkg/cleanupcmd" "github.com/urfave/cli/v2" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" + controllerruntime "sigs.k8s.io/controller-runtime" ) -type cleanupCommand struct { - kubeconfig *rest.Config - kube client.Client - - TaskName string - TaskNamespace string - SourceRootDir string -} - -var cleanupCommandName = "cleanup" -var cleanupLog = ctrl.Log.WithName("cleanup") - func newCleanupCommand() *cli.Command { - command := &cleanupCommand{} + command := cleanupcmd.Command{} return &cli.Command{ - Name: cleanupCommandName, + Name: "cleanup", Usage: "Remove intermediary files and finish the task", - Action: command.execute, + Before: LogMetadata, + Action: func(ctx *cli.Context) error { + command.Log = AppLogger(ctx).WithName(ctx.Command.Name) + controllerruntime.SetLogger(command.Log) + return command.Execute(controllerruntime.LoggerInto(ctx.Context, command.Log)) + }, Flags: []cli.Flag{ newTaskNameFlag(&command.TaskName), - newNamespaceFlag(&command.TaskNamespace), + newNamespaceFlag(&command.Namespace), 
newSourceRootDirFlag(&command.SourceRootDir), }, } } - -func (c *cleanupCommand) execute(ctx *cli.Context) error { - - registerScheme() - if err := createClientFn(&commandContext{}); err != nil { - return err - } - - nsName := types.NamespacedName{Namespace: c.TaskNamespace, Name: c.TaskName} - task := &v1alpha1.Task{} - cleanupLog.Info("get task", "name", nsName.String()) - if err := c.kube.Get(ctx.Context, nsName, task); err != nil { - return err - } - - intermediaryFiles, err := filepath.Glob(filepath.Join(c.SourceRootDir, controllers.IntermediateSubMountPath, task.Spec.TaskId.String()+"*")) - if err != nil { - return err - } - cleanupLog.Info("deleting intermediary files", "files", intermediaryFiles) - deleteFiles(intermediaryFiles) - - sourceFile := filepath.Join(c.SourceRootDir, controllers.SourceSubMountPath, task.Spec.SourceUrl.GetPath()) - cleanupLog.Info("deleting source file", "file", sourceFile) - if err := os.Remove(sourceFile); err != nil { - return err - } - if err := c.kube.Delete(ctx.Context, task); err != nil { - return err - } - return nil -} - -func deleteFiles(files []string) { - for _, file := range files { - if err := os.Remove(file); err != nil { - cleanupLog.Info("could not delete file", "file", file, "error", err.Error()) - } else { - cleanupLog.V(1).Info("deleted file", "file", file) - } - } -} diff --git a/count_command.go b/count_command.go index ab39ebba..1877f69f 100644 --- a/count_command.go +++ b/count_command.go @@ -1,168 +1,26 @@ package main import ( - "context" - "fmt" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/ccremer/clustercode/pkg/operator/controllers" + "github.com/ccremer/clustercode/pkg/countcmd" "github.com/urfave/cli/v2" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + controllerruntime "sigs.k8s.io/controller-runtime" ) -type countCommand struct { - kubeconfig *rest.Config - kube client.Client - - TaskName string - TaskNamespace string - SourceRootDir string -} - -var countCommandName = "count" -var countLog = ctrl.Log.WithName("count") - func newCountCommand() *cli.Command { - command := &countCommand{} + command := &countcmd.Command{} return &cli.Command{ - Name: countCommandName, + Name: "count", Usage: "Counts the number of generated intermediary media files", - Action: command.execute, + Before: LogMetadata, + Action: func(ctx *cli.Context) error { + command.Log = AppLogger(ctx).WithName(ctx.Command.Name) + controllerruntime.SetLogger(command.Log) + return command.Execute(controllerruntime.LoggerInto(ctx.Context, command.Log)) + }, Flags: []cli.Flag{ newTaskNameFlag(&command.TaskName), - newNamespaceFlag(&command.TaskNamespace), + newNamespaceFlag(&command.Namespace), newSourceRootDirFlag(&command.SourceRootDir), }, } } - -func (c *countCommand) execute(ctx *cli.Context) error { - - registerScheme() - err := createClientFn(&commandContext{}) - if err != nil { - return err - } - task, err := c.getTask() - if err != nil { - return err - } - countLog = countLog.WithValues("task", task.Name) - countLog.Info("found task", "task", task) - - files, err := c.scanSegmentFiles(task.Spec.TaskId.String() + "_") - if err != nil { - return err - } - countLog.Info("found 
segments", "count", len(files)) - - err = c.createFileList(files, task) - if err != nil { - return err - } - - err = c.updateTask(task, len(files)) - if err != nil { - return err - } - countLog.Info("updated task") - - return nil -} - -func (c *countCommand) updateTask(task *v1alpha1.Task, count int) error { - task.Spec.SlicesPlannedCount = count - err := c.kube.Update(context.Background(), task) - if err != nil { - return err - } - return nil -} - -func (c *countCommand) createFileList(files []string, task *v1alpha1.Task) error { - var fileList []string - for _, file := range files { - fileList = append(fileList, fmt.Sprintf("file '%s'", file)) - } - data := strings.Join(fileList, "\n") + "\n" - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: task.Spec.FileListConfigMapRef, - Namespace: task.Namespace, - Labels: labels.Merge(controllers.ClusterCodeLabels, task.Spec.TaskId.AsLabels()), - }, - Data: map[string]string{ - v1alpha1.ConfigMapFileName: data, - }, - } - if err := controllerutil.SetControllerReference(task, cm.GetObjectMeta(), scheme); err != nil { - return fmt.Errorf("could not set controller reference: %w", err) - } - if err := c.kube.Create(context.Background(), cm); err != nil { - if apierrors.IsAlreadyExists(err) { - if err := c.kube.Update(context.Background(), cm); err != nil { - return fmt.Errorf("could not update config map: %w", err) - } - countLog.Info("updated config map", "configmap", cm.Name) - } - return fmt.Errorf("could not create config map: %w", err) - } else { - countLog.Info("created config map", "configmap", cm.Name, "data", cm.Data) - } - return nil -} - -func (c *countCommand) scanSegmentFiles(prefix string) ([]string, error) { - var files []string - root := filepath.Join(c.SourceRootDir, controllers.IntermediateSubMountPath) - err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - // could not access file, let's prevent a panic - return err - } - if info.IsDir() { - return nil - } - if matchesTaskSegment(path, prefix) { - return nil - } - files = append(files, path) - return nil - }) - if len(files) <= 0 { - return files, fmt.Errorf("could not find any segments in '%s", root) - } - sort.Strings(files) - return files, err -} - -func matchesTaskSegment(path string, prefix string) bool { - base := filepath.Base(path) - return strings.HasPrefix(base, prefix) && !strings.Contains(base, v1alpha1.MediaFileDoneSuffix) -} - -func (c *countCommand) getTask() (*v1alpha1.Task, error) { - ctx := context.Background() - task := &v1alpha1.Task{} - name := types.NamespacedName{ - Name: c.TaskName, - Namespace: c.TaskNamespace, - } - err := c.kube.Get(ctx, name, task) - if err != nil { - return &v1alpha1.Task{}, err - } - return task, nil -} diff --git a/flags.go b/flags.go index 049f3641..aba7c0ec 100644 --- a/flags.go +++ b/flags.go @@ -7,6 +7,11 @@ import ( "github.com/urfave/cli/v2" ) +const ( + ClusterRole = "ClusterRole" + Role = "Role" +) + type EnumValue struct { Enum []string Default string @@ -39,7 +44,7 @@ func newTaskNameFlag(dest *string) *cli.StringFlag { } func newNamespaceFlag(dest *string) *cli.StringFlag { - return &cli.StringFlag{Name: "namespace", EnvVars: envVars("NAMESPACE"), Required: true, + return &cli.StringFlag{Name: "namespace", Aliases: []string{"n"}, EnvVars: envVars("NAMESPACE"), Required: true, Usage: "Namespace in which to find the resource.", Destination: dest, } @@ -62,6 +67,15 @@ func newScanRoleKindFlag() *cli.GenericFlag { } } +func newLogFormatFlag() *cli.GenericFlag { + enum 
:= &EnumValue{Enum: []string{"console", "json"}, Default: "console"} + return &cli.GenericFlag{Name: "log-format", EnvVars: envVars("LOG_FORMAT"), + Usage: "sets the log format", + Category: "Encoding", + DefaultText: fmt.Sprintf("%q [%s]", enum.Default, strings.Join(enum.Enum, ", ")), + Value: enum, + } +} func newSourceRootDirFlag(dest *string) *cli.StringFlag { return &cli.StringFlag{Name: "source-root-dir", EnvVars: envVars("SOURCE_ROOT_DIR"), Usage: "Directory path where to find the source files", diff --git a/go.mod b/go.mod index ff11f6bd..9e846617 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/ccremer/clustercode go 1.18 require ( - github.com/ccremer/go-command-pipeline v0.19.0 - github.com/go-logr/logr v1.2.0 - github.com/go-logr/zapr v1.2.0 + github.com/ccremer/go-command-pipeline v0.20.0 + github.com/go-logr/logr v1.2.3 + github.com/go-logr/zapr v1.2.3 github.com/urfave/cli/v2 v2.11.1 go.uber.org/zap v1.23.0 k8s.io/api v0.24.3 diff --git a/go.sum b/go.sum index e1c75422..ed7b333b 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= -github.com/ccremer/go-command-pipeline v0.19.0 h1:2Mx0FX7cRzDyN92vTyc2QyYQGW3LBzDjeJpRGxuJdPo= -github.com/ccremer/go-command-pipeline v0.19.0/go.mod h1:uTtRkKisQugA2PNMf1V+lN2Jcv1fH5hnrAJHTRHpfJo= +github.com/ccremer/go-command-pipeline v0.20.0 h1:2bjmhyvQsbD9ZARGtiW+hxdN2vANlVXCHU+0PoZqeME= +github.com/ccremer/go-command-pipeline v0.20.0/go.mod h1:uTtRkKisQugA2PNMf1V+lN2Jcv1fH5hnrAJHTRHpfJo= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= @@ -155,10 +155,13 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= -github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/zapr v1.2.0 h1:n4JnPI1T3Qq1SFEi/F8rwLrZERp2bso19PJZDB9dayk= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro= +github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A= +github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4= github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY= github.com/go-openapi/jsonpointer 
v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= diff --git a/logger.go b/logger.go index 38ae851b..264a6808 100644 --- a/logger.go +++ b/logger.go @@ -1,7 +1,8 @@ package main import ( - "log" + "context" + "fmt" "os" "runtime" "strings" @@ -21,6 +22,11 @@ func AppLogger(c *cli.Context) logr.Logger { return c.Context.Value(loggerContextKey{}).(*atomic.Value).Load().(logr.Logger) } +// SetLogger copies the application-wide logger instance from cli.Context to a new context using logr.NewContext. +func SetLogger(ctx *cli.Context) context.Context { + return logr.NewContext(ctx.Context, AppLogger(ctx)) +} + // LogMetadata prints various metadata to the root logger. // It prints version, architecture and current user ID and returns nil. func LogMetadata(c *cli.Context) error { @@ -41,37 +47,37 @@ func LogMetadata(c *cli.Context) error { } func setupLogging(c *cli.Context) error { - logger := newZapLogger(appName, c.Bool("debug"), usesProductionLoggingConfig(c)) + logger, err := newZapLogger(appName, c.Int("log-level"), usesProductionLoggingConfig(c)) c.Context.Value(loggerContextKey{}).(*atomic.Value).Store(logger) - return nil + return err } func usesProductionLoggingConfig(c *cli.Context) bool { - return strings.EqualFold("JSON", c.String("log-format")) + return strings.EqualFold("JSON", c.String(newLogFormatFlag().Name)) } -func newZapLogger(name string, debug bool, useProductionConfig bool) logr.Logger { +func newZapLogger(name string, verbosityLevel int, useProductionConfig bool) (logr.Logger, error) { cfg := zap.NewDevelopmentConfig() cfg.EncoderConfig.ConsoleSeparator = " | " if useProductionConfig { cfg = zap.NewProductionConfig() } - if debug { + if verbosityLevel > 0 { // Zap's levels get more verbose as the number gets smaller, // but logr's level increases with greater numbers. - cfg.Level = zap.NewAtomicLevelAt(zapcore.Level(-2)) // max logger.V(2) + cfg.Level = zap.NewAtomicLevelAt(zapcore.Level(verbosityLevel * -1)) } else { cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel) } z, err := cfg.Build() - zap.ReplaceGlobals(z) if err != nil { - log.Fatalf("error configuring the logging stack") + return logr.Discard(), fmt.Errorf("error configuring the logging stack: %w", err) } + zap.ReplaceGlobals(z) logger := zapr.NewLogger(z).WithName(name) if useProductionConfig { // Append the version to each log so that logging stacks like EFK/Loki can correlate errors with specific versions.
- return logger.WithValues("version", version) + return logger.WithValues("version", version), nil } - return logger + return logger, nil } diff --git a/main.go b/main.go index ea362d9a..e0277dcc 100644 --- a/main.go +++ b/main.go @@ -9,13 +9,8 @@ import ( "syscall" "time" - "github.com/ccremer/clustercode/api/v1alpha1" "github.com/go-logr/logr" "github.com/urfave/cli/v2" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" ) var ( @@ -31,6 +26,11 @@ var ( envPrefix = "CC_" ) +func init() { + // Remove `-v` short option from --version flag + cli.VersionFlag.(*cli.BoolFlag).Aliases = nil +} + func main() { ctx, stop, app := newApp() defer stop() @@ -38,6 +38,7 @@ func main() { // If required flags aren't set, it will return with error before we could set up logging if err != nil { _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) + stop() os.Exit(1) } @@ -54,18 +55,12 @@ func newApp() (context.Context, context.CancelFunc, *cli.App) { Before: setupLogging, Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "debug", - Aliases: []string{"verbose", "d"}, - Usage: "sets the log level to debug", - EnvVars: envVars("DEBUG"), - }, - &cli.StringFlag{ - Name: "log-format", - Usage: "sets the log format (values: [json, console])", - EnvVars: envVars("LOG_FORMAT"), - DefaultText: "console", + &cli.IntFlag{ + Name: "log-level", Aliases: []string{"v"}, EnvVars: envVars("LOG_LEVEL"), + Usage: "number of the log level verbosity", + Value: 0, }, + newLogFormatFlag(), }, Commands: []*cli.Command{ newOperatorCommand(), @@ -112,14 +107,3 @@ func envVars(suffixes ...string) []string { } return arr } - -var ( - scheme = runtime.NewScheme() -) - -func registerScheme() { - - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) - utilruntime.Must(batchv1.AddToScheme(scheme)) - utilruntime.Must(v1alpha1.AddToScheme(scheme)) -} diff --git a/operator_command.go b/operator_command.go index 5b4357f4..a3ce5af3 100644 --- a/operator_command.go +++ b/operator_command.go @@ -1,121 +1,46 @@ package main import ( - "context" - "time" - - "github.com/ccremer/clustercode/api" "github.com/ccremer/clustercode/pkg/operator" - "github.com/ccremer/clustercode/pkg/operator/controllers" - pipeline "github.com/ccremer/go-command-pipeline" + "github.com/ccremer/clustercode/pkg/operator/blueprintcontroller" + "github.com/ccremer/clustercode/pkg/operator/taskcontroller" "github.com/urfave/cli/v2" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/leaderelection/resourcelock" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/manager" ) -type operatorCommand struct { - manager manager.Manager - kubeconfig *rest.Config - - LeaderElectionEnabled bool - WebhookCertDir string - FfmpegImage string -} - -var operatorCommandName = "operator" - func newOperatorCommand() *cli.Command { - command := &operatorCommand{} + command := &operator.Command{} return &cli.Command{ - Name: operatorCommandName, + Name: "operator", Usage: "Start provider in operator mode", Before: LogMetadata, - Action: command.execute, + Action: func(ctx *cli.Context) error { + command.Log = AppLogger(ctx).WithName(ctx.Command.Name) + blueprintcontroller.ScanRoleKind = ctx.String(newScanRoleKindFlag().Name) + return command.Execute(ctx.Context) + }, Flags: []cli.Flag{ &cli.BoolFlag{Name: "leader-election-enabled", Value: false, EnvVars: envVars("LEADER_ELECTION_ENABLED"), Usage: "Use leader election for the controller manager.", Destination: 
&command.LeaderElectionEnabled, Category: "Operator", }, - &cli.StringFlag{Name: "webhook-tls-cert-dir", EnvVars: envVars("WEBHOOK_TLS_CERT_DIR"), - Usage: "Directory containing the certificates for the webhook server. If empty, the webhook server is not started.", - Destination: &command.WebhookCertDir, - Category: "Operator", - }, &cli.StringFlag{Name: "clustercode-image", EnvVars: envVars("CLUSTERCODE_IMAGE"), Usage: "Container image to be used when launching Clustercode jobs.", - Destination: &controllers.DefaultClusterCodeContainerImage, + Destination: &blueprintcontroller.DefaultClusterCodeContainerImage, Category: "Encoding", Required: true, }, &cli.StringFlag{Name: "ffmpeg-image", EnvVars: envVars("FFMPEG_IMAGE"), Usage: "Container image to be used when launching Ffmpeg jobs.", - Destination: &controllers.DefaultFfmpegContainerImage, + Destination: &taskcontroller.DefaultFfmpegContainerImage, Category: "Encoding", Required: true, }, newScanRoleKindFlag(), &cli.StringFlag{Name: "scan-role-name", EnvVars: envVars("SCAN_ROLE_NAME"), Usage: "TODO", - Value: "clustercode-editor-role", - Destination: &controllers.ScanRoleName, + Value: "clustercode-edit", + Destination: &blueprintcontroller.ScanRoleName, Category: "Encoding", }, }, } } - -func (c *operatorCommand) execute(ctx *cli.Context) error { - controllers.ScanRoleKind = ctx.String(newScanRoleKindFlag().Name) - log := AppLogger(ctx).WithName(operatorCommandName) - log.Info("Setting up controllers", "config", c) - ctrl.SetLogger(log) - - p := pipeline.NewPipeline[context.Context]() - p.WithSteps( - p.NewStep("get config", func(ctx context.Context) error { - cfg, err := ctrl.GetConfig() - c.kubeconfig = cfg - return err - }), - p.NewStep("create manager", func(ctx context.Context) error { - // configure client-side throttling - c.kubeconfig.QPS = 100 - c.kubeconfig.Burst = 150 // more Openshift friendly - - mgr, err := ctrl.NewManager(c.kubeconfig, ctrl.Options{ - // controller-runtime uses both ConfigMaps and Leases for leader election by default. - // Leases expire after 15 seconds, with a 10-second renewal deadline. - // We've observed leader loss due to renewal deadlines being exceeded when under high load - i.e. - // hundreds of reconciles per second and ~200rps to the API server. - // Switching to Leases only and longer leases appears to alleviate this. 
- LeaderElection: c.LeaderElectionEnabled, - LeaderElectionID: "leader-election-" + appName, - LeaderElectionResourceLock: resourcelock.LeasesResourceLock, - LeaseDuration: func() *time.Duration { d := 60 * time.Second; return &d }(), - RenewDeadline: func() *time.Duration { d := 50 * time.Second; return &d }(), - }) - c.manager = mgr - return err - }), - p.NewStep("register schemes", func(ctx context.Context) error { - return api.AddToScheme(c.manager.GetScheme()) - }), - p.NewStep("setup controllers", func(ctx context.Context) error { - return operator.SetupControllers(c.manager) - }), - p.When(pipeline.Bool[context.Context](c.WebhookCertDir != ""), "setup webhook server", - func(ctx context.Context) error { - ws := c.manager.GetWebhookServer() - ws.CertDir = c.WebhookCertDir - ws.TLSMinVersion = "1.3" - return operator.SetupWebhooks(c.manager) - }), - p.NewStep("run manager", func(ctx context.Context) error { - log.Info("Starting manager") - return c.manager.Start(ctx) - }), - ) - - return p.RunWithContext(ctx.Context) -} diff --git a/package/rbac/role.yaml b/package/rbac/role.yaml index e9f68cc5..d8a3888b 100644 --- a/package/rbac/role.yaml +++ b/package/rbac/role.yaml @@ -52,6 +52,7 @@ rules: - apiGroups: - clustercode.github.io resources: + - blueprints/finalizers - blueprints/status verbs: - get @@ -72,6 +73,7 @@ rules: - apiGroups: - clustercode.github.io resources: + - tasks/finalizers - tasks/status verbs: - get @@ -107,6 +109,7 @@ rules: - delete - get - list + - watch - apiGroups: - rbac.authorization.k8s.io resources: @@ -117,3 +120,4 @@ rules: - delete - get - list + - watch diff --git a/api/generate.go b/pkg/api/generate.go similarity index 57% rename from api/generate.go rename to pkg/api/generate.go index a9fff20e..74f0c655 100644 --- a/api/generate.go +++ b/pkg/api/generate.go @@ -1,12 +1,12 @@ //go:build generate // Remove existing manifests -//go:generate rm -rf ../package/crds ../package/webhook ../package/rbac +//go:generate rm -rf ../../package/crds ../../package/webhook ../../package/rbac // Generate deepcopy methodsets and CRD manifests -//go:generate go run -tags generate sigs.k8s.io/controller-tools/cmd/controller-gen object:headerFile=../.github/boilerplate.go.txt paths=./... crd:crdVersions=v1 output:artifacts:config=../package/crds +//go:generate go run -tags generate sigs.k8s.io/controller-tools/cmd/controller-gen object:headerFile=../../.github/boilerplate.go.txt paths=./... crd:crdVersions=v1 output:artifacts:config=../../package/crds // Generate webhook manifests -//go:generate go run -tags generate sigs.k8s.io/controller-tools/cmd/controller-gen webhook paths=./... output:artifacts:config=../package/webhook +//go:generate go run -tags generate sigs.k8s.io/controller-tools/cmd/controller-gen webhook paths=./... 
output:artifacts:config=../../package/webhook package api diff --git a/api/init.go b/pkg/api/init.go similarity index 91% rename from api/init.go rename to pkg/api/init.go index ff6a45da..0f377f91 100644 --- a/api/init.go +++ b/pkg/api/init.go @@ -2,7 +2,7 @@ package api import ( - "github.com/ccremer/clustercode/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/api/v1alpha1" "k8s.io/apimachinery/pkg/runtime" ) diff --git a/api/v1alpha1/blueprint_types.go b/pkg/api/v1alpha1/blueprint_types.go similarity index 100% rename from api/v1alpha1/blueprint_types.go rename to pkg/api/v1alpha1/blueprint_types.go diff --git a/api/v1alpha1/common.go b/pkg/api/v1alpha1/common.go similarity index 100% rename from api/v1alpha1/common.go rename to pkg/api/v1alpha1/common.go diff --git a/api/v1alpha1/groupversion_info.go b/pkg/api/v1alpha1/groupversion_info.go similarity index 100% rename from api/v1alpha1/groupversion_info.go rename to pkg/api/v1alpha1/groupversion_info.go diff --git a/api/v1alpha1/task_types.go b/pkg/api/v1alpha1/task_types.go similarity index 100% rename from api/v1alpha1/task_types.go rename to pkg/api/v1alpha1/task_types.go diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/pkg/api/v1alpha1/zz_generated.deepcopy.go similarity index 100% rename from api/v1alpha1/zz_generated.deepcopy.go rename to pkg/api/v1alpha1/zz_generated.deepcopy.go diff --git a/pkg/cleanupcmd/run.go b/pkg/cleanupcmd/run.go new file mode 100644 index 00000000..555c0105 --- /dev/null +++ b/pkg/cleanupcmd/run.go @@ -0,0 +1,111 @@ +package cleanupcmd + +import ( + "context" + "os" + "path/filepath" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type Command struct { + Log logr.Logger + + SourceRootDir string + Namespace string + TaskName string +} + +type commandContext struct { + context.Context + dependencyResolver pipeline.DependencyResolver[*commandContext] + + kube client.Client + task *v1alpha1.Task + intermediaryFiles []string +} + +// Execute runs the command and returns an error, if any. 
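+// The steps run in order: create client, fetch task, list and delete intermediary files, delete the source file, and finally delete the Task resource; the pipeline aborts at the first failing step.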
+func (c *Command) Execute(ctx context.Context) error { + + pctx := &commandContext{ + dependencyResolver: pipeline.NewDependencyRecorder[*commandContext](), + Context: ctx, + } + + p := pipeline.NewPipeline[*commandContext]().WithBeforeHooks(pipe.DebugLogger(pctx), pctx.dependencyResolver.Record) + p.WithSteps( + p.NewStep("create client", c.createClient), + p.NewStep("fetch task", c.fetchTask), + p.NewStep("list intermediary files", c.listIntermediaryFiles), + p.NewStep("delete intermediary files", c.deleteFiles), + p.NewStep("delete source file", c.deleteSourceFile), + p.NewStep("delete task", c.deleteTask), + ) + + return p.RunWithContext(pctx) +} + +func (c *Command) createClient(ctx *commandContext) error { + kube, err := pipe.NewKubeClient(ctx) + ctx.kube = kube + return err +} + +func (c *Command) fetchTask(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient) + log := c.getLogger() + + task := &v1alpha1.Task{} + if err := ctx.kube.Get(ctx, types.NamespacedName{Namespace: c.Namespace, Name: c.TaskName}, task); err != nil { + return err + } + ctx.task = task + log.Info("fetched task") + return nil +} + +func (c *Command) listIntermediaryFiles(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchTask) + + intermediaryFiles, err := filepath.Glob(filepath.Join(c.SourceRootDir, internaltypes.IntermediateSubMountPath, ctx.task.Spec.TaskId.String()+"*")) + ctx.intermediaryFiles = intermediaryFiles + return err +} + +func (c *Command) deleteFiles(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.listIntermediaryFiles) + log := c.getLogger() + + for _, file := range ctx.intermediaryFiles { + log.Info("deleting file", "file", file) + if err := os.Remove(file); err != nil { + log.Info("could not delete file, ignoring", "file", file, "error", err.Error()) + } + } + return nil +} + +func (c *Command) deleteSourceFile(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchTask) + log := c.getLogger() + + sourceFile := filepath.Join(c.SourceRootDir, internaltypes.SourceSubMountPath, ctx.task.Spec.SourceUrl.GetPath()) + log.Info("deleting file", "file", sourceFile) + return os.Remove(sourceFile) +} + +func (c *Command) deleteTask(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient, c.fetchTask) + return ctx.kube.Delete(ctx.Context, ctx.task) +} + +func (c *Command) getLogger() logr.Logger { + return c.Log.WithValues("task_name", c.TaskName, "namespace", c.Namespace) +} diff --git a/pkg/countcmd/run.go b/pkg/countcmd/run.go new file mode 100644 index 00000000..b4e1a8e8 --- /dev/null +++ b/pkg/countcmd/run.go @@ -0,0 +1,160 @@ +package countcmd + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +type Command struct { + Log logr.Logger + + SourceRootDir string + Namespace string + TaskName string +} + +type commandContext struct { + context.Context + 
	dependencyResolver pipeline.DependencyResolver[*commandContext] + + kube client.Client + task *v1alpha1.Task + segmentFiles []string +} + +// Execute runs the command and returns an error, if any. +func (c *Command) Execute(ctx context.Context) error { + + pctx := &commandContext{ + dependencyResolver: pipeline.NewDependencyRecorder[*commandContext](), + Context: ctx, + } + + p := pipeline.NewPipeline[*commandContext]().WithBeforeHooks(pipe.DebugLogger(pctx), pctx.dependencyResolver.Record) + p.WithSteps( + p.NewStep("create client", c.createClient), + p.NewStep("fetch task", c.fetchTask), + p.NewStep("scan segment files", c.scanSegmentFiles), + p.NewStep("create file list", c.ensureConfigMap), + p.NewStep("update task", c.updateTask), + ) + + return p.RunWithContext(pctx) +} + +func (c *Command) createClient(ctx *commandContext) error { + kube, err := pipe.NewKubeClient(ctx) + ctx.kube = kube + return err +} + +func (c *Command) fetchTask(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient) + log := c.getLogger() + + task := &v1alpha1.Task{} + if err := ctx.kube.Get(ctx, types.NamespacedName{Namespace: c.Namespace, Name: c.TaskName}, task); err != nil { + return err + } + ctx.task = task + log.Info("fetched task") + return nil +} + +func (c *Command) scanSegmentFiles(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchTask) + log := c.getLogger() + + prefix := ctx.task.Spec.TaskId.String() + "_" + files := make([]string, 0) + root := filepath.Join(c.SourceRootDir, internaltypes.IntermediateSubMountPath) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + // could not access file, let's prevent a panic + return err + } + if info.IsDir() { + return nil + } + if !matchesTaskSegment(path, prefix) { + return nil + } + files = append(files, path) + return nil + }) + if len(files) <= 0 { + return fmt.Errorf("could not find any segments in '%s'", root) + } + sort.Strings(files) + ctx.segmentFiles = files + log.Info("found segments", "count", len(files)) + return err +} + +func matchesTaskSegment(path string, prefix string) bool { + base := filepath.Base(path) + return strings.HasPrefix(base, prefix) && !strings.Contains(base, v1alpha1.MediaFileDoneSuffix) +} + +func (c *Command) ensureConfigMap(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient, c.fetchTask, c.scanSegmentFiles) + log := c.getLogger() + + task := ctx.task + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ + Name: task.Spec.FileListConfigMapRef, + Namespace: task.Namespace}} + + op, err := controllerutil.CreateOrUpdate(ctx, ctx.kube, cm, func() error { + cm.Labels = labels.Merge(cm.Labels, labels.Merge(internaltypes.ClusterCodeLabels, task.Spec.TaskId.AsLabels())) + + fileList := make([]string, len(ctx.segmentFiles)) + for i, file := range ctx.segmentFiles { + fileList[i] = fmt.Sprintf("file '%s'", file) + } + data := strings.Join(fileList, "\n") + "\n" + cm.Data = map[string]string{ + v1alpha1.ConfigMapFileName: data, + } + return controllerutil.SetOwnerReference(task, cm, ctx.kube.Scheme()) + }) + if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + log.Info("Updated config map", "configmap", cm.Name) + } + return err +} + +func (c *Command) updateTask(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient) + log := c.getLogger() + + task :=
ctx.task + op, err := controllerutil.CreateOrPatch(ctx, ctx.kube, task, func() error { + task.Spec.SlicesPlannedCount = len(ctx.segmentFiles) + return nil + }) + if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + log.Info("Updated task") + } + return err +} + +func (c *Command) getLogger() logr.Logger { + return c.Log.WithValues("task_name", c.TaskName, "namespace", c.Namespace) +} diff --git a/pkg/internal/pipe/debuglogger.go b/pkg/internal/pipe/debuglogger.go new file mode 100644 index 00000000..9636042d --- /dev/null +++ b/pkg/internal/pipe/debuglogger.go @@ -0,0 +1,18 @@ +package pipe + +import ( + "context" + + pipeline "github.com/ccremer/go-command-pipeline" + controllerruntime "sigs.k8s.io/controller-runtime" +) + +// DebugLogger returns a list with a single hook that logs the step name. +// The logger is retrieved from the given context. +func DebugLogger[T context.Context](ctx T) pipeline.Listener[T] { + log := controllerruntime.LoggerFrom(ctx) + hook := func(step pipeline.Step[T]) { + log.V(2).Info(`Entering step "` + step.Name + `"`) + } + return hook +} diff --git a/pkg/internal/pipe/kubeclient.go b/pkg/internal/pipe/kubeclient.go new file mode 100644 index 00000000..3bbfc740 --- /dev/null +++ b/pkg/internal/pipe/kubeclient.go @@ -0,0 +1,62 @@ +package pipe + +import ( + "context" + "fmt" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + pipeline "github.com/ccremer/go-command-pipeline" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/runtime" + kubernetesscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type kubeContext struct { + context.Context + + kubeconfig *rest.Config + kube client.Client + scheme *runtime.Scheme +} + +// NewKubeClient creates a new client.Client using in-cluster config. 
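+// The kubeconfig is discovered via controller-runtime's GetConfig, and the scheme registers the client-go, batch/v1 and clustercode v1alpha1 types.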
+func NewKubeClient(ctx context.Context) (client.Client, error) { + pctx := &kubeContext{Context: ctx} + p := pipeline.NewPipeline[*kubeContext]().WithBeforeHooks(DebugLogger[*kubeContext](pctx)) + p.WithSteps( + p.NewStep("register schemes", registerSchemesFn), + p.NewStep("load kube config", loadKubeConfigFn), + p.NewStep("create client", createClientFn), + ) + err := p.RunWithContext(pctx) + if err != nil { + return nil, fmt.Errorf("cannot instantiate new kubernetes client: %w", err) + } + return pctx.kube, nil +} + +var createClientFn = func(ctx *kubeContext) error { + kube, err := client.New(ctx.kubeconfig, client.Options{Scheme: ctx.scheme}) + ctx.kube = kube + return err +} + +var registerSchemesFn = func(ctx *kubeContext) error { + ctx.scheme = runtime.NewScheme() + b := &runtime.SchemeBuilder{} + b.Register( + kubernetesscheme.AddToScheme, + batchv1.AddToScheme, + v1alpha1.AddToScheme, + ) + return b.AddToScheme(ctx.scheme) +} + +var loadKubeConfigFn = func(ctx *kubeContext) error { + clientConfig, err := controllerruntime.GetConfig() + ctx.kubeconfig = clientConfig + return err +} diff --git a/pkg/internal/types/types.go b/pkg/internal/types/types.go new file mode 100644 index 00000000..7ece41c9 --- /dev/null +++ b/pkg/internal/types/types.go @@ -0,0 +1,48 @@ +package types + +import ( + "k8s.io/apimachinery/pkg/labels" +) + +var ( + ClusterCodeLabels = labels.Set{ + "app.kubernetes.io/managed-by": "clustercode", + } +) + +type ( + ClusterCodeJobType string +) + +const ( + SourceSubMountPath = "source" + TargetSubMountPath = "target" + IntermediateSubMountPath = "intermediate" + ConfigSubMountPath = "config" + + ClustercodeTypeLabelKey = "clustercode.github.io/type" + ClustercodeSliceIndexLabelKey = "clustercode.github.io/slice-index" + + JobTypeScan ClusterCodeJobType = "scan" + JobTypeSplit ClusterCodeJobType = "split" + JobTypeSlice ClusterCodeJobType = "slice" + JobTypeCount ClusterCodeJobType = "count" + JobTypeMerge ClusterCodeJobType = "merge" + JobTypeCleanup ClusterCodeJobType = "cleanup" +) + +var ( + JobTypes = []ClusterCodeJobType{ + JobTypeScan, JobTypeSplit, JobTypeCount, JobTypeSlice, + JobTypeMerge, JobTypeCleanup} +) + +func (t ClusterCodeJobType) AsLabels() labels.Set { + return labels.Set{ + ClustercodeTypeLabelKey: string(t), + } +} + +func (t ClusterCodeJobType) String() string { + return string(t) +} diff --git a/pkg/internal/utils/utils.go b/pkg/internal/utils/utils.go new file mode 100644 index 00000000..cf6a4a82 --- /dev/null +++ b/pkg/internal/utils/utils.go @@ -0,0 +1,66 @@ +package utils + +import ( + "strings" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + v13 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" +) + +func EnsurePVCVolume(job *v1.Job, name, podMountRoot string, volume v1alpha1.ClusterCodeVolumeRef) { + found := false + for _, container := range job.Spec.Template.Spec.Containers { + if HasVolumeMount(name, container) { + found = true + break + } + } + if found { + return + } + job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, + corev1.VolumeMount{Name: name, MountPath: podMountRoot, SubPath: volume.SubPath}) + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ + Name: name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: volume.ClaimName, + }, + }}) +} + +func 
HasVolumeMount(name string, container corev1.Container) bool { + found := false + for _, mount := range container.VolumeMounts { + if mount.Name == name { + found = true + break + } + } + return found +} + +func GetOwner(obj v13.Object) types.NamespacedName { + for _, owner := range obj.GetOwnerReferences() { + if pointer.BoolPtrDerefOr(owner.Controller, false) { + return types.NamespacedName{Namespace: obj.GetNamespace(), Name: owner.Name} + } + } + return types.NamespacedName{} +} + +func MergeArgsAndReplaceVariables(variables map[string]string, argsList ...[]string) (merged []string) { + for _, args := range argsList { + for _, arg := range args { + for k, v := range variables { + arg = strings.ReplaceAll(arg, k, v) + } + merged = append(merged, arg) + } + } + return merged +} diff --git a/pkg/internal/utils/utils_test.go b/pkg/internal/utils/utils_test.go new file mode 100644 index 00000000..d4b585bf --- /dev/null +++ b/pkg/internal/utils/utils_test.go @@ -0,0 +1 @@ +package utils diff --git a/pkg/operator/blueprintcontroller/controller.go b/pkg/operator/blueprintcontroller/controller.go new file mode 100644 index 00000000..272f57c7 --- /dev/null +++ b/pkg/operator/blueprintcontroller/controller.go @@ -0,0 +1,185 @@ +package blueprintcontroller + +import ( + "context" + "path/filepath" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/pointer" + "k8s.io/utils/strings" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +var ScanRoleName = "clustercode-edit" +var ScanRoleKind = "ClusterRole" +var DefaultClusterCodeContainerImage string + +// BlueprintProvisioner reconciles Blueprint objects +type BlueprintProvisioner struct { + client client.Client + Log logr.Logger +} + +// BlueprintContext holds the parameters of a single reconciliation +type BlueprintContext struct { + context.Context + blueprint *v1alpha1.Blueprint + log logr.Logger +} + +func (r *BlueprintProvisioner) NewObject() *v1alpha1.Blueprint { + return &v1alpha1.Blueprint{} +} + +func (r *BlueprintProvisioner) Provision(ctx context.Context, obj *v1alpha1.Blueprint) (reconcile.Result, error) { + + pctx := &BlueprintContext{ + blueprint: obj, + Context: ctx, + } + + p := pipeline.NewPipeline[*BlueprintContext]().WithBeforeHooks(pipe.DebugLogger(pctx)) + p.WithSteps( + p.NewStep("ensure service account", r.ensureServiceAccount), + p.NewStep("ensure role binding", r.ensureRoleBinding), + p.NewStep("ensure cron job", r.ensureCronJob), + ) + return reconcile.Result{}, p.RunWithContext(pctx) +} + +func (r *BlueprintProvisioner) Deprovision(_ context.Context, _ *v1alpha1.Blueprint) (reconcile.Result, error) { + return reconcile.Result{}, nil +} + +func (r *BlueprintProvisioner) ensureServiceAccount(ctx *BlueprintContext) error { + sa := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{ + Name: ctx.blueprint.GetServiceAccountName(), + Namespace: ctx.blueprint.Namespace, + }} + + _, err := controllerutil.CreateOrUpdate(ctx, r.client, sa, func() error { + sa.Labels = labels.Merge(sa.Labels, internaltypes.ClusterCodeLabels) + return 
controllerutil.SetOwnerReference(ctx.blueprint, sa, r.client.Scheme()) + }) + return err +} + +func (r *BlueprintProvisioner) ensureRoleBinding(ctx *BlueprintContext) error { + saName := ctx.blueprint.GetServiceAccountName() + roleBinding := &rbacv1.RoleBinding{ObjectMeta: metav1.ObjectMeta{ + Name: strings.ShortenString(saName, 51) + "-rolebinding", + Namespace: ctx.blueprint.Namespace, + }} + + _, err := controllerutil.CreateOrUpdate(ctx, r.client, roleBinding, func() error { + roleBinding.Labels = labels.Merge(roleBinding.Labels, internaltypes.ClusterCodeLabels) + roleBinding.Subjects = []rbacv1.Subject{{ + Kind: "ServiceAccount", + Namespace: ctx.blueprint.Namespace, + Name: saName, + }} + // Don't change existing kind or role name if already existing + kind := roleBinding.RoleRef.Kind + if kind == "" { + kind = ScanRoleKind + } + roleName := roleBinding.RoleRef.Name + if roleName == "" { + roleName = ScanRoleName + } + roleBinding.RoleRef = rbacv1.RoleRef{Kind: kind, Name: roleName, APIGroup: rbacv1.GroupName} + return controllerutil.SetOwnerReference(ctx.blueprint, roleBinding, r.client.Scheme()) + }) + return err +} + +func (r *BlueprintProvisioner) ensureCronJob(ctx *BlueprintContext) error { + cronJob := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: ctx.blueprint.Name + "-scan-job", + Namespace: ctx.blueprint.Namespace, + }, + } + + _, err := controllerutil.CreateOrUpdate(ctx, r.client, cronJob, func() error { + cronJob.Labels = labels.Merge(cronJob.Labels, labels.Merge(internaltypes.ClusterCodeLabels, internaltypes.JobTypeScan.AsLabels())) + cronJob.Spec = batchv1.CronJobSpec{ + Schedule: ctx.blueprint.Spec.ScanSchedule, + ConcurrencyPolicy: batchv1.ForbidConcurrent, + SuccessfulJobsHistoryLimit: pointer.Int32Ptr(1), + FailedJobsHistoryLimit: pointer.Int32Ptr(1), + Suspend: pointer.Bool(ctx.blueprint.Spec.Suspend), + + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + BackoffLimit: pointer.Int32Ptr(0), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + ServiceAccountName: ctx.blueprint.GetServiceAccountName(), + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{{ + Name: "scanner", + Env: []corev1.EnvVar{ + { + Name: "CC_LOG_DEBUG", + Value: "true", + }, + }, + Args: []string{ + "scan", + "--namespace=" + ctx.blueprint.Namespace, + "--blueprint-name=" + ctx.blueprint.Name, + }, + Image: DefaultClusterCodeContainerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + VolumeMounts: []corev1.VolumeMount{ + { + Name: internaltypes.SourceSubMountPath, + MountPath: filepath.Join("/clustercode", internaltypes.SourceSubMountPath), + SubPath: ctx.blueprint.Spec.Storage.SourcePvc.SubPath, + }, + { + Name: internaltypes.IntermediateSubMountPath, + MountPath: filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath), + SubPath: ctx.blueprint.Spec.Storage.SourcePvc.SubPath, + }, + }, + }}, + Volumes: []corev1.Volume{ + { + Name: internaltypes.SourceSubMountPath, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: ctx.blueprint.Spec.Storage.SourcePvc.ClaimName, + }, + }, + }, + { + Name: internaltypes.IntermediateSubMountPath, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: ctx.blueprint.Spec.Storage.IntermediatePvc.ClaimName, + }, + }, + }, + }, + }, + }, + }, + }, + } + return controllerutil.SetOwnerReference(ctx.blueprint, cronJob, r.client.Scheme()) + }) + return err +} diff --git 
a/pkg/operator/blueprintcontroller/setup.go b/pkg/operator/blueprintcontroller/setup.go new file mode 100644 index 00000000..acd40abd --- /dev/null +++ b/pkg/operator/blueprintcontroller/setup.go @@ -0,0 +1,32 @@ +package blueprintcontroller + +import ( + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/operator/reconciler" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// +kubebuilder:rbac:groups=clustercode.github.io,resources=blueprints,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=clustercode.github.io,resources=blueprints/status;blueprints/finalizers,verbs=get;update;patch +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=batch,resources=cronjobs/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;watch;create;delete +// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings,verbs=get;list;watch;create;delete + +// SetupBlueprintController adds a controller that reconciles managed resources. +func SetupBlueprintController(mgr ctrl.Manager) error { + name := "blueprint.clustercode.github.io" + + controller := reconciler.NewReconciler[*v1alpha1.Blueprint](mgr.GetClient(), &BlueprintProvisioner{ + Log: mgr.GetLogger(), + client: mgr.GetClient(), + }) + + return ctrl.NewControllerManagedBy(mgr). + Named(name). + For(&v1alpha1.Blueprint{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Complete(controller) +} diff --git a/pkg/operator/command.go b/pkg/operator/command.go new file mode 100644 index 00000000..c2b35f72 --- /dev/null +++ b/pkg/operator/command.go @@ -0,0 +1,74 @@ +package operator + +import ( + "context" + "time" + + "github.com/ccremer/clustercode/pkg/api" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/leaderelection/resourcelock" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type Command struct { + Log logr.Logger + + LeaderElectionEnabled bool +} + +type commandContext struct { + context.Context + manager manager.Manager + kubeconfig *rest.Config +} + +func (c *Command) Execute(ctx context.Context) error { + log := c.Log + log.Info("Setting up controllers", "config", c) + controllerruntime.SetLogger(log) + + pctx := &commandContext{Context: ctx} + p := pipeline.NewPipeline[*commandContext]() + p.WithSteps( + p.NewStep("get config", func(ctx *commandContext) error { + cfg, err := controllerruntime.GetConfig() + ctx.kubeconfig = cfg + return err + }), + p.NewStep("create manager", func(ctx *commandContext) error { + // configure client-side throttling + ctx.kubeconfig.QPS = 100 + ctx.kubeconfig.Burst = 150 // more Openshift friendly + + mgr, err := controllerruntime.NewManager(ctx.kubeconfig, controllerruntime.Options{ + // controller-runtime uses both ConfigMaps and Leases for leader election by default. + // Leases expire after 15 seconds, with a 10-second renewal deadline. + // We've observed leader loss due to renewal deadlines being exceeded when under high load - i.e. + // hundreds of reconciles per second and ~200rps to the API server. + // Switching to Leases only and longer leases appears to alleviate this. 
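+				// With the 60s lease and 50s renew deadline configured below, brief API server slowdowns are less likely to cost the leader its lease.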
+ LeaderElection: c.LeaderElectionEnabled, + LeaderElectionID: "leader-election-clustercode", + LeaderElectionResourceLock: resourcelock.LeasesResourceLock, + LeaseDuration: func() *time.Duration { d := 60 * time.Second; return &d }(), + RenewDeadline: func() *time.Duration { d := 50 * time.Second; return &d }(), + }) + ctx.manager = mgr + return err + }), + p.NewStep("register schemes", func(ctx *commandContext) error { + return api.AddToScheme(ctx.manager.GetScheme()) + }), + p.NewStep("setup controllers", func(ctx *commandContext) error { + return SetupControllers(ctx.manager) + }), + p.NewStep("run manager", func(ctx *commandContext) error { + log.Info("Starting manager") + return ctx.manager.Start(ctx) + }), + ) + + return p.RunWithContext(pctx) +} diff --git a/pkg/operator/controllers/blueprint_controller.go b/pkg/operator/controllers/blueprint_controller.go deleted file mode 100644 index 4a44fcaa..00000000 --- a/pkg/operator/controllers/blueprint_controller.go +++ /dev/null @@ -1,232 +0,0 @@ -package controllers - -import ( - "context" - "path/filepath" - "time" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/go-logr/logr" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/api/batch/v1beta1" - corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/utils/pointer" - "k8s.io/utils/strings" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -type ( - // BlueprintReconciler reconciles Blueprint objects - BlueprintReconciler struct { - Client client.Client - Log logr.Logger - } - // BlueprintContext holds the parameters of a single reconciliation - BlueprintContext struct { - ctx context.Context - blueprint *v1alpha1.Blueprint - log logr.Logger - } -) - -var ScanRoleKind = "ClusterRole" -var ScanRoleName = "clustercode-edit" -var DefaultClusterCodeContainerImage string - -// +kubebuilder:rbac:groups=clustercode.github.io,resources=blueprints,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=clustercode.github.io,resources=blueprints/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=batch,resources=cronjobs/status,verbs=get;update;patch -// +kubebuilder:rbac:groups=core,resources=serviceaccounts,verbs=get;list;create;delete -// +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings,verbs=get;list;create;delete - -func (r *BlueprintReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - rc := &BlueprintContext{ - ctx: ctx, - blueprint: &v1alpha1.Blueprint{}, - } - err := r.Client.Get(ctx, req.NamespacedName, rc.blueprint) - if err != nil { - if apierrors.IsNotFound(err) { - r.Log.Info("object not found, ignoring reconcile", "object", req.NamespacedName) - return ctrl.Result{}, nil - } - r.Log.Error(err, "could not retrieve object", "object", req.NamespacedName) - return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, err - } - rc.log = r.Log.WithValues("blueprint", req.NamespacedName) - r.handleBlueprint(rc) - rc.log.Info("reconciled blueprint") - return ctrl.Result{}, nil -} - -func (r *BlueprintReconciler) handleBlueprint(rc *BlueprintContext) 
{ - - saName, err := r.createServiceAccountAndBinding(rc) - if err != nil { - rc.log.Error(err, "cannot ensure that scanner job have necessary RBAC permissions") - } - - cronJob := v1beta1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: rc.blueprint.Name + "-scan-job", - Namespace: rc.blueprint.Namespace, - Labels: labels.Merge(ClusterCodeLabels, ClustercodeTypeScan.AsLabels()), - }, - Spec: v1beta1.CronJobSpec{ - Schedule: rc.blueprint.Spec.ScanSchedule, - ConcurrencyPolicy: v1beta1.ForbidConcurrent, - Suspend: &rc.blueprint.Spec.Suspend, - - JobTemplate: v1beta1.JobTemplateSpec{ - Spec: batchv1.JobSpec{ - BackoffLimit: pointer.Int32Ptr(0), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - ServiceAccountName: saName, - RestartPolicy: corev1.RestartPolicyNever, - Containers: []corev1.Container{ - { - Name: "scanner", - Env: []corev1.EnvVar{ - { - Name: "CC_LOG__DEBUG", - Value: "true", - }, - }, - Args: []string{ - "scan", - "--namespace=" + rc.blueprint.Namespace, - "--blueprint-name=" + rc.blueprint.Name, - }, - Image: DefaultClusterCodeContainerImage, - ImagePullPolicy: corev1.PullIfNotPresent, - VolumeMounts: []corev1.VolumeMount{ - { - Name: SourceSubMountPath, - MountPath: filepath.Join("/clustercode", SourceSubMountPath), - SubPath: rc.blueprint.Spec.Storage.SourcePvc.SubPath, - }, - { - Name: IntermediateSubMountPath, - MountPath: filepath.Join("/clustercode", IntermediateSubMountPath), - SubPath: rc.blueprint.Spec.Storage.SourcePvc.SubPath, - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: SourceSubMountPath, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: rc.blueprint.Spec.Storage.SourcePvc.ClaimName, - }, - }, - }, - { - Name: IntermediateSubMountPath, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: rc.blueprint.Spec.Storage.IntermediatePvc.ClaimName, - }, - }, - }, - }, - }, - }, - }, - }, - SuccessfulJobsHistoryLimit: pointer.Int32Ptr(1), - FailedJobsHistoryLimit: pointer.Int32Ptr(1), - }, - } - if err := controllerutil.SetControllerReference(rc.blueprint, cronJob.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Error(err, "could not set controller reference, deleting the blueprint will not delete the cronjob", "cronjob", cronJob.Name) - } - - if err := r.Client.Create(rc.ctx, &cronJob); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("cronjob already exists, updating it") - err = r.Client.Update(rc.ctx, &cronJob) - if err != nil { - rc.log.Error(err, "could not update cronjob") - } - return - } - if !apierrors.IsNotFound(err) { - rc.log.Error(err, "could not create cronjob") - return - } - } else { - rc.log.Info("created cronjob") - } -} - -func (r *BlueprintReconciler) createServiceAccountAndBinding(rc *BlueprintContext) (string, error) { - binding, sa := r.newRbacDefinition(rc) - - err := r.Client.Create(rc.ctx, &sa) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - return sa.Name, err - } - } else { - rc.log.Info("service account created", "sa", sa.Name) - } - err = r.Client.Create(rc.ctx, &binding) - if err != nil { - if !apierrors.IsAlreadyExists(err) { - return sa.Name, err - } - } else { - rc.log.Info("rolebinding created", "roleBinding", binding.Name) - } - return sa.Name, nil -} - -func (r *BlueprintReconciler) newRbacDefinition(rc *BlueprintContext) (rbacv1.RoleBinding, corev1.ServiceAccount) { - saName := rc.blueprint.GetServiceAccountName() - roleBinding := 
rbacv1.RoleBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: strings.ShortenString(saName, 51) + "-rolebinding", - Namespace: rc.blueprint.Namespace, - Labels: ClusterCodeLabels, - }, - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Namespace: rc.blueprint.Namespace, - Name: saName, - }, - }, - RoleRef: rbacv1.RoleRef{ - Kind: ScanRoleKind, - Name: ScanRoleName, - APIGroup: rbacv1.GroupName, - }, - } - - account := corev1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Name: saName, - Namespace: rc.blueprint.Namespace, - Labels: ClusterCodeLabels, - }, - } - - if err := controllerutil.SetControllerReference(rc.blueprint, roleBinding.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Error(err, "could not set controller reference on role", "roleBinding", roleBinding.Name) - } - if err := controllerutil.SetControllerReference(rc.blueprint, account.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Error(err, "could not set controller reference on service account", "sa", account.Name) - } - return roleBinding, account -} diff --git a/pkg/operator/controllers/job_controller.go b/pkg/operator/controllers/job_controller.go deleted file mode 100644 index 0882b561..00000000 --- a/pkg/operator/controllers/job_controller.go +++ /dev/null @@ -1,280 +0,0 @@ -package controllers - -import ( - "context" - "fmt" - "path/filepath" - "strconv" - "time" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/go-logr/logr" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/utils/pointer" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -type ( - // JobReconciler reconciles Job objects - JobReconciler struct { - Client client.Client - Log logr.Logger - } - // JobContext holds the parameters of a single reconciliation - JobContext struct { - ctx context.Context - job *batchv1.Job - jobType ClusterCodeJobType - task *v1alpha1.Task - log logr.Logger - } -) - -// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete - -func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - rc := &JobContext{ - job: &batchv1.Job{}, - ctx: ctx, - } - err := r.Client.Get(ctx, req.NamespacedName, rc.job) - if err != nil { - if apierrors.IsNotFound(err) { - r.Log.V(1).Info("object not found, ignoring reconcile", "object", req.NamespacedName) - return ctrl.Result{}, nil - } - r.Log.Error(err, "could not retrieve object", "object", req.NamespacedName) - return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, err - } - rc.log = r.Log.WithValues("job", req.NamespacedName) - if rc.job.GetDeletionTimestamp() != nil { - rc.log.V(1).Info("job is being deleted, ignoring reconcile") - return ctrl.Result{}, nil - } - jobType, err := rc.getJobType() - if err != nil { - rc.log.V(1).Info("cannot determine job type, ignoring reconcile", "error", err.Error()) - return ctrl.Result{}, nil - } - conditions := castConditions(rc.job.Status.Conditions) - if !meta.IsStatusConditionPresentAndEqual(conditions, string(batchv1.JobComplete), metav1.ConditionTrue) { - rc.log.V(1).Info("job is not completed yet, ignoring reconcile") - return 
ctrl.Result{}, nil - } - rc.jobType = jobType - switch jobType { - case ClustercodeTypeSplit: - return ctrl.Result{}, r.handleSplitJob(rc) - case ClustercodeTypeCount: - rc.log.Info("reconciled count job") - case ClustercodeTypeSlice: - rc.log.Info("reconciling slice job") - return ctrl.Result{}, r.handleSliceJob(rc) - case ClustercodeTypeMerge: - rc.log.Info("reconciling merge job") - return ctrl.Result{}, r.handleMergeJob(rc) - } - return ctrl.Result{}, nil -} - -func (r *JobReconciler) handleSplitJob(rc *JobContext) error { - rc.task = &v1alpha1.Task{} - if err := r.Client.Get(rc.ctx, getOwner(rc.job), rc.task); err != nil { - return err - } - - return r.createCountJob(rc) -} - -func (r *JobReconciler) handleSliceJob(rc *JobContext) error { - indexStr, found := rc.job.Labels[ClustercodeSliceIndexLabelKey] - if !found { - return fmt.Errorf("cannot determine slice index, missing label '%s'", ClustercodeSliceIndexLabelKey) - } - index, err := strconv.Atoi(indexStr) - if err != nil { - return fmt.Errorf("cannot determine slice index from label '%s': %w", ClustercodeSliceIndexLabelKey, err) - } - - rc.task = &v1alpha1.Task{} - if err := r.Client.Get(rc.ctx, getOwner(rc.job), rc.task); err != nil { - return err - } - finished := rc.task.Status.SlicesFinished - finished = append(finished, v1alpha1.ClustercodeSliceRef{ - SliceIndex: index, - JobName: rc.job.Name, - }) - rc.task.Status.SlicesFinished = finished - rc.task.Status.SlicesFinishedCount = len(finished) - - var scheduled []v1alpha1.ClustercodeSliceRef - for _, ref := range rc.task.Status.SlicesScheduled { - if ref.SliceIndex != index { - scheduled = append(scheduled, ref) - } - } - rc.task.Status.SlicesScheduled = scheduled - return r.Client.Status().Update(rc.ctx, rc.task) -} - -func (r *JobReconciler) createCountJob(rc *JobContext) error { - - taskId := rc.task.Spec.TaskId - intermediateMountRoot := filepath.Join("/clustercode", IntermediateSubMountPath) - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%.*s-%s", 62-len(ClustercodeTypeCount), taskId, ClustercodeTypeCount), - Namespace: rc.job.Namespace, - Labels: labels.Merge(ClusterCodeLabels, labels.Merge(ClustercodeTypeCount.AsLabels(), taskId.AsLabels())), - }, - Spec: batchv1.JobSpec{ - BackoffLimit: pointer.Int32Ptr(0), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - ServiceAccountName: rc.job.Spec.Template.Spec.ServiceAccountName, - RestartPolicy: corev1.RestartPolicyNever, - Containers: []corev1.Container{ - { - Name: "clustercode", - Image: DefaultClusterCodeContainerImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Args: []string{ - "-d", - "count", - "--task-name=" + rc.task.Name, - "--namespace=" + rc.job.Namespace, - }, - VolumeMounts: []corev1.VolumeMount{ - {Name: IntermediateSubMountPath, MountPath: intermediateMountRoot, SubPath: rc.task.Spec.Storage.IntermediatePvc.SubPath}, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: IntermediateSubMountPath, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: rc.task.Spec.Storage.IntermediatePvc.ClaimName, - }, - }, - }, - }, - }, - }, - }, - } - if err := controllerutil.SetControllerReference(rc.task, job.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Info("could not set controller reference, deleting the task won't delete the job", "err", err.Error()) - } - if err := r.Client.Create(rc.ctx, job); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("skip creating job, it already 
exists", "job", job.Name) - } else { - rc.log.Error(err, "could not create job", "job", job.Name) - return err - } - } else { - rc.log.Info("job created", "job", job.Name) - } - return nil -} - -func (r *JobReconciler) handleMergeJob(rc *JobContext) error { - rc.task = &v1alpha1.Task{} - if err := r.Client.Get(rc.ctx, getOwner(rc.job), rc.task); err != nil { - return err - } - - return r.createCleanupJob(rc) -} - -func (r *JobReconciler) createCleanupJob(rc *JobContext) error { - - taskId := rc.task.Spec.TaskId - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%.*s-%s", 62-len(ClustercodeTypeCleanup), taskId, ClustercodeTypeCleanup), - Namespace: rc.job.Namespace, - Labels: labels.Merge(ClusterCodeLabels, labels.Merge(ClustercodeTypeCleanup.AsLabels(), taskId.AsLabels())), - }, - Spec: batchv1.JobSpec{ - BackoffLimit: pointer.Int32Ptr(0), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - SecurityContext: &corev1.PodSecurityContext{ - RunAsUser: pointer.Int64Ptr(1000), - RunAsGroup: pointer.Int64Ptr(0), - FSGroup: pointer.Int64Ptr(0), - }, - ServiceAccountName: rc.job.Spec.Template.Spec.ServiceAccountName, - RestartPolicy: corev1.RestartPolicyNever, - Containers: []corev1.Container{ - { - Name: "clustercode", - Image: DefaultClusterCodeContainerImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Args: []string{ - "-d", - "--namespace=" + rc.job.Namespace, - "cleanup", - "--task-name=" + rc.task.Name, - }, - }, - }, - }, - }, - }, - } - addPvcVolume(job, SourceSubMountPath, filepath.Join("/clustercode", SourceSubMountPath), rc.task.Spec.Storage.SourcePvc) - addPvcVolume(job, IntermediateSubMountPath, filepath.Join("/clustercode", IntermediateSubMountPath), rc.task.Spec.Storage.IntermediatePvc) - if err := controllerutil.SetControllerReference(rc.task, job.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Info("could not set controller reference, deleting the task won't delete the job", "err", err.Error()) - } - if err := r.Client.Create(rc.ctx, job); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("skip creating job, it already exists", "job", job.Name) - } else { - rc.log.Error(err, "could not create job", "job", job.Name) - return err - } - } else { - rc.log.Info("job created", "job", job.Name) - } - return nil -} - -func (c JobContext) getJobType() (ClusterCodeJobType, error) { - set := labels.Set(c.job.Labels) - if !set.Has(ClustercodeTypeLabelKey) { - return "", fmt.Errorf("missing label key '%s", ClustercodeTypeLabelKey) - } - label := set.Get(ClustercodeTypeLabelKey) - for _, jobType := range ClustercodeTypes { - if label == string(jobType) { - return jobType, nil - } - } - return "", fmt.Errorf("value of label '%s' unrecognized: %s", ClustercodeTypeLabelKey, label) -} - -func castConditions(conditions []batchv1.JobCondition) (converted []metav1.Condition) { - for _, c := range conditions { - converted = append(converted, metav1.Condition{ - Type: string(c.Type), - Status: metav1.ConditionStatus(c.Status), - LastTransitionTime: c.LastTransitionTime, - Reason: c.Reason, - Message: c.Message, - }) - } - return converted -} diff --git a/pkg/operator/controllers/setup.go b/pkg/operator/controllers/setup.go deleted file mode 100644 index 243a9659..00000000 --- a/pkg/operator/controllers/setup.go +++ /dev/null @@ -1,61 +0,0 @@ -package controllers - -import ( - "github.com/ccremer/clustercode/api/v1alpha1" - batchv1 "k8s.io/api/batch/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - 
"sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/predicate" -) - -// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update - -// SetupBlueprintController adds a controller that reconciles managed resources. -func SetupBlueprintController(mgr ctrl.Manager) error { - name := "blueprint.clustercode.github.io" - - return ctrl.NewControllerManagedBy(mgr). - Named(name). - For(&v1alpha1.Blueprint{}). - WithEventFilter(predicate.GenerationChangedPredicate{}). - Complete(&BlueprintReconciler{ - Client: mgr.GetClient(), - Log: mgr.GetLogger(), - }) -} - -// SetupJobController adds a controller that reconciles managed resources. -func SetupJobController(mgr ctrl.Manager) error { - name := "job.clustercode.github.io" - pred, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: ClusterCodeLabels}) - if err != nil { - return err - } - return ctrl.NewControllerManagedBy(mgr). - Named(name). - For(&batchv1.Job{}, builder.WithPredicates(pred)). - Complete(&JobReconciler{ - Client: mgr.GetClient(), - Log: mgr.GetLogger(), - }) -} - -// SetupTaskController adds a controller that reconciles managed resources. -func SetupTaskController(mgr ctrl.Manager) error { - name := "task.clustercode.github.io" - - pred, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: ClusterCodeLabels}) - if err != nil { - return err - } - return ctrl.NewControllerManagedBy(mgr). - Named(name). - For(&v1alpha1.Task{}, builder.WithPredicates(pred)). - //Owns(&batchv1.Job{}). - //WithEventFilter(predicate.GenerationChangedPredicate{}). - Complete(&TaskReconciler{ - Client: mgr.GetClient(), - Log: mgr.GetLogger(), - }) -} diff --git a/pkg/operator/controllers/task_controller.go b/pkg/operator/controllers/task_controller.go deleted file mode 100644 index b0fda72a..00000000 --- a/pkg/operator/controllers/task_controller.go +++ /dev/null @@ -1,217 +0,0 @@ -package controllers - -import ( - "context" - "fmt" - "path/filepath" - "strconv" - "time" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/go-logr/logr" - apierrors "k8s.io/apimachinery/pkg/api/errors" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" -) - -type ( - // TaskReconciler reconciles Task objects - TaskReconciler struct { - Client client.Client - Log logr.Logger - } - // TaskContext holds the parameters of a single reconciliation - TaskContext struct { - ctx context.Context - task *v1alpha1.Task - blueprint *v1alpha1.Blueprint - log logr.Logger - } - TaskOpts struct { - args []string - jobType ClusterCodeJobType - mountSource bool - mountIntermediate bool - mountTarget bool - mountConfig bool - } -) - -// +kubebuilder:rbac:groups=clustercode.github.io,resources=tasks,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=clustercode.github.io,resources=tasks/status,verbs=get;update;patch - -func (r *TaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - rc := &TaskContext{ - ctx: ctx, - task: &v1alpha1.Task{}, - } - err := r.Client.Get(ctx, req.NamespacedName, rc.task) - if err != nil { - if apierrors.IsNotFound(err) { - r.Log.Info("object not found, ignoring reconcile", "object", req.NamespacedName) - return ctrl.Result{}, nil - } - r.Log.Error(err, "could not retrieve object", "object", req.NamespacedName) - return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, err - } - rc.log = 
r.Log.WithValues("task", req.NamespacedName) - - if err := r.handleTask(rc); err != nil { - rc.log.Error(err, "could not reconcile task") - return ctrl.Result{}, err - } - rc.log.Info("reconciled task") - return ctrl.Result{}, nil -} - -func (r *TaskReconciler) handleTask(rc *TaskContext) error { - if rc.task.Spec.SlicesPlannedCount == 0 { - return r.createSplitJob(rc) - } - - if len(rc.task.Status.SlicesFinished) >= rc.task.Spec.SlicesPlannedCount { - rc.log.Info("no more slices to schedule") - return r.createMergeJob(rc) - } - // Todo: Check condition whether more jobs are needed - nextSliceIndex := r.determineNextSliceIndex(rc) - if nextSliceIndex < 0 { - return nil - } else { - rc.log.Info("scheduling next slice", "index", nextSliceIndex) - return r.createSliceJob(rc, nextSliceIndex) - } -} - -func (r *TaskReconciler) determineNextSliceIndex(rc *TaskContext) int { - status := rc.task.Status - if rc.task.Spec.ConcurrencyStrategy.ConcurrentCountStrategy != nil { - maxCount := rc.task.Spec.ConcurrencyStrategy.ConcurrentCountStrategy.MaxCount - if len(status.SlicesScheduled) >= maxCount { - rc.log.V(1).Info("reached concurrent max count, cannot schedule more", "max", maxCount) - return -1 - } - } - for i := 0; i < rc.task.Spec.SlicesPlannedCount; i++ { - if containsSliceIndex(status.SlicesScheduled, i) { - continue - } - if containsSliceIndex(status.SlicesFinished, i) { - continue - } - return i - } - return -1 -} - -func containsSliceIndex(list []v1alpha1.ClustercodeSliceRef, index int) bool { - for _, t := range list { - if t.SliceIndex == index { - return true - } - } - return false -} - -func (r *TaskReconciler) createSplitJob(rc *TaskContext) error { - sourceMountRoot := filepath.Join("/clustercode", SourceSubMountPath) - intermediateMountRoot := filepath.Join("/clustercode", IntermediateSubMountPath) - variables := map[string]string{ - "${INPUT}": filepath.Join(sourceMountRoot, rc.task.Spec.SourceUrl.GetPath()), - "${OUTPUT}": getSegmentFileNameTemplatePath(rc, intermediateMountRoot), - "${SLICE_SIZE}": strconv.Itoa(rc.task.Spec.EncodeSpec.SliceSize), - } - job := createFfmpegJobDefinition(rc.task, &TaskOpts{ - args: mergeArgsAndReplaceVariables(variables, rc.task.Spec.EncodeSpec.DefaultCommandArgs, rc.task.Spec.EncodeSpec.SplitCommandArgs), - jobType: ClustercodeTypeSplit, - mountSource: true, - mountIntermediate: true, - }) - if err := controllerutil.SetControllerReference(rc.task, job.GetObjectMeta(), r.Client.Scheme()); err != nil { - rc.log.Info("could not set controller reference, deleting the task won't delete the job", "err", err.Error()) - } - if err := r.Client.Create(rc.ctx, job); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("skip creating job, it already exists", "job", job.Name) - } else { - rc.log.Error(err, "could not create job", "job", job.Name) - } - } else { - rc.log.Info("job created", "job", job.Name) - } - return nil -} - -func (r *TaskReconciler) createSliceJob(rc *TaskContext, index int) error { - intermediateMountRoot := filepath.Join("/clustercode", IntermediateSubMountPath) - variables := map[string]string{ - "${INPUT}": getSourceSegmentFileNameIndexPath(rc, intermediateMountRoot, index), - "${OUTPUT}": getTargetSegmentFileNameIndexPath(rc, intermediateMountRoot, index), - } - job := createFfmpegJobDefinition(rc.task, &TaskOpts{ - args: mergeArgsAndReplaceVariables(variables, rc.task.Spec.EncodeSpec.DefaultCommandArgs, rc.task.Spec.EncodeSpec.TranscodeCommandArgs), - jobType: ClustercodeTypeSlice, - mountIntermediate: true, - }) - 
job.Name = fmt.Sprintf("%s-%d", job.Name, index) - job.Labels[ClustercodeSliceIndexLabelKey] = strconv.Itoa(index) - if err := controllerutil.SetControllerReference(rc.task, job.GetObjectMeta(), r.Client.Scheme()); err != nil { - return fmt.Errorf("could not set controller reference: %w", err) - } - if err := r.Client.Create(rc.ctx, job); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("skip creating job, it already exists", "job", job.Name) - } else { - rc.log.Error(err, "could not create job", "job", job.Name) - } - } else { - rc.log.Info("job created", "job", job.Name) - } - rc.task.Status.SlicesScheduled = append(rc.task.Status.SlicesScheduled, v1alpha1.ClustercodeSliceRef{ - JobName: job.Name, - SliceIndex: index, - }) - return r.Client.Status().Update(rc.ctx, rc.task) -} - -func (r *TaskReconciler) createMergeJob(rc *TaskContext) error { - configMountRoot := filepath.Join("/clustercode", ConfigSubMountPath) - targetMountRoot := filepath.Join("/clustercode", TargetSubMountPath) - variables := map[string]string{ - "${INPUT}": filepath.Join(configMountRoot, v1alpha1.ConfigMapFileName), - "${OUTPUT}": filepath.Join(targetMountRoot, rc.task.Spec.TargetUrl.GetPath()), - } - job := createFfmpegJobDefinition(rc.task, &TaskOpts{ - args: mergeArgsAndReplaceVariables(variables, rc.task.Spec.EncodeSpec.DefaultCommandArgs, rc.task.Spec.EncodeSpec.MergeCommandArgs), - jobType: ClustercodeTypeMerge, - mountIntermediate: true, - mountTarget: true, - mountConfig: true, - }) - if err := controllerutil.SetControllerReference(rc.task, job.GetObjectMeta(), r.Client.Scheme()); err != nil { - return fmt.Errorf("could not set controller reference: %w", err) - } - if err := r.Client.Create(rc.ctx, job); err != nil { - if apierrors.IsAlreadyExists(err) { - rc.log.Info("skip creating job, it already exists", "job", job.Name) - } else { - rc.log.Error(err, "could not create job", "job", job.Name) - } - } else { - rc.log.Info("job created", "job", job.Name) - } - return nil -} - -func getSegmentFileNameTemplatePath(rc *TaskContext, intermediateMountRoot string) string { - return filepath.Join(intermediateMountRoot, rc.task.Name+"_%d"+filepath.Ext(rc.task.Spec.SourceUrl.GetPath())) -} - -func getSourceSegmentFileNameIndexPath(rc *TaskContext, intermediateMountRoot string, index int) string { - return filepath.Join(intermediateMountRoot, fmt.Sprintf("%s_%d%s", rc.task.Name, index, filepath.Ext(rc.task.Spec.SourceUrl.GetPath()))) -} - -func getTargetSegmentFileNameIndexPath(rc *TaskContext, intermediateMountRoot string, index int) string { - return filepath.Join(intermediateMountRoot, fmt.Sprintf("%s_%d%s%s", rc.task.Name, index, v1alpha1.MediaFileDoneSuffix, filepath.Ext(rc.task.Spec.TargetUrl.GetPath()))) -} diff --git a/pkg/operator/controllers/utils.go b/pkg/operator/controllers/utils.go deleted file mode 100644 index 2deb4e7d..00000000 --- a/pkg/operator/controllers/utils.go +++ /dev/null @@ -1,154 +0,0 @@ -package controllers - -import ( - "fmt" - "path/filepath" - "strings" - - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/pointer" - - "github.com/ccremer/clustercode/api/v1alpha1" -) - -type ( - ClusterCodeJobType string -) - -var ( - ClusterCodeLabels = labels.Set{ - "app.kubernetes.io/managed-by": "clustercode", - } -) - -const ( - SourceSubMountPath = "source" - TargetSubMountPath = "target" - IntermediateSubMountPath = "intermediate" - 
ConfigSubMountPath = "config" - - ClustercodeTypeLabelKey = "clustercode.github.io/type" - ClustercodeSliceIndexLabelKey = "clustercode.github.io/slice-index" - ClustercodeTypeScan ClusterCodeJobType = "scan" - ClustercodeTypeSplit ClusterCodeJobType = "split" - ClustercodeTypeSlice ClusterCodeJobType = "slice" - ClustercodeTypeCount ClusterCodeJobType = "count" - ClustercodeTypeMerge ClusterCodeJobType = "merge" - ClustercodeTypeCleanup ClusterCodeJobType = "cleanup" -) - -var ( - ClustercodeTypes = []ClusterCodeJobType{ - ClustercodeTypeScan, ClustercodeTypeSplit, ClustercodeTypeCount, ClustercodeTypeSlice, - ClustercodeTypeMerge, ClustercodeTypeCleanup} -) - -func (t ClusterCodeJobType) AsLabels() labels.Set { - return labels.Set{ - ClustercodeTypeLabelKey: string(t), - } -} - -func (t ClusterCodeJobType) String() string { - return string(t) -} - -func mergeArgsAndReplaceVariables(variables map[string]string, argsList ...[]string) (merged []string) { - for _, args := range argsList { - for _, arg := range args { - for k, v := range variables { - arg = strings.ReplaceAll(arg, k, v) - } - merged = append(merged, arg) - } - } - return merged -} - -func getOwner(obj metav1.Object) types.NamespacedName { - for _, owner := range obj.GetOwnerReferences() { - if pointer.BoolPtrDerefOr(owner.Controller, false) { - return types.NamespacedName{Namespace: obj.GetNamespace(), Name: owner.Name} - } - } - return types.NamespacedName{} -} - -var DefaultFfmpegContainerImage string - -func createFfmpegJobDefinition(task *v1alpha1.Task, opts *TaskOpts) *batchv1.Job { - job := &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", task.Spec.TaskId, opts.jobType), - Namespace: task.Namespace, - Labels: labels.Merge(ClusterCodeLabels, labels.Merge(opts.jobType.AsLabels(), task.Spec.TaskId.AsLabels())), - }, - Spec: batchv1.JobSpec{ - BackoffLimit: pointer.Int32Ptr(0), - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - SecurityContext: &corev1.PodSecurityContext{ - RunAsUser: pointer.Int64Ptr(1000), - RunAsGroup: pointer.Int64Ptr(0), - FSGroup: pointer.Int64Ptr(0), - }, - ServiceAccountName: task.Spec.ServiceAccountName, - RestartPolicy: corev1.RestartPolicyNever, - Containers: []corev1.Container{ - { - Name: "ffmpeg", - Image: DefaultClusterCodeContainerImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Args: opts.args, - }, - }, - }, - }, - }, - } - if opts.mountSource { - addPvcVolume(job, SourceSubMountPath, filepath.Join("/clustercode", SourceSubMountPath), task.Spec.Storage.SourcePvc) - } - if opts.mountIntermediate { - addPvcVolume(job, IntermediateSubMountPath, filepath.Join("/clustercode", IntermediateSubMountPath), task.Spec.Storage.IntermediatePvc) - } - if opts.mountTarget { - addPvcVolume(job, TargetSubMountPath, filepath.Join("/clustercode", TargetSubMountPath), task.Spec.Storage.TargetPvc) - } - if opts.mountConfig { - addConfigMapVolume(job, ConfigSubMountPath, filepath.Join("/clustercode", ConfigSubMountPath), task.Spec.FileListConfigMapRef) - } - return job -} - -func addPvcVolume(job *batchv1.Job, name, podMountRoot string, volume v1alpha1.ClusterCodeVolumeRef) { - job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, - corev1.VolumeMount{Name: name, MountPath: podMountRoot, SubPath: volume.SubPath}) - job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ - Name: name, - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: 
&corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: volume.ClaimName, - }, - }}) -} - -func addConfigMapVolume(job *batchv1.Job, name, podMountRoot, configMapName string) { - job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, - corev1.VolumeMount{ - Name: name, - MountPath: podMountRoot, - }) - job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ - Name: name, - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{Name: configMapName}, - }, - }, - }) -} diff --git a/pkg/operator/jobcontroller/controller.go b/pkg/operator/jobcontroller/controller.go new file mode 100644 index 00000000..f7a16ffa --- /dev/null +++ b/pkg/operator/jobcontroller/controller.go @@ -0,0 +1,120 @@ +package jobcontroller + +import ( + "context" + "fmt" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/internal/utils" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type ( + // JobProvisioner reconciles Job objects + JobProvisioner struct { + Client client.Client + Log logr.Logger + } + // JobContext holds the parameters of a single reconciliation + JobContext struct { + context.Context + resolver pipeline.DependencyResolver[*JobContext] + + job *batchv1.Job + jobType internaltypes.ClusterCodeJobType + task *v1alpha1.Task + log logr.Logger + sliceIndex int + } +) + +func (r *JobProvisioner) NewObject() *batchv1.Job { + return &batchv1.Job{} +} + +func (r *JobProvisioner) Provision(ctx context.Context, obj *batchv1.Job) (reconcile.Result, error) { + pctx := &JobContext{job: obj, Context: ctx, resolver: pipeline.NewDependencyRecorder[*JobContext]()} + + if !r.isJobComplete(obj) { + r.Log.V(1).Info("job is not completed yet, ignoring reconcile") + return reconcile.Result{}, nil + } + + p := pipeline.NewPipeline[*JobContext]().WithBeforeHooks(pipe.DebugLogger[*JobContext](pctx), pctx.resolver.Record) + p.WithSteps( + p.NewStep("determine job type", r.getJobType), + p.NewStep("fetch owning task", r.fetchTask), + p.When(r.isJobType(internaltypes.JobTypeSplit), "reconcile split job", r.ensureCountJob), + p.WithNestedSteps("reconcile slice job", r.isJobType(internaltypes.JobTypeSlice), + p.NewStep("determine slice index", r.determineSliceIndex), + p.NewStep("update status", r.updateStatus), + ), + p.When(r.isJobType(internaltypes.JobTypeMerge), "reconcile merge job", r.ensureCleanupJob), + ) + err := p.RunWithContext(pctx) + return reconcile.Result{}, err +} + +func (r *JobProvisioner) Deprovision(_ context.Context, _ *batchv1.Job) (reconcile.Result, error) { + r.Log.V(1).Info("job is being deleted, ignoring reconcile") + return reconcile.Result{}, nil +} + +func (r *JobProvisioner) isJobComplete(job *batchv1.Job) bool { + conditions := castConditions(job.Status.Conditions) + return meta.IsStatusConditionPresentAndEqual(conditions, string(batchv1.JobComplete), metav1.ConditionTrue) +} + +func (r *JobProvisioner) isJobType(jobType internaltypes.ClusterCodeJobType) func(ctx *JobContext) bool { + return func(ctx *JobContext) bool 
{ + return ctx.jobType == jobType + } +} + +func (r *JobProvisioner) getJobType(ctx *JobContext) error { + jobType, err := getJobType(ctx.job) + ctx.jobType = jobType + return err +} + +func (r *JobProvisioner) fetchTask(ctx *JobContext) error { + ctx.task = &v1alpha1.Task{} + err := r.Client.Get(ctx, utils.GetOwner(ctx.job), ctx.task) + return err +} + +func getJobType(job *batchv1.Job) (internaltypes.ClusterCodeJobType, error) { + set := labels.Set(job.Labels) + if !set.Has(internaltypes.ClustercodeTypeLabelKey) { + return "", fmt.Errorf("missing label key '%s'", internaltypes.ClustercodeTypeLabelKey) + } + label := set.Get(internaltypes.ClustercodeTypeLabelKey) + for _, jobType := range internaltypes.JobTypes { + if label == string(jobType) { + return jobType, nil + } + } + return "", fmt.Errorf("value of label '%s' unrecognized: %s", internaltypes.ClustercodeTypeLabelKey, label) +} + +func castConditions(conditions []batchv1.JobCondition) (converted []metav1.Condition) { + for _, c := range conditions { + converted = append(converted, metav1.Condition{ + Type: string(c.Type), + Status: metav1.ConditionStatus(c.Status), + LastTransitionTime: c.LastTransitionTime, + Reason: c.Reason, + Message: c.Message, + }) + } + return converted +} diff --git a/pkg/operator/jobcontroller/handler.go b/pkg/operator/jobcontroller/handler.go new file mode 100644 index 00000000..0e761eda --- /dev/null +++ b/pkg/operator/jobcontroller/handler.go @@ -0,0 +1,152 @@ +package jobcontroller + +import ( + "fmt" + "path/filepath" + "strconv" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/internal/utils" + "github.com/ccremer/clustercode/pkg/operator/blueprintcontroller" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func (r *JobProvisioner) ensureCountJob(ctx *JobContext) error { + ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask) + + taskId := ctx.task.Spec.TaskId + intermediateMountRoot := filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath) + job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%.*s-%s", 62-len(internaltypes.JobTypeCount), taskId, internaltypes.JobTypeCount), + Namespace: ctx.job.Namespace, + }, + } + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error { + job.Labels = labels.Merge(job.Labels, labels.Merge(internaltypes.ClusterCodeLabels, labels.Merge(internaltypes.JobTypeCount.AsLabels(), taskId.AsLabels()))) + job.Spec = batchv1.JobSpec{ + BackoffLimit: pointer.Int32Ptr(0), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + ServiceAccountName: ctx.job.Spec.Template.Spec.ServiceAccountName, + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "clustercode", + Image: blueprintcontroller.DefaultClusterCodeContainerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "--log-level=1", + "count", + "--task-name=" + ctx.task.Name, + "--namespace=" + ctx.job.Namespace, + }, + }, + }, + }, + }, + } + utils.EnsurePVCVolume(job, internaltypes.IntermediateSubMountPath, intermediateMountRoot, ctx.task.Spec.Storage.IntermediatePvc) + return controllerutil.SetControllerReference(ctx.task, job, r.Client.Scheme()) + }) + return err +} + +func (r *JobProvisioner) 
ensureCleanupJob(ctx *JobContext) error { + ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask) + + taskId := ctx.task.Spec.TaskId + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%.*s-%s", 62-len(internaltypes.JobTypeCleanup), taskId, internaltypes.JobTypeCleanup), + Namespace: ctx.job.Namespace, + }} + + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error { + job.Labels = labels.Merge(job.Labels, labels.Merge(internaltypes.ClusterCodeLabels, labels.Merge(internaltypes.JobTypeCleanup.AsLabels(), taskId.AsLabels()))) + job.Spec = batchv1.JobSpec{ + BackoffLimit: pointer.Int32Ptr(0), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: pointer.Int64Ptr(1000), + RunAsGroup: pointer.Int64Ptr(0), + FSGroup: pointer.Int64Ptr(0), + }, + ServiceAccountName: ctx.job.Spec.Template.Spec.ServiceAccountName, + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "clustercode", + Image: blueprintcontroller.DefaultClusterCodeContainerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "--log-level=1", + "--namespace=" + ctx.job.Namespace, + "cleanup", + "--task-name=" + ctx.task.Name, + }, + }, + }, + }, + }, + } + // each volume needs its own distinct mount point within the container + job.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{ + {Name: internaltypes.SourceSubMountPath, MountPath: filepath.Join("/clustercode", internaltypes.SourceSubMountPath), SubPath: ctx.task.Spec.Storage.SourcePvc.SubPath}, + {Name: internaltypes.IntermediateSubMountPath, MountPath: filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath), SubPath: ctx.task.Spec.Storage.IntermediatePvc.SubPath}, + } + job.Spec.Template.Spec.Volumes = []corev1.Volume{ + { + Name: internaltypes.SourceSubMountPath, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: ctx.task.Spec.Storage.SourcePvc.ClaimName}}}, + { + Name: internaltypes.IntermediateSubMountPath, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: ctx.task.Spec.Storage.IntermediatePvc.ClaimName}}}, + } + return controllerutil.SetOwnerReference(ctx.task, job, r.Client.Scheme()) + }) + return err +} + +func (r *JobProvisioner) determineSliceIndex(ctx *JobContext) error { + indexStr, found := ctx.job.Labels[internaltypes.ClustercodeSliceIndexLabelKey] + if !found { + return fmt.Errorf("cannot determine slice index, missing label '%s'", internaltypes.ClustercodeSliceIndexLabelKey) + } + index, err := strconv.Atoi(indexStr) + if err != nil { + return fmt.Errorf("cannot determine slice index from label '%s': %w", internaltypes.ClustercodeSliceIndexLabelKey, err) + } + ctx.sliceIndex = index + return err +} + +func (r *JobProvisioner) updateStatus(ctx *JobContext) error { + ctx.resolver.MustRequireDependencyByFuncName(r.fetchTask, r.determineSliceIndex) + + finishedList := ctx.task.Status.SlicesFinished + finishedList = append(finishedList, v1alpha1.ClustercodeSliceRef{ + SliceIndex: ctx.sliceIndex, + JobName: ctx.job.Name, + }) + ctx.task.Status.SlicesFinished = finishedList + ctx.task.Status.SlicesFinishedCount = len(finishedList) + + var scheduled []v1alpha1.ClustercodeSliceRef + for _, ref := range ctx.task.Status.SlicesScheduled { + if ref.SliceIndex != ctx.sliceIndex { + scheduled = append(scheduled, ref) + } + } + ctx.task.Status.SlicesScheduled = scheduled + return r.Client.Status().Update(ctx, ctx.task) +} diff --git a/pkg/operator/jobcontroller/setup.go b/pkg/operator/jobcontroller/setup.go new 
file mode 100644 index 00000000..7cec0135 --- /dev/null +++ b/pkg/operator/jobcontroller/setup.go @@ -0,0 +1,28 @@ +package jobcontroller + +import ( + "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/operator/reconciler" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete + +// SetupJobController adds a controller that reconciles managed resources. +func SetupJobController(mgr ctrl.Manager) error { + name := "job.clustercode.github.io" + pred, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: types.ClusterCodeLabels}) + if err != nil { + return err + } + controller := reconciler.NewReconciler[*batchv1.Job](mgr.GetClient(), &JobProvisioner{Client: mgr.GetClient(), Log: mgr.GetLogger()}) + return ctrl.NewControllerManagedBy(mgr). + Named(name). + For(&batchv1.Job{}, builder.WithPredicates(pred)). + Complete(controller) +} diff --git a/pkg/operator/reconciler/reconciler.go b/pkg/operator/reconciler/reconciler.go new file mode 100644 index 00000000..2993e934 --- /dev/null +++ b/pkg/operator/reconciler/reconciler.go @@ -0,0 +1,53 @@ +package reconciler + +import ( + "context" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// Reconciler is a generic controller. +type Reconciler[T client.Object] interface { + // NewObject returns a new instance of T. + // Users should just return an empty object without any fields set. + NewObject() T + // Provision is called when reconciling objects. + // This is only called when the object exists and was fetched successfully. + Provision(ctx context.Context, obj T) (reconcile.Result, error) + // Deprovision is called when the object has a deletion timestamp set. + Deprovision(ctx context.Context, obj T) (reconcile.Result, error) +} + +type controller[T client.Object] struct { + kube client.Client + reconciler Reconciler[T] +} + +// NewReconciler returns a new instance of Reconciler. +func NewReconciler[T client.Object](kube client.Client, reconciler Reconciler[T]) reconcile.Reconciler { + return &controller[T]{ + kube: kube, + reconciler: reconciler, + } +} + +// Reconcile implements reconcile.Reconciler +func (ctrl *controller[T]) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + obj := ctrl.reconciler.NewObject() + err := ctrl.kube.Get(ctx, request.NamespacedName, obj) + if err != nil && apierrors.IsNotFound(err) { + // doesn't exist anymore, ignore. 
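// The dispatch in Reconcile is the entire contract of the generic controller:
// a NotFound error ends the reconcile quietly, a set deletion timestamp routes
// to Deprovision, and everything else routes to Provision. A minimal sketch of
// a provisioner satisfying Reconciler[T] (MyProvisioner and the choice of
// *batchv1.Job are illustrative only, not part of this change):
//
//	type MyProvisioner struct{}
//
//	func (p *MyProvisioner) NewObject() *batchv1.Job { return &batchv1.Job{} }
//
//	func (p *MyProvisioner) Provision(ctx context.Context, job *batchv1.Job) (reconcile.Result, error) {
//		// inspect or mutate the fetched live object here
//		return reconcile.Result{}, nil
//	}
//
//	func (p *MyProvisioner) Deprovision(ctx context.Context, job *batchv1.Job) (reconcile.Result, error) {
//		// release external state before the object disappears
//		return reconcile.Result{}, nil
//	}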
+ return reconcile.Result{}, nil + } + if err != nil { + // some other error + return reconcile.Result{}, err + } + if !obj.GetDeletionTimestamp().IsZero() { + return ctrl.reconciler.Deprovision(ctx, obj) + } + return ctrl.reconciler.Provision(ctx, obj) +} diff --git a/pkg/operator/setup.go b/pkg/operator/setup.go index 4cd9deb5..1425d778 100644 --- a/pkg/operator/setup.go +++ b/pkg/operator/setup.go @@ -1,16 +1,20 @@ package operator import ( - "github.com/ccremer/clustercode/pkg/operator/controllers" + "github.com/ccremer/clustercode/pkg/operator/blueprintcontroller" + "github.com/ccremer/clustercode/pkg/operator/jobcontroller" + "github.com/ccremer/clustercode/pkg/operator/taskcontroller" ctrl "sigs.k8s.io/controller-runtime" ) +// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;create;update + // SetupControllers creates all controllers and adds them to the supplied manager. func SetupControllers(mgr ctrl.Manager) error { for _, setup := range []func(ctrl.Manager) error{ - controllers.SetupBlueprintController, - controllers.SetupJobController, - controllers.SetupTaskController, + blueprintcontroller.SetupBlueprintController, + jobcontroller.SetupJobController, + taskcontroller.SetupTaskController, } { if err := setup(mgr); err != nil { return err diff --git a/pkg/operator/taskcontroller/setup.go b/pkg/operator/taskcontroller/setup.go new file mode 100644 index 00000000..ece7a543 --- /dev/null +++ b/pkg/operator/taskcontroller/setup.go @@ -0,0 +1,35 @@ +package taskcontroller + +import ( + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/operator/reconciler" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// +kubebuilder:rbac:groups=clustercode.github.io,resources=tasks,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=clustercode.github.io,resources=tasks/status;tasks/finalizers,verbs=get;update;patch + +// SetupTaskController adds a controller that reconciles managed resources. +func SetupTaskController(mgr ctrl.Manager) error { + name := "task.clustercode.github.io" + + controller := reconciler.NewReconciler[*v1alpha1.Task](mgr.GetClient(), &TaskReconciler{ + Client: mgr.GetClient(), + Log: mgr.GetLogger(), + }) + + pred, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{MatchLabels: types.ClusterCodeLabels}) + if err != nil { + return err + } + return ctrl.NewControllerManagedBy(mgr). + Named(name). + For(&v1alpha1.Task{}, builder.WithPredicates(pred)). + //Owns(&batchv1.Job{}). + //WithEventFilter(predicate.GenerationChangedPredicate{}). 
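// Complete receives the generic wrapper from pkg/operator/reconciler rather
// than the TaskReconciler itself, so controller-runtime only ever sees a plain
// reconcile.Reconciler; the label-selector predicate above restricts
// reconciles to Tasks carrying the clustercode labels.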
+ Complete(controller) +} diff --git a/pkg/operator/taskcontroller/task_controller.go b/pkg/operator/taskcontroller/task_controller.go new file mode 100644 index 00000000..2bf3ae67 --- /dev/null +++ b/pkg/operator/taskcontroller/task_controller.go @@ -0,0 +1,216 @@ +package taskcontroller + +import ( + "context" + "fmt" + "path/filepath" + "strconv" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/internal/utils" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + batchv1 "k8s.io/api/batch/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type ( + // TaskReconciler reconciles Task objects + TaskReconciler struct { + Client client.Client + Log logr.Logger + } + // TaskContext holds the parameters of a single reconciliation + TaskContext struct { + context.Context + task *v1alpha1.Task + blueprint *v1alpha1.Blueprint + log logr.Logger + job *batchv1.Job + nextSliceIndex int + } + TaskOpts struct { + args []string + jobType internaltypes.ClusterCodeJobType + mountSource bool + mountIntermediate bool + mountTarget bool + mountConfig bool + } +) + +func (r *TaskReconciler) NewObject() *v1alpha1.Task { + return &v1alpha1.Task{} +} + +func (r *TaskReconciler) Provision(ctx context.Context, obj *v1alpha1.Task) (reconcile.Result, error) { + tc := &TaskContext{ + Context: ctx, + task: obj, + log: r.Log, + nextSliceIndex: -1, + } + + p := pipeline.NewPipeline[*TaskContext]().WithBeforeHooks(pipe.DebugLogger[*TaskContext](tc)) + p.WithSteps( + p.When(r.hasNoSlicesPlanned, "create split job", r.createSplitJob), + p.When(r.allSlicesFinished, "create merge job", r.createMergeJob), + p.WithNestedSteps("schedule next slice job", r.partialSlicesFinished, + p.NewStep("determine next slice index", r.determineNextSliceIndex), + p.When(r.nextSliceDetermined, "create slice job", r.createSliceJob), + p.When(r.nextSliceDetermined, "update status", r.updateStatus), + ), + ) + err := p.RunWithContext(tc) + return reconcile.Result{}, err +} + +func (r *TaskReconciler) Deprovision(_ context.Context, _ *v1alpha1.Task) (reconcile.Result, error) { + return reconcile.Result{}, nil +} + +func (r *TaskReconciler) hasNoSlicesPlanned(ctx *TaskContext) bool { + return ctx.task.Spec.SlicesPlannedCount == 0 +} + +func (r *TaskReconciler) allSlicesFinished(ctx *TaskContext) bool { + // guard against SlicesPlannedCount == 0, where no merge is due yet + return ctx.task.Spec.SlicesPlannedCount > 0 && len(ctx.task.Status.SlicesFinished) >= ctx.task.Spec.SlicesPlannedCount +} + +func (r *TaskReconciler) partialSlicesFinished(ctx *TaskContext) bool { + return len(ctx.task.Status.SlicesFinished) < ctx.task.Spec.SlicesPlannedCount +} + +func (r *TaskReconciler) nextSliceDetermined(ctx *TaskContext) bool { + return ctx.nextSliceIndex >= 0 +} + +func (r *TaskReconciler) determineNextSliceIndex(ctx *TaskContext) error { + // Todo: Check condition whether more jobs are needed + status := ctx.task.Status + if ctx.task.Spec.ConcurrencyStrategy.ConcurrentCountStrategy != nil { + maxCount := ctx.task.Spec.ConcurrencyStrategy.ConcurrentCountStrategy.MaxCount + if len(status.SlicesScheduled) >= maxCount { + ctx.log.V(1).Info("reached concurrent max count, cannot schedule more", "max", maxCount) + ctx.nextSliceIndex = -1 + return nil + } + } + total := len(status.SlicesScheduled) + len(status.SlicesFinished) + 
toSkipIndexes := make(map[int]bool, total) + for i := 0; i < len(status.SlicesScheduled); i++ { + toSkipIndexes[status.SlicesScheduled[i].SliceIndex] = true + } + for i := 0; i < len(status.SlicesFinished); i++ { + toSkipIndexes[status.SlicesFinished[i].SliceIndex] = true + } + + // search the full planned range, not just the already-known indexes + for i := 0; i < ctx.task.Spec.SlicesPlannedCount; i++ { + if _, exists := toSkipIndexes[i]; exists { + continue + } + ctx.nextSliceIndex = i + return nil + } + ctx.nextSliceIndex = -1 + return nil +} + +func (r *TaskReconciler) createSplitJob(ctx *TaskContext) error { + sourceMountRoot := filepath.Join("/clustercode", internaltypes.SourceSubMountPath) + intermediateMountRoot := filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath) + variables := map[string]string{ + "${INPUT}": filepath.Join(sourceMountRoot, ctx.task.Spec.SourceUrl.GetPath()), + "${OUTPUT}": getSegmentFileNameTemplatePath(ctx, intermediateMountRoot), + "${SLICE_SIZE}": strconv.Itoa(ctx.task.Spec.EncodeSpec.SliceSize), + } + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", ctx.task.Spec.TaskId, internaltypes.JobTypeSplit), + Namespace: ctx.task.Namespace, + }} + + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error { + createFfmpegJobDefinition(job, ctx.task, &TaskOpts{ + args: utils.MergeArgsAndReplaceVariables(variables, ctx.task.Spec.EncodeSpec.DefaultCommandArgs, ctx.task.Spec.EncodeSpec.SplitCommandArgs), + jobType: internaltypes.JobTypeSplit, + mountSource: true, + mountIntermediate: true, + }) + return controllerutil.SetControllerReference(ctx.task, job, r.Client.Scheme()) + }) + return err +} + +func (r *TaskReconciler) createSliceJob(ctx *TaskContext) error { + intermediateMountRoot := filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath) + index := ctx.nextSliceIndex + variables := map[string]string{ + "${INPUT}": getSourceSegmentFileNameIndexPath(ctx, intermediateMountRoot, index), + "${OUTPUT}": getTargetSegmentFileNameIndexPath(ctx, intermediateMountRoot, index), + } + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%d", ctx.task.Spec.TaskId, internaltypes.JobTypeSlice, index), + Namespace: ctx.task.Namespace, + }} + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error { + createFfmpegJobDefinition(job, ctx.task, &TaskOpts{ + args: utils.MergeArgsAndReplaceVariables(variables, ctx.task.Spec.EncodeSpec.DefaultCommandArgs, ctx.task.Spec.EncodeSpec.TranscodeCommandArgs), + jobType: internaltypes.JobTypeSlice, + mountIntermediate: true, + }) + job.Labels[internaltypes.ClustercodeSliceIndexLabelKey] = strconv.Itoa(index) + + return controllerutil.SetControllerReference(ctx.task, job, r.Client.Scheme()) + }) + // remember the job so that updateStatus can record its name + ctx.job = job + return err +} + +func (r *TaskReconciler) updateStatus(ctx *TaskContext) error { + ctx.task.Status.SlicesScheduled = append(ctx.task.Status.SlicesScheduled, v1alpha1.ClustercodeSliceRef{ + JobName: ctx.job.Name, + SliceIndex: ctx.nextSliceIndex, + }) + return r.Client.Status().Update(ctx, ctx.task) +} + +func (r *TaskReconciler) createMergeJob(ctx *TaskContext) error { + configMountRoot := filepath.Join("/clustercode", internaltypes.ConfigSubMountPath) + targetMountRoot := filepath.Join("/clustercode", internaltypes.TargetSubMountPath) + variables := map[string]string{ + "${INPUT}": filepath.Join(configMountRoot, v1alpha1.ConfigMapFileName), + "${OUTPUT}": filepath.Join(targetMountRoot, ctx.task.Spec.TargetUrl.GetPath()), + } + job := &batchv1.Job{ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", ctx.task.Spec.TaskId, 
internaltypes.JobTypeMerge), + Namespace: ctx.task.Namespace, + }} + + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, job, func() error { + createFfmpegJobDefinition(job, ctx.task, &TaskOpts{ + args: utils.MergeArgsAndReplaceVariables(variables, ctx.task.Spec.EncodeSpec.DefaultCommandArgs, ctx.task.Spec.EncodeSpec.MergeCommandArgs), + jobType: internaltypes.JobTypeMerge, + mountIntermediate: true, + mountTarget: true, + mountConfig: true, + }) + return controllerutil.SetControllerReference(ctx.task, job, r.Client.Scheme()) + }) + return err +} + +func getSegmentFileNameTemplatePath(ctx *TaskContext, intermediateMountRoot string) string { + return filepath.Join(intermediateMountRoot, ctx.task.Name+"_%d"+filepath.Ext(ctx.task.Spec.SourceUrl.GetPath())) +} + +func getSourceSegmentFileNameIndexPath(ctx *TaskContext, intermediateMountRoot string, index int) string { + return filepath.Join(intermediateMountRoot, fmt.Sprintf("%s_%d%s", ctx.task.Name, index, filepath.Ext(ctx.task.Spec.SourceUrl.GetPath()))) +} + +func getTargetSegmentFileNameIndexPath(ctx *TaskContext, intermediateMountRoot string, index int) string { + return filepath.Join(intermediateMountRoot, fmt.Sprintf("%s_%d%s%s", ctx.task.Name, index, v1alpha1.MediaFileDoneSuffix, filepath.Ext(ctx.task.Spec.TargetUrl.GetPath()))) +} diff --git a/pkg/operator/taskcontroller/utils.go b/pkg/operator/taskcontroller/utils.go new file mode 100644 index 00000000..e73f5149 --- /dev/null +++ b/pkg/operator/taskcontroller/utils.go @@ -0,0 +1,73 @@ +package taskcontroller + +import ( + "path/filepath" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + "github.com/ccremer/clustercode/pkg/internal/utils" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/pointer" +) + +var DefaultFfmpegContainerImage string + +func createFfmpegJobDefinition(job *batchv1.Job, task *v1alpha1.Task, opts *TaskOpts) *batchv1.Job { + + job.Labels = labels.Merge(job.Labels, labels.Merge(internaltypes.ClusterCodeLabels, labels.Merge(opts.jobType.AsLabels(), task.Spec.TaskId.AsLabels()))) + job.Spec.BackoffLimit = pointer.Int32Ptr(0) + job.Spec.Template.Spec.SecurityContext = &corev1.PodSecurityContext{ + RunAsUser: pointer.Int64Ptr(1000), + RunAsGroup: pointer.Int64Ptr(0), + FSGroup: pointer.Int64Ptr(0), + } + job.Spec.Template.Spec.ServiceAccountName = task.Spec.ServiceAccountName + job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever + job.Spec.Template.Spec.Containers = []corev1.Container{{ + Name: "ffmpeg", + Image: DefaultFfmpegContainerImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: opts.args, + }} + if opts.mountSource { + utils.EnsurePVCVolume(job, internaltypes.SourceSubMountPath, filepath.Join("/clustercode", internaltypes.SourceSubMountPath), task.Spec.Storage.SourcePvc) + } + if opts.mountIntermediate { + utils.EnsurePVCVolume(job, internaltypes.IntermediateSubMountPath, filepath.Join("/clustercode", internaltypes.IntermediateSubMountPath), task.Spec.Storage.IntermediatePvc) + } + if opts.mountTarget { + utils.EnsurePVCVolume(job, internaltypes.TargetSubMountPath, filepath.Join("/clustercode", internaltypes.TargetSubMountPath), task.Spec.Storage.TargetPvc) + } + if opts.mountConfig { + addConfigMapVolume(job, internaltypes.ConfigSubMountPath, filepath.Join("/clustercode", internaltypes.ConfigSubMountPath), task.Spec.FileListConfigMapRef) + } + return job +} + +func addConfigMapVolume(job *batchv1.Job, 
name, podMountRoot, configMapName string) { + found := false + for _, container := range job.Spec.Template.Spec.Containers { + if utils.HasVolumeMount(name, container) { + found = true + break + } + } + if found { + return + } + job.Spec.Template.Spec.Containers[0].VolumeMounts = append(job.Spec.Template.Spec.Containers[0].VolumeMounts, + corev1.VolumeMount{ + Name: name, + MountPath: podMountRoot, + }) + job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes, corev1.Volume{ + Name: name, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: configMapName}, + }, + }, + }) +} diff --git a/pkg/scancmd/run.go b/pkg/scancmd/run.go new file mode 100644 index 00000000..bd0f8d9d --- /dev/null +++ b/pkg/scancmd/run.go @@ -0,0 +1,232 @@ +package scancmd + +import ( + "context" + "errors" + "os" + "path/filepath" + "strings" + + "github.com/ccremer/clustercode/pkg/api/v1alpha1" + "github.com/ccremer/clustercode/pkg/internal/pipe" + internaltypes "github.com/ccremer/clustercode/pkg/internal/types" + pipeline "github.com/ccremer/go-command-pipeline" + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +type Command struct { + Log logr.Logger + + SourceRootDir string + Namespace string + BlueprintName string +} + +type commandContext struct { + context.Context + dependencyResolver pipeline.DependencyResolver[*commandContext] + + kube client.Client + blueprint *v1alpha1.Blueprint + segmentFiles []string + currentTasks []v1alpha1.Task + selectedRelPath string +} + +var noMatchFoundErr = errors.New("no matching source file found") + +// Execute runs the command and returns an error, if any. 
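// A minimal invocation sketch; the field values below are placeholders, and
// in this repository the urfave/cli flags in scan_command.go do the wiring
// instead:
//
//	cmd := &Command{
//		Log:           logger,
//		SourceRootDir: "/clustercode",
//		Namespace:     "clustercode-system",
//		BlueprintName: "my-blueprint",
//	}
//	err := cmd.Execute(ctx)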
+func (c *Command) Execute(ctx context.Context) error { + + pctx := &commandContext{ + dependencyResolver: pipeline.NewDependencyRecorder[*commandContext](), + Context: ctx, + } + + p := pipeline.NewPipeline[*commandContext]().WithBeforeHooks(pipe.DebugLogger(pctx), pctx.dependencyResolver.Record) + p.WithSteps( + p.NewStep("create client", c.createClient), + p.NewStep("fetch blueprint", c.fetchBlueprint), + p.WithNestedSteps("schedule new task", c.hasFreeTaskSlots, + p.NewStep("fetch current tasks", c.fetchCurrentTasks), + p.NewStep("select new file", c.selectNewFile), + p.NewStep("create task", c.createTask), + ).WithErrorHandler(c.abortIfNoMatchFound), + ) + + return p.RunWithContext(pctx) +} + +func (c *Command) createClient(ctx *commandContext) error { + kube, err := pipe.NewKubeClient(ctx) + ctx.kube = kube + return err +} + +func (c *Command) fetchBlueprint(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient) + log := c.getLogger() + + blueprint := &v1alpha1.Blueprint{} + if err := ctx.kube.Get(ctx, types.NamespacedName{Namespace: c.Namespace, Name: c.BlueprintName}, blueprint); err != nil { + return err + } + ctx.blueprint = blueprint + log.Info("fetched blueprint") + return nil +} + +func (c *Command) hasFreeTaskSlots(ctx *commandContext) bool { + return !ctx.blueprint.IsMaxParallelTaskLimitReached() +} + +func (c *Command) fetchCurrentTasks(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.createClient, c.fetchBlueprint) + log := c.getLogger() + + taskList := v1alpha1.TaskList{} + err := ctx.kube.List(ctx, &taskList, + client.MatchingLabels(internaltypes.ClusterCodeLabels), + client.InNamespace(ctx.blueprint.Namespace), + ) + if err != nil { + return err + } + filteredTasks := make([]v1alpha1.Task, 0) + for _, task := range taskList.Items { + if !task.GetDeletionTimestamp().IsZero() { + // ignore tasks about to be deleted + continue + } + for _, owner := range task.GetOwnerReferences() { + if owner.Name == ctx.blueprint.Name { + filteredTasks = append(filteredTasks, task) + } + } + } + log.Info("fetched current tasks", "count", len(filteredTasks)) + ctx.currentTasks = filteredTasks + return nil +} + +func (c *Command) selectNewFile(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.fetchBlueprint, c.fetchCurrentTasks) + log := c.getLogger() + + alreadyQueuedFiles := make([]string, len(ctx.currentTasks)) + for i, task := range ctx.currentTasks { + alreadyQueuedFiles[i] = c.getAbsolutePath(task.Spec.SourceUrl) + } + + var foundFileErr = errors.New("found") + + root := filepath.Join(c.SourceRootDir, internaltypes.SourceSubMountPath) + err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { + if err != nil { + // could not access file, let's prevent a panic + return err + } + if info.IsDir() { + return nil + } + if !containsExtension(filepath.Ext(path), ctx.blueprint.Spec.ScanSpec.MediaFileExtensions) { + log.V(1).Info("file extension not accepted", "path", path) + return nil + } + for _, queuedFile := range alreadyQueuedFiles { + if queuedFile == path { + log.V(1).Info("skipping already queued file", "path", path) + return nil + } + } + ctx.selectedRelPath = path + return foundFileErr // abort early if found a match + }) + if errors.Is(err, foundFileErr) { + return nil + } + if err != nil { + return err + } + // we didn't find anything, let the pipeline know. 
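// Two sentinel errors cooperate in this walk: foundFileErr aborts
// filepath.Walk early once a candidate is selected (returning a non-nil error
// is Walk's only early-exit mechanism) and is swallowed just above, while
// noMatchFoundErr travels up to the pipeline's error handler,
// abortIfNoMatchFound, which downgrades it to a clean no-op exit.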
+ return noMatchFoundErr +} + +func (c *Command) getAbsolutePath(uri v1alpha1.ClusterCodeUrl) string { + return filepath.Join(c.SourceRootDir, uri.GetRoot(), uri.GetPath()) +} + +func (c *Command) createTask(ctx *commandContext) error { + ctx.dependencyResolver.MustRequireDependencyByFuncName(c.selectNewFile) + log := c.getLogger() + + bp := ctx.blueprint + selectedFile, err := filepath.Rel(filepath.Join(c.SourceRootDir, internaltypes.SourceSubMountPath), ctx.selectedRelPath) + if err != nil { + return err + } + + taskId := string(uuid.NewUUID()) + task := &v1alpha1.Task{ObjectMeta: metav1.ObjectMeta{ + Namespace: c.Namespace, + Name: taskId, + }} + // create the Task idempotently and attach it to the Blueprint via owner reference + op, err := controllerutil.CreateOrUpdate(ctx, ctx.kube, task, func() error { + task.Labels = labels.Merge(task.Labels, internaltypes.ClusterCodeLabels) + task.Spec = v1alpha1.TaskSpec{ + TaskId: v1alpha1.ClustercodeTaskId(taskId), + SourceUrl: v1alpha1.ToUrl(internaltypes.SourceSubMountPath, selectedFile), + TargetUrl: v1alpha1.ToUrl(internaltypes.TargetSubMountPath, selectedFile), + EncodeSpec: bp.Spec.EncodeSpec, + Storage: bp.Spec.Storage, + ServiceAccountName: bp.GetServiceAccountName(), + FileListConfigMapRef: taskId + "-slice-list", + ConcurrencyStrategy: bp.Spec.TaskConcurrencyStrategy, + } + return controllerutil.SetOwnerReference(ctx.blueprint, task, ctx.kube.Scheme()) + }) + if op == controllerutil.OperationResultCreated || op == controllerutil.OperationResultUpdated { + log.Info("Reconciled task", "name", task.Name, "operation", op) + } + return err +} + +func (c *Command) abortIfNoMatchFound(ctx *commandContext, err error) error { + log := c.getLogger() + + if errors.Is(err, noMatchFoundErr) { + log.Info("no media files found") + return nil + } + return err +} + +func (c *Command) getLogger() logr.Logger { + return c.Log.WithValues("blueprint", c.BlueprintName, "namespace", c.Namespace) +} + +// containsExtension returns true if the given extension is in the given acceptableFileExtensions. For each entry in the list, +// the leading "." prefix is optional. The leading "." is mandatory for `extension`; the function returns false if `extension` is empty. +func containsExtension(extension string, acceptableFileExtensions []string) bool { + if extension == "" { + return false + } + for _, ext := range acceptableFileExtensions { + if strings.HasPrefix(ext, ".") { + if extension == ext { + return true + } + continue + } + if extension == "."+ext { + return true + } + } + return false +} diff --git a/scan_command.go b/scan_command.go index 230e3624..93487442 100644 --- a/scan_command.go +++ b/scan_command.go @@ -1,215 +1,26 @@ package main import ( - "context" - "fmt" - "os" - "path/filepath" - "strings" - - "github.com/ccremer/clustercode/api/v1alpha1" - "github.com/ccremer/clustercode/pkg/operator/controllers" + "github.com/ccremer/clustercode/pkg/scancmd" "github.com/urfave/cli/v2" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/rest" - "k8s.io/utils/pointer" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - controllerclient "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + controllerruntime "sigs.k8s.io/controller-runtime" ) -type scanCommand struct { - kubeconfig *rest.Config - kube client.Client - - BlueprintName string - BlueprintNamespace string - SourceRoot string - RoleKind string -} - -const ( - ClusterRole = "ClusterRole" - Role = "Role" -) - -var scanCommandName = "scan" -var scanLog = ctrl.Log.WithName("scan") - func newScanCommand() *cli.Command { - command := &scanCommand{} + command := &scancmd.Command{} return &cli.Command{ - Name: scanCommandName, - Usage: "Scan source storage for new files", - Action: command.execute, + Name: "scan", + Usage: "Scan source storage for new files and queue a task", + Before: LogMetadata, + Action: func(ctx *cli.Context) error { + command.Log = AppLogger(ctx).WithName(ctx.Command.Name) + controllerruntime.SetLogger(command.Log) + return command.Execute(controllerruntime.LoggerInto(ctx.Context, command.Log)) + }, Flags: []cli.Flag{ newBlueprintNameFlag(&command.BlueprintName), - newNamespaceFlag(&command.BlueprintNamespace), - newSourceRootDirFlag(&command.SourceRoot), + newNamespaceFlag(&command.Namespace), + newSourceRootDirFlag(&command.SourceRootDir), }, } } - -func (c *scanCommand) execute(ctx *cli.Context) error { - registerScheme() - err := createClientFn(&commandContext{}) - if err != nil { - return err - } - bp, err := c.getBlueprint(ctx.Context) - if err != nil { - return err - } - scanLog.Info("found bp", "bp", bp) - - if bp.IsMaxParallelTaskLimitReached() { - scanLog.Info("max parallel task count is reached, ignoring scan") - return nil - } - - tasks, err := c.getCurrentTasks(ctx.Context, bp) - if err != nil { - return err - } - scanLog.Info("get list of current tasks", "tasks", tasks) - existingFiles := c.mapAndFilterTasks(tasks, bp) - files, err := c.scanSourceForMedia(bp, existingFiles) - if err != nil { - return err - } - - if len(files) <= 0 { - scanLog.Info("no media files found") - return nil - } - - selectedFile, err := filepath.Rel(filepath.Join(c.SourceRoot, controllers.SourceSubMountPath), files[0]) - - taskId := string(uuid.NewUUID()) - task := &v1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: c.BlueprintNamespace, - Name: taskId, - Labels: controllers.ClusterCodeLabels, - }, - Spec: v1alpha1.TaskSpec{ - TaskId: v1alpha1.ClustercodeTaskId(taskId), - SourceUrl:
v1alpha1.ToUrl(controllers.SourceSubMountPath, selectedFile), - TargetUrl: v1alpha1.ToUrl(controllers.TargetSubMountPath, selectedFile), - EncodeSpec: bp.Spec.EncodeSpec, - Storage: bp.Spec.Storage, - ServiceAccountName: bp.GetServiceAccountName(), - FileListConfigMapRef: taskId + "-slice-list", - ConcurrencyStrategy: bp.Spec.TaskConcurrencyStrategy, - }, - } - if err := controllerutil.SetControllerReference(bp, task.GetObjectMeta(), scheme); err != nil { - scanLog.Error(err, "could not set controller reference. Deleting the bp might not delete this task") - } - if err := c.kube.Create(ctx.Context, task); err != nil { - return fmt.Errorf("could not create task: %w", err) - } else { - scanLog.Info("created task", "task", task.Name, "source", task.Spec.SourceUrl) - } - return nil -} - -func (c *scanCommand) mapAndFilterTasks(tasks []v1alpha1.Task, bp *v1alpha1.Blueprint) []string { - - var sourceFiles []string - for _, task := range tasks { - if task.GetDeletionTimestamp() != nil { - continue - } - sourceFiles = append(sourceFiles, c.getAbsolutePath(task.Spec.SourceUrl)) - } - - return sourceFiles -} - -func (c *scanCommand) getAbsolutePath(uri v1alpha1.ClusterCodeUrl) string { - return filepath.Join(c.SourceRoot, uri.GetRoot(), uri.GetPath()) -} - -func (c *scanCommand) getCurrentTasks(ctx context.Context, bp *v1alpha1.Blueprint) ([]v1alpha1.Task, error) { - list := v1alpha1.TaskList{} - err := c.kube.List(ctx, &list, - controllerclient.MatchingLabels(controllers.ClusterCodeLabels), - controllerclient.InNamespace(bp.Namespace)) - if err != nil { - return list.Items, err - } - var tasks []v1alpha1.Task - for _, task := range list.Items { - for _, owner := range task.GetOwnerReferences() { - if pointer.BoolPtrDerefOr(owner.Controller, false) && owner.Name == bp.Name { - tasks = append(tasks, task) - } - } - } - return list.Items, err -} - -func (c *scanCommand) scanSourceForMedia(bp *v1alpha1.Blueprint, skipFiles []string) (files []string, funcErr error) { - root := filepath.Join(c.SourceRoot, controllers.SourceSubMountPath) - err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { - if err != nil { - // could not access file, let's prevent a panic - return err - } - if info.IsDir() { - return nil - } - if !containsExtension(filepath.Ext(path), bp.Spec.ScanSpec.MediaFileExtensions) { - scanLog.V(1).Info("file extension not accepted", "path", path) - return nil - } - for _, skipFile := range skipFiles { - if skipFile == path { - scanLog.V(1).Info("skipping already queued file", "path", path) - return nil - } - } - - files = append(files, path) - return nil - }) - - return files, err -} - -func (c *scanCommand) getBlueprint(ctx context.Context) (*v1alpha1.Blueprint, error) { - bp := &v1alpha1.Blueprint{} - name := types.NamespacedName{ - Name: c.BlueprintName, - Namespace: c.BlueprintNamespace, - } - err := c.kube.Get(ctx, name, bp) - if err != nil { - return &v1alpha1.Blueprint{}, err - } - return bp, nil -} - -// containsExtension returns true if the given extension is in the given acceptableFileExtensions. For each entry in the list, -// the leading "." prefix is optional. The leading "." 
is mandatory for `extension` and it returns false if extension is empty -func containsExtension(extension string, acceptableFileExtensions []string) bool { - if extension == "" { - return false - } - for _, ext := range acceptableFileExtensions { - if strings.HasPrefix(ext, ".") { - if extension == ext { - return true - } - continue - } - if extension == "."+ext { - return true - } - } - return false -} diff --git a/test/e2e/kuttl-test.yaml b/test/e2e/kuttl-test.yaml index 8e2b805f..67ddf433 100644 --- a/test/e2e/kuttl-test.yaml +++ b/test/e2e/kuttl-test.yaml @@ -2,5 +2,5 @@ apiVersion: kuttl.dev/v1beta1 kind: TestSuite testDirs: - ./test/e2e -timeout: 60 +timeout: 120 parallel: 1 diff --git a/test/e2e/operator/00-assert.yaml b/test/e2e/operator/00-assert.yaml deleted file mode 100644 index 9c6afd1c..00000000 --- a/test/e2e/operator/00-assert.yaml +++ /dev/null @@ -1,4 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert ---- diff --git a/test/e2e/operator/00-delete.yaml b/test/e2e/operator/00-delete.yaml new file mode 100644 index 00000000..e9c393d9 --- /dev/null +++ b/test/e2e/operator/00-delete.yaml @@ -0,0 +1,22 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +delete: +# This will wait until resources are really gone + - apiVersion: v1 + kind: PersistentVolumeClaim + name: source + - apiVersion: v1 + kind: PersistentVolumeClaim + name: intermediate + - apiVersion: v1 + kind: PersistentVolumeClaim + name: target + - apiVersion: v1 + kind: PersistentVolume + name: e2e-source + - apiVersion: v1 + kind: PersistentVolume + name: e2e-intermediate + - apiVersion: v1 + kind: PersistentVolume + name: e2e-target diff --git a/test/e2e/operator/00-install.yaml b/test/e2e/operator/00-install.yaml deleted file mode 100644 index 8dd44659..00000000 --- a/test/e2e/operator/00-install.yaml +++ /dev/null @@ -1,48 +0,0 @@ ---- -apiVersion: v1 -kind: PersistentVolume -metadata: - name: e2e-source - labels: - pv.kubernetes.io/type: source -spec: - capacity: - storage: 1Gi - accessModes: - - ReadWriteMany - storageClassName: hostpath - hostPath: - path: /pv/data - type: Directory ---- -apiVersion: v1 -kind: PersistentVolume -metadata: - name: e2e-intermediate - labels: - pv.kubernetes.io/type: intermediate -spec: - capacity: - storage: 1Gi - accessModes: - - ReadWriteMany - storageClassName: hostpath - hostPath: - path: /pv/data - type: Directory ---- -apiVersion: v1 -kind: PersistentVolume -metadata: - name: e2e-target - labels: - pv.kubernetes.io/type: target -spec: - capacity: - storage: 1Gi - accessModes: - - ReadWriteMany - storageClassName: hostpath - hostPath: - path: /pv/data - type: Directory diff --git a/test/e2e/operator/01-assert.yaml b/test/e2e/operator/01-assert.yaml new file mode 100644 index 00000000..4d6a194c --- /dev/null +++ b/test/e2e/operator/01-assert.yaml @@ -0,0 +1,24 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + name: e2e-source +status: + phase: Available +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + name: e2e-intermediate +status: + phase: Available +--- +apiVersion: v1 +kind: PersistentVolume +metadata: + name: e2e-target +status: + phase: Available diff --git a/test/e2e/operator/01-install.yaml b/test/e2e/operator/01-install.yaml index e2cdf8b5..6f6f69a2 100644 --- a/test/e2e/operator/01-install.yaml +++ b/test/e2e/operator/01-install.yaml @@ -1,49 +1,45 @@ --- apiVersion: v1 -kind: PersistentVolumeClaim +kind: PersistentVolume metadata: - name: source + name: e2e-source + labels: + 
pv.kubernetes.io/type: source spec: + capacity: + storage: 1Gi accessModes: - ReadWriteMany - volumeMode: Filesystem - resources: - requests: - storage: 1Gi - storageClassName: hostpath - selector: - matchLabels: - pv.kubernetes.io/type: source + hostPath: + path: /pv/data + type: Directory --- apiVersion: v1 -kind: PersistentVolumeClaim +kind: PersistentVolume metadata: - name: intermediate + name: e2e-intermediate + labels: + pv.kubernetes.io/type: intermediate spec: + capacity: + storage: 1Gi accessModes: - ReadWriteMany - volumeMode: Filesystem - resources: - requests: - storage: 1Gi - storageClassName: hostpath - selector: - matchLabels: - pv.kubernetes.io/type: intermediate - + hostPath: + path: /pv/data + type: Directory --- apiVersion: v1 -kind: PersistentVolumeClaim +kind: PersistentVolume metadata: - name: target + name: e2e-target + labels: + pv.kubernetes.io/type: target spec: + capacity: + storage: 1Gi accessModes: - ReadWriteMany - volumeMode: Filesystem - resources: - requests: - storage: 1Gi - storageClassName: hostpath - selector: - matchLabels: - pv.kubernetes.io/type: target + hostPath: + path: /pv/data + type: Directory diff --git a/test/e2e/operator/02-assert.yaml b/test/e2e/operator/02-assert.yaml new file mode 100644 index 00000000..f05d45ed --- /dev/null +++ b/test/e2e/operator/02-assert.yaml @@ -0,0 +1,21 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: source +status: + phase: Bound +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: intermediate +status: + phase: Bound +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: target +status: + phase: Bound diff --git a/test/e2e/operator/02-install.yaml b/test/e2e/operator/02-install.yaml index 14490dec..c9088198 100644 --- a/test/e2e/operator/02-install.yaml +++ b/test/e2e/operator/02-install.yaml @@ -1,60 +1,48 @@ - --- -apiVersion: clustercode.github.io/v1alpha1 -kind: Blueprint +apiVersion: v1 +kind: PersistentVolumeClaim metadata: - name: test-blueprint + name: source spec: - scanSchedule: "*/1 * * * *" - storage: - sourcePvc: - claimName: source - subPath: source - intermediatePvc: - claimName: intermediate - subPath: intermediate - targetPvc: - claimName: target - subPath: target - scanSpec: - mediaFileExtensions: - - mp4 - taskConcurrencyStrategy: - concurrentCountStrategy: - maxCount: 1 - encodeSpec: - sliceSize: 1 - defaultCommandArgs: - - -y - - -hide_banner - - -nostats - splitCommandArgs: - - -i - - ${INPUT} - - -c - - copy - - -map - - "0" - - -segment_time - - ${SLICE_SIZE} - - -f - - segment - - ${OUTPUT} - transcodeCommandArgs: - - -i - - ${INPUT} - - -c:v - - copy - - -c:a - - copy - - ${OUTPUT} - mergeCommandArgs: - - -f - - concat - - -safe - - "0" - - -i - - ${INPUT} - - -c - - copy - - ${OUTPUT} + accessModes: + - ReadWriteMany + storageClassName: "" + volumeMode: Filesystem + resources: + requests: + storage: 1Gi + selector: + matchLabels: + pv.kubernetes.io/type: source +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: intermediate +spec: + accessModes: + - ReadWriteMany + storageClassName: "" + volumeMode: Filesystem + resources: + requests: + storage: 1Gi + selector: + matchLabels: + pv.kubernetes.io/type: intermediate +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: target +spec: + accessModes: + - ReadWriteMany + storageClassName: "" + volumeMode: Filesystem + resources: + requests: + storage: 1Gi + selector: + matchLabels: + pv.kubernetes.io/type: target diff --git 
a/test/e2e/operator/03-assert.yaml b/test/e2e/operator/03-assert.yaml new file mode 100644 index 00000000..83f3c280 --- /dev/null +++ b/test/e2e/operator/03-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +--- +apiVersion: clustercode.github.io/v1alpha1 +kind: Task +metadata: + labels: + app.kubernetes.io/managed-by: clustercode +spec: + slicesPlannedCount: 3 diff --git a/test/e2e/operator/03-install.yaml b/test/e2e/operator/03-install.yaml new file mode 100644 index 00000000..d1a0e995 --- /dev/null +++ b/test/e2e/operator/03-install.yaml @@ -0,0 +1,59 @@ +--- +apiVersion: clustercode.github.io/v1alpha1 +kind: Blueprint +metadata: + name: test-blueprint +spec: + scanSchedule: "*/1 * * * *" + storage: + sourcePvc: + claimName: source + subPath: source + intermediatePvc: + claimName: intermediate + subPath: intermediate + targetPvc: + claimName: target + subPath: target + scanSpec: + mediaFileExtensions: + - mp4 + taskConcurrencyStrategy: + concurrentCountStrategy: + maxCount: 1 + encodeSpec: + sliceSize: 1 + defaultCommandArgs: + - -y + - -hide_banner + - -nostats + splitCommandArgs: + - -i + - ${INPUT} + - -c + - copy + - -map + - "0" + - -segment_time + - ${SLICE_SIZE} + - -f + - segment + - ${OUTPUT} + transcodeCommandArgs: + - -i + - ${INPUT} + - -c:v + - copy + - -c:a + - copy + - ${OUTPUT} + mergeCommandArgs: + - -f + - concat + - -safe + - "0" + - -i + - ${INPUT} + - -c + - copy + - ${OUTPUT} diff --git a/test/e2e/operator/04-delete.yaml b/test/e2e/operator/04-delete.yaml new file mode 100644 index 00000000..e9c393d9 --- /dev/null +++ b/test/e2e/operator/04-delete.yaml @@ -0,0 +1,22 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +delete: +# This will wait until resources are really gone + - apiVersion: v1 + kind: PersistentVolumeClaim + name: source + - apiVersion: v1 + kind: PersistentVolumeClaim + name: intermediate + - apiVersion: v1 + kind: PersistentVolumeClaim + name: target + - apiVersion: v1 + kind: PersistentVolume + name: e2e-source + - apiVersion: v1 + kind: PersistentVolume + name: e2e-intermediate + - apiVersion: v1 + kind: PersistentVolume + name: e2e-target