diff --git a/.github/release-please-config.json b/.github/release-please-config.json index a4be8e8..032c3ee 100644 --- a/.github/release-please-config.json +++ b/.github/release-please-config.json @@ -4,6 +4,7 @@ ".": { "release-type": "go", "package-name": "bobrapet", + "include-component-in-tag": false, "changelog-sections": [ { "type": "feat", "section": "Features", "hidden": false }, { "type": "fix", "section": "Bug Fixes", "hidden": false }, diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 6639a0f..8930c00 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -58,6 +58,7 @@ jobs: type=semver,pattern={{version}},value=${{ steps.release.outputs.tag_name }} type=semver,pattern={{major}}.{{minor}},value=${{ steps.release.outputs.tag_name }} type=semver,pattern={{major}},value=${{ steps.release.outputs.tag_name }} + type=raw,value=${{ steps.release.outputs.tag_name }} type=raw,value=latest - name: Build and push Docker image @@ -95,6 +96,21 @@ jobs: run: | gh release upload ${{ steps.release.outputs.tag_name }} dist/install.yaml + - name: Warm Go module caches (proxy/sum/pkg.go.dev) + if: ${{ steps.release.outputs.release_created }} + run: | + set -euo pipefail + MOD=github.com/${{ github.repository }} + VER=${{ steps.release.outputs.tag_name }} + echo "Warming proxy.golang.org for $MOD@$VER" + curl -sSfL "https://proxy.golang.org/${MOD}/@v/${VER}.info" || true + echo "Warming sum.golang.org for $MOD@$VER" + curl -sSfL "https://sum.golang.org/lookup/${MOD}@${VER}" || true + echo "Triggering pkg.go.dev indexing for $MOD@$VER" + curl -sSfL "https://pkg.go.dev/${MOD}@${VER}" > /dev/null || true + echo "Triggering Go Report Card for $MOD" + curl -X POST -F "repo=${MOD}" https://goreportcard.com/checks || true + - name: Create release summary if: ${{ steps.release.outputs.release_created }} run: | diff --git a/.golangci.yml b/.golangci.yml index e5b21b0..6ef9fd0 100644 --- a/.golangci.yml 
+++ b/.golangci.yml @@ -22,6 +22,8 @@ linters: - unparam - unused settings: + gocyclo: + min-complexity: 15 revive: rules: - name: comment-spacings diff --git a/cmd/main.go b/cmd/main.go index 91a372b..a5c1b63 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -100,71 +100,27 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - // if the enable-http2 flag is false (the default), http/2 should be disabled - // due to its vulnerabilities. More specifically, disabling http/2 will - // prevent from being vulnerable to the HTTP/2 Stream Cancellation and - // Rapid Reset CVEs. For more information see: - // - https://github.com/advisories/GHSA-qppj-fm5r-hxr3 - // - https://github.com/advisories/GHSA-4374-p667-p6c8 - disableHTTP2 := func(c *tls.Config) { - setupLog.Info("disabling http/2") - c.NextProtos = []string{"http/1.1"} - } - - if !enableHTTP2 { - tlsOpts = append(tlsOpts, disableHTTP2) - } - - // Initial webhook TLS options - webhookTLSOpts := tlsOpts - webhookServerOptions := webhook.Options{ - TLSOpts: webhookTLSOpts, - } - - if len(webhookCertPath) > 0 { - setupLog.Info("Initializing webhook certificate watcher using provided certificates", - "webhook-cert-path", webhookCertPath, "webhook-cert-name", webhookCertName, "webhook-cert-key", webhookCertKey) - - webhookServerOptions.CertDir = webhookCertPath - webhookServerOptions.CertName = webhookCertName - webhookServerOptions.KeyName = webhookCertKey - } - - webhookServer := webhook.NewServer(webhookServerOptions) - - // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 
- // More info: - // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/metrics/server - // - https://book.kubebuilder.io/reference/metrics.html - metricsServerOptions := metricsserver.Options{ - BindAddress: metricsAddr, - SecureServing: secureMetrics, - TLSOpts: tlsOpts, - } - - if secureMetrics { - // FilterProvider is used to protect the metrics endpoint with authn/authz. - // These configurations ensure that only authorized users and service accounts - // can access the metrics endpoint. The RBAC are configured in 'config/rbac/kustomization.yaml'. More info: - // https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.22.1/pkg/metrics/filters#WithAuthenticationAndAuthorization - metricsServerOptions.FilterProvider = filters.WithAuthenticationAndAuthorization - } - - // If the certificate is not specified, controller-runtime will automatically - // generate self-signed certificates for the metrics server. While convenient for development and testing, - // this setup is not recommended for production. - // - // - [METRICS-WITH-CERTS] at config/default/kustomization.yaml to generate and use certificates - // managed by cert-manager for the metrics server. - // - [PROMETHEUS-WITH-CERTS] at config/prometheus/kustomization.yaml for TLS certification. 
- if len(metricsCertPath) > 0 { - setupLog.Info("Initializing metrics certificate watcher using provided certificates", - "metrics-cert-path", metricsCertPath, "metrics-cert-name", metricsCertName, "metrics-cert-key", metricsCertKey) + // Configure HTTP/2 and TLS options + configureHTTP2(enableHTTP2, &tlsOpts) + + // Build servers' options + webhookServer := webhook.NewServer( + buildWebhookServerOptions( + webhookCertPath, + webhookCertName, + webhookCertKey, + tlsOpts, + ), + ) - metricsServerOptions.CertDir = metricsCertPath - metricsServerOptions.CertName = metricsCertName - metricsServerOptions.KeyName = metricsCertKey - } + metricsServerOptions := buildMetricsServerOptions( + metricsAddr, + secureMetrics, + metricsCertPath, + metricsCertName, + metricsCertKey, + tlsOpts, + ) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, @@ -173,16 +129,6 @@ func main() { HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "d3a8b358.bubustack.io", - // LeaderElectionReleaseOnCancel defines if the leader should step down voluntarily - // when the Manager ends. This requires the binary to immediately end when the - // Manager is stopped, otherwise, this setting is unsafe. Setting this significantly - // speeds up voluntary leader transitions as the new leader don't have to wait - // LeaseDuration time first. - // - // In the default scaffold provided, the program ends immediately after - // the manager stops, so would be fine to enable this option. However, - // if you are doing or is intended to do any operation such as perform cleanups - // after the manager stops then its usage might be unsafe. 
// LeaderElectionReleaseOnCancel: true, }) if err != nil { @@ -194,6 +140,81 @@ func main() { managerCtx := ctrl.SetupSignalHandler() setup.SetupIndexers(managerCtx, mgr) + operatorConfigManager, controllerConfig, configResolver, celEvaluator := mustInitOperatorServices(mgr) + + deps := config.ControllerDependencies{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ConfigResolver: configResolver, + CELEvaluator: *celEvaluator, + } + + mustSetupControllers(mgr, deps, controllerConfig) + + setupWebhooksIfEnabled(mgr, operatorConfigManager) + // +kubebuilder:scaffold:builder + + mustAddHealthChecks(mgr) + + setupLog.Info("starting manager") + defer celEvaluator.Close() + if err := mgr.Start(managerCtx); err != nil { + setupLog.Error(err, "problem running manager") + os.Exit(1) + } +} + +func configureHTTP2(enableHTTP2 bool, tlsOpts *[]func(*tls.Config)) { + // if the enable-http2 flag is false (the default), http/2 should be disabled + // due to its vulnerabilities. + disableHTTP2 := func(c *tls.Config) { + setupLog.Info("disabling http/2") + c.NextProtos = []string{"http/1.1"} + } + if !enableHTTP2 { + *tlsOpts = append(*tlsOpts, disableHTTP2) + } +} + +func buildWebhookServerOptions(certPath, certName, certKey string, tlsOpts []func(*tls.Config)) webhook.Options { + opts := webhook.Options{TLSOpts: tlsOpts} + if len(certPath) > 0 { + setupLog.Info("Initializing webhook certificate watcher using provided certificates", + "webhook-cert-path", certPath, "webhook-cert-name", certName, "webhook-cert-key", certKey) + opts.CertDir = certPath + opts.CertName = certName + opts.KeyName = certKey + } + return opts +} + +func buildMetricsServerOptions( + metricsAddr string, + secure bool, + certPath, certName, certKey string, + tlsOpts []func(*tls.Config), +) metricsserver.Options { + opts := metricsserver.Options{ + BindAddress: metricsAddr, + SecureServing: secure, + TLSOpts: tlsOpts, + } + if secure { + opts.FilterProvider = filters.WithAuthenticationAndAuthorization 
+ } + if len(certPath) > 0 { + setupLog.Info("Initializing metrics certificate watcher using provided certificates", + "metrics-cert-path", certPath, "metrics-cert-name", certName, "metrics-cert-key", certKey) + opts.CertDir = certPath + opts.CertName = certName + opts.KeyName = certKey + } + return opts +} + +func mustInitOperatorServices( + mgr ctrl.Manager, +) (*config.OperatorConfigManager, *config.ControllerConfig, *config.Resolver, *cel.Evaluator) { operatorConfigManager := config.NewOperatorConfigManager( mgr.GetClient(), "bobrapet-system", @@ -204,7 +225,6 @@ func main() { setupLog.Error(err, "unable to add operator config manager to manager") os.Exit(1) } - controllerConfig := operatorConfigManager.GetControllerConfig() setupLog.Info("Controller configuration loaded") configResolver := config.NewResolver(mgr.GetClient(), operatorConfigManager) @@ -215,14 +235,14 @@ func main() { setupLog.Error(err, "unable to create CEL evaluator") os.Exit(1) } + return operatorConfigManager, controllerConfig, configResolver, celEvaluator +} - deps := config.ControllerDependencies{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - ConfigResolver: configResolver, - CELEvaluator: *celEvaluator, - } - +func mustSetupControllers( + mgr ctrl.Manager, + deps config.ControllerDependencies, + controllerConfig *config.ControllerConfig, +) { if err := (&controller.StoryReconciler{ ControllerDependencies: deps, }).SetupWithManager(mgr, controllerConfig.BuildStoryControllerOptions()); err != nil { @@ -265,38 +285,44 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "ImpulseTemplate") os.Exit(1) } +} - if os.Getenv("ENABLE_WEBHOOKS") != "false" { - setupLog.Info("setting up webhooks") - if err := (&webhookv1alpha1.StoryWebhook{}).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "Story") - os.Exit(1) - } - if err := (&webhookv1alpha1.EngramWebhook{ - Config: 
operatorConfigManager.GetControllerConfig(), - }).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "Engram") - os.Exit(1) - } - if err := (&webhookv1alpha1.ImpulseWebhook{ - Config: operatorConfigManager.GetControllerConfig(), - }).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "Impulse") - os.Exit(1) - } - if err := (&webhookrunsv1alpha1.StoryRunWebhook{ - Config: operatorConfigManager.GetControllerConfig(), - }).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "StoryRun") - os.Exit(1) - } - if err := (&webhookrunsv1alpha1.StepRunWebhook{}).SetupWebhookWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create webhook", "webhook", "StepRun") - os.Exit(1) - } +func setupWebhooksIfEnabled(mgr ctrl.Manager, operatorConfigManager *config.OperatorConfigManager) { + if os.Getenv("ENABLE_WEBHOOKS") == "false" { + return } - // +kubebuilder:scaffold:builder + setupLog.Info("setting up webhooks") + if err := (&webhookv1alpha1.StoryWebhook{ + Config: operatorConfigManager.GetControllerConfig(), + }).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Story") + os.Exit(1) + } + if err := (&webhookv1alpha1.EngramWebhook{ + Config: operatorConfigManager.GetControllerConfig(), + }).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Engram") + os.Exit(1) + } + if err := (&webhookv1alpha1.ImpulseWebhook{ + Config: operatorConfigManager.GetControllerConfig(), + }).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "Impulse") + os.Exit(1) + } + if err := (&webhookrunsv1alpha1.StoryRunWebhook{ + Config: operatorConfigManager.GetControllerConfig(), + }).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", 
"webhook", "StoryRun") + os.Exit(1) + } + if err := (&webhookrunsv1alpha1.StepRunWebhook{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "StepRun") + os.Exit(1) + } +} +func mustAddHealthChecks(mgr ctrl.Manager) { if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) @@ -305,12 +331,4 @@ func main() { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } - - setupLog.Info("starting manager") - // Ensure CEL evaluator background routines stop on manager shutdown - defer celEvaluator.Close() - if err := mgr.Start(managerCtx); err != nil { - setupLog.Error(err, "problem running manager") - os.Exit(1) - } } diff --git a/internal/config/operator.go b/internal/config/operator.go index 800ea32..c3e5137 100644 --- a/internal/config/operator.go +++ b/internal/config/operator.go @@ -182,39 +182,65 @@ func (ocm *OperatorConfigManager) parseConfigMap(cm *corev1.ConfigMap, config *O parseStoryRunConfig(cm, config) } -// --- helpers below keep parseConfigMap readable and reduce cyclomatic complexity --- - func parseControllerTimings(cm *corev1.ConfigMap, config *OperatorConfig) { + setMaxConcurrentReconciles(cm, config) + setRequeueBaseDelay(cm, config) + setRequeueMaxDelay(cm, config) + setHealthCheckInterval(cm, config) + setCleanupInterval(cm, config) + setReconcileTimeout(cm, config) + setDefaultEngramGRPCPort(cm, config) +} + +func setMaxConcurrentReconciles(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.max-concurrent-reconciles"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.MaxConcurrentReconciles = parsed } } +} + +func setRequeueBaseDelay(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.requeue-base-delay"]; exists { if parsed, err := time.ParseDuration(val); err == nil { 
config.Controller.RequeueBaseDelay = parsed } } +} + +func setRequeueMaxDelay(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.requeue-max-delay"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.RequeueMaxDelay = parsed } } +} + +func setHealthCheckInterval(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.health-check-interval"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.HealthCheckInterval = parsed } } +} + +func setCleanupInterval(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.cleanup-interval"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.CleanupInterval = metav1.Duration{Duration: parsed} } } +} + +func setReconcileTimeout(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.reconcile-timeout"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ReconcileTimeout = parsed } } +} + +func setDefaultEngramGRPCPort(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["controller.default-engram-grpc-port"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.DefaultEngramGRPCPort = parsed @@ -257,36 +283,64 @@ func parseResourceLimits(cm *corev1.ConfigMap, config *OperatorConfig) { } func parseRetryAndTimeouts(cm *corev1.ConfigMap, config *OperatorConfig) { + setMaxRetries(cm, config) + setExponentialBackoffBase(cm, config) + setExponentialBackoffMax(cm, config) + setDefaultStepTimeout(cm, config) + setApprovalDefaultTimeout(cm, config) + setExternalDataTimeout(cm, config) + setConditionalTimeout(cm, config) +} + +func setMaxRetries(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["retry.max-retries"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed >= 0 { 
config.Controller.MaxRetries = parsed } } +} + +func setExponentialBackoffBase(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["retry.exponential-backoff-base"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ExponentialBackoffBase = parsed } } +} + +func setExponentialBackoffMax(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["retry.exponential-backoff-max"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ExponentialBackoffMax = parsed } } +} + +func setDefaultStepTimeout(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["timeout.default-step"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.DefaultStepTimeout = parsed } } +} + +func setApprovalDefaultTimeout(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["timeout.approval-default"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ApprovalDefaultTimeout = parsed } } +} + +func setExternalDataTimeout(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["timeout.external-data-default"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ExternalDataTimeout = parsed } } +} + +func setConditionalTimeout(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["timeout.conditional-default"]; exists { if parsed, err := time.ParseDuration(val); err == nil { config.Controller.ConditionalTimeout = parsed @@ -295,26 +349,46 @@ func parseRetryAndTimeouts(cm *corev1.ConfigMap, config *OperatorConfig) { } func parseLoopConfig(cm *corev1.ConfigMap, config *OperatorConfig) { + setMaxLoopIterations(cm, config) + setDefaultLoopBatchSize(cm, config) + setMaxLoopBatchSize(cm, config) + setMaxLoopConcurrency(cm, config) + setMaxConcurrencyLimit(cm, config) +} + +func setMaxLoopIterations(cm *corev1.ConfigMap, config 
*OperatorConfig) { if val, exists := cm.Data["loop.max-iterations"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.MaxLoopIterations = parsed } } +} + +func setDefaultLoopBatchSize(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["loop.default-batch-size"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.DefaultLoopBatchSize = parsed } } +} + +func setMaxLoopBatchSize(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["loop.max-batch-size"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.MaxLoopBatchSize = parsed } } +} + +func setMaxLoopConcurrency(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["loop.max-concurrency"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.MaxLoopConcurrency = parsed } } +} + +func setMaxConcurrencyLimit(cm *corev1.ConfigMap, config *OperatorConfig) { if val, exists := cm.Data["loop.max-concurrency-limit"]; exists { if parsed, err := strconv.Atoi(val); err == nil && parsed > 0 { config.Controller.MaxConcurrencyLimit = parsed diff --git a/internal/config/resolver.go b/internal/config/resolver.go index 470c1e7..4984a30 100644 --- a/internal/config/resolver.go +++ b/internal/config/resolver.go @@ -334,37 +334,56 @@ func (cr *Resolver) ApplyExecutionOverrides(overrides *v1alpha1.ExecutionOverrid return } + applyBasicOverrides(overrides, config) + applySecurityOverrides(overrides, config) + applyImagePolicyOverride(overrides, config) + applyTimeoutRetryOverrides(overrides, config) + applyProbeOverrides(overrides, config) +} + +func applyBasicOverrides(overrides *v1alpha1.ExecutionOverrides, config *ResolvedExecutionConfig) { if overrides.ServiceAccountName != nil { config.ServiceAccountName = *overrides.ServiceAccountName } if overrides.AutomountServiceAccountToken != nil { 
config.AutomountServiceAccountToken = *overrides.AutomountServiceAccountToken } - if overrides.Security != nil { - sec := overrides.Security - if sec.RunAsNonRoot != nil { - config.RunAsNonRoot = *sec.RunAsNonRoot - } - if sec.ReadOnlyRootFilesystem != nil { - config.ReadOnlyRootFilesystem = *sec.ReadOnlyRootFilesystem - } - if sec.AllowPrivilegeEscalation != nil { - config.AllowPrivilegeEscalation = *sec.AllowPrivilegeEscalation - } - if sec.RunAsUser != nil { - config.RunAsUser = *sec.RunAsUser - } +} + +func applySecurityOverrides(overrides *v1alpha1.ExecutionOverrides, config *ResolvedExecutionConfig) { + if overrides.Security == nil { + return } - if overrides.ImagePullPolicy != nil { - switch *overrides.ImagePullPolicy { - case string(corev1.PullAlways): - config.ImagePullPolicy = corev1.PullAlways - case string(corev1.PullNever): - config.ImagePullPolicy = corev1.PullNever - case string(corev1.PullIfNotPresent): - config.ImagePullPolicy = corev1.PullIfNotPresent - } + sec := overrides.Security + if sec.RunAsNonRoot != nil { + config.RunAsNonRoot = *sec.RunAsNonRoot + } + if sec.ReadOnlyRootFilesystem != nil { + config.ReadOnlyRootFilesystem = *sec.ReadOnlyRootFilesystem + } + if sec.AllowPrivilegeEscalation != nil { + config.AllowPrivilegeEscalation = *sec.AllowPrivilegeEscalation } + if sec.RunAsUser != nil { + config.RunAsUser = *sec.RunAsUser + } +} + +func applyImagePolicyOverride(overrides *v1alpha1.ExecutionOverrides, config *ResolvedExecutionConfig) { + if overrides.ImagePullPolicy == nil { + return + } + switch *overrides.ImagePullPolicy { + case string(corev1.PullAlways): + config.ImagePullPolicy = corev1.PullAlways + case string(corev1.PullNever): + config.ImagePullPolicy = corev1.PullNever + case string(corev1.PullIfNotPresent): + config.ImagePullPolicy = corev1.PullIfNotPresent + } +} + +func applyTimeoutRetryOverrides(overrides *v1alpha1.ExecutionOverrides, config *ResolvedExecutionConfig) { if overrides.Timeout != nil { if d, err := 
time.ParseDuration(*overrides.Timeout); err == nil { config.DefaultStepTimeout = d @@ -373,17 +392,20 @@ func (cr *Resolver) ApplyExecutionOverrides(overrides *v1alpha1.ExecutionOverrid if overrides.Retry != nil && overrides.Retry.MaxRetries != nil { config.MaxRetries = int(*overrides.Retry.MaxRetries) } - // Handle probe disabling at instance level - if overrides.Probes != nil { - if overrides.Probes.DisableLiveness { - config.LivenessProbe = nil - } - if overrides.Probes.DisableReadiness { - config.ReadinessProbe = nil - } - if overrides.Probes.DisableStartup { - config.StartupProbe = nil - } +} + +func applyProbeOverrides(overrides *v1alpha1.ExecutionOverrides, config *ResolvedExecutionConfig) { + if overrides.Probes == nil { + return + } + if overrides.Probes.DisableLiveness { + config.LivenessProbe = nil + } + if overrides.Probes.DisableReadiness { + config.ReadinessProbe = nil + } + if overrides.Probes.DisableStartup { + config.StartupProbe = nil } } diff --git a/internal/controller/catalog/engramtemplate_controller.go b/internal/controller/catalog/engramtemplate_controller.go index 2ac18c4..169f174 100644 --- a/internal/controller/catalog/engramtemplate_controller.go +++ b/internal/controller/catalog/engramtemplate_controller.go @@ -66,19 +66,10 @@ func (r *EngramTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reque templateLogger := rl.WithValues("template", template.Name, "version", template.Spec.Version) rl.ReconcileStart("Processing EngramTemplate") - // Validate required fields - if template.Spec.Image == "" { - r.updateErrorStatus(&template, "image is required") - rl.ReconcileError(fmt.Errorf("image missing"), "Image is required for EngramTemplate") - if err := r.updateStatus(ctx, &template); err != nil { - return ctrl.Result{RequeueAfter: 5 * time.Second}, err - } - return ctrl.Result{}, nil - } - - if template.Spec.Version == "" { - r.updateErrorStatus(&template, "version is required") - rl.ReconcileError(fmt.Errorf("version missing"), 
"Version is required for EngramTemplate") + // Validate required fields (extracted helper to reduce cyclomatic complexity) + if missingField, handled := r.handleRequiredFields(ctx, &template); handled { + r.updateErrorStatus(&template, fmt.Sprintf("%s is required", missingField)) + rl.ReconcileError(fmt.Errorf("%s missing", missingField), fmt.Sprintf("%s is required for EngramTemplate", missingField)) if err := r.updateStatus(ctx, &template); err != nil { return ctrl.Result{RequeueAfter: 5 * time.Second}, err } @@ -87,53 +78,22 @@ func (r *EngramTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reque templateLogger.Info("Validating EngramTemplate", "image", template.Spec.Image, "version", template.Spec.Version) - // Validate JSON schemas if provided - if template.Spec.InputSchema != nil { - if err := r.validateJSONSchema(template.Spec.InputSchema.Raw); err != nil { - r.updateErrorStatus(&template, fmt.Sprintf("invalid input schema: %v", err)) - rl.ReconcileError(err, "Invalid input schema") - if updateErr := r.updateStatus(ctx, &template); updateErr != nil { - templateLogger.Error(updateErr, "failed to update status after schema validation error") - } - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + // Validate JSON schemas if provided (extracted helper) + if which, schemaErr := r.validateAllSchemas(ctx, &template); which != "" { + r.updateErrorStatus(&template, fmt.Sprintf("invalid %s schema: %v", which, schemaErr)) + rl.ReconcileError(schemaErr, fmt.Sprintf("Invalid %s schema", which)) + if updateErr := r.updateStatus(ctx, &template); updateErr != nil { + templateLogger.Error(updateErr, "failed to update status after schema validation error") } - templateLogger.V(1).Info("Input schema validated") - } - - if template.Spec.OutputSchema != nil { - if err := r.validateJSONSchema(template.Spec.OutputSchema.Raw); err != nil { - r.updateErrorStatus(&template, fmt.Sprintf("invalid output schema: %v", err)) - rl.ReconcileError(err, "Invalid output 
schema") - if updateErr := r.updateStatus(ctx, &template); updateErr != nil { - templateLogger.Error(updateErr, "failed to update status after schema validation error") - } - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - templateLogger.V(1).Info("Output schema validated") - } - - if template.Spec.ConfigSchema != nil { - if err := r.validateJSONSchema(template.Spec.ConfigSchema.Raw); err != nil { - r.updateErrorStatus(&template, fmt.Sprintf("invalid config schema: %v", err)) - rl.ReconcileError(err, "Invalid config schema") - if updateErr := r.updateStatus(ctx, &template); updateErr != nil { - templateLogger.Error(updateErr, "failed to update status after schema validation error") - } - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - templateLogger.V(1).Info("Config schema validated") + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } // Update status to ready r.updateReadyStatus(&template) - // List all Engrams that were created from this template - var engrams bubushv1alpha1.EngramList - if err := r.List(ctx, &engrams, client.MatchingFields{"spec.templateRef.name": req.Name}); err != nil { + // List all Engrams that were created from this template (extracted helper) + if err := r.setUsageCount(ctx, &template, req.Name); err != nil { templateLogger.Error(err, "Failed to list engrams for template") - // We don't fail the reconcile, just log the error. Status will be updated on next reconcile. - } else { - template.Status.UsageCount = int32(len(engrams.Items)) } if err := r.updateStatus(ctx, &template); err != nil { @@ -150,6 +110,47 @@ func (r *EngramTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, nil } +// handleRequiredFields returns which required field is missing if any, and whether it handled a condition. 
+func (r *EngramTemplateReconciler) handleRequiredFields(_ context.Context, template *catalogv1alpha1.EngramTemplate) (missingField string, handled bool) { + if template.Spec.Image == "" { + return "image", true + } + if template.Spec.Version == "" { + return "version", true + } + return "", false +} + +// validateAllSchemas validates input/output/config schemas and returns which failed and the error. +func (r *EngramTemplateReconciler) validateAllSchemas(_ context.Context, template *catalogv1alpha1.EngramTemplate) (which string, err error) { + if template.Spec.InputSchema != nil { + if err := r.validateJSONSchema(template.Spec.InputSchema.Raw); err != nil { + return "input", err + } + } + if template.Spec.OutputSchema != nil { + if err := r.validateJSONSchema(template.Spec.OutputSchema.Raw); err != nil { + return "output", err + } + } + if template.Spec.ConfigSchema != nil { + if err := r.validateJSONSchema(template.Spec.ConfigSchema.Raw); err != nil { + return "config", err + } + } + return "", nil +} + +// setUsageCount lists Engrams using this template and updates UsageCount on the in-memory object. 
+func (r *EngramTemplateReconciler) setUsageCount(ctx context.Context, template *catalogv1alpha1.EngramTemplate, templateName string) error { + var engrams bubushv1alpha1.EngramList + if err := r.List(ctx, &engrams, client.MatchingFields{"spec.templateRef.name": templateName}); err != nil { + return err + } + template.Status.UsageCount = int32(len(engrams.Items)) + return nil +} + // validateJSONSchema performs basic JSON schema validation func (r *EngramTemplateReconciler) validateJSONSchema(schemaBytes []byte) error { if len(schemaBytes) == 0 { diff --git a/internal/controller/catalog/impulsetemplate_controller.go b/internal/controller/catalog/impulsetemplate_controller.go index b5ef9a1..9012fde 100644 --- a/internal/controller/catalog/impulsetemplate_controller.go +++ b/internal/controller/catalog/impulsetemplate_controller.go @@ -72,64 +72,37 @@ func (r *ImpulseTemplateReconciler) Reconcile(ctx context.Context, req ctrl.Requ templateLogger := rl.WithValues("template", template.Name, "version", template.Spec.Version) rl.ReconcileStart("Processing ImpulseTemplate") - // Validate required fields - if template.Spec.Image == "" { - r.updateErrorStatus(&template, "image is required") - rl.ReconcileError(fmt.Errorf("image missing"), "Image is required for ImpulseTemplate") + // Validate required fields (extracted helper) + if missing, handled := r.handleRequiredFields(&template); handled { + r.updateErrorStatus(&template, fmt.Sprintf("%s is required", missing)) + rl.ReconcileError(fmt.Errorf("%s missing", missing), fmt.Sprintf("%s is required for ImpulseTemplate", missing)) if err := r.updateStatusWithRetry(ctx, &template); err != nil { return ctrl.Result{RequeueAfter: 5 * time.Second}, err } return ctrl.Result{}, nil } - if template.Spec.Version == "" { - r.updateErrorStatus(&template, "version is required") - rl.ReconcileError(fmt.Errorf("version missing"), "Version is required for ImpulseTemplate") + templateLogger.Info("Validating ImpulseTemplate", "image", 
template.Spec.Image, "version", template.Spec.Version) + + // Validate supported modes (extracted helper) + if invalidMode, handled := r.validateSupportedModes(&template); handled { + r.updateErrorStatus(&template, fmt.Sprintf("invalid supported mode '%s' for impulse template (must be deployment or statefulset)", invalidMode)) + rl.ReconcileError(fmt.Errorf("invalid supported mode: %s", invalidMode), "Invalid supported mode for ImpulseTemplate") if err := r.updateStatusWithRetry(ctx, &template); err != nil { return ctrl.Result{RequeueAfter: 5 * time.Second}, err } return ctrl.Result{}, nil } - - templateLogger.Info("Validating ImpulseTemplate", "image", template.Spec.Image, "version", template.Spec.Version) - - // Validate supported modes are appropriate for impulses (deployment or statefulset only) - validImpulseModes := []enums.WorkloadMode{"deployment", "statefulset"} - for _, mode := range template.Spec.SupportedModes { - if !slices.Contains(validImpulseModes, mode) { - r.updateErrorStatus(&template, fmt.Sprintf("invalid supported mode '%s' for impulse template (must be deployment or statefulset)", mode)) - rl.ReconcileError(fmt.Errorf("invalid supported mode: %s", mode), "Invalid supported mode for ImpulseTemplate") - if err := r.updateStatusWithRetry(ctx, &template); err != nil { - return ctrl.Result{RequeueAfter: 5 * time.Second}, err - } - return ctrl.Result{}, nil - } - } templateLogger.V(1).Info("Supported modes validated", "modes", template.Spec.SupportedModes) - // Validate JSON schemas if provided - if template.Spec.ContextSchema != nil { - if err := r.validateJSONSchema(template.Spec.ContextSchema.Raw); err != nil { - r.updateErrorStatus(&template, fmt.Sprintf("invalid context schema: %v", err)) - rl.ReconcileError(err, "Invalid context schema") - if updateErr := r.updateStatusWithRetry(ctx, &template); updateErr != nil { - templateLogger.Error(updateErr, "failed to update status after schema validation error") - } - return ctrl.Result{RequeueAfter: 
10 * time.Second}, nil + // Validate JSON schemas if provided (extracted helper) + if which, serr := r.validateAllSchemas(&template); which != "" { + r.updateErrorStatus(&template, fmt.Sprintf("invalid %s schema: %v", which, serr)) + rl.ReconcileError(serr, fmt.Sprintf("Invalid %s schema", which)) + if updateErr := r.updateStatusWithRetry(ctx, &template); updateErr != nil { + templateLogger.Error(updateErr, "failed to update status after schema validation error") } - templateLogger.V(1).Info("Context schema validated") - } - - if template.Spec.ConfigSchema != nil { - if err := r.validateJSONSchema(template.Spec.ConfigSchema.Raw); err != nil { - r.updateErrorStatus(&template, fmt.Sprintf("invalid config schema: %v", err)) - rl.ReconcileError(err, "Invalid config schema") - if updateErr := r.updateStatusWithRetry(ctx, &template); updateErr != nil { - templateLogger.Error(updateErr, "failed to update status after schema validation error") - } - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil - } - templateLogger.V(1).Info("Config schema validated") + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } // Update status to ready @@ -235,3 +208,40 @@ func (r *ImpulseTemplateReconciler) setupControllerWithOptions(mgr ctrl.Manager, Named("catalog-impulsetemplate"). Complete(r) } + +// handleRequiredFields returns which required field is missing if any, and whether it's handled. +func (r *ImpulseTemplateReconciler) handleRequiredFields(t *catalogv1alpha1.ImpulseTemplate) (missing string, handled bool) { + if t.Spec.Image == "" { + return "image", true + } + if t.Spec.Version == "" { + return "version", true + } + return "", false +} + +// validateSupportedModes ensures only deployment/statefulset are allowed for impulses. 
+func (r *ImpulseTemplateReconciler) validateSupportedModes(t *catalogv1alpha1.ImpulseTemplate) (invalid enums.WorkloadMode, handled bool) { + valid := []enums.WorkloadMode{"deployment", "statefulset"} + for _, mode := range t.Spec.SupportedModes { + if !slices.Contains(valid, mode) { + return mode, true + } + } + return "", false +} + +// validateAllSchemas validates context/config schemas and returns which failed and error. +func (r *ImpulseTemplateReconciler) validateAllSchemas(t *catalogv1alpha1.ImpulseTemplate) (which string, err error) { + if t.Spec.ContextSchema != nil { + if err := r.validateJSONSchema(t.Spec.ContextSchema.Raw); err != nil { + return "context", err + } + } + if t.Spec.ConfigSchema != nil { + if err := r.validateJSONSchema(t.Spec.ConfigSchema.Raw); err != nil { + return "config", err + } + } + return "", nil +} diff --git a/internal/controller/realtime_engram_controller.go b/internal/controller/realtime_engram_controller.go index 3351444..003460d 100644 --- a/internal/controller/realtime_engram_controller.go +++ b/internal/controller/realtime_engram_controller.go @@ -131,11 +131,8 @@ func (r *RealtimeEngramReconciler) Reconcile(ctx context.Context, req ctrl.Reque metrics.RecordControllerReconcile("realtime-engram", time.Since(startTime), err) }() - // Bound reconcile duration - timeout := r.ConfigResolver.GetOperatorConfig().Controller.ReconcileTimeout - if timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, timeout) + ctx, cancel := r.withReconcileTimeout(ctx) + if cancel != nil { defer cancel() } @@ -144,121 +141,166 @@ func (r *RealtimeEngramReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, client.IgnoreNotFound(err) } - // This reconciler only handles non-job modes. - // If the engram is not in a realtime mode, we should ensure our finalizer is removed and then stop. 
- if engram.Spec.Mode != enums.WorkloadModeDeployment && engram.Spec.Mode != enums.WorkloadModeStatefulSet { - if controllerutil.ContainsFinalizer(&engram, RealtimeEngramFinalizer) { - controllerutil.RemoveFinalizer(&engram, RealtimeEngramFinalizer) - if err := r.Update(ctx, &engram); err != nil { - log.Error(err, "Failed to remove realtime finalizer from job-mode Engram") - return ctrl.Result{}, err - } - } - return ctrl.Result{}, nil + // If not a realtime mode, ensure finalizer is removed and stop. + if handled, err := r.handleNonRealtimeFinalizerIfNeeded(ctx, &engram, log); handled || err != nil { + return ctrl.Result{}, err } // Handle deletion - if !engram.DeletionTimestamp.IsZero() { - if err := r.reconcileDelete(ctx, &engram); err != nil { - return ctrl.Result{}, err - } - return ctrl.Result{}, nil + if handled, err := r.handleDeletionIfNeeded(ctx, &engram); handled || err != nil { + return ctrl.Result{}, err } - // Add finalizer if it doesn't exist - if !controllerutil.ContainsFinalizer(&engram, RealtimeEngramFinalizer) { - controllerutil.AddFinalizer(&engram, RealtimeEngramFinalizer) - if err := r.Update(ctx, &engram); err != nil { - log.Error(err, "Failed to add realtime finalizer") - return ctrl.Result{}, err - } + // Ensure finalizer exists + if err := r.ensureRealtimeFinalizer(ctx, &engram, log); err != nil { + return ctrl.Result{}, err } - // Get the referenced EngramTemplate - template, err := r.getEngramTemplate(ctx, &engram) - if err != nil { - if errors.IsNotFound(err) { - log.Info("EngramTemplate not found for Engram, setting status to Blocked") - updateErr := patch.RetryableStatusPatch(ctx, r.Client, &engram, func(obj client.Object) { - e := obj.(*v1alpha1.Engram) - cm := conditions.NewConditionManager(e.Generation) - cm.SetCondition(&e.Status.Conditions, conditions.ConditionReady, metav1.ConditionFalse, conditions.ReasonTemplateNotFound, err.Error()) - e.Status.Phase = enums.PhaseBlocked - }) - // We return the error from the patch operation, 
but if it's nil, we stop reconciling. - return ctrl.Result{}, updateErr + return r.reconcileRealtime(ctx, &engram, log) +} + +// withReconcileTimeout applies a controller-configured timeout to the given context. +func (r *RealtimeEngramReconciler) withReconcileTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + timeout := r.ConfigResolver.GetOperatorConfig().Controller.ReconcileTimeout + if timeout <= 0 { + return ctx, nil + } + ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) + return ctxWithTimeout, cancel +} + +// handleNonRealtimeFinalizerIfNeeded removes the realtime finalizer for non-realtime modes. +func (r *RealtimeEngramReconciler) handleNonRealtimeFinalizerIfNeeded(ctx context.Context, engram *v1alpha1.Engram, log *logging.ControllerLogger) (bool, error) { + if engram.Spec.Mode == enums.WorkloadModeDeployment || engram.Spec.Mode == enums.WorkloadModeStatefulSet { + return false, nil + } + if controllerutil.ContainsFinalizer(engram, RealtimeEngramFinalizer) { + controllerutil.RemoveFinalizer(engram, RealtimeEngramFinalizer) + if err := r.Update(ctx, engram); err != nil { + log.Error(err, "Failed to remove realtime finalizer from non-realtime Engram") + return true, err } - // For other errors, log as an error and requeue. - log.Error(err, "Failed to get EngramTemplate") - return ctrl.Result{}, err } + return true, nil +} + +// handleDeletionIfNeeded handles deletion flow when DeletionTimestamp is set. +func (r *RealtimeEngramReconciler) handleDeletionIfNeeded(ctx context.Context, engram *v1alpha1.Engram) (bool, error) { + if engram.DeletionTimestamp.IsZero() { + return false, nil + } + if err := r.reconcileDelete(ctx, engram); err != nil { + return true, err + } + return true, nil +} - // If we are here, the template was found. Mark the engram as ready. - if err := patch.RetryableStatusPatch(ctx, r.Client, &engram, func(obj client.Object) { +// ensureRealtimeFinalizer ensures the reconciler finalizer is present. 
+func (r *RealtimeEngramReconciler) ensureRealtimeFinalizer(ctx context.Context, engram *v1alpha1.Engram, log *logging.ControllerLogger) error { + if controllerutil.ContainsFinalizer(engram, RealtimeEngramFinalizer) { + return nil + } + controllerutil.AddFinalizer(engram, RealtimeEngramFinalizer) + if err := r.Update(ctx, engram); err != nil { + log.Error(err, "Failed to add realtime finalizer") + return err + } + return nil +} + +// markTemplateNotFound patches status to Blocked when the template is missing. +func (r *RealtimeEngramReconciler) markTemplateNotFound(ctx context.Context, engram *v1alpha1.Engram, cause error) (ctrl.Result, error) { + updateErr := patch.RetryableStatusPatch(ctx, r.Client, engram, func(obj client.Object) { + e := obj.(*v1alpha1.Engram) + cm := conditions.NewConditionManager(e.Generation) + cm.SetCondition(&e.Status.Conditions, conditions.ConditionReady, metav1.ConditionFalse, conditions.ReasonTemplateNotFound, cause.Error()) + e.Status.Phase = enums.PhaseBlocked + }) + return ctrl.Result{}, updateErr +} + +// markTemplateResolvedReady marks Engram as Ready after template resolution. +func (r *RealtimeEngramReconciler) markTemplateResolvedReady(ctx context.Context, engram *v1alpha1.Engram) error { + return patch.RetryableStatusPatch(ctx, r.Client, engram, func(obj client.Object) { e := obj.(*v1alpha1.Engram) cm := conditions.NewConditionManager(e.Generation) cm.SetCondition(&e.Status.Conditions, conditions.ConditionReady, metav1.ConditionTrue, conditions.ReasonTemplateResolved, "Engram template was found and is available.") - // We only set phase for terminal/blocked states at this level. - // Running state is managed by the workload reconciliation. if e.Status.Phase == enums.PhaseBlocked { e.Status.Phase = enums.PhasePending } - }); err != nil { - log.Error(err, "Failed to update Engram status to Ready") - return ctrl.Result{}, err + }) +} + +// ensureRunnerServiceAccount ensures a non-default ServiceAccount is used. 
+func (r *RealtimeEngramReconciler) ensureRunnerServiceAccount(ctx context.Context, engram *v1alpha1.Engram, resolved *config.ResolvedExecutionConfig, log *logging.ControllerLogger) error { + if resolved.ServiceAccountName != "" && resolved.ServiceAccountName != "default" { + return nil + } + saName := fmt.Sprintf("%s-engram-runner", engram.Name) + if err := r.ensureEngramServiceAccount(ctx, engram, saName); err != nil { + log.Error(err, "Failed to ensure ServiceAccount for realtime engram") + return err } + resolved.ServiceAccountName = saName + return nil +} - // Determine execution mode - Job, Deployment, or StatefulSet - // This logic decides which type of workload to create for the Engram +// reconcileWorkload dispatches to the proper workload reconciler based on mode. +func (r *RealtimeEngramReconciler) reconcileWorkload(ctx context.Context, engram *v1alpha1.Engram, resolved *config.ResolvedExecutionConfig) error { mode := engram.Spec.Mode if mode == "" { - // Default to job if not specified mode = enums.WorkloadModeJob } - - // This reconciler only handles non-job modes. if mode == enums.WorkloadModeJob { - // This case is handled by the check at the beginning of the function. - return ctrl.Result{}, nil + return nil } + switch mode { + case enums.WorkloadModeDeployment: + return r.reconcileDeployment(ctx, engram, resolved) + case enums.WorkloadModeStatefulSet: + return r.reconcileStatefulSet(ctx, engram, resolved) + default: + return nil + } +} - // Resolve the full configuration - resolvedConfig, err := r.ConfigResolver.ResolveExecutionConfig(ctx, nil, nil, &engram, template) +// reconcileRealtime performs the core realtime reconciliation steps after guards. 
+func (r *RealtimeEngramReconciler) reconcileRealtime(ctx context.Context, engram *v1alpha1.Engram, log *logging.ControllerLogger) (ctrl.Result, error) { + // Fetch template + template, err := r.getEngramTemplate(ctx, engram) if err != nil { - log.Error(err, "Failed to resolve execution config") - // TODO: Update status with an error condition + if errors.IsNotFound(err) { + return r.markTemplateNotFound(ctx, engram, err) + } + log.Error(err, "Failed to get EngramTemplate") return ctrl.Result{}, err } - // Ensure a non-default ServiceAccount for realtime engrams if none was specified - if resolvedConfig.ServiceAccountName == "" || resolvedConfig.ServiceAccountName == "default" { - saName := fmt.Sprintf("%s-engram-runner", engram.Name) - if err := r.ensureEngramServiceAccount(ctx, &engram, saName); err != nil { - log.Error(err, "Failed to ensure ServiceAccount for realtime engram") - return ctrl.Result{}, err - } - resolvedConfig.ServiceAccountName = saName + // Mark template resolved + if err := r.markTemplateResolvedReady(ctx, engram); err != nil { + log.Error(err, "Failed to update Engram status to Ready") + return ctrl.Result{}, err } - // At this point, we reconcile the necessary resources for service-like engrams. 
- switch mode { - case enums.WorkloadModeDeployment: - if err := r.reconcileDeployment(ctx, &engram, resolvedConfig); err != nil { - return ctrl.Result{}, err - } - case enums.WorkloadModeStatefulSet: - if err := r.reconcileStatefulSet(ctx, &engram, resolvedConfig); err != nil { - return ctrl.Result{}, err - } + // Resolve execution config + resolvedConfig, err := r.ConfigResolver.ResolveExecutionConfig(ctx, nil, nil, engram, template) + if err != nil { + log.Error(err, "Failed to resolve execution config") + return ctrl.Result{}, err } - // Reconcile the associated Service - if err := r.reconcileService(ctx, &engram, resolvedConfig); err != nil { + // Ensure ServiceAccount + if err := r.ensureRunnerServiceAccount(ctx, engram, resolvedConfig, log); err != nil { return ctrl.Result{}, err } - // For now, the reconciler's main job is to keep the status in sync with the managed resources. - // A more advanced implementation would handle updates, scaling, etc. + // Reconcile workload and service + if err := r.reconcileWorkload(ctx, engram, resolvedConfig); err != nil { + return ctrl.Result{}, err + } + if err := r.reconcileService(ctx, engram, resolvedConfig); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, nil } diff --git a/internal/controller/runs/dag.go b/internal/controller/runs/dag.go index 9a97621..2b6e580 100644 --- a/internal/controller/runs/dag.go +++ b/internal/controller/runs/dag.go @@ -43,93 +43,49 @@ func NewDAGReconciler(k8sClient client.Client, celEval *cel.Evaluator, stepExecu func (r *DAGReconciler) Reconcile(ctx context.Context, srun *runsv1alpha1.StoryRun, story *bubuv1alpha1.Story) (ctrl.Result, error) { log := logging.NewReconcileLogger(ctx, "storyrun-dag").WithValues("storyrun", srun.Name) - // 1. Initialize Status - if srun.Status.StepStates == nil { - srun.Status.StepStates = make(map[string]runsv1alpha1.StepState) - } + r.initStepStatesIfNeeded(srun) - // 2. 
Sync State from Child Resources (StepRuns) stepRunList, err := r.syncStateFromStepRuns(ctx, srun) if err != nil { return ctrl.Result{}, err } log.Info("Synced StepRuns", "count", len(stepRunList.Items)) - // Loop to traverse the DAG as far as possible in one reconciliation cycle. - // The loop is bounded by the number of steps to prevent infinite cycles in case of bugs. priorStepOutputs, err := getPriorStepOutputs(ctx, r.Client, srun, stepRunList) if err != nil { return ctrl.Result{}, fmt.Errorf("failed to get prior step outputs: %w", err) } - for i := 0; i < len(story.Spec.Steps)+1; i++ { // +1 to allow one final check after all steps are processed - // Sync state from synchronous sub-stories - if updated := r.checkSyncSubStories(ctx, srun, story); updated { - // If a sub-story status was synced, we must re-fetch the outputs - // Inefficient to re-list all step runs, but acceptable for now as sub-stories are less common. - // A future improvement would be to only fetch the single sub-story's output and merge it in. - stepRunList, err = r.syncStateFromStepRuns(ctx, srun) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to re-sync step runs after sub-story sync: %w", err) - } - priorStepOutputs, err = getPriorStepOutputs(ctx, r.Client, srun, stepRunList) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to re-fetch prior step outputs after sub-story sync: %w", err) - } + + for i := 0; i < len(story.Spec.Steps)+1; i++ { // bounded traversal + _, priorStepOutputs, err = r.refreshAfterSubStoriesIfNeeded(ctx, srun, story, stepRunList, priorStepOutputs) + if err != nil { + return ctrl.Result{}, err } - // Build state maps for DAG traversal completedSteps, runningSteps, failedSteps := buildStateMaps(srun.Status.StepStates) - - // 5. 
Check for StoryRun Completion or Failure - if len(failedSteps) > 0 && r.shouldFailFast(story) { - return ctrl.Result{}, r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, "A step failed and fail-fast policy is enabled.") - } - if len(completedSteps) == len(story.Spec.Steps) { - if err := r.finalizeSuccessfulRun(ctx, srun, story); err != nil { - log.Error(err, "Failed to finalize successful story run") - // Try to set phase to failed, but the finalization error is the root cause. - _ = r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, "Failed to evaluate final output template.") - return ctrl.Result{}, err // Return the evaluation error - } - return ctrl.Result{}, nil // Finalization handles setting the Succeeded phase + if done, err := r.checkCompletionOrFailure(ctx, srun, story, failedSteps, completedSteps, log); done || err != nil { + return ctrl.Result{}, err } - // 6. Persist current status BEFORE launching new steps so downstream resolvers can see prior outputs. - // This prevents a race where a newly created StepRun's reconciler fetches the - // parent StoryRun before its status is updated with the latest completed steps. - if err := patch.RetryableStatusPatch(ctx, r.Client, srun, func(obj client.Object) { - sr := obj.(*runsv1alpha1.StoryRun) - sr.Status.StepStates = srun.Status.StepStates - }); err != nil { + if err := r.persistStepStates(ctx, srun); err != nil { log.Error(err, "Failed to patch StoryRun status before launching steps") return ctrl.Result{}, err } - // 7. Find and Launch Ready Steps readySteps, skippedSteps, err := r.findAndLaunchReadySteps(ctx, srun, story, completedSteps, runningSteps, priorStepOutputs) if err != nil { return ctrl.Result{}, err } - // 8. 
Persist Status Changes (including skipped steps) - err = patch.RetryableStatusPatch(ctx, r.Client, srun, func(obj client.Object) { - sr := obj.(*runsv1alpha1.StoryRun) - // Merge the maps to ensure we don't lose state - for k, v := range srun.Status.StepStates { - sr.Status.StepStates[k] = v - } - }) - if err != nil { + if err := r.persistMergedStates(ctx, srun); err != nil { log.Error(err, "Failed to patch StoryRun status") return ctrl.Result{}, err } - if len(readySteps) == 0 && len(skippedSteps) == 0 { - break // DAG is stable, no more progress can be made in this cycle. + break } - // After launching, immediately sync state again and refresh prior outputs - // to allow the next level of the DAG to be processed in this cycle. + // refresh for next level stepRunList, err = r.syncStateFromStepRuns(ctx, srun) if err != nil { return ctrl.Result{}, err @@ -139,10 +95,61 @@ func (r *DAGReconciler) Reconcile(ctx context.Context, srun *runsv1alpha1.StoryR return ctrl.Result{}, err } } - return ctrl.Result{}, nil } +func (r *DAGReconciler) initStepStatesIfNeeded(srun *runsv1alpha1.StoryRun) { + if srun.Status.StepStates == nil { + srun.Status.StepStates = make(map[string]runsv1alpha1.StepState) + } +} + +func (r *DAGReconciler) refreshAfterSubStoriesIfNeeded(ctx context.Context, srun *runsv1alpha1.StoryRun, story *bubuv1alpha1.Story, stepRunList *runsv1alpha1.StepRunList, prior map[string]any) (*runsv1alpha1.StepRunList, map[string]any, error) { + if updated := r.checkSyncSubStories(ctx, srun, story); !updated { + return stepRunList, prior, nil + } + lst, err := r.syncStateFromStepRuns(ctx, srun) + if err != nil { + return nil, nil, fmt.Errorf("failed to re-sync step runs after sub-story sync: %w", err) + } + out, err := getPriorStepOutputs(ctx, r.Client, srun, lst) + if err != nil { + return nil, nil, fmt.Errorf("failed to re-fetch prior step outputs after sub-story sync: %w", err) + } + return lst, out, nil +} + +func (r *DAGReconciler) checkCompletionOrFailure(ctx 
context.Context, srun *runsv1alpha1.StoryRun, story *bubuv1alpha1.Story, failed, completed map[string]bool, log *logging.ControllerLogger) (bool, error) { + if len(failed) > 0 && r.shouldFailFast(story) { + return true, r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, "A step failed and fail-fast policy is enabled.") + } + if len(completed) == len(story.Spec.Steps) { + if err := r.finalizeSuccessfulRun(ctx, srun, story); err != nil { + log.Error(err, "Failed to finalize successful story run") + _ = r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, "Failed to evaluate final output template.") + return true, err + } + return true, nil + } + return false, nil +} + +func (r *DAGReconciler) persistStepStates(ctx context.Context, srun *runsv1alpha1.StoryRun) error { + return patch.RetryableStatusPatch(ctx, r.Client, srun, func(obj client.Object) { + sr := obj.(*runsv1alpha1.StoryRun) + sr.Status.StepStates = srun.Status.StepStates + }) +} + +func (r *DAGReconciler) persistMergedStates(ctx context.Context, srun *runsv1alpha1.StoryRun) error { + return patch.RetryableStatusPatch(ctx, r.Client, srun, func(obj client.Object) { + sr := obj.(*runsv1alpha1.StoryRun) + for k, v := range srun.Status.StepStates { + sr.Status.StepStates[k] = v + } + }) +} + func (r *DAGReconciler) syncStateFromStepRuns(ctx context.Context, srun *runsv1alpha1.StoryRun) (*runsv1alpha1.StepRunList, error) { log := logging.NewReconcileLogger(ctx, "storyrun-dag-sync") var stepRunList runsv1alpha1.StepRunList @@ -261,25 +268,48 @@ func getPriorStepOutputs(ctx context.Context, c client.Client, srun *runsv1alpha log := logging.NewReconcileLogger(ctx, "dag-output-resolver") outputs := make(map[string]any) - // The StoryRun's status.stepOutputs field is deprecated and no longer used. - // Instead, we always resolve outputs by listing the child StepRun objects, - // which is the single source of truth. 
- if stepRunList == nil { - var newList runsv1alpha1.StepRunList - if err := c.List(ctx, &newList, client.InNamespace(srun.Namespace), - client.MatchingLabels{"bubustack.io/storyrun": srun.Name}); err != nil { - log.Error(err, "Failed to list StepRuns for output resolution") - return nil, err - } - stepRunList = &newList + // Ensure we have a list of StepRuns to inspect + lst, err := ensureStepRunList(ctx, c, srun, stepRunList, log) + if err != nil { + return nil, err + } + + // Collect outputs from StepRuns and synchronous sub-stories + collectOutputsFromStepRuns(lst, outputs, log) + collectOutputsFromSubStories(ctx, c, srun, outputs, log) + + // Add normalized aliases for CEL expressions + addAliasKeys(outputs) + + return outputs, nil +} + +// ensureStepRunList returns a non-nil StepRunList scoped to the StoryRun +func ensureStepRunList( + ctx context.Context, + c client.Client, + srun *runsv1alpha1.StoryRun, + stepRunList *runsv1alpha1.StepRunList, + log *logging.ReconcileLogger, +) (*runsv1alpha1.StepRunList, error) { + if stepRunList != nil { + return stepRunList, nil } + var newList runsv1alpha1.StepRunList + if err := c.List(ctx, &newList, client.InNamespace(srun.Namespace), client.MatchingLabels{"bubustack.io/storyrun": srun.Name}); err != nil { + log.Error(err, "Failed to list StepRuns for output resolution") + return nil, err + } + return &newList, nil +} +// collectOutputsFromStepRuns extracts outputs from succeeded StepRuns into outputs map +func collectOutputsFromStepRuns(stepRunList *runsv1alpha1.StepRunList, outputs map[string]any, log *logging.ReconcileLogger) { for i := range stepRunList.Items { sr := &stepRunList.Items[i] if _, exists := outputs[sr.Spec.StepID]; exists { - continue // An output for this step has already been resolved. 
+ continue } - if sr.Status.Phase == enums.PhaseSucceeded && sr.Status.Output != nil { var outputData map[string]any if err := json.Unmarshal(sr.Status.Output.Raw, &outputData); err != nil { @@ -289,41 +319,42 @@ func getPriorStepOutputs(ctx context.Context, c client.Client, srun *runsv1alpha outputs[sr.Spec.StepID] = map[string]any{"outputs": outputData} } } +} - // Also resolve outputs from completed synchronous sub-stories. +// collectOutputsFromSubStories extracts outputs from completed synchronous sub-stories +func collectOutputsFromSubStories(ctx context.Context, c client.Client, srun *runsv1alpha1.StoryRun, outputs map[string]any, log *logging.ReconcileLogger) { for stepID, state := range srun.Status.StepStates { - if state.Phase == enums.PhaseSucceeded && state.SubStoryRunName != "" { - if _, exists := outputs[stepID]; exists { - continue // Already populated, maybe from a StepRun (shouldn't happen for substories) - } - - subRun := &runsv1alpha1.StoryRun{} - subRunKey := types.NamespacedName{Name: state.SubStoryRunName, Namespace: srun.Namespace} - if err := c.Get(ctx, subRunKey, subRun); err != nil { - log.Error(err, "Failed to get sub-StoryRun for output resolution", "subStoryRun", state.SubStoryRunName) - continue // Don't fail the whole reconcile, just skip this output - } - - if subRun.Status.Output != nil { - var outputData map[string]any - if err := json.Unmarshal(subRun.Status.Output.Raw, &outputData); err != nil { - log.Error(err, "Failed to unmarshal output from sub-StoryRun", "step", stepID) - continue - } - outputs[stepID] = map[string]any{"outputs": outputData} + if state.Phase != enums.PhaseSucceeded || state.SubStoryRunName == "" { + continue + } + if _, exists := outputs[stepID]; exists { + continue + } + subRun := &runsv1alpha1.StoryRun{} + subRunKey := types.NamespacedName{Name: state.SubStoryRunName, Namespace: srun.Namespace} + if err := c.Get(ctx, subRunKey, subRun); err != nil { + log.Error(err, "Failed to get sub-StoryRun for output 
resolution", "subStoryRun", state.SubStoryRunName) + continue + } + if subRun.Status.Output != nil { + var outputData map[string]any + if err := json.Unmarshal(subRun.Status.Output.Raw, &outputData); err != nil { + log.Error(err, "Failed to unmarshal output from sub-StoryRun", "step", stepID) + continue } + outputs[stepID] = map[string]any{"outputs": outputData} } } +} - // Add normalized aliases for CEL expressions. +// addAliasKeys duplicates keys using normalized aliases for CEL convenience +func addAliasKeys(outputs map[string]any) { for stepID, stepContext := range outputs { alias := normalizeStepIdentifier(stepID) if _, exists := outputs[alias]; !exists { outputs[alias] = stepContext } } - - return outputs, nil } func (r *DAGReconciler) findReadySteps(ctx context.Context, steps []bubuv1alpha1.Step, completed, running map[string]bool, dependencies map[string]map[string]bool, vars map[string]any) ([]*bubuv1alpha1.Step, []*bubuv1alpha1.Step) { diff --git a/internal/controller/runs/rbac.go b/internal/controller/runs/rbac.go index 8ac9cdd..77d0dbe 100644 --- a/internal/controller/runs/rbac.go +++ b/internal/controller/runs/rbac.go @@ -31,59 +31,45 @@ func NewRBACManager(k8sClient client.Client, scheme *runtime.Scheme) *RBACManage // Reconcile ensures the necessary ServiceAccount, Role, and RoleBinding exist for the StoryRun. func (r *RBACManager) Reconcile(ctx context.Context, storyRun *runsv1alpha1.StoryRun) error { log := logging.NewReconcileLogger(ctx, "storyrun-rbac") - - story, err := r.getStoryForRun(ctx, storyRun) - if err != nil { - // If the story isn't found, we can't determine the storage policy, but we can still proceed - // with the basic RBAC setup. The error will be handled in the main reconcile loop. 
- log.Error(err, "Could not get parent story for RBAC setup, proceeding without storage policy") - } + story, _ := r.getStoryForRun(ctx, storyRun) saName := fmt.Sprintf("%s-engram-runner", storyRun.Name) - sa := &corev1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Name: saName, - Namespace: storyRun.Namespace, - }, + if err := r.reconcileServiceAccount(ctx, storyRun, story, saName, log); err != nil { + return err + } + if err := r.reconcileRole(ctx, storyRun, saName, log); err != nil { + return err } + if err := r.reconcileRoleBinding(ctx, storyRun, saName, log); err != nil { + return err + } + return nil +} - _, err = controllerutil.CreateOrUpdate(ctx, r.Client, sa, func() error { +func (r *RBACManager) reconcileServiceAccount(ctx context.Context, storyRun *runsv1alpha1.StoryRun, story *bubuv1alpha1.Story, saName string, log *logging.ReconcileLogger) error { + sa := &corev1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Name: saName, Namespace: storyRun.Namespace}} + _, err := controllerutil.CreateOrUpdate(ctx, r.Client, sa, func() error { if sa.Annotations == nil { sa.Annotations = make(map[string]string) } - - if story != nil && story.Spec.Policy != nil && story.Spec.Policy.Storage != nil && story.Spec.Policy.Storage.S3 != nil && - story.Spec.Policy.Storage.S3.Authentication.ServiceAccountAnnotations != nil { + if story != nil && story.Spec.Policy != nil && story.Spec.Policy.Storage != nil && story.Spec.Policy.Storage.S3 != nil && story.Spec.Policy.Storage.S3.Authentication.ServiceAccountAnnotations != nil { log.Info("Applying S3 ServiceAccount annotations from StoragePolicy") for k, v := range story.Spec.Policy.Storage.S3.Authentication.ServiceAccountAnnotations { sa.Annotations[k] = v } } - return controllerutil.SetOwnerReference(storyRun, sa, r.Scheme) }) - if err != nil { return fmt.Errorf("failed to create or update ServiceAccount: %w", err) } + return nil +} +func (r *RBACManager) reconcileRole(ctx context.Context, storyRun *runsv1alpha1.StoryRun, 
saName string, log *logging.ReconcileLogger) error { role := &rbacv1.Role{ - ObjectMeta: metav1.ObjectMeta{ - Name: saName, - Namespace: storyRun.Namespace, - }, - Rules: []rbacv1.PolicyRule{ - { - APIGroups: []string{"runs.bubustack.io"}, - Resources: []string{"stepruns"}, - Verbs: []string{"get", "watch"}, - }, - { - APIGroups: []string{"runs.bubustack.io"}, - Resources: []string{"stepruns/status"}, - Verbs: []string{"patch", "update"}, - }, - }, + ObjectMeta: metav1.ObjectMeta{Name: saName, Namespace: storyRun.Namespace}, + Rules: []rbacv1.PolicyRule{{APIGroups: []string{"runs.bubustack.io"}, Resources: []string{"stepruns"}, Verbs: []string{"get", "watch"}}, {APIGroups: []string{"runs.bubustack.io"}, Resources: []string{"stepruns/status"}, Verbs: []string{"patch", "update"}}}, } if err := controllerutil.SetOwnerReference(storyRun, role, r.Scheme); err != nil { return fmt.Errorf("failed to set owner reference on Role: %w", err) @@ -93,24 +79,14 @@ func (r *RBACManager) Reconcile(ctx context.Context, storyRun *runsv1alpha1.Stor } else if err == nil { log.Info("Created Role for Engram runner", "role", role.Name) } + return nil +} +func (r *RBACManager) reconcileRoleBinding(ctx context.Context, storyRun *runsv1alpha1.StoryRun, saName string, log *logging.ReconcileLogger) error { rb := &rbacv1.RoleBinding{ - ObjectMeta: metav1.ObjectMeta{ - Name: saName, - Namespace: storyRun.Namespace, - }, - Subjects: []rbacv1.Subject{ - { - Kind: "ServiceAccount", - Name: saName, - Namespace: storyRun.Namespace, - }, - }, - RoleRef: rbacv1.RoleRef{ - APIGroup: "rbac.authorization.k8s.io", - Kind: "Role", - Name: saName, - }, + ObjectMeta: metav1.ObjectMeta{Name: saName, Namespace: storyRun.Namespace}, + Subjects: []rbacv1.Subject{{Kind: "ServiceAccount", Name: saName, Namespace: storyRun.Namespace}}, + RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "Role", Name: saName}, } if err := controllerutil.SetOwnerReference(storyRun, rb, r.Scheme); err != nil { return 
fmt.Errorf("failed to set owner reference on RoleBinding: %w", err) @@ -120,7 +96,6 @@ func (r *RBACManager) Reconcile(ctx context.Context, storyRun *runsv1alpha1.Stor } else if err == nil { log.Info("Created RoleBinding for Engram runner", "roleBinding", rb.Name) } - return nil } diff --git a/internal/controller/runs/steprun_controller.go b/internal/controller/runs/steprun_controller.go index f2b8d73..4c7d49c 100644 --- a/internal/controller/runs/steprun_controller.go +++ b/internal/controller/runs/steprun_controller.go @@ -355,71 +355,93 @@ func (r *StepRunReconciler) reconcileDelete(ctx context.Context, step *runsv1alp func (r *StepRunReconciler) createJobForStep(ctx context.Context, srun *runsv1alpha1.StepRun, engram *v1alpha1.Engram, engramTemplate *catalogv1alpha1.EngramTemplate) (*batchv1.Job, error) { stepLogger := logging.NewControllerLogger(ctx, "steprun").WithStepRun(srun) - // Fetch the parent Story to provide the full context for the resolver. - story, err := r.getStoryForStep(ctx, srun) + story, storyRun, resolvedConfig, inputBytes, stepTimeout, err := r.prepareExecutionContext(ctx, srun, engram, engramTemplate, stepLogger) if err != nil { - return nil, fmt.Errorf("failed to get story for step: %w", err) + return nil, err + } + + secretEnvVars, volumes, volumeMounts := r.setupSecrets(ctx, resolvedConfig, engramTemplate) + envVars := r.buildBaseEnvVars(srun, story, inputBytes, stepTimeout) + envVars = r.appendGRPCTuningEnv(envVars) + envVars = r.appendStorageEnv(envVars, resolvedConfig, stepLogger) + + activeDeadlineSeconds := int64((stepTimeout + 2*time.Minute).Seconds()) + startedAt := metav1.Now() + if srun.Status.StartedAt != nil { + startedAt = *srun.Status.StartedAt + } + r.addStartedAtEnv(&envVars, startedAt) + envVars = append(envVars, secretEnvVars...) 
+ if engram.Spec.With != nil && len(engram.Spec.With.Raw) > 0 { + envVars = append(envVars, corev1.EnvVar{Name: "BUBU_CONFIG", Value: string(engram.Spec.With.Raw)}) } - // Fetch the parent StoryRun to get inputs and outputs context. + job := r.buildJobSpec(srun, resolvedConfig, envVars, volumes, volumeMounts, activeDeadlineSeconds) + if err := controllerutil.SetControllerReference(srun, job, r.Scheme); err != nil { + return nil, err + } + _ = storyRun // storyRun currently unused beyond inputs; keep for future enhancements + return job, nil +} + +func (r *StepRunReconciler) prepareExecutionContext(ctx context.Context, srun *runsv1alpha1.StepRun, engram *v1alpha1.Engram, engramTemplate *catalogv1alpha1.EngramTemplate, stepLogger *logging.ControllerLogger) (*v1alpha1.Story, *runsv1alpha1.StoryRun, *config.ResolvedExecutionConfig, []byte, time.Duration, error) { + story, err := r.getStoryForStep(ctx, srun) + if err != nil { + return nil, nil, nil, nil, 0, fmt.Errorf("failed to get story for step: %w", err) + } storyRun, err := r.getParentStoryRun(ctx, srun) if err != nil { - return nil, fmt.Errorf("failed to get parent storyrun: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to get parent storyrun: %w", err) } - - // Resolve the final configuration using the hierarchical resolver. 
resolvedConfig, err := r.ConfigResolver.ResolveExecutionConfig(ctx, srun, story, engram, engramTemplate) if err != nil { - return nil, fmt.Errorf("failed to resolve execution config for step '%s': %w", srun.Name, err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to resolve execution config for step '%s': %w", srun.Name, err) } - stepLogger.Info("Resolved ServiceAccountName", "sa", resolvedConfig.ServiceAccountName) - - // Get the StoryRun's initial inputs storyRunInputs, err := r.getStoryRunInputs(ctx, storyRun) if err != nil { - return nil, fmt.Errorf("failed to get storyrun inputs: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to get storyrun inputs: %w", err) } - - // Resolve inputs using the shared CEL evaluator stepOutputs, err := getPriorStepOutputs(ctx, r.Client, storyRun, nil) if err != nil { - return nil, fmt.Errorf("failed to get prior step outputs: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to get prior step outputs: %w", err) } - - var with map[string]any + with := map[string]any{} if srun.Spec.Input != nil { if err := json.Unmarshal(srun.Spec.Input.Raw, &with); err != nil { - return nil, fmt.Errorf("failed to unmarshal step 'with' block: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to unmarshal step 'with' block: %w", err) } } - - vars := map[string]any{ - "inputs": storyRunInputs, - "steps": stepOutputs, - } + vars := map[string]any{"inputs": storyRunInputs, "steps": stepOutputs} resolvedInputs, err := r.CELEvaluator.ResolveWithInputs(ctx, with, vars) if err != nil { - // This is where we detect if an upstream output is not ready. - // The CEL evaluator will return a compile error for an undeclared reference. 
- return nil, fmt.Errorf("failed to resolve inputs with CEL: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to resolve inputs with CEL: %w", err) } - inputBytes, err := json.Marshal(resolvedInputs) if err != nil { - return nil, fmt.Errorf("failed to marshal resolved inputs: %w", err) + return nil, nil, nil, nil, 0, fmt.Errorf("failed to marshal resolved inputs: %w", err) } + stepTimeout := r.computeStepTimeout(story, stepLogger) + return story, storyRun, resolvedConfig, inputBytes, stepTimeout, nil +} - // Setup secrets - secretEnvVars, volumes, volumeMounts := r.setupSecrets(ctx, resolvedConfig, engramTemplate) - - // Determine startedAt timestamp for accurate duration tracking - startedAt := metav1.Now() - if srun.Status.StartedAt != nil { - startedAt = *srun.Status.StartedAt +func (r *StepRunReconciler) computeStepTimeout(story *v1alpha1.Story, stepLogger *logging.ControllerLogger) time.Duration { + stepTimeout := r.ConfigResolver.GetOperatorConfig().Controller.DefaultStepTimeout + if stepTimeout == 0 { + stepTimeout = 30 * time.Minute + } + if story.Spec.Policy != nil && story.Spec.Policy.Timeouts != nil && story.Spec.Policy.Timeouts.Step != nil { + if parsedTimeout, err := time.ParseDuration(*story.Spec.Policy.Timeouts.Step); err == nil && parsedTimeout > 0 { + stepTimeout = parsedTimeout + stepLogger.Info("Using Story-level step timeout", "timeout", stepTimeout) + } else if err != nil { + stepLogger.Error(err, "Invalid step timeout in Story policy, using default", "rawTimeout", *story.Spec.Policy.Timeouts.Step, "default", stepTimeout) + } } + return stepTimeout +} - // Base environment variables +func (r *StepRunReconciler) buildBaseEnvVars(srun *runsv1alpha1.StepRun, story *v1alpha1.Story, inputBytes []byte, stepTimeout time.Duration) []corev1.EnvVar { envVars := []corev1.EnvVar{ {Name: "BUBU_STORY_NAME", Value: story.Name}, {Name: "BUBU_STORYRUN_ID", Value: srun.Spec.StoryRunRef.Name}, @@ -427,16 +449,18 @@ func (r *StepRunReconciler) 
createJobForStep(ctx context.Context, srun *runsv1al {Name: "BUBU_STEPRUN_NAME", Value: srun.Name}, {Name: "BUBU_STEPRUN_NAMESPACE", Value: srun.Namespace}, {Name: "BUBU_INPUTS", Value: string(inputBytes)}, - {Name: "BUBU_STARTED_AT", Value: startedAt.Format(time.RFC3339Nano)}, {Name: "BUBU_EXECUTION_MODE", Value: "batch"}, {Name: "BUBU_GRPC_PORT", Value: fmt.Sprintf("%d", r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig.DefaultGRPCPort)}, {Name: "BUBU_MAX_INLINE_SIZE", Value: fmt.Sprintf("%d", r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig.DefaultMaxInlineSize)}, {Name: "BUBU_STORAGE_TIMEOUT", Value: fmt.Sprintf("%ds", r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig.DefaultStorageTimeoutSeconds)}, + {Name: "BUBU_STEP_TIMEOUT", Value: stepTimeout.String()}, } + return envVars +} - // Add gRPC tuning parameters for consistency, even in batch mode +func (r *StepRunReconciler) appendGRPCTuningEnv(envVars []corev1.EnvVar) []corev1.EnvVar { engramConfig := r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig - envVars = append(envVars, + return append(envVars, corev1.EnvVar{Name: "BUBU_GRPC_MAX_RECV_BYTES", Value: fmt.Sprintf("%d", engramConfig.DefaultMaxRecvMsgBytes)}, corev1.EnvVar{Name: "BUBU_GRPC_MAX_SEND_BYTES", Value: fmt.Sprintf("%d", engramConfig.DefaultMaxSendMsgBytes)}, corev1.EnvVar{Name: "BUBU_GRPC_CLIENT_MAX_RECV_BYTES", Value: fmt.Sprintf("%d", engramConfig.DefaultMaxRecvMsgBytes)}, @@ -448,53 +472,34 @@ func (r *StepRunReconciler) createJobForStep(ctx context.Context, srun *runsv1al corev1.EnvVar{Name: "BUBU_GRPC_RECONNECT_MAX_BACKOFF", Value: fmt.Sprintf("%ds", engramConfig.DefaultReconnectMaxBackoffSeconds)}, corev1.EnvVar{Name: "BUBU_GRPC_HANG_TIMEOUT", Value: fmt.Sprintf("%ds", engramConfig.DefaultHangTimeoutSeconds)}, corev1.EnvVar{Name: "BUBU_GRPC_MESSAGE_TIMEOUT", Value: fmt.Sprintf("%ds", engramConfig.DefaultMessageTimeoutSeconds)}, - 
corev1.EnvVar{Name: "BUBU_GRPC_CHANNEL_SEND_TIMEOUT", Value: fmt.Sprintf("%ds", engramConfig.DefaultMessageTimeoutSeconds)}, // Re-use message timeout + corev1.EnvVar{Name: "BUBU_GRPC_CHANNEL_SEND_TIMEOUT", Value: fmt.Sprintf("%ds", engramConfig.DefaultMessageTimeoutSeconds)}, ) +} - // Determine step timeout with priority hierarchy: - // 1. Story.Spec.Policy.Timeouts.Step (highest priority) - // 2. Operator config DefaultStepTimeout - // 3. Fallback to 30 minutes (matches SDK default) - // This allows SDK to enforce timeout before Job-level activeDeadlineSeconds kills the pod - stepTimeout := r.ConfigResolver.GetOperatorConfig().Controller.DefaultStepTimeout - if stepTimeout == 0 { - stepTimeout = 30 * time.Minute // Fallback to SDK's default - } - // Override from Story-level policy if specified - if story.Spec.Policy != nil && story.Spec.Policy.Timeouts != nil && story.Spec.Policy.Timeouts.Step != nil { - if parsedTimeout, err := time.ParseDuration(*story.Spec.Policy.Timeouts.Step); err == nil && parsedTimeout > 0 { - stepTimeout = parsedTimeout - stepLogger.Info("Using Story-level step timeout", "timeout", stepTimeout) - } else { - stepLogger.Error(err, "Invalid step timeout in Story policy, using default", "rawTimeout", *story.Spec.Policy.Timeouts.Step, "default", stepTimeout) +func (r *StepRunReconciler) appendStorageEnv(envVars []corev1.EnvVar, resolvedConfig *config.ResolvedExecutionConfig, stepLogger *logging.ControllerLogger) []corev1.EnvVar { + if storagePolicy := resolvedConfig.Storage; storagePolicy != nil && storagePolicy.S3 != nil { + s3Config := storagePolicy.S3 + stepLogger.Info("Configuring pod for S3 object storage access", "bucket", s3Config.Bucket) + envVars = append(envVars, + corev1.EnvVar{Name: "BUBU_STORAGE_PROVIDER", Value: "s3"}, + corev1.EnvVar{Name: "BUBU_STORAGE_S3_BUCKET", Value: s3Config.Bucket}, + ) + if s3Config.Region != "" { + envVars = append(envVars, corev1.EnvVar{Name: "BUBU_STORAGE_S3_REGION", Value: s3Config.Region}) + } + 
if s3Config.Endpoint != "" { + envVars = append(envVars, corev1.EnvVar{Name: "BUBU_STORAGE_S3_ENDPOINT", Value: s3Config.Endpoint}) } } - envVars = append(envVars, corev1.EnvVar{ - Name: "BUBU_STEP_TIMEOUT", - Value: stepTimeout.String(), // e.g., "30m", "5m" - }) - - // Calculate activeDeadlineSeconds for defense-in-depth timeout enforcement. - // Add 2-minute grace period to allow SDK time to: - // - Dehydrate large outputs (up to 60s for large payloads) - // - Patch StepRun status with retries (up to 30s) - // - Flush logs and cleanup (up to 5s) - // This provides Kubernetes-level timeout enforcement if SDK process hangs - // before reaching its exit logic (kernel deadlock, OOM, etc.). - activeDeadlineSeconds := int64((stepTimeout + 2*time.Minute).Seconds()) - - envVars = append(envVars, secretEnvVars...) + return envVars +} - // Serialize engram's 'with' configuration to JSON for the SDK - // This mirrors the logic in realtime_engram_controller.go for streaming engrams - if engram.Spec.With != nil && len(engram.Spec.With.Raw) > 0 { - envVars = append(envVars, corev1.EnvVar{ - Name: "BUBU_CONFIG", - Value: string(engram.Spec.With.Raw), - }) - } +func (r *StepRunReconciler) addStartedAtEnv(envVars *[]corev1.EnvVar, startedAt metav1.Time) { + *envVars = append(*envVars, corev1.EnvVar{Name: "BUBU_STARTED_AT", Value: startedAt.Format(time.RFC3339Nano)}) +} - job := &batchv1.Job{ +func (r *StepRunReconciler) buildJobSpec(srun *runsv1alpha1.StepRun, resolvedConfig *config.ResolvedExecutionConfig, envVars []corev1.EnvVar, volumes []corev1.Volume, volumeMounts []corev1.VolumeMount, activeDeadlineSeconds int64) *batchv1.Job { + return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: srun.Name, Namespace: srun.Namespace, @@ -511,9 +516,7 @@ func (r *StepRunReconciler) createJobForStep(ctx context.Context, srun *runsv1al TTLSecondsAfterFinished: &resolvedConfig.TTLSecondsAfterFinished, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: 
map[string]string{ - "bubustack.io/steprun": srun.Name, - }, + Labels: map[string]string{"bubustack.io/steprun": srun.Name}, }, Spec: corev1.PodSpec{ RestartPolicy: resolvedConfig.RestartPolicy, @@ -521,6 +524,7 @@ func (r *StepRunReconciler) createJobForStep(ctx context.Context, srun *runsv1al AutomountServiceAccountToken: &resolvedConfig.AutomountServiceAccountToken, TerminationGracePeriodSeconds: &r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig.DefaultTerminationGracePeriodSeconds, SecurityContext: resolvedConfig.ToPodSecurityContext(), + Volumes: volumes, Containers: []corev1.Container{{ Name: "engram", Image: resolvedConfig.Image, @@ -530,52 +534,13 @@ func (r *StepRunReconciler) createJobForStep(ctx context.Context, srun *runsv1al LivenessProbe: resolvedConfig.LivenessProbe, ReadinessProbe: resolvedConfig.ReadinessProbe, StartupProbe: resolvedConfig.StartupProbe, + Env: envVars, + VolumeMounts: volumeMounts, }}, }, }, }, } - - // Handle object storage configuration - if storagePolicy := resolvedConfig.Storage; storagePolicy != nil && storagePolicy.S3 != nil { - s3Config := storagePolicy.S3 - stepLogger.Info("Configuring pod for S3 object storage access", "bucket", s3Config.Bucket) - - // Add environment variables for the SDK - envVars = append(envVars, - corev1.EnvVar{Name: "BUBU_STORAGE_PROVIDER", Value: "s3"}, - corev1.EnvVar{Name: "BUBU_STORAGE_S3_BUCKET", Value: s3Config.Bucket}, - ) - if s3Config.Region != "" { - envVars = append(envVars, corev1.EnvVar{Name: "BUBU_STORAGE_S3_REGION", Value: s3Config.Region}) - } - if s3Config.Endpoint != "" { - envVars = append(envVars, corev1.EnvVar{Name: "BUBU_STORAGE_S3_ENDPOINT", Value: s3Config.Endpoint}) - } - - // Handle secret-based authentication - if auth := &s3Config.Authentication; auth.SecretRef != nil { - secretName := auth.SecretRef.Name - stepLogger.Info("Using S3 secret reference for authentication", "secretName", secretName) - // envFromSources = append(envFromSources, 
corev1.EnvFromSource{ // This line is removed - // SecretRef: &corev1.SecretEnvSource{ - // LocalObjectReference: corev1.LocalObjectReference{Name: secretName}, - // }, - // }) - } - } - - // Final assembly of pod spec - job.Spec.Template.Spec.Volumes = volumes - job.Spec.Template.Spec.Containers[0].VolumeMounts = volumeMounts - job.Spec.Template.Spec.Containers[0].Env = envVars - // job.Spec.Template.Spec.Containers[0].EnvFrom = envFromSources // This line is removed - - if err := controllerutil.SetControllerReference(srun, job, r.Scheme); err != nil { - return nil, err - } - - return job, nil } // getStoryRunInputs fetches the initial inputs from the parent StoryRun. diff --git a/internal/controller/runs/storyrun_controller.go b/internal/controller/runs/storyrun_controller.go index 73eda16..4551597 100644 --- a/internal/controller/runs/storyrun_controller.go +++ b/internal/controller/runs/storyrun_controller.go @@ -69,126 +69,166 @@ type StoryRunReconciler struct { func (r *StoryRunReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { log := logging.NewReconcileLogger(ctx, "storyrun").WithValues("storyrun", req.NamespacedName) startTime := time.Now() - defer func() { - metrics.RecordControllerReconcile("storyrun", time.Since(startTime), err) - }() + defer func() { metrics.RecordControllerReconcile("storyrun", time.Since(startTime), err) }() - // Bound reconcile duration - timeout := r.ConfigResolver.GetOperatorConfig().Controller.ReconcileTimeout - if timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } + ctx, cancel := r.withReconcileTimeout(ctx) + defer cancel() var srun runsv1alpha1.StoryRun if err := r.Get(ctx, req.NamespacedName, &srun); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } - // Validate input size to prevent oversized objects when webhooks are disabled. 
- maxBytes := r.ConfigResolver.GetOperatorConfig().Controller.Engram.EngramControllerConfig.DefaultMaxInlineSize - if maxBytes == 0 { - maxBytes = DefaultMaxInlineInputsSize // Fallback to a safe default, mirroring webhook logic. + if handled, err := r.guardOversizedInputs(ctx, &srun, log); handled || err != nil { + return ctrl.Result{}, err + } + if handled, result, err := r.handleDeletionIfNeeded(ctx, &srun); handled { + return result, err + } + if err := r.ensureFinalizer(ctx, &srun, log); err != nil { + return ctrl.Result{}, err + } + if err := r.rbacManager.Reconcile(ctx, &srun); err != nil { + log.Error(err, "Failed to reconcile RBAC") + return ctrl.Result{}, err + } + if srun.Status.Phase == enums.PhaseSucceeded || srun.Status.Phase == enums.PhaseFailed { + return ctrl.Result{}, nil + } + + return r.reconcileAfterSetup(ctx, &srun, log) +} + +func (r *StoryRunReconciler) withReconcileTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + timeout := r.ConfigResolver.GetOperatorConfig().Controller.ReconcileTimeout + if timeout <= 0 { + return ctx, func() {} + } + return context.WithTimeout(ctx, timeout) +} + +func (r *StoryRunReconciler) reconcileAfterSetup(ctx context.Context, srun *runsv1alpha1.StoryRun, log *logging.ControllerLogger) (ctrl.Result, error) { + var story *bubushv1alpha1.Story + handled, result, s, err := r.getStoryOrWait(ctx, srun, log) + if handled || err != nil { + return result, err + } + story = s + + if handled, err := r.handleStreamingIfNeeded(ctx, srun, story, log); handled || err != nil { + return ctrl.Result{}, err + } + if handled, err := r.initializePhaseIfEmpty(ctx, srun); handled || err != nil { + if err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{Requeue: true}, nil + } + return r.dagReconciler.Reconcile(ctx, srun, story) +} + +func (r *StoryRunReconciler) guardOversizedInputs(ctx context.Context, srun *runsv1alpha1.StoryRun, log *logging.ControllerLogger) (bool, error) { + // Prefer 
StoryRun-specific knob from operator config; fall back to sane default + maxBytes := r.ConfigResolver.GetOperatorConfig().Controller.StoryRun.MaxInlineInputsSize + if maxBytes <= 0 { + maxBytes = DefaultMaxInlineInputsSize } if srun.Spec.Inputs != nil && len(srun.Spec.Inputs.Raw) > maxBytes { if srun.Status.Phase != enums.PhaseFailed { log.Info("StoryRun spec.inputs is too large, failing run", "size", len(srun.Spec.Inputs.Raw), "maxSize", maxBytes) failMsg := fmt.Sprintf("spec.inputs size %d bytes exceeds maximum of %d", len(srun.Spec.Inputs.Raw), maxBytes) - if err := r.setStoryRunPhase(ctx, &srun, enums.PhaseFailed, failMsg); err != nil { + if err := r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, failMsg); err != nil { log.Error(err, "Failed to set StoryRun status to Failed due to oversized inputs") - return ctrl.Result{}, err + return true, err } } - return ctrl.Result{}, nil // Stop reconciliation for this oversized object. + return true, nil } + return false, nil +} - // Handle deletion first +func (r *StoryRunReconciler) handleDeletionIfNeeded(ctx context.Context, srun *runsv1alpha1.StoryRun) (bool, ctrl.Result, error) { if !srun.DeletionTimestamp.IsZero() { - return r.reconcileDelete(ctx, &srun) + res, err := r.reconcileDelete(ctx, srun) + return true, res, err } + return false, ctrl.Result{}, nil +} - // Add finalizer if it doesn't exist - if !controllerutil.ContainsFinalizer(&srun, StoryRunFinalizer) { - beforePatch := srun.DeepCopy() - controllerutil.AddFinalizer(&srun, StoryRunFinalizer) - if err := r.Patch(ctx, &srun, client.MergeFrom(beforePatch)); err != nil { - log.Error(err, "Failed to add finalizer") - return ctrl.Result{}, err - } +func (r *StoryRunReconciler) ensureFinalizer(ctx context.Context, srun *runsv1alpha1.StoryRun, log *logging.ControllerLogger) error { + if controllerutil.ContainsFinalizer(srun, StoryRunFinalizer) { + return nil } - - if err := r.rbacManager.Reconcile(ctx, &srun); err != nil { - log.Error(err, "Failed to reconcile 
RBAC") - // We can choose to fail hard here or just log and continue. - // For now, let's fail hard as RBAC is critical for engrams to run. - return ctrl.Result{}, err + beforePatch := srun.DeepCopy() + controllerutil.AddFinalizer(srun, StoryRunFinalizer) + if err := r.Patch(ctx, srun, client.MergeFrom(beforePatch)); err != nil { + log.Error(err, "Failed to add finalizer") + return err } + return nil +} - if srun.Status.Phase == enums.PhaseSucceeded || srun.Status.Phase == enums.PhaseFailed { - return ctrl.Result{}, nil +func (r *StoryRunReconciler) getStoryOrWait(ctx context.Context, srun *runsv1alpha1.StoryRun, log *logging.ControllerLogger) (bool, ctrl.Result, *bubushv1alpha1.Story, error) { + story, err := r.getStoryForRun(ctx, srun) + if err == nil { + return false, ctrl.Result{}, story, nil } - - story, err := r.getStoryForRun(ctx, &srun) - if err != nil { - if errors.IsNotFound(err) { - log.Info("Story not found for StoryRun, will requeue", "story", srun.Spec.StoryRef.Name) - // Emit event for user visibility (guard recorder for tests) - if r.Recorder != nil { - r.Recorder.Event(&srun, "Warning", conditions.ReasonStoryNotFound, fmt.Sprintf("Waiting for Story '%s'", srun.Spec.StoryRef.ToNamespacedName(&srun).String())) - } - statusErr := patch.RetryableStatusPatch(ctx, r.Client, &srun, func(obj client.Object) { - sr := obj.(*runsv1alpha1.StoryRun) - if sr.Status.Phase == "" { - sr.Status.Phase = enums.PhasePending - } - cm := conditions.NewConditionManager(sr.Generation) - message := fmt.Sprintf("Waiting for Story '%s' to be created", srun.Spec.StoryRef.ToNamespacedName(&srun).String()) - cm.SetCondition(&sr.Status.Conditions, conditions.ConditionReady, metav1.ConditionFalse, conditions.ReasonStoryNotFound, message) - sr.Status.Message = message - }) - if statusErr != nil { - log.Error(statusErr, "Failed to update StoryRun status while waiting for Story") - return ctrl.Result{}, statusErr + if errors.IsNotFound(err) { + log.Info("Story not found for StoryRun, 
will requeue", "story", srun.Spec.StoryRef.Name) + if r.Recorder != nil { + r.Recorder.Event(srun, "Warning", conditions.ReasonStoryNotFound, fmt.Sprintf("Waiting for Story '%s'", srun.Spec.StoryRef.ToNamespacedName(srun).String())) + } + statusErr := patch.RetryableStatusPatch(ctx, r.Client, srun, func(obj client.Object) { + sr := obj.(*runsv1alpha1.StoryRun) + if sr.Status.Phase == "" { + sr.Status.Phase = enums.PhasePending } - return ctrl.Result{RequeueAfter: 15 * time.Second}, nil + cm := conditions.NewConditionManager(sr.Generation) + message := fmt.Sprintf("Waiting for Story '%s' to be created", srun.Spec.StoryRef.ToNamespacedName(srun).String()) + cm.SetCondition(&sr.Status.Conditions, conditions.ConditionReady, metav1.ConditionFalse, conditions.ReasonStoryNotFound, message) + sr.Status.Message = message + }) + if statusErr != nil { + log.Error(statusErr, "Failed to update StoryRun status while waiting for Story") + return true, ctrl.Result{}, nil, statusErr } + return true, ctrl.Result{RequeueAfter: 15 * time.Second}, nil, nil + } + log.Error(err, "Failed to get Story for StoryRun") + if updateErr := r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, fmt.Sprintf("failed to get story: %v", err)); updateErr != nil { + log.Error(updateErr, "Failed to set StoryRun status to Failed") + return true, ctrl.Result{}, nil, updateErr + } + return true, ctrl.Result{}, nil, nil +} - // For other errors, fail the run - log.Error(err, "Failed to get Story for StoryRun") - if updateErr := r.setStoryRunPhase(ctx, &srun, enums.PhaseFailed, fmt.Sprintf("failed to get story: %v", err)); updateErr != nil { +func (r *StoryRunReconciler) handleStreamingIfNeeded(ctx context.Context, srun *runsv1alpha1.StoryRun, story *bubushv1alpha1.Story, log *logging.ControllerLogger) (bool, error) { + if story.Spec.Pattern != enums.StreamingPattern { + return false, nil + } + if err := r.reconcileStreamingStoryRun(ctx, srun, story); err != nil { + if ctx.Err() == context.DeadlineExceeded { + 
log.Info("Reconciliation timed out, returning error to trigger failure rate limiter") + return true, fmt.Errorf("reconcile timed out: %w", ctx.Err()) + } + log.Error(err, "Failed to reconcile streaming story run") + if updateErr := r.setStoryRunPhase(ctx, srun, enums.PhaseFailed, fmt.Sprintf("failed to reconcile streaming story: %v", err)); updateErr != nil { log.Error(updateErr, "Failed to set StoryRun status to Failed") - return ctrl.Result{}, updateErr // Return the error from the status update + return true, updateErr } - return ctrl.Result{}, nil // Do not requeue, as this is a terminal state + return true, nil } + return false, nil +} - if story.Spec.Pattern == enums.StreamingPattern { - if err := r.reconcileStreamingStoryRun(ctx, &srun, story); err != nil { - if ctx.Err() == context.DeadlineExceeded { - log.Info("Reconciliation timed out, returning error to trigger failure rate limiter") - return ctrl.Result{}, fmt.Errorf("reconcile timed out: %w", ctx.Err()) - } - log.Error(err, "Failed to reconcile streaming story run") - if updateErr := r.setStoryRunPhase(ctx, &srun, enums.PhaseFailed, fmt.Sprintf("failed to reconcile streaming story: %v", err)); updateErr != nil { - log.Error(updateErr, "Failed to set StoryRun status to Failed") - return ctrl.Result{}, updateErr - } - return ctrl.Result{}, nil - } +func (r *StoryRunReconciler) initializePhaseIfEmpty(ctx context.Context, srun *runsv1alpha1.StoryRun) (bool, error) { + if srun.Status.Phase != "" { + return false, nil } - - if srun.Status.Phase == "" { - err := r.setStoryRunPhase(ctx, &srun, enums.PhaseRunning, "Starting StoryRun execution") - if err != nil { - log.Error(err, "Failed to update StoryRun status to Running") - return ctrl.Result{}, err - } - return ctrl.Result{Requeue: true}, nil + if err := r.setStoryRunPhase(ctx, srun, enums.PhaseRunning, "Starting StoryRun execution"); err != nil { + return true, err } - - // Re-evaluate the entire DAG based on the current state. 
- return r.dagReconciler.Reconcile(ctx, &srun, story) + return true, nil } func (r *StoryRunReconciler) reconcileStreamingStoryRun(ctx context.Context, srun *runsv1alpha1.StoryRun, story *bubushv1alpha1.Story) error { diff --git a/internal/setup/indexing.go b/internal/setup/indexing.go index 676de3c..7b2dde7 100644 --- a/internal/setup/indexing.go +++ b/internal/setup/indexing.go @@ -21,135 +21,128 @@ var setupLog = log.Log.WithName("setup") func SetupIndexers(ctx context.Context, mgr manager.Manager) { setupLog.Info("setting up field indexes") - // Index Engrams by the EngramTemplate they reference. - if err := mgr.GetFieldIndexer().IndexField(ctx, &bubushv1alpha1.Engram{}, "spec.templateRef.name", func(rawObj client.Object) []string { - engram := rawObj.(*bubushv1alpha1.Engram) - if engram.Spec.TemplateRef.Name == "" { - return nil - } - return []string{engram.Spec.TemplateRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index Engram spec.templateRef.name") + mustIndexField(ctx, mgr, &bubushv1alpha1.Engram{}, "spec.templateRef.name", extractEngramTemplateName, "failed to index Engram spec.templateRef.name") + + mustIndexField(ctx, mgr, &runsv1alpha1.StepRun{}, "spec.engramRef", extractStepRunEngramRef, "failed to index StepRun spec.engramRef.name") + + mustIndexField(ctx, mgr, &runsv1alpha1.StepRun{}, "spec.storyRunRef.name", extractStepRunStoryRunRef, "failed to index StepRun spec.storyRunRef.name") + + mustIndexField(ctx, mgr, &runsv1alpha1.StoryRun{}, "spec.impulseRef.name", extractStoryRunImpulseRef, "failed to index StoryRun spec.impulseRef.name") + + mustIndexField(ctx, mgr, &bubushv1alpha1.Impulse{}, "spec.templateRef.name", extractImpulseTemplateName, "failed to index Impulse spec.templateRef.name") + + mustIndexField(ctx, mgr, &runsv1alpha1.StoryRun{}, "spec.storyRef.name", extractStoryRunStoryRefName, "failed to index StoryRun spec.storyRef.name") + + mustIndexField(ctx, mgr, &bubushv1alpha1.Impulse{}, "spec.storyRef.name", 
extractImpulseStoryRefName, "failed to index Impulse spec.storyRef.name") + + mustIndexField(ctx, mgr, &bubushv1alpha1.Story{}, "spec.steps.ref.name", extractStoryStepEngramRefs, "failed to index Story spec.steps.ref.name") + + mustIndexField(ctx, mgr, &catalogv1alpha1.EngramTemplate{}, "spec.description", extractEngramTemplateDescription, "failed to index EngramTemplate spec.description") + + mustIndexField(ctx, mgr, &catalogv1alpha1.EngramTemplate{}, "spec.version", extractEngramTemplateVersion, "failed to index EngramTemplate spec.version") +} + +// mustIndexField registers an index and terminates the process if registration fails. +func mustIndexField( + ctx context.Context, + mgr manager.Manager, + obj client.Object, + field string, + extractor func(client.Object) []string, + errMsg string, +) { + if err := mgr.GetFieldIndexer().IndexField(ctx, obj, field, extractor); err != nil { + setupLog.Error(err, errMsg) os.Exit(1) } +} - // Index StepRuns by the Engram they reference. - if err := mgr.GetFieldIndexer().IndexField(ctx, &runsv1alpha1.StepRun{}, "spec.engramRef", func(rawObj client.Object) []string { - stepRun := rawObj.(*runsv1alpha1.StepRun) - if stepRun.Spec.EngramRef == nil || stepRun.Spec.EngramRef.Name == "" { - return nil - } - return []string{stepRun.Spec.EngramRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index StepRun spec.engramRef.name") - os.Exit(1) +func extractEngramTemplateName(rawObj client.Object) []string { + engram := rawObj.(*bubushv1alpha1.Engram) + if engram.Spec.TemplateRef.Name == "" { + return nil } + return []string{engram.Spec.TemplateRef.Name} +} - // Index StepRuns by the StoryRun they belong to. 
- if err := mgr.GetFieldIndexer().IndexField(ctx, &runsv1alpha1.StepRun{}, "spec.storyRunRef.name", func(rawObj client.Object) []string { - stepRun := rawObj.(*runsv1alpha1.StepRun) - if stepRun.Spec.StoryRunRef.Name == "" { - return nil - } - return []string{stepRun.Spec.StoryRunRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index StepRun spec.storyRunRef.name") - os.Exit(1) +func extractStepRunEngramRef(rawObj client.Object) []string { + stepRun := rawObj.(*runsv1alpha1.StepRun) + if stepRun.Spec.EngramRef == nil || stepRun.Spec.EngramRef.Name == "" { + return nil } + return []string{stepRun.Spec.EngramRef.Name} +} - // Index StoryRuns by the Impulse that triggered them. - if err := mgr.GetFieldIndexer().IndexField(ctx, &runsv1alpha1.StoryRun{}, "spec.impulseRef.name", func(rawObj client.Object) []string { - storyRun := rawObj.(*runsv1alpha1.StoryRun) - if storyRun.Spec.ImpulseRef == nil || storyRun.Spec.ImpulseRef.Name == "" { - return nil - } - return []string{storyRun.Spec.ImpulseRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index StoryRun spec.impulseRef.name") - os.Exit(1) +func extractStepRunStoryRunRef(rawObj client.Object) []string { + stepRun := rawObj.(*runsv1alpha1.StepRun) + if stepRun.Spec.StoryRunRef.Name == "" { + return nil } + return []string{stepRun.Spec.StoryRunRef.Name} +} - // Index Impulses by the ImpulseTemplate they reference. 
- if err := mgr.GetFieldIndexer().IndexField(ctx, &bubushv1alpha1.Impulse{}, "spec.templateRef.name", func(rawObj client.Object) []string { - impulse := rawObj.(*bubushv1alpha1.Impulse) - if impulse.Spec.TemplateRef.Name == "" { - return nil - } - return []string{impulse.Spec.TemplateRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index Impulse spec.templateRef.name") - os.Exit(1) +func extractStoryRunImpulseRef(rawObj client.Object) []string { + storyRun := rawObj.(*runsv1alpha1.StoryRun) + if storyRun.Spec.ImpulseRef == nil || storyRun.Spec.ImpulseRef.Name == "" { + return nil } + return []string{storyRun.Spec.ImpulseRef.Name} +} - // Index StoryRuns by the Story they reference. - if err := mgr.GetFieldIndexer().IndexField(ctx, &runsv1alpha1.StoryRun{}, "spec.storyRef.name", func(rawObj client.Object) []string { - storyRun := rawObj.(*runsv1alpha1.StoryRun) - if storyRun.Spec.StoryRef.Name == "" { - return nil - } - return []string{storyRun.Spec.StoryRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index StoryRun spec.storyRef.name") - os.Exit(1) +func extractImpulseTemplateName(rawObj client.Object) []string { + impulse := rawObj.(*bubushv1alpha1.Impulse) + if impulse.Spec.TemplateRef.Name == "" { + return nil } + return []string{impulse.Spec.TemplateRef.Name} +} - // Index Impulses by the Story they trigger. 
- if err := mgr.GetFieldIndexer().IndexField(ctx, &bubushv1alpha1.Impulse{}, "spec.storyRef.name", func(rawObj client.Object) []string { - impulse := rawObj.(*bubushv1alpha1.Impulse) - if impulse.Spec.StoryRef.Name == "" { - return nil - } - return []string{impulse.Spec.StoryRef.Name} - }); err != nil { - setupLog.Error(err, "failed to index Impulse spec.storyRef.name") - os.Exit(1) +func extractStoryRunStoryRefName(rawObj client.Object) []string { + storyRun := rawObj.(*runsv1alpha1.StoryRun) + if storyRun.Spec.StoryRef.Name == "" { + return nil } + return []string{storyRun.Spec.StoryRef.Name} +} - // Index Stories by Engram names referenced in steps (spec.steps[].ref.name) - if err := mgr.GetFieldIndexer().IndexField(ctx, &bubushv1alpha1.Story{}, "spec.steps.ref.name", func(rawObj client.Object) []string { - story := rawObj.(*bubushv1alpha1.Story) - if len(story.Spec.Steps) == 0 { - return nil - } - nameSet := make(map[string]struct{}) - for i := range story.Spec.Steps { - step := &story.Spec.Steps[i] - if step.Ref != nil && step.Ref.Name != "" { - nameSet[step.Ref.Name] = struct{}{} - } - } - if len(nameSet) == 0 { - return nil - } - out := make([]string, 0, len(nameSet)) - for n := range nameSet { - out = append(out, n) - } - return out - }); err != nil { - setupLog.Error(err, "failed to index Story spec.steps.ref.name") - os.Exit(1) +func extractImpulseStoryRefName(rawObj client.Object) []string { + impulse := rawObj.(*bubushv1alpha1.Impulse) + if impulse.Spec.StoryRef.Name == "" { + return nil } + return []string{impulse.Spec.StoryRef.Name} +} - // Index EngramTemplates by description. 
- if err := mgr.GetFieldIndexer().IndexField(ctx, &catalogv1alpha1.EngramTemplate{}, "spec.description", - func(obj client.Object) []string { - template := obj.(*catalogv1alpha1.EngramTemplate) - if template.Spec.Description != "" { - return []string{template.Spec.Description} - } - return []string{"no-description"} - }); err != nil { - setupLog.Error(err, "failed to index EngramTemplate spec.description") - os.Exit(1) +func extractStoryStepEngramRefs(rawObj client.Object) []string { + story := rawObj.(*bubushv1alpha1.Story) + if len(story.Spec.Steps) == 0 { + return nil + } + nameSet := make(map[string]struct{}) + for i := range story.Spec.Steps { + step := &story.Spec.Steps[i] + if step.Ref != nil && step.Ref.Name != "" { + nameSet[step.Ref.Name] = struct{}{} + } + } + if len(nameSet) == 0 { + return nil } + out := make([]string, 0, len(nameSet)) + for n := range nameSet { + out = append(out, n) + } + return out +} - // Index EngramTemplates by version. - if err := mgr.GetFieldIndexer().IndexField(ctx, &catalogv1alpha1.EngramTemplate{}, "spec.version", - func(obj client.Object) []string { - return []string{obj.(*catalogv1alpha1.EngramTemplate).Spec.Version} - }); err != nil { - setupLog.Error(err, "failed to index EngramTemplate spec.version") - os.Exit(1) +func extractEngramTemplateDescription(obj client.Object) []string { + template := obj.(*catalogv1alpha1.EngramTemplate) + if template.Spec.Description != "" { + return []string{template.Spec.Description} } + return []string{"no-description"} +} + +func extractEngramTemplateVersion(obj client.Object) []string { + return []string{obj.(*catalogv1alpha1.EngramTemplate).Spec.Version} } diff --git a/internal/webhook/runs/v1alpha1/steprun_webhook.go b/internal/webhook/runs/v1alpha1/steprun_webhook.go index ed8b575..7a45db5 100644 --- a/internal/webhook/runs/v1alpha1/steprun_webhook.go +++ b/internal/webhook/runs/v1alpha1/steprun_webhook.go @@ -116,50 +116,69 @@ func (v *StepRunCustomValidator) ValidateDelete(ctx 
context.Context, obj runtime // - spec.stepId required // - If Input present, must be a JSON object func (v *StepRunCustomValidator) validateStepRun(sr *runsv1alpha1.StepRun) error { + if err := requireBasicFields(sr); err != nil { + return err + } + maxBytes := pickMaxInlineBytes(v.Config) + if err := validateInputs(sr, maxBytes); err != nil { + return err + } + if err := validateStatusOutput(sr, maxBytes); err != nil { + return err + } + return validateTotalSize(sr) +} + +func requireBasicFields(sr *runsv1alpha1.StepRun) error { if sr.Spec.StoryRunRef.Name == "" { return fmt.Errorf("spec.storyRunRef.name is required") } if sr.Spec.StepID == "" { return fmt.Errorf("spec.stepId is required") } + return nil +} - maxBytes := v.Config.Engram.EngramControllerConfig.DefaultMaxInlineSize +func pickMaxInlineBytes(cfg *config.ControllerConfig) int { + maxBytes := cfg.Engram.EngramControllerConfig.DefaultMaxInlineSize if maxBytes == 0 { - maxBytes = 1024 // Fallback to a safe default if config is not loaded + maxBytes = 1024 } + return maxBytes +} - if sr.Spec.Input != nil && len(sr.Spec.Input.Raw) > 0 { - b := sr.Spec.Input.Raw - for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { - b = b[1:] - } - if len(b) > 0 && b[0] != '{' { - return fmt.Errorf("spec.input must be a JSON object") - } - // Enforce an upper bound for inline input size to align with Engram/Impulse validation. - // Large payloads must be handled via SDK offload and storage policies. - if len(sr.Spec.Input.Raw) > maxBytes { - return fmt.Errorf("spec.input too large (%d bytes > %d). 
Provide large payloads via object storage (Story.policy.storage) and references instead of inlining", len(sr.Spec.Input.Raw), maxBytes) - } +func validateInputs(sr *runsv1alpha1.StepRun, maxBytes int) error { + if sr.Spec.Input == nil || len(sr.Spec.Input.Raw) == 0 { + return nil + } + b := sr.Spec.Input.Raw + for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { + b = b[1:] } + if len(b) > 0 && b[0] != '{' { + return fmt.Errorf("spec.input must be a JSON object") + } + if len(sr.Spec.Input.Raw) > maxBytes { + return fmt.Errorf("spec.input too large (%d bytes > %d). Provide large payloads via object storage (Story.policy.storage) and references instead of inlining", len(sr.Spec.Input.Raw), maxBytes) + } + return nil +} - // Validate status.output size on updates. This is the critical check. +func validateStatusOutput(sr *runsv1alpha1.StepRun, maxBytes int) error { if sr.Status.Output != nil && len(sr.Status.Output.Raw) > maxBytes { return fmt.Errorf("status.output is too large (%d bytes > %d). Large outputs must be offloaded by the SDK", len(sr.Status.Output.Raw), maxBytes) } + return nil +} - // Add total object size validation on update to prevent etcd errors +func validateTotalSize(sr *runsv1alpha1.StepRun) error { rawSR, err := json.Marshal(sr) if err != nil { - // This should not happen on a valid object return fmt.Errorf("internal error: failed to marshal StepRun for size validation: %w", err) } - - // Use a safe, hardcoded limit slightly below the typical 1.5MiB etcd limit. 
const maxTotalStepRunSizeBytes = 1 * 1024 * 1024 // 1 MiB if len(rawSR) > maxTotalStepRunSizeBytes { return fmt.Errorf("StepRun total size of %d bytes exceeds maximum allowed size of %d bytes", len(rawSR), maxTotalStepRunSizeBytes) } - return nil } diff --git a/internal/webhook/runs/v1alpha1/storyrun_webhook.go b/internal/webhook/runs/v1alpha1/storyrun_webhook.go index 8b08f55..dd72c9b 100644 --- a/internal/webhook/runs/v1alpha1/storyrun_webhook.go +++ b/internal/webhook/runs/v1alpha1/storyrun_webhook.go @@ -21,6 +21,7 @@ import ( "fmt" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/webhook" @@ -46,6 +47,13 @@ type StoryRunWebhook struct { func (wh *StoryRunWebhook) SetupWebhookWithManager(mgr ctrl.Manager) error { wh.Client = mgr.GetClient() + // Initialize operator config for validation knobs + operatorConfigManager := config.NewOperatorConfigManager( + mgr.GetClient(), + "bobrapet-system", + "bobrapet-operator-config", + ) + wh.Config = operatorConfigManager.GetControllerConfig() return ctrl.NewWebhookManagedBy(mgr). For(&runsv1alpha1.StoryRun{}). 
@@ -116,64 +124,81 @@ func (v *StoryRunCustomValidator) ValidateDelete(ctx context.Context, obj runtim // - spec.storyRef.name required // - inputs, if present, must be JSON object func (v *StoryRunCustomValidator) validateStoryRun(ctx context.Context, sr *runsv1alpha1.StoryRun) error { + if err := requireStoryRef(sr); err != nil { + return err + } + story, _, err := fetchStory(ctx, v.Client, sr) + if err != nil { + return err + } + if err := validateInputsShapeAndSize(v.Config, sr); err != nil { + return err + } + return validateInputsSchema(story, sr) +} + +func requireStoryRef(sr *runsv1alpha1.StoryRun) error { if sr.Spec.StoryRef.Name == "" { return fmt.Errorf("spec.storyRef.name is required") } + return nil +} - // Fetch the referenced Story to validate against its schema +func fetchStory(ctx context.Context, c client.Client, sr *runsv1alpha1.StoryRun) (*bubuv1alpha1.Story, types.NamespacedName, error) { story := &bubuv1alpha1.Story{} storyKey := sr.Spec.StoryRef.ToNamespacedName(sr) - if err := v.Client.Get(ctx, storyKey, story); err != nil { + if err := c.Get(ctx, storyKey, story); err != nil { if errors.IsNotFound(err) { - return fmt.Errorf("referenced story '%s' not found", storyKey.String()) + return nil, storyKey, fmt.Errorf("referenced story '%s' not found", storyKey.String()) } - return fmt.Errorf("failed to get referenced story '%s': %w", storyKey.String(), err) + return nil, storyKey, fmt.Errorf("failed to get referenced story '%s': %w", storyKey.String(), err) } + return story, storyKey, nil +} - if sr.Spec.Inputs != nil && len(sr.Spec.Inputs.Raw) > 0 { - b := sr.Spec.Inputs.Raw - for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { - b = b[1:] - } - if len(b) > 0 && b[0] != '{' { - return fmt.Errorf("spec.inputs must be a JSON object") - } - - // Enforce an upper bound for inline input size to avoid oversized API server payloads. - // Use EngramControllerConfig.DefaultMaxInlineSize for a single cap. 
- maxBytes := v.Config.Engram.EngramControllerConfig.DefaultMaxInlineSize - if maxBytes == 0 { - maxBytes = 1024 // Fallback - } - if len(sr.Spec.Inputs.Raw) > maxBytes { - return fmt.Errorf("spec.inputs is too large (%d bytes > %d). Provide large inputs via an offloading mechanism instead of inlining", len(sr.Spec.Inputs.Raw), maxBytes) - } +func validateInputsShapeAndSize(cfg *config.ControllerConfig, sr *runsv1alpha1.StoryRun) error { + if sr.Spec.Inputs == nil || len(sr.Spec.Inputs.Raw) == 0 { + return nil } + b := sr.Spec.Inputs.Raw + for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { + b = b[1:] + } + if len(b) > 0 && b[0] != '{' { + return fmt.Errorf("spec.inputs must be a JSON object") + } + // Use StoryRun-specific knob; fall back to defaults if unset + maxBytes := cfg.StoryRun.MaxInlineInputsSize + if maxBytes <= 0 { + maxBytes = config.DefaultControllerConfig().StoryRun.MaxInlineInputsSize + } + if len(sr.Spec.Inputs.Raw) > maxBytes { + return fmt.Errorf("spec.inputs is too large (%d bytes > %d). Provide large inputs via an offloading mechanism instead of inlining", len(sr.Spec.Inputs.Raw), maxBytes) + } + return nil +} - // Validate inputs against the Story's input schema - if story.Spec.InputsSchema != nil && len(story.Spec.InputsSchema.Raw) > 0 { - schemaLoader := gojsonschema.NewStringLoader(string(story.Spec.InputsSchema.Raw)) - var documentLoader gojsonschema.JSONLoader - if sr.Spec.Inputs != nil && len(sr.Spec.Inputs.Raw) > 0 { - documentLoader = gojsonschema.NewStringLoader(string(sr.Spec.Inputs.Raw)) - } else { - // If inputs are nil or empty, validate against an empty JSON object. - // This correctly handles cases where the schema requires certain fields. 
- documentLoader = gojsonschema.NewStringLoader("{}") - } - - result, err := gojsonschema.Validate(schemaLoader, documentLoader) - if err != nil { - return fmt.Errorf("error validating spec.inputs against Story schema: %w", err) - } - if !result.Valid() { - var errs []string - for _, desc := range result.Errors() { - errs = append(errs, desc.String()) - } - return fmt.Errorf("spec.inputs is invalid against Story schema: %v", errs) +func validateInputsSchema(story *bubuv1alpha1.Story, sr *runsv1alpha1.StoryRun) error { + if story.Spec.InputsSchema == nil || len(story.Spec.InputsSchema.Raw) == 0 { + return nil + } + schemaLoader := gojsonschema.NewStringLoader(string(story.Spec.InputsSchema.Raw)) + var documentLoader gojsonschema.JSONLoader + if sr.Spec.Inputs != nil && len(sr.Spec.Inputs.Raw) > 0 { + documentLoader = gojsonschema.NewStringLoader(string(sr.Spec.Inputs.Raw)) + } else { + documentLoader = gojsonschema.NewStringLoader("{}") + } + result, err := gojsonschema.Validate(schemaLoader, documentLoader) + if err != nil { + return fmt.Errorf("error validating spec.inputs against Story schema: %w", err) + } + if !result.Valid() { + var errs []string + for _, desc := range result.Errors() { + errs = append(errs, desc.String()) } + return fmt.Errorf("spec.inputs is invalid against Story schema: %v", errs) } - return nil } diff --git a/internal/webhook/v1alpha1/engram_webhook.go b/internal/webhook/v1alpha1/engram_webhook.go index 2907c70..ed51cae 100644 --- a/internal/webhook/v1alpha1/engram_webhook.go +++ b/internal/webhook/v1alpha1/engram_webhook.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/xeipuuv/gojsonschema" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -141,53 +140,52 @@ func (v *EngramCustomValidator) ValidateDelete(ctx context.Context, obj runtime. 
// - templateRef.name must be set (enforced by CRD; double-check presence) // - if With is present, it must be a JSON object (not array/primitive) func (v *EngramCustomValidator) validateEngram(ctx context.Context, engram *bubushv1alpha1.Engram) error { + if err := requireTemplateRef(engram); err != nil { + return err + } + template, err := fetchEngramTemplate(ctx, v.Client, engram.Spec.TemplateRef.Name) + if err != nil { + return err + } + if err := validateWithBlock(engram, v.Config, template); err != nil { + return err + } + return nil +} + +func requireTemplateRef(engram *bubushv1alpha1.Engram) error { if engram.Spec.TemplateRef.Name == "" { return fmt.Errorf("spec.templateRef.name is required") } + return nil +} - // Fetch the template to validate against its schema +func fetchEngramTemplate(ctx context.Context, c client.Client, name string) (*v1alpha1.EngramTemplate, error) { var template v1alpha1.EngramTemplate - if err := v.Client.Get(ctx, types.NamespacedName{Name: engram.Spec.TemplateRef.Name, Namespace: ""}, &template); err != nil { + if err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: ""}, &template); err != nil { if errors.IsNotFound(err) { - return fmt.Errorf("EngramTemplate '%s' not found", engram.Spec.TemplateRef.Name) + return nil, fmt.Errorf("EngramTemplate '%s' not found", name) } - return fmt.Errorf("failed to get EngramTemplate '%s': %w", engram.Spec.TemplateRef.Name, err) + return nil, fmt.Errorf("failed to get EngramTemplate '%s': %w", name, err) } + return &template, nil +} - if engram.Spec.With != nil && len(engram.Spec.With.Raw) > 0 { - // Validate that Raw JSON is an object (starts with '{') - b := engram.Spec.With.Raw - for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { - b = b[1:] - } - if len(b) > 0 && b[0] != '{' { - return fmt.Errorf("spec.with must be a JSON object") - } - // Enforce an upper bound for inline config to avoid oversized pod specs/APIServer payloads. 
- // Use EngramControllerConfig.DefaultMaxInlineSize for a single cap. - maxBytes := v.Config.Engram.EngramControllerConfig.DefaultMaxInlineSize - if maxBytes == 0 { - maxBytes = 1024 // Fallback to a safe default - } - if len(engram.Spec.With.Raw) > maxBytes { - return fmt.Errorf("spec.with too large (%d bytes). Provide large payloads via object storage and references instead of inlining", len(engram.Spec.With.Raw)) - } - - // Validate 'with' against the template's configSchema - if template.Spec.ConfigSchema != nil && len(template.Spec.ConfigSchema.Raw) > 0 { - schemaLoader := gojsonschema.NewStringLoader(string(template.Spec.ConfigSchema.Raw)) - documentLoader := gojsonschema.NewStringLoader(string(engram.Spec.With.Raw)) - result, err := gojsonschema.Validate(schemaLoader, documentLoader) - if err != nil { - return fmt.Errorf("error validating spec.with against EngramTemplate configSchema: %w", err) - } - if !result.Valid() { - var errs []string - for _, desc := range result.Errors() { - errs = append(errs, desc.String()) - } - return fmt.Errorf("spec.with is invalid against EngramTemplate configSchema: %v", errs) - } +func validateWithBlock(engram *bubushv1alpha1.Engram, cfg *config.ControllerConfig, template *v1alpha1.EngramTemplate) error { + if engram.Spec.With == nil || len(engram.Spec.With.Raw) == 0 { + return nil + } + b := trimLeadingSpace(engram.Spec.With.Raw) + if err := ensureJSONObject("spec.with", b); err != nil { + return err + } + maxBytes := pickMaxInline(cfg) + if err := enforceMaxBytes("spec.with", engram.Spec.With.Raw, maxBytes); err != nil { + return err + } + if template.Spec.ConfigSchema != nil && len(template.Spec.ConfigSchema.Raw) > 0 { + if err := validateJSONAgainstSchema(engram.Spec.With.Raw, template.Spec.ConfigSchema.Raw, "EngramTemplate"); err != nil { + return err } } return nil diff --git a/internal/webhook/v1alpha1/impulse_webhook.go b/internal/webhook/v1alpha1/impulse_webhook.go index 6c70be6..11474b6 100644 --- 
a/internal/webhook/v1alpha1/impulse_webhook.go +++ b/internal/webhook/v1alpha1/impulse_webhook.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - "github.com/xeipuuv/gojsonschema" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -183,33 +182,17 @@ func (v *ImpulseCustomValidator) validateWithBlock(impulse *bubushv1alpha1.Impul if impulse.Spec.With == nil || len(impulse.Spec.With.Raw) == 0 { return nil } - b := impulse.Spec.With.Raw - for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { - b = b[1:] - } - if len(b) > 0 && b[0] != '{' { - return fmt.Errorf("spec.with must be a JSON object") - } - maxBytes := v.Config.Engram.EngramControllerConfig.DefaultMaxInlineSize - if maxBytes == 0 { - maxBytes = 1024 + b := trimLeadingSpace(impulse.Spec.With.Raw) + if err := ensureJSONObject("spec.with", b); err != nil { + return err } - if len(impulse.Spec.With.Raw) > maxBytes { - return fmt.Errorf("spec.with too large (%d bytes). 
Provide large payloads via object storage and references instead of inlining", len(impulse.Spec.With.Raw)) + maxBytes := pickMaxInline(v.Config) + if err := enforceMaxBytes("spec.with", impulse.Spec.With.Raw, maxBytes); err != nil { + return err } if template.Spec.ConfigSchema != nil && len(template.Spec.ConfigSchema.Raw) > 0 { - schemaLoader := gojsonschema.NewStringLoader(string(template.Spec.ConfigSchema.Raw)) - documentLoader := gojsonschema.NewStringLoader(string(impulse.Spec.With.Raw)) - result, err := gojsonschema.Validate(schemaLoader, documentLoader) - if err != nil { - return fmt.Errorf("error validating spec.with against ImpulseTemplate schema: %w", err) - } - if !result.Valid() { - var errs []string - for _, desc := range result.Errors() { - errs = append(errs, desc.String()) - } - return fmt.Errorf("spec.with is invalid against ImpulseTemplate schema: %v", errs) + if err := validateJSONAgainstSchema(impulse.Spec.With.Raw, template.Spec.ConfigSchema.Raw, "ImpulseTemplate"); err != nil { + return err } } return nil @@ -219,33 +202,17 @@ func (v *ImpulseCustomValidator) validateMappingBlock(impulse *bubushv1alpha1.Im if impulse.Spec.Mapping == nil || len(impulse.Spec.Mapping.Raw) == 0 { return nil } - b := impulse.Spec.Mapping.Raw - for len(b) > 0 && (b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r') { - b = b[1:] - } - if len(b) > 0 && b[0] != '{' { - return fmt.Errorf("spec.mapping must be a JSON object") - } - maxBytes := v.Config.Engram.EngramControllerConfig.DefaultMaxInlineSize - if maxBytes == 0 { - maxBytes = 1024 + b := trimLeadingSpace(impulse.Spec.Mapping.Raw) + if err := ensureJSONObject("spec.mapping", b); err != nil { + return err } - if len(impulse.Spec.Mapping.Raw) > maxBytes { - return fmt.Errorf("spec.mapping too large (%d bytes). 
Provide large payloads via object storage and references instead of inlining", len(impulse.Spec.Mapping.Raw)) + maxBytes := pickMaxInline(v.Config) + if err := enforceMaxBytes("spec.mapping", impulse.Spec.Mapping.Raw, maxBytes); err != nil { + return err } if template.Spec.ContextSchema != nil && len(template.Spec.ContextSchema.Raw) > 0 { - schemaLoader := gojsonschema.NewStringLoader(string(template.Spec.ContextSchema.Raw)) - documentLoader := gojsonschema.NewStringLoader(string(impulse.Spec.Mapping.Raw)) - result, err := gojsonschema.Validate(schemaLoader, documentLoader) - if err != nil { - return fmt.Errorf("error validating spec.mapping against ImpulseTemplate schema: %w", err) - } - if !result.Valid() { - var errs []string - for _, desc := range result.Errors() { - errs = append(errs, desc.String()) - } - return fmt.Errorf("spec.mapping is invalid against ImpulseTemplate schema: %v", errs) + if err := validateJSONAgainstSchema(impulse.Spec.Mapping.Raw, template.Spec.ContextSchema.Raw, "ImpulseTemplate"); err != nil { + return err } } return nil diff --git a/internal/webhook/v1alpha1/story_webhook.go b/internal/webhook/v1alpha1/story_webhook.go index 17d1670..16a26fe 100644 --- a/internal/webhook/v1alpha1/story_webhook.go +++ b/internal/webhook/v1alpha1/story_webhook.go @@ -155,28 +155,51 @@ func (v *StoryCustomValidator) ValidateDelete(ctx context.Context, obj runtime.O // - All items in step.Needs must reference existing step names (no self-dependency) // - For streaming pattern PerStory, referenced Engrams must be long-running modes is enforced at reconcile; here we only validate shape func (v *StoryCustomValidator) validateStory(ctx context.Context, story *bubushv1alpha1.Story) error { - // Add a total size check to prevent etcd overload. 
+ if err := validateStorySize(story); err != nil { + return err + } + maxSize := pickStoryMaxWithSize(v.Config) + if err := validateOutputSize(story, maxSize); err != nil { + return err + } + if err := validateStepsShape(ctx, v, story, maxSize); err != nil { + return err + } + if err := validateNeedsExistence(story); err != nil { + return err + } + return nil +} + +func validateStorySize(story *bubushv1alpha1.Story) error { rawStory, err := json.Marshal(story) if err != nil { - // This should not happen with valid Kubernetes objects. return fmt.Errorf("internal error: failed to marshal story for size validation: %w", err) } - // Use a safe, hardcoded limit slightly below the typical 1.5MiB etcd limit. const maxTotalStorySizeBytes = 1 * 1024 * 1024 // 1 MiB if len(rawStory) > maxTotalStorySizeBytes { return fmt.Errorf("story size of %d bytes exceeds maximum allowed size of %d bytes", len(rawStory), maxTotalStorySizeBytes) } + return nil +} - seen := make(map[string]struct{}) - maxSize := v.Config.MaxStoryWithBlockSizeBytes +func pickStoryMaxWithSize(cfg *config.ControllerConfig) int { + maxSize := cfg.MaxStoryWithBlockSizeBytes if maxSize <= 0 { maxSize = config.DefaultControllerConfig().MaxStoryWithBlockSizeBytes } + return maxSize +} +func validateOutputSize(story *bubushv1alpha1.Story, maxSize int) error { if story.Spec.Output != nil && len(story.Spec.Output.Raw) > maxSize { return fmt.Errorf("story 'output' block size of %d bytes exceeds maximum allowed size of %d bytes", len(story.Spec.Output.Raw), maxSize) } + return nil +} +func validateStepsShape(ctx context.Context, v *StoryCustomValidator, story *bubushv1alpha1.Story, maxSize int) error { + seen := make(map[string]struct{}) for i := range story.Spec.Steps { s := &story.Spec.Steps[i] if _, exists := seen[s.Name]; exists { @@ -189,44 +212,47 @@ func (v *StoryCustomValidator) validateStory(ctx context.Context, story *bubushv if s.Ref == nil && s.Type == "" { return fmt.Errorf("step '%s' must set either ref or 
type", s.Name) } - for _, dep := range s.Needs { if dep == s.Name { return fmt.Errorf("step '%s' cannot depend on itself in needs", s.Name) } - // second pass below validates that all dependencies exist; no-op here to avoid SA4006 } - if s.Ref != nil { if err := v.validateEngramStep(ctx, story.Namespace, s); err != nil { return err } } - if s.With != nil && len(s.With.Raw) > maxSize { return fmt.Errorf("step '%s': 'with' block size of %d bytes exceeds maximum allowed size of %d bytes", s.Name, len(s.With.Raw), maxSize) } + if err := validatePrimitiveShapes(s); err != nil { + return err + } + } + return nil +} - // Validate the 'with' block for known primitive types that require a specific schema. - if s.Type == enums.StepTypeExecuteStory { - if s.With == nil { - return fmt.Errorf("step '%s' of type 'executeStory' requires a 'with' block", s.Name) - } - var withConfig struct { - StoryRef struct { - Name string `json:"name"` - } `json:"storyRef"` - } - if err := json.Unmarshal(s.With.Raw, &withConfig); err != nil { - return fmt.Errorf("step '%s' has an invalid 'with' block for type 'executeStory': %w", s.Name, err) - } - if withConfig.StoryRef.Name == "" { - return fmt.Errorf("step '%s' of type 'executeStory' requires 'with.storyRef.name' to be set", s.Name) - } +func validatePrimitiveShapes(s *bubushv1alpha1.Step) error { + if s.Type == enums.StepTypeExecuteStory { + if s.With == nil { + return fmt.Errorf("step '%s' of type 'executeStory' requires a 'with' block", s.Name) + } + var withConfig struct { + StoryRef struct { + Name string `json:"name"` + } `json:"storyRef"` + } + if err := json.Unmarshal(s.With.Raw, &withConfig); err != nil { + return fmt.Errorf("step '%s' has an invalid 'with' block for type 'executeStory': %w", s.Name, err) + } + if withConfig.StoryRef.Name == "" { + return fmt.Errorf("step '%s' of type 'executeStory' requires 'with.storyRef.name' to be set", s.Name) } } + return nil +} - // Second pass: ensure all needs exist +func 
validateNeedsExistence(story *bubushv1alpha1.Story) error { stepNames := make(map[string]struct{}, len(story.Spec.Steps)) for i := range story.Spec.Steps { stepNames[story.Spec.Steps[i].Name] = struct{}{} diff --git a/internal/webhook/v1alpha1/validate_helpers.go b/internal/webhook/v1alpha1/validate_helpers.go new file mode 100644 index 0000000..aa25f0c --- /dev/null +++ b/internal/webhook/v1alpha1/validate_helpers.go @@ -0,0 +1,59 @@ +package v1alpha1 + +import ( + "fmt" + + "github.com/xeipuuv/gojsonschema" + + "github.com/bubustack/bobrapet/internal/config" +) + +func trimLeadingSpace(b []byte) []byte { + for len(b) > 0 { + if b[0] == ' ' || b[0] == '\n' || b[0] == '\t' || b[0] == '\r' { + b = b[1:] + continue + } + break + } + return b +} + +func ensureJSONObject(field string, b []byte) error { + if len(b) > 0 && b[0] != '{' { + return fmt.Errorf("%s must be a JSON object", field) + } + return nil +} + +func pickMaxInline(cfg *config.ControllerConfig) int { + maxBytes := cfg.Engram.EngramControllerConfig.DefaultMaxInlineSize + if maxBytes == 0 { + maxBytes = 1024 + } + return maxBytes +} + +func enforceMaxBytes(field string, raw []byte, max int) error { + if len(raw) > max { + return fmt.Errorf("%s too large (%d bytes). Provide large payloads via object storage and references instead of inlining", field, len(raw)) + } + return nil +} + +func validateJSONAgainstSchema(doc []byte, schema []byte, schemaName string) error { + schemaLoader := gojsonschema.NewStringLoader(string(schema)) + documentLoader := gojsonschema.NewStringLoader(string(doc)) + result, err := gojsonschema.Validate(schemaLoader, documentLoader) + if err != nil { + return fmt.Errorf("error validating against %s schema: %w", schemaName, err) + } + if !result.Valid() { + var errs []string + for _, desc := range result.Errors() { + errs = append(errs, desc.String()) + } + return fmt.Errorf("object is invalid against %s schema: %v", schemaName, errs) + } + return nil +}