diff --git a/npm/pkg/controlplane/controllers/v2/nameSpaceController_test.go b/npm/pkg/controlplane/controllers/v2/nameSpaceController_test.go new file mode 100644 index 0000000000..2bd2549019 --- /dev/null +++ b/npm/pkg/controlplane/controllers/v2/nameSpaceController_test.go @@ -0,0 +1,712 @@ +// Copyright 2018 Microsoft. All rights reserved. +// MIT License +package controllers + +import ( + "reflect" + "testing" + "time" + + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + "github.com/Azure/azure-container-networking/npm/util" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +var noResyncPeriodFunc = func() time.Duration { return 0 } + +type expectedNsValues struct { + expectedLenOfNsMap int + expectedLenOfWorkQueue int +} + +type nameSpaceFixture struct { + t *testing.T + + nsLister []*corev1.Namespace + // Objects from here preloaded into NewSimpleFake. + kubeobjects []runtime.Object + + dp dataplane.GenericDataplane + nsController *NamespaceController + kubeInformer kubeinformers.SharedInformerFactory +} + +func newNsFixture(t *testing.T, dp dataplane.GenericDataplane) *nameSpaceFixture { + f := &nameSpaceFixture{ + t: t, + nsLister: []*corev1.Namespace{}, + kubeobjects: []runtime.Object{}, + dp: dp, + } + return f +} + +func (f *nameSpaceFixture) newNsController(_ chan struct{}) { + kubeclient := k8sfake.NewSimpleClientset(f.kubeobjects...) + f.kubeInformer = kubeinformers.NewSharedInformerFactory(kubeclient, noResyncPeriodFunc()) + + npmNamespaceCache := &NpmNamespaceCache{NsMap: make(map[string]*Namespace)} + f.nsController = NewNamespaceController( + f.kubeInformer.Core().V1().Namespaces(), f.dp, npmNamespaceCache) + + for _, ns := range f.nsLister { + err := f.kubeInformer.Core().V1().Namespaces().Informer().GetIndexer().Add(ns) + if err != nil { + f.t.Errorf("Error adding namespace to informer: %v", err) + } + } + // Do not start informer to avoid unnecessary event triggers. 
+	// (TODO): Keep stopCh and the commented-out code below so these UTs can later be extended to verify event triggers as well
+	// f.kubeInformer.Start()
+}
+
+func newNameSpace(name, rv string, labels map[string]string) *corev1.Namespace {
+	return &corev1.Namespace{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:            name,
+			Labels:          labels,
+			ResourceVersion: rv,
+		},
+	}
+}
+
+func addNamespace(t *testing.T, f *nameSpaceFixture, nsObj *corev1.Namespace) {
+	t.Logf("Calling add namespace event")
+	f.nsController.addNamespace(nsObj)
+	if f.nsController.workqueue.Len() == 0 {
+		t.Logf("Add Namespace: worker queue length is 0")
+		return
+	}
+	f.nsController.processNextWorkItem()
+}
+
+func updateNamespace(t *testing.T, f *nameSpaceFixture, oldNsObj, newNsObj *corev1.Namespace) {
+	addNamespace(t, f, oldNsObj)
+	t.Logf("Complete add namespace event")
+
+	t.Logf("Updating kubeinformer namespace object")
+	err := f.kubeInformer.Core().V1().Namespaces().Informer().GetIndexer().Update(newNsObj)
+	if err != nil {
+		f.t.Errorf("Error updating namespace in informer: %v", err)
+	}
+
+	t.Logf("Calling update namespace event")
+	f.nsController.updateNamespace(oldNsObj, newNsObj)
+	if f.nsController.workqueue.Len() == 0 {
+		t.Logf("Update Namespace: worker queue length is 0")
+		return
+	}
+	f.nsController.processNextWorkItem()
+}
+
+func deleteNamespace(t *testing.T, f *nameSpaceFixture, nsObj *corev1.Namespace, isDeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject) {
+	addNamespace(t, f, nsObj)
+	t.Logf("Complete add namespace event")
+
+	t.Logf("Deleting kubeinformer namespace object")
+	err := f.kubeInformer.Core().V1().Namespaces().Informer().GetIndexer().Delete(nsObj)
+	if err != nil {
+		f.t.Errorf("Error deleting namespace from informer: %v", err)
+	}
+	t.Logf("Calling delete namespace event")
+	if isDeletedFinalStateUnknownObject {
+		tombstone := cache.DeletedFinalStateUnknown{
+			Key: nsObj.Name,
+			Obj: nsObj,
+		}
+		f.nsController.deleteNamespace(tombstone)
+	} else {
+		f.nsController.deleteNamespace(nsObj)
+	}
+
+	if f.nsController.workqueue.Len() == 0 {
+		t.Logf("Delete Namespace: worker queue length is 0")
+		return
+	}
+	f.nsController.processNextWorkItem()
+}
+
+func TestAddNamespace(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f := newNsFixture(t, dp)
+
+	nsObj := newNameSpace(
+		"test-namespace",
+		"0",
+		map[string]string{
+			"app": "test-namespace",
+		},
+	)
+	f.nsLister = append(f.nsLister, nsObj)
+	f.kubeobjects = append(f.kubeobjects, nsObj)
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	f.newNsController(stopCh)
+
+	// DPMock expect section
+	setsToAddNamespaceTo := []*ipsets.IPSetMetadata{
+		ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace),
+		kubeAllNamespaces,
+		ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace),
+		ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace),
+	}
+
+	dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	dp.EXPECT().ApplyDataPlane().Return(nil).Times(1)
+
+	// Call into add NS
+	addNamespace(t, f, nsObj)
+
+	// Cache and state validation section
+	testCases := []expectedNsValues{
+		{1, 0},
+	}
+	checkNsTestResult("TestAddNamespace", f, testCases)
+
+	if _, exists := f.nsController.npmNamespaceCache.NsMap[nsObj.Name]; !exists {
+		t.Errorf("TestAddNamespace failed @ npMgr.nsMap check")
+	}
+}
+
+func TestUpdateNamespace(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
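+	// ctrl.Finish() verifies at test teardown that every EXPECT() registered on the mock dataplane was satisfied.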
+ + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newNsFixture(t, dp) + + oldNsObj := newNameSpace( + "test-namespace", + "0", + map[string]string{ + "app": "test-namespace", + }, + ) + + newNsObj := newNameSpace( + "test-namespace", + "1", + map[string]string{ + "app": "new-test-namespace", + }, + ) + f.nsLister = append(f.nsLister, oldNsObj) + f.kubeobjects = append(f.kubeobjects, oldNsObj) + + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNsController(stopCh) + + // DPMock expect section + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + kubeAllNamespaces, + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace), + } + + dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[3], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + + setsToAddNamespaceToNew := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("app:new-test-namespace", ipsets.KeyValueLabelOfNamespace), + } + dp.EXPECT().AddToLists(setsToAddNamespaceToNew, setsToAddNamespaceTo[:1]).Return(nil).Times(1) + + // Call into update NS + updateNamespace(t, f, oldNsObj, newNsObj) + + // Cache and state validation section + testCases := []expectedNsValues{ + {1, 0}, + } + checkNsTestResult("TestUpdateNamespace", f, testCases) + + if _, exists := f.nsController.npmNamespaceCache.NsMap[newNsObj.Name]; !exists { + t.Errorf("TestUpdateNamespace failed @ npMgr.nsMap check") + } + + if !reflect.DeepEqual( + newNsObj.Labels, + f.nsController.npmNamespaceCache.NsMap[oldNsObj.Name].LabelsMap, + ) { + t.Fatalf("TestUpdateNamespace failed @ npMgr.nsMap labelMap check") + } +} + +func TestAddNamespaceLabel(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newNsFixture(t, dp) + + oldNsObj := newNameSpace( + "test-namespace", + "0", + map[string]string{ + "app": "test-namespace", + }, + ) + newNsObj := newNameSpace( + "test-namespace", + "1", + map[string]string{ + "app": "new-test-namespace", + "update": "true", + }, + ) + f.nsLister = append(f.nsLister, oldNsObj) + f.kubeobjects = append(f.kubeobjects, oldNsObj) + + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNsController(stopCh) + + // DPMock expect section + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + kubeAllNamespaces, + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace), + } + + dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[3], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + + setsToAddNamespaceToNew := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("app:new-test-namespace", ipsets.KeyValueLabelOfNamespace), + ipsets.NewIPSetMetadata("update", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("update:true", ipsets.KeyValueLabelOfNamespace), + } + for i := 0; i < len(setsToAddNamespaceToNew); i++ { + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{setsToAddNamespaceToNew[i]}, setsToAddNamespaceTo[:1]).Return(nil).Times(1) + } + + // Call into update NS + updateNamespace(t, f, 
oldNsObj, newNsObj) + + testCases := []expectedNsValues{ + {1, 0}, + } + checkNsTestResult("TestAddNamespaceLabel", f, testCases) + + if _, exists := f.nsController.npmNamespaceCache.NsMap[newNsObj.Name]; !exists { + t.Errorf("TestAddNamespaceLabel failed @ nsMap check") + } + + if !reflect.DeepEqual( + newNsObj.Labels, + f.nsController.npmNamespaceCache.NsMap[oldNsObj.Name].LabelsMap, + ) { + t.Fatalf("TestAddNamespaceLabel failed @ nsMap labelMap check") + } +} + +func TestAddNamespaceLabelSameRv(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newNsFixture(t, dp) + + oldNsObj := newNameSpace( + "test-namespace", + "0", + map[string]string{ + "app": "test-namespace", + }, + ) + + newNsObj := newNameSpace( + "test-namespace", + "0", + map[string]string{ + "app": "new-test-namespace", + "update": "true", + }, + ) + f.nsLister = append(f.nsLister, oldNsObj) + f.kubeobjects = append(f.kubeobjects, oldNsObj) + + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNsController(stopCh) + + // DPMock expect section + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + kubeAllNamespaces, + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace), + } + + dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(1) + + updateNamespace(t, f, oldNsObj, newNsObj) + + testCases := []expectedNsValues{ + {1, 0}, + } + checkNsTestResult("TestAddNamespaceLabelSameRv", f, testCases) + + if _, exists := f.nsController.npmNamespaceCache.NsMap[newNsObj.Name]; !exists { + t.Errorf("TestAddNamespaceLabelSameRv failed @ nsMap check") + } + + if !reflect.DeepEqual( + oldNsObj.Labels, + f.nsController.npmNamespaceCache.NsMap[oldNsObj.Name].LabelsMap, + ) { + t.Fatalf("TestAddNamespaceLabelSameRv failed @ nsMap labelMap check") + } +} + +func TestDeleteandUpdateNamespaceLabel(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newNsFixture(t, dp) + + oldNsObj := newNameSpace( + "test-namespace", + "0", + map[string]string{ + "app": "old-test-namespace", + "update": "true", + "group": "test", + }, + ) + + newNsObj := newNameSpace( + "test-namespace", + "1", + map[string]string{ + "app": "old-test-namespace", + "update": "false", + }, + ) + f.nsLister = append(f.nsLister, oldNsObj) + f.kubeobjects = append(f.kubeobjects, oldNsObj) + + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNsController(stopCh) + + // DPMock expect section + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + kubeAllNamespaces, + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("app:old-test-namespace", ipsets.KeyValueLabelOfNamespace), + ipsets.NewIPSetMetadata("update", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("update:true", ipsets.KeyValueLabelOfNamespace), + ipsets.NewIPSetMetadata("group", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("group:test", ipsets.KeyValueLabelOfNamespace), + } + + // Sometimes this UT fails because the order in which slice is created is not deterministic. + // and reflect.deepequal returns false if the order of slice is not equal. 
+	// The following code has multiple checks that validate the desired behavior, so using gomock.Any() makes no difference.
+	dp.EXPECT().AddToLists(gomock.Any(), setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	dp.EXPECT().ApplyDataPlane().Return(nil).Times(2)
+	setsToAddNamespaceToNew := []*ipsets.IPSetMetadata{
+		ipsets.NewIPSetMetadata("update:false", ipsets.KeyValueLabelOfNamespace),
+	}
+
+	// Remove calls
+	for i := 5; i < len(setsToAddNamespaceTo); i++ {
+		dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	}
+	// Add calls
+	dp.EXPECT().AddToLists(setsToAddNamespaceToNew, setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+
+	updateNamespace(t, f, oldNsObj, newNsObj)
+
+	testCases := []expectedNsValues{
+		{1, 0},
+	}
+	checkNsTestResult("TestDeleteandUpdateNamespaceLabel", f, testCases)
+
+	if _, exists := f.nsController.npmNamespaceCache.NsMap[newNsObj.Name]; !exists {
+		t.Errorf("TestDeleteandUpdateNamespaceLabel failed @ nsMap check")
+	}
+
+	if !reflect.DeepEqual(
+		newNsObj.Labels,
+		f.nsController.npmNamespaceCache.NsMap[oldNsObj.Name].LabelsMap,
+	) {
+		t.Fatalf("TestDeleteandUpdateNamespaceLabel failed @ nsMap labelMap check")
+	}
+}
+
+// TestNewNameSpaceUpdate tests the case where the key is the same but the object is different.
+// This happens when a namespace's delete event is missed (and it is deleted from the NPM local cache),
+// but the namespace gets added again, resulting in an update event whose old and new objects have different UIDs.
+func TestNewNameSpaceUpdate(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f := newNsFixture(t, dp)
+
+	oldNsObj := newNameSpace(
+		"test-namespace",
+		"10",
+		map[string]string{
+			"app":    "old-test-namespace",
+			"update": "true",
+			"group":  "test",
+		},
+	)
+	oldNsObj.SetUID("test1")
+
+	newNsObj := newNameSpace(
+		"test-namespace",
+		"11",
+		map[string]string{
+			"app":    "old-test-namespace",
+			"update": "false",
+		},
+	)
+	f.nsLister = append(f.nsLister, oldNsObj)
+	f.kubeobjects = append(f.kubeobjects, oldNsObj)
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	f.newNsController(stopCh)
+	newNsObj.SetUID("test2")
+
+	// DPMock expect section
+	setsToAddNamespaceTo := []*ipsets.IPSetMetadata{
+		ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace),
+		kubeAllNamespaces,
+		ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace),
+		ipsets.NewIPSetMetadata("app:old-test-namespace", ipsets.KeyValueLabelOfNamespace),
+		ipsets.NewIPSetMetadata("update", ipsets.KeyLabelOfNamespace),
+		ipsets.NewIPSetMetadata("update:true", ipsets.KeyValueLabelOfNamespace),
+		ipsets.NewIPSetMetadata("group", ipsets.KeyLabelOfNamespace),
+		ipsets.NewIPSetMetadata("group:test", ipsets.KeyValueLabelOfNamespace),
+	}
+
+	// Sometimes this UT fails because the order in which the slice is created is not deterministic,
+	// and reflect.DeepEqual returns false if the slice order differs.
+	// The following code has multiple checks that validate the desired behavior, so using gomock.Any() makes no difference.
+	dp.EXPECT().AddToLists(gomock.Any(), setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	dp.EXPECT().ApplyDataPlane().Return(nil).Times(2)
+
+	setsToAddNamespaceToNew := []*ipsets.IPSetMetadata{
+		ipsets.NewIPSetMetadata("update:false", ipsets.KeyValueLabelOfNamespace),
+	}
+
+	// Remove calls
+	for i := 5; i < len(setsToAddNamespaceTo); i++ {
+		dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	}
+	// Add calls
+	dp.EXPECT().AddToLists(setsToAddNamespaceToNew, setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+
+	updateNamespace(t, f, oldNsObj, newNsObj)
+
+	testCases := []expectedNsValues{
+		{1, 0},
+	}
+	checkNsTestResult("TestNewNameSpaceUpdate", f, testCases)
+
+	if _, exists := f.nsController.npmNamespaceCache.NsMap[newNsObj.Name]; !exists {
+		t.Errorf("TestNewNameSpaceUpdate failed @ nsMap check")
+	}
+
+	if !reflect.DeepEqual(
+		newNsObj.Labels,
+		f.nsController.npmNamespaceCache.NsMap[oldNsObj.Name].LabelsMap,
+	) {
+		t.Fatalf("TestNewNameSpaceUpdate failed @ nsMap labelMap check")
+	}
+}
+
+func TestDeleteNamespace(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f := newNsFixture(t, dp)
+
+	nsObj := newNameSpace(
+		"test-namespace",
+		"0",
+		map[string]string{
+			"app": "test-namespace",
+		},
+	)
+	f.nsLister = append(f.nsLister, nsObj)
+	f.kubeobjects = append(f.kubeobjects, nsObj)
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	f.newNsController(stopCh)
+
+	// DPMock expect section
+	setsToAddNamespaceTo := []*ipsets.IPSetMetadata{
+		ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace),
+		kubeAllNamespaces,
+		ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace),
+		ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace),
+	}
+
+	dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	dp.EXPECT().ApplyDataPlane().Return(nil).Times(2)
+
+	// Remove calls
+	for i := 1; i < len(setsToAddNamespaceTo); i++ {
+		dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
+	}
+	dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0]).Return().Times(1)
+
+	deleteNamespace(t, f, nsObj, DeletedFinalStateknownObject)
+
+	testCases := []expectedNsValues{
+		{0, 0},
+	}
+	checkNsTestResult("TestDeleteNamespace", f, testCases)
+
+	if _, exists := f.nsController.npmNamespaceCache.NsMap[nsObj.Name]; exists {
+		t.Errorf("TestDeleteNamespace failed @ nsMap check")
+	}
+}
+
+func TestDeleteNamespaceWithTombstone(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f := newNsFixture(t, dp)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	f.newNsController(stopCh)
+
+	nsObj := newNameSpace(
+		"test-namespace",
+		"0",
+		map[string]string{
+			"app": "test-namespace",
+		},
+	)
+	tombstone := cache.DeletedFinalStateUnknown{
+		Key: nsObj.Name,
+		Obj: nsObj,
+	}
+
+	f.nsController.deleteNamespace(tombstone)
+
+	testCases := []expectedNsValues{
+		{0, 1},
+	}
+	checkNsTestResult("TestDeleteNamespaceWithTombstone", f, testCases)
+}
+
+func TestDeleteNamespaceWithTombstoneAfterAddingNameSpace(t *testing.T) {
+	nsObj := newNameSpace(
+		"test-namespace",
+		"0",
+		map[string]string{
+			"app": "test-namespace",
+		},
+	)
+
+	ctrl :=
gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newNsFixture(t, dp) + f.nsLister = append(f.nsLister, nsObj) + f.kubeobjects = append(f.kubeobjects, nsObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNsController(stopCh) + + // DPMock expect section + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + kubeAllNamespaces, + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata("app:test-namespace", ipsets.KeyValueLabelOfNamespace), + } + + dp.EXPECT().AddToLists(setsToAddNamespaceTo[1:], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + + // Remove calls + for i := 1; i < len(setsToAddNamespaceTo); i++ { + dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1) + } + dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0]).Return().Times(1) + + deleteNamespace(t, f, nsObj, DeletedFinalStateUnknownObject) + testCases := []expectedNsValues{ + {0, 0}, + } + checkNsTestResult("TestDeleteNamespaceWithTombstoneAfterAddingNameSpace", f, testCases) +} + +func TestIsSystemNs(t *testing.T) { + nsObj := newNameSpace("kube-system", "0", map[string]string{"test": "new"}) + + if !isSystemNs(nsObj) { + t.Errorf("TestIsSystemNs failed @ nsObj isSystemNs check") + } +} + +func checkNsTestResult(testName string, f *nameSpaceFixture, testCases []expectedNsValues) { + for _, test := range testCases { + if got := len(f.nsController.npmNamespaceCache.NsMap); got != test.expectedLenOfNsMap { + f.t.Errorf("Test: %s, NsMap length = %d, want %d. Map: %+v", + testName, got, test.expectedLenOfNsMap, f.nsController.npmNamespaceCache.NsMap) + } + if got := f.nsController.workqueue.Len(); got != test.expectedLenOfWorkQueue { + f.t.Errorf("Test: %s, Workqueue length = %d, want %d", testName, got, test.expectedLenOfWorkQueue) + } + } +} + +func TestNSMapMarshalJSON(t *testing.T) { + npmNSCache := &NpmNamespaceCache{NsMap: make(map[string]*Namespace)} + nsName := "ns-test" + ns := &Namespace{ + name: nsName, + LabelsMap: map[string]string{ + "test-key": "test-value", + }, + } + + npmNSCache.NsMap[nsName] = ns + nsMapRaw, err := npmNSCache.MarshalJSON() + require.NoError(t, err) + + expect := []byte(`{"ns-test":{"LabelsMap":{"test-key":"test-value"}}}`) + assert.ElementsMatch(t, expect, nsMapRaw) +} + +func isSystemNs(nsObj *corev1.Namespace) bool { + return nsObj.ObjectMeta.Name == util.KubeSystemFlag +} diff --git a/npm/pkg/controlplane/controllers/v2/namespacecontroller.go b/npm/pkg/controlplane/controllers/v2/namespacecontroller.go index 049ff980f2..76917e8d1f 100644 --- a/npm/pkg/controlplane/controllers/v2/namespacecontroller.go +++ b/npm/pkg/controlplane/controllers/v2/namespacecontroller.go @@ -120,7 +120,7 @@ func (nsc *NamespaceController) needSync(obj interface{}, event string) (string, var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { utilruntime.HandleError(err) - metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE %s EVENT] Error: NamespaceKey is empty for %s namespace", event, util.GetNSNameWithPrefix(nsObj.Name)) + metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE %s EVENT] Error: NamespaceKey is empty for %s namespace", event, nsObj.Name) return key, needSync } @@ -181,7 +181,7 @@ func (nsc *NamespaceController) deleteNamespace(obj interface{}) { var key string if key, err = cache.MetaNamespaceKeyFunc(nsObj); err != nil { 
utilruntime.HandleError(err) - metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE DELETE EVENT] Error: nameSpaceKey is empty for %s namespace", util.GetNSNameWithPrefix(nsObj.Name)) + metrics.SendErrorLogAndMetric(util.NSID, "[NAMESPACE DELETE EVENT] Error: nameSpaceKey is empty for %s namespace", nsObj.Name) return } @@ -249,10 +249,9 @@ func (nsc *NamespaceController) processNextWorkItem() bool { } // syncNamespace compares the actual state with the desired, and attempts to converge the two. -func (nsc *NamespaceController) syncNamespace(key string) error { +func (nsc *NamespaceController) syncNamespace(nsKey string) error { // Get the Namespace resource with this key - nsObj, err := nsc.nameSpaceLister.Get(key) - cachedNsKey := util.GetNSNameWithPrefix(key) + nsObj, err := nsc.nameSpaceLister.Get(nsKey) // apply dataplane after syncing defer func() { @@ -267,10 +266,10 @@ func (nsc *NamespaceController) syncNamespace(key string) error { defer nsc.npmNamespaceCache.Unlock() if err != nil { if k8serrors.IsNotFound(err) { - klog.Infof("Namespace %s not found, may be it is deleted", key) + klog.Infof("Namespace %s not found, may be it is deleted", nsKey) // cleanDeletedNamespace will check if the NS exists in cache, if it does, then proceeds with deletion // if it does not exists, then event will be no-op - err = nsc.cleanDeletedNamespace(cachedNsKey) + err = nsc.cleanDeletedNamespace(nsKey) if err != nil { // need to retry this cleaning-up process metrics.SendErrorLogAndMetric(util.NSID, "Error: %v when namespace is not found", err) @@ -281,13 +280,13 @@ func (nsc *NamespaceController) syncNamespace(key string) error { } if nsObj.DeletionTimestamp != nil || nsObj.DeletionGracePeriodSeconds != nil { - return nsc.cleanDeletedNamespace(cachedNsKey) + return nsc.cleanDeletedNamespace(nsKey) } - cachedNsObj, nsExists := nsc.npmNamespaceCache.NsMap[cachedNsKey] + cachedNsObj, nsExists := nsc.npmNamespaceCache.NsMap[nsKey] if nsExists { if reflect.DeepEqual(cachedNsObj.LabelsMap, nsObj.ObjectMeta.Labels) { - klog.Infof("[NAMESPACE UPDATE EVENT] Namespace [%s] labels did not change", key) + klog.Infof("[NAMESPACE UPDATE EVENT] Namespace [%s] labels did not change", nsKey) return nil } } @@ -304,20 +303,18 @@ func (nsc *NamespaceController) syncNamespace(key string) error { // syncAddNamespace handles adding namespace to ipset. func (nsc *NamespaceController) syncAddNamespace(nsObj *corev1.Namespace) error { namespaceSets := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsObj.ObjectMeta.Name, ipsets.Namespace)} - setsToAddNamespaceTo := []*ipsets.IPSetMetadata{} - - namespaceSets = append(namespaceSets, &ipsets.IPSetMetadata{Name: nsObj.ObjectMeta.Name, Type: ipsets.Namespace}) - setsToAddNamespaceTo = append(setsToAddNamespaceTo, kubeAllNamespaces) + setsToAddNamespaceTo := []*ipsets.IPSetMetadata{kubeAllNamespaces} npmNs := newNs(nsObj.ObjectMeta.Name) nsc.npmNamespaceCache.NsMap[nsObj.ObjectMeta.Name] = npmNs // Add the namespace to its label's ipset list. 
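+	// Each label yields two list-type ipsets below: one keyed on the label key alone and one on the key:value pair.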
for nsLabelKey, nsLabelVal := range nsObj.ObjectMeta.Labels { - klog.Infof("Adding namespace %s to ipset list %s", nsObj.ObjectMeta.Name, nsLabelKey) + nsLabelKeyValue := util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal) + klog.Infof("Adding namespace %s to ipset list %s and %s", nsObj.ObjectMeta.Name, nsLabelKey, nsLabelKeyValue) labelIPSets := []*ipsets.IPSetMetadata{ - {Name: nsLabelKey, Type: ipsets.Namespace}, - {Name: util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal), Type: ipsets.Namespace}, + ipsets.NewIPSetMetadata(nsLabelKey, ipsets.KeyLabelOfNamespace), + ipsets.NewIPSetMetadata(nsLabelKeyValue, ipsets.KeyValueLabelOfNamespace), } setsToAddNamespaceTo = append(setsToAddNamespaceTo, labelIPSets...) @@ -326,8 +323,6 @@ func (nsc *NamespaceController) syncAddNamespace(nsObj *corev1.Namespace) error npmNs.appendLabels(map[string]string{nsLabelKey: nsLabelVal}, appendToExistingLabels) } - nsc.dp.CreateIPSets(append(namespaceSets, setsToAddNamespaceTo...)) - if err := nsc.dp.AddToLists(setsToAddNamespaceTo, namespaceSets); err != nil { return fmt.Errorf("failed to sync add namespace with error %w", err) } @@ -338,7 +333,7 @@ func (nsc *NamespaceController) syncAddNamespace(nsObj *corev1.Namespace) error // syncUpdateNamespace handles updating namespace in ipset. func (nsc *NamespaceController) syncUpdateNamespace(newNsObj *corev1.Namespace) error { var err error - newNsName, newNsLabel := util.GetNSNameWithPrefix(newNsObj.ObjectMeta.Name), newNsObj.ObjectMeta.Labels + newNsName, newNsLabel := newNsObj.ObjectMeta.Name, newNsObj.ObjectMeta.Labels klog.Infof("NAMESPACE UPDATING:\n namespace: [%s/%v]", newNsName, newNsLabel) // If previous syncAddNamespace failed for some reasons @@ -359,14 +354,17 @@ func (nsc *NamespaceController) syncUpdateNamespace(newNsObj *corev1.Namespace) addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(curNsObj.LabelsMap, newNsLabel) // Delete the namespace from its label's ipset list. for _, nsLabelVal := range deleteFromIPSets { - labelKey := util.GetNSNameWithPrefix(nsLabelVal) - - labelKeySet := &ipsets.IPSetMetadata{Name: nsLabelVal, Type: ipsets.KeyLabelOfNamespace} - toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newNsName, ipsets.Namespace)} + var labelSet *ipsets.IPSetMetadata + if util.IsKeyValueLabelSetName(nsLabelVal) { + labelSet = ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyValueLabelOfNamespace) + } else { + labelSet = ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyLabelOfNamespace) + } + toBeRemoved := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newNsName, ipsets.Namespace)} - klog.Infof("Deleting namespace %s from ipset list %s", newNsName, labelKey) - if err = nsc.dp.RemoveFromList(labelKeySet, toBeAdded); err != nil { - metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", newNsName, labelKey, err) + klog.Infof("Deleting namespace %s from ipset list %s", newNsName, nsLabelVal) + if err = nsc.dp.RemoveFromList(labelSet, toBeRemoved); err != nil { + metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", newNsName, nsLabelVal, err) return fmt.Errorf("failed to remove from list during sync update namespace with err %w", err) } // {IMPORTANT} The order of compared list will be key and then key+val. 
NPM should only append after both key @@ -382,12 +380,17 @@ func (nsc *NamespaceController) syncUpdateNamespace(newNsObj *corev1.Namespace) for _, nsLabelVal := range addToIPSets { klog.Infof("Adding namespace %s to ipset list %s", newNsName, nsLabelVal) - labelKeySet := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyLabelOfNamespace)} + var labelSet []*ipsets.IPSetMetadata + if util.IsKeyValueLabelSetName(nsLabelVal) { + labelSet = []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyValueLabelOfNamespace)} + } else { + labelSet = []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(nsLabelVal, ipsets.KeyLabelOfNamespace)} + } toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newNsName, ipsets.Namespace)} - if err = nsc.dp.AddToLists(labelKeySet, toBeAdded); err != nil { + if err = nsc.dp.AddToLists(labelSet, toBeAdded); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[UpdateNamespace] Error: failed to add namespace %s to ipset list %s with err: %v", newNsName, nsLabelVal, err) - return fmt.Errorf("failed to add %v sets to %v lists during addtolists in sync update namespace with err %w", toBeAdded, labelKeySet, err) + return fmt.Errorf("failed to add %v sets to %v lists during addtolists in sync update namespace with err %w", toBeAdded, labelSet, err) } // {IMPORTANT} Same as above order is assumed to be key and then key+val. NPM should only append to existing labels // only after both ipsets for a given label's key value pair are added successfully @@ -417,25 +420,21 @@ func (nsc *NamespaceController) cleanDeletedNamespace(cachedNsKey string) error klog.Infof("NAMESPACE DELETING cached labels: [%s/%v]", cachedNsKey, cachedNsObj.LabelsMap) var err error + toBeDeletedNs := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace)} // Delete the namespace from its label's ipset list. 
for nsLabelKey, nsLabelVal := range cachedNsObj.LabelsMap { - labelKey := &ipsets.IPSetMetadata{Name: nsLabelKey, Type: ipsets.KeyLabelOfNamespace} - toBeDeletedKey := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.KeyLabelOfNamespace)} - - labelIpsetName := util.GetNSNameWithPrefix(nsLabelKey) - klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelIpsetName) - if err = nsc.dp.RemoveFromList(labelKey, toBeDeletedKey); err != nil { - metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelIpsetName, err) + labelKey := ipsets.NewIPSetMetadata(nsLabelKey, ipsets.KeyLabelOfNamespace) + klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelKey) + if err = nsc.dp.RemoveFromList(labelKey, toBeDeletedNs); err != nil { + metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelKey, err) return fmt.Errorf("failed to clean deleted namespace when deleting key with err %w", err) } - labelKeyValue := &ipsets.IPSetMetadata{Name: nsLabelKey, Type: ipsets.KeyValueLabelOfNamespace} - toBeDeletedKeyValue := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.KeyValueLabelOfNamespace)} - - labelIpsetName = util.GetNSNameWithPrefix(util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal)) + labelIpsetName := util.GetIpSetFromLabelKV(nsLabelKey, nsLabelVal) + labelKeyValue := ipsets.NewIPSetMetadata(labelIpsetName, ipsets.KeyValueLabelOfNamespace) klog.Infof("Deleting namespace %s from ipset list %s", cachedNsKey, labelIpsetName) - if err = nsc.dp.RemoveFromList(labelKeyValue, toBeDeletedKeyValue); err != nil { + if err = nsc.dp.RemoveFromList(labelKeyValue, toBeDeletedNs); err != nil { metrics.SendErrorLogAndMetric(util.NSID, "[DeleteNamespace] Error: failed to delete namespace %s from ipset list %s with err: %v", cachedNsKey, labelIpsetName, err) return fmt.Errorf("failed to clean deleted namespace when deleting key value with err %w", err) } @@ -444,7 +443,7 @@ func (nsc *NamespaceController) cleanDeletedNamespace(cachedNsKey string) error cachedNsObj.removeLabelsWithKey(nsLabelKey) } - allNamespacesSet := &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.Namespace} + allNamespacesSet := ipsets.NewIPSetMetadata(util.KubeAllNamespacesFlag, ipsets.KeyLabelOfNamespace) toBeDeletedCachedKey := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace)} // Delete the namespace from all-namespace ipset list. diff --git a/npm/pkg/controlplane/controllers/v2/networkPolicyController.go b/npm/pkg/controlplane/controllers/v2/networkPolicyController.go new file mode 100644 index 0000000000..0227d1468e --- /dev/null +++ b/npm/pkg/controlplane/controllers/v2/networkPolicyController.go @@ -0,0 +1,297 @@ +// Copyright 2018 Microsoft. All rights reserved. 
+// MIT License
+package controllers
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/Azure/azure-container-networking/npm/metrics"
+	"github.com/Azure/azure-container-networking/npm/pkg/controlplane/translation"
+	"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
+	"github.com/Azure/azure-container-networking/npm/util"
+	networkingv1 "k8s.io/api/networking/v1"
+	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	networkinginformers "k8s.io/client-go/informers/networking/v1"
+	netpollister "k8s.io/client-go/listers/networking/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog"
+)
+
+var errNetPolKeyFormat = errors.New("invalid network policy key format")
+
+type NetworkPolicyController struct {
+	netPolLister netpollister.NetworkPolicyLister
+	workqueue    workqueue.RateLimitingInterface
+	rawNpSpecMap map[string]*networkingv1.NetworkPolicySpec // Key is <nsname>/<policyname>
+	dp           dataplane.GenericDataplane
+}
+
+func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, dp dataplane.GenericDataplane) *NetworkPolicyController {
+	netPolController := &NetworkPolicyController{
+		netPolLister: npInformer.Lister(),
+		workqueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NetworkPolicy"),
+		rawNpSpecMap: make(map[string]*networkingv1.NetworkPolicySpec),
+		dp:           dp,
+	}
+
+	npInformer.Informer().AddEventHandler(
+		cache.ResourceEventHandlerFuncs{
+			AddFunc:    netPolController.addNetworkPolicy,
+			UpdateFunc: netPolController.updateNetworkPolicy,
+			DeleteFunc: netPolController.deleteNetworkPolicy,
+		},
+	)
+	return netPolController
+}
+
+func (c *NetworkPolicyController) LengthOfRawNpMap() int {
+	return len(c.rawNpSpecMap)
+}
+
+// getNetworkPolicyKey returns the namespace/name key of a network policy object if the object
+// and its namespace/name are valid; otherwise it returns an error.
+func (c *NetworkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) {
+	var key string
+	_, ok := obj.(*networkingv1.NetworkPolicy)
+	if !ok {
+		return key, fmt.Errorf("cannot cast obj (%v) to network policy obj err: %w", obj, errNetPolKeyFormat)
+	}
+
+	var err error
+	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
+		return key, fmt.Errorf("error due to %w", err)
+	}
+
+	return key, nil
+}
+
+func (c *NetworkPolicyController) addNetworkPolicy(obj interface{}) {
+	netPolkey, err := c.getNetworkPolicyKey(obj)
+	if err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+
+	c.workqueue.Add(netPolkey)
+}
+
+func (c *NetworkPolicyController) updateNetworkPolicy(old, newnetpol interface{}) {
+	netPolkey, err := c.getNetworkPolicyKey(newnetpol)
+	if err != nil {
+		utilruntime.HandleError(err)
+		return
+	}
+
+	// The new network policy object was already validated by the getNetworkPolicyKey call above.
+	newNetPol, _ := newnetpol.(*networkingv1.NetworkPolicy)
+	oldNetPol, ok := old.(*networkingv1.NetworkPolicy)
+	if ok {
+		if oldNetPol.ResourceVersion == newNetPol.ResourceVersion {
+			// Periodic resync will send update events for all known network policies.
+			// Two different versions of the same network policy will always have different RVs.
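+			// An identical RV therefore means this is a periodic resync rather than a real change, so it is safe to skip.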
+ return + } + } + + c.workqueue.Add(netPolkey) +} + +func (c *NetworkPolicyController) deleteNetworkPolicy(obj interface{}) { + netPolObj, ok := obj.(*networkingv1.NetworkPolicy) + // DeleteFunc gets the final state of the resource (if it is known). + // Otherwise, it gets an object of type DeletedFinalStateUnknown. + // This can happen if the watch is closed and misses the delete event and + // the controller doesn't notice the deletion until the subsequent re-list + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type: %v", obj) + return + } + + if netPolObj, ok = tombstone.Obj.(*networkingv1.NetworkPolicy); !ok { + metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type (error decoding object tombstone, invalid type): %v", obj) + return + } + } + + var netPolkey string + var err error + if netPolkey, err = cache.MetaNamespaceKeyFunc(netPolObj); err != nil { + utilruntime.HandleError(err) + return + } + + c.workqueue.Add(netPolkey) +} + +func (c *NetworkPolicyController) Run(stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + + klog.Infof("Starting Network Policy worker") + go wait.Until(c.runWorker, time.Second, stopCh) + + klog.Infof("Started Network Policy worker") + <-stopCh + klog.Info("Shutting down Network Policy workers") +} + +func (c *NetworkPolicyController) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *NetworkPolicyController) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.workqueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v, err %w", obj, errWorkqueueFormatting)) + return nil + } + // Run the syncNetPol, passing it the namespace/name string of the + // network policy resource to be synced. + if err := c.syncNetPol(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %w, requeuing", key, err) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + if err != nil { + utilruntime.HandleError(err) + metrics.SendErrorLogAndMetric(util.NetpolID, "syncNetPol error due to %v", err) + return true + } + + return true +} + +// syncNetPol compares the actual state with the desired, and attempts to converge the two. 
+func (c *NetworkPolicyController) syncNetPol(key string) error {
+	// Convert the namespace/name string into a distinct namespace and name
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s err: %w", key, errNetPolKeyFormat))
+		return nil //nolint HandleError is used instead of returning error to caller
+	}
+
+	// Get the network policy resource with this namespace/name
+	netPolObj, err := c.netPolLister.NetworkPolicies(namespace).Get(name)
+	if err != nil {
+		if k8serrors.IsNotFound(err) {
+			klog.Infof("Network Policy %s is not found, maybe it was deleted", key)
+			// netPolObj is not found, but we still need to check the RawNpMap cache for the key.
+			// cleanUpNetworkPolicy takes care of deleting the cached network policy if one exists for the key in our RawNpMap cache.
+			err = c.cleanUpNetworkPolicy(key)
+			if err != nil {
+				return fmt.Errorf("[syncNetPol] error: %w when network policy is not found", err)
+			}
+			return err
+		}
+		return err
+	}
+
+	// If DeletionTimestamp of the netPolObj is set, start cleaning up the last applied states.
+	// This is the early cleanup path triggered from an update event.
+	if netPolObj.ObjectMeta.DeletionTimestamp != nil || netPolObj.ObjectMeta.DeletionGracePeriodSeconds != nil {
+		err = c.cleanUpNetworkPolicy(key)
+		if err != nil {
+			return fmt.Errorf("error: %w when ObjectMeta.DeletionTimestamp field is set", err)
+		}
+		return nil
+	}
+
+	cachedNetPolSpecObj, netPolExists := c.rawNpSpecMap[key]
+	if netPolExists {
+		// If the network policy's state does not differ from the last applied state stored in cachedNetPolSpecObj,
+		// netPolController does not need to reconcile this update.
+		if reflect.DeepEqual(cachedNetPolSpecObj, &netPolObj.Spec) {
+			return nil
+		}
+	}
+
+	err = c.syncAddAndUpdateNetPol(netPolObj)
+	if err != nil {
+		return fmt.Errorf("[syncNetPol] error due to %w", err)
+	}
+
+	return nil
+}
+
+// syncAddAndUpdateNetPol handles a new network policy or an updated network policy object triggered by add and update events
+func (c *NetworkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) error {
+	prometheusTimer := metrics.StartNewTimer()
+	defer metrics.RecordPolicyExecTime(prometheusTimer) // record execution time regardless of failure
+
+	var err error
+	netpolKey, err := cache.MetaNamespaceKeyFunc(netPolObj)
+	if err != nil {
+		return fmt.Errorf("[syncAddAndUpdateNetPol] Error: while running MetaNamespaceKeyFunc err: %w", err)
+	}
+
+	// translate the network policy object
+	npmNetPolObj := translation.TranslatePolicy(netPolObj)
+	// install translated rules into the Dataplane.
+	// The DP update policy call checks whether this policy already exists in the kernel:
+	// if yes: it deletes the old rules and programs the new rules
+	// if no: it programs the new rules
+	err = c.dp.UpdatePolicy(npmNetPolObj)
+	if err != nil {
+		// if an error occurred, the key is re-queued in the workqueue and this function is processed again,
+		// which eventually meets the desired state of the network policy
+		return fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to update translated NPMNetworkPolicy into Dataplane due to %w", err)
+	}
+
+	_, ok := c.rawNpSpecMap[netpolKey]
+	if !ok {
+		// inc metric for NumPolicies only if it is a new network policy
+		metrics.IncNumPolicies()
+	}
+
+	c.rawNpSpecMap[netpolKey] = &netPolObj.Spec
+	return nil
+}
+
+// cleanUpNetworkPolicy handles deleting a network policy based on netPolKey.
+func (c *NetworkPolicyController) cleanUpNetworkPolicy(netPolKey string) error {
+	_, cachedNetPolObjExists := c.rawNpSpecMap[netPolKey]
+	// if there is no applied network policy for netPolKey, there is nothing to clean up.
+	if !cachedNetPolObjExists {
+		return nil
+	}
+
+	err := c.dp.RemovePolicy(netPolKey)
+	if err != nil {
+		return fmt.Errorf("[cleanUpNetworkPolicy] Error: failed to remove policy due to %w", err)
+	}
+
+	// Cleanup of ipset and iptables state in the kernel succeeded, so delete the cached network policy from RawNpMap
+	delete(c.rawNpSpecMap, netPolKey)
+	metrics.DecNumPolicies()
+	return nil
+}
diff --git a/npm/pkg/controlplane/controllers/v2/networkPolicyController_test.go b/npm/pkg/controlplane/controllers/v2/networkPolicyController_test.go
new file mode 100644
index 0000000000..6c6ac6d6f6
--- /dev/null
+++ b/npm/pkg/controlplane/controllers/v2/networkPolicyController_test.go
@@ -0,0 +1,416 @@
+// Copyright 2018 Microsoft. All rights reserved.
+// MIT License +package controllers + +import ( + "fmt" + "strconv" + "testing" + + "github.com/Azure/azure-container-networking/npm/metrics" + "github.com/Azure/azure-container-networking/npm/metrics/promutil" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +type netPolFixture struct { + t *testing.T + + // Objects to put in the store. + netPolLister []*networkingv1.NetworkPolicy + // Objects from here preloaded into NewSimpleFake. + kubeobjects []runtime.Object + + netPolController *NetworkPolicyController + kubeInformer kubeinformers.SharedInformerFactory +} + +func newNetPolFixture(t *testing.T) *netPolFixture { + f := &netPolFixture{ + t: t, + netPolLister: []*networkingv1.NetworkPolicy{}, + kubeobjects: []runtime.Object{}, + } + return f +} + +func (f *netPolFixture) newNetPolController(_ chan struct{}, dp dataplane.GenericDataplane) { + kubeclient := k8sfake.NewSimpleClientset(f.kubeobjects...) + f.kubeInformer = kubeinformers.NewSharedInformerFactory(kubeclient, noResyncPeriodFunc()) + + f.netPolController = NewNetworkPolicyController(f.kubeInformer.Networking().V1().NetworkPolicies(), dp) + + for _, netPol := range f.netPolLister { + err := f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Add(netPol) + if err != nil { + f.t.Errorf("Failed to add network policy %s to shared informer cache: %v", netPol.Name, err) + } + } + + // Do not start informer to avoid unnecessary event triggers + // (TODO): Leave stopCh and below commented code to enhance UTs to even check event triggers as well later if possible + // f.kubeInformer.Start(stopCh) +} + +// (TODO): make createNetPol flexible +func createNetPol() *networkingv1.NetworkPolicy { + tcp := corev1.ProtocolTCP + port8000 := intstr.FromInt(8000) + return &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-ingress", + Namespace: "test-nwpolicy", + }, + Spec: networkingv1.NetworkPolicySpec{ + Ingress: []networkingv1.NetworkPolicyIngressRule{ + { + From: []networkingv1.NetworkPolicyPeer{ + { + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + }, + { + IPBlock: &networkingv1.IPBlock{ + CIDR: "0.0.0.0/0", + }, + }, + }, + Ports: []networkingv1.NetworkPolicyPort{{ + Protocol: &tcp, + Port: &port8000, + }}, + }, + }, + Egress: []networkingv1.NetworkPolicyEgressRule{ + { + To: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + }}, + Ports: []networkingv1.NetworkPolicyPort{{ + Protocol: &tcp, + Port: &intstr.IntOrString{StrVal: "8000"}, // namedPort + }}, + }, + }, + }, + } +} + +func addNetPol(f *netPolFixture, netPolObj *networkingv1.NetworkPolicy) { + // simulate "network policy" add event and add network policy object to sharedInformer cache + f.netPolController.addNetworkPolicy(netPolObj) + + if f.netPolController.workqueue.Len() == 0 { + return + } + + f.netPolController.processNextWorkItem() +} + +func deleteNetPol(t *testing.T, f *netPolFixture, netPolObj 
*networkingv1.NetworkPolicy, isDeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject) {
+	addNetPol(f, netPolObj)
+	t.Logf("Complete adding network policy event")
+
+	// simulate network policy deletion event and delete network policy object from sharedInformer cache
+	err := f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Delete(netPolObj)
+	if err != nil {
+		f.t.Errorf("Failed to delete network policy %s from shared informer cache: %v", netPolObj.Name, err)
+	}
+	if isDeletedFinalStateUnknownObject {
+		netPolKey := getKey(netPolObj, t)
+		tombstone := cache.DeletedFinalStateUnknown{
+			Key: netPolKey,
+			Obj: netPolObj,
+		}
+		f.netPolController.deleteNetworkPolicy(tombstone)
+	} else {
+		f.netPolController.deleteNetworkPolicy(netPolObj)
+	}
+
+	if f.netPolController.workqueue.Len() == 0 {
+		return
+	}
+
+	f.netPolController.processNextWorkItem()
+}
+
+func updateNetPol(t *testing.T, f *netPolFixture, oldNetPolObj, newNetPolObj *networkingv1.NetworkPolicy) {
+	addNetPol(f, oldNetPolObj)
+	t.Logf("Complete adding network policy event")
+
+	// simulate network policy update event and update the network policy in shared informer's cache
+	err := f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Update(newNetPolObj)
+	if err != nil {
+		f.t.Errorf("Failed to update network policy %s in shared informer cache: %v", newNetPolObj.Name, err)
+	}
+	f.netPolController.updateNetworkPolicy(oldNetPolObj, newNetPolObj)
+
+	if f.netPolController.workqueue.Len() == 0 {
+		return
+	}
+
+	f.netPolController.processNextWorkItem()
+}
+
+type expectedNetPolValues struct {
+	expectedLenOfRawNpMap  int
+	expectedLenOfWorkQueue int
+	// prometheus metrics
+	expectedNumPolicies int
+	expectedExecCount   int
+}
+
+func checkNetPolTestResult(testName string, f *netPolFixture, testCases []expectedNetPolValues) {
+	for _, test := range testCases {
+		if got := f.netPolController.LengthOfRawNpMap(); got != test.expectedLenOfRawNpMap {
+			f.t.Errorf("Test: %s, Raw NetPol Map length = %d, want %d", testName, got, test.expectedLenOfRawNpMap)
+		}
+
+		if got := f.netPolController.workqueue.Len(); got != test.expectedLenOfWorkQueue {
+			f.t.Errorf("Test: %s, Workqueue length = %d, want %d", testName, got, test.expectedLenOfWorkQueue)
+		}
+
+		testPrometheusMetrics(f.t, test.expectedNumPolicies, test.expectedExecCount)
+	}
+}
+
+func resetPrometheusAndGetExecCount(t *testing.T) int {
+	metrics.ResetNumPolicies()
+	execCount, err := metrics.GetPolicyExecCount()
+	promutil.NotifyIfErrors(t, err)
+	return execCount
+}
+
+func testPrometheusMetrics(t *testing.T, expectedNumPolicies, expectedExecCount int) {
+	numPolicies, err := metrics.GetNumPolicies()
+	promutil.NotifyIfErrors(t, err)
+	if numPolicies != expectedNumPolicies {
+		require.FailNowf(t, "", "Number of policies didn't register correctly in Prometheus. Expected %d. Got %d.", expectedNumPolicies, numPolicies)
+	}
+
+	execCount, err := metrics.GetPolicyExecCount()
+	promutil.NotifyIfErrors(t, err)
+	if execCount != expectedExecCount {
+		require.FailNowf(t, "", "Count for execution time didn't register correctly in Prometheus. Expected %d. Got %d.", expectedExecCount, execCount)
+	}
+}
+
+func TestAddMultipleNetworkPolicies(t *testing.T) {
+	netPolObj1 := createNetPol()
+
+	// deep copy netPolObj1 and change its namespace, name, and port type (to namedPort) since the current createNetPol is not flexible.
+ netPolObj2 := netPolObj1.DeepCopy() + netPolObj2.Namespace = fmt.Sprintf("%s-new", netPolObj1.Namespace) + netPolObj2.Name = fmt.Sprintf("%s-new", netPolObj1.Name) + // namedPort + netPolObj2.Spec.Ingress[0].Ports[0].Port = &intstr.IntOrString{StrVal: netPolObj2.Name} + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj1, netPolObj2) + f.kubeobjects = append(f.kubeobjects, netPolObj1, netPolObj2) + stopCh := make(chan struct{}) + defer close(stopCh) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f.newNetPolController(stopCh, dp) + + dp.EXPECT().UpdatePolicy(gomock.Any()).Times(2) + + execCount := resetPrometheusAndGetExecCount(f.t) + + addNetPol(f, netPolObj1) + addNetPol(f, netPolObj2) + + testCases := []expectedNetPolValues{ + {2, 0, 2, execCount + 2}, + } + checkNetPolTestResult("TestAddMultipleNetPols", f, testCases) +} + +func TestAddNetworkPolicy(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f.newNetPolController(stopCh, dp) + + execCount := resetPrometheusAndGetExecCount(f.t) + dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1) + + addNetPol(f, netPolObj) + testCases := []expectedNetPolValues{ + {1, 0, 1, execCount + 1}, + } + + checkNetPolTestResult("TestAddNetPol", f, testCases) +} + +func TestDeleteNetworkPolicy(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f.newNetPolController(stopCh, dp) + + execCount := resetPrometheusAndGetExecCount(f.t) + dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1) + dp.EXPECT().RemovePolicy(gomock.Any()).Times(1) + + deleteNetPol(t, f, netPolObj, DeletedFinalStateknownObject) + testCases := []expectedNetPolValues{ + {0, 0, 0, execCount + 1}, + } + checkNetPolTestResult("TestDelNetPol", f, testCases) +} + +func TestDeleteNetworkPolicyWithTombstone(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f.newNetPolController(stopCh, dp) + + execCount := resetPrometheusAndGetExecCount(f.t) + + netPolKey := getKey(netPolObj, t) + tombstone := cache.DeletedFinalStateUnknown{ + Key: netPolKey, + Obj: netPolObj, + } + + f.netPolController.deleteNetworkPolicy(tombstone) + testCases := []expectedNetPolValues{ + {0, 1, 0, execCount}, + } + checkNetPolTestResult("TestDeleteNetworkPolicyWithTombstone", f, testCases) +} + +func TestDeleteNetworkPolicyWithTombstoneAfterAddingNetworkPolicy(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + 
f.newNetPolController(stopCh, dp)
+
+	execCount := resetPrometheusAndGetExecCount(f.t)
+	dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1)
+	dp.EXPECT().RemovePolicy(gomock.Any()).Times(1)
+
+	deleteNetPol(t, f, netPolObj, DeletedFinalStateUnknownObject)
+	testCases := []expectedNetPolValues{
+		{0, 0, 0, execCount + 1},
+	}
+	checkNetPolTestResult("TestDeleteNetworkPolicyWithTombstoneAfterAddingNetworkPolicy", f, testCases)
+}
+
+// this unit test covers the case where the states of a network policy change, but the network policy controller does not need to reconcile.
+// This is validated through the expected workqueue length in expectedNetPolValues.
+func TestUpdateNetworkPolicy(t *testing.T) {
+	oldNetPolObj := createNetPol()
+
+	f := newNetPolFixture(t)
+	f.netPolLister = append(f.netPolLister, oldNetPolObj)
+	f.kubeobjects = append(f.kubeobjects, oldNetPolObj)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f.newNetPolController(stopCh, dp)
+
+	execCount := resetPrometheusAndGetExecCount(f.t)
+
+	newNetPolObj := oldNetPolObj.DeepCopy()
+	// oldNetPolObj.ResourceVersion value is "0"
+	newRV, _ := strconv.Atoi(oldNetPolObj.ResourceVersion)
+	newNetPolObj.ResourceVersion = fmt.Sprintf("%d", newRV+1)
+	dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1)
+
+	updateNetPol(t, f, oldNetPolObj, newNetPolObj)
+	testCases := []expectedNetPolValues{
+		{1, 0, 1, execCount + 1},
+	}
+	checkNetPolTestResult("TestUpdateNetPol", f, testCases)
+}
+
+func TestLabelUpdateNetworkPolicy(t *testing.T) {
+	oldNetPolObj := createNetPol()
+
+	f := newNetPolFixture(t)
+	f.netPolLister = append(f.netPolLister, oldNetPolObj)
+	f.kubeobjects = append(f.kubeobjects, oldNetPolObj)
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	dp := dpmocks.NewMockGenericDataplane(ctrl)
+	f.newNetPolController(stopCh, dp)
+
+	execCount := resetPrometheusAndGetExecCount(f.t)
+
+	newNetPolObj := oldNetPolObj.DeepCopy()
+	// update podSelector in the new network policy object
+	newNetPolObj.Spec.PodSelector = metav1.LabelSelector{
+		MatchLabels: map[string]string{
+			"app": "test",
+			"new": "test",
+		},
+	}
+	// oldNetPolObj.ResourceVersion value is "0"
+	newRV, _ := strconv.Atoi(oldNetPolObj.ResourceVersion)
+	newNetPolObj.ResourceVersion = fmt.Sprintf("%d", newRV+1)
+	dp.EXPECT().UpdatePolicy(gomock.Any()).Times(2)
+
+	updateNetPol(t, f, oldNetPolObj, newNetPolObj)
+
+	testCases := []expectedNetPolValues{
+		{1, 0, 1, execCount + 2},
+	}
+	checkNetPolTestResult("TestLabelUpdateNetPol", f, testCases)
+}
diff --git a/npm/pkg/controlplane/controllers/v2/podController_test.go b/npm/pkg/controlplane/controllers/v2/podController_test.go
new file mode 100644
index 0000000000..07e4d18965
--- /dev/null
+++ b/npm/pkg/controlplane/controllers/v2/podController_test.go
@@ -0,0 +1,770 @@
+// Copyright 2018 Microsoft. All rights reserved.
+// MIT License +package controllers + +import ( + "fmt" + "reflect" + "strconv" + "testing" + + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +const ( + HostNetwork = true + NonHostNetwork = false +) + +// IsDeletedFinalStateUnknownObject indicates whether the object should be handed to the delete handler wrapped in a cache.DeletedFinalStateUnknown tombstone. +type IsDeletedFinalStateUnknownObject bool + +const ( + DeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject = true + DeletedFinalStateknownObject IsDeletedFinalStateUnknownObject = false +) + +type podFixture struct { + t *testing.T + + // Objects to put in the store. + podLister []*corev1.Pod + // Objects from here preloaded into NewSimpleFake. + kubeobjects []runtime.Object + + dp dataplane.GenericDataplane + podController *PodController + kubeInformer kubeinformers.SharedInformerFactory +} + +func newFixture(t *testing.T, dp dataplane.GenericDataplane) *podFixture { + f := &podFixture{ + t: t, + podLister: []*corev1.Pod{}, + kubeobjects: []runtime.Object{}, + dp: dp, + } + return f +} + +func getKey(obj interface{}, t *testing.T) string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + t.Errorf("Unexpected error getting key for obj %v: %v", obj, err) + return "" + } + return key +} + +func (f *podFixture) newPodController(_ chan struct{}) { + kubeclient := k8sfake.NewSimpleClientset(f.kubeobjects...)
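+	// The fake clientset is seeded with the preloaded kubeobjects, so lister lookups work without a running API server.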
+	f.kubeInformer = kubeinformers.NewSharedInformerFactory(kubeclient, noResyncPeriodFunc()) + + npmNamespaceCache := &NpmNamespaceCache{NsMap: make(map[string]*Namespace)} + f.podController = NewPodController(f.kubeInformer.Core().V1().Pods(), f.dp, npmNamespaceCache) + + for _, pod := range f.podLister { + err := f.kubeInformer.Core().V1().Pods().Informer().GetIndexer().Add(pod) + if err != nil { + f.t.Errorf("Failed to add pod %v to informer cache: %v", pod, err) + } + } + + // Do not start informer to avoid unnecessary event triggers. + // (TODO): Keep stopCh and the commented code below so the UTs can later be extended to verify event triggers as well, if possible. + // f.kubeInformer.Start(stopCh) +} + +func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNetwork bool, podPhase corev1.PodPhase) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Labels: labels, + ResourceVersion: rv, + }, + Spec: corev1.PodSpec{ + HostNetwork: isHostNetwork, + Containers: []corev1.Container{ + { + Ports: []corev1.ContainerPort{ + { + Name: fmt.Sprintf("app:%s", name), + ContainerPort: 8080, + // Protocol: "TCP", + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: podPhase, + PodIP: podIP, + }, + } +} + +func addPod(t *testing.T, f *podFixture, podObj *corev1.Pod) { + // simulate pod add event; the pod object is already in the sharedInformer cache + f.podController.addPod(podObj) + + if f.podController.workqueue.Len() == 0 { + t.Logf("Add Pod: worker queue length is 0 ") + return + } + + f.podController.processNextWorkItem() +} + +func deletePod(t *testing.T, f *podFixture, podObj *corev1.Pod, isDeletedFinalStateUnknownObject IsDeletedFinalStateUnknownObject) { + addPod(t, f, podObj) + t.Logf("Complete add pod event") + + // simulate pod delete event and delete pod object from sharedInformer cache + err := f.kubeInformer.Core().V1().Pods().Informer().GetIndexer().Delete(podObj) + if err != nil { + f.t.Errorf("Failed to delete pod %v from informer cache: %v", podObj, err) + } + + if isDeletedFinalStateUnknownObject { + podKey := getKey(podObj, t) + tombstone := cache.DeletedFinalStateUnknown{ + Key: podKey, + Obj: podObj, + } + f.podController.deletePod(tombstone) + } else { + f.podController.deletePod(podObj) + } + + if f.podController.workqueue.Len() == 0 { + t.Logf("Delete Pod: worker queue length is 0 ") + return + } + + f.podController.processNextWorkItem() +} + +// TODO: cover more interesting update cases.
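+// updatePod first replays the add event for oldPodObj, mirrors newPodObj into the informer indexer, then fires the update handler and drains at most one item from the workqueue.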
+func updatePod(t *testing.T, f *podFixture, oldPodObj, newPodObj *corev1.Pod) { + addPod(t, f, oldPodObj) + t.Logf("Complete add pod event") + + // simulate pod update event and update the pod in the shared informer's cache + err := f.kubeInformer.Core().V1().Pods().Informer().GetIndexer().Update(newPodObj) + if err != nil { + f.t.Errorf("Failed to update pod %v in informer cache: %v", newPodObj, err) + } + f.podController.updatePod(oldPodObj, newPodObj) + + if f.podController.workqueue.Len() == 0 { + t.Logf("Update Pod: worker queue length is 0 ") + return + } + + f.podController.processNextWorkItem() +} + +type expectedValues struct { + expectedLenOfPodMap int + expectedLenOfNsMap int + expectedLenOfWorkQueue int +} + +func checkPodTestResult(testName string, f *podFixture, testCases []expectedValues) { + for _, test := range testCases { + if got := len(f.podController.podMap); got != test.expectedLenOfPodMap { + f.t.Errorf("%s failed @ PodMap length = %d, want %d", testName, got, test.expectedLenOfPodMap) + } + if got := len(f.podController.npmNamespaceCache.NsMap); got != test.expectedLenOfNsMap { + f.t.Errorf("%s failed @ NsMap length = %d, want %d", testName, got, test.expectedLenOfNsMap) + } + if got := f.podController.workqueue.Len(); got != test.expectedLenOfWorkQueue { + f.t.Errorf("%s failed @ Workqueue length = %d, want %d", testName, got, test.expectedLenOfWorkQueue) + } + } +} + +func checkNpmPodWithInput(testName string, f *podFixture, inputPodObj *corev1.Pod) { + podKey := getKey(inputPodObj, f.t) + cachedNpmPodObj := f.podController.podMap[podKey] + + if cachedNpmPodObj.PodIP != inputPodObj.Status.PodIP { + f.t.Errorf("%s failed @ PodIp check got = %s, want %s", testName, cachedNpmPodObj.PodIP, inputPodObj.Status.PodIP) + } + + if !reflect.DeepEqual(cachedNpmPodObj.Labels, inputPodObj.Labels) { + f.t.Errorf("%s failed @ Labels check got = %v, want %v", testName, cachedNpmPodObj.Labels, inputPodObj.Labels) + } + + inputPortList := getContainerPortList(inputPodObj) + if !reflect.DeepEqual(cachedNpmPodObj.ContainerPorts, inputPortList) { + f.t.Errorf("%s failed @ Container port check got = %v, want %v", testName, cachedNpmPodObj.ContainerPorts, inputPortList) + } +} + +func TestAddMultiplePods(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj1 := createPod("test-pod-1", "test-ns", "1", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + podObj2 := createPod("test-pod-2", "test-ns", "0", "1.2.3.5", labels, NonHostNetwork, corev1.PodRunning) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + + f.podLister = append(f.podLister, podObj1, podObj2) + f.kubeobjects = append(f.kubeobjects, podObj1, podObj2) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-ns", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-ns/test-pod-1", "1.2.3.4", "") + podMetadata2 := dataplane.NewPodMetadata("test-ns/test-pod-2", "1.2.3.5", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + for _, metaData := range []*dataplane.PodMetadata{podMetadata1, podMetadata2} { + dp.EXPECT().AddToSets(mockIPSets[:1], metaData).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:],
metaData).Return(nil).Times(1) + } + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod-1", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-ns/test-pod-1", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod-2", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-ns/test-pod-2", "1.2.3.5,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + + addPod(t, f, podObj1) + addPod(t, f, podObj2) + + testCases := []expectedValues{ + {2, 1, 0}, + } + checkPodTestResult("TestAddMultiplePods", f, testCases) + checkNpmPodWithInput("TestAddMultiplePods", f, podObj1) + checkNpmPodWithInput("TestAddMultiplePods", f, podObj2) +} + +func TestAddPod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). 
+ Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(1) + + addPod(t, f, podObj) + testCases := []expectedValues{ + {1, 1, 0}, + } + checkPodTestResult("TestAddPod", f, testCases) + checkNpmPodWithInput("TestAddPod", f, podObj) +} + +func TestAddHostNetworkPod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, HostNetwork, corev1.PodRunning) + podKey := getKey(podObj, t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + addPod(t, f, podObj) + testCases := []expectedValues{ + {0, 0, 0}, + } + checkPodTestResult("TestAddHostNetworkPod", f, testCases) + + if _, exists := f.podController.podMap[podKey]; exists { + t.Error("TestAddHostNetworkPod failed @ cached pod obj exists check") + } +} + +func TestDeletePod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + podKey := getKey(podObj, t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + // Add pod section + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Delete pod section + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). 
+ Return(nil).Times(1) + + deletePod(t, f, podObj, DeletedFinalStateknownObject) + testCases := []expectedValues{ + {0, 1, 0}, + } + + checkPodTestResult("TestDeletePod", f, testCases) + if _, exists := f.podController.podMap[podKey]; exists { + t.Error("TestDeletePod failed @ cached pod obj exists check") + } +} + +func TestDeleteHostNetworkPod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, HostNetwork, corev1.PodRunning) + podKey := getKey(podObj, t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + deletePod(t, f, podObj, DeletedFinalStateknownObject) + testCases := []expectedValues{ + {0, 0, 0}, + } + checkPodTestResult("TestDeleteHostNetworkPod", f, testCases) + if _, exists := f.podController.podMap[podKey]; exists { + t.Error("TestDeleteHostNetworkPod failed @ cached pod obj exists check") + } +} + +// this UT only tests deletePod event handler function in podController +func TestDeletePodWithTombstone(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + podKey := getKey(podObj, t) + tombstone := cache.DeletedFinalStateUnknown{ + Key: podKey, + Obj: podObj, + } + + f.podController.deletePod(tombstone) + testCases := []expectedValues{ + {0, 0, 1}, + } + checkPodTestResult("TestDeletePodWithTombstone", f, testCases) +} + +func TestDeletePodWithTombstoneAfterAddingPod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + // Add pod section + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). 
+ Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Delete pod section + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + + deletePod(t, f, podObj, DeletedFinalStateUnknownObject) + testCases := []expectedValues{ + {0, 1, 0}, + } + checkPodTestResult("TestDeletePodWithTombstoneAfterAddingPod", f, testCases) +} + +func TestLabelUpdatePod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + oldPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, oldPodObj) + f.kubeobjects = append(f.kubeobjects, oldPodObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + newPodObj := oldPodObj.DeepCopy() + newPodObj.Labels = map[string]string{ + "app": "new-test-pod", + } + // oldPodObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldPodObj.ResourceVersion) + newPodObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + + // Add pod section + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). 
+ Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Update section + dp.EXPECT().RemoveFromSets(mockIPSets[2:], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:new-test-pod", ipsets.KeyValueLabelOfPod)}, podMetadata1).Return(nil).Times(1) + + updatePod(t, f, oldPodObj, newPodObj) + + testCases := []expectedValues{ + {1, 1, 0}, + } + checkPodTestResult("TestLabelUpdatePod", f, testCases) + checkNpmPodWithInput("TestLabelUpdatePod", f, newPodObj) +} + +func TestIPAddressUpdatePod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + oldPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, oldPodObj) + f.kubeobjects = append(f.kubeobjects, oldPodObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + newPodObj := oldPodObj.DeepCopy() + // oldPodObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldPodObj.ResourceVersion) + newPodObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + // oldPodObj PodIP is "1.2.3.4" + newPodObj.Status.PodIP = "4.3.2.1" + // Add pod section + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Delete pod section + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + // New IP Pod add + podMetadata2 := dataplane.NewPodMetadata("test-namespace/test-pod", "4.3.2.1", "") + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata2).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata2).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "4.3.2.1,8080", ""), + ). 
+ Return(nil).Times(1) + + updatePod(t, f, oldPodObj, newPodObj) + + testCases := []expectedValues{ + {1, 1, 0}, + } + checkPodTestResult("TestIPAddressUpdatePod", f, testCases) + checkNpmPodWithInput("TestIPAddressUpdatePod", f, newPodObj) +} + +func TestPodStatusUpdatePod(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + oldPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + podKey := getKey(oldPodObj, t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, oldPodObj) + f.kubeobjects = append(f.kubeobjects, oldPodObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + newPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodSucceeded) + // oldPodObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldPodObj.ResourceVersion) + newPodObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Delete pod section + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). 
+		Return(nil).Times(1) + + updatePod(t, f, oldPodObj, newPodObj) + + testCases := []expectedValues{ + {0, 1, 0}, + } + checkPodTestResult("TestPodStatusUpdatePod", f, testCases) + if _, exists := f.podController.podMap[podKey]; exists { + t.Error("TestPodStatusUpdatePod failed @ cached pod obj exists check") + } +} + +func TestPodMapMarshalJSON(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + labels := map[string]string{ + "app": "test-pod", + } + pod := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + podKey, err := cache.MetaNamespaceKeyFunc(pod) + assert.NoError(t, err) + + npmPod := newNpmPod(pod) + f.podController.podMap[podKey] = npmPod + + npMapRaw, err := f.podController.MarshalJSON() + assert.NoError(t, err) + + expect := []byte(`{"test-namespace/test-pod":{"Name":"test-pod","Namespace":"test-namespace","PodIP":"1.2.3.4","Labels":{},"ContainerPorts":[],"Phase":"Running"}}`) + fmt.Printf("%s\n", string(npMapRaw)) + assert.ElementsMatch(t, expect, npMapRaw) +} + +func TestHasValidPodIP(t *testing.T) { + podObj := &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: "Running", + PodIP: "1.2.3.4", + }, + } + if ok := hasValidPodIP(podObj); !ok { + t.Errorf("TestHasValidPodIP failed @ hasValidPodIP") + } +} + +// Extra unit test which is not strictly related to PodController, +// but helps to understand how the workqueue keeps the event handler logic lock-free. +// If the same key is queued into the workqueue multiple times, +// the entries are combined into one item (precisely: as long as the item has not yet been processed). +func TestWorkQueue(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + podKeys := []string{"test-pod", "test-pod", "test-pod1"} + expectedWorkQueueLength := []int{1, 1, 2} + + for idx, podKey := range podKeys { + f.podController.workqueue.Add(podKey) + workQueueLength := f.podController.workqueue.Len() + if workQueueLength != expectedWorkQueueLength[idx] { + t.Errorf("TestWorkQueue failed due to returned workqueue length = %d, want %d", + workQueueLength, expectedWorkQueueLength[idx]) + } + } +} diff --git a/npm/pkg/controlplane/controllers/v2/podcontroller.go b/npm/pkg/controlplane/controllers/v2/podcontroller.go index 3342969dba..738c31bdcc 100644 --- a/npm/pkg/controlplane/controllers/v2/podcontroller.go +++ b/npm/pkg/controlplane/controllers/v2/podcontroller.go @@ -269,7 +269,7 @@ func (c *PodController) processNextWorkItem() bool { // Forget here else we'd go into a loop of attempting to // process a work item that is invalid.
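// Forget also clears the key's rate-limiting history in the workqueue, so the malformed item will not be retried.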
c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %w", obj)) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v: %w", obj, errWorkqueueFormatting)) return nil } // Run the syncPod, passing it the namespace/name string of the @@ -370,11 +370,8 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { namespaceSet := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(podObj.Namespace, ipsets.Namespace)} - klog.Infof("Creating ipset %s for namespace if it doesn't already exist", podObj.Namespace) - c.dp.CreateIPSets(namespaceSet) - // Add the pod ip information into namespace's ipset. + klog.Infof("Adding pod %s (ip : %s) to ipset %s", podKey, podObj.Status.PodIP, podObj.Namespace) if err = c.dp.AddToSets(namespaceSet, podMetadata); err != nil { return fmt.Errorf("[syncAddedPod] Error: failed to add pod to namespace ipset with err: %w", err) } @@ -385,22 +382,15 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { // Get lists of podLabelKey and podLabelKey + podLabelValue, and then start adding them to ipsets. for labelKey, labelVal := range podObj.Labels { - podIPSetName := util.GetIpSetFromLabelKV(labelKey, labelVal) + labelKeyValue := util.GetIpSetFromLabelKV(labelKey, labelVal) targetSetKey := ipsets.NewIPSetMetadata(labelKey, ipsets.KeyLabelOfPod) - targetSetKeyValue := ipsets.NewIPSetMetadata(podIPSetName, ipsets.KeyValueLabelOfPod) + targetSetKeyValue := ipsets.NewIPSetMetadata(labelKeyValue, ipsets.KeyValueLabelOfPod) allSets := []*ipsets.IPSetMetadata{targetSetKey, targetSetKeyValue} klog.Infof("Creating ipsets %v if they do not already exist", allSets) - c.dp.CreateIPSets(allSets) - - klog.Infof("Adding pod %s to ipset %s", npmPodObj.PodIP, labelKey) - if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{targetSetKey}, podMetadata); err != nil { - return fmt.Errorf("[syncAddedPod] Error: failed to add pod to label ipset with err: %w", err) - } - - klog.Infof("Adding pod %s to ipset %s", npmPodObj.PodIP, podIPSetName) - if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{targetSetKeyValue}, podMetadata); err != nil { + klog.Infof("Adding pod %s (ip : %s) to ipset %s and %s", podKey, npmPodObj.PodIP, labelKey, labelKeyValue) + if err = c.dp.AddToSets(allSets, podMetadata); err != nil { return fmt.Errorf("[syncAddedPod] Error: failed to add pod to label ipset with err: %w", err) } npmPodObj.appendLabels(map[string]string{labelKey: labelVal}, appendToExistingLabels) @@ -409,7 +399,7 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { // Add pod's named ports to its ipsets.
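// Each named-port entry pairs the pod IP with the container port, "<podIP>,<protocol>:<containerPort>" (the protocol prefix is dropped when unset), e.g. "1.2.3.4,8080".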
klog.Infof("Adding named port ipsets") containerPorts := getContainerPortList(podObj) - if err = c.manageNamedPortIpsets(containerPorts, podKey, npmPodObj.PodIP, addNamedPort); err != nil { + if err = c.manageNamedPortIpsets(containerPorts, podKey, npmPodObj.PodIP, podObj.Spec.NodeName, addNamedPort); err != nil { return fmt.Errorf("[syncAddedPod] Error: failed to add pod to named port ipset with err: %w", err) } npmPodObj.appendContainerPorts(podObj) @@ -422,16 +412,12 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) error { var err error podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj) - newPodMetadata := dataplane.NewPodMetadata(podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName) - // lock before using nsMap since nsMap is shared with namespace controller c.npmNamespaceCache.Lock() if _, exists := c.npmNamespaceCache.NsMap[newPodObj.Namespace]; !exists { // Create ipset related to namespace which this pod belong to if it does not exist. toBeAdded := []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newPodObj.Namespace, ipsets.Namespace)} - c.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(newPodObj.Namespace, ipsets.Namespace)}) - if err = c.dp.AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, toBeAdded); err != nil { c.npmNamespaceCache.Unlock() return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add %s to all-namespace ipset list with err: %w", newPodObj.Namespace, err) @@ -476,19 +462,26 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) error { // Otherwise it returns list of deleted PodIP from cached pod's labels and list of added PodIp from new pod's labels addToIPSets, deleteFromIPSets := util.GetIPSetListCompareLabels(cachedNpmPod.Labels, newPodObj.Labels) + newPodMetadata := dataplane.NewPodMetadata(podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName) + // todo: verify pulling nodename from newpod, + // if a pod is getting deleted, we do not have to cleanup policies, so it is okay to pass in wrong nodename + cachedPodMetadata := dataplane.NewPodMetadata(podKey, cachedNpmPod.PodIP, newPodMetadata.NodeName) // Delete the pod from its label's ipset. - for _, podIPSetName := range deleteFromIPSets { - klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName) - - // todo: verify pulling nodename from newpod - cachedPodMetadata := dataplane.NewPodMetadata(podKey, cachedNpmPod.PodIP, newPodMetadata.NodeName) - - if err = c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(podIPSetName, ipsets.KeyLabelOfPod)}, cachedPodMetadata); err != nil { + for _, removeIPSetName := range deleteFromIPSets { + klog.Infof("Deleting pod %s (ip : %s) from ipset %s", podKey, cachedNpmPod.PodIP, removeIPSetName) + + var toRemoveSet *ipsets.IPSetMetadata + if util.IsKeyValueLabelSetName(removeIPSetName) { + toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyValueLabelOfPod) + } else { + toRemoveSet = ipsets.NewIPSetMetadata(removeIPSetName, ipsets.KeyLabelOfPod) + } + if err = c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{toRemoveSet}, cachedPodMetadata); err != nil { return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from label ipset with err: %w", err) } // {IMPORTANT} The order of compared list will be key and then key+val. NPM should only append after both key // key + val ipsets are worked on. 
- removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(podIPSetName) + removedLabelKey, removedLabelValue := util.GetLabelKVFromSet(removeIPSetName) if removedLabelValue != "" { cachedNpmPod.removeLabelsWithKey(removedLabelKey) } @@ -499,10 +492,15 @@ klog.Infof("Creating ipset %s if it doesn't already exist", addIPSetName) - c.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(addIPSetName, ipsets.CIDRBlocks)}) + var toAddSet *ipsets.IPSetMetadata + if util.IsKeyValueLabelSetName(addIPSetName) { + toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyValueLabelOfPod) + } else { + toAddSet = ipsets.NewIPSetMetadata(addIPSetName, ipsets.KeyLabelOfPod) + } - klog.Infof("Adding pod %s to ipset %s", newPodObj.Status.PodIP, addIPSetName) - if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(addIPSetName, ipsets.CIDRBlocks)}, newPodMetadata); err != nil { + klog.Infof("Adding pod %s (ip : %s) to ipset %s", podKey, newPodObj.Status.PodIP, addIPSetName) + if err = c.dp.AddToSets([]*ipsets.IPSetMetadata{toAddSet}, newPodMetadata); err != nil { return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to label ipset with err: %w", err) } // {IMPORTANT} Same as above, the order is assumed to be key and then key+val. NPM should only append to existing labels @@ -525,14 +523,14 @@ if !reflect.DeepEqual(cachedNpmPod.ContainerPorts, newPodPorts) { // Delete cached pod's named ports from its ipset. if err = c.manageNamedPortIpsets( - cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, deleteNamedPort); err != nil { + cachedNpmPod.ContainerPorts, podKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil { return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to delete pod from named port ipset with err: %w", err) } // Since portList ipset deletion is successful, NPM can remove cachedContainerPorts cachedNpmPod.removeContainerPorts() // Add new pod's named ports to its ipset. - if err = c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, addNamedPort); err != nil { + if err = c.manageNamedPortIpsets(newPodPorts, podKey, newPodObj.Status.PodIP, newPodObj.Spec.NodeName, addNamedPort); err != nil { return fmt.Errorf("[syncAddAndUpdatePod] Error: failed to add pod to named port ipset with err: %w", err) } cachedNpmPod.appendContainerPorts(newPodObj) @@ -552,28 +550,25 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { } var err error + cachedPodMetadata := dataplane.NewPodMetadata(cachedNpmPodKey, cachedNpmPod.PodIP, "") // Delete the pod from its namespace's ipset.
// note: an empty NodeName means UpdatePod will not be called if err = c.dp.RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(cachedNpmPod.Namespace, ipsets.Namespace)}, - dataplane.NewPodMetadata(cachedNpmPod.PodIP, cachedNpmPodKey, "")); err != nil { + cachedPodMetadata); err != nil { return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from namespace ipset with err: %w", err) } // Get lists of podLabelKey and podLabelKey + podLabelValue, and then start deleting them from ipsets for labelKey, labelVal := range cachedNpmPod.Labels { - klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, labelKey) - if err = c.dp.RemoveFromSets( - []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(labelKey, ipsets.KeyLabelOfPod)}, - dataplane.NewPodMetadata(cachedNpmPod.PodIP, cachedNpmPodKey, "")); err != nil { - return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from label ipset with err: %w", err) - } - - podIPSetName := util.GetIpSetFromLabelKV(labelKey, labelVal) - klog.Infof("Deleting pod %s from ipset %s", cachedNpmPod.PodIP, podIPSetName) + labelKeyValue := util.GetIpSetFromLabelKV(labelKey, labelVal) + klog.Infof("Deleting pod %s (ip : %s) from ipsets %s and %s", cachedNpmPodKey, cachedNpmPod.PodIP, labelKey, labelKeyValue) if err = c.dp.RemoveFromSets( - []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(podIPSetName, ipsets.KeyValueLabelOfPod)}, - dataplane.NewPodMetadata(cachedNpmPod.PodIP, cachedNpmPodKey, "")); err != nil { + []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata(labelKey, ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata(labelKeyValue, ipsets.KeyValueLabelOfPod), + }, + cachedPodMetadata); err != nil { return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from label ipset with err: %w", err) } cachedNpmPod.removeLabelsWithKey(labelKey) @@ -581,7 +576,7 @@ // Delete pod's named ports from its ipset. Need to pass deleteNamedPort in the manageNamedPortIpsets function call if err = c.manageNamedPortIpsets( - cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, deleteNamedPort); err != nil { + cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil { return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from named port ipset with err: %w", err) } @@ -590,8 +585,8 @@ } // manageNamedPortIpsets helps with adding or deleting Pod namedPort IPsets.
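// The nodeName parameter is threaded into the dataplane pod metadata; delete paths pass "" since no policy update is needed for a pod that is going away.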
-func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, podKey string, - podIP string, namedPortOperation NamedPortOperation) error { +func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, podKey, + podIP, nodeName string, namedPortOperation NamedPortOperation) error { for _, port := range portList { klog.Infof("port is %+v", port) if port.Name == "" { @@ -605,18 +600,17 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p protocol = fmt.Sprintf("%s:", port.Protocol) } - namedPort := util.NamedPortIPSetPrefix + port.Name namedPortIpsetEntry := fmt.Sprintf("%s,%s%d", podIP, protocol, port.ContainerPort) // nodeName may be empty (e.g. on delete paths), in which case UpdatePod is skipped - podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, "") + podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) switch namedPortOperation { case deleteNamedPort: - if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(namedPort, ipsets.NamedPorts)}, podMetadata); err != nil { + if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to remove from set when deleting named port with err %w", err) } case addNamedPort: - if err := c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(namedPort, ipsets.NamedPorts)}, podMetadata); err != nil { + if err := c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to add to set when adding named port with err %w", err) } } diff --git a/npm/pkg/controlplane/translation/translatePolicy.go b/npm/pkg/controlplane/translation/translatePolicy.go index 76d64159da..df544fb9ae 100644 --- a/npm/pkg/controlplane/translation/translatePolicy.go +++ b/npm/pkg/controlplane/translation/translatePolicy.go @@ -473,14 +473,14 @@ func translateIngress(npmNetPol *policies.NPMNetworkPolicy, targetSelector *meta klog.Info("finished parsing ingress rule") } -func existIngress(npObj *networkingv1.NetworkPolicy) bool { //nolint:unused //it will be called from v2 networkPolicyController which will come in next PR +func existIngress(npObj *networkingv1.NetworkPolicy) bool { return !(npObj.Spec.Ingress != nil && len(npObj.Spec.Ingress) == 1 && len(npObj.Spec.Ingress[0].Ports) == 0 && len(npObj.Spec.Ingress[0].From) == 0) } -func translatePolicy(npObj *networkingv1.NetworkPolicy) *policies.NPMNetworkPolicy { //nolint:deadcode,unused //it will be called from v2 networkPolicyController which will come in next PR +func TranslatePolicy(npObj *networkingv1.NetworkPolicy) *policies.NPMNetworkPolicy { npmNetPol := &policies.NPMNetworkPolicy{ Name: npObj.ObjectMeta.Name, NameSpace: npObj.ObjectMeta.Namespace, diff --git a/npm/util/util.go b/npm/util/util.go index ea56410e27..696402e863 100644 --- a/npm/util/util.go +++ b/npm/util/util.go @@ -304,6 +304,10 @@ func GetIpSetFromLabelKV(k, v string) string { return fmt.Sprintf("%s%s%s", k, IpsetLabelDelimter, v) } +func IsKeyValueLabelSetName(k string) bool { + return strings.Contains(k, IpsetLabelDelimter) +} + func GetLabelKVFromSet(ipsetName string) (string, string) { strSplit := strings.Split(ipsetName, IpsetLabelDelimter) if len(strSplit) > 1 {