From f83aaca6f00b3f7438a855a433e2f5df24792360 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 15:40:38 -0800 Subject: [PATCH 01/22] Fix CNS reconcilation before starting the httpservice listener --- .../kubecontroller/crdrequestcontroller.go | 42 ++++- .../requestcontrollerintreface.go | 2 + cns/service/main.go | 162 +++++++++++------- 3 files changed, 135 insertions(+), 71 deletions(-) diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index 9e7575109d..588a6246bd 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "sync" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/cnsclient" @@ -42,6 +43,9 @@ type crdRequestController struct { CNSClient cnsclient.APIClient nodeName string //name of node running this program Reconciler *CrdReconciler + initialized bool + Started bool + lock sync.Mutex } // GetKubeConfig precedence @@ -139,20 +143,38 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig return &crdRequestController, nil } +// InitRequestController will initialize/reconcile the CNS state +func (crdRC *crdRequestController) InitRequestController() error { + logger.Printf("Initializing CNS state") + + defer crdRC.lock.Unlock() + crdRC.lock.Lock() + + if err := crdRC.initCNS(); err != nil { + logger.Errorf("[cns-rc] Error initializing cns state: %v", err) + return err + } + + crdRC.initialized = true + return nil +} + // StartRequestController starts the Reconciler loop which watches for CRD status updates // Blocks until SIGINT or SIGTERM is received // Notifies exitChan when kill signal received func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct{}) error { - var ( - err error - ) + logger.Printf("StartRequestController") - logger.Printf("Initializing CNS state") - if err = crdRC.initCNS(); err != nil { - logger.Errorf("[cns-rc] Error initializing cns state: %v", err) - return err + crdRC.lock.Lock() + if crdRC.initialized != true { + crdRC.lock.Unlock() + return fmt.Errorf("Failed to start requestController, state is not initialized [%v]", crdRC) } + // Setting the started state + crdRC.Started = true + crdRC.lock.Unlock() + logger.Printf("Starting reconcile loop") if err := crdRC.mgr.Start(exitChan); err != nil { if crdRC.isNotDefined(err) { @@ -166,6 +188,12 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct return nil } +func (crdRC *crdRequestController) isStarted() bool { + defer crdRC.lock.Unlock() + crdRC.lock.Lock() + return crdRC.Started +} + // InitCNS initializes cns by passing pods and a createnetworkcontainerrequest func (crdRC *crdRequestController) initCNS() error { var ( diff --git a/cns/requestcontroller/requestcontrollerintreface.go b/cns/requestcontroller/requestcontrollerintreface.go index 24336b8061..19cfa9cc4a 100644 --- a/cns/requestcontroller/requestcontrollerintreface.go +++ b/cns/requestcontroller/requestcontrollerintreface.go @@ -8,6 +8,8 @@ import ( // RequestController interface for cns to interact with the request controller type RequestController interface { + InitRequestController() error StartRequestController(exitChan <-chan struct{}) error UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error + IsStarted() bool } diff --git a/cns/service/main.go b/cns/service/main.go index 3657a578a6..50fdc9a5e6 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -5,9 +5,11 @@ package main import ( "bytes" - "context" "encoding/json" "fmt" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" + "github.com/Azure/azure-container-networking/cns/requestcontroller" + "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" "net/http" "os" "os/signal" @@ -19,7 +21,6 @@ import ( localtls "github.com/Azure/azure-container-networking/server/tls" - "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/aitelemetry" @@ -32,8 +33,6 @@ import ( "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/imdsclient" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/requestcontroller" - "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" "github.com/Azure/azure-container-networking/cns/restserver" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" @@ -483,6 +482,14 @@ func main() { } } + + // Initialze state in if CNS is running in CRD mode + // State must be initialized before we start HTTPRestService + if config.ChannelMode == cns.CRD { + IniitalizeCRDState(httpRestService, cnsconfig) + } + + logger.Printf("[Azure CNS] Starting HTTPRestService") // Start CNS. if httpRestService != nil { if cnsconfig.UseHTTPS { @@ -536,66 +543,6 @@ func main() { httpRestService.SyncNodeStatus(ep, vnet, node, json.RawMessage{}) } }(privateEndpoint, infravnet, nodeID) - } else if config.ChannelMode == cns.CRD { - var requestController requestcontroller.RequestController - - logger.Printf("[Azure CNS] Starting request controller") - - kubeConfig, err := kubecontroller.GetKubeConfig() - if err != nil { - logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) - return - } - - //convert interface type to implementation type - httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) - if !ok { - logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService) - return - } - - // Set orchestrator type - orchestrator := cns.SetOrchestratorTypeRequest{ - OrchestratorType: cns.KubernetesCRD, - } - httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) - - // Get crd implementation of request controller - requestController, err = kubecontroller.NewCrdRequestController(httpRestServiceImplementation, kubeConfig) - if err != nil { - logger.Errorf("[Azure CNS] Failed to make crd request controller :%v", err) - return - } - - logger.Printf("Starting SyncHostNCVersion") - rootCxt := context.Background() - go func() { - // Periodically poll vfp programmed NC version from NMAgent - for { - <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C - httpRestServiceImplementation.SyncHostNCVersion(rootCxt, config.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) - } - }() - - // initialize the ipam pool monitor - httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) - - //Start the RequestController which starts the reconcile loop - requestControllerStopChannel := make(chan struct{}) - defer close(requestControllerStopChannel) - go func() { - if err := requestController.StartRequestController(requestControllerStopChannel); err != nil { - logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) - return - } - }() - - ctx := context.Background() - go func() { - if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { - logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) - } - }() } var netPlugin network.NetPlugin @@ -709,3 +656,90 @@ func main() { logger.Printf("CNS exited") logger.Close() } + +// initializeCRD state +func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) { + var requestController requestcontroller.RequestController + + logger.Printf("[Azure CNS] Starting request controller") + + kubeConfig, err := kubecontroller.GetKubeConfig() + if err != nil { + logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) + return + } + + //convert interface type to implementation type + httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) + if !ok { + logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService) + return + } + + // Set orchestrator type + orchestrator := cns.SetOrchestratorTypeRequest{ + OrchestratorType: cns.KubernetesCRD, + } + httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) + + // Get crd implementation of request controller + requestController, err = kubecontroller.NewCrdRequestController(httpRestServiceImplementation, kubeConfig) + if err != nil { + logger.Errorf("[Azure CNS] Failed to make crd request controller :%v", err) + return + } + + err := requestController.InitRequestController() + if err != nil { + logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) + return + } + + // initialize the ipam pool monitor + httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) + + //Start the RequestController which starts the reconcile loop + requestControllerStopChannel := make(chan struct{}) + defer close(requestControllerStopChannel) + go func() { + for { + if err := requestController.StartRequestController(requestControllerStopChannel); err != nil { + logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) + // retry to start the request controller + // todo: add a CNS metric to count # of failures + } else { + return + } + + // Retry after 1sec + time.Sleep(time.Second) + } + }() + + for { + if requestController.IsStarted() { + logger.Printf("RequestController is started") + break + } + + logger.Printf("Waiting for requestController to start...") + time.Sleep(time.Millisecond * 500) + } + + ctx := context.Background() + go func() { + if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { + logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) + } + }() + + logger.Printf("Starting SyncHostNCVersion") + rootCxt := context.Background() + go func() { + // Periodically poll vfp programmed NC version from NMAgent + for { + <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C + httpRestServiceImplementation.SyncHostNCVersion(rootCxt, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) + } + }() +} From 1239912e37afc0203ca1170c62a83cce8f62a7aa Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 01:28:04 -0800 Subject: [PATCH 02/22] Fixed build --- cns/requestcontroller/kubecontroller/crdrequestcontroller.go | 3 ++- cns/service/main.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index 588a6246bd..f21691d59d 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -188,7 +188,8 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct return nil } -func (crdRC *crdRequestController) isStarted() bool { +// return if RequestController is started +func (crdRC *crdRequestController) IsStarted() bool { defer crdRC.lock.Unlock() crdRC.lock.Lock() return crdRC.Started diff --git a/cns/service/main.go b/cns/service/main.go index 50fdc9a5e6..8d0b93f3c8 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -38,6 +38,7 @@ import ( "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/store" + "context" ) const ( From f58bfc94d44aa55cf9da8c3b4cc1e15f6800b353 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 01:37:57 -0800 Subject: [PATCH 03/22] More fixes --- cns/service/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cns/service/main.go b/cns/service/main.go index 8d0b93f3c8..bada4c16e0 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -690,7 +690,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration return } - err := requestController.InitRequestController() + err = requestController.InitRequestController() if err != nil { logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) return From d01938810e65eadea8b1503b22576ca948e396a3 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 02:06:15 -0800 Subject: [PATCH 04/22] Fixed reordering --- cns/service/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index bada4c16e0..0062d71f50 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -690,15 +690,15 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration return } + // initialize the ipam pool monitor + httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) + err = requestController.InitRequestController() if err != nil { logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) return } - // initialize the ipam pool monitor - httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) - //Start the RequestController which starts the reconcile loop requestControllerStopChannel := make(chan struct{}) defer close(requestControllerStopChannel) From 19c2b82aa5943a0e198d6c5c367d007b21ee2147 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 03:37:10 -0800 Subject: [PATCH 05/22] Fix IPAM pool monitor init --- cns/ipampoolmonitor/ipampoolmonitor.go | 5 +++-- cns/requestcontroller/kubecontroller/crdrequestcontroller.go | 2 +- cns/service/main.go | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index 7a57fa68be..ca065b5143 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -27,6 +27,7 @@ type CNSIPAMPoolMonitor struct { } func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor { + logger.Printf("NewCNSIPAMPoolMonitor: Create IPAM Pool Monitor") return &CNSIPAMPoolMonitor{ pendingRelease: false, cns: cns, @@ -52,7 +53,7 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli for { select { case <-ctx.Done(): - return fmt.Errorf("CNS IPAM Pool Monitor received cancellation signal") + return fmt.Errorf("[ipam-pool-monitor] CNS IPAM Pool Monitor received cancellation signal") case <-ticker.C: err := pm.Reconcile() if err != nil { @@ -70,7 +71,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error { availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount) - msg := fmt.Sprintf("Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v", + msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v", cnsPodIPConfigCount, pm.cachedNNC.Spec.RequestedIPCount, pm.scalarUnits.BatchSize, pm.MinimumFreeIps, pm.MaximumFreeIps, allocatedPodIPCount, availableIPConfigCount, pendingReleaseIPCount, freeIPConfigCount, pendingProgramCount) switch { diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index f21691d59d..f0eeeae984 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -145,7 +145,7 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig // InitRequestController will initialize/reconcile the CNS state func (crdRC *crdRequestController) InitRequestController() error { - logger.Printf("Initializing CNS state") + logger.Printf("InitRequestController") defer crdRC.lock.Unlock() crdRC.lock.Lock() diff --git a/cns/service/main.go b/cns/service/main.go index 0062d71f50..b507c140e9 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -728,6 +728,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration } ctx := context.Background() + logger.Printf("Starting IPAM Pool Monitor") go func() { if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) From 6af486733f3bc2fae5d5a57d22b1c39ae3dbd13c Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 17:44:11 -0800 Subject: [PATCH 06/22] DONTPUSH --- cns/common/service.go | 1 + cns/restserver/ipam.go | 4 +++- cns/restserver/restserver.go | 17 ++++++++++++++-- cns/service.go | 38 +++++++++++++++++++++++++++++++----- 4 files changed, 52 insertions(+), 8 deletions(-) diff --git a/cns/common/service.go b/cns/common/service.go index ff9918f717..31a3e7464c 100644 --- a/cns/common/service.go +++ b/cns/common/service.go @@ -24,6 +24,7 @@ type Service struct { // ServiceAPI defines base interface. type ServiceAPI interface { + Init(*ServiceConfig) error Start(*ServiceConfig) error Stop() GetOption(string) interface{} diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index d2960b90a7..5a00101e6a 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -375,7 +375,9 @@ func (service *HTTPRestService) AllocateDesiredIPConfig(podInfo cns.KubernetesPo json.Unmarshal(ipState.OrchestratorContext, &pInfo) return podIpInfo, fmt.Errorf("Desired IP is already allocated %+v to Pod: %+v, requested for pod %+v", ipState, pInfo, podInfo) } - } else if ipState.State == cns.Available { + } else if ipState.State == cns.Available || ipState.State == cns.PendingProgramming { + // This race can happen during restart, where CNS state is lost and thus we have lost the NC programmed version + // As part of reconcile, we mark IPs as Allocated which are already allocated to PODs (listed from APIServer) service.setIPConfigAsAllocated(ipState, podInfo, orchestratorContext) found = true } else { diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index c2fd6649a2..a3f1b77e0d 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -130,8 +130,8 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl }, nil } -// Start starts the CNS listener. -func (service *HTTPRestService) Start(config *common.ServiceConfig) error { +// Init starts the CNS listener. +func (service *HTTPRestService) Init(config *common.ServiceConfig) error { err := service.Initialize(config) if err != nil { logger.Errorf("[Azure CNS] Failed to initialize base service, err:%v.", err) @@ -209,6 +209,19 @@ func (service *HTTPRestService) Start(config *common.ServiceConfig) error { return nil } +// Start starts the CNS listener. +func (service *HTTPRestService) Start(config *common.ServiceConfig) error { + + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err := service.StartListener(config); err != nil { + return err + } + + return nil +} + // Stop stops the CNS. func (service *HTTPRestService) Stop() { service.Uninitialize() diff --git a/cns/service.go b/cns/service.go index 79200bdc8b..c907b73f06 100644 --- a/cns/service.go +++ b/cns/service.go @@ -4,6 +4,8 @@ package cns import ( + "fmt" + "github.com/Azure/azure-container-networking/cns/logger" "net/http" "net/url" "strings" @@ -80,11 +82,18 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { return err } } - // Start the listener. - // continue to listen on the normal endpoint for http traffic, this will be supported - // for sometime until partners migrate fully to https - if err = listener.Start(config.ErrChan); err != nil { - return err + + // todo: Do we need to run this logic for all channel types + if config.ChannelMode == CRD { + logger.Printf("HTTP listener will be started later after CNS state has been reconciled") + } else { + logger.Printf("Starting HTTP listener") + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err = listener.Start(config.ErrChan); err != nil { + return err + } } config.Listener = listener } @@ -95,6 +104,25 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { return nil } +func (service *Service) StartListener(config *common.ServiceConfig) error { + log.Debugf("[Azure CNS] Going to start listener: %+v", config) + + // Initialize the listener. + if service.Listener != nil { + log.Debugf("[Azure CNS] Starting listener: %+v", config) + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err := service.Listener.Start(config.ErrChan); err != nil { + return err + } + } else { + return fmt.Errorf("Failed to start a listener, it is not initialized, config %+v", config) + } + + return nil +} + // Uninitialize cleans up the plugin. func (service *Service) Uninitialize() { service.Listener.Stop() From 013b9df9e6a0ce3854b00c52cf7efdbdd96574e7 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 17:56:28 -0800 Subject: [PATCH 07/22] Fixed HTTPRestService initialization --- cns/cnsclient/cnsclient_test.go | 8 +++++++- cns/restserver/api_test.go | 6 ++++++ cns/service.go | 13 +------------ cns/service/main.go | 30 ++++++++++++++++++------------ 4 files changed, 32 insertions(+), 25 deletions(-) diff --git a/cns/cnsclient/cnsclient_test.go b/cns/cnsclient/cnsclient_test.go index 850438e922..b566033d62 100644 --- a/cns/cnsclient/cnsclient_test.go +++ b/cns/cnsclient/cnsclient_test.go @@ -137,9 +137,15 @@ func TestMain(m *testing.M) { } if httpRestService != nil { + err = httpRestService.Init(&config) + if err != nil { + logger.Errorf("Failed to initialize HttpService, err:%v.\n", err) + return + } + err = httpRestService.Start(&config) if err != nil { - logger.Errorf("Failed to start CNS, err:%v.\n", err) + logger.Errorf("Failed to start HttpService, err:%v.\n", err) return } } diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index 280130b860..49cc8e5fb4 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -941,6 +941,12 @@ func startService() error { file, _ := os.Create(cnsJsonFileName) file.Close() + err = service.Init(&config) + if err != nil { + logger.Errorf("Failed to Init CNS, err:%v.\n", err) + return err + } + err = service.Start(&config) if err != nil { logger.Errorf("Failed to start CNS, err:%v.\n", err) diff --git a/cns/service.go b/cns/service.go index c907b73f06..32b439a985 100644 --- a/cns/service.go +++ b/cns/service.go @@ -83,18 +83,7 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { } } - // todo: Do we need to run this logic for all channel types - if config.ChannelMode == CRD { - logger.Printf("HTTP listener will be started later after CNS state has been reconciled") - } else { - logger.Printf("Starting HTTP listener") - // Start the listener. - // continue to listen on the normal endpoint for http traffic, this will be supported - // for sometime until partners migrate fully to https - if err = listener.Start(config.ErrChan); err != nil { - return err - } - } + logger.Printf("HTTP listener will be started later after CNS state has been reconciled") config.Listener = listener } diff --git a/cns/service/main.go b/cns/service/main.go index 0062d71f50..99d0ed6ebe 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/requestcontroller" "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" + localtls "github.com/Azure/azure-container-networking/server/tls" "net/http" "os" "os/signal" @@ -19,10 +20,9 @@ import ( "syscall" "time" - localtls "github.com/Azure/azure-container-networking/server/tls" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "context" "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cnm/ipam" "github.com/Azure/azure-container-networking/cnm/network" @@ -38,7 +38,6 @@ import ( "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/store" - "context" ) const ( @@ -483,15 +482,7 @@ func main() { } } - - // Initialze state in if CNS is running in CRD mode - // State must be initialized before we start HTTPRestService - if config.ChannelMode == cns.CRD { - IniitalizeCRDState(httpRestService, cnsconfig) - } - - logger.Printf("[Azure CNS] Starting HTTPRestService") - // Start CNS. + logger.Printf("[Azure CNS] Initialize HTTPRestService") if httpRestService != nil { if cnsconfig.UseHTTPS { config.TlsSettings = localtls.TlsSettings{ @@ -501,6 +492,21 @@ func main() { } } + err = httpRestService.Init(&config) + if err != nil { + logger.Errorf("Failed to init HTTPService, err:%v.\n", err) + return + } + } + + // Initialze state in if CNS is running in CRD mode + // State must be initialized before we start HTTPRestService + if config.ChannelMode == cns.CRD { + IniitalizeCRDState(httpRestService, cnsconfig) + } + + logger.Printf("[Azure CNS] Start HTTP listener") + if (httpRestService != nil) { err = httpRestService.Start(&config) if err != nil { logger.Errorf("Failed to start CNS, err:%v.\n", err) From c7eaf64ba242a500b569e4a078cf4ad4e8664d77 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 18:34:19 -0800 Subject: [PATCH 08/22] Fixed fakes --- cns/fakes/cnsfake.go | 5 +++++ cns/fakes/requestcontrollerfake.go | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index 41bf451c19..3141ef959e 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -306,4 +306,9 @@ func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error { return nil } + +func (fake *HTTPServiceFake) Init(*common.ServiceConfig) error { + return nil +} + func (fake *HTTPServiceFake) Stop() {} diff --git a/cns/fakes/requestcontrollerfake.go b/cns/fakes/requestcontrollerfake.go index 6b46c6f2c0..9060f88bcc 100644 --- a/cns/fakes/requestcontrollerfake.go +++ b/cns/fakes/requestcontrollerfake.go @@ -62,10 +62,18 @@ func (rc *RequestControllerFake) CarveIPConfigsAndAddToStatusAndCNS(numberOfIPCo return cnsIPConfigs } +func (rc *RequestControllerFake) InitRequestController() error { + return nil +} + func (rc *RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error { return nil } +func (rc *RequestControllerFake) IsStarted() bool { + return true +} + func (rc *RequestControllerFake) UpdateCRDSpec(cntxt context.Context, desiredSpec nnc.NodeNetworkConfigSpec) error { rc.cachedCRD.Spec = desiredSpec From baca2a9c3f7cfa2c9c128bddffc282c4013b7b82 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 15:40:38 -0800 Subject: [PATCH 09/22] Fix CNS reconcilation before starting the httpservice listener --- .../kubecontroller/crdrequestcontroller.go | 42 ++++- .../requestcontrollerintreface.go | 2 + cns/service/main.go | 162 +++++++++++------- 3 files changed, 135 insertions(+), 71 deletions(-) diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index 9e7575109d..588a6246bd 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "sync" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/cnsclient" @@ -42,6 +43,9 @@ type crdRequestController struct { CNSClient cnsclient.APIClient nodeName string //name of node running this program Reconciler *CrdReconciler + initialized bool + Started bool + lock sync.Mutex } // GetKubeConfig precedence @@ -139,20 +143,38 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig return &crdRequestController, nil } +// InitRequestController will initialize/reconcile the CNS state +func (crdRC *crdRequestController) InitRequestController() error { + logger.Printf("Initializing CNS state") + + defer crdRC.lock.Unlock() + crdRC.lock.Lock() + + if err := crdRC.initCNS(); err != nil { + logger.Errorf("[cns-rc] Error initializing cns state: %v", err) + return err + } + + crdRC.initialized = true + return nil +} + // StartRequestController starts the Reconciler loop which watches for CRD status updates // Blocks until SIGINT or SIGTERM is received // Notifies exitChan when kill signal received func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct{}) error { - var ( - err error - ) + logger.Printf("StartRequestController") - logger.Printf("Initializing CNS state") - if err = crdRC.initCNS(); err != nil { - logger.Errorf("[cns-rc] Error initializing cns state: %v", err) - return err + crdRC.lock.Lock() + if crdRC.initialized != true { + crdRC.lock.Unlock() + return fmt.Errorf("Failed to start requestController, state is not initialized [%v]", crdRC) } + // Setting the started state + crdRC.Started = true + crdRC.lock.Unlock() + logger.Printf("Starting reconcile loop") if err := crdRC.mgr.Start(exitChan); err != nil { if crdRC.isNotDefined(err) { @@ -166,6 +188,12 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct return nil } +func (crdRC *crdRequestController) isStarted() bool { + defer crdRC.lock.Unlock() + crdRC.lock.Lock() + return crdRC.Started +} + // InitCNS initializes cns by passing pods and a createnetworkcontainerrequest func (crdRC *crdRequestController) initCNS() error { var ( diff --git a/cns/requestcontroller/requestcontrollerintreface.go b/cns/requestcontroller/requestcontrollerintreface.go index 24336b8061..19cfa9cc4a 100644 --- a/cns/requestcontroller/requestcontrollerintreface.go +++ b/cns/requestcontroller/requestcontrollerintreface.go @@ -8,6 +8,8 @@ import ( // RequestController interface for cns to interact with the request controller type RequestController interface { + InitRequestController() error StartRequestController(exitChan <-chan struct{}) error UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error + IsStarted() bool } diff --git a/cns/service/main.go b/cns/service/main.go index 3657a578a6..50fdc9a5e6 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -5,9 +5,11 @@ package main import ( "bytes" - "context" "encoding/json" "fmt" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" + "github.com/Azure/azure-container-networking/cns/requestcontroller" + "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" "net/http" "os" "os/signal" @@ -19,7 +21,6 @@ import ( localtls "github.com/Azure/azure-container-networking/server/tls" - "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/aitelemetry" @@ -32,8 +33,6 @@ import ( "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/imdsclient" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/requestcontroller" - "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" "github.com/Azure/azure-container-networking/cns/restserver" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" @@ -483,6 +482,14 @@ func main() { } } + + // Initialze state in if CNS is running in CRD mode + // State must be initialized before we start HTTPRestService + if config.ChannelMode == cns.CRD { + IniitalizeCRDState(httpRestService, cnsconfig) + } + + logger.Printf("[Azure CNS] Starting HTTPRestService") // Start CNS. if httpRestService != nil { if cnsconfig.UseHTTPS { @@ -536,66 +543,6 @@ func main() { httpRestService.SyncNodeStatus(ep, vnet, node, json.RawMessage{}) } }(privateEndpoint, infravnet, nodeID) - } else if config.ChannelMode == cns.CRD { - var requestController requestcontroller.RequestController - - logger.Printf("[Azure CNS] Starting request controller") - - kubeConfig, err := kubecontroller.GetKubeConfig() - if err != nil { - logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) - return - } - - //convert interface type to implementation type - httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) - if !ok { - logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService) - return - } - - // Set orchestrator type - orchestrator := cns.SetOrchestratorTypeRequest{ - OrchestratorType: cns.KubernetesCRD, - } - httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) - - // Get crd implementation of request controller - requestController, err = kubecontroller.NewCrdRequestController(httpRestServiceImplementation, kubeConfig) - if err != nil { - logger.Errorf("[Azure CNS] Failed to make crd request controller :%v", err) - return - } - - logger.Printf("Starting SyncHostNCVersion") - rootCxt := context.Background() - go func() { - // Periodically poll vfp programmed NC version from NMAgent - for { - <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C - httpRestServiceImplementation.SyncHostNCVersion(rootCxt, config.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) - } - }() - - // initialize the ipam pool monitor - httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) - - //Start the RequestController which starts the reconcile loop - requestControllerStopChannel := make(chan struct{}) - defer close(requestControllerStopChannel) - go func() { - if err := requestController.StartRequestController(requestControllerStopChannel); err != nil { - logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) - return - } - }() - - ctx := context.Background() - go func() { - if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { - logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) - } - }() } var netPlugin network.NetPlugin @@ -709,3 +656,90 @@ func main() { logger.Printf("CNS exited") logger.Close() } + +// initializeCRD state +func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) { + var requestController requestcontroller.RequestController + + logger.Printf("[Azure CNS] Starting request controller") + + kubeConfig, err := kubecontroller.GetKubeConfig() + if err != nil { + logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err) + return + } + + //convert interface type to implementation type + httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService) + if !ok { + logger.Errorf("[Azure CNS] Failed to convert interface httpRestService to implementation: %v", httpRestService) + return + } + + // Set orchestrator type + orchestrator := cns.SetOrchestratorTypeRequest{ + OrchestratorType: cns.KubernetesCRD, + } + httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator) + + // Get crd implementation of request controller + requestController, err = kubecontroller.NewCrdRequestController(httpRestServiceImplementation, kubeConfig) + if err != nil { + logger.Errorf("[Azure CNS] Failed to make crd request controller :%v", err) + return + } + + err := requestController.InitRequestController() + if err != nil { + logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) + return + } + + // initialize the ipam pool monitor + httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) + + //Start the RequestController which starts the reconcile loop + requestControllerStopChannel := make(chan struct{}) + defer close(requestControllerStopChannel) + go func() { + for { + if err := requestController.StartRequestController(requestControllerStopChannel); err != nil { + logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) + // retry to start the request controller + // todo: add a CNS metric to count # of failures + } else { + return + } + + // Retry after 1sec + time.Sleep(time.Second) + } + }() + + for { + if requestController.IsStarted() { + logger.Printf("RequestController is started") + break + } + + logger.Printf("Waiting for requestController to start...") + time.Sleep(time.Millisecond * 500) + } + + ctx := context.Background() + go func() { + if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { + logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) + } + }() + + logger.Printf("Starting SyncHostNCVersion") + rootCxt := context.Background() + go func() { + // Periodically poll vfp programmed NC version from NMAgent + for { + <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C + httpRestServiceImplementation.SyncHostNCVersion(rootCxt, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) + } + }() +} From cf9d8a46cd69be7e609eb9f7fe579ccd48075c60 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 01:28:04 -0800 Subject: [PATCH 10/22] Fixed build --- cns/requestcontroller/kubecontroller/crdrequestcontroller.go | 3 ++- cns/service/main.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index 588a6246bd..f21691d59d 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -188,7 +188,8 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct return nil } -func (crdRC *crdRequestController) isStarted() bool { +// return if RequestController is started +func (crdRC *crdRequestController) IsStarted() bool { defer crdRC.lock.Unlock() crdRC.lock.Lock() return crdRC.Started diff --git a/cns/service/main.go b/cns/service/main.go index 50fdc9a5e6..8d0b93f3c8 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -38,6 +38,7 @@ import ( "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/store" + "context" ) const ( From b1ba0297989b6e37120c99f4862998acc7c7e060 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 01:37:57 -0800 Subject: [PATCH 11/22] More fixes --- cns/service/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cns/service/main.go b/cns/service/main.go index 8d0b93f3c8..bada4c16e0 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -690,7 +690,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration return } - err := requestController.InitRequestController() + err = requestController.InitRequestController() if err != nil { logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) return From e0e993a7557d06a6a0747841a5a2398499f11806 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 02:06:15 -0800 Subject: [PATCH 12/22] Fixed reordering --- cns/service/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index bada4c16e0..0062d71f50 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -690,15 +690,15 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration return } + // initialize the ipam pool monitor + httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) + err = requestController.InitRequestController() if err != nil { logger.Errorf("[Azure CNS] Failed to initialized cns state :%v", err) return } - // initialize the ipam pool monitor - httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) - //Start the RequestController which starts the reconcile loop requestControllerStopChannel := make(chan struct{}) defer close(requestControllerStopChannel) From fb7eef83547dc340ef18ffd72fb43b43bf84b010 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Sun, 28 Feb 2021 03:37:10 -0800 Subject: [PATCH 13/22] Fix IPAM pool monitor init --- cns/ipampoolmonitor/ipampoolmonitor.go | 5 +++-- cns/requestcontroller/kubecontroller/crdrequestcontroller.go | 2 +- cns/service/main.go | 1 + 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index 33f2fe6f88..df4ab42b76 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -27,6 +27,7 @@ type CNSIPAMPoolMonitor struct { } func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor { + logger.Printf("NewCNSIPAMPoolMonitor: Create IPAM Pool Monitor") return &CNSIPAMPoolMonitor{ pendingRelease: false, cns: cns, @@ -52,7 +53,7 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli for { select { case <-ctx.Done(): - return fmt.Errorf("CNS IPAM Pool Monitor received cancellation signal") + return fmt.Errorf("[ipam-pool-monitor] CNS IPAM Pool Monitor received cancellation signal") case <-ticker.C: err := pm.Reconcile() if err != nil { @@ -70,7 +71,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error { availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount) - msg := fmt.Sprintf("Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v", + msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v", cnsPodIPConfigCount, pm.cachedNNC.Spec.RequestedIPCount, pm.scalarUnits.BatchSize, pm.MinimumFreeIps, pm.MaximumFreeIps, allocatedPodIPCount, availableIPConfigCount, pendingReleaseIPCount, freeIPConfigCount, pendingProgramCount) switch { diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index f21691d59d..f0eeeae984 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -145,7 +145,7 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig // InitRequestController will initialize/reconcile the CNS state func (crdRC *crdRequestController) InitRequestController() error { - logger.Printf("Initializing CNS state") + logger.Printf("InitRequestController") defer crdRC.lock.Unlock() crdRC.lock.Lock() diff --git a/cns/service/main.go b/cns/service/main.go index 0062d71f50..b507c140e9 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -728,6 +728,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration } ctx := context.Background() + logger.Printf("Starting IPAM Pool Monitor") go func() { if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) From 194b0c86e02140d667d17cd511055e4c11cedc20 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 17:44:11 -0800 Subject: [PATCH 14/22] DONTPUSH --- cns/common/service.go | 1 + cns/restserver/ipam.go | 5 +++-- cns/restserver/restserver.go | 17 ++++++++++++++-- cns/service.go | 38 +++++++++++++++++++++++++++++++----- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/cns/common/service.go b/cns/common/service.go index ff9918f717..31a3e7464c 100644 --- a/cns/common/service.go +++ b/cns/common/service.go @@ -24,6 +24,7 @@ type Service struct { // ServiceAPI defines base interface. type ServiceAPI interface { + Init(*ServiceConfig) error Start(*ServiceConfig) error Stop() GetOption(string) interface{} diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index 9fafd7ddbb..506c1c16ca 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -407,12 +407,13 @@ func (service *HTTPRestService) AllocateDesiredIPConfig(podInfo cns.KubernetesPo } return podIpInfo, fmt.Errorf("[AllocateDesiredIPConfig] Desired IP is already allocated %+v to Pod: %+v, requested for pod %+v", ipConfig, pInfo, podInfo) } - } else if ipConfig.State == cns.Available { + } else if ipConfig.State == cns.Available || ipConfig.State == cns.PendingProgramming { + // This race can happen during restart, where CNS state is lost and thus we have lost the NC programmed version + // As part of reconcile, we mark IPs as Allocated which are already allocated to PODs (listed from APIServer) _, err := service.setIPConfigAsAllocated(ipConfig, podInfo, orchestratorContext) if err != nil { return podIpInfo, err } - found = true } else { return podIpInfo, fmt.Errorf("[AllocateDesiredIPConfig] Desired IP is not available %+v", ipConfig) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index c2fd6649a2..a3f1b77e0d 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -130,8 +130,8 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl }, nil } -// Start starts the CNS listener. -func (service *HTTPRestService) Start(config *common.ServiceConfig) error { +// Init starts the CNS listener. +func (service *HTTPRestService) Init(config *common.ServiceConfig) error { err := service.Initialize(config) if err != nil { logger.Errorf("[Azure CNS] Failed to initialize base service, err:%v.", err) @@ -209,6 +209,19 @@ func (service *HTTPRestService) Start(config *common.ServiceConfig) error { return nil } +// Start starts the CNS listener. +func (service *HTTPRestService) Start(config *common.ServiceConfig) error { + + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err := service.StartListener(config); err != nil { + return err + } + + return nil +} + // Stop stops the CNS. func (service *HTTPRestService) Stop() { service.Uninitialize() diff --git a/cns/service.go b/cns/service.go index 79200bdc8b..c907b73f06 100644 --- a/cns/service.go +++ b/cns/service.go @@ -4,6 +4,8 @@ package cns import ( + "fmt" + "github.com/Azure/azure-container-networking/cns/logger" "net/http" "net/url" "strings" @@ -80,11 +82,18 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { return err } } - // Start the listener. - // continue to listen on the normal endpoint for http traffic, this will be supported - // for sometime until partners migrate fully to https - if err = listener.Start(config.ErrChan); err != nil { - return err + + // todo: Do we need to run this logic for all channel types + if config.ChannelMode == CRD { + logger.Printf("HTTP listener will be started later after CNS state has been reconciled") + } else { + logger.Printf("Starting HTTP listener") + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err = listener.Start(config.ErrChan); err != nil { + return err + } } config.Listener = listener } @@ -95,6 +104,25 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { return nil } +func (service *Service) StartListener(config *common.ServiceConfig) error { + log.Debugf("[Azure CNS] Going to start listener: %+v", config) + + // Initialize the listener. + if service.Listener != nil { + log.Debugf("[Azure CNS] Starting listener: %+v", config) + // Start the listener. + // continue to listen on the normal endpoint for http traffic, this will be supported + // for sometime until partners migrate fully to https + if err := service.Listener.Start(config.ErrChan); err != nil { + return err + } + } else { + return fmt.Errorf("Failed to start a listener, it is not initialized, config %+v", config) + } + + return nil +} + // Uninitialize cleans up the plugin. func (service *Service) Uninitialize() { service.Listener.Stop() From b7553d860b7513ef7fa3ebc7a5491a69d13d692e Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 17:56:28 -0800 Subject: [PATCH 15/22] Fixed HTTPRestService initialization --- cns/cnsclient/cnsclient_test.go | 8 +++++++- cns/restserver/api_test.go | 6 ++++++ cns/service.go | 13 +------------ cns/service/main.go | 30 ++++++++++++++++++------------ 4 files changed, 32 insertions(+), 25 deletions(-) diff --git a/cns/cnsclient/cnsclient_test.go b/cns/cnsclient/cnsclient_test.go index 850438e922..b566033d62 100644 --- a/cns/cnsclient/cnsclient_test.go +++ b/cns/cnsclient/cnsclient_test.go @@ -137,9 +137,15 @@ func TestMain(m *testing.M) { } if httpRestService != nil { + err = httpRestService.Init(&config) + if err != nil { + logger.Errorf("Failed to initialize HttpService, err:%v.\n", err) + return + } + err = httpRestService.Start(&config) if err != nil { - logger.Errorf("Failed to start CNS, err:%v.\n", err) + logger.Errorf("Failed to start HttpService, err:%v.\n", err) return } } diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index 280130b860..49cc8e5fb4 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -941,6 +941,12 @@ func startService() error { file, _ := os.Create(cnsJsonFileName) file.Close() + err = service.Init(&config) + if err != nil { + logger.Errorf("Failed to Init CNS, err:%v.\n", err) + return err + } + err = service.Start(&config) if err != nil { logger.Errorf("Failed to start CNS, err:%v.\n", err) diff --git a/cns/service.go b/cns/service.go index c907b73f06..32b439a985 100644 --- a/cns/service.go +++ b/cns/service.go @@ -83,18 +83,7 @@ func (service *Service) Initialize(config *common.ServiceConfig) error { } } - // todo: Do we need to run this logic for all channel types - if config.ChannelMode == CRD { - logger.Printf("HTTP listener will be started later after CNS state has been reconciled") - } else { - logger.Printf("Starting HTTP listener") - // Start the listener. - // continue to listen on the normal endpoint for http traffic, this will be supported - // for sometime until partners migrate fully to https - if err = listener.Start(config.ErrChan); err != nil { - return err - } - } + logger.Printf("HTTP listener will be started later after CNS state has been reconciled") config.Listener = listener } diff --git a/cns/service/main.go b/cns/service/main.go index b507c140e9..397d3de089 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/requestcontroller" "github.com/Azure/azure-container-networking/cns/requestcontroller/kubecontroller" + localtls "github.com/Azure/azure-container-networking/server/tls" "net/http" "os" "os/signal" @@ -19,10 +20,9 @@ import ( "syscall" "time" - localtls "github.com/Azure/azure-container-networking/server/tls" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "context" "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cnm/ipam" "github.com/Azure/azure-container-networking/cnm/network" @@ -38,7 +38,6 @@ import ( "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/store" - "context" ) const ( @@ -483,15 +482,7 @@ func main() { } } - - // Initialze state in if CNS is running in CRD mode - // State must be initialized before we start HTTPRestService - if config.ChannelMode == cns.CRD { - IniitalizeCRDState(httpRestService, cnsconfig) - } - - logger.Printf("[Azure CNS] Starting HTTPRestService") - // Start CNS. + logger.Printf("[Azure CNS] Initialize HTTPRestService") if httpRestService != nil { if cnsconfig.UseHTTPS { config.TlsSettings = localtls.TlsSettings{ @@ -501,6 +492,21 @@ func main() { } } + err = httpRestService.Init(&config) + if err != nil { + logger.Errorf("Failed to init HTTPService, err:%v.\n", err) + return + } + } + + // Initialze state in if CNS is running in CRD mode + // State must be initialized before we start HTTPRestService + if config.ChannelMode == cns.CRD { + IniitalizeCRDState(httpRestService, cnsconfig) + } + + logger.Printf("[Azure CNS] Start HTTP listener") + if (httpRestService != nil) { err = httpRestService.Start(&config) if err != nil { logger.Errorf("Failed to start CNS, err:%v.\n", err) From 438acda83e86457b3f3ff98edcf71a0d56864cfd Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 18:34:19 -0800 Subject: [PATCH 16/22] Fixed fakes --- cns/fakes/cnsfake.go | 5 +++++ cns/fakes/requestcontrollerfake.go | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index 41bf451c19..3141ef959e 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -306,4 +306,9 @@ func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error { return nil } + +func (fake *HTTPServiceFake) Init(*common.ServiceConfig) error { + return nil +} + func (fake *HTTPServiceFake) Stop() {} diff --git a/cns/fakes/requestcontrollerfake.go b/cns/fakes/requestcontrollerfake.go index 6b46c6f2c0..9060f88bcc 100644 --- a/cns/fakes/requestcontrollerfake.go +++ b/cns/fakes/requestcontrollerfake.go @@ -62,10 +62,18 @@ func (rc *RequestControllerFake) CarveIPConfigsAndAddToStatusAndCNS(numberOfIPCo return cnsIPConfigs } +func (rc *RequestControllerFake) InitRequestController() error { + return nil +} + func (rc *RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error { return nil } +func (rc *RequestControllerFake) IsStarted() bool { + return true +} + func (rc *RequestControllerFake) UpdateCRDSpec(cntxt context.Context, desiredSpec nnc.NodeNetworkConfigSpec) error { rc.cachedCRD.Spec = desiredSpec From 78590ead2c678bca94d0a88cc548ed724fd8dcef Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 19:39:01 -0800 Subject: [PATCH 17/22] Fix IPAMPoolMonitor to handle error from CRD during decrease --- cns/ipampoolmonitor/ipampoolmonitor.go | 61 ++++++++++++------- .../kubecontroller/crdrequestcontroller.go | 2 +- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index df4ab42b76..b16c35d55b 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -16,9 +16,10 @@ type CNSIPAMPoolMonitor struct { pendingRelease bool cachedNNC nnc.NodeNetworkConfig + updatingPendingIpsNotInUse map[string]cns.IPConfigurationStatus scalarUnits nnc.Scaler - cns cns.HTTPService + httpService cns.HTTPService rc requestcontroller.RequestController MinimumFreeIps int64 MaximumFreeIps int64 @@ -26,12 +27,13 @@ type CNSIPAMPoolMonitor struct { mu sync.RWMutex } -func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor { +func NewCNSIPAMPoolMonitor(httpService cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor { logger.Printf("NewCNSIPAMPoolMonitor: Create IPAM Pool Monitor") return &CNSIPAMPoolMonitor{ - pendingRelease: false, - cns: cns, - rc: rc, + pendingRelease: false, + httpService: httpService, + rc: rc, + updatingPendingIpsNotInUse: make(map[string]cns.IPConfigurationStatus), } } @@ -64,11 +66,11 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli } func (pm *CNSIPAMPoolMonitor) Reconcile() error { - cnsPodIPConfigCount := len(pm.cns.GetPodIPConfigState()) - pendingProgramCount := len(pm.cns.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns - allocatedPodIPCount := len(pm.cns.GetAllocatedIPConfigs()) - pendingReleaseIPCount := len(pm.cns.GetPendingReleaseIPConfigs()) - availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns + cnsPodIPConfigCount := len(pm.httpService.GetPodIPConfigState()) + pendingProgramCount := len(pm.httpService.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns + allocatedPodIPCount := len(pm.httpService.GetAllocatedIPConfigs()) + pendingReleaseIPCount := len(pm.httpService.GetPendingReleaseIPConfigs()) + availableIPConfigCount := len(pm.httpService.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount) msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v", @@ -83,7 +85,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error { // pod count is decreasing case freeIPConfigCount > pm.MaximumFreeIps: logger.Printf("[ipam-pool-monitor] Decreasing pool size...%s ", msg) - return pm.decreasePoolSize() + return pm.decreasePoolSize(pendingReleaseIPCount) // CRD has reconciled CNS state, and target spec is now the same size as the state // free to remove the IP's from the CRD @@ -112,7 +114,7 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { } tempNNCSpec.RequestedIPCount += pm.scalarUnits.BatchSize - logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse)) + logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse)) err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec) if err != nil { @@ -125,18 +127,29 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { return nil } -func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error { +func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int) error { pm.mu.Lock() defer pm.mu.Unlock() // mark n number of IP's as pending - pendingIpAddresses, err := pm.cns.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize)) - if err != nil { - return err + var err error + var pendingIpAddresses map[string]cns.IPConfigurationStatus + updatingPendingReleaseIpsCount := len(pm.updatingPendingIpsNotInUse) + if updatingPendingReleaseIpsCount == 0 || + updatingPendingReleaseIpsCount <= existingPendingReleaseIPCount { + logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", int(pm.scalarUnits.BatchSize)) + pendingIpAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize)) + if err != nil { + return err + } + + // cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails + for _, ipConfig := range pendingIpAddresses { + pm.updatingPendingIpsNotInUse[ipConfig.ID] = ipConfig + } } - totalIpsSetForRelease := len(pendingIpAddresses) - logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d", totalIpsSetForRelease) + logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), len(pm.updatingPendingIpsNotInUse)) var tempNNCSpec nnc.NodeNetworkConfigSpec tempNNCSpec, err = pm.createNNCSpecForCRD(false) @@ -144,8 +157,8 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error { return err } - tempNNCSpec.RequestedIPCount -= int64(totalIpsSetForRelease) - logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse)) + tempNNCSpec.RequestedIPCount -= int64(len(pendingIpAddresses)) + logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse)) err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec) if err != nil { @@ -156,6 +169,12 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error { // save the updated state to cachedSpec pm.cachedNNC.Spec = tempNNCSpec pm.pendingRelease = true + + // clear the updatingPendingIpsNotInUse, as we have Updated the CRD + logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", len(pm.updatingPendingIpsNotInUse)) + for key, _ := range pm.updatingPendingIpsNotInUse { + delete(pm.updatingPendingIpsNotInUse, key) + } return nil } @@ -198,7 +217,7 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N spec.IPsNotInUse = make([]string, 0) } else { // Get All Pending IPs from CNS and populate it again. - pendingIps := pm.cns.GetPendingReleaseIPConfigs() + pendingIps := pm.httpService.GetPendingReleaseIPConfigs() for _, pendingIp := range pendingIps { spec.IPsNotInUse = append(spec.IPsNotInUse, pendingIp.ID) } diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index f0eeeae984..59d7709009 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -271,7 +271,7 @@ func (crdRC *crdRequestController) initCNS() error { } // UpdateCRDSpec updates the CRD spec -func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error { +func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error { nodeNetworkConfig, err := crdRC.getNodeNetConfig(cntxt, crdRC.nodeName, k8sNamespace) if err != nil { logger.Errorf("[cns-rc] Error getting CRD when updating spec %v", err) From d11a00c848d3f5def97a454d14b541d01541e0b9 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 20:46:50 -0800 Subject: [PATCH 18/22] additional logs --- cns/ipampoolmonitor/ipampoolmonitor.go | 18 ++++++++++++++---- cns/service/main.go | 5 +++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index b16c35d55b..4aa93758b2 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -103,8 +103,8 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error { } func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { - pm.mu.Lock() defer pm.mu.Unlock() + pm.mu.Lock() var err error var tempNNCSpec nnc.NodeNetworkConfigSpec @@ -122,14 +122,15 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { return err } + logger.Printf("[ipam-pool-monitor] Increasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec) // save the updated state to cachedSpec pm.cachedNNC.Spec = tempNNCSpec return nil } func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int) error { - pm.mu.Lock() defer pm.mu.Unlock() + pm.mu.Lock() // mark n number of IP's as pending var err error @@ -166,6 +167,8 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int return err } + logger.Printf("[ipam-pool-monitor] Decreasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec) + // save the updated state to cachedSpec pm.cachedNNC.Spec = tempNNCSpec pm.pendingRelease = true @@ -181,8 +184,8 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int // if cns pending ip release map is empty, request controller has already reconciled the CNS state, // so we can remove it from our cache and remove the IP's from the CRD func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error { - pm.mu.Lock() defer pm.mu.Unlock() + pm.mu.Lock() var err error var tempNNCSpec nnc.NodeNetworkConfigSpec @@ -197,6 +200,9 @@ func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error { return err } + logger.Printf("[ipam-pool-monitor] cleanPendingRelease: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec) + + // save the updated state to cachedSpec pm.cachedNNC.Spec = tempNNCSpec pm.pendingRelease = false @@ -228,8 +234,9 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N // UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { - pm.mu.Lock() defer pm.mu.Unlock() + pm.mu.Lock() + pm.scalarUnits = scalar pm.MinimumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.RequestThresholdPercent) / 100)) @@ -237,5 +244,8 @@ func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConf pm.cachedNNC.Spec = spec + logger.Printf("[ipam-pool-monitor] Update spec %+v, pm.MinimumFreeIps %d, pm.MaximumFreeIps %d", + pm.cachedNNC.Spec, pm.MinimumFreeIps, pm.MaximumFreeIps) + return nil } diff --git a/cns/service/main.go b/cns/service/main.go index 397d3de089..090d7771ca 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -715,6 +715,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration // retry to start the request controller // todo: add a CNS metric to count # of failures } else { + logger.Printf("[Azure CNS] Exiting RequestController") return } @@ -739,6 +740,8 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) } + + logger.Printf("[Azure CNS] Exiting IPAM Pool Monitor") }() logger.Printf("Starting SyncHostNCVersion") @@ -749,5 +752,7 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C httpRestServiceImplementation.SyncHostNCVersion(rootCxt, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) } + + logger.Printf("[Azure CNS] Exiting SyncHostNCVersion") }() } From 559f71fc14682336519ff206556e581248e5ed56 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 21:05:26 -0800 Subject: [PATCH 19/22] Fixed exit channel for CRD REquestController --- cns/service/main.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index 090d7771ca..b0d72cb139 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -502,7 +502,9 @@ func main() { // Initialze state in if CNS is running in CRD mode // State must be initialized before we start HTTPRestService if config.ChannelMode == cns.CRD { - IniitalizeCRDState(httpRestService, cnsconfig) + requestControllerStopChannel := make(chan struct{}) + defer close(requestControllerStopChannel) + IniitalizeCRDState(httpRestService, cnsconfig, requestControllerStopChannel) } logger.Printf("[Azure CNS] Start HTTP listener") @@ -665,7 +667,7 @@ func main() { } // initializeCRD state -func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig) { +func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration.CNSConfig, exitChan <-chan struct{}) { var requestController requestcontroller.RequestController logger.Printf("[Azure CNS] Starting request controller") @@ -706,11 +708,9 @@ func IniitalizeCRDState(httpRestService cns.HTTPService, cnsconfig configuration } //Start the RequestController which starts the reconcile loop - requestControllerStopChannel := make(chan struct{}) - defer close(requestControllerStopChannel) go func() { for { - if err := requestController.StartRequestController(requestControllerStopChannel); err != nil { + if err := requestController.StartRequestController(exitChan); err != nil { logger.Errorf("[Azure CNS] Failed to start request controller: %v", err) // retry to start the request controller // todo: add a CNS metric to count # of failures From 02444e00f0a7a7bb715e808a5b1c2f711e5ea4a1 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 21:27:59 -0800 Subject: [PATCH 20/22] Fixed logging --- cns/NetworkContainerContract.go | 5 +++++ .../kubecontroller/crdreconciler.go | 14 +++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/cns/NetworkContainerContract.go b/cns/NetworkContainerContract.go index 5d8ee0ca6e..770540635c 100644 --- a/cns/NetworkContainerContract.go +++ b/cns/NetworkContainerContract.go @@ -219,6 +219,11 @@ type IPConfigRequest struct { OrchestratorContext json.RawMessage } +func (i IPConfigRequest) String() string { + return fmt.Sprintf("[IPConfigRequest: DesiredIPAddress %s, OrchestratorContext %s]", + i.DesiredIPAddress, string(i.OrchestratorContext)) +} + // IPConfigResponse is used in CNS IPAM mode as a response to CNI ADD type IPConfigResponse struct { PodIpInfo PodIpInfo diff --git a/cns/requestcontroller/kubecontroller/crdreconciler.go b/cns/requestcontroller/kubecontroller/crdreconciler.go index a4ecc8281e..51845feb2f 100644 --- a/cns/requestcontroller/kubecontroller/crdreconciler.go +++ b/cns/requestcontroller/kubecontroller/crdreconciler.go @@ -41,13 +41,25 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result, } logger.Printf("[cns-rc] CRD Spec: %v", nodeNetConfig.Spec) - logger.Printf("[cns-rc] CRD Status: %v", nodeNetConfig.Status) + // If there are no network containers, don't hand it off to CNS if len(nodeNetConfig.Status.NetworkContainers) == 0 { + logger.Errorf("[cns-rc] Empty NetworkContainers") return reconcile.Result{}, nil } + networkContainer := nodeNetConfig.Status.NetworkContainers[0] + logger.Printf("[cns-rc] CRD Status: NcId: [%s], Version: [%s], podSubnet: [%s], Subnet CIDR: [%s], " + + "Gateway Addr: [%s], Primary IP: [%s], SecondaryIpsCount: [%d]", + networkContainer.ID, + networkContainer.Version, + networkContainer.SubnetName, + networkContainer.SubnetAddressSpace, + networkContainer.DefaultGateway, + networkContainer.PrimaryIP, + len(networkContainer.IPAssignments)) + // Otherwise, create NC request and hand it off to CNS ncRequest, err = CRDStatusToNCRequest(nodeNetConfig.Status) if err != nil { From cca6fbf56b017cace781a6c1620e2950da281807 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 21:39:39 -0800 Subject: [PATCH 21/22] Fixed logging --- cns/requestcontroller/kubecontroller/crdreconciler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cns/requestcontroller/kubecontroller/crdreconciler.go b/cns/requestcontroller/kubecontroller/crdreconciler.go index 51845feb2f..ba408a2b59 100644 --- a/cns/requestcontroller/kubecontroller/crdreconciler.go +++ b/cns/requestcontroller/kubecontroller/crdreconciler.go @@ -50,7 +50,7 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result, } networkContainer := nodeNetConfig.Status.NetworkContainers[0] - logger.Printf("[cns-rc] CRD Status: NcId: [%s], Version: [%s], podSubnet: [%s], Subnet CIDR: [%s], " + + logger.Printf("[cns-rc] CRD Status: NcId: [%s], Version: [%d], podSubnet: [%s], Subnet CIDR: [%s], " + "Gateway Addr: [%s], Primary IP: [%s], SecondaryIpsCount: [%d]", networkContainer.ID, networkContainer.Version, From 0cc36b3eeb7826c940de8f26f2dc2f243fa7ae95 Mon Sep 17 00:00:00 2001 From: neaggarw Date: Mon, 1 Mar 2021 23:15:51 -0800 Subject: [PATCH 22/22] Fixed decrease ipam pool monitor --- cns/ipampoolmonitor/ipampoolmonitor.go | 30 +++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index 4aa93758b2..b4fcd2b94d 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -16,7 +16,7 @@ type CNSIPAMPoolMonitor struct { pendingRelease bool cachedNNC nnc.NodeNetworkConfig - updatingPendingIpsNotInUse map[string]cns.IPConfigurationStatus + updatingIpsNotInUseCount int scalarUnits nnc.Scaler httpService cns.HTTPService @@ -33,7 +33,6 @@ func NewCNSIPAMPoolMonitor(httpService cns.HTTPService, rc requestcontroller.Req pendingRelease: false, httpService: httpService, rc: rc, - updatingPendingIpsNotInUse: make(map[string]cns.IPConfigurationStatus), } } @@ -134,30 +133,32 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int // mark n number of IP's as pending var err error + var newIpsMarkedAsPending bool var pendingIpAddresses map[string]cns.IPConfigurationStatus - updatingPendingReleaseIpsCount := len(pm.updatingPendingIpsNotInUse) - if updatingPendingReleaseIpsCount == 0 || - updatingPendingReleaseIpsCount <= existingPendingReleaseIPCount { + if pm.updatingIpsNotInUseCount == 0 || + pm.updatingIpsNotInUseCount < existingPendingReleaseIPCount { logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", int(pm.scalarUnits.BatchSize)) pendingIpAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize)) if err != nil { return err } - // cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails - for _, ipConfig := range pendingIpAddresses { - pm.updatingPendingIpsNotInUse[ipConfig.ID] = ipConfig - } + newIpsMarkedAsPending = true } - logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), len(pm.updatingPendingIpsNotInUse)) - var tempNNCSpec nnc.NodeNetworkConfigSpec tempNNCSpec, err = pm.createNNCSpecForCRD(false) if err != nil { return err } + if newIpsMarkedAsPending { + // cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails + pm.updatingIpsNotInUseCount = len(tempNNCSpec.IPsNotInUse) + } + + logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), pm.updatingIpsNotInUseCount) + tempNNCSpec.RequestedIPCount -= int64(len(pendingIpAddresses)) logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse)) @@ -174,10 +175,9 @@ func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int pm.pendingRelease = true // clear the updatingPendingIpsNotInUse, as we have Updated the CRD - logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", len(pm.updatingPendingIpsNotInUse)) - for key, _ := range pm.updatingPendingIpsNotInUse { - delete(pm.updatingPendingIpsNotInUse, key) - } + logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", pm.updatingIpsNotInUseCount) + pm.updatingIpsNotInUseCount = 0 + return nil }