Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3ba86cb
restructured loaders package
mcdillson Apr 27, 2026
d0ac86b
restructured target handler
mcdillson Apr 27, 2026
7ef1281
renamed target applier to message processor & created client.go for g…
mcdillson Apr 27, 2026
4fa58b6
Merge branch 'feature/limit-target-loading' into feature/restructuring
mcdillson Apr 27, 2026
d10fc9a
removed all package
mcdillson Apr 27, 2026
b7dd036
remove unused fiels
denyost Apr 29, 2026
d3a9b5c
rename files and restructure packages
denyost Apr 29, 2026
0c80394
rename target handler to target reconciler
denyost Apr 29, 2026
04208bf
rename handler to reconciler
denyost Apr 29, 2026
c3818ce
clarify interface files
denyost Apr 29, 2026
e4df0d4
define EventAction to be go idomatic
denyost Apr 29, 2026
86c0af0
add webhook activation info to metadata of DiscoveryRegistry
denyost Apr 29, 2026
284b1f2
moved reconciler files to discovery
mcdillson Apr 29, 2026
b59897c
renamed messageProcessor to targetReconciler
mcdillson Apr 29, 2026
c268808
moved registry.go to discovery
mcdillson Apr 29, 2026
0295896
moved supervisor to discovery
mcdillson Apr 29, 2026
4d32c40
moved factory.go to discovery/loaders.go
mcdillson Apr 29, 2026
7671c1a
moved send.go to loaders package
mcdillson Apr 29, 2026
5f1e9cb
eliminated message.go
mcdillson Apr 29, 2026
6d67537
moved const.go to discovery.go
mcdillson Apr 29, 2026
3914630
renamed core package within targetsource controller
mcdillson Apr 29, 2026
46a201f
changed events to delete / apply
mcdillson Apr 29, 2026
7b17f7e
moved send.go into separate utils for loaders
mcdillson Apr 29, 2026
4540163
replaced legacy registry package
mcdillson Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
"github.com/gnmic/operator/internal/apiserver"
"github.com/gnmic/operator/internal/controller"
"github.com/gnmic/operator/internal/controller/discovery/core"
"github.com/gnmic/operator/internal/controller/discovery/registry"
"github.com/gnmic/operator/internal/controller/discovery"
webhookv1alpha1 "github.com/gnmic/operator/internal/webhook/v1alpha1"
//+kubebuilder:scaffold:imports
)
Expand Down Expand Up @@ -86,7 +86,7 @@ func main() {

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

discoveryRegistry := registry.NewRegistry[types.NamespacedName, []core.DiscoveryMessage]()
discoveryRegistry := discovery.NewRegistry[types.NamespacedName, core.DiscoveryRegistryValue]()

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Expand Down
4 changes: 2 additions & 2 deletions internal/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (

"github.com/gnmic/operator/internal/controller"
"github.com/gnmic/operator/internal/controller/discovery/core"
"github.com/gnmic/operator/internal/controller/discovery/registry"
"github.com/gnmic/operator/internal/controller/discovery"
"k8s.io/apimachinery/pkg/types"
)

type APIServer struct {
Server *http.Server
clusterReconciler *controller.ClusterReconciler

DiscoveryRegistry *registry.Registry[types.NamespacedName, []core.DiscoveryMessage]
DiscoveryRegistry *discovery.Registry[types.NamespacedName, core.DiscoveryRegistryValue]
}

func New(addr string, clusterReconciler *controller.ClusterReconciler) *APIServer {
Expand Down
16 changes: 10 additions & 6 deletions internal/controller/discovery/client.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package discovery

// File may become obsolete, depends on how the logic to compare desired vs. existing state will get implemented

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1"
"github.com/gnmic/operator/internal/controller/discovery/core"
)

func FetchExistingTargets(ctx context.Context, c client.Client, ts gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) {
func fetchExistingTargets(
ctx context.Context,
c client.Client,
ts *gnmicv1alpha1.TargetSource,
) ([]gnmicv1alpha1.Target, error) {

var targetList gnmicv1alpha1.TargetList

err := c.List(ctx, &targetList,
err := c.List(
ctx,
&targetList,
client.InNamespace(ts.Namespace),
client.MatchingLabels{
core.LabelTargetSourceName: ts.Name,
LabelTargetSourceName: ts.Name,
},
)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package core
package discovery

const (
// Labels
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/discovery/core/loader_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Loader interface {
Name() string

// Start begins discovery and pushes target snapshots or events into the out channel
// The loader must stop cleanly when ctx is cancelled
// The loader must stop cleanly when ctx is canceled
Start(
ctx context.Context,
targetsourceName types.NamespacedName,
Expand Down
23 changes: 15 additions & 8 deletions internal/controller/discovery/core/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
package core

type DiscoveryRegistryValue struct {
Channel chan<- []DiscoveryMessage
WebhookEnabled bool
}

type LoaderConfig struct {
ChunkSize int
}

// EventAction represents the type of a discovery event
type EventAction int

const (
// EventDelete indicates that a target should be removed
EventDelete EventAction = iota
// EventApply indicates that a target should be applied (created or updated)
EventApply
)

// DiscoveredTarget represents a target discovered from an external source
// before it is materialized as a Kubernetes Target CR
type DiscoveredTarget struct {
Expand All @@ -12,14 +27,6 @@ type DiscoveredTarget struct {
Labels map[string]string
}

const (
DELETE EventAction = 0
CREATE EventAction = 1
UPDATE EventAction = 2
)

type EventAction int

type DiscoveryEvent struct {
Target DiscoveredTarget
Event EventAction
Expand Down
17 changes: 17 additions & 0 deletions internal/controller/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package discovery

// Package discovery implements the discovery runtime subsystem.
//
// The discovery subsystem is responsible for:
// - Receiving discovery data from external providers (loaders, webhooks).
// - Supervising discovery pipelines and restart semantics.
// - Applying discovered state to Kubernetes Targets.
//
// The package is structured into the following subpackages:
// - core: message contracts, snapshot/event types, and transport helpers.
// - pipeline: supervision, restart policies, and lifecycle control.
// - reconciler: snapshot + event target state application logic.
// - loaders: target discovery providers (HTTP, webhook, etc.).
// - registry: key -> channel registry.
//
// At the moment, the targetsource controller imports specific subpackages explicitly.
5 changes: 0 additions & 5 deletions internal/controller/discovery/loaders/all/all.go

This file was deleted.

3 changes: 2 additions & 1 deletion internal/controller/discovery/loaders/http/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1"
"github.com/gnmic/operator/internal/controller/discovery/core"
loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ func (l *Loader) Start(
},
}

if err := core.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil {
if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.cfg.ChunkSize); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package core
package utils

import (
"context"
"fmt"

"github.com/gnmic/operator/internal/controller/discovery/core"
)

// sendMessages sends discovery messages over a channel in a context-aware manner
func sendMessages(ctx context.Context, out chan<- []DiscoveryMessage, messages []DiscoveryMessage) error {
func sendMessages(ctx context.Context, out chan<- []core.DiscoveryMessage, messages []core.DiscoveryMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -30,14 +32,14 @@ func forEachChunk(total, chunkSize int, fn func(start, end int) error) error {
}

// createDiscoverySnapshots takes a list of discovered targets and returns chunked DiscoverySnapshots
func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chunkSize int) []DiscoverySnapshot {
var snapshots []DiscoverySnapshot
func createDiscoverySnapshots(targets []core.DiscoveredTarget, snapshotID string, chunkSize int) []core.DiscoverySnapshot {
var snapshots []core.DiscoverySnapshot
totalTargets := len(targets)
totalChunks := (totalTargets + chunkSize - 1) / chunkSize

_ = forEachChunk(totalTargets, chunkSize, func(i, end int) error {
chunk := targets[i:end]
snapshots = append(snapshots, DiscoverySnapshot{
snapshots = append(snapshots, core.DiscoverySnapshot{
Targets: chunk,
SnapshotID: snapshotID,
ChunkIndex: i / chunkSize,
Expand All @@ -50,15 +52,15 @@ func createDiscoverySnapshots(targets []DiscoveredTarget, snapshotID string, chu
}

// SendSnapshot sends discovered targets as a snapshot over a channel in chunks
func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets []DiscoveredTarget, snapshotID string, chunkSize int) error {
func SendSnapshot(ctx context.Context, out chan<- []core.DiscoveryMessage, targets []core.DiscoveredTarget, snapshotID string, chunkSize int) error {
if len(targets) == 0 {
return fmt.Errorf("no targets in Snapshot")
}

snapshots := createDiscoverySnapshots(targets, snapshotID, chunkSize)
for _, snapshot := range snapshots {
// Convert DiscoverySnapshot to DiscoveryMessage
messages := make([]DiscoveryMessage, 1)
messages := make([]core.DiscoveryMessage, 1)
messages[0] = snapshot

if err := sendMessages(ctx, out, messages); err != nil {
Expand All @@ -69,16 +71,16 @@ func SendSnapshot(ctx context.Context, out chan<- []DiscoveryMessage, targets []
return nil
}

func eventsToMessages(events []DiscoveryEvent) []DiscoveryMessage {
message := make([]DiscoveryMessage, len(events))
func eventsToMessages(events []core.DiscoveryEvent) []core.DiscoveryMessage {
message := make([]core.DiscoveryMessage, len(events))
for i, event := range events {
message[i] = event
}
return message
}

// SendEvents sends discovery messages over channel in a context-aware manner
func SendEvents(ctx context.Context, out chan<- []DiscoveryMessage, events []DiscoveryEvent, chunkSize int) error {
func SendEvents(ctx context.Context, out chan<- []core.DiscoveryMessage, events []core.DiscoveryEvent, chunkSize int) error {
if len(events) == 0 {
return fmt.Errorf("no events to process")
}
Expand Down
4 changes: 0 additions & 4 deletions internal/controller/discovery/mapper.go

This file was deleted.

1 change: 0 additions & 1 deletion internal/controller/discovery/mapper_test.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package discovery

import (
"fmt"
Expand All @@ -10,20 +10,20 @@ import (
// DO NOT USE a pointer type as K
type Registry[K comparable, V any] struct {
mu sync.RWMutex
m map[K]chan<- V
m map[K]V
}

func NewRegistry[K comparable, V any]() *Registry[K, V] {
return &Registry[K, V]{m: make(map[K]chan<- V)}
return &Registry[K, V]{m: make(map[K]V)}
}

func (r *Registry[K, V]) Register(key K, ch chan<- V) error {
func (r *Registry[K, V]) Register(key K, value V) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.m[key]; exists {
return fmt.Errorf("already registered: %v", key)
}
r.m[key] = ch
r.m[key] = value
return nil
}

Expand All @@ -33,9 +33,9 @@ func (r *Registry[K, V]) Unregister(key K) {
r.mu.Unlock()
}

func (r *Registry[K, V]) Get(key K) (chan<- V, bool) {
func (r *Registry[K, V]) Get(key K) (V, bool) {
r.mu.RLock()
ch, ok := r.m[key]
value, ok := r.m[key]
r.mu.RUnlock()
return ch, ok
return value, ok
}
3 changes: 2 additions & 1 deletion internal/controller/discovery/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ type Supervisor struct {
stopped bool
}

// RestartPolicy defines the restart behavior for a component
// RestartPolicy defines restart behavior of a component
type RestartPolicy struct {
MaxRestarts int
Backoff time.Duration
}

// ComponentSpec defines a supervised component
type ComponentSpec struct {
Name string
Run func(ctx context.Context) error
Expand Down
Loading