From 4264911c207a5f3f865e418a82f7b0f42e74ea34 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Thu, 18 May 2023 23:57:51 +0000 Subject: [PATCH] feat: v2 ipampool with idempotent scaling math --- cns/api.go | 3 +- cns/configuration/configuration.go | 28 +- cns/fakes/cnsfake.go | 9 + cns/ipampool/metrics.go | 108 +++--- cns/ipampool/monitor.go | 2 +- cns/ipampool/v2/adapter.go | 36 ++ cns/ipampool/v2/math_test.go | 116 ++++++ cns/ipampool/v2/monitor.go | 167 ++++++++ cns/ipampool/v2/monitor_test.go | 358 ++++++++++++++++++ cns/kubecontroller/pod/reconciler.go | 100 +++-- cns/restserver/ipam.go | 62 ++- cns/service/main.go | 41 +- go.mod | 2 +- .../manifests/cnsconfig/swiftconfigmap.yaml | 7 +- 14 files changed, 913 insertions(+), 126 deletions(-) create mode 100644 cns/ipampool/v2/adapter.go create mode 100644 cns/ipampool/v2/math_test.go create mode 100644 cns/ipampool/v2/monitor.go create mode 100644 cns/ipampool/v2/monitor_test.go diff --git a/cns/api.go b/cns/api.go index 9252825c25..3894e9aff6 100644 --- a/cns/api.go +++ b/cns/api.go @@ -50,6 +50,7 @@ type HTTPService interface { GetPodIPConfigState() map[string]IPConfigurationStatus MarkIPAsPendingRelease(numberToMark int) (map[string]IPConfigurationStatus, error) AttachIPConfigsHandlerMiddleware(IPConfigsHandlerMiddleware) + MarkNIPsPendingRelease(n int) (map[string]IPConfigurationStatus, error) } // IPConfigsHandlerFunc @@ -281,13 +282,13 @@ type NodeConfiguration struct { NodeSubnet Subnet } +// IpamPoolMonitorStateSnapshot struct to expose state values for IPAMPoolMonitor struct type IPAMPoolMonitor interface { Start(ctx context.Context) error Update(nnc *v1alpha.NodeNetworkConfig) error GetStateSnapshot() IpamPoolMonitorStateSnapshot } -// IpamPoolMonitorStateSnapshot struct to expose state values for IPAMPoolMonitor struct type IpamPoolMonitorStateSnapshot struct { MinimumFreeIps int64 MaximumFreeIps int64 diff --git a/cns/configuration/configuration.go b/cns/configuration/configuration.go index 005ed3f83b..fd49613131 100644 --- a/cns/configuration/configuration.go +++ b/cns/configuration/configuration.go @@ -26,14 +26,26 @@ const ( ) type CNSConfig struct { + AZRSettings AZRSettings + AsyncPodDeletePath string + CNIConflistFilepath string + CNIConflistScenario string ChannelMode string + EnableAsyncPodDelete bool + EnableCNIConflistGeneration bool + EnableIPAMv2 bool EnablePprof bool EnableSubnetScarcity bool EnableSwiftV2 bool - SWIFTV2Mode SWIFTV2Mode InitializeFromCNI bool + KeyVaultSettings KeyVaultSettings + MSISettings MSISettings + ManageEndpointState bool ManagedSettings ManagedSettings + MellanoxMonitorIntervalSecs int MetricsBindAddress string + ProgramSNATIPTables bool + SWIFTV2Mode SWIFTV2Mode SyncHostNCTimeoutMs int SyncHostNCVersionIntervalMs int TLSCertificatePath string @@ -42,19 +54,8 @@ type CNSConfig struct { TLSSubjectName string TelemetrySettings TelemetrySettings UseHTTPS bool + WatchPods bool `json:"-"` WireserverIP string - KeyVaultSettings KeyVaultSettings - MSISettings MSISettings - ProgramSNATIPTables bool - ManageEndpointState bool - CNIConflistScenario string - EnableCNIConflistGeneration bool - CNIConflistFilepath string - MellanoxMonitorIntervalSecs int - AZRSettings AZRSettings - WatchPods bool - EnableAsyncPodDelete bool - AsyncPodDeletePath string } type TelemetrySettings struct { @@ -219,4 +220,5 @@ func SetCNSConfigDefaults(config *CNSConfig) { if config.AsyncPodDeletePath == "" { config.AsyncPodDeletePath = "/var/run/azure-vnet/deleteIDs" } + config.WatchPods = config.EnableIPAMv2 || config.EnableSwiftV2 } diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index aabedae2a5..1158651de4 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -115,6 +115,11 @@ func (ipm *IPStateManager) ReleaseIPConfig(ipconfigID string) (cns.IPConfigurati return ipm.AvailableIPConfigState[ipconfigID], nil } +func (ipm *IPStateManager) MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) { + // MarkIPASPendingRelease actually already errors if it is unable to release all N IPs + return ipm.MarkIPAsPendingRelease(n) +} + func (ipm *IPStateManager) MarkIPAsPendingRelease(numberOfIPsToMark int) (map[string]cns.IPConfigurationStatus, error) { ipm.Lock() defer ipm.Unlock() @@ -256,6 +261,10 @@ func (fake *HTTPServiceFake) GetPodIPConfigState() map[string]cns.IPConfiguratio return ipconfigs } +func (fake *HTTPServiceFake) MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) { + return fake.IPStateManager.MarkIPAsPendingRelease(n) +} + // TODO: Populate on scale down func (fake *HTTPServiceFake) MarkIPAsPendingRelease(numberToMark int) (map[string]cns.IPConfigurationStatus, error) { return fake.IPStateManager.MarkIPAsPendingRelease(numberToMark) diff --git a/cns/ipampool/metrics.go b/cns/ipampool/metrics.go index 1f98ddc4b2..75bcdf806d 100644 --- a/cns/ipampool/metrics.go +++ b/cns/ipampool/metrics.go @@ -12,12 +12,12 @@ const ( customerMetricLabel = "customer_metric" customerMetricLabelValue = "customer metric" subnetExhaustionStateLabel = "subnet_exhaustion_state" - subnetIPExhausted = 1 - subnetIPNotExhausted = 0 + SubnetIPExhausted = 1 + SubnetIPNotExhausted = 0 ) var ( - ipamAllocatedIPCount = prometheus.NewGaugeVec( + IpamAllocatedIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_pod_allocated_ips", Help: "IPs currently in use by Pods on this CNS Node.", @@ -25,7 +25,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamAvailableIPCount = prometheus.NewGaugeVec( + IpamAvailableIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_available_ips", Help: "IPs available on this CNS Node for use by a Pod.", @@ -33,7 +33,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamBatchSize = prometheus.NewGaugeVec( + IpamBatchSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_batch_size", Help: "IPAM IP pool scaling batch size.", @@ -41,7 +41,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamCurrentAvailableIPcount = prometheus.NewGaugeVec( + IpamCurrentAvailableIPcount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_current_available_ips", Help: "Current available IP count.", @@ -49,7 +49,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamExpectedAvailableIPCount = prometheus.NewGaugeVec( + IpamExpectedAvailableIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_expect_available_ips", Help: "Expected future available IP count assuming the Requested IP count is honored.", @@ -57,7 +57,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamMaxIPCount = prometheus.NewGaugeVec( + IpamMaxIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_max_ips", Help: "Maximum Secondary IPs allowed on this Node.", @@ -65,7 +65,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamPendingProgramIPCount = prometheus.NewGaugeVec( + IpamPendingProgramIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_pending_programming_ips", Help: "IPs reserved but not yet available (Pending Programming).", @@ -73,7 +73,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamPendingReleaseIPCount = prometheus.NewGaugeVec( + IpamPendingReleaseIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_pending_release_ips", Help: "IPs reserved but not available anymore (Pending Release).", @@ -81,7 +81,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamPrimaryIPCount = prometheus.NewGaugeVec( + IpamPrimaryIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_primary_ips", Help: "NC Primary IP count (reserved from Pod Subnet for DNS and IMDS SNAT).", @@ -89,7 +89,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamRequestedIPConfigCount = prometheus.NewGaugeVec( + IpamRequestedIPConfigCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_requested_ips", Help: "Secondary Pod Subnet IPs requested by this CNS Node (for Pods).", @@ -97,7 +97,7 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamSecondaryIPCount = prometheus.NewGaugeVec( + IpamSecondaryIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "cx_ipam_secondary_ips", Help: "Node NC Secondary IP count (reserved usable by Pods).", @@ -105,67 +105,67 @@ var ( }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamSubnetExhaustionCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "cx_ipam_subnet_exhaustion_state_count_total", - Help: "Count of the number of times the ipam pool monitor sees subnet exhaustion", - }, - []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel, subnetExhaustionStateLabel}, - ) - ipamSubnetExhaustionState = prometheus.NewGaugeVec( + IpamTotalIPCount = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cx_ipam_subnet_exhaustion_state", - Help: "CNS view of subnet exhaustion state", + Name: "cx_ipam_total_ips", + Help: "Count of total IP pool size allocated to CNS by DNC.", ConstLabels: prometheus.Labels{customerMetricLabel: customerMetricLabelValue}, }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) - ipamTotalIPCount = prometheus.NewGaugeVec( + IpamSubnetExhaustionState = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Name: "cx_ipam_total_ips", - Help: "Total IPs reserved from the Pod Subnet by this Node.", + Name: "cx_ipam_subnet_exhaustion_state", + Help: "IPAM view of subnet exhaustion state", ConstLabels: prometheus.Labels{customerMetricLabel: customerMetricLabelValue}, }, []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel}, ) + IpamSubnetExhaustionCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cx_ipam_subnet_exhaustion_state_count_total", + Help: "Count of the number of times the ipam pool monitor sees subnet exhaustion", + }, + []string{subnetLabel, subnetCIDRLabel, podnetARMIDLabel, subnetExhaustionStateLabel}, + ) ) func init() { metrics.Registry.MustRegister( - ipamAllocatedIPCount, - ipamAvailableIPCount, - ipamBatchSize, - ipamCurrentAvailableIPcount, - ipamExpectedAvailableIPCount, - ipamMaxIPCount, - ipamPendingProgramIPCount, - ipamPendingReleaseIPCount, - ipamPrimaryIPCount, - ipamRequestedIPConfigCount, - ipamSecondaryIPCount, - ipamSubnetExhaustionCount, - ipamSubnetExhaustionState, - ipamTotalIPCount, + IpamAllocatedIPCount, + IpamAvailableIPCount, + IpamBatchSize, + IpamCurrentAvailableIPcount, + IpamExpectedAvailableIPCount, + IpamMaxIPCount, + IpamPendingProgramIPCount, + IpamPendingReleaseIPCount, + IpamPrimaryIPCount, + IpamSecondaryIPCount, + IpamRequestedIPConfigCount, + IpamTotalIPCount, + IpamSubnetExhaustionState, + IpamSubnetExhaustionCount, ) } func observeIPPoolState(state ipPoolState, meta metaState) { labels := []string{meta.subnet, meta.subnetCIDR, meta.subnetARMID} - ipamAllocatedIPCount.WithLabelValues(labels...).Set(float64(state.allocatedToPods)) - ipamAvailableIPCount.WithLabelValues(labels...).Set(float64(state.available)) - ipamBatchSize.WithLabelValues(labels...).Set(float64(meta.batch)) - ipamCurrentAvailableIPcount.WithLabelValues(labels...).Set(float64(state.currentAvailableIPs)) - ipamExpectedAvailableIPCount.WithLabelValues(labels...).Set(float64(state.expectedAvailableIPs)) - ipamMaxIPCount.WithLabelValues(labels...).Set(float64(meta.max)) - ipamPendingProgramIPCount.WithLabelValues(labels...).Set(float64(state.pendingProgramming)) - ipamPendingReleaseIPCount.WithLabelValues(labels...).Set(float64(state.pendingRelease)) - ipamPrimaryIPCount.WithLabelValues(labels...).Set(float64(len(meta.primaryIPAddresses))) - ipamRequestedIPConfigCount.WithLabelValues(labels...).Set(float64(state.requestedIPs)) - ipamSecondaryIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs)) - ipamTotalIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs + int64(len(meta.primaryIPAddresses)))) + IpamAllocatedIPCount.WithLabelValues(labels...).Set(float64(state.allocatedToPods)) + IpamAvailableIPCount.WithLabelValues(labels...).Set(float64(state.available)) + IpamBatchSize.WithLabelValues(labels...).Set(float64(meta.batch)) + IpamCurrentAvailableIPcount.WithLabelValues(labels...).Set(float64(state.currentAvailableIPs)) + IpamExpectedAvailableIPCount.WithLabelValues(labels...).Set(float64(state.expectedAvailableIPs)) + IpamMaxIPCount.WithLabelValues(labels...).Set(float64(meta.max)) + IpamPendingProgramIPCount.WithLabelValues(labels...).Set(float64(state.pendingProgramming)) + IpamPendingReleaseIPCount.WithLabelValues(labels...).Set(float64(state.pendingRelease)) + IpamPrimaryIPCount.WithLabelValues(labels...).Set(float64(len(meta.primaryIPAddresses))) + IpamRequestedIPConfigCount.WithLabelValues(labels...).Set(float64(state.requestedIPs)) + IpamSecondaryIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs)) + IpamTotalIPCount.WithLabelValues(labels...).Set(float64(state.secondaryIPs + int64(len(meta.primaryIPAddresses)))) if meta.exhausted { - ipamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(subnetIPExhausted)) + IpamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(SubnetIPExhausted)) } else { - ipamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(subnetIPNotExhausted)) + IpamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(SubnetIPNotExhausted)) } } diff --git a/cns/ipampool/monitor.go b/cns/ipampool/monitor.go index 18caf00e03..babf863964 100644 --- a/cns/ipampool/monitor.go +++ b/cns/ipampool/monitor.go @@ -105,7 +105,7 @@ func (pm *Monitor) Start(ctx context.Context) error { case css := <-pm.cssSource: // received an updated ClusterSubnetState pm.metastate.exhausted = css.Status.Exhausted logger.Printf("subnet exhausted status = %t", pm.metastate.exhausted) - ipamSubnetExhaustionCount.With(prometheus.Labels{ + IpamSubnetExhaustionCount.With(prometheus.Labels{ subnetLabel: pm.metastate.subnet, subnetCIDRLabel: pm.metastate.subnetCIDR, podnetARMIDLabel: pm.metastate.subnetARMID, subnetExhaustionStateLabel: strconv.FormatBool(pm.metastate.exhausted), }).Inc() diff --git a/cns/ipampool/v2/adapter.go b/cns/ipampool/v2/adapter.go new file mode 100644 index 0000000000..3cc02d0e00 --- /dev/null +++ b/cns/ipampool/v2/adapter.go @@ -0,0 +1,36 @@ +package v2 + +import ( + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" + v1 "k8s.io/api/core/v1" +) + +var _ cns.IPAMPoolMonitor = (*adapter)(nil) + +type adapter struct { + nncSink chan<- v1alpha.NodeNetworkConfig + *Monitor +} + +func (m *Monitor) AsV1(nncSink chan<- v1alpha.NodeNetworkConfig) cns.IPAMPoolMonitor { + return &adapter{ + nncSink: nncSink, + Monitor: m, + } +} + +func (m *adapter) Update(nnc *v1alpha.NodeNetworkConfig) error { + m.nncSink <- *nnc + return nil +} + +func (m *adapter) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot { + return cns.IpamPoolMonitorStateSnapshot{} +} + +func PodIPDemandListener(ch chan<- int) func([]v1.Pod) { + return func(pods []v1.Pod) { + ch <- len(pods) + } +} diff --git a/cns/ipampool/v2/math_test.go b/cns/ipampool/v2/math_test.go new file mode 100644 index 0000000000..e0c2ba9af1 --- /dev/null +++ b/cns/ipampool/v2/math_test.go @@ -0,0 +1,116 @@ +package v2 + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculateTargetIPCount(t *testing.T) { + tests := []struct { + name string + demand int64 + batch int64 + buffer float64 + want int64 + }{ + { + name: "base case", + demand: 0, + batch: 16, + buffer: .5, + want: 16, + }, + { + name: "1/2 demand", + demand: 8, + batch: 16, + buffer: .5, + want: 16, + }, + { + name: "1x demand", + demand: 16, + batch: 16, + buffer: .5, + want: 32, + }, + { + name: "2x demand", + demand: 32, + batch: 16, + buffer: .5, + want: 48, + }, + { + name: "3x demand", + demand: 48, + batch: 16, + buffer: .5, + want: 64, + }, + { + name: "batch of one", + demand: 10, + batch: 1, + buffer: .5, + want: 11, + }, + { + name: "zero buffer", + demand: 10, + batch: 16, + buffer: 0, + want: 16, + }, + { + name: "zero buffer batch of one", + demand: 13, + batch: 1, + buffer: 0, + want: 13, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, calculateTargetIPCount(tt.demand, tt.batch, tt.buffer)) + }) + } +} + +func TestCalculateTargetIPCountOrMax(t *testing.T) { + tests := []struct { + name string + demand int64 + batch int64 + buffer float64 + max int64 + want int64 + }{ + { + name: "base case", + demand: 0, + batch: 16, + buffer: .5, + max: 100, + want: 16, + }, + { + name: "clamp to max", + demand: 500, + batch: 16, + buffer: .5, + max: 250, + want: 250, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, calculateTargetIPCountOrMax(tt.demand, tt.batch, tt.max, tt.buffer)) + }) + } +} diff --git a/cns/ipampool/v2/monitor.go b/cns/ipampool/v2/monitor.go new file mode 100644 index 0000000000..9f2081d3e7 --- /dev/null +++ b/cns/ipampool/v2/monitor.go @@ -0,0 +1,167 @@ +package v2 + +import ( + "context" + "math" + "sync" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1" + "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +const ( + // DefaultMaxIPs default maximum allocatable IPs on a k8s Node. + DefaultMaxIPs = 250 + // fieldManager is the field manager used when patching the NodeNetworkConfig. + fieldManager = "azure-cns" +) + +type nodeNetworkConfigSpecUpdater interface { + PatchSpec(context.Context, *v1alpha.NodeNetworkConfigSpec, string) (*v1alpha.NodeNetworkConfig, error) +} + +type ipStateStore interface { + GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus + MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) +} + +type scaler struct { + batch int64 + buffer float64 + exhausted bool + max int64 +} + +type Monitor struct { + z *zap.Logger + scaler scaler + nnccli nodeNetworkConfigSpecUpdater + store ipStateStore + demand int64 + request int64 + demandSource <-chan int + cssSource <-chan v1alpha1.ClusterSubnetState + nncSource <-chan v1alpha.NodeNetworkConfig + started chan interface{} + once sync.Once +} + +func NewMonitor(z *zap.Logger, store ipStateStore, nnccli nodeNetworkConfigSpecUpdater, demandSource <-chan int, nncSource <-chan v1alpha.NodeNetworkConfig, cssSource <-chan v1alpha1.ClusterSubnetState) *Monitor { //nolint:lll // it's fine + return &Monitor{ + z: z.With(zap.String("component", "ipam-pool-monitor")), + store: store, + nnccli: nnccli, + demandSource: demandSource, + cssSource: cssSource, + nncSource: nncSource, + started: make(chan interface{}), + } +} + +// Start begins the Monitor's pool reconcile loop. +// On first run, it will block until a NodeNetworkConfig is received (through a call to Update()). +// Subsequently, it will run run once per RefreshDelay and attempt to re-reconcile the pool. +func (pm *Monitor) Start(ctx context.Context) error { + pm.z.Debug("starting") + for { + // proceed when things happen: + select { + case <-ctx.Done(): // calling context has closed, we'll exit. + return errors.Wrap(ctx.Err(), "pool monitor context closed") + case demand := <-pm.demandSource: // updated demand for IPs, recalculate request + pm.demand = int64(demand) + pm.z.Info("demand update", zap.Int64("demand", pm.demand)) + case css := <-pm.cssSource: // received an updated ClusterSubnetState, recalculate request + pm.scaler.exhausted = css.Status.Exhausted + pm.z.Info("exhaustion update", zap.Bool("exhausted", pm.scaler.exhausted)) + case nnc := <-pm.nncSource: // received a new NodeNetworkConfig, extract the data from it and recalculate request + pm.scaler.max = int64(math.Min(float64(nnc.Status.Scaler.MaxIPCount), DefaultMaxIPs)) + pm.scaler.batch = int64(math.Min(math.Max(float64(nnc.Status.Scaler.BatchSize), 1), float64(pm.scaler.max))) + pm.scaler.buffer = math.Abs(float64(nnc.Status.Scaler.RequestThresholdPercent)) / 100 //nolint:gomnd // it's a percentage + pm.once.Do(func() { + pm.request = nnc.Spec.RequestedIPCount + close(pm.started) // close the init channel the first time we fully receive a NodeNetworkConfig. + pm.z.Debug("started", zap.Int64("initial request", pm.request)) + }) + pm.z.Info("scaler update", zap.Int64("batch", pm.scaler.batch), zap.Float64("buffer", pm.scaler.buffer), zap.Int64("max", pm.scaler.max), zap.Int64("request", pm.request)) + } + select { + case <-pm.started: // this blocks until we have initialized + default: + // if we haven't started yet, we need to wait for the first NNC to be received. + continue // jumps to the next iteration of the outer for-loop + } + // if control has flowed through the select(s) to this point, we can now reconcile. + if err := pm.reconcile(ctx); err != nil { + pm.z.Error("reconcile failed", zap.Error(err)) + } + } +} + +func (pm *Monitor) reconcile(ctx context.Context) error { + // if the subnet is exhausted, locally overwrite the batch/minfree/maxfree in the meta copy for this iteration + // (until the controlplane owns this and modifies the scaler values for us directly instead of writing "exhausted") + // TODO(rbtr) + s := pm.scaler + if s.exhausted { + s.batch = 1 + s.buffer = 1 + } + + // calculate the target state from the current pool state and scaler + target := calculateTargetIPCountOrMax(pm.demand, s.batch, s.max, s.buffer) + pm.z.Info("calculated new request", zap.Int64("demand", pm.demand), zap.Int64("batch", s.batch), zap.Int64("max", s.max), zap.Float64("buffer", s.buffer), zap.Int64("target", target)) + delta := target - pm.request + if delta == 0 { + return nil + } + pm.z.Info("scaling pool", zap.Int64("delta", delta)) + // try to release -delta IPs. this is no-op if delta is negative. + if _, err := pm.store.MarkNIPsPendingRelease(int(-delta)); err != nil { + return errors.Wrapf(err, "failed to mark sufficient IPs as PendingRelease, wanted %d", pm.request-target) + } + spec := pm.buildNNCSpec(target) + if _, err := pm.nnccli.PatchSpec(ctx, &spec, fieldManager); err != nil { + return errors.Wrap(err, "failed to UpdateSpec with NNC client") + } + pm.request = target + pm.z.Info("scaled pool", zap.Int64("request", pm.request)) + return nil +} + +// buildNNCSpec translates CNS's map of IPs to be released and requested IP count into an NNC Spec. +func (pm *Monitor) buildNNCSpec(request int64) v1alpha.NodeNetworkConfigSpec { + // Get All Pending IPs from CNS and populate it again. + pendingReleaseIPs := pm.store.GetPendingReleaseIPConfigs() + spec := v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: request, + IPsNotInUse: make([]string, len(pendingReleaseIPs)), + } + for i := range pendingReleaseIPs { + spec.IPsNotInUse[i] = pendingReleaseIPs[i].ID + } + return spec +} + +// calculateTargetIPCountOrMax calculates the target IP count request +// using the scaling function and clamps the result at the max IPs. +func calculateTargetIPCountOrMax(demand, batch, max int64, buffer float64) int64 { + targetRequest := calculateTargetIPCount(demand, batch, buffer) + if targetRequest > max { + // clamp request at the max IPs + targetRequest = max + } + return targetRequest +} + +// calculateTargetIPCount calculates an IP count request based on the +// current demand, batch size, and buffer. +// ref: https://github.com/Azure/azure-container-networking/blob/master/docs/feature/ipammath/0-background.md +// the idempotent scaling function is: +// Target = Batch \times \lceil buffer + \frac{Demand}{Batch} \rceil +func calculateTargetIPCount(demand, batch int64, buffer float64) int64 { + return batch * int64(math.Ceil(buffer+float64(demand)/float64(batch))) +} diff --git a/cns/ipampool/v2/monitor_test.go b/cns/ipampool/v2/monitor_test.go new file mode 100644 index 0000000000..8d10b5b988 --- /dev/null +++ b/cns/ipampool/v2/monitor_test.go @@ -0,0 +1,358 @@ +package v2 + +import ( + "context" + "math/rand" + "net/netip" + "testing" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +type ipStateStoreMock struct { + pendingReleaseIPConfigs map[string]cns.IPConfigurationStatus + err error +} + +func (m *ipStateStoreMock) GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus { + return maps.Values(m.pendingReleaseIPConfigs) +} + +func (m *ipStateStoreMock) MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) { + if m.err != nil { + return nil, m.err + } + newPendingRelease := pendingReleaseGenerator(n) + maps.Copy(newPendingRelease, m.pendingReleaseIPConfigs) + m.pendingReleaseIPConfigs = newPendingRelease + return m.pendingReleaseIPConfigs, nil +} + +// pendingReleaseGenerator generates a variable number of random pendingRelease IPConfigs. +func pendingReleaseGenerator(n int) map[string]cns.IPConfigurationStatus { + m := make(map[string]cns.IPConfigurationStatus, n) + ip := netip.MustParseAddr("10.0.0.0") + for i := 0; i < n; i++ { + id := uuid.New().String() + ip = ip.Next() + status := cns.IPConfigurationStatus{ + ID: id, + IPAddress: ip.String(), + } + status.SetState(types.PendingRelease) + m[id] = status + } + return m +} + +func TestPendingReleaseIPConfigsGenerator(t *testing.T) { + t.Parallel() + n := rand.Intn(100) //nolint:gosec // test + m := pendingReleaseGenerator(n) + assert.Len(t, m, n, "pendingReleaseGenerator made the wrong quantity") + for k, v := range m { + _, err := uuid.Parse(v.ID) + require.NoError(t, err, "pendingReleaseGenerator made a bad UUID") + assert.Equal(t, k, v.ID, "pendingReleaseGenerator stored using the wrong key ") + _, err = netip.ParseAddr(v.IPAddress) + require.NoError(t, err, "pendingReleaseGenerator made a bad IP") + assert.Equal(t, types.PendingRelease, v.GetState(), "pendingReleaseGenerator set the wrong State") + } +} + +func TestBuildNNCSpec(t *testing.T) { + tests := []struct { + name string + pendingReleaseIPConfigs map[string]cns.IPConfigurationStatus + request int64 + }{ + { + name: "without no pending release", + request: 16, + }, + { + name: "with pending release", + pendingReleaseIPConfigs: pendingReleaseGenerator(16), + request: 16, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + pm := &Monitor{ + store: &ipStateStoreMock{ + pendingReleaseIPConfigs: tt.pendingReleaseIPConfigs, + }, + } + spec := pm.buildNNCSpec(tt.request) + assert.Equal(t, tt.request, spec.RequestedIPCount) + assert.Equal(t, len(tt.pendingReleaseIPConfigs), len(spec.IPsNotInUse)) + assert.ElementsMatch(t, maps.Keys(tt.pendingReleaseIPConfigs), spec.IPsNotInUse) + }) + } +} + +type nncClientMock struct { + req v1alpha.NodeNetworkConfigSpec + err error +} + +func (m *nncClientMock) PatchSpec(_ context.Context, spec *v1alpha.NodeNetworkConfigSpec, _ string) (*v1alpha.NodeNetworkConfig, error) { + if m.err != nil { + return nil, m.err + } + m.req = *spec + return nil, nil +} + +func TestReconcile(t *testing.T) { + tests := []struct { + name string + demand int64 + request int64 + scaler scaler + nnccli nncClientMock + store ipStateStoreMock + wantRequest int64 + wantPendingRelease int + wantErr bool + }{ + // no-op case + { + name: "no delta", + demand: 5, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{ + req: v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: 16, + }, + }, + store: ipStateStoreMock{}, + wantRequest: 16, + }, + // fail to mark IPs pending release + { + name: "fail to release", + demand: 6, + request: 32, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{ + req: v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: 32, + }, + }, + store: ipStateStoreMock{ + err: errors.Errorf("failed to mark IPs pending release"), + }, + wantRequest: 32, + wantErr: true, + }, + // fail to Patch NNC Spec + { + name: "fail to patch", + demand: 20, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{ + req: v1alpha.NodeNetworkConfigSpec{ + RequestedIPCount: 16, + }, + err: errors.Errorf("failed to patch NNC Spec"), + }, + store: ipStateStoreMock{}, + wantRequest: 16, + wantErr: true, + }, + // normal scale ups with no pending release + { + name: "single scale up", + demand: 15, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 32, + }, + { + name: "big scale up", + demand: 75, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 96, + }, + { + name: "capped scale up", + demand: 300, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 250, + }, + // normal scale down with no previously pending release + { + name: "single scale down", + demand: 5, + request: 32, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 16, + wantPendingRelease: 16, + }, + { + name: "big scale down", + demand: 5, + request: 128, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 16, + wantPendingRelease: 112, + }, + { + name: "capped scale down", + demand: 0, + request: 32, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 16, + wantPendingRelease: 16, + }, + // realign to batch if request is skewed + { + name: "scale up unskew", + demand: 15, + request: 3, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 32, + }, + { + name: "scale down unskew", + demand: 5, + request: 37, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{}, + wantRequest: 16, + wantPendingRelease: 21, + }, + // normal scale up with previous pending release + { + name: "single scale up with pending release", + demand: 20, + request: 16, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{ + pendingReleaseIPConfigs: pendingReleaseGenerator(16), + }, + wantRequest: 32, + wantPendingRelease: 16, + }, + // normal scale down with previous pending release + { + name: "single scale down with pending release", + demand: 5, + request: 32, + scaler: scaler{ + batch: 16, + buffer: .5, + max: 250, + }, + nnccli: nncClientMock{}, + store: ipStateStoreMock{ + pendingReleaseIPConfigs: pendingReleaseGenerator(16), + }, + wantRequest: 16, + wantPendingRelease: 32, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt := tt + t.Parallel() + pm := &Monitor{ + z: zap.NewNop(), + demand: tt.demand, + request: tt.request, + scaler: tt.scaler, + nnccli: &tt.nnccli, + store: &tt.store, + } + err := pm.reconcile(context.Background()) + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + assert.Equal(t, tt.wantRequest, pm.request) + assert.Equal(t, tt.wantRequest, tt.nnccli.req.RequestedIPCount) + assert.Len(t, tt.nnccli.req.IPsNotInUse, tt.wantPendingRelease) + assert.Equal(t, tt.wantRequest, pm.request) + }) + } +} diff --git a/cns/kubecontroller/pod/reconciler.go b/cns/kubecontroller/pod/reconciler.go index c984574327..9767ddad22 100644 --- a/cns/kubecontroller/pod/reconciler.go +++ b/cns/kubecontroller/pod/reconciler.go @@ -2,10 +2,11 @@ package pod import ( "context" + "strconv" "github.com/pkg/errors" + "go.uber.org/zap" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -13,68 +14,87 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -type podcli interface { +type cli interface { List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error } -type podListener interface { - Update([]v1.Pod) +// watcher watches Pods on the current Node and notifies listeners of changes. +type watcher struct { + z *zap.Logger + cli cli + reconcileFuncs []reconcile.Func } -type PodWatcher struct { - cli podcli - listOpt client.ListOption - ReconcileFuncs []reconcile.Func +func New(z *zap.Logger) *watcher { //nolint:revive // force usage of new by keeping the struct private + return &watcher{ + z: z.With(zap.String("component", "pod-watcher")), + } } -func New(nodename string) *PodWatcher { //nolint:revive // private struct to force constructor - return &PodWatcher{ - listOpt: &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename})}, - } +// With adds reconcile.Funcs to the Watcher. +func (p *watcher) With(fs ...reconcile.Func) *watcher { + p.reconcileFuncs = append(p.reconcileFuncs, fs...) + return p } -func (p *PodWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - for _, f := range p.ReconcileFuncs { - if _, err := f(ctx, req); err != nil { - return reconcile.Result{}, errors.Wrap(err, "failed to reconcile") +func (p *watcher) Reconcile(ctx context.Context, req reconcile.Request) (ctrl.Result, error) { + for _, f := range p.reconcileFuncs { + if res, err := f(ctx, req); !res.IsZero() || err != nil { + return res, errors.Wrap(err, "failed to reconcile") } } - return reconcile.Result{}, nil + return ctrl.Result{}, nil } -type PodFilter func([]v1.Pod) []v1.Pod - -var PodNetworkFilter PodFilter = func(pods []v1.Pod) []v1.Pod { - var filtered []v1.Pod - for _, pod := range pods { - if !pod.Spec.HostNetwork { - filtered = append(filtered, pod) - } - } - return filtered +type limiter interface { + Allow() bool } -func (p *PodWatcher) PodNotifierFunc(f PodFilter, listeners ...podListener) reconcile.Func { +// NotifierFunc returns a reconcile.Func that lists Pods to get the latest +// state and notifies listeners of the resulting Pods. +// listOpts are passed to the client.List call to filter the Pod list. +// limiter is an optional rate limiter which may be used to limit the +// rate at which listeners are notified of list changes. This guarantees +// that all Pod events will eventually be processed, but allows the listeners +// to react to less (but more complete) changes. If we rate limit events, we +// end up sending a version of the Pod list that is newer, without missing +// any events. +// listeners are called with the new Pod list. +func (p *watcher) NewNotifierFunc(listOpts *client.ListOptions, limiter limiter, listeners ...func([]v1.Pod)) reconcile.Func { + p.z.Debug("adding notified for listeners", zap.Int("listeners", len(listeners))) return func(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + if !limiter.Allow() { + // rate limit exceeded, requeue + p.z.Debug("rate limit exceeded") + return ctrl.Result{Requeue: true}, nil + } podList := &v1.PodList{} - if err := p.cli.List(ctx, podList, p.listOpt); err != nil { - return reconcile.Result{}, errors.Wrap(err, "failed to list pods") + if err := p.cli.List(ctx, podList, listOpts); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to list pods") } pods := podList.Items - if f != nil { - pods = f(pods) - } for _, l := range listeners { - l.Update(pods) + l(pods) } - return reconcile.Result{}, nil + return ctrl.Result{}, nil } } +var hostNetworkIndexer = client.IndexerFunc(func(o client.Object) []string { + pod, ok := o.(*v1.Pod) + if !ok { + return nil + } + return []string{strconv.FormatBool(pod.Spec.HostNetwork)} +}) + // SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName. -func (p *PodWatcher) SetupWithManager(mgr ctrl.Manager) error { +func (p *watcher) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { p.cli = mgr.GetClient() - err := ctrl.NewControllerManagedBy(mgr). + if err := mgr.GetFieldIndexer().IndexField(ctx, &v1.Pod{}, "spec.hostNetwork", hostNetworkIndexer); err != nil { + return errors.Wrap(err, "failed to set up hostNetwork indexer") + } + if err := ctrl.NewControllerManagedBy(mgr). For(&v1.Pod{}). WithEventFilter(predicate.Funcs{ // we only want create/delete events UpdateFunc: func(event.UpdateEvent) bool { @@ -84,6 +104,8 @@ func (p *PodWatcher) SetupWithManager(mgr ctrl.Manager) error { return false }, }). - Complete(p) - return errors.Wrap(err, "failed to set up pod watcher with manager") + Complete(p); err != nil { + return errors.Wrap(err, "failed to set up pod watcher with manager") + } + return nil } diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index c9f4d059cf..d9cf742848 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -18,6 +18,7 @@ import ( "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/store" "github.com/pkg/errors" + "golang.org/x/exp/maps" ) var ( @@ -437,6 +438,65 @@ func (service *HTTPRestService) MarkIPAsPendingRelease(totalIpsToRelease int) (m return pendingReleasedIps, nil } +// MarkIPAsPendingRelease will attempt to set [n] number of ips to PendingRelease state. +// It will start with any IPs in PendingProgramming state and then move on to any IPs in Allocated state +// until it has reached the target release quantity. +// If it is unable to set the expected number of IPs to PendingRelease, it will revert the changed IPs +// and return an error. +// MarkNIPsPendingRelease is no-op if [n] is not a positive integer. +func (service *HTTPRestService) MarkNIPsPendingRelease(n int) (map[string]cns.IPConfigurationStatus, error) { + service.Lock() + defer service.Unlock() + // try to release from PendingProgramming + pendingProgrammingIPs := make(map[string]cns.IPConfigurationStatus) + for uuid, ipConfig := range service.PodIPConfigState { //nolint:gocritic // intentional value copy + if n <= 0 { + break + } + if ipConfig.GetState() == types.PendingProgramming { + updatedIPConfig, err := service.updateIPConfigState(uuid, types.PendingRelease, ipConfig.PodInfo) + if err != nil { + return nil, err + } + + pendingProgrammingIPs[uuid] = updatedIPConfig + n-- + } + } + + // try to release from Available + availableIPs := make(map[string]cns.IPConfigurationStatus) + for uuid, ipConfig := range service.PodIPConfigState { //nolint:gocritic // intentional value copy + if n <= 0 { + break + } + if ipConfig.GetState() == types.Available { + updatedIPConfig, err := service.updateIPConfigState(uuid, types.PendingRelease, ipConfig.PodInfo) + if err != nil { + return nil, err + } + + availableIPs[uuid] = updatedIPConfig + n-- + } + } + + // if we can release the requested quantity, return the IPs + if n <= 0 { + maps.Copy(pendingProgrammingIPs, availableIPs) + return pendingProgrammingIPs, nil + } + + // else revert changes + for uuid, ipConfig := range pendingProgrammingIPs { //nolint:gocritic // intentional value copy + _, _ = service.updateIPConfigState(uuid, types.PendingProgramming, ipConfig.PodInfo) + } + for uuid, ipConfig := range availableIPs { //nolint:gocritic // intentional value copy + _, _ = service.updateIPConfigState(uuid, types.Available, ipConfig.PodInfo) + } + return nil, errors.New("unable to release requested number of IPs") +} + // TODO: Add a change so that we should only update the current state if it is different than the new state func (service *HTTPRestService) updateIPConfigState(ipID string, updatedState types.IPState, podInfo cns.PodInfo) (cns.IPConfigurationStatus, error) { if ipConfig, found := service.PodIPConfigState[ipID]; found { @@ -447,7 +507,7 @@ func (service *HTTPRestService) updateIPConfigState(ipID string, updatedState ty return ipConfig, nil } - //nolint:goerr113 + //nolint:goerr113 //legacy return cns.IPConfigurationStatus{}, fmt.Errorf("[updateIPConfigState] Failed to update state %s for the IPConfig. ID %s not found PodIPConfigState", updatedState, ipID) } diff --git a/cns/service/main.go b/cns/service/main.go index 94a8cfd878..ed72869389 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -32,6 +32,7 @@ import ( "github.com/Azure/azure-container-networking/cns/healthserver" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/ipampool" + ipampoolv2 "github.com/Azure/azure-container-networking/cns/ipampool/v2" cssctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/clustersubnetstate" mtpncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/multitenantpodnetworkconfig" nncctrl "github.com/Azure/azure-container-networking/cns/kubecontroller/nodenetworkconfig" @@ -61,6 +62,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,6 +113,7 @@ const ( var ( rootCtx context.Context rootErrCh chan error + z *zap.Logger ) // Version is populated by make during build. @@ -613,7 +616,10 @@ func main() { // configure zap logger zconfig := zap.NewProductionConfig() zconfig.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - z, _ := zconfig.Build() + if z, err = zconfig.Build(); err != nil { + fmt.Printf("failed to create logger: %v", err) + os.Exit(1) + } // start the healthz/readyz/metrics server readyCh := make(chan interface{}) @@ -1331,9 +1337,6 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return errors.Wrap(err, "failed to create manager") } - // Build the IPAM Pool monitor - clusterSubnetStateChan := make(chan cssv1alpha1.ClusterSubnetState) - // this cachedscopedclient is built using the Manager's cached client, which is // NOT SAFE TO USE UNTIL THE MANAGER IS STARTED! // This is okay because it is only used to build the IPAMPoolMonitor, which does not @@ -1342,18 +1345,24 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn // reconciler has pushed the Monitor a NodeNetworkConfig. cachedscopedcli := nncctrl.NewScopedClient(nodenetworkconfig.NewClient(manager.GetClient()), types.NamespacedName{Namespace: "kube-system", Name: nodeName}) - poolOpts := ipampool.Options{ - RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, + // Build the IPAM Pool monitor + var poolMonitor cns.IPAMPoolMonitor + cssCh := make(chan cssv1alpha1.ClusterSubnetState) + ipDemandCh := make(chan int) + if cnsconfig.EnableIPAMv2 { + nncCh := make(chan v1alpha.NodeNetworkConfig) + poolMonitor = ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh).AsV1(nncCh) + } else { + poolOpts := ipampool.Options{ + RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, + } + poolMonitor = ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, cssCh, &poolOpts) } - poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, cachedscopedcli, clusterSubnetStateChan, &poolOpts) - httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor // Start building the NNC Reconciler // get CNS Node IP to compare NC Node IP with this Node IP to ensure NCs were created for this node nodeIP := configuration.NodeIP() - - // NodeNetworkConfig reconciler nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP) // pass Node to the Reconciler for Controller xref if err := nncReconciler.SetupWithManager(manager, node); err != nil { //nolint:govet // intentional shadow @@ -1362,7 +1371,7 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn if cnsconfig.EnableSubnetScarcity { // ClusterSubnetState reconciler - cssReconciler := cssctrl.New(clusterSubnetStateChan) + cssReconciler := cssctrl.New(cssCh) if err := cssReconciler.SetupWithManager(manager); err != nil { return errors.Wrapf(err, "failed to setup css reconciler with manager") } @@ -1370,8 +1379,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn // TODO: add pod listeners based on Swift V1 vs MT/V2 configuration if cnsconfig.WatchPods { - pw := podctrl.New(nodeName) - if err := pw.SetupWithManager(manager); err != nil { + pw := podctrl.New(z) + if cnsconfig.EnableIPAMv2 { + hostNetworkListOpt := &client.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.hostNetwork": "false"})} // filter only podsubnet pods + // don't relist pods more than every 500ms + limit := rate.NewLimiter(rate.Every(500*time.Millisecond), 1) //nolint:gomnd // clearly 500ms + pw.With(pw.NewNotifierFunc(hostNetworkListOpt, limit, ipampoolv2.PodIPDemandListener(ipDemandCh))) + } + if err := pw.SetupWithManager(ctx, manager); err != nil { return errors.Wrapf(err, "failed to setup pod watcher with manager") } } diff --git a/go.mod b/go.mod index 52d4d339df..685e612679 100644 --- a/go.mod +++ b/go.mod @@ -113,7 +113,7 @@ require ( golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/term v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect + golang.org/x/time v0.5.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/test/integration/manifests/cnsconfig/swiftconfigmap.yaml b/test/integration/manifests/cnsconfig/swiftconfigmap.yaml index 76bc6642c7..254118e391 100644 --- a/test/integration/manifests/cnsconfig/swiftconfigmap.yaml +++ b/test/integration/manifests/cnsconfig/swiftconfigmap.yaml @@ -21,10 +21,11 @@ data: "NodeID": "", "NodeSyncIntervalInSeconds": 30 }, + "AsyncPodDeletePath": "/var/run/azure-vnet/deleteIDs", "ChannelMode": "CRD", + "EnableAsyncPodDelete": true, + "EnableIPAMv2": true, "InitializeFromCNI": true, "ManageEndpointState": false, - "ProgramSNATIPTables" : false, - "EnableAsyncPodDelete": true, - "AsyncPodDeletePath": "/var/run/azure-vnet/deleteIDs" + "ProgramSNATIPTables" : false }