diff --git a/cmd/main.go b/cmd/main.go index e4bad31..aaf398a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -42,7 +42,7 @@ import ( "github.com/gnmic/operator/internal/apiserver" "github.com/gnmic/operator/internal/controller" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/registry" + "github.com/gnmic/operator/internal/controller/discovery" webhookv1alpha1 "github.com/gnmic/operator/internal/webhook/v1alpha1" //+kubebuilder:scaffold:imports ) @@ -86,7 +86,7 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - discoveryRegistry := registry.NewRegistry[types.NamespacedName, []core.DiscoveryMessage]() + discoveryRegistry := discovery.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]() mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index 17e5c82..705b277 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -6,7 +6,7 @@ import ( "github.com/gnmic/operator/internal/controller" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/registry" + "github.com/gnmic/operator/internal/controller/discovery" "k8s.io/apimachinery/pkg/types" ) @@ -14,7 +14,7 @@ type APIServer struct { Server *http.Server clusterReconciler *controller.ClusterReconciler - DiscoveryRegistry *registry.Registry[types.NamespacedName, []core.DiscoveryMessage] + DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } func New(addr string, clusterReconciler *controller.ClusterReconciler) *APIServer { diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index d23c043..cb02161 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -1,23 +1,27 @@ package discovery -// File may become obsolete, depends on how the logic to compare desired vs. existing state will get implemented - import ( "context" "sigs.k8s.io/controller-runtime/pkg/client" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery/core" ) -func FetchExistingTargets(ctx context.Context, c client.Client, ts gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList - err := c.List(ctx, &targetList, + err := c.List( + ctx, + &targetList, client.InNamespace(ts.Namespace), client.MatchingLabels{ - core.LabelTargetSourceName: ts.Name, + LabelTargetSourceName: ts.Name, }, ) if err != nil { diff --git a/internal/controller/discovery/core/const.go b/internal/controller/discovery/const.go similarity index 81% rename from internal/controller/discovery/core/const.go rename to internal/controller/discovery/const.go index 82a5962..ac7a57f 100644 --- a/internal/controller/discovery/core/const.go +++ b/internal/controller/discovery/const.go @@ -1,4 +1,4 @@ -package core +package discovery const ( // Labels diff --git a/internal/controller/discovery/core/loader_interface.go b/internal/controller/discovery/core/loader_interface.go index 8964be8..72f1898 100644 --- a/internal/controller/discovery/core/loader_interface.go +++ b/internal/controller/discovery/core/loader_interface.go @@ -14,7 +14,7 @@ type Loader interface { Name() string // Start begins discovery and pushes target snapshots or events into the out channel - // The loader must stop cleanly when ctx is cancelled + // The loader must stop cleanly when ctx is canceled Start( ctx context.Context, targetsourceName types.NamespacedName, diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 3f6957a..2c37fc7 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -1,9 +1,24 @@ package core +type DiscoveryRegistryValue struct { + Channel chan<- []DiscoveryMessage + WebhookEnabled bool +} + type LoaderConfig struct { ChunkSize int } +// EventAction represents the type of a discovery event +type EventAction int + +const ( + // EventDelete indicates that a target should be removed + EventDelete EventAction = iota + // EventApply indicates that a target should be applied (created or updated) + EventApply +) + // DiscoveredTarget represents a target discovered from an external source // before it is materialized as a Kubernetes Target CR type DiscoveredTarget struct { @@ -12,14 +27,6 @@ type DiscoveredTarget struct { Labels map[string]string } -const ( - DELETE EventAction = 0 - CREATE EventAction = 1 - UPDATE EventAction = 2 -) - -type EventAction int - type DiscoveryEvent struct { Target DiscoveredTarget Event EventAction diff --git a/internal/controller/discovery/discovery.go b/internal/controller/discovery/discovery.go new file mode 100644 index 0000000..3dc51bd --- /dev/null +++ b/internal/controller/discovery/discovery.go @@ -0,0 +1,17 @@ +package discovery + +// Package discovery implements the discovery runtime subsystem. +// +// The discovery subsystem is responsible for: +// - Receiving discovery data from external providers (loaders, webhooks). +// - Supervising discovery pipelines and restart semantics. +// - Applying discovered state to Kubernetes Targets. +// +// The package is structured into the following subpackages: +// - core: message contracts, snapshot/event types, and transport helpers. +// - pipeline: supervision, restart policies, and lifecycle control. +// - reconciler: snapshot + event target state application logic. +// - loaders: target discovery providers (HTTP, webhook, etc.). +// - registry: key -> channel registry. +// +// At the moment, the targetsource controller imports specific subpackages explicitly. diff --git a/internal/controller/discovery/loader.go b/internal/controller/discovery/loaders.go similarity index 100% rename from internal/controller/discovery/loader.go rename to internal/controller/discovery/loaders.go diff --git a/internal/controller/discovery/loaders/all/all.go b/internal/controller/discovery/loaders/all/all.go deleted file mode 100644 index 3590cda..0000000 --- a/internal/controller/discovery/loaders/all/all.go +++ /dev/null @@ -1,5 +0,0 @@ -package all - -import ( - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/http" -) diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 09bb7d6..d7d5961 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -10,6 +10,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" + loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" "github.com/google/uuid" ) @@ -66,7 +67,7 @@ func (l *Loader) Start( }, } - if err := core.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { return err } } diff --git a/internal/controller/discovery/core/helpers.go b/internal/controller/discovery/loaders/utils/send.go similarity index 67% rename from internal/controller/discovery/core/helpers.go rename to internal/controller/discovery/loaders/utils/send.go index f24b50c..3cfba8d 100644 --- a/internal/controller/discovery/core/helpers.go +++ b/internal/controller/discovery/loaders/utils/send.go @@ -1,12 +1,14 @@ -package core +package utils import ( "context" "fmt" + + "github.com/gnmic/operator/internal/controller/discovery/core" ) // sendMessages sends discovery messages over a channel in a context-aware manner -func sendMessages(ctx context.Context, out chan<- []DiscoveryMessage, messages []DiscoveryMessage) error { +func sendMessages(ctx context.Context, out chan<- []core.DiscoveryMessage, messages []core.DiscoveryMessage) error { select { case <-ctx.Done(): return ctx.Err() @@ -30,14 +32,14 @@ func forEachChunk(total, chunkSize int, fn func(start, end int) error) error { } // createDiscoverySnapshots takes a list of discovered targets and returns chunked DiscoverySnapshots -func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chunkSize int) []DiscoverySnapshot { - var snapshots []DiscoverySnapshot +func createDiscoverySnapshots(targets []core.DiscoveredTarget, snapshotID string, chunkSize int) []core.DiscoverySnapshot { + var snapshots []core.DiscoverySnapshot totalTargets := len(targets) totalChunks := (totalTargets + chunkSize - 1) / chunkSize _ = forEachChunk(totalTargets, chunkSize, func(i, end int) error { chunk := targets[i:end] - snapshots = append(snapshots, DiscoverySnapshot{ + snapshots = append(snapshots, core.DiscoverySnapshot{ Targets: chunk, SnapshotID: snapshotID, ChunkIndex: i / chunkSize, @@ -50,7 +52,7 @@ func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chu } // SendSnapshot sends discovered targets as a snapshot over a channel in chunks -func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets []DiscoveredTarget, snapshotID string, chunkSize int) error { +func SendSnapshot(ctx context.Context, out chan<- []core.DiscoveryMessage, targets []core.DiscoveredTarget, snapshotID string, chunkSize int) error { if len(targets) == 0 { return fmt.Errorf("no targets in Snapshot") } @@ -58,7 +60,7 @@ func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets [] snapshots := createDiscoverySnapshots(targets, snapshotID, chunkSize) for _, snapshot := range snapshots { // Convert DiscoverySnapshot to DiscoveryMessage - messages := make([]DiscoveryMessage, 1) + messages := make([]core.DiscoveryMessage, 1) messages[0] = snapshot if err := sendMessages(ctx, out, messages); err != nil { @@ -69,8 +71,8 @@ func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets [] return nil } -func eventsToMessages(events []DiscoveryEvent) []DiscoveryMessage { - message := make([]DiscoveryMessage, len(events)) +func eventsToMessages(events []core.DiscoveryEvent) []core.DiscoveryMessage { + message := make([]core.DiscoveryMessage, len(events)) for i, event := range events { message[i] = event } @@ -78,7 +80,7 @@ func eventsToMessages(events []DiscoveryEvent) []DiscoveryMessage { } // SendEvents sends discovery messages over channel in a context-aware manner -func SendEvents(ctx context.Context, out chan<- []DiscoveryMessage, events []DiscoveryEvent, chunkSize int) error { +func SendEvents(ctx context.Context, out chan<- []core.DiscoveryMessage, events []core.DiscoveryEvent, chunkSize int) error { if len(events) == 0 { return fmt.Errorf("no events to process") } diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go deleted file mode 100644 index 18470b2..0000000 --- a/internal/controller/discovery/mapper.go +++ /dev/null @@ -1,4 +0,0 @@ -package discovery - -// This file makes diff between existing and new targets -// file decides which targets to create/update/delete diff --git a/internal/controller/discovery/mapper_test.go b/internal/controller/discovery/mapper_test.go deleted file mode 100644 index 5844159..0000000 --- a/internal/controller/discovery/mapper_test.go +++ /dev/null @@ -1 +0,0 @@ -package discovery diff --git a/internal/controller/discovery/registry/registry.go b/internal/controller/discovery/registry.go similarity index 68% rename from internal/controller/discovery/registry/registry.go rename to internal/controller/discovery/registry.go index 093bd2c..0afa2b2 100644 --- a/internal/controller/discovery/registry/registry.go +++ b/internal/controller/discovery/registry.go @@ -1,4 +1,4 @@ -package registry +package discovery import ( "fmt" @@ -10,20 +10,20 @@ import ( // DO NOT USE a pointer type as K type Registry[K comparable, V any] struct { mu sync.RWMutex - m map[K]chan<- V + m map[K]V } func NewRegistry[K comparable, V any]() *Registry[K, V] { - return &Registry[K, V]{m: make(map[K]chan<- V)} + return &Registry[K, V]{m: make(map[K]V)} } -func (r *Registry[K, V]) Register(key K, ch chan<- V) error { +func (r *Registry[K, V]) Register(key K, value V) error { r.mu.Lock() defer r.mu.Unlock() if _, exists := r.m[key]; exists { return fmt.Errorf("already registered: %v", key) } - r.m[key] = ch + r.m[key] = value return nil } @@ -33,9 +33,9 @@ func (r *Registry[K, V]) Unregister(key K) { r.mu.Unlock() } -func (r *Registry[K, V]) Get(key K) (chan<- V, bool) { +func (r *Registry[K, V]) Get(key K) (V, bool) { r.mu.RLock() - ch, ok := r.m[key] + value, ok := r.m[key] r.mu.RUnlock() - return ch, ok + return value, ok } diff --git a/internal/controller/discovery/supervisor.go b/internal/controller/discovery/supervisor.go index 710381e..56fa687 100644 --- a/internal/controller/discovery/supervisor.go +++ b/internal/controller/discovery/supervisor.go @@ -25,12 +25,13 @@ type Supervisor struct { stopped bool } -// RestartPolicy defines the restart behavior for a component +// RestartPolicy defines restart behavior of a component type RestartPolicy struct { MaxRestarts int Backoff time.Duration } +// ComponentSpec defines a supervised component type ComponentSpec struct { Name string Run func(ctx context.Context) error diff --git a/internal/controller/discovery/target_applier.go b/internal/controller/discovery/target_reconciler.go similarity index 59% rename from internal/controller/discovery/target_applier.go rename to internal/controller/discovery/target_reconciler.go index 3c714bd..86470c6 100644 --- a/internal/controller/discovery/target_applier.go +++ b/internal/controller/discovery/target_reconciler.go @@ -20,8 +20,9 @@ type snapshotBuffer struct { complete bool } -// TargetApplier consumes discovered targets and applies them to Kubernetes -type TargetApplier struct { +// TargetReconciler consumes discovered targets and applies them to Kubernetes +type TargetReconciler struct { + ctx context.Context client client.Client scheme *runtime.Scheme targetSource *gnmicv1alpha1.TargetSource @@ -32,9 +33,9 @@ type TargetApplier struct { deferredEvents []core.DiscoveryEvent } -// NewTargetApplier wires a TargetApplier instance -func NewTargetApplier(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetApplier { - return &TargetApplier{ +// NewTargetReconciler wires a TargetReconciler instance +func NewTargetReconciler(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetReconciler { + return &TargetReconciler{ client: c, scheme: s, targetSource: ts, @@ -44,38 +45,40 @@ func NewTargetApplier(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.Targ // Run is a long‑running loop that receives target snapshots // and reconciles Target CRs accordingly -func (a *TargetApplier) Run(ctx context.Context) error { - logger := log.FromContext(ctx). +func (r *TargetReconciler) Run(ctx context.Context) error { + r.ctx = ctx + + logger := log.FromContext(r.ctx). WithValues( - "name", a.targetSource.Name, - "namespace", a.targetSource.Namespace, + "name", r.targetSource.Name, + "namespace", r.targetSource.Namespace, ) - logger.Info("target applier started") + logger.Info("target reconciler started") - for ctx.Err() == nil { + for r.ctx.Err() == nil { select { - case batch, ok := <-a.in: + case batch, ok := <-r.in: if !ok { // Channel closed, pipeline is shutting down - logger.Info("input channel closed, stopping target applier") + logger.Info("input channel closed, stopping target reconciler") return nil } - a.queue = append(a.queue, batch...) + r.queue = append(r.queue, batch...) case <-ctx.Done(): - logger.Info("context canceled, stopping target applier") + logger.Info("context canceled, stopping target reconciler") return nil } - for len(a.queue) > 0 { + for len(r.queue) > 0 { if ctx.Err() != nil { return nil // why return nil? } - msg := a.queue[0] - a.queue = a.queue[1:] + msg := r.queue[0] + r.queue = r.queue[1:] - if err := a.processMessage(ctx, msg, logger); err != nil { + if err := r.processMessage(r.ctx, msg, logger); err != nil { // Returning error lets the supervisor (controller) // tear down and restart the pipeline via reconciliation // Q: when to return an error vs just log and continue? @@ -85,11 +88,11 @@ func (a *TargetApplier) Run(ctx context.Context) error { } } - logger.Info("target applier stopped") + logger.Info("target reconciler stopped") return nil } -func (a *TargetApplier) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { +func (r *TargetReconciler) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { if err := ctx.Err(); err != nil { return err } @@ -104,7 +107,7 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove "index", msg.ChunkIndex, "targetCount", len(msg.Targets), ) - return a.processSnapshot(ctx, msg, logger) + return r.processSnapshot(ctx, msg, logger) case core.DiscoveryEvent: // Process individual event-driven update @@ -112,7 +115,7 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove "received discovery event", "target", msg.Target.Name, ) - return a.processEvent(ctx, msg, logger) + return r.processEvent(ctx, msg, logger) default: return fmt.Errorf("unknonw discovery message type %T", msg) @@ -120,18 +123,18 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove } // processSnapshot takes a complete snapshot of discovered targets and reconciles Target CRs accordingly -func (a *TargetApplier) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { - if a.activeSnapshot == nil { - a.startNewSnapshot(chunk, logger) +func (r *TargetReconciler) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + if r.activeSnapshot == nil { + r.startNewSnapshot(chunk, logger) return nil } - snapshot := a.activeSnapshot + snapshot := r.activeSnapshot // Check if a new snapshot arrived if snapshot.snapshotID != chunk.SnapshotID { // If current snapshot is complete apply it first if snapshot.complete { - if err := a.applySnapshot(ctx, snapshot, logger); err != nil { + if err := r.applySnapshot(ctx, snapshot, logger); err != nil { return err } } else { @@ -144,40 +147,40 @@ func (a *TargetApplier) processSnapshot(ctx context.Context, chunk core.Discover } // Start collecting the new snapshot - a.startNewSnapshot(chunk, logger) + r.startNewSnapshot(chunk, logger) return nil } - return a.collectSnapshot(chunk, logger) + return r.collectSnapshot(chunk, logger) } -func (a *TargetApplier) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { - a.activeSnapshot = &snapshotBuffer{ +func (r *TargetReconciler) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { + r.activeSnapshot = &snapshotBuffer{ snapshotID: chunk.SnapshotID, totalChunks: chunk.TotalChunks, received: make(map[int][]core.DiscoveredTarget), complete: false, } // Delete buffered events that will be current with new snapshot - a.deferredEvents = nil + r.deferredEvents = nil - a.collectSnapshot(chunk, logger) + r.collectSnapshot(chunk, logger) } -func (a *TargetApplier) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { - snapshot := a.activeSnapshot +func (r *TargetReconciler) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { + snapshot := r.activeSnapshot if chunk.TotalChunks != snapshot.totalChunks { logger.Error(nil, "snapshot totalChunks mismatch", "snapshotID", snapshot.snapshotID) } if chunk.ChunkIndex < 0 || chunk.ChunkIndex >= snapshot.totalChunks { logger.Error(nil, "snapshot chunk index out of range", "index", chunk.ChunkIndex) - a.activeSnapshot = nil + r.activeSnapshot = nil return nil } if _, exists := snapshot.received[chunk.ChunkIndex]; exists { logger.Error(nil, "duplicate snapshot chunk", "index", chunk.ChunkIndex) - a.activeSnapshot = nil + r.activeSnapshot = nil return nil } @@ -190,10 +193,10 @@ func (a *TargetApplier) collectSnapshot(chunk core.DiscoverySnapshot, logger log return nil } -func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { +func (r *TargetReconciler) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { select { case <-ctx.Done(): - a.activeSnapshot = nil + r.activeSnapshot = nil return nil default: } @@ -202,7 +205,7 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf for i := 0; i < snapshot.totalChunks; i++ { select { case <-ctx.Done(): - a.activeSnapshot = nil + r.activeSnapshot = nil return nil default: } @@ -210,7 +213,7 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf chunk, ok := snapshot.received[i] if !ok { logger.Error(nil, "missing snapshot chunk", "index", i) - a.activeSnapshot = nil + r.activeSnapshot = nil return nil } allTargets = append(allTargets, chunk...) @@ -226,41 +229,39 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf // a.applyTargets // Replay deferred events - for _, event := range a.deferredEvents { + for _, event := range r.deferredEvents { select { case <-ctx.Done(): return nil default: } - if err := a.applyEvent(ctx, event, logger); err != nil { + if err := r.applyEvent(ctx, event, logger); err != nil { return err } } - a.activeSnapshot = nil - a.deferredEvents = nil + r.activeSnapshot = nil + r.deferredEvents = nil return nil } -func (a *TargetApplier) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (r *TargetReconciler) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { // If snapshot collecting is active defer events - if a.activeSnapshot != nil { - a.deferredEvents = append(a.deferredEvents, event) + if r.activeSnapshot != nil { + r.deferredEvents = append(r.deferredEvents, event) return nil } // Apply events - return a.applyEvent(ctx, event, logger) + return r.applyEvent(ctx, event, logger) } -func (a *TargetApplier) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (r *TargetReconciler) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { - case core.CREATE: - logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) - case core.UPDATE: - logger.Info("Would update target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) - case core.DELETE: + case core.EventDelete: logger.Info("Would delete target", "name", event.Target.Name) + case core.EventApply: + logger.Info("Would apply target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) } return nil } diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 232c624..e52b02b 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -30,9 +30,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" - "github.com/gnmic/operator/internal/controller/discovery/core" - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/all" - "github.com/gnmic/operator/internal/controller/discovery/registry" + discoveryTypes "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/go-logr/logr" ) @@ -65,7 +63,7 @@ type TargetSourceReconciler struct { BufferSize int ChunkSize int - DiscoveryRegistry *registry.Registry[types.NamespacedName, []core.DiscoveryMessage] + DiscoveryRegistry *discovery.Registry[types.NamespacedName, discoveryTypes.DiscoveryRegistryValue] } // +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targetsources,verbs=get;list;watch;create;update;patch;delete @@ -107,7 +105,7 @@ func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } - logger.Info("Discover pipeline started") + logger.Info("Discovery pipeline started") return ctrl.Result{}, nil } @@ -160,57 +158,61 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour return nil } -// startDiscoveryPipeline creates and starts a discover pipeline for a TargetSource +// startDiscoveryPipeline creates and starts a discovery pipeline for a TargetSource // // Pipeline semantics: -// 1. target-applier is mandatory and must start first +// 1. target reconciler is mandatory and must start first // 2. loader is optional and conditional on spec // 3. Permanent failure of required components shuts down the pipeline // 4. Shutdown ordering: cancel ctx -> wait for goroutines to exit -> close channel -> unregister func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName, targetSource *gnmicv1alpha1.TargetSource, logger logr.Logger) error { + loaderConfigured := targetSource.Spec.Provider != nil + webhookActivated := targetSource.Spec.Webhook.Enabled != nil && *targetSource.Spec.Webhook.Enabled + supervisor := discovery.NewSupervisor(context.Background()) - targetChannel := make(chan []core.DiscoveryMessage, r.BufferSize) - if err := r.DiscoveryRegistry.Register(key, targetChannel); err != nil { + targetChannel := make(chan []discoveryTypes.DiscoveryMessage, r.BufferSize) + if err := r.DiscoveryRegistry.Register(key, discoveryTypes.DiscoveryRegistryValue{ + Channel: targetChannel, + WebhookEnabled: webhookActivated, + }); err != nil { return err } - // Create target applier instance - applier := discovery.NewTargetApplier( + // Create target reconciler instance + targetReconciler := discovery.NewTargetReconciler( r.Client, r.Scheme, targetSource, targetChannel, ) - // Start target applier - applierReady := make(chan struct{}) + // Start target reconciler + reconcilerReady := make(chan struct{}) supervisor.StartSupervisedComponent(discovery.ComponentSpec{ - Name: "target-applier", + Name: "target-reconciler", Policy: discovery.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, EscalatesOnFailure: true, Run: func(ctx context.Context) error { - close(applierReady) // Signals that applier started successfully - return applier.Run(ctx) + close(reconcilerReady) // Signals that reconciler started successfully + return targetReconciler.Run(ctx) }, }) - // Wait for applier to be ready before starting loader + // Wait for reconciler to be ready before starting loader select { - case <-applierReady: + case <-reconcilerReady: case <-supervisor.Done(): return nil } // Create loader instance - loaderConfigured := targetSource.Spec.Provider != nil - webhookConfigured := targetSource.Spec.Webhook.Enabled != nil if loaderConfigured { loader, err := discovery.NewLoader( key, targetSource.Spec, - core.LoaderConfig{ChunkSize: r.ChunkSize}, + discoveryTypes.LoaderConfig{ChunkSize: r.ChunkSize}, ) if err != nil { supervisor.Stop() @@ -223,7 +225,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, - EscalatesOnFailure: !webhookConfigured, + EscalatesOnFailure: !webhookActivated, Run: func(ctx context.Context) error { return loader.Start(ctx, key, targetSource.Spec, targetChannel) },