Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions npm/pkg/controlplane/controllers/v1/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,18 @@ func (nPod *NpmPod) updateNpmPodAttributes(podObj *corev1.Pod) {
}
}

// noUpdate reports whether the cached NpmPod already matches podObj,
// i.e. the update event carries no state that needs to be reconciled.
func (nPod *NpmPod) noUpdate(podObj *corev1.Pod) bool {
	// Cheap field comparisons first; bail out as soon as any differs.
	if nPod.Namespace != podObj.ObjectMeta.Namespace ||
		nPod.Name != podObj.ObjectMeta.Name ||
		nPod.Phase != podObj.Status.Phase ||
		nPod.PodIP != podObj.Status.PodIP {
		return false
	}
	if !util.IsSameLabels(nPod.Labels, podObj.ObjectMeta.Labels) {
		return false
	}
	// TODO(jungukcho): to avoid using DeepEqual for ContainerPorts,
	// it needs a precise sorting. Will optimize it later if needed.
	return reflect.DeepEqual(nPod.ContainerPorts, getContainerPortList(podObj))
}

type PodController struct {
podLister corelisters.PodLister
workqueue workqueue.RateLimitingInterface
Expand Down Expand Up @@ -168,7 +180,8 @@ func (c *PodController) addPod(obj interface{}) {
}
podObj, _ := obj.(*corev1.Pod)

// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, do not need to add it into workqueue.
// Check whether this pod needs to be enqueued.
// If the pod is in a completely terminated state, it is not enqueued to avoid unnecessary computation.
if isCompletePod(podObj) {
return
}
Expand Down Expand Up @@ -324,7 +337,9 @@ func (c *PodController) syncPod(key string) error {
return err
}

// If newPodObj status is either corev1.PodSucceeded or corev1.PodFailed or DeletionTimestamp is set, start clean-up the lastly applied states.
// If this pod is completely in terminated states (which means the pod was gracefully shut down),
// NPM starts cleaning up the previously applied states even in update events.
// This proactive clean-up helps avoid stale pod objects in case the delete event is missed.
if isCompletePod(pod) {
if err = c.cleanUpDeletedPod(key); err != nil {
return fmt.Errorf("Error: %v when when pod is in completed state.\n", err)
Expand All @@ -337,7 +352,7 @@ func (c *PodController) syncPod(key string) error {
// if pod does not have different states against lastly applied states stored in cachedNpmPod,
// podController does not need to reconcile this update.
// in this updatePod event, newPod was updated with states which PodController does not need to reconcile.
if isInvalidPodUpdate(cachedNpmPod, pod) {
if cachedNpmPod.noUpdate(pod) {
return nil
}
}
Expand Down Expand Up @@ -590,13 +605,20 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p
return nil
}

// isCompletePod evaluates whether this pod is completely in terminated states,
// which means pod is gracefully shutdown.
func isCompletePod(podObj *corev1.Pod) bool {
if podObj.DeletionTimestamp != nil {
// DeletionTimestamp and DeletionGracePeriodSeconds in pod are not nil,
// which means pod is expected to be deleted and
// DeletionGracePeriodSeconds value is zero, which means the pod is gracefully terminated.
if podObj.DeletionTimestamp != nil && podObj.DeletionGracePeriodSeconds != nil && *podObj.DeletionGracePeriodSeconds == 0 {
return true
}

// K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them
// K8s categorizes Succeeded and Failed pods as a terminated pod and will not restart them.
// So NPM will ignore adding these pods.
// TODO(jungukcho): what are the values of DeletionTimestamp and podObj.DeletionGracePeriodSeconds
// in either below status?
if podObj.Status.Phase == corev1.PodSucceeded || podObj.Status.Phase == corev1.PodFailed {
return true
}
Expand All @@ -618,15 +640,3 @@ func getContainerPortList(podObj *corev1.Pod) []corev1.ContainerPort {
}
return portList
}

// (TODO): better naming?
// isInvalidPodUpdate reports whether the cached NpmPod already matches
// newPodObj exactly (same namespace/name identity, phase, IP, labels, and
// container ports) and newPodObj is not marked for deletion — in which case
// the update event carries no state the controller needs to reconcile.
func isInvalidPodUpdate(npmPod *NpmPod, newPodObj *corev1.Pod) bool {
return npmPod.Namespace == newPodObj.ObjectMeta.Namespace &&
npmPod.Name == newPodObj.ObjectMeta.Name &&
npmPod.Phase == newPodObj.Status.Phase &&
npmPod.PodIP == newPodObj.Status.PodIP &&
// a pod with a pending deletion must still be reconciled (clean-up path)
newPodObj.ObjectMeta.DeletionTimestamp == nil &&
newPodObj.ObjectMeta.DeletionGracePeriodSeconds == nil &&
reflect.DeepEqual(npmPod.Labels, newPodObj.ObjectMeta.Labels) &&
reflect.DeepEqual(npmPod.ContainerPorts, getContainerPortList(newPodObj))
}
155 changes: 153 additions & 2 deletions npm/pkg/controlplane/controllers/v1/podController_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Azure/azure-container-networking/npm/util"
testutils "github.com/Azure/azure-container-networking/test/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (f *podFixture) newPodController(stopCh chan struct{}) {
// f.kubeInformer.Start(stopCh)
}

func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtwork bool, podPhase corev1.PodPhase) *corev1.Pod {
func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNetwork bool, podPhase corev1.PodPhase) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -95,7 +96,7 @@ func createPod(name, ns, rv, podIP string, labels map[string]string, isHostNewtw
ResourceVersion: rv,
},
Spec: corev1.PodSpec{
HostNetwork: isHostNewtwork,
HostNetwork: isHostNetwork,
Containers: []corev1.Container{
{
Ports: []corev1.ContainerPort{
Expand Down Expand Up @@ -690,6 +691,88 @@ func TestHasValidPodIP(t *testing.T) {
}
}

// TestIsCompletePod checks isCompletePod against the combinations of pod
// phase, DeletionTimestamp, and DeletionGracePeriodSeconds that decide
// whether a pod counts as completely terminated.
func TestIsCompletePod(t *testing.T) {
	var zeroGracePeriod int64
	var defaultGracePeriod int64 = 30

	type podState struct {
		phase                      corev1.PodPhase
		deletionTimestamp          *metav1.Time
		deletionGracePeriodSeconds *int64
	}

	tests := []struct {
		name                 string
		podState             podState
		expectedCompletedPod bool
	}{
		{
			name: "pod is in running status",
			podState: podState{
				phase:                      corev1.PodRunning,
				deletionTimestamp:          nil,
				deletionGracePeriodSeconds: nil,
			},
			expectedCompletedPod: false,
		},
		{
			name: "pod is in completely terminating states after graceful shutdown period",
			podState: podState{
				phase:                      corev1.PodRunning,
				deletionTimestamp:          &metav1.Time{},
				deletionGracePeriodSeconds: &zeroGracePeriod,
			},
			expectedCompletedPod: true,
		},
		{
			name: "pod is in terminating states, but in graceful shutdown period",
			podState: podState{
				phase:                      corev1.PodRunning,
				deletionTimestamp:          &metav1.Time{},
				deletionGracePeriodSeconds: &defaultGracePeriod,
			},
			expectedCompletedPod: false,
		},
		{
			name: "pod is in PodSucceeded status",
			podState: podState{
				phase:                      corev1.PodSucceeded,
				deletionTimestamp:          nil,
				deletionGracePeriodSeconds: nil,
			},
			expectedCompletedPod: true,
		},
		{
			name: "pod is in PodFailed status",
			podState: podState{
				// fixed: this case previously used corev1.PodSucceeded,
				// duplicating the case above and leaving PodFailed untested.
				phase:                      corev1.PodFailed,
				deletionTimestamp:          nil,
				deletionGracePeriodSeconds: nil,
			},
			expectedCompletedPod: true,
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			corev1Pod := &corev1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					DeletionTimestamp:          tt.podState.deletionTimestamp,
					DeletionGracePeriodSeconds: tt.podState.deletionGracePeriodSeconds,
				},
				Status: corev1.PodStatus{
					Phase: tt.podState.phase,
				},
			}
			isPodCompleted := isCompletePod(corev1Pod)
			require.Equal(t, tt.expectedCompletedPod, isPodCompleted)
		})
	}
}

// Extra unit test which is not quite related to PodController,
// but help to understand how workqueue works to make event handler logic lock-free.
// If the same key are queued into workqueue in multiple times,
Expand Down Expand Up @@ -721,3 +804,71 @@ func TestWorkQueue(t *testing.T) {
}
}
}

// TestNPMPodNoUpdate verifies NpmPod.noUpdate: it must report true only when
// the cached NpmPod has been synced with the pod's current state.
func TestNPMPodNoUpdate(t *testing.T) {
	type podInfo struct {
		podName       string
		ns            string
		rv            string
		podIP         string
		labels        map[string]string
		isHostNetwork bool
		podPhase      corev1.PodPhase
	}

	labels := map[string]string{
		"app": "test-pod",
	}

	tests := []struct {
		name string
		podInfo
		updatingNPMPod   bool
		expectedNoUpdate bool
	}{
		{
			name: "Required update of NPMPod given Pod",
			podInfo: podInfo{
				podName:       "test-pod-1",
				ns:            "test-namespace",
				rv:            "0",
				podIP:         "1.2.3.4",
				labels:        labels,
				isHostNetwork: NonHostNetwork,
				podPhase:      corev1.PodRunning,
			},
			updatingNPMPod:   false,
			expectedNoUpdate: false,
		},
		{
			name: "No required update of NPMPod given Pod",
			podInfo: podInfo{
				podName:       "test-pod-2",
				ns:            "test-namespace",
				rv:            "0",
				podIP:         "1.2.3.4",
				labels:        labels,
				isHostNetwork: NonHostNetwork,
				podPhase:      corev1.PodRunning,
			},
			updatingNPMPod:   true,
			expectedNoUpdate: true,
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			pod := createPod(tt.podName, tt.ns, tt.rv, tt.podIP, tt.labels, tt.isHostNetwork, tt.podPhase)
			cachedNpmPod := newNpmPod(pod)
			if tt.updatingNPMPod {
				// Sync the cached NpmPod with the pod so noUpdate sees no difference.
				cachedNpmPod.appendLabels(pod.Labels, AppendToExistingLabels)
				cachedNpmPod.updateNpmPodAttributes(pod)
				cachedNpmPod.appendContainerPorts(pod)
			}
			require.Equal(t, tt.expectedNoUpdate, cachedNpmPod.noUpdate(pod))
		})
	}
}
15 changes: 15 additions & 0 deletions npm/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,18 @@ func CompareSlices(list1, list2 []string) bool {
// SliceToString joins list into a single string, separating the elements
// with SetPolicyDelimiter.
func SliceToString(list []string) string {
	var sb strings.Builder
	for i, elem := range list {
		if i > 0 {
			sb.WriteString(SetPolicyDelimiter)
		}
		sb.WriteString(elem)
	}
	return sb.String()
}

// IsSameLabels reports whether labelA and labelB contain exactly the same
// key/value pairs. A nil map and an empty map are considered equal.
func IsSameLabels(labelA, labelB map[string]string) bool {
	if len(labelA) != len(labelB) {
		return false
	}

	// Same size, so checking membership one way is sufficient.
	for key, valB := range labelB {
		valA, found := labelA[key]
		if !found || valA != valB {
			return false
		}
	}
	return true
}
Loading