diff --git a/cns/api.go b/cns/api.go index 2b719eaf06..0c06500e28 100644 --- a/cns/api.go +++ b/cns/api.go @@ -4,9 +4,11 @@ package cns import ( + "context" "encoding/json" "github.com/Azure/azure-container-networking/cns/common" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) // Container Network Service remote API Contract @@ -36,8 +38,10 @@ type HTTPService interface { SetNodeOrchestrator(*SetOrchestratorTypeRequest) SyncNodeStatus(string, string, string, json.RawMessage) (int, string) GetAvailableIPConfigs() []IPConfigurationStatus + GetAllocatedIPConfigs() []IPConfigurationStatus + GetPendingReleaseIPConfigs() []IPConfigurationStatus GetPodIPConfigState() map[string]IPConfigurationStatus - MarkIPsAsPending(numberToMark int) (map[string]SecondaryIPConfig, error) + MarkIPsAsPending(numberToMark int) (map[string]IPConfigurationStatus, error) } // This is used for KubernetesCRD orchastrator Type where NC has multiple ips. @@ -160,16 +164,9 @@ type NodeConfiguration struct { NodeID string NodeSubnet Subnet } - type IPAMPoolMonitor interface { - Start() error - UpdatePoolLimitsTransacted(ScalarUnits) -} - -type ScalarUnits struct { - BatchSize int64 - RequestThresholdPercent int64 - ReleaseThresholdPercent int64 + Start(ctx context.Context, poolMonitorRefreshMilliseconds int) error + Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error } // Response describes generic response from CNS. 
diff --git a/cns/cnsclient/apiclient.go b/cns/cnsclient/apiclient.go index 3fc0493840..6982926790 100644 --- a/cns/cnsclient/apiclient.go +++ b/cns/cnsclient/apiclient.go @@ -1,9 +1,12 @@ package cnsclient -import "github.com/Azure/azure-container-networking/cns" +import ( + "github.com/Azure/azure-container-networking/cns" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" +) // APIClient interface to update cns state type APIClient interface { - ReconcileNCState(nc *cns.CreateNetworkContainerRequest, pods map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error - CreateOrUpdateNC(nc cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error + ReconcileNCState(nc *cns.CreateNetworkContainerRequest, pods map[string]cns.KubernetesPodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error + CreateOrUpdateNC(nc cns.CreateNetworkContainerRequest, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error } diff --git a/cns/cnsclient/cnsclient_test.go b/cns/cnsclient/cnsclient_test.go index e96a39565c..372f134e48 100644 --- a/cns/cnsclient/cnsclient_test.go +++ b/cns/cnsclient/cnsclient_test.go @@ -15,7 +15,6 @@ import ( "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/fakes" - "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/restserver" "github.com/Azure/azure-container-networking/log" @@ -31,6 +30,10 @@ const ( gatewayIp = "10.0.0.1" subnetPrfixLength = 24 dockerContainerType = cns.Docker + releasePercent = 50 + requestPercent = 100 + batchSize = 10 + initPoolSize = 10 ) var ( @@ -39,7 +42,6 @@ var ( func addTestStateToRestServer(t *testing.T, secondaryIps []string) { var ipConfig cns.IPConfiguration - var scalarUnits cns.ScalarUnits ipConfig.DNSServers = dnsservers 
ipConfig.GatewayIPAddress = gatewayIp var ipSubnet cns.IPSubnet @@ -63,7 +65,7 @@ func addTestStateToRestServer(t *testing.T, secondaryIps []string) { SecondaryIPConfigs: secondaryIPConfigs, } - returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, scalarUnits) + returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) if returnCode != 0 { t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode) } @@ -123,7 +125,7 @@ func TestMain(m *testing.M) { httpRestService, err := restserver.NewHTTPRestService(&config, fakes.NewFakeImdsClient()) svc = httpRestService.(*restserver.HTTPRestService) svc.Name = "cns-test-server" - svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) + svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake() if err != nil { logger.Errorf("Failed to create CNS object, err:%v.\n", err) diff --git a/cns/cnsclient/httpapi/client.go b/cns/cnsclient/httpapi/client.go index a7821b98f9..d3add84ce8 100644 --- a/cns/cnsclient/httpapi/client.go +++ b/cns/cnsclient/httpapi/client.go @@ -5,6 +5,7 @@ import ( "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/restserver" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) // Client implements APIClient interface. 
Used to update CNS state @@ -13,8 +14,8 @@ type Client struct { } // CreateOrUpdateNC updates cns state -func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error { - returnCode := client.RestService.CreateOrUpdateNetworkContainerInternal(ncRequest, scalarUnits) +func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { + returnCode := client.RestService.CreateOrUpdateNetworkContainerInternal(ncRequest, scalar, spec) if returnCode != 0 { return fmt.Errorf("Failed to Create NC request: %+v, errorCode: %d", ncRequest, returnCode) @@ -24,8 +25,8 @@ func (client *Client) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerReque } // ReconcileNCState initializes cns state -func (client *Client) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error { - returnCode := client.RestService.ReconcileNCState(ncRequest, podInfoByIP, scalarUnits) +func (client *Client) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { + returnCode := client.RestService.ReconcileNCState(ncRequest, podInfoByIP, scalar, spec) if returnCode != 0 { return fmt.Errorf("Failed to Reconcile ncState: ncRequest %+v, podInfoMap: %+v, errorCode: %d", *ncRequest, podInfoByIP, returnCode) diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index 0686d66d2d..80afdad04d 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -2,17 +2,211 @@ package fakes import ( "encoding/json" + "errors" + "sync" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/common" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) +// available IP's stack +// all IP's map + +type StringStack 
struct { + lock sync.Mutex // you don't have to do this if you don't want thread safety + items []string +} + +func NewFakeScalar(releaseThreshold, requestThreshold, batchSize int) nnc.Scaler { + return nnc.Scaler{ + BatchSize: int64(batchSize), + ReleaseThresholdPercent: int64(releaseThreshold), + RequestThresholdPercent: int64(requestThreshold), + } +} + +func NewFakeNodeNetworkConfigSpec(requestedIPCount int) nnc.NodeNetworkConfigSpec { + return nnc.NodeNetworkConfigSpec{ + RequestedIPCount: int64(requestedIPCount), + } +} + +func NewStack() *StringStack { + return &StringStack{sync.Mutex{}, make([]string, 0)} +} + +func (stack *StringStack) Push(v string) { + stack.lock.Lock() + defer stack.lock.Unlock() + + stack.items = append(stack.items, v) +} + +func (stack *StringStack) Pop() (string, error) { + stack.lock.Lock() + defer stack.lock.Unlock() + + length := len(stack.items) + if length == 0 { + return "", errors.New("Empty Stack") + } + + res := stack.items[length-1] + stack.items = stack.items[:length-1] + return res, nil +} + +type IPStateManager struct { + AvailableIPConfigState map[string]cns.IPConfigurationStatus + AllocatedIPConfigState map[string]cns.IPConfigurationStatus + PendingReleaseIPConfigState map[string]cns.IPConfigurationStatus + AvailableIPIDStack StringStack + sync.RWMutex +} + +func NewIPStateManager() IPStateManager { + return IPStateManager{ + AvailableIPConfigState: make(map[string]cns.IPConfigurationStatus), + AllocatedIPConfigState: make(map[string]cns.IPConfigurationStatus), + PendingReleaseIPConfigState: make(map[string]cns.IPConfigurationStatus), + AvailableIPIDStack: StringStack{}, + } +} + +func (ipm *IPStateManager) AddIPConfigs(ipconfigs []cns.IPConfigurationStatus) { + ipm.Lock() + defer ipm.Unlock() + + for i := 0; i < len(ipconfigs); i++ { + + switch { + case ipconfigs[i].State == cns.Available: + ipm.AvailableIPConfigState[ipconfigs[i].ID] = ipconfigs[i] + ipm.AvailableIPIDStack.Push(ipconfigs[i].ID) + case 
ipconfigs[i].State == cns.Allocated: + ipm.AllocatedIPConfigState[ipconfigs[i].ID] = ipconfigs[i] + case ipconfigs[i].State == cns.PendingRelease: + ipm.PendingReleaseIPConfigState[ipconfigs[i].ID] = ipconfigs[i] + } + } + + return +} + +func (ipm *IPStateManager) RemovePendingReleaseIPConfigs(ipconfigNames []string) { + ipm.Lock() + defer ipm.Unlock() + + for i := 0; i < len(ipconfigNames); i++ { + delete(ipm.PendingReleaseIPConfigState, ipconfigNames[i]) + } + + return +} + +func (ipm *IPStateManager) ReserveIPConfig() (cns.IPConfigurationStatus, error) { + ipm.Lock() + defer ipm.Unlock() + id, err := ipm.AvailableIPIDStack.Pop() + if err != nil { + return cns.IPConfigurationStatus{}, err + } + ipm.AllocatedIPConfigState[id] = ipm.AvailableIPConfigState[id] + delete(ipm.AvailableIPConfigState, id) + return ipm.AllocatedIPConfigState[id], nil +} + +func (ipm *IPStateManager) ReleaseIPConfig(ipconfigID string) (cns.IPConfigurationStatus, error) { + ipm.Lock() + defer ipm.Unlock() + ipm.AvailableIPConfigState[ipconfigID] = ipm.AllocatedIPConfigState[ipconfigID] + ipm.AvailableIPIDStack.Push(ipconfigID) + delete(ipm.AllocatedIPConfigState, ipconfigID) + return ipm.AvailableIPConfigState[ipconfigID], nil +} + +func (ipm *IPStateManager) MarkIPsAsPending(numberOfIPsToMark int) (map[string]cns.IPConfigurationStatus, error) { + ipm.Lock() + defer ipm.Unlock() + + var ( + err error + pendingRelease []cns.IPConfigurationStatus + ) + + defer func() { + // if there was an error, and not all ip's have been freed, restore state + if err != nil && len(pendingRelease) != numberOfIPsToMark { + for i := range pendingRelease { + ipm.AvailableIPIDStack.Push(pendingRelease[i].ID) + ipm.AvailableIPConfigState[pendingRelease[i].ID] = pendingRelease[i] + delete(ipm.PendingReleaseIPConfigState, pendingRelease[i].ID) + } + } + }() + + for i := 0; i < numberOfIPsToMark; i++ { + id, err := ipm.AvailableIPIDStack.Pop() + if err != nil { + return ipm.PendingReleaseIPConfigState, err + } + + 
// add all pending release to a slice + pendingRelease = append(pendingRelease, ipm.AvailableIPConfigState[id]) + delete(ipm.AvailableIPConfigState, id) + } + + // if no errors at this point, add the pending ips to the Pending state + for i := range pendingRelease { + ipm.PendingReleaseIPConfigState[pendingRelease[i].ID] = pendingRelease[i] + } + + return ipm.PendingReleaseIPConfigState, nil +} + type HTTPServiceFake struct { - PoolMonitor cns.IPAMPoolMonitor + IPStateManager IPStateManager + PoolMonitor cns.IPAMPoolMonitor } func NewHTTPServiceFake() *HTTPServiceFake { - return &HTTPServiceFake{} + svc := &HTTPServiceFake{ + IPStateManager: NewIPStateManager(), + } + + return svc +} + +func (fake *HTTPServiceFake) SetNumberOfAllocatedIPs(desiredAllocatedIPCount int) error { + currentAllocatedIPCount := len(fake.IPStateManager.AllocatedIPConfigState) + delta := (desiredAllocatedIPCount - currentAllocatedIPCount) + + if delta > 0 { + for i := 0; i < delta; i++ { + if _, err := fake.IPStateManager.ReserveIPConfig(); err != nil { + return err + } + } + } else if delta < 0 { + + // deallocate IP's + delta *= -1 + i := 0 + for id := range fake.IPStateManager.AllocatedIPConfigState { + if i < delta { + if _, err := fake.IPStateManager.ReleaseIPConfig(id); err != nil { + return err + } + + } else { + break + } + i++ + } + } + + return nil } func (fake *HTTPServiceFake) SendNCSnapShotPeriodically(int, chan bool) { @@ -28,29 +222,63 @@ func (fake *HTTPServiceFake) SyncNodeStatus(string, string, string, json.RawMess } func (fake *HTTPServiceFake) GetAvailableIPConfigs() []cns.IPConfigurationStatus { - return []cns.IPConfigurationStatus{} + ipconfigs := []cns.IPConfigurationStatus{} + for _, ipconfig := range fake.IPStateManager.AvailableIPConfigState { + ipconfigs = append(ipconfigs, ipconfig) + } + + return ipconfigs +} + +func (fake *HTTPServiceFake) GetAllocatedIPConfigs() []cns.IPConfigurationStatus { + ipconfigs := []cns.IPConfigurationStatus{} + for _, ipconfig := 
range fake.IPStateManager.AllocatedIPConfigState { + ipconfigs = append(ipconfigs, ipconfig) + } + + return ipconfigs +} + +func (fake *HTTPServiceFake) GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus { + ipconfigs := []cns.IPConfigurationStatus{} + for _, ipconfig := range fake.IPStateManager.PendingReleaseIPConfigState { + ipconfigs = append(ipconfigs, ipconfig) + } + + return ipconfigs } +// Return union of all state maps func (fake *HTTPServiceFake) GetPodIPConfigState() map[string]cns.IPConfigurationStatus { - return make(map[string]cns.IPConfigurationStatus) + ipconfigs := make(map[string]cns.IPConfigurationStatus) + for key, val := range fake.IPStateManager.AllocatedIPConfigState { + ipconfigs[key] = val + } + + for key, val := range fake.IPStateManager.AvailableIPConfigState { + ipconfigs[key] = val + } + + for key, val := range fake.IPStateManager.PendingReleaseIPConfigState { + ipconfigs[key] = val + } + + return ipconfigs } -func (fake *HTTPServiceFake) MarkIPsAsPending(numberToMark int) (map[string]cns.SecondaryIPConfig, error) { - return make(map[string]cns.SecondaryIPConfig), nil +// TODO: Populate on scale down +func (fake *HTTPServiceFake) MarkIPsAsPending(numberToMark int) (map[string]cns.IPConfigurationStatus, error) { + return fake.IPStateManager.MarkIPsAsPending(numberToMark) } func (fake *HTTPServiceFake) GetOption(string) interface{} { return nil } -func (fake *HTTPServiceFake) SetOption(string, interface{}) { - -} +func (fake *HTTPServiceFake) SetOption(string, interface{}) {} func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error { return nil } -func (fake *HTTPServiceFake) Stop() { - -} +func (fake *HTTPServiceFake) Stop() {} diff --git a/cns/fakes/ipampoolmonitorfake.go b/cns/fakes/ipampoolmonitorfake.go new file mode 100644 index 0000000000..03c707d6c7 --- /dev/null +++ b/cns/fakes/ipampoolmonitorfake.go @@ -0,0 +1,25 @@ +package fakes + +import ( + "context" + + nnc 
"github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" +) + +type IPAMPoolMonitorFake struct{} + +func NewIPAMPoolMonitorFake() *IPAMPoolMonitorFake { + return &IPAMPoolMonitorFake{} +} + +func (ipm *IPAMPoolMonitorFake) Start(ctx context.Context, poolMonitorRefreshMilliseconds int) error { + return nil +} + +func (ipm *IPAMPoolMonitorFake) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { + return nil +} + +func (ipm *IPAMPoolMonitorFake) Reconcile() error { + return nil +} diff --git a/cns/fakes/requestcontrollerfake.go b/cns/fakes/requestcontrollerfake.go index 936cd731f3..6b46c6f2c0 100644 --- a/cns/fakes/requestcontrollerfake.go +++ b/cns/fakes/requestcontrollerfake.go @@ -2,21 +2,125 @@ package fakes import ( "context" + "net" + "github.com/Azure/azure-container-networking/cns" nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" + "github.com/google/uuid" ) type RequestControllerFake struct { + fakecns *HTTPServiceFake + cachedCRD nnc.NodeNetworkConfig + ip net.IP } -func NewRequestControllerFake() *RequestControllerFake { - return &RequestControllerFake{} +func NewRequestControllerFake(cnsService *HTTPServiceFake, scalar nnc.Scaler, subnetAddressSpace string, numberOfIPConfigs int) *RequestControllerFake { + rc := &RequestControllerFake{ + fakecns: cnsService, + cachedCRD: nnc.NodeNetworkConfig{ + Spec: nnc.NodeNetworkConfigSpec{}, + Status: nnc.NodeNetworkConfigStatus{ + Scaler: scalar, + NetworkContainers: []nnc.NetworkContainer{nnc.NetworkContainer{ + SubnetAddressSpace: subnetAddressSpace, + }}, + }, + }, + } + + rc.ip, _, _ = net.ParseCIDR(subnetAddressSpace) + + rc.CarveIPConfigsAndAddToStatusAndCNS(numberOfIPConfigs) + + return rc } -func (rc RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error { +func (rc *RequestControllerFake) CarveIPConfigsAndAddToStatusAndCNS(numberOfIPConfigs int) []cns.IPConfigurationStatus { + var cnsIPConfigs []cns.IPConfigurationStatus 
+ for i := 0; i < numberOfIPConfigs; i++ { + + ipconfigCRD := nnc.IPAssignment{ + Name: uuid.New().String(), + IP: rc.ip.String(), + } + rc.cachedCRD.Status.NetworkContainers[0].IPAssignments = append(rc.cachedCRD.Status.NetworkContainers[0].IPAssignments, ipconfigCRD) + + ipconfigCNS := cns.IPConfigurationStatus{ + ID: ipconfigCRD.Name, + IPAddress: ipconfigCRD.IP, + State: cns.Available, + } + cnsIPConfigs = append(cnsIPConfigs, ipconfigCNS) + + incrementIP(rc.ip) + } + + rc.fakecns.IPStateManager.AddIPConfigs(cnsIPConfigs) + rc.cachedCRD.Spec.RequestedIPCount = int64(len(cnsIPConfigs)) + + return cnsIPConfigs +} + +func (rc *RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error { return nil } -func (rc RequestControllerFake) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error { +func (rc *RequestControllerFake) UpdateCRDSpec(cntxt context.Context, desiredSpec nnc.NodeNetworkConfigSpec) error { + rc.cachedCRD.Spec = desiredSpec + return nil } + +func remove(slice []nnc.IPAssignment, s int) []nnc.IPAssignment { + return append(slice[:s], slice[s+1:]...) 
+} + +func (rc *RequestControllerFake) Reconcile() error { + + diff := int(rc.cachedCRD.Spec.RequestedIPCount) - len(rc.fakecns.GetPodIPConfigState()) + + if diff > 0 { + // carve the difference of test IPs and add them to CNS, assume dnc has populated the CRD status + rc.CarveIPConfigsAndAddToStatusAndCNS(diff) + + } else if diff < 0 { + + // Assume DNC has removed the IPConfigs from the status + + // mimic DNC removing IPConfigs from the CRD + for _, notInUseIPConfigName := range rc.cachedCRD.Spec.IPsNotInUse { + + // remove ipconfig from status + index := 0 + for _, ipconfig := range rc.cachedCRD.Status.NetworkContainers[0].IPAssignments { + if notInUseIPConfigName == ipconfig.Name { + break + } + index++ + } + rc.cachedCRD.Status.NetworkContainers[0].IPAssignments = remove(rc.cachedCRD.Status.NetworkContainers[0].IPAssignments, index) + + } + + // remove ipconfig from CNS + rc.fakecns.IPStateManager.RemovePendingReleaseIPConfigs(rc.cachedCRD.Spec.IPsNotInUse) + + // empty the not in use ip's from the spec + rc.cachedCRD.Spec.IPsNotInUse = []string{} + } + + // update + rc.fakecns.PoolMonitor.Update(rc.cachedCRD.Status.Scaler, rc.cachedCRD.Spec) + + return nil +} + +func incrementIP(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } +} diff --git a/cns/ipampoolmonitor/ipampoolmonitor.go b/cns/ipampoolmonitor/ipampoolmonitor.go index 1e3b897efa..716ea3ce84 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor.go +++ b/cns/ipampoolmonitor/ipampoolmonitor.go @@ -2,7 +2,9 @@ package ipampoolmonitor import ( "context" + "fmt" "sync" + "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -10,122 +12,177 @@ import ( nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) -var ( - increasePoolSize = 1 - decreasePoolSize = -1 - doNothing = 0 -) - type CNSIPAMPoolMonitor struct { - initialized bool + pendingRelease bool + + cachedNNC 
nnc.NodeNetworkConfig + scalarUnits nnc.Scaler cns cns.HTTPService rc requestcontroller.RequestController - scalarUnits cns.ScalarUnits - MinimumFreeIps int - MaximumFreeIps int + MinimumFreeIps int64 + MaximumFreeIps int64 - sync.RWMutex + mu sync.RWMutex } -func NewCNSIPAMPoolMonitor(cnsService cns.HTTPService, requestController requestcontroller.RequestController) *CNSIPAMPoolMonitor { +func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor { return &CNSIPAMPoolMonitor{ - initialized: false, - cns: cnsService, - rc: requestController, + pendingRelease: false, + cns: cns, + rc: rc, } } -// TODO: add looping and cancellation to this, and add to CNS MAIN -func (pm *CNSIPAMPoolMonitor) Start() error { - - if pm.initialized { - availableIPConfigs := pm.cns.GetAvailableIPConfigs() - rebatchAction := pm.checkForResize(len(availableIPConfigs)) - switch rebatchAction { - case increasePoolSize: - return pm.increasePoolSize() - case decreasePoolSize: - return pm.decreasePoolSize() - } +func stopReconcile(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: } - return nil + return false } -// UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits -func (pm *CNSIPAMPoolMonitor) UpdatePoolLimitsTransacted(scalarUnits cns.ScalarUnits) { - pm.Lock() - defer pm.Unlock() - pm.scalarUnits = scalarUnits +func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilliseconds int) error { + logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor") - // TODO rounding? 
- pm.MinimumFreeIps = int(pm.scalarUnits.BatchSize * (pm.scalarUnits.RequestThresholdPercent / 100)) - pm.MaximumFreeIps = int(pm.scalarUnits.BatchSize * (pm.scalarUnits.ReleaseThresholdPercent / 100)) + ticker := time.NewTicker(time.Duration(poolMonitorRefreshMilliseconds) * time.Millisecond) - pm.initialized = true + for { + select { + case <-ctx.Done(): + return fmt.Errorf("CNS IPAM Pool Monitor received cancellation signal") + case <-ticker.C: + err := pm.Reconcile() + if err != nil { + logger.Printf("[ipam-pool-monitor] Reconcile failed with err %v", err) + } + } + } } -func (pm *CNSIPAMPoolMonitor) checkForResize(freeIPConfigCount int) int { +func (pm *CNSIPAMPoolMonitor) Reconcile() error { + cnsPodIPConfigCount := len(pm.cns.GetPodIPConfigState()) + allocatedPodIPCount := len(pm.cns.GetAllocatedIPConfigs()) + pendingReleaseIPCount := len(pm.cns.GetPendingReleaseIPConfigs()) + availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns + freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount) + + logger.Printf("[ipam-pool-monitor] Checking pool for resize, Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v", cnsPodIPConfigCount, pm.cachedNNC.Spec.RequestedIPCount, pm.scalarUnits.BatchSize, pm.MinimumFreeIps, pm.MaximumFreeIps, allocatedPodIPCount, availableIPConfigCount, pendingReleaseIPCount, freeIPConfigCount) + switch { // pod count is increasing case freeIPConfigCount < pm.MinimumFreeIps: - logger.Printf("Number of free IP's (%d) < minimum free IPs (%d), request batch increase\n", freeIPConfigCount, pm.MinimumFreeIps) - return increasePoolSize + logger.Printf("[ipam-pool-monitor] Increasing pool size...") + return pm.increasePoolSize() // pod count is decreasing case freeIPConfigCount > pm.MaximumFreeIps: - logger.Printf("Number of free IP's (%d) > maximum free IPs (%d), request batch decrease\n", 
freeIPConfigCount, pm.MaximumFreeIps) - return decreasePoolSize + logger.Printf("[ipam-pool-monitor] Decreasing pool size...") + return pm.decreasePoolSize() + + // CRD has reconciled CNS state, and target spec is now the same size as the state + // free to remove the IP's from the CRD + case pm.pendingRelease && int(pm.cachedNNC.Spec.RequestedIPCount) == cnsPodIPConfigCount: + logger.Printf("[ipam-pool-monitor] Removing Pending Release IP's from CRD...") + return pm.cleanPendingRelease() + + // no pods scheduled + case allocatedPodIPCount == 0: + logger.Printf("[ipam-pool-monitor] No pods scheduled") + return nil } - return doNothing + + return nil } func (pm *CNSIPAMPoolMonitor) increasePoolSize() error { - increaseIPCount := len(pm.cns.GetPodIPConfigState()) + int(pm.scalarUnits.BatchSize) + pm.mu.Lock() + defer pm.mu.Unlock() + + var err error + pm.cachedNNC.Spec.RequestedIPCount += pm.scalarUnits.BatchSize // pass nil map to CNStoCRDSpec because we don't want to modify the to be deleted ipconfigs - spec, err := CNSToCRDSpec(nil, increaseIPCount) + pm.cachedNNC.Spec, err = MarkIPsAsPendingInCRD(nil, pm.cachedNNC.Spec.RequestedIPCount) if err != nil { return err } - return pm.rc.UpdateCRDSpec(context.Background(), spec) + logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's:%v", len(pm.cns.GetPodIPConfigState()), pm.cachedNNC.Spec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs())) + return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec) } func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error { + pm.mu.Lock() + defer pm.mu.Unlock() - // TODO: Better handling here, negatives - // TODO: Maintain desired state to check against if pool size adjustment is already happening - decreaseIPCount := len(pm.cns.GetPodIPConfigState()) - int(pm.scalarUnits.BatchSize) + // TODO: Better handling here for negatives + pm.cachedNNC.Spec.RequestedIPCount -= pm.scalarUnits.BatchSize // mark n number 
of IP's as pending - pendingIPAddresses, err := pm.cns.MarkIPsAsPending(decreaseIPCount) + pendingIPAddresses, err := pm.cns.MarkIPsAsPending(int(pm.scalarUnits.BatchSize)) if err != nil { return err } // convert the pending IP addresses to a spec - spec, err := CNSToCRDSpec(pendingIPAddresses, decreaseIPCount) + pm.cachedNNC.Spec, err = MarkIPsAsPendingInCRD(pendingIPAddresses, pm.cachedNNC.Spec.RequestedIPCount) if err != nil { return err } + pm.pendingRelease = true + logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v", len(pm.cns.GetPodIPConfigState()), pm.cachedNNC.Spec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs())) + return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec) +} + +// if cns pending ip release map is empty, request controller has already reconciled the CNS state, +// so we can remove it from our cache and remove the IP's from the CRD +func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error { + pm.mu.Lock() + defer pm.mu.Unlock() - return pm.rc.UpdateCRDSpec(context.Background(), spec) + var err error + pm.cachedNNC.Spec, err = MarkIPsAsPendingInCRD(nil, pm.cachedNNC.Spec.RequestedIPCount) + if err != nil { + logger.Printf("[ipam-pool-monitor] Failed to translate ") + } + + pm.pendingRelease = false + return pm.rc.UpdateCRDSpec(context.Background(), pm.cachedNNC.Spec) } // CNSToCRDSpec translates CNS's map of Ips to be released and requested ip count into a CRD Spec -func CNSToCRDSpec(toBeDeletedSecondaryIPConfigs map[string]cns.SecondaryIPConfig, ipCount int) (nnc.NodeNetworkConfigSpec, error) { +func MarkIPsAsPendingInCRD(toBeDeletedSecondaryIPConfigs map[string]cns.IPConfigurationStatus, ipCount int64) (nnc.NodeNetworkConfigSpec, error) { var ( spec nnc.NodeNetworkConfigSpec uuid string ) - spec.RequestedIPCount = int64(ipCount) + spec.RequestedIPCount = ipCount - for uuid = range toBeDeletedSecondaryIPConfigs { - spec.IPsNotInUse = 
append(spec.IPsNotInUse, uuid) + if toBeDeletedSecondaryIPConfigs == nil { + spec.IPsNotInUse = make([]string, 0) + } else { + for uuid = range toBeDeletedSecondaryIPConfigs { + spec.IPsNotInUse = append(spec.IPsNotInUse, uuid) + } } return spec, nil } + +// UpdatePoolLimitsTransacted called by request controller on reconcile to set the batch size limits +func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { + pm.mu.Lock() + defer pm.mu.Unlock() + pm.scalarUnits = scalar + + pm.MinimumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.RequestThresholdPercent) / 100)) + pm.MaximumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.ReleaseThresholdPercent) / 100)) + + pm.cachedNNC.Spec = spec + + return nil +} diff --git a/cns/ipampoolmonitor/ipampoolmonitor_test.go b/cns/ipampoolmonitor/ipampoolmonitor_test.go index 54f5847d7d..934019bb34 100644 --- a/cns/ipampoolmonitor/ipampoolmonitor_test.go +++ b/cns/ipampoolmonitor/ipampoolmonitor_test.go @@ -1,19 +1,315 @@ package ipampoolmonitor import ( + "log" "testing" - "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/fakes" + "github.com/Azure/azure-container-networking/cns/logger" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) -func TestInterfaces(t *testing.T) { +func initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent int) (*fakes.HTTPServiceFake, *fakes.RequestControllerFake, *CNSIPAMPoolMonitor) { + logger.InitLogger("testlogs", 0, 0, "./") + + scalarUnits := nnc.Scaler{ + BatchSize: int64(batchSize), + RequestThresholdPercent: int64(requestThresholdPercent), + ReleaseThresholdPercent: int64(releaseThresholdPercent), + } + subnetaddresspace := "10.0.0.0/8" + fakecns := fakes.NewHTTPServiceFake() - fakerc := fakes.NewRequestControllerFake() + fakerc := fakes.NewRequestControllerFake(fakecns, 
scalarUnits, subnetaddresspace, initialIPConfigCount) + + poolmonitor := NewCNSIPAMPoolMonitor(fakecns, fakerc) + + fakecns.PoolMonitor = poolmonitor + + fakerc.Reconcile() + + return fakecns, fakerc, poolmonitor +} + +func TestPoolSizeIncrease(t *testing.T) { + var ( + batchSize = 10 + initialIPConfigCount = 10 + requestThresholdPercent = 30 + releaseThresholdPercent = 150 + ) + + fakecns, fakerc, poolmonitor := initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent) + + // increase number of allocated IP's in CNS + err := fakecns.SetNumberOfAllocatedIPs(8) + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // When poolmonitor reconcile is called, trigger increase and cache goal state + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // ensure pool monitor has reached quorum with cns + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match CNS pool state after reconcile: %v, actual %v", poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } + + // request controller reconciles, carves new IP's from the test subnet and adds to CNS state + err = fakerc.Reconcile() + if err != nil { + t.Fatalf("Failed to reconcile fake requestcontroller with err: %v", err) + } + + // when poolmonitor reconciles again here, the IP count will be within the thresholds + // so no CRD update and nothing pending + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to reconcile pool monitor after request controller updates CNS state: %v", err) + } + + // ensure pool monitor has reached quorum with cns + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match CNS pool state after reconcile: %v, actual %v", 
poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } + + // make sure IPConfig state size reflects the new pool size + if len(fakecns.GetPodIPConfigState()) != initialIPConfigCount+(1*batchSize) { + t.Fatalf("CNS Pod IPConfig state count doesn't match, expected: %v, actual %v", len(fakecns.GetPodIPConfigState()), initialIPConfigCount+(1*batchSize)) + } + + t.Logf("Pool size %v, Target pool size %v, Allocated IP's %v, ", len(fakecns.GetPodIPConfigState()), poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetAllocatedIPConfigs())) +} + +func TestPoolIncreaseDoesntChangeWhenIncreaseIsAlreadyInProgress(t *testing.T) { + var ( + batchSize = 10 + initialIPConfigCount = 10 + requestThresholdPercent = 30 + releaseThresholdPercent = 150 + ) + + fakecns, fakerc, poolmonitor := initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent) + + // increase number of allocated IP's in CNS + err := fakecns.SetNumberOfAllocatedIPs(8) + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // When poolmonitor reconcile is called, trigger increase and cache goal state + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // increase number of allocated IP's in CNS, within allocatable size but still inside trigger threshold, + err = fakecns.SetNumberOfAllocatedIPs(9) + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // poolmonitor reconciles, but doesn't actually update the CRD, because there is already a pending update + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to reconcile pool monitor after allocation ip increase with err: %v", err) + } + + // ensure pool monitor has reached quorum with cns + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match 
CNS pool state after reconcile: %v, actual %v", poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } + + // request controller reconciles, carves new IP's from the test subnet and adds to CNS state + err = fakerc.Reconcile() + if err != nil { + t.Fatalf("Failed to reconcile fake requestcontroller with err: %v", err) + } + + // when poolmonitor reconciles again here, the IP count will be within the thresholds + // so no CRD update and nothing pending + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to reconcile pool monitor after request controller updates CNS state: %v", err) + } + + // make sure IPConfig state size reflects the new pool size + if len(fakecns.GetPodIPConfigState()) != initialIPConfigCount+(1*batchSize) { + t.Fatalf("CNS Pod IPConfig state count doesn't match, expected: %v, actual %v", len(fakecns.GetPodIPConfigState()), initialIPConfigCount+(1*batchSize)) + } + + // ensure pool monitor has reached quorum with cns + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match CNS pool state after reconcile: %v, actual %v", poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } + + t.Logf("Pool size %v, Target pool size %v, Allocated IP's %v, ", len(fakecns.GetPodIPConfigState()), poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetAllocatedIPConfigs())) +} + +func TestPoolSizeIncreaseIdempotency(t *testing.T) { + var ( + batchSize = 10 + initialIPConfigCount = 10 + requestThresholdPercent = 30 + releaseThresholdPercent = 150 + ) + + fakecns, _, poolmonitor := initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent) + + t.Logf("Minimum free IPs to request: %v", poolmonitor.MinimumFreeIps) + t.Logf("Maximum free IPs to release: %v", poolmonitor.MaximumFreeIps) + + // increase number of allocated IP's in CNS + err := 
fakecns.SetNumberOfAllocatedIPs(8) + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // When poolmonitor reconcile is called, trigger increase and cache goal state + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // ensure pool monitor has increased batch size + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match CNS pool state after reconcile: %v, actual %v", poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } + + // reconcile pool monitor a second time, then verify requested ip count is still the same + err = poolmonitor.Reconcile() + if err != nil { + t.Fatalf("Failed to allocate test ipconfigs with err: %v", err) + } + + // ensure pool monitor requested pool size is unchanged as request controller hasn't reconciled yet + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount+(1*batchSize)) { + t.Fatalf("Pool monitor target IP count doesn't match CNS pool state after reconcile: %v, actual %v", poolmonitor.cachedNNC.Spec.RequestedIPCount, len(fakecns.GetPodIPConfigState())) + } +} + +func TestPoolDecrease(t *testing.T) { + var ( + batchSize = 10 + initialIPConfigCount = 20 + requestThresholdPercent = 30 + releaseThresholdPercent = 150 + ) + + fakecns, fakerc, poolmonitor := initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent) + + log.Printf("Min free IP's %v", poolmonitor.MinimumFreeIps) + log.Printf("Max free IP %v", poolmonitor.MaximumFreeIps) + + // initial pool count is 20, set 15 of them to be allocated + err := fakecns.SetNumberOfAllocatedIPs(15) + if err != nil { + t.Fatal(err) + } + + // Pool monitor does nothing, as the current number of IP's falls in the threshold + err = poolmonitor.Reconcile() + if err != nil { + t.Fatal(err) + } + + // Decrease 
the number of allocated IP's down to 4. This should trigger a scale down + err = fakecns.SetNumberOfAllocatedIPs(4) + if err != nil { + t.Fatal(err) + } + + // Pool monitor will adjust the spec so the pool size will be 1 batch size smaller + err = poolmonitor.Reconcile() + if err != nil { + t.Fatal(err) + } + + // ensure that the adjusted spec is smaller than the initial pool size + if len(poolmonitor.cachedNNC.Spec.IPsNotInUse) != (initialIPConfigCount - batchSize) { + t.Fatalf("Expected pool size to be one batch size smaller after reconcile, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } + + // reconcile the fake request controller + err = fakerc.Reconcile() + if err != nil { + t.Fatal(err) + } + + // Ensure the size of the requested spec is still the same + if len(poolmonitor.cachedNNC.Spec.IPsNotInUse) != 0 { + t.Fatalf("Expected IPsNotInUse to be 0 after request controller reconcile, actual %v", poolmonitor.cachedNNC.Spec.IPsNotInUse) + } + + return +} + +func TestPoolSizeDecreaseWhenDecreaseHasAlreadyBeenRequested(t *testing.T) { + var ( + batchSize = 10 + initialIPConfigCount = 20 + requestThresholdPercent = 30 + releaseThresholdPercent = 100 + ) + + fakecns, fakerc, poolmonitor := initFakes(batchSize, initialIPConfigCount, requestThresholdPercent, releaseThresholdPercent) + + log.Printf("Min free IP's %v", poolmonitor.MinimumFreeIps) + log.Printf("Max free IP %v", poolmonitor.MaximumFreeIps) + + // initial pool count is 20, set 5 of them to be allocated + err := fakecns.SetNumberOfAllocatedIPs(5) + if err != nil { + t.Error(err) + } + + // Pool monitor does nothing, as the current number of IP's falls in the threshold + err = poolmonitor.Reconcile() + if err != nil { + t.Errorf("Expected pool monitor to not fail after CNS set number of allocated IP's %v", err) + } + + // Ensure the size of the requested spec is still the same + if len(poolmonitor.cachedNNC.Spec.IPsNotInUse) != 
(initialIPConfigCount - batchSize) { + t.Fatalf("Expected IP's not in use be one batch size smaller after reconcile, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } + + // Ensure the request ipcount is now one batch size smaller than the inital IP count + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount-batchSize) { + t.Fatalf("Expected pool size to be one batch size smaller after reconcile, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } + + // Update pods with IP count, ensure pool monitor stays the same until request controller reconciles + err = fakecns.SetNumberOfAllocatedIPs(6) + if err != nil { + t.Error(err) + } + + // Ensure the size of the requested spec is still the same + if len(poolmonitor.cachedNNC.Spec.IPsNotInUse) != (initialIPConfigCount - batchSize) { + t.Fatalf("Expected IP's not in use to be one batch size smaller after reconcile, and not change after reconcile, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } + + // Ensure the request ipcount is now one batch size smaller than the inital IP count + if poolmonitor.cachedNNC.Spec.RequestedIPCount != int64(initialIPConfigCount-batchSize) { + t.Fatalf("Expected pool size to be one batch size smaller after reconcile, and not change after existing call, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } - fakecns.PoolMonitor = NewCNSIPAMPoolMonitor(fakecns, fakerc) + err = fakerc.Reconcile() + if err != nil { + t.Error(err) + } - scalarUnits := cns.ScalarUnits{} + err = poolmonitor.Reconcile() + if err != nil { + t.Errorf("Expected no pool monitor failure after request controller reconcile: %v", err) + } - fakecns.PoolMonitor.UpdatePoolLimitsTransacted(scalarUnits) + // Ensure the spec doesn't have any IPsNotInUse after request 
controller has reconciled + if len(poolmonitor.cachedNNC.Spec.IPsNotInUse) != 0 { + t.Fatalf("Expected IP's not in use to be 0 after reconcile, expected %v, actual %v", (initialIPConfigCount - batchSize), len(poolmonitor.cachedNNC.Spec.IPsNotInUse)) + } } diff --git a/cns/requestcontroller/kubecontroller/crdreconciler.go b/cns/requestcontroller/kubecontroller/crdreconciler.go index eb2ab0e9f6..a4ecc8281e 100644 --- a/cns/requestcontroller/kubecontroller/crdreconciler.go +++ b/cns/requestcontroller/kubecontroller/crdreconciler.go @@ -15,9 +15,10 @@ import ( // CrdReconciler watches for CRD status changes type CrdReconciler struct { - KubeClient KubeClient - NodeName string - CNSClient cnsclient.APIClient + KubeClient KubeClient + NodeName string + CNSClient cnsclient.APIClient + IPAMPoolMonitor cns.IPAMPoolMonitor } // Reconcile is called on CRD status changes @@ -55,13 +56,7 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result, return reconcile.Result{}, err } - scalarUnits := cns.ScalarUnits{ - BatchSize: nodeNetConfig.Status.Scaler.BatchSize, - RequestThresholdPercent: nodeNetConfig.Status.Scaler.RequestThresholdPercent, - ReleaseThresholdPercent: nodeNetConfig.Status.Scaler.ReleaseThresholdPercent, - } - - if err = r.CNSClient.CreateOrUpdateNC(ncRequest, scalarUnits); err != nil { + if err = r.CNSClient.CreateOrUpdateNC(ncRequest, nodeNetConfig.Status.Scaler, nodeNetConfig.Spec); err != nil { logger.Errorf("[cns-rc] Error creating or updating NC in reconcile: %v", err) // requeue return reconcile.Result{}, err diff --git a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go index ab17045983..79a5386566 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller.go @@ -177,7 +177,6 @@ func (crdRC *crdRequestController) initCNS() error { cntxt context.Context ncRequest 
cns.CreateNetworkContainerRequest err error - scalarUnits cns.ScalarUnits ) cntxt = context.Background() @@ -190,15 +189,9 @@ func (crdRC *crdRequestController) initCNS() error { os.Exit(1) } - scalarUnits = cns.ScalarUnits{ - BatchSize: nodeNetConfig.Status.Scaler.BatchSize, - RequestThresholdPercent: nodeNetConfig.Status.Scaler.RequestThresholdPercent, - ReleaseThresholdPercent: nodeNetConfig.Status.Scaler.ReleaseThresholdPercent, - } - // If instance of crd is not found, pass nil to CNSClient if client.IgnoreNotFound(err) == nil { - return crdRC.CNSClient.ReconcileNCState(nil, nil, scalarUnits) + return crdRC.CNSClient.ReconcileNCState(nil, nil, nodeNetConfig.Status.Scaler, nodeNetConfig.Spec) } // If it's any other error, log it and return @@ -208,7 +201,7 @@ func (crdRC *crdRequestController) initCNS() error { // If there are no NCs, pass nil to CNSClient if len(nodeNetConfig.Status.NetworkContainers) == 0 { - return crdRC.CNSClient.ReconcileNCState(nil, nil, scalarUnits) + return crdRC.CNSClient.ReconcileNCState(nil, nil, nodeNetConfig.Status.Scaler, nodeNetConfig.Spec) } // Convert to CreateNetworkContainerRequest @@ -239,7 +232,7 @@ func (crdRC *crdRequestController) initCNS() error { } // Call cnsclient init cns passing those two things - return crdRC.CNSClient.ReconcileNCState(&ncRequest, podInfoByIP, scalarUnits) + return crdRC.CNSClient.ReconcileNCState(&ncRequest, podInfoByIP, nodeNetConfig.Status.Scaler, nodeNetConfig.Spec) } @@ -251,9 +244,13 @@ func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec return err } + logger.Printf("[cns-rc] Received update for IP count %+v", crdSpec) + //Update the CRD spec crdSpec.DeepCopyInto(&nodeNetworkConfig.Spec) + logger.Printf("[cns-rc] After deep copy %+v", nodeNetworkConfig.Spec) + //Send update to API server if err := crdRC.updateNodeNetConfig(cntxt, nodeNetworkConfig); err != nil { logger.Errorf("[cns-rc] Error updating CRD spec %v", err) diff --git 
a/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go b/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go index 57113fcc46..7f280a7907 100644 --- a/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go +++ b/cns/requestcontroller/kubecontroller/crdrequestcontroller_test.go @@ -99,12 +99,12 @@ type MockCNSClient struct { } // we're just testing that reconciler interacts with CNS on Reconcile(). -func (mi *MockCNSClient) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) error { +func (mi *MockCNSClient) CreateOrUpdateNC(ncRequest cns.CreateNetworkContainerRequest, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { mi.MockCNSUpdated = true return nil } -func (mi *MockCNSClient) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) error { +func (mi *MockCNSClient) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.KubernetesPodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error { mi.MockCNSInitialized = true mi.Pods = podInfoByIP mi.NCRequest = ncRequest diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index 875d32d1ee..3cc1cc0559 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -669,6 +669,8 @@ func startService() { return } + svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake() + if service != nil { err = service.Start(&config) if err != nil { diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index a15915e15e..f82046cbfb 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -12,12 +12,11 @@ import ( "reflect" "github.com/Azure/azure-container-networking/cns" - "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nmagentclient" - 
"github.com/Azure/azure-container-networking/cns/requestcontroller" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" + nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) // This file contains the internal functions called by either HTTP APIs (api.go) or @@ -152,21 +151,15 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, return } -func (service *HTTPRestService) StartCNSIPAMPoolMonitor(cnsService cns.HTTPService, requestController requestcontroller.RequestController) { - - // TODO, start pool monitor as well - service.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(cnsService, requestController) -} - // This API will be called by CNS RequestController on CRD update. -func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIp map[string]cns.KubernetesPodInfo, scalarUnits cns.ScalarUnits) int { +func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIp map[string]cns.KubernetesPodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) int { // check if ncRequest is null, then return as there is no CRD state yet if ncRequest == nil { log.Logf("CNS starting with no NC state, podInfoMap count %d", len(podInfoByIp)) return Success } - returnCode := service.CreateOrUpdateNetworkContainerInternal(*ncRequest, scalarUnits) + returnCode := service.CreateOrUpdateNetworkContainerInternal(*ncRequest, scalar, spec) // If the NC was created successfully, then reconcile the allocated pod state if returnCode != Success { @@ -202,7 +195,7 @@ func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkCon } // This API will be called by CNS RequestController on CRD update. 
-func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.CreateNetworkContainerRequest, scalarUnits cns.ScalarUnits) int { +func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.CreateNetworkContainerRequest, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) int { if req.NetworkContainerid == "" { logger.Errorf("[Azure CNS] Error. NetworkContainerid is empty") return NetworkContainerNotSpecified @@ -252,7 +245,12 @@ func (service *HTTPRestService) CreateOrUpdateNetworkContainerInternal(req cns.C logger.Errorf(returnMessage) } - service.PoolMonitor.UpdatePoolLimitsTransacted(scalarUnits) + if err = service.IPAMPoolMonitor.Update(scalar, spec); err != nil { + logger.Errorf("[cns-rc] Error creating or updating IPAM Pool Monitor: %v", err) + // requeue + return UnexpectedError + } return returnCode + } diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 25dc1d77c8..b6721bf8d9 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -11,7 +11,7 @@ import ( "testing" "github.com/Azure/azure-container-networking/cns" - "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" + "github.com/Azure/azure-container-networking/cns/fakes" "github.com/google/uuid" ) @@ -20,6 +20,10 @@ const ( gatewayIp = "10.0.0.1" subnetPrfixLength = 24 dockerContainerType = cns.Docker + releasePercent = 50 + requestPercent = 100 + batchSize = 10 + initPoolSize = 10 ) var ( @@ -37,13 +41,12 @@ func TestCreateOrUpdateNetworkContainerInternal(t *testing.T) { func TestReconcileNCWithEmptyState(t *testing.T) { restartService() - var scalarUnits cns.ScalarUnits setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) expectedNcCount := len(svc.state.ContainerStatus) expectedAllocatedPods := make(map[string]cns.KubernetesPodInfo) - returnCode := svc.ReconcileNCState(nil, expectedAllocatedPods, scalarUnits) + returnCode := svc.ReconcileNCState(nil, expectedAllocatedPods, 
fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -53,8 +56,6 @@ func TestReconcileNCWithEmptyState(t *testing.T) { func TestReconcileNCWithExistingState(t *testing.T) { restartService() - var scalarUnits cns.ScalarUnits - svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -82,7 +83,7 @@ func TestReconcileNCWithExistingState(t *testing.T) { } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, scalarUnits) + returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -92,8 +93,6 @@ func TestReconcileNCWithExistingState(t *testing.T) { func TestReconcileNCWithSystemPods(t *testing.T) { restartService() - svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) - var scalarUnits cns.ScalarUnits setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -122,7 +121,7 @@ func TestReconcileNCWithSystemPods(t *testing.T) { } expectedNcCount := len(svc.state.ContainerStatus) - returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, scalarUnits) + returnCode := svc.ReconcileNCState(&req, expectedAllocatedPods, fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) if returnCode != Success { t.Errorf("Unexpected failure on reconcile with no state %d", returnCode) } @@ -188,9 +187,7 @@ func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int) { func createAndValidateNCRequest(t *testing.T, secondaryIPConfigs map[string]cns.SecondaryIPConfig, ncId string) { req := 
generateNetworkContainerRequest(secondaryIPConfigs, ncId) - var scalarUnits cns.ScalarUnits - svc.PoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(nil, nil) - returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, scalarUnits) + returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) if returnCode != 0 { t.Fatalf("Failed to createNetworkContainerRequest, req: %+v, err: %d", req, returnCode) } diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index c076bc439c..cf4e9c7201 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -90,18 +90,18 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r return } -func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]cns.SecondaryIPConfig, error) { - pendingReleaseIPs := make(map[string]cns.SecondaryIPConfig) +func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]cns.IPConfigurationStatus, error) { + pendingReleaseIPs := make(map[string]cns.IPConfigurationStatus) markedIPCount := 0 service.Lock() defer service.Unlock() - for uuid, ipconfig := range service.PodIPConfigState { - if ipconfig.State == cns.Available { - ipconfig.State = cns.PendingRelease - pendingReleaseIPs[uuid] = cns.SecondaryIPConfig{ - IPAddress: ipconfig.IPAddress, - } + for uuid, _ := range service.PodIPConfigState { + mutableIPConfig := service.PodIPConfigState[uuid] + if mutableIPConfig.State == cns.Available { + mutableIPConfig.State = cns.PendingRelease + service.PodIPConfigState[uuid] = mutableIPConfig + pendingReleaseIPs[uuid] = mutableIPConfig markedIPCount++ if markedIPCount == numberToMark { return pendingReleaseIPs, nil @@ -113,6 +113,8 @@ func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]c } func (service *HTTPRestService) GetPodIPConfigState() map[string]cns.IPConfigurationStatus { + 
service.RLock() + defer service.RUnlock() return service.PodIPConfigState } @@ -132,6 +134,14 @@ func (service *HTTPRestService) GetAvailableIPConfigs() []cns.IPConfigurationSta }) } +func (service *HTTPRestService) GetPendingReleaseIPConfigs() []cns.IPConfigurationStatus { + service.RLock() + defer service.RUnlock() + return filterIPConfigMap(service.PodIPConfigState, func(ipconfig cns.IPConfigurationStatus) bool { + return ipconfig.State == cns.PendingRelease + }) +} + func filterIPConfigMap(toBeAdded map[string]cns.IPConfigurationStatus, f func(cns.IPConfigurationStatus) bool) []cns.IPConfigurationStatus { vsf := make([]cns.IPConfigurationStatus, 0) for _, v := range toBeAdded { diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index 3418a65e48..1f72b55aa7 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -43,6 +43,7 @@ func getTestService() *HTTPRestService { var config common.ServiceConfig httpsvc, _ := NewHTTPRestService(&config, fakes.NewFakeImdsClient()) svc = httpsvc.(*HTTPRestService) + svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake() setOrchestratorTypeInternal(cns.KubernetesCRD) return svc @@ -532,3 +533,44 @@ func validateIpState(t *testing.T, actualIps []cns.IPConfigurationStatus, expect } } } + +func TestIPAMMarkIPConfigAsPending(t *testing.T) { + svc := getTestService() + // set state as already allocated + state1, _ := NewPodStateWithOrchestratorContext(testIP1, 24, testPod1GUID, testNCID, cns.Available, testPod1Info) + ipconfigs := map[string]cns.IPConfigurationStatus{ + state1.ID: state1, + } + + err := UpdatePodIpConfigState(t, svc, ipconfigs) + if err != nil { + t.Fatalf("Expected to not fail adding IP's to state: %+v", err) + } + + // Release Test Pod 1 + ips, err := svc.MarkIPsAsPending(1) + if err != nil { + t.Fatalf("Unexpected failure releasing IP: %+v", err) + } + + if _, exists := ips[testPod1GUID]; !exists { + t.Fatalf("Expected ID not marked as pending: %+v", err) + } + + // Release 
Test Pod 1 + pendingrelease := svc.GetPendingReleaseIPConfigs() + if len(pendingrelease) != 1 { + t.Fatal("Expected pending release slice to be nonzero after pending release") + } + + available := svc.GetAvailableIPConfigs() + if len(available) != 0 { + t.Fatal("Expected available ips to be zero after marked as pending") + } + + // Call release again, should be fine + err = svc.releaseIPConfig(testPod1Info) + if err != nil { + t.Fatalf("Unexpected failure releasing IP: %+v", err) + } +} diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 17e2d050aa..cce2494743 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -42,8 +42,8 @@ type HTTPRestService struct { PodIPIDByOrchestratorContext map[string]string // OrchestratorContext is key and value is Pod IP uuid. PodIPConfigState map[string]cns.IPConfigurationStatus // seondaryipid(uuid) is key AllocatedIPCount map[string]allocatedIPCount // key - ncid + IPAMPoolMonitor cns.IPAMPoolMonitor routingTable *routes.RoutingTable - PoolMonitor cns.IPAMPoolMonitor store store.KeyValueStore state *httpRestServiceState sync.RWMutex diff --git a/cns/service/main.go b/cns/service/main.go index 533a2edbd5..e6b256479a 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -5,6 +5,7 @@ package main import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -38,11 +39,12 @@ import ( const ( // Service name. - name = "azure-cns" - pluginName = "azure-vnet" - defaultCNINetworkConfigFileName = "10-azure.conflist" - configFileName = "config.json" - dncApiVersion = "?api-version=2018-03-01" + name = "azure-cns" + pluginName = "azure-vnet" + defaultCNINetworkConfigFileName = "10-azure.conflist" + configFileName = "config.json" + dncApiVersion = "?api-version=2018-03-01" + poolIPAMRefreshRateInMilliseconds = 1000 ) // Version is populated by make during build. 
@@ -462,6 +464,9 @@ func main() { return } + // initialize the ipam pool monitor + httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController) + //Start the RequestController which starts the reconcile loop requestControllerStopChannel := make(chan struct{}) defer close(requestControllerStopChannel) @@ -472,9 +477,13 @@ func main() { } }() - poolMonitor := ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestService, requestController) - - httpRestServiceImplementation.PoolMonitor = poolMonitor + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + if err := httpRestServiceImplementation.IPAMPoolMonitor.Start(ctx, poolIPAMRefreshRateInMilliseconds); err != nil { + logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", err) + } + }() } var netPlugin network.NetPlugin