diff --git a/cns/api.go b/cns/api.go index e933d42b56..2b719eaf06 100644 --- a/cns/api.go +++ b/cns/api.go @@ -3,7 +3,11 @@ package cns -import "encoding/json" +import ( + "encoding/json" + + "github.com/Azure/azure-container-networking/cns/common" +) // Container Network Service remote API Contract const ( @@ -25,6 +29,27 @@ const ( V2Prefix = "/v0.2" ) +// HTTPService describes the min API interface that every service should have. +type HTTPService interface { + common.ServiceAPI + SendNCSnapShotPeriodically(int, chan bool) + SetNodeOrchestrator(*SetOrchestratorTypeRequest) + SyncNodeStatus(string, string, string, json.RawMessage) (int, string) + GetAvailableIPConfigs() []IPConfigurationStatus + GetPodIPConfigState() map[string]IPConfigurationStatus + MarkIPsAsPending(numberToMark int) (map[string]SecondaryIPConfig, error) +} + +// This is used for KubernetesCRD orchastrator Type where NC has multiple ips. +// This struct captures the state for SecondaryIPs associated to a given NC +type IPConfigurationStatus struct { + NCID string + ID string //uuid + IPAddress string + State string + OrchestratorContext json.RawMessage +} + // SetEnvironmentRequest describes the Request to set the environment in CNS. type SetEnvironmentRequest struct { Location string @@ -136,6 +161,17 @@ type NodeConfiguration struct { NodeSubnet Subnet } +type IPAMPoolMonitor interface { + Start() error + UpdatePoolLimitsTransacted(ScalarUnits) +} + +type ScalarUnits struct { + BatchSize int64 + RequestThresholdPercent int64 + ReleaseThresholdPercent int64 +} + // Response describes generic response from CNS. type Response struct { ReturnCode int diff --git a/cns/cnsclient/apiclient.go b/cns/cnsclient/apiclient.go index 30a31d872b..3fc0493840 100644 --- a/cns/cnsclient/apiclient.go +++ b/cns/cnsclient/apiclient.go @@ -4,6 +4,6 @@ import "github.com/Azure/azure-container-networking/cns" // APIClient interface to update cns state type APIClient interface { - ReconcileNCState(*cns.CreateNetworkContainerRequest, map[string]cns.KubernetesPodInfo) error - CreateOrUpdateNC(cns.CreateNetworkContainerRequest) error + ReconcileNCState(nc *cns.CreateNetworkContainerRequest, pods map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error + CreateOrUpdateNC(nc cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error } diff --git a/cns/cnsclient/cnsclient_test.go b/cns/cnsclient/cnsclient_test.go index fb49b0d9cd..e96a39565c 100644 --- a/cns/cnsclient/cnsclient_test.go +++ b/cns/cnsclient/cnsclient_test.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/fakes" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/restserver" "github.com/Azure/azure-container-networking/log" @@ -38,6 +39,7 @@ var ( func addTestStateToRestServer(t *testing.T, secondaryIps []string) { var ipConfig cns.IPConfiguration + var scalarUnits cns.ScalarUnits ipConfig.DNSServers = dnsservers ipConfig.GatewayIPAddress = gatewayIp var ipSubnet cns.IPSubnet @@ -61,7 +63,7 @@ func addTestStateToRestServer(t *testing.T, secondaryIps []string) { SecondaryIPConfigs: secondaryIPConfigs, } - returnCode := svc.CreateOrUpdateNetworkContainerInternal(req) + returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, scalarUnits) if returnCode != 0 { t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode) } @@ -121,6 +123,8 @@ func TestMain(m *testing.M) { httpRestService, err := restserver.NewHTTPRestService(&config, fakes.NewFakeImdsClient()) svc = httpRestService.(*restserver.HTTPRestService) svc.Name = "cns-test-server" + svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) + if err != nil { logger.Errorf("Failed to create CNS object, err:%v.\n", err) return diff --git a/cns/cnsclient/httpapi/client.go b/cns/cnsclient/httpapi/client.go index 04a49f9740..a7821b98f9 100644 --- a/cns/cnsclient/httpapi/client.go +++ b/cns/cnsclient/httpapi/client.go @@ -13,8 +13,8 @@ type Client struct { } // CreateOrUpdateNC updates cns state -func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest) error { - returnCode := client.RestService.CreateOrUpdateNetworkContainerInternal(ncRequest) +func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error { + returnCode := client.RestService.CreateOrUpdateNetworkContainerInternal(ncRequest, scalarUnits) if returnCode != 0 { return fmt.Errorf("Failed to Create NC request: %+v, errorCode: %d", ncRequest, returnCode) @@ -24,8 +24,8 @@ func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerReque } // ReconcileNCState initializes cns state -func (client *Client) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo) error { - returnCode := client.RestService.ReconcileNCState(ncRequest, podInfoByIP) +func (client *Client) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error { + returnCode := client.RestService.ReconcileNCState(ncRequest, podInfoByIP, scalarUnits) if returnCode != 0 { return fmt.Errorf("Failed to Reconcile ncState: ncRequest %+v, podInfoMap: %+v, errorCode: %d", *ncRequest, podInfoByIP, returnCode) diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go new file mode 100644 index 0000000000..0686d66d2d --- /dev/null +++ b/cns/fakes/cnsfake.go @@ -0,0 +1,56 @@ +package fakes + +import ( + "encoding/json" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/common" +) + +type HTTPServiceFake struct { + PoolMonitor cns.IPAMPoolMonitor +} + +func NewHTTPServiceFake() *HTTPServiceFake { + return &HTTPServiceFake{} +} + +func (fake *HTTPServiceFake) SendNCSnapShotPeriodically(int, chan bool) { + +} + +func (fake *HTTPServiceFake) SetNodeOrchestrator(*cns.SetOrchestratorTypeRequest) { + +} + +func (fake *HTTPServiceFake) SyncNodeStatus(string, string, string, json.RawMessage) (int, string) { + return 0, "" +} + +func (fake *HTTPServiceFake) GetAvailableIPConfigs() []cns.IPConfigurationStatus { + return []cns.IPConfigurationStatus{} +} + +func (fake *HTTPServiceFake) GetPodIPConfigState() map[string]cns.IPConfigurationStatus { + return make(map[string]cns.IPConfigurationStatus) +} + +func (fake *HTTPServiceFake) MarkIPsAsPending(numberToMark int) (map[string]cns.SecondaryIPConfig, error) { + return make(map[string]cns.SecondaryIPConfig), nil +} + +func (fake *HTTPServiceFake) GetOption(string) interface{} { + return nil +} + +func (fake *HTTPServiceFake) SetOption(string, interface{}) { + +} + +func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error { + return nil +} + +func (fake *HTTPServiceFake) Stop() { + +} diff --git a/cns/fakes/requestcontrollerfake.go b/cns/fakes/requestcontrollerfake.go new file mode 100644 index 0000000000..936cd731f3 --- /dev/null +++ b/cns/fakes/requestcontrollerfake.go @@ -0,0 +1,22 @@ +package fakes + +import ( + "context" + + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" +) + +type RequestControllerFake struct { +} + +func NewRequestControllerFake() *RequestControllerFake { + return &RequestControllerFake{} +} + +func (rc RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error { + return nil +} + +func (rc RequestControllerFake) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error { + return nil +} diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go new file mode 100644 index 0000000000..1e3b897efa --- /dev/null +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -0,0 +1,131 @@ +package ipampoolmonitor + +import ( + "context" + "sync" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/Azure/azure-container-networking/cns/requestcontroller" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" +) + +var ( + increasePoolSize = 1 + decreasePoolSize = -1 + doNothing = 0 +) + +type CNSIPAMPoolMonitor struct { + initialized bool + + cns cns.HTTPService + rc requestcontroller.RequestController + scalarUnits cns.ScalarUnits + MinimumFreeIps int + MaximumFreeIps int + + sync.RWMutex +} + +func NewCNSIPAMPoolMonitor(cnsService cns.HTTPService, requestController requestcontroller.RequestController) *CNSIPAMPoolMonitor { + return &CNSIPAMPoolMonitor{ + initialized: false, + cns: cnsService, + rc: requestController, + } +} + +// TODO: add looping and cancellation to this, and add to CNS MAIN +func (pm *CNSIPAMPoolMonitor) Start() error { + + if pm.initialized { + availableIPConfigs := pm.cns.GetAvailableIPConfigs() + rebatchAction := pm.checkForResize(len(availableIPConfigs)) + switch rebatchAction { + case increasePoolSize: + return pm.increasePoolSize() + case decreasePoolSize: + return pm.decreasePoolSize() + } + } + + return nil +} + +// UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits +func (pm *CNSIPAMPoolMonitor) UpdatePoolLimitsTransacted(scalarUnits cns.ScalarUnits) { + pm.Lock() + defer pm.Unlock() + pm.scalarUnits = scalarUnits + + // TODO rounding? + pm.MinimumFreeIps = int(pm.scalarUnits.BatchSize * (pm.scalarUnits.RequestThresholdPercent / 100)) + pm.MaximumFreeIps = int(pm.scalarUnits.BatchSize * (pm.scalarUnits.ReleaseThresholdPercent / 100)) + + pm.initialized = true +} + +func (pm *CNSIPAMPoolMonitor) checkForResize(freeIPConfigCount int) int { + switch { + // pod count is increasing + case freeIPConfigCount < pm.MinimumFreeIps: + logger.Printf("Number of free IP's (%d) < minimum free IPs (%d), request batch increase\n", freeIPConfigCount, pm.MinimumFreeIps) + return increasePoolSize + + // pod count is decreasing + case freeIPConfigCount > pm.MaximumFreeIps: + logger.Printf("Number of free IP's (%d) > maximum free IPs (%d), request batch decrease\n", freeIPConfigCount, pm.MaximumFreeIps) + return decreasePoolSize + } + return doNothing +} + +func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { + increaseIPCount := len(pm.cns.GetPodIPConfigState()) + int(pm.scalarUnits.BatchSize) + + // pass nil map to CNStoCRDSpec because we don't want to modify the to be deleted ipconfigs + spec, err := CNSToCRDSpec(nil, increaseIPCount) + if err != nil { + return err + } + + return pm.rc.UpdateCRDSpec(context.Background(), spec) +} + +func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error { + + // TODO: Better handling here, negatives + // TODO: Maintain desired state to check against if pool size adjustment is already happening + decreaseIPCount := len(pm.cns.GetPodIPConfigState()) - int(pm.scalarUnits.BatchSize) + + // mark n number of IP's as pending + pendingIPAddresses, err := pm.cns.MarkIPsAsPending(decreaseIPCount) + if err != nil { + return err + } + + // convert the pending IP addresses to a spec + spec, err := CNSToCRDSpec(pendingIPAddresses, decreaseIPCount) + if err != nil { + return err + } + + return pm.rc.UpdateCRDSpec(context.Background(), spec) +} + +// CNSToCRDSpec translates CNS's map of Ips to be released and requested ip count into a CRD Spec +func CNSToCRDSpec(toBeDeletedSecondaryIPConfigs map[string]cns.SecondaryIPConfig, ipCount int) (nnc.NodeNetworkConfigSpec, error) { + var ( + spec nnc.NodeNetworkConfigSpec + uuid string + ) + + spec.RequestedIPCount = int64(ipCount) + + for uuid = range toBeDeletedSecondaryIPConfigs { + spec.IPsNotInUse = append(spec.IPsNotInUse, uuid) + } + + return spec, nil +} diff --git a/cns/ipampoolmonitor/ipampoolmonitor_test.go b/cns/ipampoolmonitor/ipampoolmonitor_test.go new file mode 100644 index 0000000000..54f5847d7d --- /dev/null +++ b/cns/ipampoolmonitor/ipampoolmonitor_test.go @@ -0,0 +1,19 @@ +package ipampoolmonitor + +import ( + "testing" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/fakes" +) + +func TestInterfaces(t *testing.T) { + fakecns := fakes.NewHTTPServiceFake() + fakerc := fakes.NewRequestControllerFake() + + fakecns.PoolMonitor = NewCNSIPAMPoolMonitor(fakecns, fakerc) + + scalarUnits := cns.ScalarUnits{} + + fakecns.PoolMonitor.UpdatePoolLimitsTransacted(scalarUnits) +} diff --git a/cns/requestcontroller/kubecontroller/crdreconciler.go b/cns/requestcontroller/kubecontroller/crdreconciler.go index 649e13403f..eb2ab0e9f6 100644 --- a/cns/requestcontroller/kubecontroller/crdreconciler.go +++ b/cns/requestcontroller/kubecontroller/crdreconciler.go @@ -55,7 +55,13 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result, return reconcile.Result{}, err } - if err = r.CNSClient.CreateOrUpdateNC(ncRequest); err != nil { + scalarUnits := cns.ScalarUnits{ + BatchSize: nodeNetConfig.Status.Scaler.BatchSize, + RequestThresholdPercent: nodeNetConfig.Status.Scaler.RequestThresholdPercent, + ReleaseThresholdPercent: nodeNetConfig.Status.Scaler.ReleaseThresholdPercent, + } + + if err = r.CNSClient.CreateOrUpdateNC(ncRequest, scalarUnits); err != nil { logger.Errorf("[cns-rc] Error creating or updating NC in reconcile: %v", err) // requeue return reconcile.Result{}, err diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index e3fe5d386c..ab17045983 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -177,6 +177,7 @@ func (crdRC *crdRequestController) initCNS() error { cntxt context.Context ncRequest cns.CreateNetworkContainerRequest err error + scalarUnits cns.ScalarUnits ) cntxt = context.Background() @@ -189,9 +190,15 @@ func (crdRC *crdRequestController) initCNS() error { os.Exit(1) } + scalarUnits = cns.ScalarUnits{ + BatchSize: nodeNetConfig.Status.Scaler.BatchSize, + RequestThresholdPercent: nodeNetConfig.Status.Scaler.RequestThresholdPercent, + ReleaseThresholdPercent: nodeNetConfig.Status.Scaler.ReleaseThresholdPercent, + } + // If instance of crd is not found, pass nil to CNSClient if client.IgnoreNotFound(err) == nil { - return crdRC.CNSClient.ReconcileNCState(nil, nil) + return crdRC.CNSClient.ReconcileNCState(nil, nil, scalarUnits) } // If it's any other error, log it and return @@ -201,7 +208,7 @@ func (crdRC *crdRequestController) initCNS() error { // If there are no NCs, pass nil to CNSClient if len(nodeNetConfig.Status.NetworkContainers) == 0 { - return crdRC.CNSClient.ReconcileNCState(nil, nil) + return crdRC.CNSClient.ReconcileNCState(nil, nil, scalarUnits) } // Convert to CreateNetworkContainerRequest @@ -232,7 +239,7 @@ func (crdRC *crdRequestController) initCNS() error { } // Call cnsclient init cns passing those two things - return crdRC.CNSClient.ReconcileNCState(&ncRequest, podInfoByIP) + return crdRC.CNSClient.ReconcileNCState(&ncRequest, podInfoByIP, scalarUnits) } diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go index fd6490116a..57113fcc46 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go @@ -99,12 +99,12 @@ type MockCNSClient struct { } // we're just testing that reconciler interacts with CNS on Reconcile(). -func (mi *MockCNSClient) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest) error { +func (mi *MockCNSClient) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error { mi.MockCNSUpdated = true return nil } -func (mi *MockCNSClient) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo) error { +func (mi *MockCNSClient) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error { mi.MockCNSInitialized = true mi.Pods = podInfoByIP mi.NCRequest = ncRequest diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index 8ab654ad7f..875d32d1ee 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -51,7 +51,7 @@ type xmlDocument struct { } var ( - service HTTPService + service cns.HTTPService svc *HTTPRestService mux *http.ServeMux hostQueryForProgrammedVersionResponse = `{"httpStatusCode":"200","networkContainerId":"eab2470f-test-test-test-b3cd316979d5","version":"1"}` diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 03dfac4885..a15915e15e 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -12,8 +12,10 @@ import ( "reflect" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/requestcontroller" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" ) @@ -150,15 +152,21 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, return } +func (service *HTTPRestService) StartCNSIPAMPoolMonitor(cnsService cns.HTTPService, requestController requestcontroller.RequestController) { + + // TODO, start pool monitor as well + service.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(cnsService, requestController) +} + // This API will be called by CNS RequestController on CRD update. -func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIp map[string]cns.KubernetesPodInfo) int { +func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIp map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) int { // check if ncRequest is null, then return as there is no CRD state yet if ncRequest == nil { log.Logf("CNS starting with no NC state, podInfoMap count %d", len(podInfoByIp)) return Success } - returnCode := service.CreateOrUpdateNetworkContainerInternal(*ncRequest) + returnCode := service.CreateOrUpdateNetworkContainerInternal(*ncRequest, scalarUnits) // If the NC was created successfully, then reconcile the allocated pod state if returnCode != Success { @@ -194,7 +202,7 @@ func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkCon } // This API will be called by CNS RequestController on CRD update. -func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.CreateNetworkContainerRequest) int { +func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) int { if req.NetworkContainerid == "" { logger.Errorf("[Azure CNS] Error. NetworkContainerid is empty") return NetworkContainerNotSpecified @@ -244,5 +252,7 @@ func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.C logger.Errorf(returnMessage) } + service.PoolMonitor.UpdatePoolLimitsTransacted(scalarUnits) + return returnCode } diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index c8c3631de1..25dc1d77c8 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/google/uuid" ) @@ -36,12 +37,13 @@ func TestCreateOrUpdateNetworkContainerInternal(t *testing.T) { func TestReconcileNCWithEmptyState(t *testing.T) { restartService() + var scalarUnits cns.ScalarUnits setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) expectedNcCount := len(svc.state.ContainerStatus) expectedAllocatedPods := make(map[string]cns.KubernetesPodInfo) - returnCode := svc.ReconcileNCState(nil, expectedAllocatedPods) + returnCode := svc.ReconcileNCState(nil, expectedAllocatedPods, scalarUnits) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -51,6 +53,8 @@ func TestReconcileNCWithEmptyState(t *testing.T) { func TestReconcileNCWithExistingState(t *testing.T) { restartService() + var scalarUnits cns.ScalarUnits + svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -78,7 +82,7 @@ func TestReconcileNCWithExistingState(t *testing.T) { } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods) + returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, scalarUnits) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -88,6 +92,8 @@ func TestReconcileNCWithExistingState(t *testing.T) { func TestReconcileNCWithSystemPods(t *testing.T) { restartService() + svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) + var scalarUnits cns.ScalarUnits setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -116,7 +122,7 @@ func TestReconcileNCWithSystemPods(t *testing.T) { } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods) + returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, scalarUnits) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -182,7 +188,9 @@ func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int) { func createAndValidateNCRequest(t *testing.T, secondaryIPConfigs map[string]cns.SecondaryIPConfig, ncId string) { req := generateNetworkContainerRequest(secondaryIPConfigs, ncId) - returnCode := svc.CreateOrUpdateNetworkContainerInternal(req) + var scalarUnits cns.ScalarUnits + svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) + returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, scalarUnits) if returnCode != 0 { t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode) } diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index a22227762c..c076bc439c 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -90,24 +90,50 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r return } -func (service *HTTPRestService) GetAllocatedIPConfigs() []ipConfigurationStatus { +func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]cns.SecondaryIPConfig, error) { + pendingReleaseIPs := make(map[string]cns.SecondaryIPConfig) + markedIPCount := 0 + + service.Lock() + defer service.Unlock() + for uuid, ipconfig := range service.PodIPConfigState { + if ipconfig.State == cns.Available { + ipconfig.State = cns.PendingRelease + pendingReleaseIPs[uuid] = cns.SecondaryIPConfig{ + IPAddress: ipconfig.IPAddress, + } + markedIPCount++ + if markedIPCount == numberToMark { + return pendingReleaseIPs, nil + } + } + } + + return nil, fmt.Errorf("Failed to mark %d IP's as pending, only marked %d IP's", numberToMark, len(pendingReleaseIPs)) +} + +func (service *HTTPRestService) GetPodIPConfigState() map[string]cns.IPConfigurationStatus { + return service.PodIPConfigState +} + +func (service *HTTPRestService) GetAllocatedIPConfigs() []cns.IPConfigurationStatus { service.RLock() defer service.RUnlock() - return filterIPConfigMap(service.PodIPConfigState, func(ipconfig ipConfigurationStatus) bool { + return filterIPConfigMap(service.PodIPConfigState, func(ipconfig cns.IPConfigurationStatus) bool { return ipconfig.State == cns.Allocated }) } -func (service *HTTPRestService) GetAvailableIPConfigs() []ipConfigurationStatus { +func (service *HTTPRestService) GetAvailableIPConfigs() []cns.IPConfigurationStatus { service.RLock() defer service.RUnlock() - return filterIPConfigMap(service.PodIPConfigState, func(ipconfig ipConfigurationStatus) bool { + return filterIPConfigMap(service.PodIPConfigState, func(ipconfig cns.IPConfigurationStatus) bool { return ipconfig.State == cns.Available }) } -func filterIPConfigMap(toBeAdded map[string]ipConfigurationStatus, f func(ipConfigurationStatus) bool) []ipConfigurationStatus { - vsf := make([]ipConfigurationStatus, 0) +func filterIPConfigMap(toBeAdded map[string]cns.IPConfigurationStatus, f func(cns.IPConfigurationStatus) bool) []cns.IPConfigurationStatus { + vsf := make([]cns.IPConfigurationStatus, 0) for _, v := range toBeAdded { if f(v) { vsf = append(vsf, v) @@ -117,7 +143,7 @@ func filterIPConfigMap(toBeAdded map[string]ipConfigurationStatus, f func(ipConf } //SetIPConfigAsAllocated takes a lock of the service, and sets the ipconfig in the CNS state as allocated, does not take a lock -func (service *HTTPRestService) setIPConfigAsAllocated(ipconfig ipConfigurationStatus, podInfo cns.KubernetesPodInfo, marshalledOrchestratorContext json.RawMessage) ipConfigurationStatus { +func (service *HTTPRestService) setIPConfigAsAllocated(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo, marshalledOrchestratorContext json.RawMessage) cns.IPConfigurationStatus { ipconfig.State = cns.Allocated ipconfig.OrchestratorContext = marshalledOrchestratorContext service.PodIPIDByOrchestratorContext[podInfo.GetOrchestratorContextKey()] = ipconfig.ID @@ -126,7 +152,7 @@ func (service *HTTPRestService) setIPConfigAsAllocated(ipconfig ipConfigurationS } //SetIPConfigAsAllocated and sets the ipconfig in the CNS state as allocated, does not take a lock -func (service *HTTPRestService) setIPConfigAsAvailable(ipconfig ipConfigurationStatus, podInfo cns.KubernetesPodInfo) ipConfigurationStatus { +func (service *HTTPRestService) setIPConfigAsAvailable(ipconfig cns.IPConfigurationStatus, podInfo cns.KubernetesPodInfo) cns.IPConfigurationStatus { ipconfig.State = cns.Available ipconfig.OrchestratorContext = nil service.PodIPConfigState[ipconfig.ID] = ipconfig diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index 0f39dcdd64..3418a65e48 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -54,10 +54,10 @@ func newSecondaryIPConfig(ipAddress string) cns.SecondaryIPConfig { } } -func NewPodState(ipaddress string, prefixLength uint8, id, ncid, state string) ipConfigurationStatus { +func NewPodState(ipaddress string, prefixLength uint8, id, ncid, state string) cns.IPConfigurationStatus { ipconfig := newSecondaryIPConfig(ipaddress) - return ipConfigurationStatus{ + return cns.IPConfigurationStatus{ IPAddress: ipconfig.IPAddress, ID: id, NCID: ncid, @@ -65,10 +65,10 @@ func NewPodState(ipaddress string, prefixLength uint8, id, ncid, state string) i } } -func requestIpAddressAndGetState(t *testing.T, req cns.GetIPConfigRequest) (ipConfigurationStatus, error) { +func requestIpAddressAndGetState(t *testing.T, req cns.GetIPConfigRequest) (cns.IPConfigurationStatus, error) { var ( podInfo cns.KubernetesPodInfo - ipState ipConfigurationStatus + ipState cns.IPConfigurationStatus PodIpInfo cns.PodIpInfo err error ) @@ -118,10 +118,10 @@ func requestIpAddressAndGetState(t *testing.T, req cns.GetIPConfigRequest) (ipCo return ipState, err } -func NewPodStateWithOrchestratorContext(ipaddress string, prefixLength uint8, id, ncid, state string, orchestratorContext cns.KubernetesPodInfo) (ipConfigurationStatus, error) { +func NewPodStateWithOrchestratorContext(ipaddress string, prefixLength uint8, id, ncid, state string, orchestratorContext cns.KubernetesPodInfo) (cns.IPConfigurationStatus, error) { ipconfig := newSecondaryIPConfig(ipaddress) b, err := json.Marshal(orchestratorContext) - return ipConfigurationStatus{ + return cns.IPConfigurationStatus{ IPAddress: ipconfig.IPAddress, ID: id, NCID: ncid, @@ -131,7 +131,7 @@ func NewPodStateWithOrchestratorContext(ipaddress string, prefixLength uint8, id } // Test function to populate the IPConfigState -func UpdatePodIpConfigState(t *testing.T, svc *HTTPRestService, ipconfigs map[string]ipConfigurationStatus) error { +func UpdatePodIpConfigState(t *testing.T, svc *HTTPRestService, ipconfigs map[string]cns.IPConfigurationStatus) error { // Create NC secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig) for _, ipconfig := range ipconfigs { @@ -166,7 +166,7 @@ func TestIPAMGetAvailableIPConfig(t *testing.T) { svc := getTestService() testState := NewPodState(testIP1, 24, testPod1GUID, testNCID, cns.Available) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ testState.ID: testState, } UpdatePodIpConfigState(t, svc, ipconfigs) @@ -197,7 +197,7 @@ func TestIPAMGetNextAvailableIPConfig(t *testing.T) { state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) state2 := NewPodState(testIP2, 24, testPod2GUID, testNCID, cns.Available) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, state2.ID: state2, } @@ -227,7 +227,7 @@ func TestIPAMGetAlreadyAllocatedIPConfigForSamePod(t *testing.T) { // Add Allocated Pod IP to state testState, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ testState.ID: testState, } err := UpdatePodIpConfigState(t, svc, ipconfigs) @@ -256,7 +256,7 @@ func TestIPAMAttemptToRequestIPNotFoundInPool(t *testing.T) { // Add Available Pod IP to state testState := NewPodState(testIP1, 24, testPod1GUID, testNCID, cns.Available) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ testState.ID: testState, } @@ -281,7 +281,7 @@ func TestIPAMGetDesiredIPConfigWithSpecfiedIP(t *testing.T) { // Add Available Pod IP to state testState := NewPodState(testIP1, 24, testPod1GUID, testNCID, cns.Available) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ testState.ID: testState, } @@ -313,7 +313,7 @@ func TestIPAMFailToGetDesiredIPConfigWithAlreadyAllocatedSpecfiedIP(t *testing.T // set state as already allocated testState, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ testState.ID: testState, } err := UpdatePodIpConfigState(t, svc, ipconfigs) @@ -340,7 +340,7 @@ func TestIPAMFailToGetIPWhenAllIPsAreAllocated(t *testing.T) { state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) state2, _ := NewPodStateWithOrchestratorContext(testIP2, 24, testPod2GUID, testNCID, cns.Allocated, testPod2Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, state2.ID: state2, } @@ -369,7 +369,7 @@ func TestIPAMRequestThenReleaseThenRequestAgain(t *testing.T) { // set state as already allocated state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, } @@ -422,7 +422,7 @@ func TestIPAMReleaseIPIdempotency(t *testing.T) { svc := getTestService() // set state as already allocated state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, } @@ -449,7 +449,7 @@ func TestIPAMAllocateIPIdempotency(t *testing.T) { // set state as already allocated state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Allocated, testPod1Info) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, } @@ -471,14 +471,14 @@ func TestAvailableIPConfigs(t *testing.T) { state2 := NewPodState(testIP2, 24, testPod2GUID, testNCID, cns.Available) state3 := NewPodState(testIP3, 24, testPod3GUID, testNCID, cns.Available) - ipconfigs := map[string]ipConfigurationStatus{ + ipconfigs := map[string]cns.IPConfigurationStatus{ state1.ID: state1, state2.ID: state2, state3.ID: state3, } UpdatePodIpConfigState(t, svc, ipconfigs) - desiredAvailableIps := map[string]ipConfigurationStatus{ + desiredAvailableIps := map[string]cns.IPConfigurationStatus{ state1.ID: state1, state2.ID: state2, state3.ID: state3, @@ -486,7 +486,7 @@ func TestAvailableIPConfigs(t *testing.T) { availableIps := svc.GetAvailableIPConfigs() validateIpState(t, availableIps, desiredAvailableIps) - desiredAllocatedIpConfigs := make(map[string]ipConfigurationStatus) + desiredAllocatedIpConfigs := make(map[string]cns.IPConfigurationStatus) allocatedIps := svc.GetAllocatedIPConfigs() validateIpState(t, allocatedIps, desiredAllocatedIpConfigs) @@ -512,13 +512,13 @@ func TestAvailableIPConfigs(t *testing.T) { } -func validateIpState(t *testing.T, actualIps []ipConfigurationStatus, expectedList map[string]ipConfigurationStatus) { +func validateIpState(t *testing.T, actualIps []cns.IPConfigurationStatus, expectedList map[string]cns.IPConfigurationStatus) { if len(actualIps) != len(expectedList) { t.Fatalf("Actual and expected count doesnt match, expected %d, actual %d", len(actualIps), len(expectedList)) } for _, actualIp := range actualIps { - var expectedIp ipConfigurationStatus + var expectedIp cns.IPConfigurationStatus var found bool for _, expectedIp = range expectedList { if reflect.DeepEqual(actualIp, expectedIp) == true { diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 065960ac11..17e2d050aa 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -4,7 +4,6 @@ package restserver import ( - "encoding/json" "sync" "time" @@ -40,10 +39,11 @@ type HTTPRestService struct { imdsClient imdsclient.ImdsClientInterface ipamClient *ipamclient.IpamClient networkContainer *networkcontainers.NetworkContainers - PodIPIDByOrchestratorContext map[string]string // OrchestratorContext is key and value is Pod IP uuid. - PodIPConfigState map[string]ipConfigurationStatus // seondaryipid(uuid) is key - AllocatedIPCount map[string]allocatedIPCount // key - ncid + PodIPIDByOrchestratorContext map[string]string // OrchestratorContext is key and value is Pod IP uuid. + PodIPConfigState map[string]cns.IPConfigurationStatus // seondaryipid(uuid) is key + AllocatedIPCount map[string]allocatedIPCount // key - ncid routingTable *routes.RoutingTable + PoolMonitor cns.IPAMPoolMonitor store store.KeyValueStore state *httpRestServiceState sync.RWMutex @@ -54,16 +54,6 @@ type allocatedIPCount struct { Count int } -// This is used for KubernetesCRD orchastrator Type where NC has multiple ips. -// This struct captures the state for SecondaryIPs associated to a given NC -type ipConfigurationStatus struct { - NCID string - ID string //uuid - IPAddress string - State string - OrchestratorContext json.RawMessage -} - // containerstatus is used to save status of an existing container type containerstatus struct { ID string @@ -93,16 +83,8 @@ type networkInfo struct { Options map[string]interface{} } -// HTTPService describes the min API interface that every service should have. -type HTTPService interface { - common.ServiceAPI - SendNCSnapShotPeriodically(int, chan bool) - SetNodeOrchestrator(*cns.SetOrchestratorTypeRequest) - SyncNodeStatus(string, string, string, json.RawMessage) (int, string) -} - // NewHTTPRestService creates a new HTTP Service object. -func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdsclient.ImdsClientInterface) (HTTPService, error) { +func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdsclient.ImdsClientInterface) (cns.HTTPService, error) { service, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store) if err != nil { return nil, err @@ -127,7 +109,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl serviceState.joinedNetworks = make(map[string]struct{}) podIPIDByOrchestratorContext := make(map[string]string) - podIPConfigState := make(map[string]ipConfigurationStatus) + podIPConfigState := make(map[string]cns.IPConfigurationStatus) allocatedIPCount := make(map[string]allocatedIPCount) // key - ncid return &HTTPRestService{ diff --git a/cns/restserver/util.go b/cns/restserver/util.go index 0fed060471..b493bd160a 100644 --- a/cns/restserver/util.go +++ b/cns/restserver/util.go @@ -253,7 +253,7 @@ func (service *HTTPRestService) addIPConfigStateUntransacted(ncId string, ipconf } // add the new State - ipconfigStatus := ipConfigurationStatus{ + ipconfigStatus := cns.IPConfigurationStatus{ NCID: ncId, ID: ipId, IPAddress: ipconfig.IPAddress, @@ -641,7 +641,7 @@ func (service *HTTPRestService) validateIpConfigRequest(ipConfigRequest cns.GetI return podInfo, Success, "" } -func (service *HTTPRestService) populateIpConfigInfoUntransacted(ipConfigStatus ipConfigurationStatus, podIpInfo *cns.PodIpInfo) error { +func (service *HTTPRestService) populateIpConfigInfoUntransacted(ipConfigStatus cns.IPConfigurationStatus, podIpInfo *cns.PodIpInfo) error { var ( ncStatus containerstatus exists bool diff --git a/cns/service/main.go b/cns/service/main.go index e4b3c4dfcf..3cd8d073d3 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -227,7 +227,7 @@ func printVersion() { } // Try to register node with DNC when CNS is started in managed DNC mode -func registerNode(httpRestService restserver.HTTPService, dncEP, infraVnet, nodeID string) { +func registerNode(httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string) { logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP) var (