From 2790dce8ef37f7e82709babac20a934fd8f6a067 Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Tue, 17 Feb 2026 18:36:03 -0800 Subject: [PATCH] add Target Status using gNMIc SSE endpoint --- api/v1alpha1/target_types.go | 39 +- api/v1alpha1/zz_generated.deepcopy.go | 32 +- cmd/main.go | 7 + .../crd/bases/operator.gnmic.dev_targets.yaml | 69 ++- config/rbac/role.yaml | 1 + internal/controller/cluster_controller.go | 38 +- internal/controller/targetstate_controller.go | 531 ++++++++++++++++++ internal/gnmic/sse.go | 160 ++++++ 8 files changed, 848 insertions(+), 29 deletions(-) create mode 100644 internal/controller/targetstate_controller.go create mode 100644 internal/gnmic/sse.go diff --git a/api/v1alpha1/target_types.go b/api/v1alpha1/target_types.go index 3c2f794..a2e0e65 100644 --- a/api/v1alpha1/target_types.go +++ b/api/v1alpha1/target_types.go @@ -28,19 +28,46 @@ type TargetSpec struct { Profile string `json:"profile"` } -// TargetStatus defines the observed state of Target +// TargetStatus defines the observed state of Target. +// A single Target may be collected by multiple Clusters (via different Pipelines), +// so the status is reported per-cluster. type TargetStatus struct { - // The connection state of the target - ConnectionState string `json:"connectionState"` - LastConnected metav1.Time `json:"lastConnected"` - LastDisconnected metav1.Time `json:"lastDisconnected"` - LastError string `json:"lastError"` + // Number of clusters currently collecting this target. + Clusters int32 `json:"clusters"` + // Aggregate connection state across all clusters. + // READY if all clusters report READY, DEGRADED if any do not. + // Empty when no clusters are collecting this target. + ConnectionState string `json:"connectionState,omitempty"` + // Per-cluster target state, keyed by Cluster CR name. + // A target may be collected by multiple clusters (via different pipelines). + // +optional + ClusterStates map[string]ClusterTargetState `json:"clusterStates,omitempty"` +} + +// ClusterTargetState represents the state of a target on a specific gNMIc cluster pod. +type ClusterTargetState struct { + // The pod within the cluster that currently owns this target. + Pod string `json:"pod"` + // The target's operational state (starting, running, stopping, stopped, failed). + State string `json:"state,omitempty"` + // The reason for failure when state is "failed". + // +optional + FailedReason string `json:"failedReason,omitempty"` + // The gNMI connection state (CONNECTING, READY, TRANSIENT_FAILURE, etc.). + ConnectionState string `json:"connectionState,omitempty"` + // Per-subscription state (subscription name -> running/stopped). + // +optional + Subscriptions map[string]string `json:"subscriptions,omitempty"` + // When this state was last updated by the gNMIc pod. + LastUpdated metav1.Time `json:"lastUpdated,omitempty"` } //+kubebuilder:object:root=true //+kubebuilder:subresource:status // +kubebuilder:printcolumn:name="Address",type=string,JSONPath=`.spec.address` // +kubebuilder:printcolumn:name="Profile",type=string,JSONPath=`.spec.profile` +// +kubebuilder:printcolumn:name="Clusters",type=integer,JSONPath=`.status.clusters` +// +kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.connectionState` // Target is the Schema for the targets API type Target struct { diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 9a22e5c..ffd628e 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -185,6 +185,29 @@ func (in *ClusterTLSConfig) DeepCopy() *ClusterTLSConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterTargetState) DeepCopyInto(out *ClusterTargetState) { + *out = *in + if in.Subscriptions != nil { + in, out := &in.Subscriptions, &out.Subscriptions + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + in.LastUpdated.DeepCopyInto(&out.LastUpdated) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterTargetState. +func (in *ClusterTargetState) DeepCopy() *ClusterTargetState { + if in == nil { + return nil + } + out := new(ClusterTargetState) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConsulConfig) DeepCopyInto(out *ConsulConfig) { *out = *in @@ -1284,8 +1307,13 @@ func (in *TargetSpec) DeepCopy() *TargetSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TargetStatus) DeepCopyInto(out *TargetStatus) { *out = *in - in.LastConnected.DeepCopyInto(&out.LastConnected) - in.LastDisconnected.DeepCopyInto(&out.LastDisconnected) + if in.ClusterStates != nil { + in, out := &in.ClusterStates, &out.ClusterStates + *out = make(map[string]ClusterTargetState, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetStatus. diff --git a/cmd/main.go b/cmd/main.go index fe4263d..6c5e73c 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -178,6 +178,13 @@ func main() { os.Exit(1) } } + if err = (&controller.TargetStateReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "TargetState") + os.Exit(1) + } if err := (&controller.TunnelTargetPolicyReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), diff --git a/config/crd/bases/operator.gnmic.dev_targets.yaml b/config/crd/bases/operator.gnmic.dev_targets.yaml index b0a42ce..bd648ec 100644 --- a/config/crd/bases/operator.gnmic.dev_targets.yaml +++ b/config/crd/bases/operator.gnmic.dev_targets.yaml @@ -21,6 +21,12 @@ spec: - jsonPath: .spec.profile name: Profile type: string + - jsonPath: .status.clusters + name: Clusters + type: integer + - jsonPath: .status.connectionState + name: State + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -57,24 +63,59 @@ spec: - profile type: object status: - description: TargetStatus defines the observed state of Target + description: |- + TargetStatus defines the observed state of Target. + A single Target may be collected by multiple Clusters (via different Pipelines), + so the status is reported per-cluster. properties: + clusterStates: + additionalProperties: + description: ClusterTargetState represents the state of a target + on a specific gNMIc cluster pod. + properties: + connectionState: + description: The gNMI connection state (CONNECTING, READY, TRANSIENT_FAILURE, + etc.). + type: string + failedReason: + description: The reason for failure when state is "failed". + type: string + lastUpdated: + description: When this state was last updated by the gNMIc pod. + format: date-time + type: string + pod: + description: The pod within the cluster that currently owns + this target. + type: string + state: + description: The target's operational state (starting, running, + stopping, stopped, failed). + type: string + subscriptions: + additionalProperties: + type: string + description: Per-subscription state (subscription name -> running/stopped). + type: object + required: + - pod + type: object + description: |- + Per-cluster target state, keyed by Cluster CR name. + A target may be collected by multiple clusters (via different pipelines). + type: object + clusters: + description: Number of clusters currently collecting this target. + format: int32 + type: integer connectionState: - description: The connection state of the target - type: string - lastConnected: - format: date-time - type: string - lastDisconnected: - format: date-time - type: string - lastError: + description: |- + Aggregate connection state across all clusters. + READY if all clusters report READY, DEGRADED if any do not. + Empty when no clusters are collecting this target. type: string required: - - connectionState - - lastConnected - - lastDisconnected - - lastError + - clusters type: object type: object served: true diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ee698b5..1f96c7c 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -87,6 +87,7 @@ rules: resources: - clusters/status - pipelines/status + - targets/status - targetsources/status - tunneltargetpolicies/status verbs: diff --git a/internal/controller/cluster_controller.go b/internal/controller/cluster_controller.go index 99393ae..34e7b6d 100644 --- a/internal/controller/cluster_controller.go +++ b/internal/controller/cluster_controller.go @@ -1978,13 +1978,24 @@ func (r *ClusterReconciler) updatePipelineStatus(ctx context.Context, pipeline * } } - // update status if changed + // update status if changed, with retry on conflict if !pipelineStatusEqual(pipeline.Status, newStatus) { - pipeline.Status = newStatus - if err := r.Status().Update(ctx, pipeline); err != nil { - return fmt.Errorf("failed to update pipeline status: %w", err) + pipelineNN := types.NamespacedName{Name: pipeline.Name, Namespace: pipeline.Namespace} + for attempt := 0; attempt < 5; attempt++ { + // re-fetch to get the latest resourceVersion + if err := r.Get(ctx, pipelineNN, pipeline); err != nil { + return fmt.Errorf("failed to re-fetch pipeline: %w", err) + } + pipeline.Status = newStatus + if err := r.Status().Update(ctx, pipeline); err != nil { + if errors.IsConflict(err) { + continue + } + return fmt.Errorf("failed to update pipeline status: %w", err) + } + logger.Info("updated pipeline status", "pipeline", pipeline.Name, "targets", newStatus.TargetsCount) + break } - logger.Info("updated pipeline status", "pipeline", pipeline.Name, "targets", newStatus.TargetsCount) } return nil @@ -2032,8 +2043,21 @@ func (r *ClusterReconciler) updatePipelineStatusWithError(ctx context.Context, p }, } - pipeline.Status = newStatus - return r.Status().Update(ctx, pipeline) + pipelineNN := types.NamespacedName{Name: pipeline.Name, Namespace: pipeline.Namespace} + for attempt := 0; attempt < 5; attempt++ { + if err := r.Get(ctx, pipelineNN, pipeline); err != nil { + return fmt.Errorf("failed to re-fetch pipeline: %w", err) + } + pipeline.Status = newStatus + if err := r.Status().Update(ctx, pipeline); err != nil { + if errors.IsConflict(err) { + continue + } + return fmt.Errorf("failed to update pipeline status: %w", err) + } + return nil + } + return fmt.Errorf("failed to update pipeline status after retries: conflict") } // listPipelinesForCluster returns all enabled Pipelines that reference this Cluster diff --git a/internal/controller/targetstate_controller.go b/internal/controller/targetstate_controller.go new file mode 100644 index 0000000..c7b9b0c --- /dev/null +++ b/internal/controller/targetstate_controller.go @@ -0,0 +1,531 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "os" + "strings" + "sync" + "time" + + certmanagerv1 "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/gnmic" +) + +const ( + sseTargetsPath = "/api/v1/sse/targets?store=state" + pollTargetsPath = "/api/v1/targets" + reconnectMinDelay = 2 * time.Second + reconnectMaxDelay = 10 * time.Second + pollInterval = 15 * time.Second // TODO: make it an ENV var + sseStreamBufferCapacity = 1024 // TODO: make it an ENV var +) + +// TargetStateReconciler watches Cluster resources and manages SSE stream +// goroutines to collect target state from gNMIc pods. State updates are +// reflected into the Target CR's .status.clusterStates field. +type TargetStateReconciler struct { + client.Client + Scheme *runtime.Scheme + + // mu protects the streams map. + mu sync.Mutex + // streams tracks active SSE goroutines. + // Key: "namespace/clusterName/podIndex" + streams map[string]context.CancelFunc +} + +// +kubebuilder:rbac:groups=operator.gnmic.dev,resources=clusters,verbs=get;list;watch +// +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targets,verbs=get;list +// +kubebuilder:rbac:groups=operator.gnmic.dev,resources=targets/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get +// +kubebuilder:rbac:groups=cert-manager.io,resources=issuers,verbs=get +// +kubebuilder:rbac:groups="",resources=secrets,verbs=get + +func (r *TargetStateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx).WithValues("controller", "TargetState") + + r.mu.Lock() + defer r.mu.Unlock() + + if r.streams == nil { + r.streams = make(map[string]context.CancelFunc) + } + + // fetch the cluster + var cluster gnmicv1alpha1.Cluster + if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil { + if apierrors.IsNotFound(err) { + // cluster deleted — stop all streams for it + r.stopStreamsForCluster(req.Namespace, req.Name) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + // if replicas is nil or 0, stop all streams + if cluster.Spec.Replicas == nil || *cluster.Spec.Replicas == 0 { + r.stopStreamsForCluster(cluster.Namespace, cluster.Name) + return ctrl.Result{}, nil + } + + // check if statefulset is ready + stsName := fmt.Sprintf("%s%s", resourcePrefix, cluster.Name) + var sts appsv1.StatefulSet + if err := r.Get(ctx, types.NamespacedName{Name: stsName, Namespace: cluster.Namespace}, &sts); err != nil { + if apierrors.IsNotFound(err) { + // statefulset not created yet, stop existing streams and requeue + r.stopStreamsForCluster(cluster.Namespace, cluster.Name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + return ctrl.Result{}, err + } + + desiredPods := int(*cluster.Spec.Replicas) + + // build the set of desired stream keys + desiredKeys := make(map[string]struct{}, desiredPods) + for i := 0; i < desiredPods; i++ { + key := streamKey(cluster.Namespace, cluster.Name, i) + desiredKeys[key] = struct{}{} + } + + // stop streams that are no longer needed (e.g., scale down) + prefix := cluster.Namespace + "/" + cluster.Name + "/" + for key, cancel := range r.streams { + if !strings.HasPrefix(key, prefix) { + continue + } + if _, ok := desiredKeys[key]; !ok { + logger.Info("stopping SSE stream (pod removed)", "key", key) + cancel() + delete(r.streams, key) + } + } + + // start streams for new pods + for i := 0; i < desiredPods; i++ { + key := streamKey(cluster.Namespace, cluster.Name, i) + if _, ok := r.streams[key]; ok { + continue // already running + } + + podURL, err := r.buildPodSSEURL(&cluster, stsName, i) + if err != nil { + logger.Error(err, "failed to build pod SSE URL", "podIndex", i) + continue + } + + streamCtx, cancel := context.WithCancel(context.Background()) + r.streams[key] = cancel + + logger.Info("starting SSE stream", "key", key, "url", podURL) + go r.runStream(streamCtx, &cluster, stsName, i, podURL) + } + + return ctrl.Result{}, nil +} + +// runStream manages the SSE connection lifecycle for a single pod. +// It reconnects with exponential backoff on failure. +func (r *TargetStateReconciler) runStream(ctx context.Context, cluster *gnmicv1alpha1.Cluster, stsName string, podIndex int, sseURL string) { + logger := log.FromContext(ctx).WithValues( + "cluster", cluster.Name, + "namespace", cluster.Namespace, + "pod", fmt.Sprintf("%s-%d", stsName, podIndex), + ) + + podName := fmt.Sprintf("%s-%d", stsName, podIndex) + pollURL := r.buildPodPollURL(cluster, stsName, podIndex) + delay := reconnectMinDelay + + for { + select { + case <-ctx.Done(): + return + default: + } + + httpClient, err := r.createHTTPClient(ctx, cluster) + if err != nil { + logger.Error(err, "failed to create HTTP client, retrying") + sleepOrDone(ctx, delay) + delay = backoff(delay) + continue + } + + events := make(chan gnmic.SSEEvent, sseStreamBufferCapacity) + + // start the SSE stream reader in a separate goroutine + streamDone := make(chan error, 1) + go func() { + streamDone <- gnmic.StreamTargetState(ctx, httpClient, sseURL, events) + }() + + // reset backoff on successful connection + delay = reconnectMinDelay + + // process events and poll periodically until the stream ends + r.processEvents(ctx, httpClient, events, streamDone, cluster.Name, cluster.Namespace, podName, pollURL, logger) + + // check if context was cancelled (intentional stop) + select { + case <-ctx.Done(): + return + default: + logger.Info("SSE stream disconnected, reconnecting", "delay", delay) + sleepOrDone(ctx, delay) + delay = backoff(delay) + } + } +} + +// processEvents reads from the events channel, updates Target CR statuses, +// and periodically polls the pod for a full state snapshot to catch missed events. +func (r *TargetStateReconciler) processEvents( + ctx context.Context, + httpClient *http.Client, + events <-chan gnmic.SSEEvent, + streamDone <-chan error, + clusterName, namespace, podName, pollURL string, + logger interface { + Info(string, ...any) + Error(error, string, ...any) + }, +) { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case err := <-streamDone: + if err != nil { + logger.Error(err, "SSE stream ended with error") + } + return + case event := <-events: + r.handleEvent(ctx, event, clusterName, namespace, podName) + case <-ticker.C: + r.pollAndSync(ctx, httpClient, pollURL, clusterName, podName, logger) + } + } +} + +// pollAndSync fetches the full target state from a pod and reconciles it +// with the Target CR statuses to catch any events missed by the SSE stream. +func (r *TargetStateReconciler) pollAndSync( + ctx context.Context, + httpClient *http.Client, + pollURL, clusterName, podName string, + logger interface { + Info(string, ...any) + Error(error, string, ...any) + }, +) { + entries, err := gnmic.PollTargetState(ctx, httpClient, pollURL) + if err != nil { + logger.Error(err, "periodic poll failed") + return + } + + for _, entry := range entries { + if entry.State == nil { + continue + } + + targetNamespace, targetName, ok := parseTargetName(entry.Name) + if !ok { + continue + } + + targetNN := types.NamespacedName{Name: targetName, Namespace: targetNamespace} + + for attempt := 0; attempt < maxConflictRetries; attempt++ { + var target gnmicv1alpha1.Target + if err := r.Get(ctx, targetNN, &target); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "poll: failed to get target", "target", entry.Name) + } + break + } + + if target.Status.ClusterStates == nil { + target.Status.ClusterStates = make(map[string]gnmicv1alpha1.ClusterTargetState) + } + + target.Status.ClusterStates[clusterName] = gnmicv1alpha1.ClusterTargetState{ + Pod: podName, + State: entry.State.State, + FailedReason: entry.State.FailedReason, + ConnectionState: entry.State.ConnectionState, + Subscriptions: entry.State.Subscriptions, + LastUpdated: metav1.NewTime(entry.State.LastUpdated), + } + + computeStatusSummary(&target.Status) + + if err := r.Status().Update(ctx, &target); err != nil { + if apierrors.IsConflict(err) { + continue + } + logger.Error(err, "poll: failed to update target status", "target", entry.Name) + } + break + } + } +} + +const maxConflictRetries = 5 + +// handleEvent processes a single SSE event and updates the corresponding Target CR status. +func (r *TargetStateReconciler) handleEvent(ctx context.Context, event gnmic.SSEEvent, clusterName, namespace, podName string) { + logger := log.FromContext(ctx) + + targetNamespace, targetName, ok := parseTargetName(event.Data.Name) + if !ok { + logger.Info("skipping event with invalid target name", "name", event.Data.Name) + return + } + + stateObj, err := gnmic.ParseTargetStateObject(event.Data.Object) + if err != nil { + logger.Error(err, "failed to parse target state object", "target", event.Data.Name) + return + } + + targetNN := types.NamespacedName{Name: targetName, Namespace: targetNamespace} + + for attempt := 0; attempt < maxConflictRetries; attempt++ { + // always fetch a fresh copy before updating + var target gnmicv1alpha1.Target + if err := r.Get(ctx, targetNN, &target); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "failed to get target", "target", event.Data.Name) + } + return + } + + if target.Status.ClusterStates == nil { + target.Status.ClusterStates = make(map[string]gnmicv1alpha1.ClusterTargetState) + } + + if event.EventType == gnmic.SSEEventDelete { + delete(target.Status.ClusterStates, clusterName) + } else { + target.Status.ClusterStates[clusterName] = gnmicv1alpha1.ClusterTargetState{ + Pod: podName, + State: stateObj.State, + FailedReason: stateObj.FailedReason, + ConnectionState: stateObj.ConnectionState, + Subscriptions: stateObj.Subscriptions, + LastUpdated: metav1.NewTime(stateObj.LastUpdated), + } + } + + computeStatusSummary(&target.Status) + + if err := r.Status().Update(ctx, &target); err != nil { + if apierrors.IsConflict(err) { + continue // retry with a fresh read + } + logger.Error(err, "failed to update target status", "target", event.Data.Name) + return + } + return // success + } + + logger.Info("giving up after max conflict retries", "target", event.Data.Name, "retries", maxConflictRetries) +} + +// computeStatusSummary updates the top-level summary fields (Clusters, ConnectionState) +// based on the current ClusterStates map. +func computeStatusSummary(status *gnmicv1alpha1.TargetStatus) { + status.Clusters = int32(len(status.ClusterStates)) + if status.Clusters == 0 { + status.ConnectionState = "" + return + } + allReady := true + for _, s := range status.ClusterStates { + if s.ConnectionState != "READY" { + allReady = false + break + } + } + if allReady { + status.ConnectionState = "READY" + } else { + status.ConnectionState = "DEGRADED" + } +} + +// stopStreamsForCluster cancels all SSE goroutines for the given cluster. +func (r *TargetStateReconciler) stopStreamsForCluster(namespace, clusterName string) { + prefix := namespace + "/" + clusterName + "/" + for key, cancel := range r.streams { + if strings.HasPrefix(key, prefix) { + cancel() + delete(r.streams, key) + } + } +} + +func (r *TargetStateReconciler) podBaseURL(cluster *gnmicv1alpha1.Cluster, stsName string, podIndex int) string { + restPort := int32(defaultRestPort) + if cluster.Spec.API != nil && cluster.Spec.API.RestPort != 0 { + restPort = cluster.Spec.API.RestPort + } + + podDNS := fmt.Sprintf("%s-%d.%s.%s.svc.cluster.local", stsName, podIndex, stsName, cluster.Namespace) + scheme := "http" + if cluster.Spec.API != nil && cluster.Spec.API.TLS != nil && cluster.Spec.API.TLS.IssuerRef != "" { + scheme = "https" + } + + return fmt.Sprintf("%s://%s:%d", scheme, podDNS, restPort) +} + +func (r *TargetStateReconciler) buildPodSSEURL(cluster *gnmicv1alpha1.Cluster, stsName string, podIndex int) (string, error) { + return r.podBaseURL(cluster, stsName, podIndex) + sseTargetsPath, nil +} + +func (r *TargetStateReconciler) buildPodPollURL(cluster *gnmicv1alpha1.Cluster, stsName string, podIndex int) string { + return r.podBaseURL(cluster, stsName, podIndex) + pollTargetsPath +} + +// createHTTPClient creates an HTTP client with appropriate TLS configuration for the cluster. +func (r *TargetStateReconciler) createHTTPClient(ctx context.Context, cluster *gnmicv1alpha1.Cluster) (*http.Client, error) { + if cluster.Spec.API == nil || cluster.Spec.API.TLS == nil { + return &http.Client{}, nil + } + tlsConfig := &tls.Config{} + if cluster.Spec.API.TLS.IssuerRef != "" { + cert, err := os.ReadFile(gnmic.GetControllerCertPath()) + if err != nil { + return nil, fmt.Errorf("failed to read controller cert: %w", err) + } + key, err := os.ReadFile(gnmic.GetControllerKeyPath()) + if err != nil { + return nil, fmt.Errorf("failed to read controller key: %w", err) + } + certificate, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + tlsConfig.Certificates = []tls.Certificate{certificate} + + ca, err := r.getIssuerCA(ctx, cluster.Namespace, cluster.Spec.API.TLS.IssuerRef) + if err != nil { + return nil, fmt.Errorf("failed to get issuer CA: %w", err) + } + tlsConfig.RootCAs = x509.NewCertPool() + tlsConfig.RootCAs.AppendCertsFromPEM(ca) + } + if cluster.Spec.API.TLS.BundleRef != "" { + ca, err := os.ReadFile(gnmic.GetControllerCAPath()) + if err != nil { + return nil, fmt.Errorf("failed to read controller CA: %w", err) + } + if tlsConfig.RootCAs == nil { + tlsConfig.RootCAs = x509.NewCertPool() + } + tlsConfig.RootCAs.AppendCertsFromPEM(ca) + } + return &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + }, nil +} + +// getIssuerCA fetches the CA certificate from a cert-manager Issuer's backing secret. +func (r *TargetStateReconciler) getIssuerCA(ctx context.Context, namespace, issuerName string) ([]byte, error) { + issuer := &certmanagerv1.Issuer{} + if err := r.Get(ctx, types.NamespacedName{Name: issuerName, Namespace: namespace}, issuer); err != nil { + return nil, fmt.Errorf("failed to get issuer %s: %w", issuerName, err) + } + if issuer.Spec.CA == nil || issuer.Spec.CA.SecretName == "" { + return nil, fmt.Errorf("issuer %s is not a CA issuer or has no secret configured", issuerName) + } + caSecret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Name: issuer.Spec.CA.SecretName, Namespace: namespace}, caSecret); err != nil { + return nil, fmt.Errorf("failed to get CA secret %s: %w", issuer.Spec.CA.SecretName, err) + } + caCert, ok := caSecret.Data["tls.crt"] + if !ok { + return nil, fmt.Errorf("CA secret %s does not contain tls.crt", issuer.Spec.CA.SecretName) + } + return caCert, nil +} + +func (r *TargetStateReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&gnmicv1alpha1.Cluster{}). + WithEventFilter(predicate.GenerationChangedPredicate{}). + Named("targetstate"). + Complete(r) +} + +// streamKey builds a unique key for tracking a stream goroutine. +func streamKey(namespace, clusterName string, podIndex int) string { + return fmt.Sprintf("%s/%s/%d", namespace, clusterName, podIndex) +} + +// parseTargetName splits "namespace/name" into its parts. +func parseTargetName(name string) (namespace, targetName string, ok bool) { + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { + return "", "", false + } + return parts[0], parts[1], true +} + +func backoff(current time.Duration) time.Duration { + next := current * 2 + if next > reconnectMaxDelay { + return reconnectMaxDelay + } + return next +} + +func sleepOrDone(ctx context.Context, d time.Duration) { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-ctx.Done(): + case <-timer.C: + } +} diff --git a/internal/gnmic/sse.go b/internal/gnmic/sse.go new file mode 100644 index 0000000..c95f00e --- /dev/null +++ b/internal/gnmic/sse.go @@ -0,0 +1,160 @@ +package gnmic + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +// SSE event types returned by the gNMIc pods. +const ( + SSEEventCreate = "create" + SSEEventUpdate = "update" + SSEEventDelete = "delete" +) + +// SSE store types. +const ( + SSEStoreState = "state" + SSEStoreConfig = "config" +) + +// SSEEvent represents a parsed SSE event from a gNMIc pod. +type SSEEvent struct { + // The SSE event type (create, update, delete). + EventType string + // The parsed event data. + Data SSEEventData +} + +// SSEEventData represents the JSON payload of an SSE event. +type SSEEventData struct { + Timestamp time.Time `json:"timestamp"` + Store string `json:"store"` + Kind string `json:"kind"` + Name string `json:"name"` + EventType string `json:"event-type"` + Object json.RawMessage `json:"object"` +} + +// TargetStateObject represents the "object" field of a target state SSE event. +type TargetStateObject struct { + IntendedState string `json:"intended-state"` + State string `json:"state"` + FailedReason string `json:"failed-reason,omitempty"` + LastUpdated time.Time `json:"last-updated"` + ConnectionState string `json:"connection-state"` + Subscriptions map[string]string `json:"subscriptions"` +} + +// StreamTargetState opens an SSE connection to a gNMIc pod and sends parsed +// target state events to the provided channel. It blocks until the context is +// cancelled or the connection is closed. Returns an error on connection failure +// or unexpected stream termination. +func StreamTargetState(ctx context.Context, httpClient *http.Client, podURL string, events chan<- SSEEvent) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, podURL, nil) + if err != nil { + return fmt.Errorf("failed to create SSE request: %w", err) + } + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Cache-Control", "no-cache") + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("SSE connection failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("SSE endpoint returned status %d", resp.StatusCode) + } + + scanner := bufio.NewScanner(resp.Body) + var currentEventType string + + for scanner.Scan() { + line := scanner.Text() + + // keepalive comment + if strings.HasPrefix(line, ":") { + continue + } + + // empty line = end of event (but we process on "data:" line) + if line == "" { + currentEventType = "" + continue + } + if after, found := strings.CutPrefix(line, "event: "); found { + currentEventType = after + continue + } + if dataStr, found := strings.CutPrefix(line, "data: "); found { + + var data SSEEventData + if err := json.Unmarshal([]byte(dataStr), &data); err != nil { + continue // skip malformed events + } + + // only forward target state events + if data.Kind != "targets" || data.Store != SSEStoreState { + continue + } + + events <- SSEEvent{ + EventType: currentEventType, + Data: data, + } + } + } + + if err := scanner.Err(); err != nil { + return fmt.Errorf("SSE stream error: %w", err) + } + + return nil +} + +// ParseTargetStateObject parses the raw JSON object from a target state SSE event. +func ParseTargetStateObject(raw json.RawMessage) (*TargetStateObject, error) { + var obj TargetStateObject + if err := json.Unmarshal(raw, &obj); err != nil { + return nil, fmt.Errorf("failed to parse target state object: %w", err) + } + return &obj, nil +} + +// PollTargetEntry represents a single target returned by GET /api/v1/targets. +type PollTargetEntry struct { + Name string `json:"name"` + State *TargetStateObject `json:"state"` +} + +// PollTargetState fetches the full target state snapshot from a gNMIc pod. +func PollTargetState(ctx context.Context, httpClient *http.Client, pollURL string) ([]PollTargetEntry, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, pollURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create poll request: %w", err) + } + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("poll request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("poll endpoint returned status %d", resp.StatusCode) + } + + var entries []PollTargetEntry + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { + return nil, fmt.Errorf("failed to decode poll response: %w", err) + } + + return entries, nil +}