diff --git a/api/v1alpha1/cluster_types.go b/api/v1alpha1/cluster_types.go index 91e832f..f1eb1e0 100644 --- a/api/v1alpha1/cluster_types.go +++ b/api/v1alpha1/cluster_types.go @@ -47,6 +47,15 @@ type ClusterSpec struct { // Environment variables to set in the gNMIc pods Env []corev1.EnvVar `json:"env,omitempty"` + + // The target distribution configuration + TargetDistribution *TargetDistributionConfig `json:"targetDistribution,omitempty"` +} + +type TargetDistributionConfig struct { + // The capacity per pod for distributing targets + // To be used in conjunction with Horizontal Pod Autoscaling (HPA) scaling. + PodCapacity int `json:"perPodCapacity,omitempty"` } type APIConfig struct { @@ -107,6 +116,9 @@ type ClusterStatus struct { PipelinesCount int32 `json:"pipelinesCount"` // The number of targets referenced by the pipelines TargetsCount int32 `json:"targetsCount"` + // The number of targets that could not be assigned to any pod due to capacity limits. + // Non-zero when total targets exceed numPods × perPodCapacity. 
+ UnassignedTargets int32 `json:"unassignedTargets"` // The number of subscriptions referenced by the pipelines SubscriptionsCount int32 `json:"subscriptionsCount"` // The number of inputs referenced by the pipelines @@ -124,6 +136,7 @@ type ClusterStatus struct { // +kubebuilder:printcolumn:name="Ready",type=integer,JSONPath=`.status.readyReplicas` // +kubebuilder:printcolumn:name="Pipelines",type=integer,JSONPath=`.status.pipelinesCount` // +kubebuilder:printcolumn:name="Targets",type=integer,JSONPath=`.status.targetsCount` +// +kubebuilder:printcolumn:name="Unassigned",type=integer,JSONPath=`.status.unassignedTargets` // +kubebuilder:printcolumn:name="Subs",type=integer,JSONPath=`.status.subscriptionsCount` // +kubebuilder:printcolumn:name="Inputs",type=integer,JSONPath=`.status.inputsCount` // +kubebuilder:printcolumn:name="Outputs",type=integer,JSONPath=`.status.outputsCount` diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index ffd628e..07b0239 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -136,6 +136,11 @@ func (in *ClusterSpec) DeepCopyInto(out *ClusterSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TargetDistribution != nil { + in, out := &in.TargetDistribution, &out.TargetDistribution + *out = new(TargetDistributionConfig) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterSpec. @@ -1035,6 +1040,21 @@ func (in *Target) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TargetDistributionConfig) DeepCopyInto(out *TargetDistributionConfig) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetDistributionConfig. 
+func (in *TargetDistributionConfig) DeepCopy() *TargetDistributionConfig { + if in == nil { + return nil + } + out := new(TargetDistributionConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TargetList) DeepCopyInto(out *TargetList) { *out = *in diff --git a/config/crd/bases/operator.gnmic.dev_clusters.yaml b/config/crd/bases/operator.gnmic.dev_clusters.yaml index f48c567..ccc58a6 100644 --- a/config/crd/bases/operator.gnmic.dev_clusters.yaml +++ b/config/crd/bases/operator.gnmic.dev_clusters.yaml @@ -30,6 +30,9 @@ spec: - jsonPath: .status.targetsCount name: Targets type: integer + - jsonPath: .status.unassignedTargets + name: Unassigned + type: integer - jsonPath: .status.subscriptionsCount name: Subs type: integer @@ -396,6 +399,15 @@ spec: More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ type: object type: object + targetDistribution: + description: The target distribution configuration + properties: + perPodCapacity: + description: |- + The capacity per pod for distributing targets + To be used in conjunction with Horizontal Pod Autoscaling (HPA) scaling. + type: integer + type: object required: - image type: object @@ -486,6 +498,12 @@ spec: description: The number of targets referenced by the pipelines format: int32 type: integer + unassignedTargets: + description: |- + The number of targets that could not be assigned to any pod due to capacity limits. + Non-zero when total targets exceed numPods × perPodCapacity. 
+ format: int32 + type: integer required: - inputsCount - outputsCount @@ -494,6 +512,7 @@ spec: - selector - subscriptionsCount - targetsCount + - unassignedTargets type: object type: object served: true diff --git a/docs/content/docs/advanced/scaling.md b/docs/content/docs/advanced/scaling.md index 48e3049..8805905 100644 --- a/docs/content/docs/advanced/scaling.md +++ b/docs/content/docs/advanced/scaling.md @@ -28,25 +28,33 @@ spec: ### Scale Up ( 3 → 5 pods) -1. Kubernetes creates new pods (`gnmic-3`, `gnmic-4`) -2. Operator waits for pods to be ready -3. Operator redistributes targets using bounded load rendezvous hashing -4. Some targets move from existing pods to new pods -5. Configuration is applied to all pods +1. Kubernetes creates new pods (`gnmic-3`, `gnmic-4`). +2. Operator waits for pods to be ready. +3. Operator recomputes the distribution plan. Existing target assignments are + preserved — only unassigned targets or targets displaced by capacity limits + are placed on the new pods. +4. Configuration is applied to all pods. ### Scale Down ( 5 → 3 pods) -1. Operator redistributes targets away from pods being removed -2. Configuration is applied to remaining pods -3. Kubernetes terminates pods (`gnmic-4`, `gnmic-3`) -4. Targets from terminated pods are handled by remaining pods +1. Operator recomputes the distribution plan for the reduced replica count. + Targets from removed pods flow through rendezvous hashing onto surviving + pods, bounded by each pod's capacity. +2. Configuration is applied to remaining pods. +3. Kubernetes terminates pods (`gnmic-4`, `gnmic-3`). ## Target Redistribution -The operator uses **bounded load rendezvous hashing** to distribute targets: +The operator uses **bounded load rendezvous hashing** to distribute targets. +See [Target Distribution](../target-distribution/) for a detailed explanation +of the algorithm. 
-- **Stable**: Same target tends to stay on same pod -- **Even**: Targets are distributed evenly (within 1-2 of each other) +Key properties: + +- **Stable**: Targets stay on their current pod unless forced to move. +- **Even**: No pod exceeds its capacity. +- **Current-assignment aware**: The operator reads each target's current pod + from its status and feeds this as input to the algorithm, minimizing churn. ### Example Distribution @@ -56,10 +64,10 @@ Pod 0: [target1, target5, target8] (3 targets) Pod 1: [target2, target4, target9] (3 targets) Pod 2: [target3, target6, target7, target10] (4 targets) -# After scaling to 4 pods +# After scaling to 4 pods — existing assignments are preserved Pod 0: [target1, target5, target8] (3 targets) - unchanged -Pod 1: [target2, target4] (2 targets) - lost target9 -Pod 2: [target3, target7, target10] (3 targets) - lost target6 +Pod 1: [target2, target4] (2 targets) - target9 moved to new pod +Pod 2: [target3, target7, target10] (3 targets) - target6 moved to new pod Pod 3: [target6, target9] (2 targets) - new pod ``` @@ -105,43 +113,18 @@ gnmic_target_status{cluster="my-cluster"} ## Horizontal Pod Autoscaler -The operator's Cluster resource supports the `scale` subresource, allowing you to enable automatic scaling using the Horizontal Pod Autoscaler (HPA). - -To set up autoscaling, create an HPA resource that targets the Cluster resource. 
Specify the desired minimum and maximum number of replicas, as well as the metrics that will determine when scaling occurs: - -```yaml -apiVersion: autoscaling/v2 -kind: HorizontalPodAutoscaler -metadata: - name: gnmic-c1-hpa -spec: - scaleTargetRef: - apiVersion: operator.gnmic.dev/v1alpha1 - kind: Cluster - name: c1 - minReplicas: 1 - maxReplicas: 10 - metrics: - - type: Resource - resource: - name: cpu - target: - type: Utilization - averageUtilization: 70 -``` +The operator's Cluster resource supports the `scale` subresource, allowing you +to use the Horizontal Pod Autoscaler (HPA) for automatic scaling. -> **Note:** You must install the Kubernetes metrics server to enable HPA based on CPU or Memory: -> -> ```shell -> kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml -> ``` +> HPA scales the **Cluster CR**, not the StatefulSet directly. This ensures the +> operator remains in control of target redistribution and configuration rollout. -### Autoscaling based on custom resources +### Scaling based on target count (recommended) -gNMIc pods provide various Prometheus metrics that can be leveraged by an HPA resource for autoscaling. +Target count is the most accurate scaling signal for gNMIc — CPU/memory don't +reliably reflect the load from long-lived gRPC streaming connections. -One common use case is to scale based on the number of targets assigned to each Pod. -The gNMIc pods export metrics like: +gNMIc pods export per-target metrics: ``` gnmic_target_up{name="default/leaf1"} 0 @@ -149,12 +132,19 @@ gnmic_target_up{name="default/leaf2"} 0 gnmic_target_up{name="default/spine1"} 1 ``` -Here, a value of `1` indicates that the target is present, while `0` denotes it is absent. +A value of `1` indicates that the target is present, `0` denotes it is absent. 
-With [Prometheus Adapter](https://github.com/kubernetes-sigs/prometheus-adapter), this metric can be made available as `targets_per_pod{cluster="c1", pod="gnmic-c1-0"}` = 1. -You can use the following promQL to aggregate these into a “targets per pod” metric: `sum(gnmic_target_up == 1) by (namespace, pod)`. +With [Prometheus Adapter](https://github.com/kubernetes-sigs/prometheus-adapter), +aggregate these into a per-pod metric: -> You can assign `namespace` and `pod` labels to metrics using scrape configurations or relabeling. +```promql +sum(gnmic_target_up{namespace!="",pod!=""} == 1) by (namespace, pod) +``` + +> You can assign `namespace` and `pod` labels to metrics using scrape +> configurations or relabeling. + +Example Prometheus Adapter rule: ```yaml apiVersion: v1 @@ -181,9 +171,8 @@ data: sum(gnmic_target_up{<<.LabelMatchers>>} == 1) by (namespace, pod) ``` -The corresponding HPA resource would look like this: - -In other words: Scale **Cluster** `c1` to a max of `10` replicas if the average number of targets present in the current pods is above `30`. +The corresponding HPA resource — scale Cluster `c1` up when the average number +of targets per pod exceeds 75: ```yaml apiVersion: autoscaling/v2 @@ -204,8 +193,104 @@ spec: name: gnmic_targets_present target: type: AverageValue - averageValue: "30" + averageValue: "75" +``` + +### Threshold vs Capacity + +When using HPA, the Cluster CR's `spec.targetDistribution.perPodCapacity` acts +as a hard assignment ceiling — the operator never assigns more than +`perPodCapacity` targets to a single pod. The HPA **averageValue** (the scaling +threshold) should be set **lower** than capacity to create a buffer zone that +gives new pods time to start: + ``` +0 ─────── HPA threshold ─────── Capacity + (scale trigger) (assignment stops) +``` + +1. When the average target count crosses the HPA threshold, HPA increases + `.spec.replicas`. +2. 
While the new pod is starting, existing pods continue receiving targets up + to `capacity`. +3. If all pods reach `capacity` before the new pod is ready, overflow targets + remain unassigned until the next reconciliation. The Cluster status reports + the count via `status.unassignedTargets` and the `CapacityExhausted` + condition. + +**Sizing guidance** — set the HPA threshold to ~70–80% of capacity: + +| Cluster Capacity | HPA averageValue | Headroom per pod | +|---|---|---| +| 50 | 35 | 15 (30%) | +| 100 | 75 | 25 (25%) | +| 200 | 150 | 50 (25%) | + +For bursty workloads (e.g., many targets appearing at once via +`TunnelTargetPolicy`), use a wider buffer (lower threshold-to-capacity ratio). + +### Monitoring Capacity + +When targets exceed the total cluster capacity, the Cluster status makes this +visible: + +```bash +kubectl get clusters +``` + +``` +NAME IMAGE REPLICAS READY PIPELINES TARGETS UNASSIGNED SUBS INPUTS OUTPUTS +c1 ... 3 3 2 100 4 5 2 3 +``` + +The `CapacityExhausted` condition provides detail: + +```bash +kubectl describe cluster c1 +``` + +``` +Conditions: + Type Status Reason Message + CapacityExhausted True InsufficientCapacity 4 targets could not be assigned, all pods at capacity +``` + +Once HPA scales up and all targets are assigned, the condition clears +automatically. 
+ +### Scaling based on CPU/Memory + +You can also use resource-based metrics: + +```yaml +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: gnmic-c1-hpa +spec: + scaleTargetRef: + apiVersion: operator.gnmic.dev/v1alpha1 + kind: Cluster + name: c1 + minReplicas: 1 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 +``` + +> **Note:** You must install the Kubernetes metrics server for CPU/Memory-based HPA: +> +> ```shell +> kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml +> ``` + +Target-count-based scaling is recommended over CPU/Memory because gRPC +streaming connections don't always correlate with CPU utilization. ## Considerations @@ -222,4 +307,3 @@ gNMIc pods are stateless by design: - No persistent volumes required - Configuration comes from operator via REST API - Targets can move between pods without data loss - diff --git a/docs/content/docs/advanced/target-distribution.md b/docs/content/docs/advanced/target-distribution.md index b7cace1..12bf4fa 100644 --- a/docs/content/docs/advanced/target-distribution.md +++ b/docs/content/docs/advanced/target-distribution.md @@ -20,7 +20,10 @@ The operator uses **bounded load rendezvous hashing**, which combines two techni ## How It Works -### Step 1: Calculate Capacity +### Step 1: Determine Capacity + +If the Cluster CR specifies `spec.targetDistribution.perPodCapacity`, that value +is used as a fixed ceiling. Otherwise capacity is calculated automatically: ``` capacity = ceil(numTargets / numPods) @@ -28,22 +31,42 @@ capacity = ceil(numTargets / numPods) Example: 10 targets, 3 pods → capacity = 4 -### Step 2: Sort Targets +A fixed `perPodCapacity` is useful when combined with [autoscaling](../scaling/) +— it sets a hard ceiling per pod so HPA has time to add replicas before pods are +full. 
+ +### Step 2: Preserve Current Assignments + +If target status already records which pod each target is on (the **current +assignment**), the algorithm keeps those assignments as long as the pod is still +present and under capacity. + +When a pod has more pre-assigned targets than its capacity allows (e.g., after a +scale-down or capacity reduction), targets with the **lowest** hash score for +that pod are displaced first. This ensures deterministic selection of which +targets stay. -Targets are processed in alphabetical order for determinism: +### Step 3: Sort Remaining Targets + +Unassigned targets are processed in alphabetical order for determinism: ``` [target1, target10, target2, target3, ...] ``` -### Step 3: Assign Each Target +### Step 4: Assign Each Target -For each target: +For each unassigned target: 1. Calculate a score against each pod: `hash(targetName + podIndex)` 2. Sort pods by score (highest first) -3. Assign to highest-scoring pod that has capacity +3. Assign to highest-scoring pod that still has capacity + +If no pod has capacity, the target is left unassigned until the next +reconciliation (e.g., after HPA scales up a new replica). The Cluster CR status +reports the number of unassigned targets via the `unassignedTargets` field and +the `CapacityExhausted` condition. -### Step 4: Track Load +### Step 5: Track Load After each assignment, increment the pod's load count. When a pod reaches capacity, it's skipped for future assignments. @@ -51,7 +74,12 @@ After each assignment, increment the pod's load count. When a pod reaches capaci ### Stability -The same target gets the same score for each pod across reconciliations. Unless capacity constraints force a change, targets stay on their assigned pods. +The same target gets the same score for each pod across reconciliations. Unless +capacity constraints force a change, targets stay on their assigned pods. 
+ +When current assignments are available, targets are kept on their existing pod +without recomputing scores — only truly unassigned targets go through the +hashing step. ``` # Before scaling: router1 on pod0 @@ -73,11 +101,16 @@ With capacity = ceil(n/p), no pod can have more than `capacity` targets: ### Minimal Redistribution -When scaling: +Current assignment awareness keeps churn to the minimum required: + +**Adding a pod**: Existing targets stay on their current pods. Only unassigned +targets (or targets displaced by capacity limits) may land on the new pod. -**Adding a pod**: Only targets that score highest for the new pod will move. +**Removing a pod**: Only targets on the removed pod redistribute. Targets on +remaining pods stay put. -**Removing a pod**: Only targets on the removed pod redistribute. Targets on remaining pods stay put. +**Adding/removing a target**: Other targets' assignments are unaffected when +current assignments are provided. ## Visualization diff --git a/docs/content/docs/user-guide/cluster.md b/docs/content/docs/user-guide/cluster.md index 605db21..68fea83 100644 --- a/docs/content/docs/user-guide/cluster.md +++ b/docs/content/docs/user-guide/cluster.md @@ -41,6 +41,40 @@ spec: | `clientTLS.issuerRef` | string | No | | CertManager Issuer reference, used to sign the gNMI client certificates | | `clientTLS.bundleRef` | string | No | | ConfigMap reference, used to add gNMI client trust bundles to the POD (key=`ca.crt`) | | `clientTLS.useCSIDriver` | bool | No | | If true the gNMI client certificates are generated and mounted using CertManager CSI Driver | +| **Target Distribution** | | | | | +| `targetDistribution` | TargetDistributionConfig | No | | Target distribution configuration | +| `targetDistribution.perPodCapacity` | int | No | ceil(targets/pods) | Maximum number of targets assigned to a single pod | + +## Target Distribution + +By default, the operator distributes targets evenly across pods using bounded +load rendezvous 
hashing with an auto-calculated capacity of +`ceil(totalTargets / replicas)`. + +When using [Horizontal Pod Autoscaling]({{< ref "../advanced/scaling" >}}), set +an explicit `perPodCapacity` to create a hard ceiling per pod. This ensures the +operator stops assigning targets before pods are overloaded, giving HPA time to +scale up: + +```yaml +apiVersion: operator.gnmic.dev/v1alpha1 +kind: Cluster +metadata: + name: autoscaled-cluster +spec: + replicas: 3 + image: ghcr.io/openconfig/gnmic:latest + targetDistribution: + perPodCapacity: 100 +``` + +When total targets exceed `replicas × perPodCapacity`, overflow targets remain +unassigned. The Cluster status reports this via the `unassignedTargets` field +and the `CapacityExhausted` condition. + +See [Target Distribution]({{< ref "../advanced/target-distribution" >}}) for +details on the algorithm and [Scaling]({{< ref "../advanced/scaling" >}}) for +HPA threshold sizing guidance. ## Resource Configuration @@ -401,6 +435,7 @@ status: readyReplicas: 3 pipelinesCount: 2 targetsCount: 10 + unassignedTargets: 0 subscriptionsCount: 5 inputsCount: 1 outputsCount: 3 @@ -413,6 +448,10 @@ status: status: "True" reason: ConfigurationApplied message: "Configuration applied to 3 pods" + - type: CapacityExhausted + status: "False" + reason: SufficientCapacity + message: "All targets assigned" ``` ### Status Fields @@ -422,6 +461,7 @@ status: | `readyReplicas` | Number of pods that are ready | | `pipelinesCount` | Number of enabled pipelines using this cluster | | `targetsCount` | Total unique targets across all pipelines | +| `unassignedTargets` | Number of targets that could not be assigned due to capacity limits (0 when all fit) | | `subscriptionsCount` | Total unique subscriptions | | `inputsCount` | Total unique inputs | | `outputsCount` | Total unique outputs | @@ -434,6 +474,7 @@ status: | `Ready` | True when all replicas are ready and configured | | `CertificatesReady` | True when TLS certificates are issued (only present 
if TLS enabled) | | `ConfigApplied` | True when configuration is successfully applied to all pods | +| `CapacityExhausted` | True when some targets could not be assigned because all pods are at capacity | ## Scaling @@ -443,7 +484,10 @@ To scale the cluster, update the `replicas` field: kubectl patch cluster telemetry-cluster --type merge -p '{"spec":{"replicas":5}}' ``` -Targets are automatically redistributed across pods when scaling. +Targets are automatically redistributed across pods when scaling. Existing +assignments are preserved — only targets from removed pods or unassigned targets +are placed on new pods. See [Scaling]({{< ref "../advanced/scaling" >}}) for +details on HPA integration and capacity planning. ## Example: Production Cluster diff --git a/internal/controller/cluster_controller.go b/internal/controller/cluster_controller.go index cbfe37a..24d7a1d 100644 --- a/internal/controller/cluster_controller.go +++ b/internal/controller/cluster_controller.go @@ -87,6 +87,8 @@ const ( ConditionTypeCertificatesReady = "CertificatesReady" // ConditionTypeConfigApplied indicates configuration was applied to pods ConditionTypeConfigApplied = "ConfigApplied" + // ConditionTypeCapacityExhausted indicates some targets could not be assigned + ConditionTypeCapacityExhausted = "CapacityExhausted" ) // Condition types for Pipeline status @@ -318,10 +320,13 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } // build pipeline data for the gNMIc plan builder - planBuilder := gnmic.NewPlanBuilder(r). 
- WithClientTLS( - gnmic.ClientTLSConfigForCluster(&cluster), - ) + planBuilder := gnmic.NewPlanBuilder(cluster.Name, r) + planBuilder = planBuilder.WithClientTLS( + gnmic.ClientTLSConfigForCluster(&cluster), + ) + if cluster.Spec.TargetDistribution != nil && cluster.Spec.TargetDistribution.PodCapacity > 0 { + planBuilder = planBuilder.WithTargetDistributionCapacity(cluster.Spec.TargetDistribution.PodCapacity) + } pipelineDataMap := make(map[string]*gnmic.PipelineData) for _, pipeline := range pipelines { @@ -339,7 +344,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } targetProfilesNames := make(map[string]struct{}) for _, target := range targets { - pipelineData.Targets[target.Namespace+gnmic.Delimiter+target.Name] = target.Spec + pipelineData.Targets[target.Namespace+gnmic.Delimiter+target.Name] = target targetProfilesNames[target.Spec.Profile] = struct{}{} } @@ -498,11 +503,13 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct numPods := int(desiredReplicas) configApplied := false var configError error - if err := r.applyConfigToPods(ctx, &cluster, applyPlan, numPods); err != nil { + var unassignedTargets int32 + if unassigned, err := r.applyConfigToPods(ctx, &cluster, applyPlan, numPods); err != nil { logger.Error(err, "failed to apply config to gNMIc pods") configError = err } else { configApplied = true + unassignedTargets = unassigned logger.Info("successfully applied config to gNMIc cluster", "pods", numPods) } @@ -538,6 +545,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct Selector: metav1.FormatLabelSelector(statefulSet.Spec.Selector), PipelinesCount: int32(len(pipelines)), TargetsCount: totalTargets, + UnassignedTargets: unassignedTargets, SubscriptionsCount: totalSubscriptions, InputsCount: totalInputs, OutputsCount: totalOutputs, @@ -606,6 +614,27 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } newStatus.Conditions = 
append(newStatus.Conditions, configCondition) + // capacityExhausted condition + if unassignedTargets > 0 { + newStatus.Conditions = append(newStatus.Conditions, metav1.Condition{ + Type: ConditionTypeCapacityExhausted, + Status: metav1.ConditionTrue, + ObservedGeneration: cluster.Generation, + LastTransitionTime: now, + Reason: "InsufficientCapacity", + Message: fmt.Sprintf("%d target(s) could not be assigned, all pods at capacity", unassignedTargets), + }) + } else if configApplied { + newStatus.Conditions = append(newStatus.Conditions, metav1.Condition{ + Type: ConditionTypeCapacityExhausted, + Status: metav1.ConditionFalse, + ObservedGeneration: cluster.Generation, + LastTransitionTime: now, + Reason: "SufficientCapacity", + Message: "All targets assigned", + }) + } + // preserve LastTransitionTime for unchanged conditions for i := range newStatus.Conditions { for _, oldCond := range cluster.Status.Conditions { @@ -636,6 +665,7 @@ func clusterStatusEqual(a, b gnmicv1alpha1.ClusterStatus) bool { if a.ReadyReplicas != b.ReadyReplicas || a.PipelinesCount != b.PipelinesCount || a.TargetsCount != b.TargetsCount || + a.UnassignedTargets != b.UnassignedTargets || a.SubscriptionsCount != b.SubscriptionsCount || a.InputsCount != b.InputsCount || a.OutputsCount != b.OutputsCount { @@ -655,8 +685,9 @@ func clusterStatusEqual(a, b gnmicv1alpha1.ClusterStatus) bool { return true } -// applyConfigToPods sends the apply plan to all gNMIc pods with distributed targets -func (r *ClusterReconciler) applyConfigToPods(ctx context.Context, cluster *gnmicv1alpha1.Cluster, plan *gnmic.ApplyPlan, numPods int) error { +// applyConfigToPods sends the apply plan to all gNMIc pods with distributed targets. +// Returns the number of targets that could not be assigned due to capacity limits. 
+func (r *ClusterReconciler) applyConfigToPods(ctx context.Context, cluster *gnmicv1alpha1.Cluster, plan *gnmic.ApplyPlan, numPods int) (int32, error) { logger := log.FromContext(ctx) stsName := fmt.Sprintf("%s%s", resourcePrefix, cluster.Name) @@ -668,13 +699,16 @@ func (r *ClusterReconciler) applyConfigToPods(ctx context.Context, cluster *gnmi // create an HTTP client to send the apply plan to the gNMIc pods httpClient, err := r.createHTTPClientForCluster(ctx, cluster) if err != nil { - return fmt.Errorf("failed to create HTTP client: %w", err) + return 0, fmt.Errorf("failed to create HTTP client: %w", err) } + distResult := gnmic.DistributeTargets(plan, numPods, cluster.Spec.TargetDistribution) // apply config to each pod with distributed targets for podIndex := 0; podIndex < numPods; podIndex++ { // distribute targets for this pod - distributedPlan := gnmic.DistributeTargets(plan, podIndex, numPods) - + podPlan, ok := distResult.PerPodPlans[podIndex] + if !ok { + continue + } // build the URL for this pod // statefulSet pods have predictable DNS names: // -...svc.cluster.local @@ -685,14 +719,19 @@ func (r *ClusterReconciler) applyConfigToPods(ctx context.Context, cluster *gnmi } url := fmt.Sprintf("%s://%s:%d/api/v1/config/apply", scheme, podDNS, restPort) logger.Info("sending config to gNMIc pod", "url", url) - if err := r.sendApplyRequest(ctx, url, distributedPlan, httpClient); err != nil { - return fmt.Errorf("failed to apply config to pod %d: %w", podIndex, err) + if err := r.sendApplyRequest(ctx, url, podPlan, httpClient); err != nil { + return 0, fmt.Errorf("failed to apply config to pod %d: %w", podIndex, err) } - logger.Info("config applied to pod", "pod", podIndex, "targets", len(distributedPlan.Targets)) + logger.Info("config applied to pod", "pod", podIndex, "targets", len(podPlan.Targets)) } - return nil + unassigned := int32(len(distResult.UnassignedTargets)) + if unassigned > 0 { + logger.Info("targets unassigned due to capacity limits", "count", 
unassigned) + } + + return unassigned, nil } func (r *ClusterReconciler) createHTTPClientForCluster(ctx context.Context, cluster *gnmicv1alpha1.Cluster) (*http.Client, error) { diff --git a/internal/controller/targetstate_controller.go b/internal/controller/targetstate_controller.go index d9f1154..21e440d 100644 --- a/internal/controller/targetstate_controller.go +++ b/internal/controller/targetstate_controller.go @@ -121,6 +121,9 @@ func (r *TargetStateReconciler) Reconcile(ctx context.Context, req ctrl.Request) // with the latest cluster config (port, TLS, image, etc.) r.stopStreamsForCluster(cluster.Namespace, cluster.Name) + // clean up target status entries that reference pods beyond the desired count + r.removeStalePodsFromTargets(ctx, cluster.Namespace, cluster.Name, stsName, desiredPods) + // start a stream for each pod for i := 0; i < desiredPods; i++ { key := streamKey(cluster.Namespace, cluster.Name, i) @@ -492,6 +495,62 @@ func (r *TargetStateReconciler) removeClusterFromTargets(ctx context.Context, na } } +// removeStalePodsFromTargets clears ClusterStates entries that reference pods +// with an index >= desiredPods (i.e., pods removed during scale-down). 
+func (r *TargetStateReconciler) removeStalePodsFromTargets(ctx context.Context, namespace, clusterName, stsName string, desiredPods int) { + logger := log.FromContext(ctx).WithValues("controller", "TargetState") + + // build the set of valid pod names + validPods := make(map[string]struct{}, desiredPods) + for i := 0; i < desiredPods; i++ { + validPods[fmt.Sprintf("%s-%d", stsName, i)] = struct{}{} + } + + var targets gnmicv1alpha1.TargetList + if err := r.List(ctx, &targets, client.InNamespace(namespace)); err != nil { + logger.Error(err, "failed to list targets for stale pod cleanup") + return + } + + for i := range targets.Items { + target := &targets.Items[i] + if target.Status.ClusterStates == nil { + continue + } + cs, ok := target.Status.ClusterStates[clusterName] + if !ok { + continue + } + if _, valid := validPods[cs.Pod]; valid { + continue + } + + for attempt := 0; attempt < maxConflictRetries; attempt++ { + if attempt > 0 { + if err := r.Get(ctx, types.NamespacedName{Name: target.Name, Namespace: target.Namespace}, target); err != nil { + if !apierrors.IsNotFound(err) { + logger.Error(err, "failed to re-fetch target for stale pod cleanup", "target", target.Name) + } + break + } + } + + delete(target.Status.ClusterStates, clusterName) + computeStatusSummary(&target.Status) + + if err := r.Status().Update(ctx, target); err != nil { + if apierrors.IsConflict(err) { + continue + } + logger.Error(err, "failed to remove stale pod state from target", "target", target.Name, "cluster", clusterName, "pod", cs.Pod) + break + } + logger.Info("removed stale pod state from target", "target", target.Name, "cluster", clusterName, "pod", cs.Pod) + break + } + } +} + func (r *TargetStateReconciler) podBaseURL(cluster *gnmicv1alpha1.Cluster, stsName string, podIndex int) string { restPort := int32(defaultRestPort) if cluster.Spec.API != nil && cluster.Spec.API.RestPort != 0 { diff --git a/internal/gnmic/distribute.go b/internal/gnmic/distribute.go index 1fb8653..230ddae 
100644 --- a/internal/gnmic/distribute.go +++ b/internal/gnmic/distribute.go @@ -1,146 +1,69 @@ package gnmic import ( - "fmt" - "hash/fnv" "sort" + "github.com/gnmic/operator/api/v1alpha1" gapi "github.com/openconfig/gnmic/pkg/api/types" ) -// DistributeTargets creates a copy of the apply plan with only the targets -// assigned to the specified pod index. Other resources (subscriptions, outputs, -// inputs, processors, tunnel-target-matches) are included in full for all pods. -// -// Uses bounded load rendezvous hashing for stable AND even distribution: -// - Targets are assigned to pods based on highest hash score -// - Each pod has a capacity limit of ceil(numTargets/numPods) -// - When a pod is full, the target goes to the next highest scoring pod -func DistributeTargets(plan *ApplyPlan, podIndex, numPods int) *ApplyPlan { +// DistributeResult holds the per-pod plans and any targets that could not be +// assigned due to capacity limits. +type DistributeResult struct { + PerPodPlans map[int]*ApplyPlan + UnassignedTargets []string +} + +func DistributeTargets(plan *ApplyPlan, numPods int, targetDistribution *v1alpha1.TargetDistributionConfig) *DistributeResult { if numPods <= 0 { numPods = 1 } - if podIndex < 0 || podIndex >= numPods { - podIndex = 0 - } - - // create a new plan with the same subscriptions, outputs, inputs, processors, tunnel-target-matches - distributed := &ApplyPlan{ - Targets: make(map[string]*gapi.TargetConfig), - Subscriptions: plan.Subscriptions, - Outputs: plan.Outputs, - Inputs: plan.Inputs, - Processors: plan.Processors, - TunnelTargetMatches: plan.TunnelTargetMatches, - } - - // get all assignments using bounded load rendezvous hashing - assignments := boundedRendezvousAssign(plan.Targets, numPods) - - // filter to only targets assigned to this pod - for targetNN, assignedPod := range assignments { - if assignedPod == podIndex { - distributed.Targets[targetNN] = plan.Targets[targetNN] + currentAssignment := Assignment{} + if 
plan.CurrentTargetAssignment != nil { + for podIndex, targets := range plan.CurrentTargetAssignment { + for targetNN := range targets { + currentAssignment[podIndex] = append(currentAssignment[podIndex], targetNN) + } + sort.Strings(currentAssignment[podIndex]) } } - - return distributed -} - -// boundedRendezvousAssign assigns targets to pods using bounded load rendezvous hashing. -// returns a map of targetNN -> podIndex -func boundedRendezvousAssign(targets map[string]*gapi.TargetConfig, numPods int) map[string]int { - numTargets := len(targets) - if numTargets == 0 { - return make(map[string]int) - } - - // calculate capacity per pod: ceil(numTargets/numPods) - // this ensures distribution differs by at most 1 between pods - capacity := (numTargets + numPods - 1) / numPods - - // sort target names for deterministic assignment order - sortedTargets := make([]string, 0, numTargets) - for targetNN := range targets { - sortedTargets = append(sortedTargets, targetNN) - } - sort.Strings(sortedTargets) - - // track load per pod - podLoad := make([]int, numPods) - assignments := make(map[string]int, numTargets) - - // assign each target to its highest-scoring pod that has capacity - for _, targetNN := range sortedTargets { - assignedPod := boundedRendezvousHash(targetNN, numPods, capacity, podLoad) - assignments[targetNN] = assignedPod - podLoad[assignedPod]++ + placement := New(PlacementStrategyBoundedHashing) + placementOptions := &PlacementStrategyOpts{ + NumPods: numPods, + CurrentAssignment: currentAssignment, } - - return assignments -} - -type podScore struct { - index int - score uint32 -} - -// boundedRendezvousHash returns the pod index with the highest score that has capacity. 
-func boundedRendezvousHash(targetNN string, numPods, capacity int, podLoad []int) int { - scores := make([]podScore, numPods) - for i := range numPods { - scores[i] = podScore{index: i, score: hashScore(targetNN, i)} + if targetDistribution != nil { + placementOptions.Capacity = targetDistribution.PodCapacity } - sort.Slice(scores, func(i, j int) bool { - if scores[i].score == scores[j].score { - return scores[i].index < scores[j].index + newAssignment := placement.distributeTargets(plan.Targets, placementOptions) + + assigned := make(map[string]struct{}) + result := make(map[int]*ApplyPlan) + for podIndex, targets := range newAssignment { + result[podIndex] = &ApplyPlan{ + Targets: make(map[string]*gapi.TargetConfig), + Subscriptions: plan.Subscriptions, + Outputs: plan.Outputs, + Inputs: plan.Inputs, + Processors: plan.Processors, + TunnelTargetMatches: plan.TunnelTargetMatches, } - return scores[i].score > scores[j].score - }) - - // find the highest-scoring pod with capacity - for _, ps := range scores { - if podLoad[ps.index] < capacity { - return ps.index + for _, targetNN := range targets { + result[podIndex].Targets[targetNN] = plan.Targets[targetNN] + assigned[targetNN] = struct{}{} } } - // fallback (shouldn't happen with proper capacity) - return scores[0].index -} - -// hashScore computes a deterministic score for a target-pod pair -func hashScore(targetNN string, podIndex int) uint32 { - h := fnv.New32a() - fmt.Fprintf(h, "%s:%d", targetNN, podIndex) - return h.Sum32() -} - -// getTargetAssignments returns a map of podIndex -> list of targetNNs. 
-// used in tests -func getTargetAssignments(targetNNs []string, numPods int) map[int][]string { - // build a fake targets map - targets := make(map[string]*gapi.TargetConfig, len(targetNNs)) - for _, nn := range targetNNs { - targets[nn] = &gapi.TargetConfig{} - } - - // get assignments - assignments := boundedRendezvousAssign(targets, numPods) - - // convert to pod -> targets format - result := make(map[int][]string) - for i := range numPods { - result[i] = []string{} - } - for targetNN, podIndex := range assignments { - result[podIndex] = append(result[podIndex], targetNN) + var unassigned []string + for targetNN := range plan.Targets { + if _, ok := assigned[targetNN]; !ok { + unassigned = append(unassigned, targetNN) + } } + sort.Strings(unassigned) - // sort each pod's targets for consistent output - for i := range result { - sort.Strings(result[i]) + return &DistributeResult{ + PerPodPlans: result, + UnassignedTargets: unassigned, } - - return result } diff --git a/internal/gnmic/distribute_test.go b/internal/gnmic/distribute_test.go index 7888b7b..b4032e4 100644 --- a/internal/gnmic/distribute_test.go +++ b/internal/gnmic/distribute_test.go @@ -5,6 +5,7 @@ import ( "sort" "testing" + "github.com/gnmic/operator/api/v1alpha1" gapi "github.com/openconfig/gnmic/pkg/api/types" ) @@ -32,13 +33,15 @@ func TestDistributeTargets(t *testing.T) { numPods := 3 // distribute targets across pods - distributedPlans := make([]*ApplyPlan, numPods) + distResult := DistributeTargets(plan, numPods, &v1alpha1.TargetDistributionConfig{}) for i := 0; i < numPods; i++ { - distributedPlans[i] = DistributeTargets(plan, i, numPods) - } + dp, ok := distResult.PerPodPlans[i] + if !ok { + t.Errorf("failed to get distributed plan for pod %d", i) + continue + } - // verify each pod gets subscriptions, outputs, inputs - for i, dp := range distributedPlans { + // verify each pod gets subscriptions, outputs, inputs if len(dp.Subscriptions) != 1 { t.Errorf("pod %d: expected 1 subscription, 
got %d", i, len(dp.Subscriptions)) } @@ -50,9 +53,13 @@ func TestDistributeTargets(t *testing.T) { } } + if len(distResult.UnassignedTargets) != 0 { + t.Errorf("expected no unassigned targets, got %v", distResult.UnassignedTargets) + } + // verify all targets are distributed (no duplicates, no missing) allTargets := make(map[string]int) // targetNN -> count - for i, dp := range distributedPlans { + for i, dp := range distResult.PerPodPlans { t.Logf("pod %d targets: %v", i, keys(dp.Targets)) for targetNN := range dp.Targets { allTargets[targetNN]++ @@ -88,8 +95,8 @@ func TestDistributeTargetsDeterministic(t *testing.T) { // run distribution multiple times for run := 0; run < 10; run++ { - plan1 := DistributeTargets(plan, 0, numPods) - plan2 := DistributeTargets(plan, 0, numPods) + plan1 := DistributeTargets(plan, numPods, &v1alpha1.TargetDistributionConfig{}).PerPodPlans[0] + plan2 := DistributeTargets(plan, numPods, &v1alpha1.TargetDistributionConfig{}).PerPodPlans[0] // should get the same targets each time if len(plan1.Targets) != len(plan2.Targets) { @@ -109,6 +116,7 @@ func TestDistributeTargetsSinglePod(t *testing.T) { Targets: map[string]*gapi.TargetConfig{ "default/target1": {Name: "target1"}, "default/target2": {Name: "target2"}, + "default/target3": {Name: "target3"}, }, Subscriptions: map[string]*gapi.SubscriptionConfig{}, Outputs: map[string]map[string]any{}, @@ -116,8 +124,8 @@ func TestDistributeTargetsSinglePod(t *testing.T) { } // single pod should get all targets - distributed := DistributeTargets(plan, 0, 1) - if len(distributed.Targets) != 2 { + distributed := DistributeTargets(plan, 1, &v1alpha1.TargetDistributionConfig{}).PerPodPlans[0] + if len(distributed.Targets) != 3 { t.Errorf("single pod should get all targets, got %d", len(distributed.Targets)) } } @@ -133,19 +141,68 @@ func TestDistributeTargetsEdgeCases(t *testing.T) { } // invalid numPods should default to 1 - distributed := DistributeTargets(plan, 0, 0) + distributed := 
DistributeTargets(plan, 0, &v1alpha1.TargetDistributionConfig{}).PerPodPlans[0] if len(distributed.Targets) != 1 { t.Errorf("zero pods should default to 1, got %d targets", len(distributed.Targets)) } + // invalid numPods should default to 1 + distributed = DistributeTargets(plan, -1, &v1alpha1.TargetDistributionConfig{}).PerPodPlans[0] + if len(distributed.Targets) != 1 { + t.Errorf("negative pods should default to 1, got %d targets", len(distributed.Targets)) + } +} - // invalid podIndex should default to 0 - distributed = DistributeTargets(plan, -1, 2) - // just verify it doesn't panic - t.Logf("negative podIndex: %d targets", len(distributed.Targets)) +func TestDistributeTargets_CapacityOverflow(t *testing.T) { + plan := &ApplyPlan{ + Targets: map[string]*gapi.TargetConfig{ + "default/target1": {Name: "target1"}, + "default/target2": {Name: "target2"}, + "default/target3": {Name: "target3"}, + "default/target4": {Name: "target4"}, + "default/target5": {Name: "target5"}, + "default/target6": {Name: "target6"}, + "default/target7": {Name: "target7"}, + "default/target8": {Name: "target8"}, + "default/target9": {Name: "target9"}, + "default/target10": {Name: "target10"}, + }, + Subscriptions: map[string]*gapi.SubscriptionConfig{}, + Outputs: map[string]map[string]any{}, + Inputs: map[string]map[string]any{}, + } + + // 10 targets, 2 pods, capacity=3 → total capacity=6, so 4 targets unassigned + distResult := DistributeTargets(plan, 2, &v1alpha1.TargetDistributionConfig{ + PodCapacity: 3, + }) + + totalAssigned := 0 + for _, dp := range distResult.PerPodPlans { + if len(dp.Targets) > 3 { + t.Errorf("pod exceeds capacity 3, has %d targets", len(dp.Targets)) + } + totalAssigned += len(dp.Targets) + } - distributed = DistributeTargets(plan, 5, 2) - // should default to pod 0 - t.Logf("out of range podIndex: %d targets", len(distributed.Targets)) + if totalAssigned != 6 { + t.Errorf("expected 6 assigned targets, got %d", totalAssigned) + } + if 
len(distResult.UnassignedTargets) != 4 { + t.Errorf("expected 4 unassigned targets, got %d: %v", len(distResult.UnassignedTargets), distResult.UnassignedTargets) + } + + t.Logf("assigned: %d, unassigned: %v", totalAssigned, distResult.UnassignedTargets) +} + +// getTargetAssignments returns a map of podIndex -> list of targetNNs. +// used in tests +func getTargetAssignments(targetNNs []string, numPods int) Assignment { + // build a fake targets map + targets := make(map[string]*gapi.TargetConfig, len(targetNNs)) + for _, nn := range targetNNs { + targets[nn] = &gapi.TargetConfig{} + } + return boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods}) } func TestGetTargetAssignments(t *testing.T) { diff --git a/internal/gnmic/placement.go b/internal/gnmic/placement.go new file mode 100644 index 0000000..f39d89c --- /dev/null +++ b/internal/gnmic/placement.go @@ -0,0 +1,36 @@ +package gnmic + +import ( + gapi "github.com/openconfig/gnmic/pkg/api/types" +) + +type placementStrategy interface { + distributeTargets(targets map[string]*gapi.TargetConfig, options *PlacementStrategyOpts) Assignment +} + +type PlacementStrategyOpts struct { + // Strategy to use for placement + // if not set, the default strategy will be used + Strategy PlacementStrategyType `json:"strategy,omitempty"` + // Number of pods to distribute targets to + // if not set, the number of pods in the cluster is used + NumPods int `json:"numPods,omitempty"` + // Capacity per pod + // if not set, ceil(numTargets/numPods) is used + Capacity int `json:"capacity,omitempty"` + // Current assignment of targets to pods + // if not set, it is assumed that there is no current assignment + CurrentAssignment Assignment `json:"currentAssignment,omitempty"` +} + +func New(strategy PlacementStrategyType) placementStrategy { + switch strategy { + case PlacementStrategyBoundedHashing: + return &blrh{} + default: + return &blrh{} + } +} + +// Assignment is a map of pod index to list of target 
names +type Assignment map[int][]string diff --git a/internal/gnmic/placement_blrh.go b/internal/gnmic/placement_blrh.go new file mode 100644 index 0000000..9d15d9a --- /dev/null +++ b/internal/gnmic/placement_blrh.go @@ -0,0 +1,149 @@ +package gnmic + +import ( + "fmt" + "hash/fnv" + "sort" + + gapi "github.com/openconfig/gnmic/pkg/api/types" +) + +type PlacementStrategyType string + +const ( + PlacementStrategyBoundedHashing PlacementStrategyType = "boundedLoadHashing" +) + +// bounded load rendezvous hashing placement implementation +type blrh struct { +} + +// podScore is a struct to store the index and score (hash value) of a pod for a target +type podScore struct { + index int + score uint64 +} + +func (p *blrh) distributeTargets(targets map[string]*gapi.TargetConfig, options *PlacementStrategyOpts) Assignment { + opts := normalizeOptions(options) + return boundedLoadRendezvousHash(targets, &opts) +} + +func normalizeOptions(opts *PlacementStrategyOpts) PlacementStrategyOpts { + if opts == nil { + return PlacementStrategyOpts{ + Strategy: PlacementStrategyBoundedHashing, + NumPods: 1, + } + } + n := *opts + if n.NumPods <= 0 { + n.NumPods = 1 + } + return n +} + +func (p *blrh) String() string { + return string(PlacementStrategyBoundedHashing) +} + +func boundedLoadRendezvousHash(targets map[string]*gapi.TargetConfig, options *PlacementStrategyOpts) Assignment { + numTargets := len(targets) + if numTargets == 0 { + return make(Assignment) + } + capacity := options.Capacity + if capacity == 0 { + // calculate capacity per pod: ceil(numTargets/numPods) + // this ensures distribution differs by at most 1 between pods + capacity = (numTargets + options.NumPods - 1) / options.NumPods + } + + // sort target names for deterministic assignment order + sortedTargets := make([]string, 0, numTargets) + for targetNN := range targets { + sortedTargets = append(sortedTargets, targetNN) + } + sort.Strings(sortedTargets) + + assignments := make(Assignment, options.NumPods) + 
+ // keep track of pre-assigned targets to avoid re-assigning them + var preAssignedTargets = make(map[string]struct{}) + + // keep current assignment if it exists + if options.CurrentAssignment != nil { + for podIndex, targets := range options.CurrentAssignment { + if podIndex >= options.NumPods { + continue + } + // sort by hash score to make sure we skip targets with the lower scores if we have to + // move existing targets from a pod that is already at capacity. + sort.Slice(targets, func(i, j int) bool { + hash1 := hashScore(targets[i], podIndex) + hash2 := hashScore(targets[j], podIndex) + return hash1 > hash2 + }) + for _, targetNN := range targets { + if assignments[podIndex] == nil { + assignments[podIndex] = make([]string, 0, 1) + } + if len(assignments[podIndex]) >= capacity { + // do not keep pre-assigned targets in pods that are already at capacity + continue + } + // add existing target to new assignment + assignments[podIndex] = append(assignments[podIndex], targetNN) + preAssignedTargets[targetNN] = struct{}{} + } + } + } + + // assign each target to its highest-scoring pod that has capacity + for _, targetNN := range sortedTargets { + // skip pre-assigned targets + if _, ok := preAssignedTargets[targetNN]; ok { + continue + } + assignedPod := boundedRendezvousHash(targetNN, options.NumPods, capacity, assignments) + if assignedPod == nil { + continue + } + assignments[*assignedPod] = append(assignments[*assignedPod], targetNN) + } + + return assignments +} + +// boundedRendezvousHash returns the pod index with the highest score that has capacity. 
+func boundedRendezvousHash(targetNN string, numPods, capacity int, assignments Assignment) *int { + scores := make([]podScore, numPods) + for i := range numPods { + scores[i] = podScore{index: i, score: hashScore(targetNN, i)} + } + sort.Slice(scores, func(i, j int) bool { + if scores[i].score == scores[j].score { + return scores[i].index < scores[j].index + } + return scores[i].score > scores[j].score + }) + + // find the highest-scoring pod with capacity + for _, ps := range scores { + if len(assignments[ps.index]) < capacity { + return &ps.index + } + } + + // fallback (shouldn't happen with proper capacity) + // return &scores[0].index + // no pods with capacity found, return nil + return nil +} + +// hashScore computes a deterministic score for a target-pod pair +func hashScore(targetNN string, podIndex int) uint64 { + h := fnv.New64a() + fmt.Fprintf(h, "%s:%d", targetNN, podIndex) + return h.Sum64() +} diff --git a/internal/gnmic/placement_blrh_test.go b/internal/gnmic/placement_blrh_test.go new file mode 100644 index 0000000..e482c23 --- /dev/null +++ b/internal/gnmic/placement_blrh_test.go @@ -0,0 +1,449 @@ +package gnmic + +import ( + "fmt" + "sort" + "testing" + + gapi "github.com/openconfig/gnmic/pkg/api/types" +) + +type assigmentResult struct { + podIndex int + targets []string +} + +func Test_boundedLoadRendezvousHash(t *testing.T) { + tests := []struct { + name string + targets map[string]*gapi.TargetConfig + options *PlacementStrategyOpts + }{ + { + name: "targets=10/numPods=3", + targets: genTargets(10), + options: &PlacementStrategyOpts{NumPods: 3}, + }, + { + name: "targets=100/numPods=5", + targets: genTargets(100), + options: &PlacementStrategyOpts{NumPods: 5}, + }, + { + name: "targets=1000/numPods=3", + targets: genTargets(1000), + options: &PlacementStrategyOpts{NumPods: 3}, + }, + { + name: "targets=1000/numPods=5", + targets: genTargets(1000), + options: &PlacementStrategyOpts{NumPods: 5}, + }, + { + name: "targets=1000/numPods=10", + 
targets: genTargets(1000), + options: &PlacementStrategyOpts{NumPods: 10}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := boundedLoadRendezvousHash(tt.targets, tt.options) + + assertAllTargetsAssignedExactlyOnce(t, tt.targets, got) + assertCapacityRespected(t, len(tt.targets), tt.options.NumPods, got) + + // determinism: running again with same inputs must yield identical result + got2 := boundedLoadRendezvousHash(tt.targets, tt.options) + moved := countMovedTargets(got, got2) + if moved != 0 { + t.Errorf("non-deterministic: %d targets moved on re-run", moved) + } + + rs := make([]assigmentResult, 0, len(got)) + for podIndex, targets := range got { + rs = append(rs, assigmentResult{podIndex: podIndex, targets: targets}) + } + sort.Slice(rs, func(i, j int) bool { + return rs[i].podIndex < rs[j].podIndex + }) + for _, r := range rs { + t.Logf("pod %d: %d targets", r.podIndex, len(r.targets)) + } + }) + } +} + +func genTargets(n int) map[string]*gapi.TargetConfig { + out := make(map[string]*gapi.TargetConfig, n) + for i := 1; i <= n; i++ { + name := fmt.Sprintf("target-%02d", i) + out[name] = &gapi.TargetConfig{ + Name: name, + Address: fmt.Sprintf("%s:57400", name), + } + } + return out +} + +func TestBoundedLoadRendezvousHash_ChurnAcrossScalingAndTargetAddition(t *testing.T) { + targets := genTargets(35) + + type scaleStep struct { + numPods int + } + + steps := []scaleStep{ + {numPods: 1}, + {numPods: 2}, + {numPods: 3}, + {numPods: 4}, + {numPods: 5}, + {numPods: 6}, + {numPods: 7}, + {numPods: 8}, + {numPods: 9}, + {numPods: 10}, + } + + var prev Assignment + for i, step := range steps { + opts := &PlacementStrategyOpts{ + NumPods: step.numPods, + } + curr := boundedLoadRendezvousHash(targets, opts) + + // sanity checks + assertAllTargetsAssignedExactlyOnce(t, targets, curr) + assertCapacityRespected(t, len(targets), step.numPods, curr) + + if i == 0 { + t.Logf("pods=%d targets=%d moved=N/A (initial placement)", step.numPods, 
len(targets)) + } else { + moved := countMovedTargets(prev, curr) + t.Logf("pods=%d targets=%d moved=%d", step.numPods, len(targets), moved) + } + prev = curr + } + + // Add one target and measure churn among existing targets. + targetsPlusOne := genTargets(36) + opts := &PlacementStrategyOpts{ + NumPods: 10, + } + withOneMore := boundedLoadRendezvousHash(targetsPlusOne, opts) + + assertAllTargetsAssignedExactlyOnce(t, targetsPlusOne, withOneMore) + assertCapacityRespected(t, len(targetsPlusOne), opts.NumPods, withOneMore) + + movedExisting := countMovedTargetsForTargetSet(prev, withOneMore, targetNames(targets)) + t.Logf("pods=%d targets=%d->%d existing_targets_moved=%d new_target_assigned=%t", + opts.NumPods, + len(targets), + len(targetsPlusOne), + movedExisting, + isTargetAssigned(withOneMore, "target-36"), + ) +} + +func targetNames(targets map[string]*gapi.TargetConfig) []string { + names := make([]string, 0, len(targets)) + for name := range targets { + names = append(names, name) + } + sort.Strings(names) + return names +} + +func assertAllTargetsAssignedExactlyOnce(t *testing.T, targets map[string]*gapi.TargetConfig, a Assignment) { + t.Helper() + + seen := make(map[string]int, len(targets)) + for _, names := range a { + for _, name := range names { + seen[name]++ + } + } + + if len(seen) != len(targets) { + t.Fatalf("expected %d assigned targets, got %d", len(targets), len(seen)) + } + + for name := range targets { + if seen[name] != 1 { + t.Fatalf("target %q assigned %d times, want exactly once", name, seen[name]) + } + } +} + +func assertCapacityRespected(t *testing.T, numTargets, numPods int, a Assignment) { + t.Helper() + + capacity := (numTargets + numPods - 1) / numPods + for pod, names := range a { + if len(names) > capacity { + t.Fatalf("pod %d has %d targets, exceeds capacity %d", pod, len(names), capacity) + } + } +} + +func countMovedTargets(prev, curr Assignment) int { + prevIndex := invertAssignment(prev) + currIndex := invertAssignment(curr) 
+ + moved := 0 + for target, prevPod := range prevIndex { + currPod, ok := currIndex[target] + if !ok { + continue + } + if currPod != prevPod { + moved++ + } + } + return moved +} + +func countMovedTargetsForTargetSet(prev, curr Assignment, targets []string) int { + prevIndex := invertAssignment(prev) + currIndex := invertAssignment(curr) + + moved := 0 + for _, target := range targets { + prevPod, okPrev := prevIndex[target] + currPod, okCurr := currIndex[target] + if !okPrev || !okCurr { + continue + } + if prevPod != currPod { + moved++ + } + } + return moved +} + +func invertAssignment(a Assignment) map[string]int { + out := make(map[string]int) + for pod, names := range a { + for _, name := range names { + out[name] = pod + } + } + return out +} + +func isTargetAssigned(a Assignment, target string) bool { + for _, names := range a { + for _, name := range names { + if name == target { + return true + } + } + } + return false +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_Stable(t *testing.T) { + targets := genTargets(30) + numPods := 5 + + // compute a fresh assignment + fresh := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods}) + assertAllTargetsAssignedExactlyOnce(t, targets, fresh) + assertCapacityRespected(t, len(targets), numPods, fresh) + + // feed the fresh result back as CurrentAssignment + withCurrent := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{ + NumPods: numPods, + CurrentAssignment: fresh, + }) + assertAllTargetsAssignedExactlyOnce(t, targets, withCurrent) + assertCapacityRespected(t, len(targets), numPods, withCurrent) + + moved := countMovedTargets(fresh, withCurrent) + if moved != 0 { + t.Errorf("re-running with same CurrentAssignment should move 0 targets, moved %d", moved) + } +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_ScaleUp(t *testing.T) { + targets := genTargets(30) + + // initial: 5 pods + initial := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: 
5}) + assertAllTargetsAssignedExactlyOnce(t, targets, initial) + + // scale to 6 pods, passing initial as CurrentAssignment + scaled := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{ + NumPods: 6, + CurrentAssignment: initial, + }) + assertAllTargetsAssignedExactlyOnce(t, targets, scaled) + assertCapacityRespected(t, len(targets), 6, scaled) + + moved := countMovedTargets(initial, scaled) + t.Logf("scale 5->6 pods, %d targets moved", moved) + + // new pod 5 should have some targets + if len(scaled[5]) == 0 { + t.Error("new pod 5 should have received targets after scale-up") + } + + // compare to a fresh assignment without CurrentAssignment + freshScaled := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: 6}) + movedFresh := countMovedTargets(initial, freshScaled) + t.Logf("scale 5->6 without CurrentAssignment: %d targets moved", movedFresh) + + // using CurrentAssignment should move fewer (or equal) targets than a fresh recompute + if moved > movedFresh { + t.Errorf("CurrentAssignment should reduce churn: moved %d vs fresh %d", moved, movedFresh) + } +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_ScaleDown(t *testing.T) { + targets := genTargets(30) + + // initial: 6 pods + initial := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: 6}) + assertAllTargetsAssignedExactlyOnce(t, targets, initial) + + // scale down to 4 pods -- remove pods 4 and 5 + // only pass assignments for pods 0-3 as CurrentAssignment + remaining := make(Assignment) + for pod, tgts := range initial { + if pod < 4 { + remaining[pod] = tgts + } + } + + scaled := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{ + NumPods: 4, + CurrentAssignment: remaining, + }) + assertAllTargetsAssignedExactlyOnce(t, targets, scaled) + assertCapacityRespected(t, len(targets), 4, scaled) + + // targets that were on pods 0-3 and stayed should not move + moved := countMovedTargetsForTargetSet(initial, scaled, assignedToPodsInRange(initial, 0, 3)) + 
t.Logf("scale 6->4: %d targets from surviving pods moved", moved) +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_TargetAdded(t *testing.T) { + targets := genTargets(30) + numPods := 5 + + initial := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods}) + assertAllTargetsAssignedExactlyOnce(t, targets, initial) + + // add one target + targets31 := genTargets(31) + withNew := boundedLoadRendezvousHash(targets31, &PlacementStrategyOpts{ + NumPods: numPods, + CurrentAssignment: initial, + }) + assertAllTargetsAssignedExactlyOnce(t, targets31, withNew) + assertCapacityRespected(t, len(targets31), numPods, withNew) + + if !isTargetAssigned(withNew, "target-31") { + t.Error("new target-31 should be assigned") + } + + moved := countMovedTargetsForTargetSet(initial, withNew, targetNames(targets)) + t.Logf("added 1 target: %d existing targets moved", moved) +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_TargetRemoved(t *testing.T) { + targets := genTargets(30) + numPods := 5 + + initial := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods}) + assertAllTargetsAssignedExactlyOnce(t, targets, initial) + + // remove target-15 + targets29 := genTargets(30) + delete(targets29, "target-15") + + // filter CurrentAssignment to exclude the removed target + filtered := filterAssignment(initial, targets29) + + withRemoved := boundedLoadRendezvousHash(targets29, &PlacementStrategyOpts{ + NumPods: numPods, + CurrentAssignment: filtered, + }) + assertAllTargetsAssignedExactlyOnce(t, targets29, withRemoved) + assertCapacityRespected(t, len(targets29), numPods, withRemoved) + + if isTargetAssigned(withRemoved, "target-15") { + t.Error("removed target-15 should not be assigned") + } + + moved := countMovedTargetsForTargetSet(initial, withRemoved, targetNames(targets29)) + t.Logf("removed 1 target: %d existing targets moved", moved) +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_OverCapacity(t *testing.T) { + 
targets := genTargets(20) + numPods := 4 + // capacity will be ceil(20/4) = 5 + + // create a CurrentAssignment where pod 0 has 7 targets (over capacity) + overloaded := Assignment{ + 0: {"target-01", "target-02", "target-03", "target-04", "target-05", "target-06", "target-07"}, + 1: {"target-08", "target-09", "target-10"}, + 2: {"target-11", "target-12", "target-13", "target-14", "target-15"}, + 3: {"target-16", "target-17", "target-18", "target-19", "target-20"}, + } + + result := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{ + NumPods: numPods, + CurrentAssignment: overloaded, + }) + assertAllTargetsAssignedExactlyOnce(t, targets, result) + assertCapacityRespected(t, len(targets), numPods, result) + + // pod 0 should now be at or under capacity (5) + if len(result[0]) > 5 { + t.Errorf("pod 0 should be capped at capacity 5, has %d", len(result[0])) + } +} + +func TestBoundedLoadRendezvousHash_CurrentAssignment_Idempotent(t *testing.T) { + targets := genTargets(50) + numPods := 7 + + // run three times, each feeding the previous result as CurrentAssignment + r1 := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods}) + r2 := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods, CurrentAssignment: r1}) + r3 := boundedLoadRendezvousHash(targets, &PlacementStrategyOpts{NumPods: numPods, CurrentAssignment: r2}) + + moved12 := countMovedTargets(r1, r2) + moved23 := countMovedTargets(r2, r3) + + if moved12 != 0 { + t.Errorf("r1->r2 should move 0 targets, moved %d", moved12) + } + if moved23 != 0 { + t.Errorf("r2->r3 should move 0 targets, moved %d", moved23) + } +} + +func assignedToPodsInRange(a Assignment, lo, hi int) []string { + var out []string + for pod, names := range a { + if pod >= lo && pod <= hi { + out = append(out, names...) 
+ } + } + return out +} + +func filterAssignment(a Assignment, targets map[string]*gapi.TargetConfig) Assignment { + filtered := make(Assignment, len(a)) + for pod, names := range a { + for _, name := range names { + if _, exists := targets[name]; exists { + filtered[pod] = append(filtered[pod], name) + } + } + } + return filtered +} diff --git a/internal/gnmic/plan.go b/internal/gnmic/plan.go index 02e30fa..c4db779 100644 --- a/internal/gnmic/plan.go +++ b/internal/gnmic/plan.go @@ -3,17 +3,33 @@ package gnmic import ( "fmt" "hash/fnv" + "log/slog" "maps" + "os" "sort" + "strconv" - gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/utils" gapi "github.com/openconfig/gnmic/pkg/api/types" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +var logger *slog.Logger + +func init() { + logLevel := slog.LevelInfo + v, ok := os.LookupEnv("DEBUG") + if ok && v == "true" { + logLevel = slog.LevelDebug + } + logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })) +} + // PlanBuilder builds an ApplyPlan from pipeline data type PlanBuilder struct { + clusterName string // all currently active pipelines pipelines map[string]*PipelineData // an impl to get credentials from a secret @@ -25,6 +41,8 @@ type PlanBuilder struct { // prometheus output controller selected ports for each pipeline/output // pipelineName -> outputNN -> port prometheusOutputPorts map[string]map[string]int32 + // target distribution capacity + targetDistributionCapacity int } type resourceRelationship struct { @@ -41,8 +59,10 @@ type resourceRelationship struct { } // NewPlanBuilder creates a new PlanBuilder -func NewPlanBuilder(credsFetcher CredentialsFetcher) *PlanBuilder { +func NewPlanBuilder(clusterName string, credsFetcher CredentialsFetcher) *PlanBuilder { + return &PlanBuilder{ + clusterName: clusterName, pipelines: make(map[string]*PipelineData), credsFetcher: credsFetcher, 
relationships: resourceRelationship{ @@ -62,6 +82,11 @@ func (b *PlanBuilder) WithClientTLS(clientTLS *ClientTLSPaths) *PlanBuilder { return b } +func (b *PlanBuilder) WithTargetDistributionCapacity(capacity int) *PlanBuilder { + b.targetDistributionCapacity = capacity + return b +} + // AddPipeline adds pipeline data to the builder func (b *PlanBuilder) AddPipeline(name string, data *PipelineData) *PlanBuilder { b.pipelines[name] = data @@ -71,16 +96,17 @@ func (b *PlanBuilder) AddPipeline(name string, data *PipelineData) *PlanBuilder // Build creates the ApplyPlan from all added pipelines func (b *PlanBuilder) Build() (*ApplyPlan, error) { plan := &ApplyPlan{ - Targets: make(map[string]*gapi.TargetConfig), - Subscriptions: make(map[string]*gapi.SubscriptionConfig), - Outputs: make(map[string]map[string]any), - Inputs: make(map[string]map[string]any), - Processors: make(map[string]map[string]any), - TunnelTargetMatches: make(map[string]*TunnelTargetMatch), - PrometheusPorts: make(map[string]int32), + Targets: make(map[string]*gapi.TargetConfig), + CurrentTargetAssignment: make(map[int]map[string]struct{}), + Subscriptions: make(map[string]*gapi.SubscriptionConfig), + Outputs: make(map[string]map[string]any), + Inputs: make(map[string]map[string]any), + Processors: make(map[string]map[string]any), + TunnelTargetMatches: make(map[string]*TunnelTargetMatch), + PrometheusPorts: make(map[string]int32), } // 1) collect relationships across all pipelines - b.collectRelationships() + b.collectRelationships(plan) // 2) build the configs for _, pipelineData := range b.pipelines { @@ -120,7 +146,7 @@ func (b *PlanBuilder) Build() (*ApplyPlan, error) { return plan, nil } -func (b *PlanBuilder) collectRelationships() { +func (b *PlanBuilder) collectRelationships(plan *ApplyPlan) { for _, pipelineData := range b.pipelines { // subscription -> outputs outputNames := make([]string, 0, len(pipelineData.Outputs)) @@ -142,14 +168,34 @@ func (b *PlanBuilder) collectRelationships() { 
subNames = append(subNames, subNN) } - for targetNN := range pipelineData.Targets { + for targetNN, targetCR := range pipelineData.Targets { + // target -> subscriptions if _, ok := b.relationships.targetSubscriptions[targetNN]; !ok { b.relationships.targetSubscriptions[targetNN] = make(map[string]struct{}) } for _, subName := range subNames { b.relationships.targetSubscriptions[targetNN][subName] = struct{}{} } + + // pod -> targets + podIdx := b.findTargetCurrentAssignment(targetCR) + if podIdx != nil { + logger.Debug("target assigned to pod", "target", targetNN, "pod", *podIdx) + } else { + logger.Debug("target not assigned to any pod", "target", targetNN) + } + if podIdx != nil && *podIdx >= 0 { + if plan.CurrentTargetAssignment == nil { // defensive: Build() already initializes this map before calling collectRelationships + plan.CurrentTargetAssignment = make(map[int]map[string]struct{}) + } + if _, ok := plan.CurrentTargetAssignment[*podIdx]; !ok { + plan.CurrentTargetAssignment[*podIdx] = make(map[string]struct{}) + } + plan.CurrentTargetAssignment[*podIdx][targetNN] = struct{}{} + } + } + // input -> outputs inputOutputNames := make([]string, 0, len(pipelineData.Outputs)) for outputNN := range pipelineData.Outputs { @@ -199,29 +245,45 @@ } } +func (b *PlanBuilder) findTargetCurrentAssignment(targetCR v1alpha1.Target) *int { + // TODO(KR): add other strategies here + return b.findTargetCurrentAssignmentBoundedLoadHashing(targetCR) +} + +func (b *PlanBuilder) findTargetCurrentAssignmentBoundedLoadHashing(targetCR v1alpha1.Target) *int { + podID := targetCR.Status.ClusterStates[b.clusterName].Pod // NOTE(review): panics if ClusterStates is nil or lacks b.clusterName and its values are pointers — confirm the map's value type + if podID == "" { + return nil + } + // derive the pod ordinal from the suffix after the last '-' (assumes "<name>-<ordinal>" StatefulSet-style pod names — confirm) + podSuffix := "" + for i := len(podID) - 1; i >= 0; i-- { + if podID[i] == '-' { + podSuffix = podID[i+1:] + break + } + } + podIdx, err := strconv.Atoi(podSuffix) + if err != nil { + return nil // TODO(KR): log a warning when the pod name suffix is not numeric
+ } + return &podIdx +} + func (b *PlanBuilder) buildTargets(plan *ApplyPlan, pipelineData *PipelineData) error { - for targetNN, targetSpec := range pipelineData.Targets { + for targetNN, target := range pipelineData.Targets { if _, ok := plan.Targets[targetNN]; ok { continue } - namespace, name := utils.SplitNN(targetNN) + namespace, _ := utils.SplitNN(targetNN) // find the target profile: TODO: cannot happen once the data is collected ? - profileSpec, ok := pipelineData.TargetProfiles[namespace+Delimiter+targetSpec.Profile] + profileSpec, ok := pipelineData.TargetProfiles[namespace+Delimiter+target.Spec.Profile] if !ok { continue } - // build target config - target := &gnmicv1alpha1.Target{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - }, - Spec: targetSpec, - } - // fetch credentials if needed var creds *Credentials if profileSpec.CredentialsRef != "" && b.credsFetcher != nil { @@ -240,7 +302,7 @@ func (b *PlanBuilder) buildTargets(plan *ApplyPlan, pipelineData *PipelineData) } } - targetConfig := buildTargetConfig(target, &profileSpec, creds, b.clientTLS) + targetConfig := buildTargetConfig(&target, &profileSpec, creds, b.clientTLS) targetConfig.Subscriptions = subscriptions plan.Targets[targetNN] = targetConfig diff --git a/internal/gnmic/types.go b/internal/gnmic/types.go index c1651ff..9148a3d 100644 --- a/internal/gnmic/types.go +++ b/internal/gnmic/types.go @@ -10,13 +10,15 @@ const Delimiter = "/" // ApplyPlan represents the configuration to be applied to gNMIc type ApplyPlan struct { - Targets map[string]*gapi.TargetConfig `json:"targets,omitempty"` - Subscriptions map[string]*gapi.SubscriptionConfig `json:"subscriptions,omitempty"` - Outputs map[string]map[string]any `json:"outputs,omitempty"` - Inputs map[string]map[string]any `json:"inputs,omitempty"` - Processors map[string]map[string]any `json:"processors,omitempty"` - TunnelTargetMatches map[string]*TunnelTargetMatch `json:"tunnel-target-matches,omitempty"` - 
PrometheusPorts map[string]int32 `json:"prometheus-output-ports,omitempty"` // For status reporting + Targets map[string]*gapi.TargetConfig `json:"targets,omitempty"` + // pod index -> target names + CurrentTargetAssignment map[int]map[string]struct{} `json:"current-target-assignment,omitempty"` + Subscriptions map[string]*gapi.SubscriptionConfig `json:"subscriptions,omitempty"` + Outputs map[string]map[string]any `json:"outputs,omitempty"` + Inputs map[string]map[string]any `json:"inputs,omitempty"` + Processors map[string]map[string]any `json:"processors,omitempty"` + TunnelTargetMatches map[string]*TunnelTargetMatch `json:"tunnel-target-matches,omitempty"` + PrometheusPorts map[string]int32 `json:"prometheus-output-ports,omitempty"` // For status reporting } // TunnelTargetMatch defines a policy for matching tunnel targets @@ -34,7 +36,7 @@ type TunnelTargetMatch struct { // PipelineData holds the resolved resources for a single pipeline type PipelineData struct { - Targets map[string]gnmicv1alpha1.TargetSpec + Targets map[string]gnmicv1alpha1.Target TargetProfiles map[string]gnmicv1alpha1.TargetProfileSpec Subscriptions map[string]gnmicv1alpha1.SubscriptionSpec Outputs map[string]gnmicv1alpha1.OutputSpec @@ -53,7 +55,7 @@ type PipelineData struct { // NewPipelineData creates a new PipelineData with initialized maps func NewPipelineData() *PipelineData { return &PipelineData{ - Targets: make(map[string]gnmicv1alpha1.TargetSpec), + Targets: make(map[string]gnmicv1alpha1.Target), TargetProfiles: make(map[string]gnmicv1alpha1.TargetProfileSpec), Subscriptions: make(map[string]gnmicv1alpha1.SubscriptionSpec), Outputs: make(map[string]gnmicv1alpha1.OutputSpec), diff --git a/test/integration/resources/clusters/cluster1.yaml b/test/integration/resources/clusters/cluster1.yaml index ecef021..513b948 100644 --- a/test/integration/resources/clusters/cluster1.yaml +++ b/test/integration/resources/clusters/cluster1.yaml @@ -3,5 +3,14 @@ kind: Cluster metadata: name: c1 
spec: - image: ghcr.io/openconfig/gnmic:0.44.1 + image: registry.kmrd.dev/gnmic/gnmic:0.45.0-rc1 # NOTE(review): personal registry + RC tag in a shared test resource — revert to an official release image before merge replicas: 1 + resources: + requests: + memory: "256Mi" + cpu: "200m" + limits: + memory: "500Mi" + cpu: "1" + targetDistribution: + podCapacity: 5