Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,10 @@ jobs:
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
# PRs
type=ref,event=pr
# Default branch builds
type=raw,value=latest,enable={{is_default_branch}}
type=ref,event=branch
type=sha
# Semver tags
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
Expand Down Expand Up @@ -85,4 +82,3 @@ jobs:
if: always()
with:
sarif_file: 'trivy-results.sarif'

3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ go.work
*.swo
*~

.DS_Store
.DS_Store
.gocache/
59 changes: 59 additions & 0 deletions api/runs/v1alpha1/steprun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ type StepRunSpec struct {
// Can be a list to support fanning out to multiple parallel steps.
// +optional
DownstreamTargets []DownstreamTarget `json:"downstreamTargets,omitempty"`

// RequestedManifest lists the metadata fields the controller expects the SDK
// to materialize alongside the offloaded output. These are derived from CEL expressions
// that reference this step's outputs (e.g., len(steps.foo.output.bar)).
// +optional
RequestedManifest []ManifestRequest `json:"requestedManifest,omitempty"`
}

// DownstreamTarget defines the destination for an Engram's output in real-time execution mode.
Expand Down Expand Up @@ -140,7 +146,50 @@ type TerminateTarget struct {
StopMode enums.StopMode `json:"stopMode"`
}

// ManifestOperation enumerates the metadata operations supported for step manifests.
type ManifestOperation string

const (
// ManifestOperationExists records whether the referenced field exists/non-nil.
ManifestOperationExists ManifestOperation = "exists"
// ManifestOperationLength records the length of the referenced field when it is an array, map, or string.
ManifestOperationLength ManifestOperation = "length"
)

// ManifestRequest describes a single output field and the metadata operations required for it.
type ManifestRequest struct {
// Path is the dot/bracket notation path relative to the step output root.
// Examples: "result.items", "tools", "items[0].id".
// +kubebuilder:validation:MinLength=1
Path string `json:"path"`
// Operations lists the metadata operations that should be computed for this path.
// Defaults to ["exists"] when omitted.
// +optional
Operations []ManifestOperation `json:"operations,omitempty"`
}

// StepManifestData captures the metadata emitted by the SDK for a single manifest path.
type StepManifestData struct {
// Exists indicates whether the referenced field was present and non-nil.
// +optional
Exists *bool `json:"exists,omitempty"`
// Length contains the computed length when requested and applicable.
// +optional
Length *int64 `json:"length,omitempty"`
// Truncated signals that the SDK could not compute the full metadata due to limits.
// +optional
Truncated bool `json:"truncated,omitempty"`
// Error contains a warning message emitted by the SDK when it cannot honour the manifest request.
// +optional
Error string `json:"error,omitempty"`
// Sample holds an optional representative slice of the data (implementation-defined).
// +optional
Sample *runtime.RawExtension `json:"sample,omitempty"`
}

// StepRunStatus tracks the detailed execution state of this individual step
// +kubebuilder:validation:XValidation:message="status.conditions reason field must be <= 64 characters",rule="!has(self.conditions) || self.conditions.all(c, !has(c.reason) || size(c.reason) <= 64)"
// +kubebuilder:validation:XValidation:message="status.conditions message field must be <= 2048 characters",rule="!has(self.conditions) || self.conditions.all(c, !has(c.message) || size(c.message) <= 2048)"
type StepRunStatus struct {
// observedGeneration is the most recent generation observed for this StepRun. It corresponds to the
// StepRun's generation, which is updated on mutation by the API Server.
Expand Down Expand Up @@ -198,6 +247,16 @@ type StepRunStatus struct {
// Step coordination - which steps must complete before this one can start
// Uses the same "needs" terminology as our Story API for consistency
Needs []string `json:"needs,omitempty"` // StepRun names that must complete first

// Manifest contains metadata captured for this step's output that enables CEL expressions
// to execute without hydrating large blobs from storage.
// The map key matches the ManifestRequest path.
// +optional
Manifest map[string]StepManifestData `json:"manifest,omitempty"`

// ManifestWarnings contains any warnings produced while computing manifest data (e.g., unsupported operations).
// +optional
ManifestWarnings []string `json:"manifestWarnings,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
4 changes: 4 additions & 0 deletions api/runs/v1alpha1/storyrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type StoryRunSpec struct {
}

// StoryRunStatus tracks the current state and results of this story execution
// +kubebuilder:validation:XValidation:rule="!has(self.conditions) || self.conditions.exists(c, c.type == 'Ready')",message="status.conditions must include Ready when conditions are set"
// +kubebuilder:validation:XValidation:rule="!has(self.conditions) || self.conditions.all(c, has(c.lastTransitionTime))",message="status.conditions entries must set lastTransitionTime"
// +kubebuilder:validation:XValidation:message="status.conditions reason field must be <= 64 characters",rule="!has(self.conditions) || self.conditions.all(c, !has(c.reason) || size(c.reason) <= 64)"
// +kubebuilder:validation:XValidation:message="status.conditions message field must be <= 2048 characters",rule="!has(self.conditions) || self.conditions.all(c, !has(c.message) || size(c.message) <= 2048)"
type StoryRunStatus struct {
// observedGeneration is the most recent generation observed for this StoryRun. It corresponds to the
// StoryRun's generation, which is updated on mutation by the API Server.
Expand Down
69 changes: 69 additions & 0 deletions api/runs/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/v1alpha1/shared_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type ExecutionOverrides struct {
ServiceAccountName *string `json:"serviceAccountName,omitempty"`

// AutomountServiceAccountToken controls whether a service account token should be automatically mounted.
// Defaults to false.
// +kubebuilder:default=true
// +optional
AutomountServiceAccountToken *bool `json:"automountServiceAccountToken,omitempty"`

Expand Down
2 changes: 2 additions & 0 deletions api/v1alpha1/story_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Story struct {
}

// StorySpec defines what the workflow does and how it should run
// +kubebuilder:validation:XValidation:rule="self.steps.all(step, has(step.ref) != has(step.type))",message="each step must set exactly one of ref or type"
// +kubebuilder:validation:XValidation:rule="self.steps.all(step, self.steps.exists_one(other, other.name == step.name))",message="step names must be unique"
type StorySpec struct {
// Pattern specifies the execution model for the Story.
// "batch" stories are run to completion via a StoryRun.
Expand Down
82 changes: 73 additions & 9 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"os"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -40,6 +43,7 @@ import (
setup "github.com/bubustack/bobrapet/internal/setup"
"github.com/bubustack/bobrapet/pkg/cel"
"github.com/bubustack/bobrapet/pkg/logging"
"github.com/bubustack/bobrapet/pkg/observability"

catalogv1alpha1 "github.com/bubustack/bobrapet/api/catalog/v1alpha1"
runsv1alpha1 "github.com/bubustack/bobrapet/api/runs/v1alpha1"
Expand Down Expand Up @@ -75,6 +79,11 @@ func main() {
var secureMetrics bool
var enableHTTP2 bool
var tlsOpts []func(*tls.Config)

// Operator configuration flags
var operatorConfigNamespace string
var operatorConfigName string

flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
"Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -92,6 +101,13 @@ func main() {
flag.StringVar(&metricsCertKey, "metrics-cert-key", "tls.key", "The name of the metrics server key file.")
flag.BoolVar(&enableHTTP2, "enable-http2", false,
"If set, HTTP/2 will be enabled for the metrics and webhook servers")

// Operator configuration flags (similar to kube-controller-manager --kubeconfig pattern)
flag.StringVar(&operatorConfigNamespace, "config-namespace", "bobrapet-system",
"The namespace where the operator configuration ConfigMap resides.")
flag.StringVar(&operatorConfigName, "config-name", "bobrapet-operator-config",
"The name of the operator configuration ConfigMap.")

opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -140,7 +156,12 @@ func main() {
managerCtx := ctrl.SetupSignalHandler()
setup.SetupIndexers(managerCtx, mgr)

operatorConfigManager, controllerConfig, configResolver, celEvaluator := mustInitOperatorServices(mgr)
operatorConfigManager, controllerConfig, configResolver, celEvaluator := mustInitOperatorServices(
mgr,
managerCtx,
operatorConfigNamespace,
operatorConfigName,
)

deps := config.ControllerDependencies{
Client: mgr.GetClient(),
Expand Down Expand Up @@ -214,19 +235,51 @@ func buildMetricsServerOptions(

func mustInitOperatorServices(
mgr ctrl.Manager,
startupCtx context.Context,
configNamespace string,
configName string,
) (*config.OperatorConfigManager, *config.ControllerConfig, *config.Resolver, *cel.Evaluator) {
operatorConfigManager := config.NewOperatorConfigManager(
mgr.GetClient(),
"bobrapet-system",
"bobrapet-operator-config",
configNamespace,
configName,
)
setupLog.Info("Operator configuration manager initialized")
if err := mgr.Add(operatorConfigManager); err != nil {
setupLog.Error(err, "unable to add operator config manager to manager")
operatorConfigManager.SetAPIReader(mgr.GetAPIReader())

setupLog.Info("Operator configuration manager initialized",
"configNamespace", configNamespace,
"configName", configName)

// Setup the config manager as a reconciler (event-driven)
if err := operatorConfigManager.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup operator config manager controller")
os.Exit(1)
}
setupLog.Info("Operator config manager controller registered")

loadCtx, cancel := context.WithTimeout(startupCtx, 15*time.Second)
defer cancel()
if err := operatorConfigManager.LoadInitial(loadCtx); err != nil {
if apierrors.IsNotFound(err) {
setupLog.Info("Operator config map not found; continuing with defaults",
"configNamespace", configNamespace,
"configName", configName)
} else {
setupLog.Error(err, "failed to load operator configuration during startup",
"configNamespace", configNamespace,
"configName", configName)
os.Exit(1)
}
} else {
setupLog.Info("Operator configuration loaded from ConfigMap",
"configNamespace", configNamespace,
"configName", configName)
}

controllerConfig := operatorConfigManager.GetControllerConfig()
setupLog.Info("Controller configuration loaded")
config.EnableTelemetry(controllerConfig.TelemetryEnabled)
observability.EnableTracing(controllerConfig.TelemetryEnabled)
configResolver := config.NewResolver(mgr.GetClient(), operatorConfigManager)
setupLog.Info("Configuration resolver initialized")
celLogger := logging.NewCELLogger(ctrl.Log)
Expand Down Expand Up @@ -261,6 +314,12 @@ func mustSetupControllers(
setupLog.Error(err, "unable to create controller", "controller", "Impulse")
os.Exit(1)
}
if err := (&controller.RealtimeEngramReconciler{
ControllerDependencies: deps,
}).SetupWithManager(mgr, controllerConfig.BuildEngramControllerOptions()); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RealtimeEngram")
os.Exit(1)
}
if err := (&runscontroller.StoryRunReconciler{
ControllerDependencies: deps,
}).SetupWithManager(mgr, controllerConfig.BuildStoryRunControllerOptions()); err != nil {
Expand Down Expand Up @@ -293,7 +352,8 @@ func setupWebhooksIfEnabled(mgr ctrl.Manager, operatorConfigManager *config.Oper
}
setupLog.Info("setting up webhooks")
if err := (&webhookv1alpha1.StoryWebhook{
Config: operatorConfigManager.GetControllerConfig(),
Config: operatorConfigManager.GetControllerConfig(),
ConfigManager: operatorConfigManager,
}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "Story")
os.Exit(1)
Expand All @@ -311,12 +371,16 @@ func setupWebhooksIfEnabled(mgr ctrl.Manager, operatorConfigManager *config.Oper
os.Exit(1)
}
if err := (&webhookrunsv1alpha1.StoryRunWebhook{
Config: operatorConfigManager.GetControllerConfig(),
Config: operatorConfigManager.GetControllerConfig(),
ConfigManager: operatorConfigManager,
}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "StoryRun")
os.Exit(1)
}
if err := (&webhookrunsv1alpha1.StepRunWebhook{}).SetupWebhookWithManager(mgr); err != nil {
if err := (&webhookrunsv1alpha1.StepRunWebhook{
Config: operatorConfigManager.GetControllerConfig(),
ConfigManager: operatorConfigManager,
}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "StepRun")
os.Exit(1)
}
Expand Down
Loading
Loading