15 changes: 11 additions & 4 deletions cns/ipampool/monitor.go
@@ -44,7 +44,7 @@ type Monitor struct {
metastate metaState
nnccli nodeNetworkConfigSpecUpdater
httpService cns.HTTPService
initialized chan interface{}
started chan interface{}
nncSource chan v1alpha.NodeNetworkConfig
once sync.Once
}
@@ -60,11 +60,14 @@ func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater
opts: opts,
httpService: httpService,
nnccli: nnccli,
initialized: make(chan interface{}),
started: make(chan interface{}),
nncSource: make(chan v1alpha.NodeNetworkConfig),
}
}

// Start begins the Monitor's pool reconcile loop.
// On first run, it will block until a NodeNetworkConfig is received (through a call to Update()).
// Subsequently, it will run once per RefreshDelay and attempt to re-reconcile the pool.
func (pm *Monitor) Start(ctx context.Context) error {
logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor")

@@ -78,7 +81,7 @@ func (pm *Monitor) Start(ctx context.Context) error {
return errors.Wrap(ctx.Err(), "pool monitor context closed")
case <-ticker.C: // attempt to reconcile every tick.
select {
case <-pm.initialized: // this blocks until we have initialized
case <-pm.started: // this blocks until we have initialized
// if we have initialized and enter this case, we proceed out of the select and continue to reconcile.
default:
// if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again.
@@ -90,7 +93,7 @@ func (pm *Monitor) Start(ctx context.Context) error {
pm.metastate.batch = scaler.BatchSize
pm.metastate.max = scaler.MaxIPCount
pm.metastate.minFreeCount, pm.metastate.maxFreeCount = CalculateMinFreeIPs(scaler), CalculateMaxFreeIPs(scaler)
pm.once.Do(func() { close(pm.initialized) }) // close the init channel the first time we receive a NodeNetworkConfig.
pm.once.Do(func() { close(pm.started) }) // close the init channel the first time we receive a NodeNetworkConfig.
}
// if control has flowed through the select(s) to this point, we can now reconcile.
err := pm.reconcile(ctx)
@@ -334,6 +337,10 @@ func (pm *Monitor) GetStateSnapshot() cns.IpamPoolMonitorStateSnapshot {

// Update ingests a NodeNetworkConfig, clamping some values to ensure they are legal and then
// pushing it to the Monitor's source channel.
// If the Monitor has been Started but is blocking until it receives an NNC, this will start
// the pool reconcile loop.
// If the Monitor has not been Started, this will block until Start() is called, which will
// immediately read this passed NNC and start the pool reconcile loop.
func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) {
pm.clampScaler(&nnc.Status.Scaler)

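The new doc comments on Start and Update describe a two-step handshake: Start parks its loop until the first NodeNetworkConfig arrives on the Monitor's source channel, and Update is what delivers it. Below is a minimal sketch of that handshake, not code from this PR; it assumes a Monitor already constructed with NewMonitor, and the import paths are inferred from the file paths shown in this diff.

package sketch

import (
	"context"

	"github.com/Azure/azure-container-networking/cns/ipampool"
	"github.com/Azure/azure-container-networking/cns/logger"
	"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
)

// startAndPrime runs the Monitor and unblocks its first reconcile by
// delivering an initial NodeNetworkConfig, mirroring the Start/Update
// contract documented above. Illustrative only.
func startAndPrime(ctx context.Context, pm *ipampool.Monitor, nnc *v1alpha.NodeNetworkConfig) {
	go func() {
		// Start blocks on its ticker until the first NNC is received,
		// then reconciles once per RefreshDelay.
		if err := pm.Start(ctx); err != nil {
			logger.Errorf("pool monitor exited: %v", err)
		}
	}()

	// Update clamps the scaler and pushes the NNC onto the Monitor's
	// source channel; the first delivery closes the started channel and
	// lets the reconcile loop proceed on its next tick.
	pm.Update(nnc)
}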
3 changes: 1 addition & 2 deletions cns/restserver/internalapi.go
@@ -227,7 +227,6 @@ func (service *HTTPRestService) ReconcileNCState(
if returnCode != types.Success {
return returnCode
}
service.IPAMPoolMonitor.Update(nnc)

// now parse the secondaryIP list, if it exists in PodInfo list, then assign that ip.
for _, secIpConfig := range ncRequest.SecondaryIPConfigs {
@@ -256,7 +255,7 @@ }
}
}

err := service.MarkExistingIPsAsPending(nnc.Spec.IPsNotInUse)
err := service.MarkExistingIPsAsPendingRelease(nnc.Spec.IPsNotInUse)
if err != nil {
logger.Errorf("[Azure CNS] Error. Failed to mark IPs as pending %v", nnc.Spec.IPsNotInUse)
return types.UnexpectedError
4 changes: 2 additions & 2 deletions cns/restserver/ipam.go
@@ -331,8 +331,8 @@ func (service *HTTPRestService) releaseIPConfig(podInfo cns.PodInfo) error {
return nil
}

// called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending
func (service *HTTPRestService) MarkExistingIPsAsPending(pendingIPIDs []string) error {
// MarkExistingIPsAsPendingRelease is called when CNS is starting up and there are existing ipconfigs in the CRD that are marked as pending.
func (service *HTTPRestService) MarkExistingIPsAsPendingRelease(pendingIPIDs []string) error {
service.Lock()
defer service.Unlock()

4 changes: 2 additions & 2 deletions cns/restserver/ipam_test.go
@@ -693,7 +693,7 @@ func TestIPAMMarkExistingIPConfigAsPending(t *testing.T) {

// mark available ip as pending
pendingIPIDs := []string{testPod2GUID}
err = svc.MarkExistingIPsAsPending(pendingIPIDs)
err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs)
if err != nil {
t.Fatalf("Expected to successfully mark available ip as pending")
}
@@ -705,7 +705,7 @@

// attempt to mark assigned ipconfig as pending, expect fail
pendingIPIDs = []string{testPod1GUID}
err = svc.MarkExistingIPsAsPending(pendingIPIDs)
err = svc.MarkExistingIPsAsPendingRelease(pendingIPIDs)
if err == nil {
t.Fatalf("Expected to fail when marking assigned ip as pending")
}
88 changes: 56 additions & 32 deletions cns/service/main.go
@@ -820,8 +820,8 @@ type ncStateReconciler interface {
}

// TODO(rbtr) where should this live??
// InitCNS initializes cns by passing pods and a createnetworkcontainerrequest
func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
// reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest
func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider) error {
// Get nnc using direct client
nnc, err := cli.Get(ctx)
if err != nil {
@@ -864,8 +864,6 @@ func initCNS(ctx context.Context, cli nodeNetworkConfigGetter, ncReconciler ncSt

// InitializeCRDState builds and starts the CRD controllers.
func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cnsconfig *configuration.CNSConfig) error {
logger.Printf("[Azure CNS] Starting request controller")

// convert interface type to implementation type
httpRestServiceImplementation, ok := httpRestService.(*restserver.HTTPRestService)
if !ok {
@@ -880,40 +878,23 @@ }
}
httpRestServiceImplementation.SetNodeOrchestrator(&orchestrator)

// build default clientset.
kubeConfig, err := ctrl.GetConfig()
kubeConfig.UserAgent = fmt.Sprintf("azure-cns-%s", version)
if err != nil {
logger.Errorf("[Azure CNS] Failed to get kubeconfig for request controller: %v", err)
return err
}
nnccli, err := nodenetworkconfig.NewClient(kubeConfig)
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to create NNC client")
return errors.Wrap(err, "failed to build clientset")
}

// get nodename for scoping kube requests to node.
nodeName, err := configuration.NodeName()
if err != nil {
return errors.Wrap(err, "failed to get NodeName")
}
// TODO(rbtr): nodename and namespace should be in the cns config
scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})

// initialize the ipam pool monitor
poolOpts := ipampool.Options{
RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
}
poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts)
httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor
logger.Printf("Starting IPAM Pool Monitor")
go func() {
if e := poolMonitor.Start(ctx); e != nil {
logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
}
}()

clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to build clientset")
}

var podInfoByIPProvider cns.PodInfoByIPProvider
if cnsconfig.InitializeFromCNI {
@@ -939,19 +920,49 @@
})
}

// create scoped kube clients.
nnccli, err := nodenetworkconfig.NewClient(kubeConfig)
if err != nil {
return errors.Wrap(err, "failed to create NNC client")
}
// TODO(rbtr): nodename and namespace should be in the cns config
scopedcli := kubecontroller.NewScopedClient(nnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})

// initialize the ipam pool monitor
poolOpts := ipampool.Options{
RefreshDelay: poolIPAMRefreshRateInMilliseconds * time.Millisecond,
}
poolMonitor := ipampool.NewMonitor(httpRestServiceImplementation, scopedcli, &poolOpts)
httpRestServiceImplementation.IPAMPoolMonitor = poolMonitor

// reconcile initial CNS state from CNI or apiserver.
// apiserver nnc might not be registered or the apiserver might be down, and crashloop backoff puts us outside of the 5-10 minutes we have for
// aks addons to come up, so retry a bit more aggressively here.
// will retry 10 times, maxing out at a minute, taking about 8 minutes before it gives up.
attempt := 0
err = retry.Do(func() error {
err = initCNS(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider)
attempt++
logger.Printf("reconciling initial CNS state attempt: %d", attempt)
err = reconcileInitialCNSState(ctx, scopedcli, httpRestServiceImplementation, podInfoByIPProvider)
if err != nil {
logger.Errorf("[Azure CNS] Failed to init cns with err: %v", err)
logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
}
return errors.Wrap(err, "failed to initialize CNS state")
}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute))
if err != nil {
return err
}
logger.Printf("reconciled initial CNS state after %d attempts", attempt)

// start the pool Monitor before the Reconciler, since it needs to be ready to receive a
// NodeNetworkConfig update by the time the Reconciler tries to send it.
go func() {
logger.Printf("Starting IPAM Pool Monitor")
if e := poolMonitor.Start(ctx); e != nil {
logger.Errorf("[Azure CNS] Failed to start pool monitor with err: %v", e)
}
}()
logger.Printf("initialized and started IPAM pool monitor")

// the nodeScopedCache sets Selector options on the Manager cache which are used
// to perform *server-side* filtering of the cached objects. This is very important
@@ -982,31 +993,42 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
return errors.Wrapf(err, "failed to get node %s", nodeName)
}

reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, httpRestServiceImplementation.IPAMPoolMonitor)
reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, poolMonitor)
// pass Node to the Reconciler for Controller xref
if err := reconciler.SetupWithManager(manager, node); err != nil {
return errors.Wrapf(err, "failed to setup reconciler with manager")
}

// Start the RequestController which starts the reconcile loop
// Start the Manager which starts the reconcile loop.
// The Reconciler will send an initial NodeNetworkConfig update to the PoolMonitor, starting the
// Monitor's internal loop.
go func() {
logger.Printf("Starting NodeNetworkConfig reconciler.")
for {
if err := manager.Start(ctx); err != nil {
logger.Errorf("[Azure CNS] Failed to start request controller: %v", err)
// retry to start the request controller
// todo: add a CNS metric to count # of failures
} else {
logger.Printf("[Azure CNS] Exiting RequestController")
logger.Printf("exiting NodeNetworkConfig reconciler")
return
}

// Retry after 1sec
time.Sleep(time.Second)
}
}()
logger.Printf("initialized NodeNetworkConfig reconciler")
// wait for up to 10m for the Reconciler to run once.
timedCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) //nolint:gomnd // default 10m
defer cancel()
if started := reconciler.Started(timedCtx); !started {
return errors.Errorf("timed out waiting for reconciler start")
}
logger.Printf("started NodeNetworkConfig reconciler")

logger.Printf("Starting SyncHostNCVersion")
go func() {
logger.Printf("starting SyncHostNCVersion loop")
// Periodically poll vfp programmed NC version from NMAgent
tickerChannel := time.Tick(time.Duration(cnsconfig.SyncHostNCVersionIntervalMs) * time.Millisecond)
for {
Expand All @@ -1016,10 +1038,12 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode)
cancel()
case <-ctx.Done():
logger.Printf("exiting SyncHostNCVersion")
return
}
}
}()
logger.Printf("initialized and started SyncHostNCVersion loop")

return nil
}
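The "about 8 minutes" figure in the retry comment above follows from the stated 10 attempts with an exponential backoff capped at one minute by retry.MaxDelay. The following is a rough sketch of that arithmetic, assuming a doubling backoff; the initial delay value is an assumption, since initCNSInitalDelay is not shown in this diff.

package sketch

import "time"

// totalRetryWait approximates the cumulative sleep of a retry loop with a
// doubling backoff capped at max. There are attempts-1 delays, since no
// delay follows the final attempt.
func totalRetryWait(initial time.Duration, attempts int, max time.Duration) time.Duration {
	var total time.Duration
	d := initial
	for i := 0; i < attempts-1; i++ {
		if d > max {
			d = max
		}
		total += d
		d *= 2
	}
	return total
}

// For example, totalRetryWait(8*time.Second, 10, time.Minute) yields
// 8+16+32+60*6 = 416 seconds, roughly seven minutes, the same order of
// magnitude as the comment's "about 8 minutes".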
20 changes: 20 additions & 0 deletions cns/singletenantcontroller/reconciler.go
@@ -2,6 +2,7 @@ package kubecontroller

import (
"context"
"sync"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/logger"
@@ -37,13 +38,16 @@ type Reconciler struct {
cnscli cnsClient
ipampoolmonitorcli ipamPoolMonitorClient
nnccli nncGetter
started chan interface{}
once sync.Once
}

func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli ipamPoolMonitorClient) *Reconciler {
return &Reconciler{
cnscli: cnscli,
ipampoolmonitorcli: ipampipampoolmonitorcli,
nnccli: nnccli,
started: make(chan interface{}),
}
}

@@ -87,9 +91,25 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
// record assigned IPs metric
allocatedIPs.Set(float64(len(nnc.Status.NetworkContainers[0].IPAssignments)))

// we have received and pushed an NNC update, we are "Started"
r.once.Do(func() { close(r.started) })
return reconcile.Result{}, nil
}

// Started blocks until the Reconciler has reconciled at least once;
// on any call after that, it immediately returns true.
// It accepts a cancellable Context, and if the context is closed
// before the Reconciler has started, it will return false. Passing a closed Context after the
// Reconciler is started is indeterminate and the response is pseudorandom.
func (r *Reconciler) Started(ctx context.Context) bool {
select {
case <-r.started:
return true
case <-ctx.Done():
return false
}
}

// SetupWithManager Sets up the reconciler with a new manager, filtering using NodeNetworkConfigFilter on nodeName.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, node *v1.Node) error {
err := ctrl.NewControllerManagedBy(mgr).
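Both the Monitor and the Reconciler now rely on the same close-once gate: the first event closes a channel via sync.Once, and waiters select on that channel against a context, exactly as Started does above. Here is a self-contained sketch of the pattern in isolation, with generic names, not code from this PR.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type gate struct {
	started chan struct{}
	once    sync.Once
}

func newGate() *gate { return &gate{started: make(chan struct{})} }

// markStarted may be called any number of times; the channel is closed
// exactly once.
func (g *gate) markStarted() { g.once.Do(func() { close(g.started) }) }

// Started blocks until markStarted has run at least once or the context
// is done, mirroring Reconciler.Started above.
func (g *gate) Started(ctx context.Context) bool {
	select {
	case <-g.started:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	g := newGate()
	go func() {
		time.Sleep(100 * time.Millisecond) // simulate the first reconcile
		g.markStarted()
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("started:", g.Started(ctx)) // prints "started: true"
}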