From fa8eb39b681fcabc46325d07e9be89b9be78ee34 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Mon, 14 Feb 2022 17:38:10 +0000 Subject: [PATCH 1/4] remove early nnc send to pool monitor Signed-off-by: Evan Baker --- cns/restserver/internalapi.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 692241312f..dcfacf6a9b 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -227,7 +227,6 @@ func (service *HTTPRestService) ReconcileNCState( if returnCode != types.Success { return returnCode } - service.IPAMPoolMonitor.Update(nnc) // now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip. for _, secIpConfig := range ncRequest.SecondaryIPConfigs { From d4ac22d25db37b4c60462920d67512549b0ab5e6 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Mon, 14 Feb 2022 22:11:59 +0000 Subject: [PATCH 2/4] order service starts better and add logging in initialization Signed-off-by: Evan Baker --- cns/ipampool/monitor.go | 8 ++-- cns/restserver/internalapi.go | 2 +- cns/restserver/ipam.go | 4 +- cns/restserver/ipam_test.go | 4 +- cns/service/main.go | 73 ++++++++++++++++++++--------------- 5 files changed, 51 insertions(+), 40 deletions(-) diff --git a/cns/ipampool/monitor.go b/cns/ipampool/monitor.go index 103de87387..f0ce477578 100644 --- a/cns/ipampool/monitor.go +++ b/cns/ipampool/monitor.go @@ -44,7 +44,7 @@ type Monitor struct { metastate metaState nnccli nodeNetworkConfigSpecUpdater httpService cns.HTTPService - initialized chan interface{} + started chan interface{} nncSource chan v1alpha.NodeNetworkConfig once sync.Once } @@ -60,7 +60,7 @@ func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater opts: opts, httpService: httpService, nnccli: nnccli, - initialized: make(chan interface{}), + started: make(chan interface{}), nncSource: make(chan v1alpha.NodeNetworkConfig), } } @@ -78,7 +78,7 @@ func (pm *Monitor) Start(ctx context.Context) error { return errors.Wrap(ctx.Err(), "pool monitor context closed") case <-ticker.C: // attempt to reconcile every tick. select { - case <-pm.initialized: // this blocks until we have initialized + case <-pm.started: // this blocks until we have initialized // if we have initialized and enter this case, we proceed out of the select and continue to reconcile. default: // if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again. @@ -90,7 +90,7 @@ func (pm *Monitor) Start(ctx context.Context) error { pm.metastate.batch = scaler.BatchSize pm.metastate.max = scaler.MaxIPCount pm.metastate.minFreeCount, pm.metastate.maxFreeCount = CalculateMinFreeIPs(scaler), CalculateMaxFreeIPs(scaler) - pm.once.Do(func() { close(pm.initialized) }) // close the init channel the first time we receive a NodeNetworkConfig. + pm.once.Do(func() { close(pm.started) }) // close the init channel the first time we receive a NodeNetworkConfig. } // if control has flowed through the select(s) to this point, we can now reconcile. err := pm.reconcile(ctx) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index dcfacf6a9b..65930ec061 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -255,7 +255,7 @@ func (service *HTTPRestService) ReconcileNCState( } } - err := service.MarkExistingIPsAsPending(nnc.Spec.IPsNotInUse) + err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse) if err != nil { logger.Errorf("[Azure CNS] Error. 
Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse) return types.UnexpectedError diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index 50c11cb7a5..0c5a975caa 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -331,8 +331,8 @@ func (service *HTTPRestService) releaseIPConfig(podInfo cns.PodInfo) error { return nil } -// called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending -func (service *HTTPRestService) MarkExistingIPsAsPending(pendingIPIDs []string) error { +// MarkExistingIPsAsPendingRelease is called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending. +func (service *HTTPRestService) MarkExistingIPsAsPendingRelease(pendingIPIDs []string) error { service.Lock() defer service.Unlock() diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index 13ea553743..dc0a5f8e9c 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -693,7 +693,7 @@ func TestIPAMMarkExistingIPConfigAsPending(t *testing.T) { // mark available ip as as pending pendingIPIDs := []string{testPod2GUID} - err = svc.MarkExistingIPsAsPending(pendingIPIDs) + err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs) if err != nil { t.Fatalf("Expected to successfully mark available ip as pending") } @@ -705,7 +705,7 @@ func TestIPAMMarkExistingIPConfigAsPending(t *testing.T) { // attempt to mark assigned ipconfig as pending, expect fail pendingIPIDs = []string{testPod1GUID} - err = svc.MarkExistingIPsAsPending(pendingIPIDs) + err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs) if err == nil { t.Fatalf("Expected to fail when marking assigned ip as pending") } diff --git a/cns/service/main.go b/cns/service/main.go index f997f13904..5848963532 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -820,8 +820,8 @@ type ncStateReconciler interface { } // TODO(rbtr) where should this live?? -// InitCNS initializes cns by passing pods and a createnetworkcontainerrequest -func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { +// reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest +func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error { // Get nnc using direct client nnc, err := cli.Get(ctx) if err != nil { @@ -864,8 +864,6 @@ func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncSt // InitializeCRDState builds and starts the CRD controllers. func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error { - logger.Printf("[Azure CNS] Starting request controller") - // convert interface type to implementation type httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) if !ok { @@ -880,40 +878,23 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) + // build default clientset. 
kubeConfig, err := ctrl.GetConfig() kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version) if err != nil { logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) return err } - nnccli, err := nodenetworkconfig.NewClient(kubeConfig) + clientset, err := kubernetes.NewForConfig(kubeConfig) if err != nil { - return errors.Wrap(err, "failed to create NNC client") + return errors.Wrap(err, "failed to build clientset") } + + // get nodename for scoping kube requests to node. nodeName, err := configuration.NodeName() if err != nil { return errors.Wrap(err, "failed to get NodeName") } - // TODO(rbtr): nodename and namespace should be in the cns config - scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName}) - - // initialize the ipam pool monitor - poolOpts := ipampool.Options{ - RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, - } - poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts) - httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor - logger.Printf("Starting IPAM Pool Monitor") - go func() { - if e := poolMonitor.Start(ctx); e != nil { - logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e) - } - }() - - clientset, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return errors.Wrap(err, "failed to build clientset") - } var podInfoByIPProvider cns.PodInfoByIPProvider if cnsconfig.InitializeFromCNI { @@ -939,19 +920,46 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn }) } + // create scoped kube clients. + nnccli, err := nodenetworkconfig.NewClient(kubeConfig) + if err != nil { + return errors.Wrap(err, "failed to create NNC client") + } + // TODO(rbtr): nodename and namespace should be in the cns config + scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName}) + + // reconcile initial CNS state from CNI or apiserver. // apiserver nnc might not be registered or api server might be down and crashloop backof puts us outside of 5-10 minutes we have for // aks addons to come up so retry a bit more aggresively here. // will retry 10 times maxing out at a minute taking about 8 minutes before it gives up. 
+ attempt := 0 err = retry.Do(func() error { - err = initCNS(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider) + attempt++ + logger.Printf("reconciling initial CNS state attempt: %d", attempt) + err = reconcileInitialCNSState(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider) if err != nil { - logger.Errorf("[Azure CNS] Failed to init cns with err: %v", err) + logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err) } return errors.Wrap(err, "failed to initialize CNS state") }, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute)) if err != nil { return err } + logger.Printf("reconciled initial CNS state after %d attempts", attempt) + + // initialize the ipam pool monitor + poolOpts := ipampool.Options{ + RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, + } + poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts) + httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor + go func() { + logger.Printf("Starting IPAM Pool Monitor") + if e := poolMonitor.Start(ctx); e != nil { + logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e) + } + }() + logger.Printf("initialized and started IPAM pool monitor") // the nodeScopedCache sets Selector options on the Manager cache which are used // to perform *server-side* filtering of the cached objects. This is very important @@ -982,7 +990,7 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return errors.Wrapf(err, "failed to get node %s", nodeName) } - reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, httpRestServiceImplementation.IPAMPoolMonitor) + reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, poolMonitor) // pass Node to the Reconciler for Controller xref if err := reconciler.SetupWithManager(manager, node); err != nil { return errors.Wrapf(err, "failed to setup reconciler with manager") @@ -990,13 +998,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn // Start the RequestController which starts the reconcile loop go func() { + logger.Printf("Starting NodeNetworkConfig reconciler.") for { if err := manager.Start(ctx); err != nil { logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) // retry to start the request controller // todo: add a CNS metric to count # of failures } else { - logger.Printf("[Azure CNS] Exiting RequestController") + logger.Printf("exiting NodeNetworkConfig reconciler") return } @@ -1005,8 +1014,8 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } }() - logger.Printf("Starting SyncHostNCVersion") go func() { + logger.Printf("starting SyncHostNCVersion loop") // Periodically poll vfp programmed NC version from NMAgent tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond) for { @@ -1016,10 +1025,12 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) cancel() case <-ctx.Done(): + logger.Printf("exiting SyncHostNCVersion") return } } }() + logger.Printf("initialized and started SyncHostNCVersion loop") return nil } From e7f357398f0f3847232ccd4c88c0a3e18b88d98c Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 15 Feb 2022 17:54:26 +0000 Subject: [PATCH 3/4] wait for reconciler to start Signed-off-by: Evan Baker --- 
cns/service/main.go | 21 +++++++++++++++------ cns/singletenantcontroller/reconciler.go | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index 5848963532..37967da0fa 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -928,6 +928,13 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn // TODO(rbtr): nodename and namespace should be in the cns config scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName}) + // initialize the ipam pool monitor + poolOpts := ipampool.Options{ + RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, + } + poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts) + httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor + // reconcile initial CNS state from CNI or apiserver. // apiserver nnc might not be registered or api server might be down and crashloop backof puts us outside of 5-10 minutes we have for // aks addons to come up so retry a bit more aggresively here. @@ -947,12 +954,6 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } logger.Printf("reconciled initial CNS state after %d attempts", attempt) - // initialize the ipam pool monitor - poolOpts := ipampool.Options{ - RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond, - } - poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts) - httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor go func() { logger.Printf("Starting IPAM Pool Monitor") if e := poolMonitor.Start(ctx); e != nil { @@ -1013,6 +1014,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn time.Sleep(time.Second) } }() + logger.Printf("initialized NodeNetworkConfig reconciler") + // wait for up to 10m for the Reconciler to run once. 
+ timedCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) //nolint:gomnd // default 10m + defer cancel() + if started := reconciler.Started(timedCtx); !started { + return errors.Errorf("timed out waiting for reconciler start") + } + logger.Printf("started NodeNetworkConfig reconciler") go func() { logger.Printf("starting SyncHostNCVersion loop") diff --git a/cns/singletenantcontroller/reconciler.go b/cns/singletenantcontroller/reconciler.go index fd1f82a603..976ef11b5a 100644 --- a/cns/singletenantcontroller/reconciler.go +++ b/cns/singletenantcontroller/reconciler.go @@ -2,6 +2,7 @@ package kubecontroller import ( "context" + "sync" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -37,6 +38,8 @@ type Reconciler struct { cnscli cnsClient ipampoolmonitorcli ipamPoolMonitorClient nnccli nncGetter + started chan interface{} + once sync.Once } func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli ipamPoolMonitorClient) *Reconciler { @@ -44,11 +47,15 @@ func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli i cnscli: cnscli, ipampoolmonitorcli: ipampipampoolmonitorcli, nnccli: nnccli, + started: make(chan interface{}), } } // Reconcile is called on CRD status changes func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + defer func() { + r.once.Do(func() { close(r.started) }) + }() nnc, err := r.nnccli.Get(ctx, req.NamespacedName) if err != nil { if apierrors.IsNotFound(err) { @@ -90,6 +97,20 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco return reconcile.Result{}, nil } +// Started blocks until the Reconciler has reconciled at least once, +// then, and any time that it is called after that, it immediately returns true. +// It accepts a cancellable Context and if the context is closed +// before Start it will return false. Passing a closed Context after the +// Reconciler is started is indeterminate and the response is psuedorandom. +func (r *Reconciler) Started(ctx context.Context) bool { + select { + case <-r.started: + return true + case <-ctx.Done(): + return false + } +} + // SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, node *v1.Node) error { err := ctrl.NewControllerManagedBy(mgr). From 4a020504b5da5bf7472aae7b5f28a179023de967 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Tue, 15 Feb 2022 22:48:14 +0000 Subject: [PATCH 4/4] update comments and docs Signed-off-by: GitHub --- cns/ipampool/monitor.go | 7 +++++++ cns/service/main.go | 6 +++++- cns/singletenantcontroller/reconciler.go | 5 ++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/cns/ipampool/monitor.go b/cns/ipampool/monitor.go index f0ce477578..07b0ef4149 100644 --- a/cns/ipampool/monitor.go +++ b/cns/ipampool/monitor.go @@ -65,6 +65,9 @@ func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater } } +// Start begins the Monitor's pool reconcile loop. +// On first run, it will block until a NodeNetworkConfig is received (through a call to Update()). +// Subsequently, it will run run once per RefreshDelay and attempt to re-reconcile the pool. 
func (pm *Monitor) Start(ctx context.Context) error { logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor") @@ -334,6 +337,10 @@ func (pm *Monitor) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot { // Update ingests a NodeNetworkConfig, clamping some values to ensure they are legal and then // pushing it to the Monitor's source channel. +// If the Monitor has been Started but is blocking until it receives an NNC, this will start +// the pool reconcile loop. +// If the Monitor has not been Started, this will block until Start() is called, which will +// immediately read this passed NNC and start the pool reconcile loop. func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) { pm.clampScaler(&nnc.Status.Scaler) diff --git a/cns/service/main.go b/cns/service/main.go index 37967da0fa..19bf2dc06c 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -954,6 +954,8 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } logger.Printf("reconciled initial CNS state after %d attempts", attempt) + // start the pool Monitor before the Reconciler, since it needs to be ready to receive an + // NodeNetworkConfig update by the time the Reconciler tries to send it. go func() { logger.Printf("Starting IPAM Pool Monitor") if e := poolMonitor.Start(ctx); e != nil { @@ -997,7 +999,9 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return errors.Wrapf(err, "failed to setup reconciler with manager") } - // Start the RequestController which starts the reconcile loop + // Start the Manager which starts the reconcile loop. + // The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the + // Monitor's internal loop. go func() { logger.Printf("Starting NodeNetworkConfig reconciler.") for { diff --git a/cns/singletenantcontroller/reconciler.go b/cns/singletenantcontroller/reconciler.go index 976ef11b5a..f591eb058a 100644 --- a/cns/singletenantcontroller/reconciler.go +++ b/cns/singletenantcontroller/reconciler.go @@ -53,9 +53,6 @@ func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli i // Reconcile is called on CRD status changes func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - defer func() { - r.once.Do(func() { close(r.started) }) - }() nnc, err := r.nnccli.Get(ctx, req.NamespacedName) if err != nil { if apierrors.IsNotFound(err) { @@ -94,6 +91,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco // record assigned IPs metric allocatedIPs.Set(float64(len(nnc.Status.NetworkContainers[0].IPAssignments))) + // we have received and pushed an NNC update, we are "Started" + r.once.Do(func() { close(r.started) }) return reconcile.Result{}, nil }
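Taken together, these patches converge on one readiness-signaling idiom: a long-running component owns a channel that is closed exactly once (guarded by a sync.Once) when its first input arrives, its work loop skips ticks until that channel is closed, and callers can wait for readiness with a cancellable context, as the Monitor Start/Update comments and Reconciler.Started above describe. The following is a minimal, self-contained sketch of that idiom, not the CNS code itself: the Worker, Run, Update, and Started names are invented for illustration, and chan struct{} stands in for the chan interface{} used in the patches.

// readiness_sketch.go — minimal illustration only; names are invented, not CNS types.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Worker idles until it receives its first input, then reconciles on every tick.
type Worker struct {
	started chan struct{} // closed exactly once, when the first input arrives
	source  chan string   // stand-in for the NodeNetworkConfig channel
	once    sync.Once
}

func NewWorker() *Worker {
	return &Worker{
		started: make(chan struct{}),
		source:  make(chan string),
	}
}

// Update pushes new input to the run loop. Like Monitor.Update, it blocks until the
// loop is receiving; the loop closes started when the first value is read.
func (w *Worker) Update(v string) {
	w.source <- v
}

// Started blocks until the Worker has received at least one input, or ctx is done.
func (w *Worker) Started(ctx context.Context) bool {
	select {
	case <-w.started:
		return true
	case <-ctx.Done():
		return false
	}
}

// Run is the reconcile loop: ticks are skipped until the first input arrives.
func (w *Worker) Run(ctx context.Context) error {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	var state string
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			select {
			case <-w.started: // started: fall through and reconcile
			default:
				continue // not started yet: skip this tick
			}
			fmt.Println("reconciling with state:", state)
		case v := <-w.source:
			state = v
			w.once.Do(func() { close(w.started) }) // signal readiness exactly once
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	w := NewWorker()
	go func() { _ = w.Run(ctx) }()        // start the consumer loop first, like the pool monitor
	go func() { w.Update("first NNC") }() // the producer (the Reconciler's role) sends the first update

	// bound the wait for readiness, as main.go does for the NodeNetworkConfig reconciler
	if !w.Started(ctx) {
		fmt.Println("timed out waiting for worker to start")
		return
	}
	fmt.Println("worker started")
	<-ctx.Done()
}

The ordering in main mirrors what InitializeCRDState ends up with after patches 2 and 3: the consumer loop is started before the producer pushes its first update, and the caller bounds its wait with a context timeout rather than blocking indefinitely.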
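Stepping back to patch 2, the initial state reconciliation is also wrapped in a bounded retry so a slow or unregistered apiserver does not push CNS past the window AKS add-ons have to come up. The sketch below shows that shape, assuming the retry package is avast/retry-go, whose option names (Context, Delay, MaxDelay) match the calls in the patch; fetchState, the delays, and the exact module import path are illustrative, not taken from the repository.

// retry_sketch.go — illustrative only; assumes github.com/avast/retry-go (adjust the
// import path to the versioned module, e.g. /v4, if that is what go.mod pins).
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go"
)

var errNotReady = errors.New("apiserver not ready")

// fetchState stands in for reconcileInitialCNSState: here it simply fails twice and
// then succeeds, to show the retry loop in action.
func fetchState(calls *int) error {
	*calls++
	if *calls < 3 {
		return errNotReady
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	attempt := 0
	calls := 0
	err := retry.Do(
		func() error {
			attempt++
			fmt.Printf("reconciling initial state, attempt %d\n", attempt)
			return fetchState(&calls)
		},
		retry.Context(ctx),                // stop retrying once the context is cancelled
		retry.Delay(100*time.Millisecond), // initial delay (initCNSInitalDelay in the patch)
		retry.MaxDelay(time.Minute),       // cap the backoff at one minute, as the patch does
	)
	if err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Printf("reconciled after %d attempts\n", attempt)
}

With the library's default attempt count and backoff capped by MaxDelay, the total wait works out to the "retry 10 times ... taking about 8 minutes before it gives up" figure the patch comment cites.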