From 3ba86cb63c45a7f042a2051faca5f8ddfdc5b2ad Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Sun, 26 Apr 2026 19:21:01 -0600 Subject: [PATCH 01/23] restructured loaders package --- .../controller/discovery/loaders/http/{loader.go => http.go} | 0 .../discovery/loaders/http/{loader_test.go => http_test.go} | 0 .../controller/discovery/{loader.go => loaders/loaders.go} | 2 +- internal/controller/targetsource_controller.go | 3 ++- 4 files changed, 3 insertions(+), 2 deletions(-) rename internal/controller/discovery/loaders/http/{loader.go => http.go} (100%) rename internal/controller/discovery/loaders/http/{loader_test.go => http_test.go} (100%) rename internal/controller/discovery/{loader.go => loaders/loaders.go} (97%) diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/http.go similarity index 100% rename from internal/controller/discovery/loaders/http/loader.go rename to internal/controller/discovery/loaders/http/http.go diff --git a/internal/controller/discovery/loaders/http/loader_test.go b/internal/controller/discovery/loaders/http/http_test.go similarity index 100% rename from internal/controller/discovery/loaders/http/loader_test.go rename to internal/controller/discovery/loaders/http/http_test.go diff --git a/internal/controller/discovery/loader.go b/internal/controller/discovery/loaders/loaders.go similarity index 97% rename from internal/controller/discovery/loader.go rename to internal/controller/discovery/loaders/loaders.go index 0d8ddd3..45bf9c1 100644 --- a/internal/controller/discovery/loader.go +++ b/internal/controller/discovery/loaders/loaders.go @@ -1,4 +1,4 @@ -package discovery +package loaders import ( "fmt" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 232c624..77a3a35 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -31,6 +31,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/gnmic/operator/internal/controller/discovery/loaders" _ "github.com/gnmic/operator/internal/controller/discovery/loaders/all" "github.com/gnmic/operator/internal/controller/discovery/registry" "github.com/go-logr/logr" @@ -207,7 +208,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName loaderConfigured := targetSource.Spec.Provider != nil webhookConfigured := targetSource.Spec.Webhook.Enabled != nil if loaderConfigured { - loader, err := discovery.NewLoader( + loader, err := loaders.NewLoader( key, targetSource.Spec, core.LoaderConfig{ChunkSize: r.ChunkSize}, From d0ac86be2e389e91ef833bf5c278324af2df59bb Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Sun, 26 Apr 2026 19:21:13 -0600 Subject: [PATCH 02/23] restructured target handler --- internal/controller/discovery/client.go | 27 ---- .../{target_applier.go => target_handler.go} | 121 ++++++++++-------- .../controller/targetsource_controller.go | 20 +-- 3 files changed, 80 insertions(+), 88 deletions(-) delete mode 100644 internal/controller/discovery/client.go rename internal/controller/discovery/{target_applier.go => target_handler.go} (66%) diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go deleted file mode 100644 index 3bc7ef7..0000000 --- a/internal/controller/discovery/client.go +++ /dev/null @@ -1,27 +0,0 @@ -package discovery - -// File may become obsolete, depends on how the logic to compare desired vs. existing state will get implemented - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/client" - - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" -) - -func FetchExistingTargets(ctx context.Context, c client.Client, ts gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { - var targetList gnmicv1alpha1.TargetList - - err := c.List(ctx, &targetList, - client.InNamespace(ts.Namespace), - client.MatchingLabels{ - "gnmic.io/source": ts.Name, - }, - ) - if err != nil { - return nil, err - } - - return targetList.Items, nil -} diff --git a/internal/controller/discovery/target_applier.go b/internal/controller/discovery/target_handler.go similarity index 66% rename from internal/controller/discovery/target_applier.go rename to internal/controller/discovery/target_handler.go index 3c714bd..e8c0308 100644 --- a/internal/controller/discovery/target_applier.go +++ b/internal/controller/discovery/target_handler.go @@ -20,8 +20,9 @@ type snapshotBuffer struct { complete bool } -// TargetApplier consumes discovered targets and applies them to Kubernetes -type TargetApplier struct { +// TargetHandler consumes discovered targets and applies them to Kubernetes +type TargetHandler struct { + ctx context.Context client client.Client scheme *runtime.Scheme targetSource *gnmicv1alpha1.TargetSource @@ -32,9 +33,9 @@ type TargetApplier struct { deferredEvents []core.DiscoveryEvent } -// NewTargetApplier wires a TargetApplier instance -func NewTargetApplier(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetApplier { - return &TargetApplier{ +// NewTargetHandler wires a TargetHandler instance +func NewTargetHandler(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetHandler { + return &TargetHandler{ client: c, scheme: s, targetSource: ts, @@ -44,38 +45,40 @@ func NewTargetApplier(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.Targ // Run is a long‑running loop that receives target snapshots // and reconciles Target CRs accordingly -func (a *TargetApplier) Run(ctx context.Context) error { - logger := log.FromContext(ctx). +func (c *TargetHandler) Run(ctx context.Context) error { + c.ctx = ctx + + logger := log.FromContext(c.ctx). WithValues( - "name", a.targetSource.Name, - "namespace", a.targetSource.Namespace, + "name", c.targetSource.Name, + "namespace", c.targetSource.Namespace, ) - logger.Info("target applier started") + logger.Info("target handler started") - for ctx.Err() == nil { + for c.ctx.Err() == nil { select { - case batch, ok := <-a.in: + case batch, ok := <-c.in: if !ok { // Channel closed, pipeline is shutting down - logger.Info("input channel closed, stopping target applier") + logger.Info("input channel closed, stopping target handler") return nil } - a.queue = append(a.queue, batch...) + c.queue = append(c.queue, batch...) case <-ctx.Done(): - logger.Info("context canceled, stopping target applier") + logger.Info("context canceled, stopping target handler") return nil } - for len(a.queue) > 0 { + for len(c.queue) > 0 { if ctx.Err() != nil { return nil // why return nil? } - msg := a.queue[0] - a.queue = a.queue[1:] + msg := c.queue[0] + c.queue = c.queue[1:] - if err := a.processMessage(ctx, msg, logger); err != nil { + if err := c.processMessage(c.ctx, msg, logger); err != nil { // Returning error lets the supervisor (controller) // tear down and restart the pipeline via reconciliation // Q: when to return an error vs just log and continue? @@ -85,11 +88,11 @@ func (a *TargetApplier) Run(ctx context.Context) error { } } - logger.Info("target applier stopped") + logger.Info("target handler stopped") return nil } -func (a *TargetApplier) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { +func (c *TargetHandler) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { if err := ctx.Err(); err != nil { return err } @@ -104,7 +107,7 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove "index", msg.ChunkIndex, "targetCount", len(msg.Targets), ) - return a.processSnapshot(ctx, msg, logger) + return c.processSnapshot(ctx, msg, logger) case core.DiscoveryEvent: // Process individual event-driven update @@ -112,7 +115,7 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove "received discovery event", "target", msg.Target.Name, ) - return a.processEvent(ctx, msg, logger) + return c.processEvent(ctx, msg, logger) default: return fmt.Errorf("unknonw discovery message type %T", msg) @@ -120,18 +123,18 @@ func (a *TargetApplier) processMessage(ctx context.Context, message core.Discove } // processSnapshot takes a complete snapshot of discovered targets and reconciles Target CRs accordingly -func (a *TargetApplier) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { - if a.activeSnapshot == nil { - a.startNewSnapshot(chunk, logger) +func (c *TargetHandler) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + if c.activeSnapshot == nil { + c.startNewSnapshot(chunk, logger) return nil } - snapshot := a.activeSnapshot + snapshot := c.activeSnapshot // Check if a new snapshot arrived if snapshot.snapshotID != chunk.SnapshotID { // If current snapshot is complete apply it first if snapshot.complete { - if err := a.applySnapshot(ctx, snapshot, logger); err != nil { + if err := c.applySnapshot(ctx, snapshot, logger); err != nil { return err } } else { @@ -144,40 +147,40 @@ func (a *TargetApplier) processSnapshot(ctx context.Context, chunk core.Discover } // Start collecting the new snapshot - a.startNewSnapshot(chunk, logger) + c.startNewSnapshot(chunk, logger) return nil } - return a.collectSnapshot(chunk, logger) + return c.collectSnapshot(chunk, logger) } -func (a *TargetApplier) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { - a.activeSnapshot = &snapshotBuffer{ +func (c *TargetHandler) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { + c.activeSnapshot = &snapshotBuffer{ snapshotID: chunk.SnapshotID, totalChunks: chunk.TotalChunks, received: make(map[int][]core.DiscoveredTarget), complete: false, } // Delete buffered events that will be current with new snapshot - a.deferredEvents = nil + c.deferredEvents = nil - a.collectSnapshot(chunk, logger) + c.collectSnapshot(chunk, logger) } -func (a *TargetApplier) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { - snapshot := a.activeSnapshot +func (c *TargetHandler) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { + snapshot := c.activeSnapshot if chunk.TotalChunks != snapshot.totalChunks { logger.Error(nil, "snapshot totalChunks mismatch", "snapshotID", snapshot.snapshotID) } if chunk.ChunkIndex < 0 || chunk.ChunkIndex >= snapshot.totalChunks { logger.Error(nil, "snapshot chunk index out of range", "index", chunk.ChunkIndex) - a.activeSnapshot = nil + c.activeSnapshot = nil return nil } if _, exists := snapshot.received[chunk.ChunkIndex]; exists { logger.Error(nil, "duplicate snapshot chunk", "index", chunk.ChunkIndex) - a.activeSnapshot = nil + c.activeSnapshot = nil return nil } @@ -190,10 +193,10 @@ func (a *TargetApplier) collectSnapshot(chunk core.DiscoverySnapshot, logger log return nil } -func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { +func (c *TargetHandler) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { select { case <-ctx.Done(): - a.activeSnapshot = nil + c.activeSnapshot = nil return nil default: } @@ -202,7 +205,7 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf for i := 0; i < snapshot.totalChunks; i++ { select { case <-ctx.Done(): - a.activeSnapshot = nil + c.activeSnapshot = nil return nil default: } @@ -210,7 +213,7 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf chunk, ok := snapshot.received[i] if !ok { logger.Error(nil, "missing snapshot chunk", "index", i) - a.activeSnapshot = nil + c.activeSnapshot = nil return nil } allTargets = append(allTargets, chunk...) @@ -226,34 +229,34 @@ func (a *TargetApplier) applySnapshot(ctx context.Context, snapshot *snapshotBuf // a.applyTargets // Replay deferred events - for _, event := range a.deferredEvents { + for _, event := range c.deferredEvents { select { case <-ctx.Done(): return nil default: } - if err := a.applyEvent(ctx, event, logger); err != nil { + if err := c.applyEvent(ctx, event, logger); err != nil { return err } } - a.activeSnapshot = nil - a.deferredEvents = nil + c.activeSnapshot = nil + c.deferredEvents = nil return nil } -func (a *TargetApplier) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (c *TargetHandler) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { // If snapshot collecting is active defer events - if a.activeSnapshot != nil { - a.deferredEvents = append(a.deferredEvents, event) + if c.activeSnapshot != nil { + c.deferredEvents = append(c.deferredEvents, event) return nil } // Apply events - return a.applyEvent(ctx, event, logger) + return c.applyEvent(ctx, event, logger) } -func (a *TargetApplier) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (c *TargetHandler) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { case core.CREATE: logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) @@ -264,3 +267,19 @@ func (a *TargetApplier) applyEvent(ctx context.Context, event core.DiscoveryEven } return nil } + +func (c *TargetHandler) fetchExistingTargets() ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList + + err := c.client.List(c.ctx, &targetList, + client.InNamespace(c.targetSource.Namespace), + client.MatchingLabels{ + "gnmic.io/source": c.targetSource.Name, + }, + ) + if err != nil { + return nil, err + } + + return targetList.Items, nil +} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 77a3a35..4d5f400 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -164,7 +164,7 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour // startDiscoveryPipeline creates and starts a discover pipeline for a TargetSource // // Pipeline semantics: -// 1. target-applier is mandatory and must start first +// 1. target-handler is mandatory and must start first // 2. loader is optional and conditional on spec // 3. Permanent failure of required components shuts down the pipeline // 4. Shutdown ordering: cancel ctx -> wait for goroutines to exit -> close channel -> unregister @@ -176,30 +176,30 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName return err } - // Create target applier instance - applier := discovery.NewTargetApplier( + // Create target targetHandler instance + targetHandler := discovery.NewTargetHandler( r.Client, r.Scheme, targetSource, targetChannel, ) - // Start target applier - applierReady := make(chan struct{}) + // Start target handler + handlerReady := make(chan struct{}) supervisor.StartSupervisedComponent(discovery.ComponentSpec{ - Name: "target-applier", + Name: "target-handler", Policy: discovery.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, EscalatesOnFailure: true, Run: func(ctx context.Context) error { - close(applierReady) // Signals that applier started successfully - return applier.Run(ctx) + close(handlerReady) // Signals that handler started successfully + return targetHandler.Run(ctx) }, }) - // Wait for applier to be ready before starting loader + // Wait for handler to be ready before starting loader select { - case <-applierReady: + case <-handlerReady: case <-supervisor.Done(): return nil } From 7ef1281a7b37bd8b9a845501f7011c615710429b Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Mon, 27 Apr 2026 10:10:08 -0600 Subject: [PATCH 03/23] renamed target applier to message processor & created client.go for generic functions --- internal/controller/discovery/client.go | 25 ++++ ...target_handler.go => message_processor.go} | 112 ++++++++---------- .../controller/targetsource_controller.go | 2 +- 3 files changed, 74 insertions(+), 65 deletions(-) create mode 100644 internal/controller/discovery/client.go rename internal/controller/discovery/{target_handler.go => message_processor.go} (63%) diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go new file mode 100644 index 0000000..72147b7 --- /dev/null +++ b/internal/controller/discovery/client.go @@ -0,0 +1,25 @@ +package discovery + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" +) + +func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList + + err := c.List(ctx, &targetList, + client.InNamespace(ts.Namespace), + client.MatchingLabels{ + "gnmic.io/source": ts.Name, + }, + ) + if err != nil { + return nil, err + } + + return targetList.Items, nil +} diff --git a/internal/controller/discovery/target_handler.go b/internal/controller/discovery/message_processor.go similarity index 63% rename from internal/controller/discovery/target_handler.go rename to internal/controller/discovery/message_processor.go index e8c0308..65c8b44 100644 --- a/internal/controller/discovery/target_handler.go +++ b/internal/controller/discovery/message_processor.go @@ -20,8 +20,8 @@ type snapshotBuffer struct { complete bool } -// TargetHandler consumes discovered targets and applies them to Kubernetes -type TargetHandler struct { +// MessageProcessor consumes discovered targets and applies them to Kubernetes +type MessageProcessor struct { ctx context.Context client client.Client scheme *runtime.Scheme @@ -33,9 +33,9 @@ type TargetHandler struct { deferredEvents []core.DiscoveryEvent } -// NewTargetHandler wires a TargetHandler instance -func NewTargetHandler(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetHandler { - return &TargetHandler{ +// NewMessageProcessor wires a MessageProcessor instance +func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *MessageProcessor { + return &MessageProcessor{ client: c, scheme: s, targetSource: ts, @@ -45,40 +45,40 @@ func NewTargetHandler(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.Targ // Run is a long‑running loop that receives target snapshots // and reconciles Target CRs accordingly -func (c *TargetHandler) Run(ctx context.Context) error { - c.ctx = ctx +func (m *MessageProcessor) Run(ctx context.Context) error { + m.ctx = ctx - logger := log.FromContext(c.ctx). + logger := log.FromContext(m.ctx). WithValues( - "name", c.targetSource.Name, - "namespace", c.targetSource.Namespace, + "name", m.targetSource.Name, + "namespace", m.targetSource.Namespace, ) logger.Info("target handler started") - for c.ctx.Err() == nil { + for m.ctx.Err() == nil { select { - case batch, ok := <-c.in: + case batch, ok := <-m.in: if !ok { // Channel closed, pipeline is shutting down logger.Info("input channel closed, stopping target handler") return nil } - c.queue = append(c.queue, batch...) + m.queue = append(m.queue, batch...) case <-ctx.Done(): logger.Info("context canceled, stopping target handler") return nil } - for len(c.queue) > 0 { + for len(m.queue) > 0 { if ctx.Err() != nil { return nil // why return nil? } - msg := c.queue[0] - c.queue = c.queue[1:] + msg := m.queue[0] + m.queue = m.queue[1:] - if err := c.processMessage(c.ctx, msg, logger); err != nil { + if err := m.processMessage(m.ctx, msg, logger); err != nil { // Returning error lets the supervisor (controller) // tear down and restart the pipeline via reconciliation // Q: when to return an error vs just log and continue? @@ -92,7 +92,7 @@ func (c *TargetHandler) Run(ctx context.Context) error { return nil } -func (c *TargetHandler) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { +func (m *MessageProcessor) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { if err := ctx.Err(); err != nil { return err } @@ -107,7 +107,7 @@ func (c *TargetHandler) processMessage(ctx context.Context, message core.Discove "index", msg.ChunkIndex, "targetCount", len(msg.Targets), ) - return c.processSnapshot(ctx, msg, logger) + return m.processSnapshot(ctx, msg, logger) case core.DiscoveryEvent: // Process individual event-driven update @@ -115,7 +115,7 @@ func (c *TargetHandler) processMessage(ctx context.Context, message core.Discove "received discovery event", "target", msg.Target.Name, ) - return c.processEvent(ctx, msg, logger) + return m.processEvent(ctx, msg, logger) default: return fmt.Errorf("unknonw discovery message type %T", msg) @@ -123,18 +123,18 @@ func (c *TargetHandler) processMessage(ctx context.Context, message core.Discove } // processSnapshot takes a complete snapshot of discovered targets and reconciles Target CRs accordingly -func (c *TargetHandler) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { - if c.activeSnapshot == nil { - c.startNewSnapshot(chunk, logger) +func (m *MessageProcessor) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + if m.activeSnapshot == nil { + m.startNewSnapshot(chunk, logger) return nil } - snapshot := c.activeSnapshot + snapshot := m.activeSnapshot // Check if a new snapshot arrived if snapshot.snapshotID != chunk.SnapshotID { // If current snapshot is complete apply it first if snapshot.complete { - if err := c.applySnapshot(ctx, snapshot, logger); err != nil { + if err := m.applySnapshot(ctx, snapshot, logger); err != nil { return err } } else { @@ -147,40 +147,40 @@ func (c *TargetHandler) processSnapshot(ctx context.Context, chunk core.Discover } // Start collecting the new snapshot - c.startNewSnapshot(chunk, logger) + m.startNewSnapshot(chunk, logger) return nil } - return c.collectSnapshot(chunk, logger) + return m.collectSnapshot(chunk, logger) } -func (c *TargetHandler) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { - c.activeSnapshot = &snapshotBuffer{ +func (m *MessageProcessor) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { + m.activeSnapshot = &snapshotBuffer{ snapshotID: chunk.SnapshotID, totalChunks: chunk.TotalChunks, received: make(map[int][]core.DiscoveredTarget), complete: false, } // Delete buffered events that will be current with new snapshot - c.deferredEvents = nil + m.deferredEvents = nil - c.collectSnapshot(chunk, logger) + m.collectSnapshot(chunk, logger) } -func (c *TargetHandler) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { - snapshot := c.activeSnapshot +func (m *MessageProcessor) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { + snapshot := m.activeSnapshot if chunk.TotalChunks != snapshot.totalChunks { logger.Error(nil, "snapshot totalChunks mismatch", "snapshotID", snapshot.snapshotID) } if chunk.ChunkIndex < 0 || chunk.ChunkIndex >= snapshot.totalChunks { logger.Error(nil, "snapshot chunk index out of range", "index", chunk.ChunkIndex) - c.activeSnapshot = nil + m.activeSnapshot = nil return nil } if _, exists := snapshot.received[chunk.ChunkIndex]; exists { logger.Error(nil, "duplicate snapshot chunk", "index", chunk.ChunkIndex) - c.activeSnapshot = nil + m.activeSnapshot = nil return nil } @@ -193,10 +193,10 @@ func (c *TargetHandler) collectSnapshot(chunk core.DiscoverySnapshot, logger log return nil } -func (c *TargetHandler) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { +func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { select { case <-ctx.Done(): - c.activeSnapshot = nil + m.activeSnapshot = nil return nil default: } @@ -205,7 +205,7 @@ func (c *TargetHandler) applySnapshot(ctx context.Context, snapshot *snapshotBuf for i := 0; i < snapshot.totalChunks; i++ { select { case <-ctx.Done(): - c.activeSnapshot = nil + m.activeSnapshot = nil return nil default: } @@ -213,7 +213,7 @@ func (c *TargetHandler) applySnapshot(ctx context.Context, snapshot *snapshotBuf chunk, ok := snapshot.received[i] if !ok { logger.Error(nil, "missing snapshot chunk", "index", i) - c.activeSnapshot = nil + m.activeSnapshot = nil return nil } allTargets = append(allTargets, chunk...) @@ -229,34 +229,34 @@ func (c *TargetHandler) applySnapshot(ctx context.Context, snapshot *snapshotBuf // a.applyTargets // Replay deferred events - for _, event := range c.deferredEvents { + for _, event := range m.deferredEvents { select { case <-ctx.Done(): return nil default: } - if err := c.applyEvent(ctx, event, logger); err != nil { + if err := m.applyEvent(ctx, event, logger); err != nil { return err } } - c.activeSnapshot = nil - c.deferredEvents = nil + m.activeSnapshot = nil + m.deferredEvents = nil return nil } -func (c *TargetHandler) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (m *MessageProcessor) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { // If snapshot collecting is active defer events - if c.activeSnapshot != nil { - c.deferredEvents = append(c.deferredEvents, event) + if m.activeSnapshot != nil { + m.deferredEvents = append(m.deferredEvents, event) return nil } // Apply events - return c.applyEvent(ctx, event, logger) + return m.applyEvent(ctx, event, logger) } -func (c *TargetHandler) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { case core.CREATE: logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) @@ -267,19 +267,3 @@ func (c *TargetHandler) applyEvent(ctx context.Context, event core.DiscoveryEven } return nil } - -func (c *TargetHandler) fetchExistingTargets() ([]gnmicv1alpha1.Target, error) { - var targetList gnmicv1alpha1.TargetList - - err := c.client.List(c.ctx, &targetList, - client.InNamespace(c.targetSource.Namespace), - client.MatchingLabels{ - "gnmic.io/source": c.targetSource.Name, - }, - ) - if err != nil { - return nil, err - } - - return targetList.Items, nil -} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 4d5f400..8070a3a 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -177,7 +177,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName } // Create target targetHandler instance - targetHandler := discovery.NewTargetHandler( + targetHandler := discovery.NewMessageProcessor( r.Client, r.Scheme, targetSource, From d10fc9ac868d50be64c123cbc619b2f4eb189682 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Mon, 27 Apr 2026 11:26:10 -0600 Subject: [PATCH 04/23] removed all package --- internal/controller/discovery/loaders/all/all.go | 5 ----- internal/controller/targetsource_controller.go | 1 - 2 files changed, 6 deletions(-) delete mode 100644 internal/controller/discovery/loaders/all/all.go diff --git a/internal/controller/discovery/loaders/all/all.go b/internal/controller/discovery/loaders/all/all.go deleted file mode 100644 index 3590cda..0000000 --- a/internal/controller/discovery/loaders/all/all.go +++ /dev/null @@ -1,5 +0,0 @@ -package all - -import ( - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/http" -) diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 8070a3a..49f9683 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -32,7 +32,6 @@ import ( "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery/loaders" - _ "github.com/gnmic/operator/internal/controller/discovery/loaders/all" "github.com/gnmic/operator/internal/controller/discovery/registry" "github.com/go-logr/logr" ) From b7dd0367e99a0c5435db00092c83e1bc01ab439b Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 08:53:30 +0000 Subject: [PATCH 05/23] remove unused fiels --- internal/controller/discovery/mapper.go | 4 ---- internal/controller/discovery/mapper_test.go | 1 - 2 files changed, 5 deletions(-) delete mode 100644 internal/controller/discovery/mapper.go delete mode 100644 internal/controller/discovery/mapper_test.go diff --git a/internal/controller/discovery/mapper.go b/internal/controller/discovery/mapper.go deleted file mode 100644 index 18470b2..0000000 --- a/internal/controller/discovery/mapper.go +++ /dev/null @@ -1,4 +0,0 @@ -package discovery - -// This file makes diff between existing and new targets -// file decides which targets to create/update/delete diff --git a/internal/controller/discovery/mapper_test.go b/internal/controller/discovery/mapper_test.go deleted file mode 100644 index 5844159..0000000 --- a/internal/controller/discovery/mapper_test.go +++ /dev/null @@ -1 +0,0 @@ -package discovery From d3a9b5ca3021c9f0485698c1d1c54bbd3562bb9b Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 11:56:56 +0000 Subject: [PATCH 06/23] rename files and restructure packages --- .../core/{loader_interface.go => loader.go} | 0 .../core/{message_interface.go => message.go} | 0 .../discovery/core/{helpers.go => send.go} | 0 internal/controller/discovery/core/types.go | 4 ++-- internal/controller/discovery/discovery.go | 17 +++++++++++++++++ .../loaders/{loaders.go => factory.go} | 0 .../loaders/http/{http.go => loader.go} | 0 .../http/{http_test.go => loader_test.go} | 0 .../discovery/{ => pipeline}/supervisor.go | 5 +++-- .../discovery/{ => reconciler}/client.go | 13 ++++++++++--- .../{ => reconciler}/message_processor.go | 2 +- internal/controller/targetsource_controller.go | 15 ++++++++------- 12 files changed, 41 insertions(+), 15 deletions(-) rename internal/controller/discovery/core/{loader_interface.go => loader.go} (100%) rename internal/controller/discovery/core/{message_interface.go => message.go} (100%) rename internal/controller/discovery/core/{helpers.go => send.go} (100%) create mode 100644 internal/controller/discovery/discovery.go rename internal/controller/discovery/loaders/{loaders.go => factory.go} (100%) rename internal/controller/discovery/loaders/http/{http.go => loader.go} (100%) rename internal/controller/discovery/loaders/http/{http_test.go => loader_test.go} (100%) rename internal/controller/discovery/{ => pipeline}/supervisor.go (95%) rename internal/controller/discovery/{ => reconciler}/client.go (68%) rename internal/controller/discovery/{ => reconciler}/message_processor.go (99%) diff --git a/internal/controller/discovery/core/loader_interface.go b/internal/controller/discovery/core/loader.go similarity index 100% rename from internal/controller/discovery/core/loader_interface.go rename to internal/controller/discovery/core/loader.go diff --git a/internal/controller/discovery/core/message_interface.go b/internal/controller/discovery/core/message.go similarity index 100% rename from internal/controller/discovery/core/message_interface.go rename to internal/controller/discovery/core/message.go diff --git a/internal/controller/discovery/core/helpers.go b/internal/controller/discovery/core/send.go similarity index 100% rename from internal/controller/discovery/core/helpers.go rename to internal/controller/discovery/core/send.go diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 3f6957a..28ec503 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -12,14 +12,14 @@ type DiscoveredTarget struct { Labels map[string]string } +type EventAction int + const ( DELETE EventAction = 0 CREATE EventAction = 1 UPDATE EventAction = 2 ) -type EventAction int - type DiscoveryEvent struct { Target DiscoveredTarget Event EventAction diff --git a/internal/controller/discovery/discovery.go b/internal/controller/discovery/discovery.go new file mode 100644 index 0000000..3dc51bd --- /dev/null +++ b/internal/controller/discovery/discovery.go @@ -0,0 +1,17 @@ +package discovery + +// Package discovery implements the discovery runtime subsystem. +// +// The discovery subsystem is responsible for: +// - Receiving discovery data from external providers (loaders, webhooks). +// - Supervising discovery pipelines and restart semantics. +// - Applying discovered state to Kubernetes Targets. +// +// The package is structured into the following subpackages: +// - core: message contracts, snapshot/event types, and transport helpers. +// - pipeline: supervision, restart policies, and lifecycle control. +// - reconciler: snapshot + event target state application logic. +// - loaders: target discovery providers (HTTP, webhook, etc.). +// - registry: key -> channel registry. +// +// At the moment, the targetsource controller imports specific subpackages explicitly. diff --git a/internal/controller/discovery/loaders/loaders.go b/internal/controller/discovery/loaders/factory.go similarity index 100% rename from internal/controller/discovery/loaders/loaders.go rename to internal/controller/discovery/loaders/factory.go diff --git a/internal/controller/discovery/loaders/http/http.go b/internal/controller/discovery/loaders/http/loader.go similarity index 100% rename from internal/controller/discovery/loaders/http/http.go rename to internal/controller/discovery/loaders/http/loader.go diff --git a/internal/controller/discovery/loaders/http/http_test.go b/internal/controller/discovery/loaders/http/loader_test.go similarity index 100% rename from internal/controller/discovery/loaders/http/http_test.go rename to internal/controller/discovery/loaders/http/loader_test.go diff --git a/internal/controller/discovery/supervisor.go b/internal/controller/discovery/pipeline/supervisor.go similarity index 95% rename from internal/controller/discovery/supervisor.go rename to internal/controller/discovery/pipeline/supervisor.go index 710381e..042d305 100644 --- a/internal/controller/discovery/supervisor.go +++ b/internal/controller/discovery/pipeline/supervisor.go @@ -1,4 +1,4 @@ -package discovery +package pipeline import ( "context" @@ -25,12 +25,13 @@ type Supervisor struct { stopped bool } -// RestartPolicy defines the restart behavior for a component +// RestartPolicy defines restart behavior of a component type RestartPolicy struct { MaxRestarts int Backoff time.Duration } +// ComponentSpec defines a supervised component type ComponentSpec struct { Name string Run func(ctx context.Context) error diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/reconciler/client.go similarity index 68% rename from internal/controller/discovery/client.go rename to internal/controller/discovery/reconciler/client.go index 25100bd..4bbbbc1 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/reconciler/client.go @@ -1,4 +1,4 @@ -package discovery +package reconciler import ( "context" @@ -9,10 +9,17 @@ import ( "github.com/gnmic/operator/internal/controller/discovery/core" ) -func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList - err := c.List(ctx, &targetList, + err := c.List( + ctx, + &targetList, client.InNamespace(ts.Namespace), client.MatchingLabels{ core.LabelTargetSourceName: ts.Name, diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/reconciler/message_processor.go similarity index 99% rename from internal/controller/discovery/message_processor.go rename to internal/controller/discovery/reconciler/message_processor.go index 65c8b44..0c205bd 100644 --- a/internal/controller/discovery/message_processor.go +++ b/internal/controller/discovery/reconciler/message_processor.go @@ -1,4 +1,4 @@ -package discovery +package reconciler import ( "context" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 49f9683..35946d2 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -29,9 +29,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery/loaders" + "github.com/gnmic/operator/internal/controller/discovery/pipeline" + "github.com/gnmic/operator/internal/controller/discovery/reconciler" "github.com/gnmic/operator/internal/controller/discovery/registry" "github.com/go-logr/logr" ) @@ -168,7 +169,7 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour // 3. Permanent failure of required components shuts down the pipeline // 4. Shutdown ordering: cancel ctx -> wait for goroutines to exit -> close channel -> unregister func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName, targetSource *gnmicv1alpha1.TargetSource, logger logr.Logger) error { - supervisor := discovery.NewSupervisor(context.Background()) + supervisor := pipeline.NewSupervisor(context.Background()) targetChannel := make(chan []core.DiscoveryMessage, r.BufferSize) if err := r.DiscoveryRegistry.Register(key, targetChannel); err != nil { @@ -176,7 +177,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName } // Create target targetHandler instance - targetHandler := discovery.NewMessageProcessor( + targetHandler := reconciler.NewMessageProcessor( r.Client, r.Scheme, targetSource, @@ -184,9 +185,9 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName ) // Start target handler handlerReady := make(chan struct{}) - supervisor.StartSupervisedComponent(discovery.ComponentSpec{ + supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ Name: "target-handler", - Policy: discovery.RestartPolicy{ + Policy: pipeline.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, @@ -217,9 +218,9 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName return err } - supervisor.StartSupervisedComponent(discovery.ComponentSpec{ + supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ Name: "loader", - Policy: discovery.RestartPolicy{ + Policy: pipeline.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, From 0c80394ab358c662fe519b872ed7219c2f7e384c Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 12:01:40 +0000 Subject: [PATCH 07/23] rename target handler to target reconciler --- .../discovery/reconciler/message_processor.go | 8 ++++---- internal/controller/targetsource_controller.go | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/controller/discovery/reconciler/message_processor.go b/internal/controller/discovery/reconciler/message_processor.go index 0c205bd..2c4632c 100644 --- a/internal/controller/discovery/reconciler/message_processor.go +++ b/internal/controller/discovery/reconciler/message_processor.go @@ -53,20 +53,20 @@ func (m *MessageProcessor) Run(ctx context.Context) error { "name", m.targetSource.Name, "namespace", m.targetSource.Namespace, ) - logger.Info("target handler started") + logger.Info("target reconciler started") for m.ctx.Err() == nil { select { case batch, ok := <-m.in: if !ok { // Channel closed, pipeline is shutting down - logger.Info("input channel closed, stopping target handler") + logger.Info("input channel closed, stopping target reconciler") return nil } m.queue = append(m.queue, batch...) case <-ctx.Done(): - logger.Info("context canceled, stopping target handler") + logger.Info("context canceled, stopping target reconciler") return nil } @@ -88,7 +88,7 @@ func (m *MessageProcessor) Run(ctx context.Context) error { } } - logger.Info("target handler stopped") + logger.Info("target reconciler stopped") return nil } diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 35946d2..6c9ad31 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -164,7 +164,7 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour // startDiscoveryPipeline creates and starts a discover pipeline for a TargetSource // // Pipeline semantics: -// 1. target-handler is mandatory and must start first +// 1. target reconciler is mandatory and must start first // 2. loader is optional and conditional on spec // 3. Permanent failure of required components shuts down the pipeline // 4. Shutdown ordering: cancel ctx -> wait for goroutines to exit -> close channel -> unregister @@ -176,14 +176,14 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName return err } - // Create target targetHandler instance - targetHandler := reconciler.NewMessageProcessor( + // Create target reconciler instance + targetReconciler := reconciler.NewMessageProcessor( r.Client, r.Scheme, targetSource, targetChannel, ) - // Start target handler + // Start target reconciler handlerReady := make(chan struct{}) supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ Name: "target-handler", @@ -194,7 +194,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName EscalatesOnFailure: true, Run: func(ctx context.Context) error { close(handlerReady) // Signals that handler started successfully - return targetHandler.Run(ctx) + return targetReconciler.Run(ctx) }, }) // Wait for handler to be ready before starting loader From 04208bf078b170160a6ef72eda6b6ddaa3630070 Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 12:20:58 +0000 Subject: [PATCH 08/23] rename handler to reconciler --- internal/controller/targetsource_controller.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 6c9ad31..9078af2 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -184,22 +184,22 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName targetChannel, ) // Start target reconciler - handlerReady := make(chan struct{}) + reconcilerReady := make(chan struct{}) supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ - Name: "target-handler", + Name: "target-reconciler", Policy: pipeline.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, EscalatesOnFailure: true, Run: func(ctx context.Context) error { - close(handlerReady) // Signals that handler started successfully + close(reconcilerReady) // Signals that reconciler started successfully return targetReconciler.Run(ctx) }, }) - // Wait for handler to be ready before starting loader + // Wait for reconciler to be ready before starting loader select { - case <-handlerReady: + case <-reconcilerReady: case <-supervisor.Done(): return nil } From c3818ce6f7693360496866d7ba1694f7ce702f32 Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 12:21:46 +0000 Subject: [PATCH 09/23] clarify interface files --- .../discovery/core/{loader.go => loader_interface.go} | 2 +- internal/controller/discovery/core/message.go | 4 ---- internal/controller/discovery/core/message_interface.go | 5 +++++ 3 files changed, 6 insertions(+), 5 deletions(-) rename internal/controller/discovery/core/{loader.go => loader_interface.go} (91%) create mode 100644 internal/controller/discovery/core/message_interface.go diff --git a/internal/controller/discovery/core/loader.go b/internal/controller/discovery/core/loader_interface.go similarity index 91% rename from internal/controller/discovery/core/loader.go rename to internal/controller/discovery/core/loader_interface.go index 8964be8..72f1898 100644 --- a/internal/controller/discovery/core/loader.go +++ b/internal/controller/discovery/core/loader_interface.go @@ -14,7 +14,7 @@ type Loader interface { Name() string // Start begins discovery and pushes target snapshots or events into the out channel - // The loader must stop cleanly when ctx is cancelled + // The loader must stop cleanly when ctx is canceled Start( ctx context.Context, targetsourceName types.NamespacedName, diff --git a/internal/controller/discovery/core/message.go b/internal/controller/discovery/core/message.go index 0836bc6..af4f6c1 100644 --- a/internal/controller/discovery/core/message.go +++ b/internal/controller/discovery/core/message.go @@ -1,8 +1,4 @@ package core -type DiscoveryMessage interface { - isDiscoveryMessage() -} - func (DiscoveryEvent) isDiscoveryMessage() {} func (DiscoverySnapshot) isDiscoveryMessage() {} diff --git a/internal/controller/discovery/core/message_interface.go b/internal/controller/discovery/core/message_interface.go new file mode 100644 index 0000000..07b819e --- /dev/null +++ b/internal/controller/discovery/core/message_interface.go @@ -0,0 +1,5 @@ +package core + +type DiscoveryMessage interface { + isDiscoveryMessage() +} From e4df0d4a6245d71d48539414b0f3ab45136de874 Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 12:35:14 +0000 Subject: [PATCH 10/23] define EventAction to be go idomatic --- internal/controller/discovery/core/types.go | 20 +++++++++++-------- .../discovery/reconciler/message_processor.go | 6 +++--- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 28ec503..1ae2f7a 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -4,6 +4,18 @@ type LoaderConfig struct { ChunkSize int } +// EventAction represents the type of a discovery event +type EventAction int + +const ( + // EventDelete indicates that a target should be removed + EventDelete EventAction = iota + // EventCreate indicates that a target should be created + EventCreate + // EventUpdate indicates that a target should be updated + EventUpdate +) + // DiscoveredTarget represents a target discovered from an external source // before it is materialized as a Kubernetes Target CR type DiscoveredTarget struct { @@ -12,14 +24,6 @@ type DiscoveredTarget struct { Labels map[string]string } -type EventAction int - -const ( - DELETE EventAction = 0 - CREATE EventAction = 1 - UPDATE EventAction = 2 -) - type DiscoveryEvent struct { Target DiscoveredTarget Event EventAction diff --git a/internal/controller/discovery/reconciler/message_processor.go b/internal/controller/discovery/reconciler/message_processor.go index 2c4632c..a0e91e5 100644 --- a/internal/controller/discovery/reconciler/message_processor.go +++ b/internal/controller/discovery/reconciler/message_processor.go @@ -258,11 +258,11 @@ func (m *MessageProcessor) processEvent(ctx context.Context, event core.Discover func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { - case core.CREATE: + case core.EventCreate: logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) - case core.UPDATE: + case core.EventUpdate: logger.Info("Would update target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) - case core.DELETE: + case core.EventDelete: logger.Info("Would delete target", "name", event.Target.Name) } return nil From 86c0af066faef2af3e75d68d3285c16dc6978bbe Mon Sep 17 00:00:00 2001 From: Daniel Schatzmann Date: Wed, 29 Apr 2026 13:49:19 +0000 Subject: [PATCH 11/23] add webhook activation info to metadata of DiscoveryRegistry --- cmd/main.go | 2 +- internal/apiserver/apiserver.go | 2 +- internal/controller/discovery/core/types.go | 5 +++++ .../controller/discovery/registry/registry.go | 14 +++++++------- internal/controller/targetsource_controller.go | 18 +++++++++++------- 5 files changed, 25 insertions(+), 16 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index e4bad31..4cf6e94 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -86,7 +86,7 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - discoveryRegistry := registry.NewRegistry[types.NamespacedName, []core.DiscoveryMessage]() + discoveryRegistry := registry.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]() mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index 17e5c82..a7ca16a 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -14,7 +14,7 @@ type APIServer struct { Server *http.Server clusterReconciler *controller.ClusterReconciler - DiscoveryRegistry *registry.Registry[types.NamespacedName, []core.DiscoveryMessage] + DiscoveryRegistry *registry.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } func New(addr string, clusterReconciler *controller.ClusterReconciler) *APIServer { diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 1ae2f7a..68c9c7e 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -1,5 +1,10 @@ package core +type DiscoveryRegistryValue struct { + Channel chan<- []DiscoveryMessage + WebhookEnabled bool +} + type LoaderConfig struct { ChunkSize int } diff --git a/internal/controller/discovery/registry/registry.go b/internal/controller/discovery/registry/registry.go index 093bd2c..f2630e8 100644 --- a/internal/controller/discovery/registry/registry.go +++ b/internal/controller/discovery/registry/registry.go @@ -10,20 +10,20 @@ import ( // DO NOT USE a pointer type as K type Registry[K comparable, V any] struct { mu sync.RWMutex - m map[K]chan<- V + m map[K]V } func NewRegistry[K comparable, V any]() *Registry[K, V] { - return &Registry[K, V]{m: make(map[K]chan<- V)} + return &Registry[K, V]{m: make(map[K]V)} } -func (r *Registry[K, V]) Register(key K, ch chan<- V) error { +func (r *Registry[K, V]) Register(key K, value V) error { r.mu.Lock() defer r.mu.Unlock() if _, exists := r.m[key]; exists { return fmt.Errorf("already registered: %v", key) } - r.m[key] = ch + r.m[key] = value return nil } @@ -33,9 +33,9 @@ func (r *Registry[K, V]) Unregister(key K) { r.mu.Unlock() } -func (r *Registry[K, V]) Get(key K) (chan<- V, bool) { +func (r *Registry[K, V]) Get(key K) (V, bool) { r.mu.RLock() - ch, ok := r.m[key] + value, ok := r.m[key] r.mu.RUnlock() - return ch, ok + return value, ok } diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 9078af2..c7e6460 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -66,7 +66,7 @@ type TargetSourceReconciler struct { BufferSize int ChunkSize int - DiscoveryRegistry *registry.Registry[types.NamespacedName, []core.DiscoveryMessage] + DiscoveryRegistry *registry.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } // +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targetsources,verbs=get;list;watch;create;update;patch;delete @@ -108,7 +108,7 @@ func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } - logger.Info("Discover pipeline started") + logger.Info("Discovery pipeline started") return ctrl.Result{}, nil } @@ -161,7 +161,7 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour return nil } -// startDiscoveryPipeline creates and starts a discover pipeline for a TargetSource +// startDiscoveryPipeline creates and starts a discovery pipeline for a TargetSource // // Pipeline semantics: // 1. target reconciler is mandatory and must start first @@ -169,10 +169,16 @@ func (r *TargetSourceReconciler) ensureFinalizer(ctx context.Context, targetSour // 3. Permanent failure of required components shuts down the pipeline // 4. Shutdown ordering: cancel ctx -> wait for goroutines to exit -> close channel -> unregister func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName, targetSource *gnmicv1alpha1.TargetSource, logger logr.Logger) error { + loaderConfigured := targetSource.Spec.Provider != nil + webhookActivated := targetSource.Spec.Webhook.Enabled != nil && *targetSource.Spec.Webhook.Enabled + supervisor := pipeline.NewSupervisor(context.Background()) targetChannel := make(chan []core.DiscoveryMessage, r.BufferSize) - if err := r.DiscoveryRegistry.Register(key, targetChannel); err != nil { + if err := r.DiscoveryRegistry.Register(key, core.DiscoveryRegistryValue{ + Channel: targetChannel, + WebhookEnabled: webhookActivated, + }); err != nil { return err } @@ -205,8 +211,6 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName } // Create loader instance - loaderConfigured := targetSource.Spec.Provider != nil - webhookConfigured := targetSource.Spec.Webhook.Enabled != nil if loaderConfigured { loader, err := loaders.NewLoader( key, @@ -224,7 +228,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, - EscalatesOnFailure: !webhookConfigured, + EscalatesOnFailure: !webhookActivated, Run: func(ctx context.Context) error { return loader.Start(ctx, key, targetSource.Spec, targetChannel) }, From 284b1f290bd7f1c33f6213bba5399fb16ac0dae9 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:33:40 -0600 Subject: [PATCH 12/23] moved reconciler files to discovery --- internal/controller/discovery/{reconciler => }/client.go | 2 +- .../discovery/{reconciler => }/message_processor.go | 2 +- internal/controller/targetsource_controller.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename internal/controller/discovery/{reconciler => }/client.go (96%) rename internal/controller/discovery/{reconciler => }/message_processor.go (99%) diff --git a/internal/controller/discovery/reconciler/client.go b/internal/controller/discovery/client.go similarity index 96% rename from internal/controller/discovery/reconciler/client.go rename to internal/controller/discovery/client.go index 4bbbbc1..2deb477 100644 --- a/internal/controller/discovery/reconciler/client.go +++ b/internal/controller/discovery/client.go @@ -1,4 +1,4 @@ -package reconciler +package discovery import ( "context" diff --git a/internal/controller/discovery/reconciler/message_processor.go b/internal/controller/discovery/message_processor.go similarity index 99% rename from internal/controller/discovery/reconciler/message_processor.go rename to internal/controller/discovery/message_processor.go index a0e91e5..6e69c99 100644 --- a/internal/controller/discovery/reconciler/message_processor.go +++ b/internal/controller/discovery/message_processor.go @@ -1,4 +1,4 @@ -package reconciler +package discovery import ( "context" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index c7e6460..84f9a6f 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -29,10 +29,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery/loaders" "github.com/gnmic/operator/internal/controller/discovery/pipeline" - "github.com/gnmic/operator/internal/controller/discovery/reconciler" "github.com/gnmic/operator/internal/controller/discovery/registry" "github.com/go-logr/logr" ) @@ -183,7 +183,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName } // Create target reconciler instance - targetReconciler := reconciler.NewMessageProcessor( + targetReconciler := discovery.NewMessageProcessor( r.Client, r.Scheme, targetSource, From b59897c253b5db8858a03026ae187ac6c8959d19 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:34:55 -0600 Subject: [PATCH 13/23] renamed messageProcessor to targetReconciler --- ...sage_processor.go => target_reconciler.go} | 96 +++++++++---------- .../controller/targetsource_controller.go | 2 +- 2 files changed, 49 insertions(+), 49 deletions(-) rename internal/controller/discovery/{message_processor.go => target_reconciler.go} (72%) diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/target_reconciler.go similarity index 72% rename from internal/controller/discovery/message_processor.go rename to internal/controller/discovery/target_reconciler.go index 6e69c99..4f3711c 100644 --- a/internal/controller/discovery/message_processor.go +++ b/internal/controller/discovery/target_reconciler.go @@ -20,8 +20,8 @@ type snapshotBuffer struct { complete bool } -// MessageProcessor consumes discovered targets and applies them to Kubernetes -type MessageProcessor struct { +// TargetReconciler consumes discovered targets and applies them to Kubernetes +type TargetReconciler struct { ctx context.Context client client.Client scheme *runtime.Scheme @@ -33,9 +33,9 @@ type MessageProcessor struct { deferredEvents []core.DiscoveryEvent } -// NewMessageProcessor wires a MessageProcessor instance -func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *MessageProcessor { - return &MessageProcessor{ +// NewTargetReconciler wires a TargetReconciler instance +func NewTargetReconciler(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *TargetReconciler { + return &TargetReconciler{ client: c, scheme: s, targetSource: ts, @@ -45,40 +45,40 @@ func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.T // Run is a long‑running loop that receives target snapshots // and reconciles Target CRs accordingly -func (m *MessageProcessor) Run(ctx context.Context) error { - m.ctx = ctx +func (r *TargetReconciler) Run(ctx context.Context) error { + r.ctx = ctx - logger := log.FromContext(m.ctx). + logger := log.FromContext(r.ctx). WithValues( - "name", m.targetSource.Name, - "namespace", m.targetSource.Namespace, + "name", r.targetSource.Name, + "namespace", r.targetSource.Namespace, ) logger.Info("target reconciler started") - for m.ctx.Err() == nil { + for r.ctx.Err() == nil { select { - case batch, ok := <-m.in: + case batch, ok := <-r.in: if !ok { // Channel closed, pipeline is shutting down logger.Info("input channel closed, stopping target reconciler") return nil } - m.queue = append(m.queue, batch...) + r.queue = append(r.queue, batch...) case <-ctx.Done(): logger.Info("context canceled, stopping target reconciler") return nil } - for len(m.queue) > 0 { + for len(r.queue) > 0 { if ctx.Err() != nil { return nil // why return nil? } - msg := m.queue[0] - m.queue = m.queue[1:] + msg := r.queue[0] + r.queue = r.queue[1:] - if err := m.processMessage(m.ctx, msg, logger); err != nil { + if err := r.processMessage(r.ctx, msg, logger); err != nil { // Returning error lets the supervisor (controller) // tear down and restart the pipeline via reconciliation // Q: when to return an error vs just log and continue? @@ -92,7 +92,7 @@ func (m *MessageProcessor) Run(ctx context.Context) error { return nil } -func (m *MessageProcessor) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { +func (r *TargetReconciler) processMessage(ctx context.Context, message core.DiscoveryMessage, logger logr.Logger) error { if err := ctx.Err(); err != nil { return err } @@ -107,7 +107,7 @@ func (m *MessageProcessor) processMessage(ctx context.Context, message core.Disc "index", msg.ChunkIndex, "targetCount", len(msg.Targets), ) - return m.processSnapshot(ctx, msg, logger) + return r.processSnapshot(ctx, msg, logger) case core.DiscoveryEvent: // Process individual event-driven update @@ -115,7 +115,7 @@ func (m *MessageProcessor) processMessage(ctx context.Context, message core.Disc "received discovery event", "target", msg.Target.Name, ) - return m.processEvent(ctx, msg, logger) + return r.processEvent(ctx, msg, logger) default: return fmt.Errorf("unknonw discovery message type %T", msg) @@ -123,18 +123,18 @@ func (m *MessageProcessor) processMessage(ctx context.Context, message core.Disc } // processSnapshot takes a complete snapshot of discovered targets and reconciles Target CRs accordingly -func (m *MessageProcessor) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { - if m.activeSnapshot == nil { - m.startNewSnapshot(chunk, logger) +func (r *TargetReconciler) processSnapshot(ctx context.Context, chunk core.DiscoverySnapshot, logger logr.Logger) error { + if r.activeSnapshot == nil { + r.startNewSnapshot(chunk, logger) return nil } - snapshot := m.activeSnapshot + snapshot := r.activeSnapshot // Check if a new snapshot arrived if snapshot.snapshotID != chunk.SnapshotID { // If current snapshot is complete apply it first if snapshot.complete { - if err := m.applySnapshot(ctx, snapshot, logger); err != nil { + if err := r.applySnapshot(ctx, snapshot, logger); err != nil { return err } } else { @@ -147,40 +147,40 @@ func (m *MessageProcessor) processSnapshot(ctx context.Context, chunk core.Disco } // Start collecting the new snapshot - m.startNewSnapshot(chunk, logger) + r.startNewSnapshot(chunk, logger) return nil } - return m.collectSnapshot(chunk, logger) + return r.collectSnapshot(chunk, logger) } -func (m *MessageProcessor) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { - m.activeSnapshot = &snapshotBuffer{ +func (r *TargetReconciler) startNewSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) { + r.activeSnapshot = &snapshotBuffer{ snapshotID: chunk.SnapshotID, totalChunks: chunk.TotalChunks, received: make(map[int][]core.DiscoveredTarget), complete: false, } // Delete buffered events that will be current with new snapshot - m.deferredEvents = nil + r.deferredEvents = nil - m.collectSnapshot(chunk, logger) + r.collectSnapshot(chunk, logger) } -func (m *MessageProcessor) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { - snapshot := m.activeSnapshot +func (r *TargetReconciler) collectSnapshot(chunk core.DiscoverySnapshot, logger logr.Logger) error { + snapshot := r.activeSnapshot if chunk.TotalChunks != snapshot.totalChunks { logger.Error(nil, "snapshot totalChunks mismatch", "snapshotID", snapshot.snapshotID) } if chunk.ChunkIndex < 0 || chunk.ChunkIndex >= snapshot.totalChunks { logger.Error(nil, "snapshot chunk index out of range", "index", chunk.ChunkIndex) - m.activeSnapshot = nil + r.activeSnapshot = nil return nil } if _, exists := snapshot.received[chunk.ChunkIndex]; exists { logger.Error(nil, "duplicate snapshot chunk", "index", chunk.ChunkIndex) - m.activeSnapshot = nil + r.activeSnapshot = nil return nil } @@ -193,10 +193,10 @@ func (m *MessageProcessor) collectSnapshot(chunk core.DiscoverySnapshot, logger return nil } -func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { +func (r *TargetReconciler) applySnapshot(ctx context.Context, snapshot *snapshotBuffer, logger logr.Logger) error { select { case <-ctx.Done(): - m.activeSnapshot = nil + r.activeSnapshot = nil return nil default: } @@ -205,7 +205,7 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot for i := 0; i < snapshot.totalChunks; i++ { select { case <-ctx.Done(): - m.activeSnapshot = nil + r.activeSnapshot = nil return nil default: } @@ -213,7 +213,7 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot chunk, ok := snapshot.received[i] if !ok { logger.Error(nil, "missing snapshot chunk", "index", i) - m.activeSnapshot = nil + r.activeSnapshot = nil return nil } allTargets = append(allTargets, chunk...) @@ -229,34 +229,34 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot // a.applyTargets // Replay deferred events - for _, event := range m.deferredEvents { + for _, event := range r.deferredEvents { select { case <-ctx.Done(): return nil default: } - if err := m.applyEvent(ctx, event, logger); err != nil { + if err := r.applyEvent(ctx, event, logger); err != nil { return err } } - m.activeSnapshot = nil - m.deferredEvents = nil + r.activeSnapshot = nil + r.deferredEvents = nil return nil } -func (m *MessageProcessor) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (r *TargetReconciler) processEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { // If snapshot collecting is active defer events - if m.activeSnapshot != nil { - m.deferredEvents = append(m.deferredEvents, event) + if r.activeSnapshot != nil { + r.deferredEvents = append(r.deferredEvents, event) return nil } // Apply events - return m.applyEvent(ctx, event, logger) + return r.applyEvent(ctx, event, logger) } -func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +func (r *TargetReconciler) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { case core.EventCreate: logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 84f9a6f..65a4cf9 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -183,7 +183,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName } // Create target reconciler instance - targetReconciler := discovery.NewMessageProcessor( + targetReconciler := discovery.NewTargetReconciler( r.Client, r.Scheme, targetSource, From c268808d67eb8df1d7328c0658b36bd369eda489 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:38:23 -0600 Subject: [PATCH 14/23] moved registry.go to discovery --- internal/controller/discovery/{registry => }/registry.go | 2 +- internal/controller/targetsource_controller.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) rename internal/controller/discovery/{registry => }/registry.go (97%) diff --git a/internal/controller/discovery/registry/registry.go b/internal/controller/discovery/registry.go similarity index 97% rename from internal/controller/discovery/registry/registry.go rename to internal/controller/discovery/registry.go index f2630e8..0afa2b2 100644 --- a/internal/controller/discovery/registry/registry.go +++ b/internal/controller/discovery/registry.go @@ -1,4 +1,4 @@ -package registry +package discovery import ( "fmt" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 65a4cf9..3b62b6d 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -33,7 +33,6 @@ import ( "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery/loaders" "github.com/gnmic/operator/internal/controller/discovery/pipeline" - "github.com/gnmic/operator/internal/controller/discovery/registry" "github.com/go-logr/logr" ) @@ -66,7 +65,7 @@ type TargetSourceReconciler struct { BufferSize int ChunkSize int - DiscoveryRegistry *registry.Registry[types.NamespacedName, core.DiscoveryRegistryValue] + DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } // +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targetsources,verbs=get;list;watch;create;update;patch;delete From 02958966b77f80ee3fc1f0e447b98967e54e9c2a Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:39:32 -0600 Subject: [PATCH 15/23] moved supervisor to discovery --- .../controller/discovery/{pipeline => }/supervisor.go | 2 +- internal/controller/targetsource_controller.go | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) rename internal/controller/discovery/{pipeline => }/supervisor.go (99%) diff --git a/internal/controller/discovery/pipeline/supervisor.go b/internal/controller/discovery/supervisor.go similarity index 99% rename from internal/controller/discovery/pipeline/supervisor.go rename to internal/controller/discovery/supervisor.go index 042d305..56fa687 100644 --- a/internal/controller/discovery/pipeline/supervisor.go +++ b/internal/controller/discovery/supervisor.go @@ -1,4 +1,4 @@ -package pipeline +package discovery import ( "context" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 3b62b6d..301e421 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -32,7 +32,6 @@ import ( "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/gnmic/operator/internal/controller/discovery/loaders" - "github.com/gnmic/operator/internal/controller/discovery/pipeline" "github.com/go-logr/logr" ) @@ -171,7 +170,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName loaderConfigured := targetSource.Spec.Provider != nil webhookActivated := targetSource.Spec.Webhook.Enabled != nil && *targetSource.Spec.Webhook.Enabled - supervisor := pipeline.NewSupervisor(context.Background()) + supervisor := discovery.NewSupervisor(context.Background()) targetChannel := make(chan []core.DiscoveryMessage, r.BufferSize) if err := r.DiscoveryRegistry.Register(key, core.DiscoveryRegistryValue{ @@ -190,9 +189,9 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName ) // Start target reconciler reconcilerReady := make(chan struct{}) - supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ + supervisor.StartSupervisedComponent(discovery.ComponentSpec{ Name: "target-reconciler", - Policy: pipeline.RestartPolicy{ + Policy: discovery.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, @@ -221,9 +220,9 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName return err } - supervisor.StartSupervisedComponent(pipeline.ComponentSpec{ + supervisor.StartSupervisedComponent(discovery.ComponentSpec{ Name: "loader", - Policy: pipeline.RestartPolicy{ + Policy: discovery.RestartPolicy{ MaxRestarts: pipelineMaxRestarts, Backoff: pipelineBackoff, }, From 4d32c40fb2e319fa2ff77a9c05f576ba6e0dba4d Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:40:26 -0600 Subject: [PATCH 16/23] moved factory.go to discovery/loaders.go --- .../controller/discovery/{loaders/factory.go => loaders.go} | 2 +- internal/controller/targetsource_controller.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) rename internal/controller/discovery/{loaders/factory.go => loaders.go} (97%) diff --git a/internal/controller/discovery/loaders/factory.go b/internal/controller/discovery/loaders.go similarity index 97% rename from internal/controller/discovery/loaders/factory.go rename to internal/controller/discovery/loaders.go index 45bf9c1..0d8ddd3 100644 --- a/internal/controller/discovery/loaders/factory.go +++ b/internal/controller/discovery/loaders.go @@ -1,4 +1,4 @@ -package loaders +package discovery import ( "fmt" diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 301e421..9ba2c94 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -31,7 +31,6 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/loaders" "github.com/go-logr/logr" ) @@ -210,7 +209,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName // Create loader instance if loaderConfigured { - loader, err := loaders.NewLoader( + loader, err := discovery.NewLoader( key, targetSource.Spec, core.LoaderConfig{ChunkSize: r.ChunkSize}, From 7671c1a20aa7a48a26cf306c55ef0698c1ec448f Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:54:58 -0600 Subject: [PATCH 17/23] moved send.go to loaders package --- .../discovery/loaders/http/loader.go | 3 ++- .../discovery/{core => loaders}/send.go | 22 ++++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) rename internal/controller/discovery/{core => loaders}/send.go (67%) diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 09bb7d6..1e5fc37 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -10,6 +10,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/gnmic/operator/internal/controller/discovery/loaders" "github.com/google/uuid" ) @@ -66,7 +67,7 @@ func (l *Loader) Start( }, } - if err := core.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { + if err := loaders.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { return err } } diff --git a/internal/controller/discovery/core/send.go b/internal/controller/discovery/loaders/send.go similarity index 67% rename from internal/controller/discovery/core/send.go rename to internal/controller/discovery/loaders/send.go index f24b50c..1377432 100644 --- a/internal/controller/discovery/core/send.go +++ b/internal/controller/discovery/loaders/send.go @@ -1,12 +1,14 @@ -package core +package loaders import ( "context" "fmt" + + "github.com/gnmic/operator/internal/controller/discovery/core" ) // sendMessages sends discovery messages over a channel in a context-aware manner -func sendMessages(ctx context.Context, out chan<- []DiscoveryMessage, messages []DiscoveryMessage) error { +func sendMessages(ctx context.Context, out chan<- []core.DiscoveryMessage, messages []core.DiscoveryMessage) error { select { case <-ctx.Done(): return ctx.Err() @@ -30,14 +32,14 @@ func forEachChunk(total, chunkSize int, fn func(start, end int) error) error { } // createDiscoverySnapshots takes a list of discovered targets and returns chunked DiscoverySnapshots -func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chunkSize int) []DiscoverySnapshot { - var snapshots []DiscoverySnapshot +func createDiscoverySnapshots(targets []core.DiscoveredTarget, snapshotID string, chunkSize int) []core.DiscoverySnapshot { + var snapshots []core.DiscoverySnapshot totalTargets := len(targets) totalChunks := (totalTargets + chunkSize - 1) / chunkSize _ = forEachChunk(totalTargets, chunkSize, func(i, end int) error { chunk := targets[i:end] - snapshots = append(snapshots, DiscoverySnapshot{ + snapshots = append(snapshots, core.DiscoverySnapshot{ Targets: chunk, SnapshotID: snapshotID, ChunkIndex: i / chunkSize, @@ -50,7 +52,7 @@ func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chu } // SendSnapshot sends discovered targets as a snapshot over a channel in chunks -func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets []DiscoveredTarget, snapshotID string, chunkSize int) error { +func SendSnapshot(ctx context.Context, out chan<- []core.DiscoveryMessage, targets []core.DiscoveredTarget, snapshotID string, chunkSize int) error { if len(targets) == 0 { return fmt.Errorf("no targets in Snapshot") } @@ -58,7 +60,7 @@ func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets [] snapshots := createDiscoverySnapshots(targets, snapshotID, chunkSize) for _, snapshot := range snapshots { // Convert DiscoverySnapshot to DiscoveryMessage - messages := make([]DiscoveryMessage, 1) + messages := make([]core.DiscoveryMessage, 1) messages[0] = snapshot if err := sendMessages(ctx, out, messages); err != nil { @@ -69,8 +71,8 @@ func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets [] return nil } -func eventsToMessages(events []DiscoveryEvent) []DiscoveryMessage { - message := make([]DiscoveryMessage, len(events)) +func eventsToMessages(events []core.DiscoveryEvent) []core.DiscoveryMessage { + message := make([]core.DiscoveryMessage, len(events)) for i, event := range events { message[i] = event } @@ -78,7 +80,7 @@ func eventsToMessages(events []DiscoveryEvent) []DiscoveryMessage { } // SendEvents sends discovery messages over channel in a context-aware manner -func SendEvents(ctx context.Context, out chan<- []DiscoveryMessage, events []DiscoveryEvent, chunkSize int) error { +func SendEvents(ctx context.Context, out chan<- []core.DiscoveryMessage, events []core.DiscoveryEvent, chunkSize int) error { if len(events) == 0 { return fmt.Errorf("no events to process") } From 5f1e9cbe91d28e837ff7fbfae4029df45f27c001 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:55:59 -0600 Subject: [PATCH 18/23] eliminated message.go --- internal/controller/discovery/core/message.go | 4 ---- internal/controller/discovery/core/message_interface.go | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) delete mode 100644 internal/controller/discovery/core/message.go diff --git a/internal/controller/discovery/core/message.go b/internal/controller/discovery/core/message.go deleted file mode 100644 index af4f6c1..0000000 --- a/internal/controller/discovery/core/message.go +++ /dev/null @@ -1,4 +0,0 @@ -package core - -func (DiscoveryEvent) isDiscoveryMessage() {} -func (DiscoverySnapshot) isDiscoveryMessage() {} diff --git a/internal/controller/discovery/core/message_interface.go b/internal/controller/discovery/core/message_interface.go index 07b819e..0836bc6 100644 --- a/internal/controller/discovery/core/message_interface.go +++ b/internal/controller/discovery/core/message_interface.go @@ -3,3 +3,6 @@ package core type DiscoveryMessage interface { isDiscoveryMessage() } + +func (DiscoveryEvent) isDiscoveryMessage() {} +func (DiscoverySnapshot) isDiscoveryMessage() {} From 6d6753731ca36cdafa5a251e164ed1b70eafd3dc Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 10:56:39 -0600 Subject: [PATCH 19/23] moved const.go to discovery.go --- internal/controller/discovery/client.go | 3 +-- internal/controller/discovery/{core => }/const.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) rename internal/controller/discovery/{core => }/const.go (81%) diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index 2deb477..cb02161 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -6,7 +6,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" - "github.com/gnmic/operator/internal/controller/discovery/core" ) func fetchExistingTargets( @@ -22,7 +21,7 @@ func fetchExistingTargets( &targetList, client.InNamespace(ts.Namespace), client.MatchingLabels{ - core.LabelTargetSourceName: ts.Name, + LabelTargetSourceName: ts.Name, }, ) if err != nil { diff --git a/internal/controller/discovery/core/const.go b/internal/controller/discovery/const.go similarity index 81% rename from internal/controller/discovery/core/const.go rename to internal/controller/discovery/const.go index 82a5962..ac7a57f 100644 --- a/internal/controller/discovery/core/const.go +++ b/internal/controller/discovery/const.go @@ -1,4 +1,4 @@ -package core +package discovery const ( // Labels From 391463097c6caab4b89c72de9789efe8b346e8bf Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 11:28:29 -0600 Subject: [PATCH 20/23] renamed core package within targetsource controller --- internal/controller/targetsource_controller.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 9ba2c94..e52b02b 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -30,7 +30,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery" - "github.com/gnmic/operator/internal/controller/discovery/core" + discoveryTypes "github.com/gnmic/operator/internal/controller/discovery/core" "github.com/go-logr/logr" ) @@ -63,7 +63,7 @@ type TargetSourceReconciler struct { BufferSize int ChunkSize int - DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue] + DiscoveryRegistry *discovery.Registry[types.NamespacedName, discoveryTypes.DiscoveryRegistryValue] } // +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targetsources,verbs=get;list;watch;create;update;patch;delete @@ -171,8 +171,8 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName supervisor := discovery.NewSupervisor(context.Background()) - targetChannel := make(chan []core.DiscoveryMessage, r.BufferSize) - if err := r.DiscoveryRegistry.Register(key, core.DiscoveryRegistryValue{ + targetChannel := make(chan []discoveryTypes.DiscoveryMessage, r.BufferSize) + if err := r.DiscoveryRegistry.Register(key, discoveryTypes.DiscoveryRegistryValue{ Channel: targetChannel, WebhookEnabled: webhookActivated, }); err != nil { @@ -212,7 +212,7 @@ func (r *TargetSourceReconciler) startDiscoveryPipeline(key types.NamespacedName loader, err := discovery.NewLoader( key, targetSource.Spec, - core.LoaderConfig{ChunkSize: r.ChunkSize}, + discoveryTypes.LoaderConfig{ChunkSize: r.ChunkSize}, ) if err != nil { supervisor.Stop() From 46a201fc1d9f0dc9cc73825477f789fc3cb3e860 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 11:34:42 -0600 Subject: [PATCH 21/23] changed events to delete / apply --- internal/controller/discovery/core/types.go | 6 ++---- internal/controller/discovery/target_reconciler.go | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 68c9c7e..2c37fc7 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -15,10 +15,8 @@ type EventAction int const ( // EventDelete indicates that a target should be removed EventDelete EventAction = iota - // EventCreate indicates that a target should be created - EventCreate - // EventUpdate indicates that a target should be updated - EventUpdate + // EventApply indicates that a target should be applied (created or updated) + EventApply ) // DiscoveredTarget represents a target discovered from an external source diff --git a/internal/controller/discovery/target_reconciler.go b/internal/controller/discovery/target_reconciler.go index 4f3711c..86470c6 100644 --- a/internal/controller/discovery/target_reconciler.go +++ b/internal/controller/discovery/target_reconciler.go @@ -258,12 +258,10 @@ func (r *TargetReconciler) processEvent(ctx context.Context, event core.Discover func (r *TargetReconciler) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { switch event.Event { - case core.EventCreate: - logger.Info("Would create target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) - case core.EventUpdate: - logger.Info("Would update target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) case core.EventDelete: logger.Info("Would delete target", "name", event.Target.Name) + case core.EventApply: + logger.Info("Would apply target", "name", event.Target.Name, "address", event.Target.Address, "labels", event.Target.Labels) } return nil } From 7b17f7e77644abff70f5796704e36b10bf03da15 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 11:37:39 -0600 Subject: [PATCH 22/23] moved send.go into separate utils for loaders --- internal/controller/discovery/loaders/http/loader.go | 4 ++-- internal/controller/discovery/loaders/{ => utils}/send.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename internal/controller/discovery/loaders/{ => utils}/send.go (99%) diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 1e5fc37..d7d5961 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -10,7 +10,7 @@ import ( gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/loaders" + loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" "github.com/google/uuid" ) @@ -67,7 +67,7 @@ func (l *Loader) Start( }, } - if err := loaders.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil { return err } } diff --git a/internal/controller/discovery/loaders/send.go b/internal/controller/discovery/loaders/utils/send.go similarity index 99% rename from internal/controller/discovery/loaders/send.go rename to internal/controller/discovery/loaders/utils/send.go index 1377432..3cfba8d 100644 --- a/internal/controller/discovery/loaders/send.go +++ b/internal/controller/discovery/loaders/utils/send.go @@ -1,4 +1,4 @@ -package loaders +package utils import ( "context" From 4540163d4137a27a291846a5960ecf09844bf5f8 Mon Sep 17 00:00:00 2001 From: Valentino Diller Date: Wed, 29 Apr 2026 11:45:43 -0600 Subject: [PATCH 23/23] replaced legacy registry package --- cmd/main.go | 4 ++-- internal/apiserver/apiserver.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 4cf6e94..aaf398a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -42,7 +42,7 @@ import ( "github.com/gnmic/operator/internal/apiserver" "github.com/gnmic/operator/internal/controller" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/registry" + "github.com/gnmic/operator/internal/controller/discovery" webhookv1alpha1 "github.com/gnmic/operator/internal/webhook/v1alpha1" //+kubebuilder:scaffold:imports ) @@ -86,7 +86,7 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - discoveryRegistry := registry.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]() + discoveryRegistry := discovery.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]() mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, diff --git a/internal/apiserver/apiserver.go b/internal/apiserver/apiserver.go index a7ca16a..705b277 100644 --- a/internal/apiserver/apiserver.go +++ b/internal/apiserver/apiserver.go @@ -6,7 +6,7 @@ import ( "github.com/gnmic/operator/internal/controller" "github.com/gnmic/operator/internal/controller/discovery/core" - "github.com/gnmic/operator/internal/controller/discovery/registry" + "github.com/gnmic/operator/internal/controller/discovery" "k8s.io/apimachinery/pkg/types" ) @@ -14,7 +14,7 @@ type APIServer struct { Server *http.Server clusterReconciler *controller.ClusterReconciler - DiscoveryRegistry *registry.Registry[types.NamespacedName, core.DiscoveryRegistryValue] + DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue] } func New(addr string, clusterReconciler *controller.ClusterReconciler) *APIServer {