Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f58bfc9
More fixes
neaggarwMS Feb 28, 2021
d019388
Fixed reordering
neaggarwMS Feb 28, 2021
19c2b82
Fix IPAM pool monitor init
neaggarwMS Feb 28, 2021
f83aaca
Fix CNS reconcilation before starting the httpservice listener
neaggarwMS Mar 1, 2021
1239912
Fixed build
neaggarwMS Feb 28, 2021
6af4867
DONTPUSH
neaggarwMS Mar 2, 2021
013b9df
Fixed HTTPRestService initialization
neaggarwMS Mar 2, 2021
d425294
Merge branch 'cns_fixReconcileBug' of https://github.com/neaggarwMS/a…
neaggarwMS Feb 28, 2021
c7eaf64
Fixed fakes
neaggarwMS Mar 2, 2021
4182053
Merge changes
neaggarwMS Feb 28, 2021
baca2a9
Fix CNS reconcilation before starting the httpservice listener
neaggarwMS Mar 1, 2021
cf9d8a4
Fixed build
neaggarwMS Feb 28, 2021
b1ba029
More fixes
neaggarwMS Feb 28, 2021
e0e993a
Fixed reordering
neaggarwMS Feb 28, 2021
fb7eef8
Fix IPAM pool monitor init
neaggarwMS Feb 28, 2021
194b0c8
DONTPUSH
neaggarwMS Mar 2, 2021
b7553d8
Fixed HTTPRestService initialization
neaggarwMS Mar 2, 2021
438acda
Fixed fakes
neaggarwMS Mar 2, 2021
78590ea
Fix IPAMPoolMonitor to handle error from CRD during decrease
neaggarwMS Mar 2, 2021
57f1707
Merge branch 'cns_fixReconcileBug' of https://github.com/neaggarwMS/a…
neaggarwMS Mar 2, 2021
d11a00c
additional logs
neaggarwMS Mar 2, 2021
559f71f
Fixed exit channel for CRD REquestController
neaggarwMS Mar 2, 2021
02444e0
Fixed logging
neaggarwMS Mar 2, 2021
cca6fbf
Fixed logging
neaggarwMS Mar 2, 2021
0cc36b3
Fixed decrease ipam pool monitor
neaggarwMS Mar 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cns/NetworkContainerContract.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ type IPConfigRequest struct {
OrchestratorContext json.RawMessage
}

// String implements fmt.Stringer for IPConfigRequest. It renders the desired
// IP address followed by the raw orchestrator context bytes interpreted as
// text, wrapped in a bracketed "[IPConfigRequest: ...]" line.
func (i IPConfigRequest) String() string {
	const layout = "[IPConfigRequest: DesiredIPAddress %s, OrchestratorContext %s]"

	// The context is carried as raw bytes; convert once so it prints as text.
	orchestratorContext := string(i.OrchestratorContext)

	return fmt.Sprintf(layout, i.DesiredIPAddress, orchestratorContext)
}

// IPConfigResponse is used in CNS IPAM mode as a response to CNI ADD
type IPConfigResponse struct {
PodIpInfo PodIpInfo
Expand Down
8 changes: 7 additions & 1 deletion cns/cnsclient/cnsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ func TestMain(m *testing.M) {
}

if httpRestService != nil {
err = httpRestService.Init(&config)
if err != nil {
logger.Errorf("Failed to initialize HttpService, err:%v.\n", err)
return
}

err = httpRestService.Start(&config)
if err != nil {
logger.Errorf("Failed to start CNS, err:%v.\n", err)
logger.Errorf("Failed to start HttpService, err:%v.\n", err)
return
}
}
Expand Down
1 change: 1 addition & 0 deletions cns/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Service struct {

// ServiceAPI defines base interface.
type ServiceAPI interface {
Init(*ServiceConfig) error
Start(*ServiceConfig) error
Stop()
GetOption(string) interface{}
Expand Down
5 changes: 5 additions & 0 deletions cns/fakes/cnsfake.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,9 @@ func (fake *HTTPServiceFake) Start(*common.ServiceConfig) error {
return nil
}


// Init is a no-op on the fake service: the supplied config is ignored and the
// call always reports success.
func (fake *HTTPServiceFake) Init(*common.ServiceConfig) error {
	return nil
}

// Stop is a no-op on the fake service.
func (fake *HTTPServiceFake) Stop() {}
8 changes: 8 additions & 0 deletions cns/fakes/requestcontrollerfake.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,18 @@ func (rc *RequestControllerFake) CarveIPConfigsAndAddToStatusAndCNS(numberOfIPCo
return cnsIPConfigs
}

// InitRequestController is a no-op on the fake controller and always
// reports success.
func (rc *RequestControllerFake) InitRequestController() error {
	return nil
}

// StartRequestController is a no-op on the fake controller: exitChan is never
// read and the call returns nil immediately.
func (rc *RequestControllerFake) StartRequestController(exitChan <-chan struct{}) error {
	return nil
}

// IsStarted always reports true for the fake controller, regardless of
// whether StartRequestController was called.
func (rc *RequestControllerFake) IsStarted() bool {
	return true
}

func (rc *RequestControllerFake) UpdateCRDSpec(cntxt context.Context, desiredSpec nnc.NodeNetworkConfigSpec) error {
rc.cachedCRD.Spec = desiredSpec

Expand Down
86 changes: 58 additions & 28 deletions cns/ipampoolmonitor/ipampoolmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@ type CNSIPAMPoolMonitor struct {
pendingRelease bool

cachedNNC nnc.NodeNetworkConfig
updatingIpsNotInUseCount int
scalarUnits nnc.Scaler

cns cns.HTTPService
httpService cns.HTTPService
rc requestcontroller.RequestController
MinimumFreeIps int64
MaximumFreeIps int64

mu sync.RWMutex
}

func NewCNSIPAMPoolMonitor(cns cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor {
func NewCNSIPAMPoolMonitor(httpService cns.HTTPService, rc requestcontroller.RequestController) *CNSIPAMPoolMonitor {
logger.Printf("NewCNSIPAMPoolMonitor: Create IPAM Pool Monitor")
return &CNSIPAMPoolMonitor{
pendingRelease: false,
cns: cns,
rc: rc,
pendingRelease: false,
httpService: httpService,
rc: rc,
}
}

Expand All @@ -52,7 +54,7 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli
for {
select {
case <-ctx.Done():
return fmt.Errorf("CNS IPAM Pool Monitor received cancellation signal")
return fmt.Errorf("[ipam-pool-monitor] CNS IPAM Pool Monitor received cancellation signal")
case <-ticker.C:
err := pm.Reconcile()
if err != nil {
Expand All @@ -63,14 +65,14 @@ func (pm *CNSIPAMPoolMonitor) Start(ctx context.Context, poolMonitorRefreshMilli
}

func (pm *CNSIPAMPoolMonitor) Reconcile() error {
cnsPodIPConfigCount := len(pm.cns.GetPodIPConfigState())
pendingProgramCount := len(pm.cns.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns
allocatedPodIPCount := len(pm.cns.GetAllocatedIPConfigs())
pendingReleaseIPCount := len(pm.cns.GetPendingReleaseIPConfigs())
availableIPConfigCount := len(pm.cns.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns
cnsPodIPConfigCount := len(pm.httpService.GetPodIPConfigState())
pendingProgramCount := len(pm.httpService.GetPendingProgramIPConfigs()) // TODO: add pending program count to real cns
allocatedPodIPCount := len(pm.httpService.GetAllocatedIPConfigs())
pendingReleaseIPCount := len(pm.httpService.GetPendingReleaseIPConfigs())
availableIPConfigCount := len(pm.httpService.GetAvailableIPConfigs()) // TODO: add pending allocation count to real cns
freeIPConfigCount := pm.cachedNNC.Spec.RequestedIPCount - int64(allocatedPodIPCount)

msg := fmt.Sprintf("Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v",
msg := fmt.Sprintf("[ipam-pool-monitor] Pool Size: %v, Goal Size: %v, BatchSize: %v, MinFree: %v, MaxFree:%v, Allocated: %v, Available: %v, Pending Release: %v, Free: %v, Pending Program: %v",
cnsPodIPConfigCount, pm.cachedNNC.Spec.RequestedIPCount, pm.scalarUnits.BatchSize, pm.MinimumFreeIps, pm.MaximumFreeIps, allocatedPodIPCount, availableIPConfigCount, pendingReleaseIPCount, freeIPConfigCount, pendingProgramCount)

switch {
Expand All @@ -82,7 +84,7 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error {
// pod count is decreasing
case freeIPConfigCount > pm.MaximumFreeIps:
logger.Printf("[ipam-pool-monitor] Decreasing pool size...%s ", msg)
return pm.decreasePoolSize()
return pm.decreasePoolSize(pendingReleaseIPCount)

// CRD has reconciled CNS state, and target spec is now the same size as the state
// free to remove the IP's from the CRD
Expand All @@ -100,8 +102,8 @@ func (pm *CNSIPAMPoolMonitor) Reconcile() error {
}

func (pm *CNSIPAMPoolMonitor) increasePoolSize() error {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.mu.Lock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, why the flip?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason in particular, just coding practice to add a defer unlock API before Lock.


In reply to: 585721003 [](ancestors = 585721003)


var err error
var tempNNCSpec nnc.NodeNetworkConfigSpec
Expand All @@ -111,58 +113,79 @@ func (pm *CNSIPAMPoolMonitor) increasePoolSize() error {
}

tempNNCSpec.RequestedIPCount += pm.scalarUnits.BatchSize
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
logger.Printf("[ipam-pool-monitor] Increasing pool size, Current Pool Size: %v, Updated Requested IP Count: %v, Pods with IP's:%v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))

err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
if err != nil {
// caller will retry to update the CRD again
return err
}

logger.Printf("[ipam-pool-monitor] Increasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)
// save the updated state to cachedSpec
pm.cachedNNC.Spec = tempNNCSpec
return nil
}

func (pm *CNSIPAMPoolMonitor) decreasePoolSize() error {
pm.mu.Lock()
func (pm *CNSIPAMPoolMonitor) decreasePoolSize(existingPendingReleaseIPCount int) error {
defer pm.mu.Unlock()
pm.mu.Lock()

// mark n number of IP's as pending
pendingIpAddresses, err := pm.cns.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize))
if err != nil {
return err
}
var err error
var newIpsMarkedAsPending bool
var pendingIpAddresses map[string]cns.IPConfigurationStatus
if pm.updatingIpsNotInUseCount == 0 ||
pm.updatingIpsNotInUseCount < existingPendingReleaseIPCount {
logger.Printf("[ipam-pool-monitor] Marking IPs as PendingRelease, ipsToBeReleasedCount %d", int(pm.scalarUnits.BatchSize))
pendingIpAddresses, err = pm.httpService.MarkIPAsPendingRelease(int(pm.scalarUnits.BatchSize))
if err != nil {
return err
}

totalIpsSetForRelease := len(pendingIpAddresses)
logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d", totalIpsSetForRelease)
newIpsMarkedAsPending = true
}

var tempNNCSpec nnc.NodeNetworkConfigSpec
tempNNCSpec, err = pm.createNNCSpecForCRD(false)
if err != nil {
return err
}

tempNNCSpec.RequestedIPCount -= int64(totalIpsSetForRelease)
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.cns.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.cns.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))
if newIpsMarkedAsPending {
// cache the updatingPendingRelease so that we dont re-set new IPs to PendingRelease in case UpdateCRD call fails
pm.updatingIpsNotInUseCount = len(tempNNCSpec.IPsNotInUse)
}

logger.Printf("[ipam-pool-monitor] Releasing IPCount in this batch %d, updatingPendingIpsNotInUse count %d", len(pendingIpAddresses), pm.updatingIpsNotInUseCount)

tempNNCSpec.RequestedIPCount -= int64(len(pendingIpAddresses))
logger.Printf("[ipam-pool-monitor] Decreasing pool size, Current Pool Size: %v, Requested IP Count: %v, Pods with IP's: %v, ToBeDeleted Count: %v", len(pm.httpService.GetPodIPConfigState()), tempNNCSpec.RequestedIPCount, len(pm.httpService.GetAllocatedIPConfigs()), len(tempNNCSpec.IPsNotInUse))

err = pm.rc.UpdateCRDSpec(context.Background(), tempNNCSpec)
if err != nil {
// caller will retry to update the CRD again
return err
}

logger.Printf("[ipam-pool-monitor] Decreasing pool size: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)

// save the updated state to cachedSpec
pm.cachedNNC.Spec = tempNNCSpec
pm.pendingRelease = true

// clear the updatingPendingIpsNotInUse, as we have Updated the CRD
logger.Printf("[ipam-pool-monitor] cleaning the updatingPendingIpsNotInUse, existing length %d", pm.updatingIpsNotInUseCount)
pm.updatingIpsNotInUseCount = 0

return nil
}

// if cns pending ip release map is empty, request controller has already reconciled the CNS state,
// so we can remove it from our cache and remove the IP's from the CRD
func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error {
pm.mu.Lock()
defer pm.mu.Unlock()
pm.mu.Lock()

var err error
var tempNNCSpec nnc.NodeNetworkConfigSpec
Expand All @@ -177,6 +200,9 @@ func (pm *CNSIPAMPoolMonitor) cleanPendingRelease() error {
return err
}

logger.Printf("[ipam-pool-monitor] cleanPendingRelease: UpdateCRDSpec succeeded for spec %+v", tempNNCSpec)


// save the updated state to cachedSpec
pm.cachedNNC.Spec = tempNNCSpec
pm.pendingRelease = false
Expand All @@ -197,7 +223,7 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N
spec.IPsNotInUse = make([]string, 0)
} else {
// Get All Pending IPs from CNS and populate it again.
pendingIps := pm.cns.GetPendingReleaseIPConfigs()
pendingIps := pm.httpService.GetPendingReleaseIPConfigs()
for _, pendingIp := range pendingIps {
spec.IPsNotInUse = append(spec.IPsNotInUse, pendingIp.ID)
}
Expand All @@ -208,14 +234,18 @@ func (pm *CNSIPAMPoolMonitor) createNNCSpecForCRD(resetNotInUseList bool) (nnc.N

// Update is called by the request controller on reconcile to set the batch
// size limits and refresh the cached CRD spec.
//
// MinimumFreeIps is BatchSize scaled by RequestThresholdPercent and
// MaximumFreeIps is BatchSize scaled by ReleaseThresholdPercent; both are
// truncated by the int64 conversion. The call always returns nil.
func (pm *CNSIPAMPoolMonitor) Update(scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) error {
	// Acquire the lock exactly once: the mutex is not reentrant, so a second
	// Lock before the deferred Unlock would block forever.
	pm.mu.Lock()
	defer pm.mu.Unlock()

	pm.scalarUnits = scalar

	pm.MinimumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.RequestThresholdPercent) / 100))
	pm.MaximumFreeIps = int64(float64(pm.scalarUnits.BatchSize) * (float64(pm.scalarUnits.ReleaseThresholdPercent) / 100))

	pm.cachedNNC.Spec = spec

	logger.Printf("[ipam-pool-monitor] Update spec %+v, pm.MinimumFreeIps %d, pm.MaximumFreeIps %d",
		pm.cachedNNC.Spec, pm.MinimumFreeIps, pm.MaximumFreeIps)

	return nil
}
14 changes: 13 additions & 1 deletion cns/requestcontroller/kubecontroller/crdreconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,25 @@ func (r *CrdReconciler) Reconcile(request reconcile.Request) (reconcile.Result,
}

logger.Printf("[cns-rc] CRD Spec: %v", nodeNetConfig.Spec)
logger.Printf("[cns-rc] CRD Status: %v", nodeNetConfig.Status)


// If there are no network containers, don't hand it off to CNS
if len(nodeNetConfig.Status.NetworkContainers) == 0 {
logger.Errorf("[cns-rc] Empty NetworkContainers")
return reconcile.Result{}, nil
}

networkContainer := nodeNetConfig.Status.NetworkContainers[0]
logger.Printf("[cns-rc] CRD Status: NcId: [%s], Version: [%d], podSubnet: [%s], Subnet CIDR: [%s], " +
"Gateway Addr: [%s], Primary IP: [%s], SecondaryIpsCount: [%d]",
networkContainer.ID,
networkContainer.Version,
networkContainer.SubnetName,
networkContainer.SubnetAddressSpace,
networkContainer.DefaultGateway,
networkContainer.PrimaryIP,
len(networkContainer.IPAssignments))

// Otherwise, create NC request and hand it off to CNS
ncRequest, err = CRDStatusToNCRequest(nodeNetConfig.Status)
if err != nil {
Expand Down
45 changes: 37 additions & 8 deletions cns/requestcontroller/kubecontroller/crdrequestcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"sync"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/cnsclient"
Expand Down Expand Up @@ -42,6 +43,9 @@ type crdRequestController struct {
CNSClient cnsclient.APIClient
nodeName string //name of node running this program
Reconciler *CrdReconciler
initialized bool
Started bool
lock sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at this and its usage for a bit, I understand the intent, but I think this is a sign that this struct needs to be split into different abstractions. Ideally, this shouldn't need to carry state about initialization, we should just have ready-to-go dependencies injected, or document proper usage. I would suggest adding a todo to clean up further because the intention of these is hard to follow. Also, since there is a getter, Started need not be exported.

Copy link
Member Author

@neaggarwMS neaggarwMS Mar 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This piece of code requires adding more tests. Created a PBI to incorporate feedback and add more tests
PBI: https://msazure.visualstudio.com/One/_workitems/edit/9284845


In reply to: 585726500 [](ancestors = 585726500)

}

// GetKubeConfig precedence
Expand Down Expand Up @@ -139,20 +143,38 @@ func NewCrdRequestController(restService *restserver.HTTPRestService, kubeconfig
return &crdRequestController, nil
}

// InitRequestController initializes/reconciles the CNS state by running
// initCNS while holding the controller lock. On success the controller is
// flagged as initialized, which StartRequestController requires before it
// will run. On failure the error from initCNS is logged and returned, and the
// initialized flag is left unchanged.
func (crdRC *crdRequestController) InitRequestController() error {
	logger.Printf("InitRequestController")

	crdRC.lock.Lock()
	defer crdRC.lock.Unlock()

	initErr := crdRC.initCNS()
	if initErr == nil {
		crdRC.initialized = true
		return nil
	}

	logger.Errorf("[cns-rc] Error initializing cns state: %v", initErr)
	return initErr
}

// StartRequestController starts the Reconciler loop which watches for CRD status updates
// Blocks until SIGINT or SIGTERM is received
// Notifies exitChan when kill signal received
func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct{}) error {
var (
err error
)
logger.Printf("StartRequestController")

logger.Printf("Initializing CNS state")
if err = crdRC.initCNS(); err != nil {
logger.Errorf("[cns-rc] Error initializing cns state: %v", err)
return err
crdRC.lock.Lock()
if crdRC.initialized != true {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could simplify with if !crdRC.initialized {}

crdRC.lock.Unlock()
return fmt.Errorf("Failed to start requestController, state is not initialized [%v]", crdRC)
}

// Setting the started state
crdRC.Started = true
crdRC.lock.Unlock()

logger.Printf("Starting reconcile loop")
if err := crdRC.mgr.Start(exitChan); err != nil {
if crdRC.isNotDefined(err) {
Expand All @@ -166,6 +188,13 @@ func (crdRC *crdRequestController) StartRequestController(exitChan <-chan struct
return nil
}

// IsStarted reports whether the request controller has been marked as
// started. The flag is read while holding the controller lock.
func (crdRC *crdRequestController) IsStarted() bool {
	crdRC.lock.Lock()
	started := crdRC.Started
	crdRC.lock.Unlock()

	return started
}

// InitCNS initializes cns by passing pods and a createnetworkcontainerrequest
func (crdRC *crdRequestController) initCNS() error {
var (
Expand Down Expand Up @@ -242,7 +271,7 @@ func (crdRC *crdRequestController) initCNS() error {
}

// UpdateCRDSpec updates the CRD spec
func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error {
func (crdRC *crdRequestController) UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything changed here, just an extra space snuck in before the return type

nodeNetworkConfig, err := crdRC.getNodeNetConfig(cntxt, crdRC.nodeName, k8sNamespace)
if err != nil {
logger.Errorf("[cns-rc] Error getting CRD when updating spec %v", err)
Expand Down
2 changes: 2 additions & 0 deletions cns/requestcontroller/requestcontrollerintreface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

// RequestController interface for cns to interact with the request controller
type RequestController interface {
	// InitRequestController prepares the controller before it is started; the
	// CRD-backed implementation uses it to initialize/reconcile the CNS state.
	InitRequestController() error
	// StartRequestController starts the controller. The CRD-backed
	// implementation runs its reconcile loop with exitChan and returns an
	// error if InitRequestController has not succeeded first.
	StartRequestController(exitChan <-chan struct{}) error
	// UpdateCRDSpec updates the CRD spec with crdSpec.
	UpdateCRDSpec(cntxt context.Context, crdSpec nnc.NodeNetworkConfigSpec) error
	// IsStarted reports whether the controller has been started.
	IsStarted() bool
}
Loading