diff --git a/npm/pkg/controlplane/controllers/v1/podController.go b/npm/pkg/controlplane/controllers/v1/podController.go
index c3b8acf414..4aacd5dc61 100644
--- a/npm/pkg/controlplane/controllers/v1/podController.go
+++ b/npm/pkg/controlplane/controllers/v1/podController.go
@@ -21,6 +21,7 @@ import (
     coreinformer "k8s.io/client-go/informers/core/v1"
     corelisters "k8s.io/client-go/listers/core/v1"
+    k8slabels "k8s.io/apimachinery/pkg/labels"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/klog"
@@ -88,7 +89,7 @@ func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
         nPod.Name == podObj.ObjectMeta.Name &&
         nPod.Phase == podObj.Status.Phase &&
         nPod.PodIP == podObj.Status.PodIP &&
-        util.IsSameLabels(nPod.Labels, podObj.ObjectMeta.Labels) &&
+        k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
         // TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
         // it needs a precise sorting. Will optimize it later if needed.
         reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
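A note on the replacement above: k8slabels.Equals from k8s.io/apimachinery/pkg/labels covers exactly what the hand-rolled util.IsSameLabels did (that helper is deleted at the end of this patch). A minimal standalone sketch of its semantics, not part of the patch itself:

    package main

    import (
        "fmt"

        k8slabels "k8s.io/apimachinery/pkg/labels"
    )

    func main() {
        var nilLabels map[string]string

        // A nil map and an empty map compare as equal, matching IsSameLabels.
        fmt.Println(k8slabels.Equals(nilLabels, map[string]string{})) // true

        // Comparison is by key/value pairs, so insertion order is irrelevant.
        fmt.Println(k8slabels.Equals(
            map[string]string{"a": "b", "c": "d"},
            map[string]string{"c": "d", "a": "b"},
        )) // true

        fmt.Println(k8slabels.Equals(map[string]string{"app": "npm"}, nilLabels)) // false
    }

Since the semantics match, switching to the upstream helper removes code NPM otherwise had to test and maintain itself.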
diff --git a/npm/pkg/controlplane/controllers/v2/podController_test.go b/npm/pkg/controlplane/controllers/v2/podController_test.go
index 07e4d18965..86ebba3689 100644
--- a/npm/pkg/controlplane/controllers/v2/podController_test.go
+++ b/npm/pkg/controlplane/controllers/v2/podController_test.go
@@ -13,6 +13,7 @@ import (
     dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks"
     gomock "github.com/golang/mock/gomock"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -85,7 +86,7 @@ func (f *podFixture) newPodController(_ chan struct{}) {
     // f.kubeInformer.Start(stopCh)
 }
 
-func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtwork bool, podPhase corev1.PodPhase) *corev1.Pod {
+func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNetwork bool, podPhase corev1.PodPhase) *corev1.Pod {
     return &corev1.Pod{
         ObjectMeta: metav1.ObjectMeta{
             Name:            name,
@@ -94,7 +95,7 @@ func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtw
             ResourceVersion: rv,
         },
         Spec: corev1.PodSpec{
-            HostNetwork: isHostNewtwork,
+            HostNetwork: isHostNetwork,
             Containers: []corev1.Container{
                 {
                     Ports: []corev1.ContainerPort{
@@ -734,6 +735,88 @@ func TestHasValidPodIP(t *testing.T) {
     }
 }
 
+func TestIsCompletePod(t *testing.T) {
+    var zeroGracePeriod int64
+    var defaultGracePeriod int64 = 30
+
+    type podState struct {
+        phase                      corev1.PodPhase
+        deletionTimestamp          *metav1.Time
+        deletionGracePeriodSeconds *int64
+    }
+
+    tests := []struct {
+        name                 string
+        podState             podState
+        expectedCompletedPod bool
+    }{
+        {
+            name: "pod is in running status",
+            podState: podState{
+                phase:                      corev1.PodRunning,
+                deletionTimestamp:          nil,
+                deletionGracePeriodSeconds: nil,
+            },
+            expectedCompletedPod: false,
+        },
+        {
+            name: "pod is completely terminated after its graceful shutdown period",
+            podState: podState{
+                phase:                      corev1.PodRunning,
+                deletionTimestamp:          &metav1.Time{},
+                deletionGracePeriodSeconds: &zeroGracePeriod,
+            },
+            expectedCompletedPod: true,
+        },
+        {
+            name: "pod is terminating, but still within its graceful shutdown period",
+            podState: podState{
+                phase:                      corev1.PodRunning,
+                deletionTimestamp:          &metav1.Time{},
+                deletionGracePeriodSeconds: &defaultGracePeriod,
+            },
+            expectedCompletedPod: false,
+        },
+        {
+            name: "pod is in PodSucceeded status",
+            podState: podState{
+                phase:                      corev1.PodSucceeded,
+                deletionTimestamp:          nil,
+                deletionGracePeriodSeconds: nil,
+            },
+            expectedCompletedPod: true,
+        },
+        {
+            name: "pod is in PodFailed status",
+            podState: podState{
+                phase:                      corev1.PodFailed,
+                deletionTimestamp:          nil,
+                deletionGracePeriodSeconds: nil,
+            },
+            expectedCompletedPod: true,
+        },
+    }
+
+    for _, tt := range tests {
+        tt := tt
+        t.Run(tt.name, func(t *testing.T) {
+            t.Parallel()
+            corev1Pod := &corev1.Pod{
+                ObjectMeta: metav1.ObjectMeta{
+                    DeletionTimestamp:          tt.podState.deletionTimestamp,
+                    DeletionGracePeriodSeconds: tt.podState.deletionGracePeriodSeconds,
+                },
+                Status: corev1.PodStatus{
+                    Phase: tt.podState.phase,
+                },
+            }
+            isPodCompleted := isCompletePod(corev1Pod)
+            require.Equal(t, tt.expectedCompletedPod, isPodCompleted)
+        })
+    }
+}
+
 // Extra unit test which is not quite related to PodController,
 // but help to understand how workqueue works to make event handler logic lock-free.
 // If the same key are queued into workqueue in multiple times,
@@ -768,3 +851,71 @@ func TestWorkQueue(t *testing.T) {
         }
     }
 }
+
+func TestNPMPodNoUpdate(t *testing.T) {
+    type podInfo struct {
+        podName       string
+        ns            string
+        rv            string
+        podIP         string
+        labels        map[string]string
+        isHostNetwork bool
+        podPhase      corev1.PodPhase
+    }
+
+    labels := map[string]string{
+        "app": "test-pod",
+    }
+
+    tests := []struct {
+        name string
+        podInfo
+        updatingNPMPod   bool
+        expectedNoUpdate bool
+    }{
+        {
+            "NPMPod requires an update given Pod",
+            podInfo{
+                podName:       "test-pod-1",
+                ns:            "test-namespace",
+                rv:            "0",
+                podIP:         "1.2.3.4",
+                labels:        labels,
+                isHostNetwork: NonHostNetwork,
+                podPhase:      corev1.PodRunning,
+            },
+            false,
+            false,
+        },
+        {
+            "NPMPod requires no update given Pod",
+            podInfo{
+                podName:       "test-pod-2",
+                ns:            "test-namespace",
+                rv:            "0",
+                podIP:         "1.2.3.4",
+                labels:        labels,
+                isHostNetwork: NonHostNetwork,
+                podPhase:      corev1.PodRunning,
+            },
+            true,
+            true,
+        },
+    }
+
+    for _, tt := range tests {
+        tt := tt
+        t.Run(tt.name, func(t *testing.T) {
+            t.Parallel()
+            corev1Pod := createPod(tt.podName, tt.ns, tt.rv, tt.podIP, tt.labels, tt.isHostNetwork, tt.podPhase)
+            npmPod := newNpmPod(corev1Pod)
+            if tt.updatingNPMPod {
+                npmPod.appendLabels(corev1Pod.Labels, appendToExistingLabels)
+                npmPod.updateNpmPodAttributes(corev1Pod)
+                npmPod.appendContainerPorts(corev1Pod)
+            }
+            noUpdate := npmPod.noUpdate(corev1Pod)
+            require.Equal(t, tt.expectedNoUpdate, noUpdate)
+        })
+    }
+}
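The comment above TestWorkQueue notes that the workqueue deduplicates identical keys, which is what lets the event handlers stay lock-free. A minimal standalone sketch of that behavior, not part of the patch, using client-go's workqueue package directly:

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        queue := workqueue.New()

        // Enqueuing the same namespace/name key twice yields a single item,
        // so repeated pod events for one key collapse into one reconcile.
        queue.Add("test-namespace/test-pod-1")
        queue.Add("test-namespace/test-pod-1")
        fmt.Println(queue.Len()) // 1
    }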
diff --git a/npm/pkg/controlplane/controllers/v2/podcontroller.go b/npm/pkg/controlplane/controllers/v2/podcontroller.go
index 738c31bdcc..233fd8e279 100644
--- a/npm/pkg/controlplane/controllers/v2/podcontroller.go
+++ b/npm/pkg/controlplane/controllers/v2/podcontroller.go
@@ -16,6 +16,7 @@ import (
     "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
+    k8slabels "k8s.io/apimachinery/pkg/labels"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
     coreinformer "k8s.io/client-go/informers/core/v1"
@@ -83,6 +84,18 @@ func (nPod *NpmPod) updateNpmPodAttributes(podObj *corev1.Pod) {
     }
 }
 
+// noUpdate evaluates whether the NpmPod needs to be updated, given podObj.
+func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
+    return nPod.Namespace == podObj.ObjectMeta.Namespace &&
+        nPod.Name == podObj.ObjectMeta.Name &&
+        nPod.Phase == podObj.Status.Phase &&
+        nPod.PodIP == podObj.Status.PodIP &&
+        k8slabels.Equals(nPod.Labels, podObj.ObjectMeta.Labels) &&
+        // TODO(jungukcho) to avoid using DeepEqual for ContainerPorts,
+        // it needs a precise sorting. Will optimize it later if needed.
+        reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
+}
+
 type PodController struct {
     podLister corelisters.PodLister
     workqueue workqueue.RateLimitingInterface
@@ -169,7 +182,8 @@ func (c *PodController) addPod(obj interface{}) {
     }
     podObj, _ := obj.(*corev1.Pod)
 
-    // If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, do not need to add it into workqueue.
+    // Check whether this pod needs to be queued.
+    // If the pod is in a completely terminated state, it is not enqueued, to avoid unnecessary computation.
     if isCompletePod(podObj) {
         return
     }
@@ -333,7 +347,9 @@ func (c *PodController) syncPod(key string) error {
         return err
     }
 
-    // If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, start clean-up the lastly applied states.
+    // If this pod is in a completely terminated state (meaning the pod was gracefully shut down),
+    // NPM starts cleaning up the last applied states, even in update events.
+    // This proactive clean-up helps avoid stale pod state in case the delete event is missed.
     if isCompletePod(pod) {
         if err = c.cleanUpDeletedPod(key); err != nil {
             return fmt.Errorf("Error: %w when when pod is in completed state", err)
@@ -346,7 +362,7 @@ func (c *PodController) syncPod(key string) error {
         // if pod does not have different states against lastly applied states stored in cachedNpmPod,
         // podController does not need to reconcile this update.
         // in this updatePod event, newPod was updated with states which PodController does not need to reconcile.
-        if isInvalidPodUpdate(cachedNpmPod, pod) {
+        if cachedNpmPod.noUpdate(pod) {
             return nil
         }
     }
@@ -619,13 +635,20 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p
     return nil
 }
 
+// isCompletePod evaluates whether this pod is in a completely terminated state,
+// which means the pod was gracefully shut down.
 func isCompletePod(podObj *corev1.Pod) bool {
-    if podObj.DeletionTimestamp != nil {
+    // When DeletionTimestamp and DeletionGracePeriodSeconds are both non-nil, the pod
+    // is scheduled for deletion; a DeletionGracePeriodSeconds value of zero means
+    // the pod's graceful termination has completed.
+    if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
         return true
     }
 
-    // K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them
+    // K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them.
     // So NPM will ignorer adding these pods
+    // TODO(jungukcho): what are the values of DeletionTimestamp and podObj.DeletionGracePeriodSeconds
+    // in either of the statuses below?
     if podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed {
         return true
     }
@@ -647,15 +670,3 @@ func getContainerPortList(podObj *corev1.Pod) []corev1.ContainerPort {
     }
     return portList
 }
-
-// (TODO): better naming?
-func isInvalidPodUpdate(npmPod *NpmPod, newPodObj *corev1.Pod) bool {
-    return npmPod.Namespace == newPodObj.ObjectMeta.Namespace &&
-        npmPod.Name == newPodObj.ObjectMeta.Name &&
-        npmPod.Phase == newPodObj.Status.Phase &&
-        npmPod.PodIP == newPodObj.Status.PodIP &&
-        newPodObj.ObjectMeta.DeletionTimestamp == nil &&
-        newPodObj.ObjectMeta.DeletionGracePeriodSeconds == nil &&
-        reflect.DeepEqual(npmPod.Labels, newPodObj.ObjectMeta.Labels) &&
-        reflect.DeepEqual(npmPod.ContainerPorts, getContainerPortList(newPodObj))
-}
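To make the new grace-period check concrete, here is a standalone sketch, not part of the patch, that mirrors the isCompletePod logic added above and shows how it distinguishes a pod still shutting down from one that is fully terminated:

    package main

    import (
        "fmt"

        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // isCompletePod mirrors the version in podcontroller.go: a pod counts as
    // completely terminated once deletion was requested and its grace period
    // has elapsed, or once its phase is Succeeded or Failed.
    func isCompletePod(podObj *corev1.Pod) bool {
        if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
            return true
        }
        return podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed
    }

    func main() {
        grace := int64(30)
        terminating := &corev1.Pod{
            ObjectMeta: metav1.ObjectMeta{
                DeletionTimestamp:          &metav1.Time{},
                DeletionGracePeriodSeconds: &grace,
            },
            Status: corev1.PodStatus{Phase: corev1.PodRunning},
        }
        fmt.Println(isCompletePod(terminating)) // false: still within its grace period

        zero := int64(0)
        terminated := terminating.DeepCopy()
        terminated.DeletionGracePeriodSeconds = &zero
        fmt.Println(isCompletePod(terminated)) // true: grace period has elapsed
    }

This also suggests why noUpdate could drop the old isInvalidPodUpdate checks on DeletionTimestamp and DeletionGracePeriodSeconds being nil: syncPod now short-circuits terminated pods through isCompletePod before the noUpdate comparison runs.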
diff --git a/npm/util/util.go b/npm/util/util.go
index 074d6adf95..696402e863 100644
--- a/npm/util/util.go
+++ b/npm/util/util.go
@@ -338,18 +338,3 @@ func CompareSlices(list1, list2 []string) bool {
 func SliceToString(list []string) string {
     return strings.Join(list, SetPolicyDelimiter)
 }
-
-// IsSameLabels return if all pairs of key and value in two maps are same.
-// Otherwise, it returns false.
-func IsSameLabels(labelA, labelB map[string]string) bool {
-    if len(labelA) != len(labelB) {
-        return false
-    }
-
-    for labelKey, labelVal := range labelA {
-        if val, exist := labelB[labelKey]; !exist || labelVal != val {
-            return false
-        }
-    }
-    return true
-}
diff --git a/npm/util/util_test.go b/npm/util/util_test.go
index 4d8b8a7a18..343e9da03e 100644
--- a/npm/util/util_test.go
+++ b/npm/util/util_test.go
@@ -4,7 +4,6 @@ import (
     "reflect"
     "testing"
 
-    "github.com/stretchr/testify/require"
     "k8s.io/apimachinery/pkg/version"
 )
 
@@ -324,108 +323,3 @@ func TestCompareSlices(t *testing.T) {
         t.Errorf("TestCompareSlices failed @ slice comparison 4")
     }
 }
-
-func TestIsSameLabels(t *testing.T) {
-    var nilLabel map[string]string
-    tests := []struct {
-        name                string
-        labelA              map[string]string
-        labelB              map[string]string
-        expectedIsSameLabel bool
-    }{
-        {
-            name:                "Empty labels",
-            labelA:              map[string]string{},
-            labelB:              map[string]string{},
-            expectedIsSameLabel: true,
-        },
-        {
-            name:                "Empty label and Nil label",
-            labelA:              map[string]string{},
-            labelB:              nilLabel,
-            expectedIsSameLabel: true,
-        },
-        {
-            name: "Same labels",
-            labelA: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            labelB: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            expectedIsSameLabel: true,
-        },
-        {
-            name: "Same labels with different ordered addition",
-            labelA: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            labelB: map[string]string{
-                "c": "d",
-                "e": "f",
-                "a": "b",
-            },
-            expectedIsSameLabel: true,
-        },
-        {
-            name: "Different length",
-            labelA: map[string]string{
-                "e": "f",
-            },
-            labelB: map[string]string{
-                "e": "f",
-                "a": "b",
-            },
-            expectedIsSameLabel: false,
-        },
-        {
-            name:   "Different (empty map and non-empty map)",
-            labelA: map[string]string{},
-            labelB: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            expectedIsSameLabel: false,
-        },
-        {
-            name:   "Different (nil map and non-empty map)",
-            labelA: nilLabel,
-            labelB: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            expectedIsSameLabel: false,
-        },
-        {
-            name: "Have a different one pair of key and value",
-            labelA: map[string]string{
-                "e": "f",
-                "d": "c",
-                "a": "b",
-            },
-            labelB: map[string]string{
-                "e": "f",
-                "c": "d",
-                "a": "b",
-            },
-            expectedIsSameLabel: false,
-        },
-    }
-
-    for _, tt := range tests {
-        tt := tt
-        t.Run(tt.name, func(t *testing.T) {
-            t.Parallel()
-            got := IsSameLabels(tt.labelA, tt.labelB)
-            require.Equal(t, tt.expectedIsSameLabel, got)
-        })
-    }
-}