From 6809ccdeebc835c4a5906e184388cc59922eb774 Mon Sep 17 00:00:00 2001 From: Evan Baker Date: Wed, 6 Apr 2022 20:42:57 +0000 Subject: [PATCH] create nnc listener concept and adapt existing poolmonitor and swift service to it Signed-off-by: Evan Baker --- cns/api.go | 2 +- cns/fakes/monitor.go | 3 +- cns/ipampool/monitor.go | 3 +- cns/ipampool/monitor_test.go | 3 +- cns/service/main.go | 2 +- cns/singletenantcontroller/conversion.go | 39 +++++++++++++ cns/singletenantcontroller/reconciler.go | 58 +++++++------------ cns/singletenantcontroller/reconciler_test.go | 12 ++-- 8 files changed, 74 insertions(+), 48 deletions(-) diff --git a/cns/api.go b/cns/api.go index c080cb6193..1e2b8349db 100644 --- a/cns/api.go +++ b/cns/api.go @@ -271,7 +271,7 @@ type NodeConfiguration struct { type IPAMPoolMonitor interface { Start(ctx context.Context) error - Update(nnc *v1alpha.NodeNetworkConfig) + Update(nnc *v1alpha.NodeNetworkConfig) error GetStateSnapshot() IpamPoolMonitorStateSnapshot } diff --git a/cns/fakes/monitor.go b/cns/fakes/monitor.go index c41f1c2f77..e3dd49b2a8 100644 --- a/cns/fakes/monitor.go +++ b/cns/fakes/monitor.go @@ -19,8 +19,9 @@ func (*MonitorFake) Start(ctx context.Context) error { return nil } -func (f *MonitorFake) Update(nnc *v1alpha.NodeNetworkConfig) { +func (f *MonitorFake) Update(nnc *v1alpha.NodeNetworkConfig) error { f.NodeNetworkConfig = nnc + return nil } func (*MonitorFake) Reconcile() error { diff --git a/cns/ipampool/monitor.go b/cns/ipampool/monitor.go index 9e280ad67d..645a2d8834 100644 --- a/cns/ipampool/monitor.go +++ b/cns/ipampool/monitor.go @@ -367,7 +367,7 @@ func GenerateARMID(nc *v1alpha.NetworkContainer) string { // the pool reconcile loop. // If the Monitor has not been Started, this will block until Start() is called, which will // immediately read this passed NNC and start the pool reconcile loop. -func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) { +func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) error { pm.clampScaler(&nnc.Status.Scaler) // if the nnc has converged, observe the pool scaling latency (if any). @@ -377,6 +377,7 @@ func (pm *Monitor) Update(nnc *v1alpha.NodeNetworkConfig) { metric.ObserverPoolScaleLatency() } pm.nncSource <- *nnc + return nil } // clampScaler makes sure that the values stored in the scaler are sane. diff --git a/cns/ipampool/monitor_test.go b/cns/ipampool/monitor_test.go index 34d81581de..fcdbbb00c0 100644 --- a/cns/ipampool/monitor_test.go +++ b/cns/ipampool/monitor_test.go @@ -26,10 +26,11 @@ type directUpdatePoolMonitor struct { cns.IPAMPoolMonitor } -func (d *directUpdatePoolMonitor) Update(nnc *v1alpha.NodeNetworkConfig) { +func (d *directUpdatePoolMonitor) Update(nnc *v1alpha.NodeNetworkConfig) error { scaler := nnc.Status.Scaler d.m.spec = nnc.Spec d.m.metastate.minFreeCount, d.m.metastate.maxFreeCount = CalculateMinFreeIPs(scaler), CalculateMaxFreeIPs(scaler) + return nil } type testState struct { diff --git a/cns/service/main.go b/cns/service/main.go index 5abaf7069f..cecbd7fbad 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -1010,7 +1010,7 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return errors.Wrapf(err, "failed to get node %s", nodeName) } - reconciler := kubecontroller.NewReconciler(nnccli, httpRestServiceImplementation, poolMonitor) + reconciler := kubecontroller.NewReconciler(nnccli, kubecontroller.SwiftNodeNetworkConfigListener(httpRestServiceImplementation), poolMonitor) // pass Node to the Reconciler for Controller xref if err := reconciler.SetupWithManager(manager, node); err != nil { return errors.Wrapf(err, "failed to setup reconciler with manager") diff --git a/cns/singletenantcontroller/conversion.go b/cns/singletenantcontroller/conversion.go index e1dbd5be5d..8cf7dd2de0 100644 --- a/cns/singletenantcontroller/conversion.go +++ b/cns/singletenantcontroller/conversion.go @@ -7,6 +7,9 @@ import ( "strings" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/logger" + "github.com/Azure/azure-container-networking/cns/restserver" + cnstypes "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" "github.com/pkg/errors" ) @@ -20,6 +23,42 @@ var ( ErrUnsupportedNCQuantity = errors.New("unsupported number of network containers") ) +type cnsClient interface { + CreateOrUpdateNetworkContainerInternal(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode +} + +var _ nodeNetworkConfigListener = (NodeNetworkConfigListenerFunc)(nil) //nolint:gocritic // clarity + +type NodeNetworkConfigListenerFunc func(*v1alpha.NodeNetworkConfig) error + +func (f NodeNetworkConfigListenerFunc) Update(nnc *v1alpha.NodeNetworkConfig) error { + return f(nnc) +} + +// SwiftNodeNetworkConfigListener return a function which satisfies the NodeNetworkConfigListener +// interface. It accepts a CreateOrUpdateNetworkContainerInternal implementation, and when Update +// is called, transforms the NNC in to an NC Request and calls the CNS Service implementation with +// that request. +func SwiftNodeNetworkConfigListener(cnscli cnsClient) NodeNetworkConfigListenerFunc { + return func(nnc *v1alpha.NodeNetworkConfig) error { + // Create NC request and hand it off to CNS + ncRequest, err := CRDStatusToNCRequest(&nnc.Status) + if err != nil { + return errors.Wrap(err, "failed to convert NNC status to network container request") + } + responseCode := cnscli.CreateOrUpdateNetworkContainerInternal(&ncRequest) + err = restserver.ResponseCodeToError(responseCode) + if err != nil { + logger.Errorf("[cns-rc] Error creating or updating NC in reconcile: %v", err) + return errors.Wrap(err, "failed to create or update network container") + } + + // record assigned IPs metric + allocatedIPs.Set(float64(len(nnc.Status.NetworkContainers[0].IPAssignments))) + return nil + } +} + // CRDStatusToNCRequest translates a crd status to createnetworkcontainer request func CRDStatusToNCRequest(status *v1alpha.NodeNetworkConfigStatus) (cns.CreateNetworkContainerRequest, error) { // if NNC has no NC, return an empty request diff --git a/cns/singletenantcontroller/reconciler.go b/cns/singletenantcontroller/reconciler.go index f591eb058a..e0b436911b 100644 --- a/cns/singletenantcontroller/reconciler.go +++ b/cns/singletenantcontroller/reconciler.go @@ -4,10 +4,7 @@ import ( "context" "sync" - "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/restserver" - cnstypes "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" @@ -21,12 +18,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -type cnsClient interface { - CreateOrUpdateNetworkContainerInternal(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode -} - -type ipamPoolMonitorClient interface { - Update(*v1alpha.NodeNetworkConfig) +type nodeNetworkConfigListener interface { + Update(*v1alpha.NodeNetworkConfig) error } type nncGetter interface { @@ -35,19 +28,21 @@ type nncGetter interface { // Reconciler watches for CRD status changes type Reconciler struct { - cnscli cnsClient - ipampoolmonitorcli ipamPoolMonitorClient - nnccli nncGetter - started chan interface{} - once sync.Once + nncListeners []nodeNetworkConfigListener + nnccli nncGetter + once sync.Once + started chan interface{} } -func NewReconciler(nnccli nncGetter, cnscli cnsClient, ipampipampoolmonitorcli ipamPoolMonitorClient) *Reconciler { +// NewReconciler creates a NodeNetworkConfig Reconciler which will get updates from the Kubernetes +// apiserver for NNC events. +// Provided nncListeners are passed the NNC after the Reconcile preprocesses it. Note: order matters! The +// passed Listeners are notified in the order provided. +func NewReconciler(nnccli nncGetter, nncListeners ...nodeNetworkConfigListener) *Reconciler { return &Reconciler{ - cnscli: cnscli, - ipampoolmonitorcli: ipampipampoolmonitorcli, - nnccli: nnccli, - started: make(chan interface{}), + nncListeners: nncListeners, + nnccli: nnccli, + started: make(chan interface{}), } } @@ -65,32 +60,19 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco logger.Printf("[cns-rc] CRD Spec: %v", nnc.Spec) - // If there are no network containers, don't hand it off to CNS + // If there are no network containers, don't continue to updating Listeners if len(nnc.Status.NetworkContainers) == 0 { logger.Errorf("[cns-rc] Empty NetworkContainers") return reconcile.Result{}, nil } - // Create NC request and hand it off to CNS - ncRequest, err := CRDStatusToNCRequest(&nnc.Status) - if err != nil { - logger.Errorf("[cns-rc] Error translating crd status to nc request %v", err) - // requeue - return reconcile.Result{}, errors.Wrap(err, "failed to convert NNC status to network container request") - } - - responseCode := r.cnscli.CreateOrUpdateNetworkContainerInternal(&ncRequest) - err = restserver.ResponseCodeToError(responseCode) - if err != nil { - logger.Errorf("[cns-rc] Error creating or updating NC in reconcile: %v", err) - // requeue - return reconcile.Result{}, errors.Wrap(err, "failed to create or update network container") + // push the NNC to the registered NNC Sinks + for i := range r.nncListeners { + if err := r.nncListeners[i].Update(nnc); err != nil { + return reconcile.Result{}, errors.Wrap(err, "nnc listener return error during update") + } } - r.ipampoolmonitorcli.Update(nnc) - // record assigned IPs metric - allocatedIPs.Set(float64(len(nnc.Status.NetworkContainers[0].IPAssignments))) - // we have received and pushed an NNC update, we are "Started" r.once.Do(func() { close(r.started) }) return reconcile.Result{}, nil diff --git a/cns/singletenantcontroller/reconciler_test.go b/cns/singletenantcontroller/reconciler_test.go index 9e396c3ae2..39c9cd2e4e 100644 --- a/cns/singletenantcontroller/reconciler_test.go +++ b/cns/singletenantcontroller/reconciler_test.go @@ -25,7 +25,7 @@ type cnsClientState struct { type mockCNSClient struct { state cnsClientState createOrUpdateNC func(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode - update func(*v1alpha.NodeNetworkConfig) + update func(*v1alpha.NodeNetworkConfig) error } //nolint:gocritic // ignore hugeParam pls @@ -34,9 +34,9 @@ func (m *mockCNSClient) CreateOrUpdateNetworkContainerInternal(req *cns.CreateNe return m.createOrUpdateNC(req) } -func (m *mockCNSClient) Update(nnc *v1alpha.NodeNetworkConfig) { +func (m *mockCNSClient) Update(nnc *v1alpha.NodeNetworkConfig) error { m.state.nnc = nnc - m.update(nnc) + return m.update(nnc) } type mockNCGetter struct { @@ -131,7 +131,9 @@ func TestReconcile(t *testing.T) { createOrUpdateNC: func(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode { return cnstypes.Success }, - update: func(*v1alpha.NodeNetworkConfig) {}, + update: func(*v1alpha.NodeNetworkConfig) error { + return nil + }, }, wantErr: false, wantCNSClientState: cnsClientState{ @@ -148,7 +150,7 @@ func TestReconcile(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - r := NewReconciler(&tt.ncGetter, &tt.cnsClient, &tt.cnsClient) + r := NewReconciler(&tt.ncGetter, SwiftNodeNetworkConfigListener(&tt.cnsClient), &tt.cnsClient) got, err := r.Reconcile(context.Background(), tt.in) if tt.wantErr { require.Error(t, err)