From 6f0e357d23dff83d8df3611bd1405581d324119f Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 1 Aug 2022 12:37:37 -0700 Subject: [PATCH 01/18] print statements --- npm/pkg/controlplane/controllers/v2/podController.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index b9a84e0b7b..66c83d7b55 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -92,7 +92,13 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } - if !hasValidPodIP(podObj) { + if !hasValidPodIP(podObj) { // podObj.Status.Phase == corev1.PodFailed) { + // TODO: ensure it is in failed state when has status Error and no IP + conditionsStrings := make([]string, len(podObj.Status.Conditions)) + for i, condition := range podObj.Status.Conditions { + conditionsStrings[i] = fmt.Sprintf("[%+v]", condition) + } + klog.Infof("DEBUGME: Pod %s/%s has no IP. status: %v. conditions: %+v", podObj.Namespace, podObj.Name, podObj.Status.Phase, conditionsStrings) return key, needSync } From c8b13a6dbbe103b677234575930621593f5e7805 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 16 Aug 2022 09:43:38 -0700 Subject: [PATCH 02/18] cleanup Running pod with empty IP --- .../controllers/v2/podController.go | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 66c83d7b55..2851169621 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -42,6 +42,8 @@ type PodController struct { workqueue workqueue.RateLimitingInterface dp dataplane.GenericDataplane podMap map[string]*common.NpmPod // Key is / + // ipMap maps IP to pod key. It is relevant to the block comment in syncPod + ipMap map[string]string sync.RWMutex npmNamespaceCache *NpmNamespaceCache } @@ -52,6 +54,7 @@ func NewPodController(podInformer coreinformer.PodInformer, dp dataplane.Generic workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), dp: dp, podMap: make(map[string]*common.NpmPod), + ipMap: make(map[string]string), npmNamespaceCache: npmNamespaceCache, } @@ -82,6 +85,10 @@ func (c *PodController) LengthOfPodMap() int { } // needSync filters the event if the event is not required to handle +// TODO allow sync for Running pods with empty IP if the pod exists in the pod cache? +// This would reduce member count in kernel, and allow us to delete ipsets that reference the pod's old IP. +// Might not be worth it since customers likely don't want to use clusters that are so memory-constrained that +// many of their pods are stuck in Error state with this Running but empty IP state. func (c *PodController) needSync(eventType string, obj interface{}) (string, bool) { needSync := false var key string @@ -92,13 +99,7 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } - if !hasValidPodIP(podObj) { // podObj.Status.Phase == corev1.PodFailed) { - // TODO: ensure it is in failed state when has status Error and no IP - conditionsStrings := make([]string, len(podObj.Status.Conditions)) - for i, condition := range podObj.Status.Conditions { - conditionsStrings[i] = fmt.Sprintf("[%+v]", condition) - } - klog.Infof("DEBUGME: Pod %s/%s has no IP. status: %v. conditions: %+v", podObj.Namespace, podObj.Name, podObj.Status.Phase, conditionsStrings) + if !hasValidPodIP(podObj) { return key, needSync } @@ -333,6 +334,35 @@ func (c *PodController) syncPod(key string) error { } } + /* + Sometimes in Windows Server '22 nodes, pods will enter and remain in an Error state, where the Pod is running but has an empty IP. + Originally, the pod wasn't getting cleaned up (since it won't be completed), and the pod's old IP remained a part of the pod's old IPSets. + Any new pod that takes up the IP may have unexpected network policy behavior. + + At this point, we could see a Running pod with an empty IP only if deletePod() enqueued it, yet the current pod is Running when we get it from podLister. + + The old pod would have state similar to: + status: Running + IP: empty + conditions: + - [{Type:Initialized Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 17:12:41 +0000 GMT Reason: Message:}] + - [{Type:Ready Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 21:25:46 +0000 GMT Reason:ContainersNotReady ...}] + - [{Type:ContainersReady Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 21:30:01 +0000 GMT Reason:ContainersNotReady ...}] + - [{Type:PodScheduled Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 17:12:41 +0000 GMT Reason: Message:}] + */ + if pod.Status.PodIP == "" { + if npmPodExists { + operationKind = metrics.DeleteOp + if err = c.cleanUpDeletedPod(key); err != nil { + return fmt.Errorf("error: clean up failed when pod is running with empty IP. err: %w", err) + } + return nil + } + + // ignore update if somehow the Pod doesn't exist in the cache + return nil + } + operationKind, err = c.syncAddAndUpdatePod(pod) if err != nil { return fmt.Errorf("failed to sync pod due to %w", err) @@ -368,6 +398,7 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { // Create npmPod and add it to the podMap npmPodObj := common.NewNpmPod(podObj) c.podMap[podKey] = npmPodObj + c.ipMap[podObj.Status.PodIP] = podKey // Get lists of podLabelKey and podLabelKey + podLavelValue ,and then start adding them to ipsets. for labelKey, labelVal := range podObj.Labels { @@ -401,6 +432,15 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper var err error podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj) + oldPodKeyForIP := c.ipMap[newPodObj.Status.PodIP] + if oldPodKeyForIP != podKey { + klog.Infof("[syncAddAndUpdatePod] cleaning up old pod %s for IP %s", oldPodKeyForIP, newPodObj.Status.PodIP) + err = c.cleanUpDeletedPod(oldPodKeyForIP) + if err != nil { + return metrics.DeleteOp, fmt.Errorf("[syncAddAndUpdatePod] error: failed to clean up old pod %s for IP %s with err: %w", oldPodKeyForIP, newPodObj.Status.PodIP, err) + } + } + // lock before using nsMap since nsMap is shared with namespace controller c.npmNamespaceCache.Lock() if _, exists := c.npmNamespaceCache.NsMap[newPodObj.Namespace]; !exists { @@ -572,6 +612,11 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { } delete(c.podMap, cachedNpmPodKey) + + podKey := c.ipMap[cachedNpmPod.PodIP] + if podKey == cachedNpmPodKey { + delete(c.ipMap, cachedNpmPod.PodIP) + } return nil } From f02347341a7d68dc9e71abd307556c8ac4a1a5b9 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 16 Aug 2022 13:30:41 -0700 Subject: [PATCH 03/18] add log line --- npm/pkg/controlplane/controllers/v2/podController.go | 1 + 1 file changed, 1 insertion(+) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 2851169621..33a436dc90 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -352,6 +352,7 @@ func (c *PodController) syncPod(key string) error { */ if pod.Status.PodIP == "" { if npmPodExists { + klog.Infof("[syncPod] cleaning up cached, non-complete pod with empty IP. pod: %s", key) operationKind = metrics.DeleteOp if err = c.cleanUpDeletedPod(key); err != nil { return fmt.Errorf("error: clean up failed when pod is running with empty IP. err: %w", err) From d66c7f20ed054c3cdbb556825d77bd2920b91ae0 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 16 Aug 2022 18:22:11 -0700 Subject: [PATCH 04/18] revert previous 3 commits --- .../controllers/v2/podController.go | 52 ------------------- 1 file changed, 52 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 33a436dc90..b9a84e0b7b 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -42,8 +42,6 @@ type PodController struct { workqueue workqueue.RateLimitingInterface dp dataplane.GenericDataplane podMap map[string]*common.NpmPod // Key is / - // ipMap maps IP to pod key. It is relevant to the block comment in syncPod - ipMap map[string]string sync.RWMutex npmNamespaceCache *NpmNamespaceCache } @@ -54,7 +52,6 @@ func NewPodController(podInformer coreinformer.PodInformer, dp dataplane.Generic workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Pods"), dp: dp, podMap: make(map[string]*common.NpmPod), - ipMap: make(map[string]string), npmNamespaceCache: npmNamespaceCache, } @@ -85,10 +82,6 @@ func (c *PodController) LengthOfPodMap() int { } // needSync filters the event if the event is not required to handle -// TODO allow sync for Running pods with empty IP if the pod exists in the pod cache? -// This would reduce member count in kernel, and allow us to delete ipsets that reference the pod's old IP. -// Might not be worth it since customers likely don't want to use clusters that are so memory-constrained that -// many of their pods are stuck in Error state with this Running but empty IP state. func (c *PodController) needSync(eventType string, obj interface{}) (string, bool) { needSync := false var key string @@ -334,36 +327,6 @@ func (c *PodController) syncPod(key string) error { } } - /* - Sometimes in Windows Server '22 nodes, pods will enter and remain in an Error state, where the Pod is running but has an empty IP. - Originally, the pod wasn't getting cleaned up (since it won't be completed), and the pod's old IP remained a part of the pod's old IPSets. - Any new pod that takes up the IP may have unexpected network policy behavior. - - At this point, we could see a Running pod with an empty IP only if deletePod() enqueued it, yet the current pod is Running when we get it from podLister. - - The old pod would have state similar to: - status: Running - IP: empty - conditions: - - [{Type:Initialized Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 17:12:41 +0000 GMT Reason: Message:}] - - [{Type:Ready Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 21:25:46 +0000 GMT Reason:ContainersNotReady ...}] - - [{Type:ContainersReady Status:False LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 21:30:01 +0000 GMT Reason:ContainersNotReady ...}] - - [{Type:PodScheduled Status:True LastProbeTime:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2022-08-08 17:12:41 +0000 GMT Reason: Message:}] - */ - if pod.Status.PodIP == "" { - if npmPodExists { - klog.Infof("[syncPod] cleaning up cached, non-complete pod with empty IP. pod: %s", key) - operationKind = metrics.DeleteOp - if err = c.cleanUpDeletedPod(key); err != nil { - return fmt.Errorf("error: clean up failed when pod is running with empty IP. err: %w", err) - } - return nil - } - - // ignore update if somehow the Pod doesn't exist in the cache - return nil - } - operationKind, err = c.syncAddAndUpdatePod(pod) if err != nil { return fmt.Errorf("failed to sync pod due to %w", err) @@ -399,7 +362,6 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { // Create npmPod and add it to the podMap npmPodObj := common.NewNpmPod(podObj) c.podMap[podKey] = npmPodObj - c.ipMap[podObj.Status.PodIP] = podKey // Get lists of podLabelKey and podLabelKey + podLavelValue ,and then start adding them to ipsets. for labelKey, labelVal := range podObj.Labels { @@ -433,15 +395,6 @@ func (c *PodController) syncAddAndUpdatePod(newPodObj *corev1.Pod) (metrics.Oper var err error podKey, _ := cache.MetaNamespaceKeyFunc(newPodObj) - oldPodKeyForIP := c.ipMap[newPodObj.Status.PodIP] - if oldPodKeyForIP != podKey { - klog.Infof("[syncAddAndUpdatePod] cleaning up old pod %s for IP %s", oldPodKeyForIP, newPodObj.Status.PodIP) - err = c.cleanUpDeletedPod(oldPodKeyForIP) - if err != nil { - return metrics.DeleteOp, fmt.Errorf("[syncAddAndUpdatePod] error: failed to clean up old pod %s for IP %s with err: %w", oldPodKeyForIP, newPodObj.Status.PodIP, err) - } - } - // lock before using nsMap since nsMap is shared with namespace controller c.npmNamespaceCache.Lock() if _, exists := c.npmNamespaceCache.NsMap[newPodObj.Namespace]; !exists { @@ -613,11 +566,6 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { } delete(c.podMap, cachedNpmPodKey) - - podKey := c.ipMap[cachedNpmPod.PodIP] - if podKey == cachedNpmPodKey { - delete(c.ipMap, cachedNpmPod.PodIP) - } return nil } From 56b79949d06335359e3d089479b477f3adedaa31 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 16 Aug 2022 18:28:54 -0700 Subject: [PATCH 05/18] enqueue updates with empty IPs and add prometheus metric --- npm/metrics/pods.go | 14 ++++++++ npm/metrics/pods_test.go | 17 +++++++++- npm/metrics/prometheus-metrics.go | 33 +++++++++++++++---- npm/metrics/prometheus-values.go | 16 +++++++++ .../controllers/v2/podController.go | 33 +++++++++++++++---- 5 files changed, 98 insertions(+), 15 deletions(-) diff --git a/npm/metrics/pods.go b/npm/metrics/pods.go index 09d8ec5a05..c422539423 100644 --- a/npm/metrics/pods.go +++ b/npm/metrics/pods.go @@ -1,5 +1,7 @@ package metrics +import "github.com/prometheus/client_golang/prometheus" + // RecordControllerPodExecTime adds an observation of pod exec time for the specified operation (unless the operation is NoOp). // The execution time is from the timer's start until now. func RecordControllerPodExecTime(timer *Timer, op OperationKind, hadError bool) { @@ -11,3 +13,15 @@ func RecordControllerPodExecTime(timer *Timer, op OperationKind, hadError bool) func GetControllerPodExecCount(op OperationKind, hadError bool) (int, error) { return getCountVecValue(controllerPodExecTime, getCRUDExecTimeLabels(op, hadError)) } + +func IncPodEventCount(op OperationKind) { + podEventCount.With(getPodEventCountLabels(op)).Inc() +} + +func getPodEventCount(op OperationKind) (int, error) { + return getCounterVecValue(podEventCount, getPodEventCountLabels(op)) +} + +func getPodEventCountLabels(op OperationKind) prometheus.Labels { + return prometheus.Labels{operationLabel: string(op)} +} diff --git a/npm/metrics/pods_test.go b/npm/metrics/pods_test.go index ffcfbd8600..056a75e7f2 100644 --- a/npm/metrics/pods_test.go +++ b/npm/metrics/pods_test.go @@ -1,6 +1,11 @@ package metrics -import "testing" +import ( + "testing" + + "github.com/Azure/azure-container-networking/npm/metrics/promutil" + "github.com/stretchr/testify/require" +) func TestRecordControllerPodExecTime(t *testing.T) { testStopAndRecordCRUDExecTime(t, &crudExecMetric{ @@ -8,3 +13,13 @@ func TestRecordControllerPodExecTime(t *testing.T) { GetControllerPodExecCount, }) } + +func TestIncPodEventCount(t *testing.T) { + InitializeAll() + for _, op := range []OperationKind{CreateOp} { //, UpdateOp, DeleteOp, UpdateWithEmptyIPOp} { + IncPodEventCount(op) + val, err := getPodEventCount(op) + promutil.NotifyIfErrors(t, err) + require.Equal(t, 1, val, "expected metric count to be incremented for op: %s", op) + } +} diff --git a/npm/metrics/prometheus-metrics.go b/npm/metrics/prometheus-metrics.go index ad930d9410..3e26db0b43 100644 --- a/npm/metrics/prometheus-metrics.go +++ b/npm/metrics/prometheus-metrics.go @@ -54,14 +54,16 @@ const ( namespaceExecTimeName = "namespace_exec_time" controllerNamespaceExecTimeHelp = "Execution time in milliseconds for adding/updating/deleting a namespace" - // TODO add health metrics - quantileMedian float64 = 0.5 deltaMedian float64 = 0.05 quantile90th float64 = 0.9 delta90th float64 = 0.01 quantil99th float64 = 0.99 delta99th float64 = 0.001 + + // controller workqueue metrics + podEventCountName = "pod_event_count" + podEventCountHelp = "The total number of pod events ever added to the controller workqueue" ) // Gauge metrics have the methods Inc(), Dec(), and Set(float64) @@ -95,7 +97,9 @@ var ( controllerNamespaceExecTime *prometheus.SummaryVec controllerExecTimeLabels = []string{operationLabel, hadErrorLabel} - // TODO add health metrics + // controller workqueue metrics + podEventCount *prometheus.CounterVec + podEventCountLabels = []string{operationLabel} ) type RegistryType string @@ -112,6 +116,8 @@ const ( UpdateOp OperationKind = "update" DeleteOp OperationKind = "delete" NoOp OperationKind = "noop" + // UpdateWithEmptyIPOp is intended to be used for the PodEvent counter only + UpdateWithEmptyIPOp OperationKind = "update-with-empty-ip" ) func (op OperationKind) isValid() bool { @@ -128,14 +134,11 @@ func (op OperationKind) isValid() bool { // TODO consider refactoring the functionality of the metrics package into a "Metrics" struct with methods (this would require code changes throughout npm). // Would need to consider how it seems like you can't register a metric twice, even in a separate registry, so you couldn't throw away the Metrics struct and create a new one. func InitializeAll() { - // TODO introduce isFanOut parameter to determine when to create fan-out controller/daemon metrics if haveInitialized { klog.Infof("metrics have already been initialized") } else { initializeDaemonMetrics() initializeControllerMetrics() - // TODO include dataplane health metrics: - // num failures for apply ipsets, updating policies, deleting policies, and running periodic policy tasks, etc. log.Logf("Finished initializing all Prometheus metrics") haveInitialized = true } @@ -177,7 +180,9 @@ func initializeDaemonMetrics() { func initializeControllerMetrics() { // CLUSTER METRICS numPolicies = createClusterGauge(numPoliciesName, numPoliciesHelp) - // TODO include health metrics: num failures for validating policies & ipsets + + // controller workqueue metrics + podEventCount = newPodEventCount() // NODE METRICS addPolicyExecTime = createNodeSummaryVec(addPolicyExecTimeName, "", addPolicyExecTimeHelp, addPolicyExecTimeLabels) @@ -261,3 +266,17 @@ func createNodeSummaryVec(name, subsystem, helpMessage string, labels []string) func createControllerExecTimeSummaryVec(name, helpMessage string) *prometheus.SummaryVec { return createNodeSummaryVec(name, controllerPrefix, helpMessage, controllerExecTimeLabels) } + +func newPodEventCount() *prometheus.CounterVec { + counter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: controllerPrefix, + Name: podEventCountName, + Help: podEventCountHelp, + }, + podEventCountLabels, + ) + register(counter, podEventCountName, ClusterMetrics) + return counter +} diff --git a/npm/metrics/prometheus-values.go b/npm/metrics/prometheus-values.go index 53a265154b..65c2f5fcac 100644 --- a/npm/metrics/prometheus-values.go +++ b/npm/metrics/prometheus-values.go @@ -35,6 +35,14 @@ func getCountValue(collector prometheus.Collector) (int, error) { return int(dtoMetric.Summary.GetSampleCount()), nil } +func getCounterValue(collector prometheus.Collector) (int, error) { + dtoMetric, err := getDTOMetric(collector) + if err != nil { + return 0, err + } + return int(dtoMetric.Counter.GetValue()), nil +} + func getCountVecValue(summaryVecMetric *prometheus.SummaryVec, labels prometheus.Labels) (int, error) { collector, ok := summaryVecMetric.With(labels).(prometheus.Collector) if !ok { @@ -43,6 +51,14 @@ func getCountVecValue(summaryVecMetric *prometheus.SummaryVec, labels prometheus return getCountValue(collector) } +func getCounterVecValue(counterVecMetric *prometheus.CounterVec, labels prometheus.Labels) (int, error) { + collector, ok := counterVecMetric.With(labels).(prometheus.Collector) + if !ok { + return 0, errNotCollector + } + return getCounterValue(collector) +} + func getCRUDExecTimeLabels(op OperationKind, hadError bool) prometheus.Labels { hadErrorVal := "false" if hadError { diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index b9a84e0b7b..2bc573ea77 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -14,7 +14,6 @@ import ( "github.com/Azure/azure-container-networking/npm/pkg/dataplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/util" - npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -33,9 +32,19 @@ type NamedPortOperation string const ( deleteNamedPort NamedPortOperation = "del" addNamedPort NamedPortOperation = "add" + + addEvent string = "ADD" + updateEvent string = "UPDATE" ) -var kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace} +var ( + kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace} + + eventOperations = map[string]metrics.OperationKind{ + addEvent: metrics.CreateOp, + updateEvent: metrics.UpdateOp, + } +) type PodController struct { podLister corelisters.PodLister @@ -92,8 +101,13 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } + op := eventOperations[eventType] if !hasValidPodIP(podObj) { - return key, needSync + if eventType == addEvent { + return key, needSync + } else { + op = metrics.UpdateWithEmptyIPOp + } } if isHostNetworkPod(podObj) { @@ -108,12 +122,13 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } + metrics.IncPodEventCount(op) needSync = true return key, needSync } func (c *PodController) addPod(obj interface{}) { - key, needSync := c.needSync("ADD", obj) + key, needSync := c.needSync(addEvent, obj) if !needSync { return } @@ -129,7 +144,7 @@ func (c *PodController) addPod(obj interface{}) { } func (c *PodController) updatePod(old, newp interface{}) { - key, needSync := c.needSync("UPDATE", newp) + key, needSync := c.needSync(updateEvent, newp) if !needSync { return } @@ -340,10 +355,14 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP) if !util.IsIPV4(podObj.Status.PodIP) { - msg := fmt.Sprintf("[syncAddedPod] Error: ADD POD [%s/%s/%s/%+v/%s] failed as the PodIP is not valid ipv4 address", podObj.Namespace, + msg := fmt.Sprintf("[syncAddedPod] Error: ADD POD [%s/%s/%s/%+v] failed as the PodIP is not valid ipv4 address. ip: [%s]", podObj.Namespace, podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP) metrics.SendErrorLogAndMetric(util.PodID, msg) - return npmerrors.Errorf(npmerrors.AddPod, true, msg) + // return nil so that we don't requeue. + // Wait until an update event comes from API Server where the IP is valid e.g. if the IP is empty. + // There may be latency in receiving the update event versus retrying on our own, + // but this prevents us from retrying indefinitely for pods stuck in Running state with no IP as seen in AKS Windows Server '22. + return nil } var err error From 66d3cc8eb23416efaa75e15254f6c9a9da77e6a6 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 16 Aug 2022 18:47:49 -0700 Subject: [PATCH 06/18] fix lints --- npm/metrics/pods.go | 10 +++++----- npm/metrics/pods_test.go | 8 ++++---- npm/metrics/prometheus-metrics.go | 14 +++++++------- npm/metrics/prometheus-values.go | 6 +++--- .../controlplane/controllers/v2/podController.go | 5 ++--- 5 files changed, 21 insertions(+), 22 deletions(-) diff --git a/npm/metrics/pods.go b/npm/metrics/pods.go index c422539423..55c8bf6160 100644 --- a/npm/metrics/pods.go +++ b/npm/metrics/pods.go @@ -14,14 +14,14 @@ func GetControllerPodExecCount(op OperationKind, hadError bool) (int, error) { return getCountVecValue(controllerPodExecTime, getCRUDExecTimeLabels(op, hadError)) } -func IncPodEventCount(op OperationKind) { - podEventCount.With(getPodEventCountLabels(op)).Inc() +func IncPodEventTotal(op OperationKind) { + podEventCount.With(getPodEventTotalLabels(op)).Inc() } -func getPodEventCount(op OperationKind) (int, error) { - return getCounterVecValue(podEventCount, getPodEventCountLabels(op)) +func getPodEventTotal(op OperationKind) (int, error) { + return getTotalVecValue(podEventCount, getPodEventTotalLabels(op)) } -func getPodEventCountLabels(op OperationKind) prometheus.Labels { +func getPodEventTotalLabels(op OperationKind) prometheus.Labels { return prometheus.Labels{operationLabel: string(op)} } diff --git a/npm/metrics/pods_test.go b/npm/metrics/pods_test.go index 056a75e7f2..dd5a8f55af 100644 --- a/npm/metrics/pods_test.go +++ b/npm/metrics/pods_test.go @@ -14,11 +14,11 @@ func TestRecordControllerPodExecTime(t *testing.T) { }) } -func TestIncPodEventCount(t *testing.T) { +func TestIncPodEventTotal(t *testing.T) { InitializeAll() - for _, op := range []OperationKind{CreateOp} { //, UpdateOp, DeleteOp, UpdateWithEmptyIPOp} { - IncPodEventCount(op) - val, err := getPodEventCount(op) + for _, op := range []OperationKind{CreateOp, UpdateOp, DeleteOp, UpdateWithEmptyIPOp} { + IncPodEventTotal(op) + val, err := getPodEventTotal(op) promutil.NotifyIfErrors(t, err) require.Equal(t, 1, val, "expected metric count to be incremented for op: %s", op) } diff --git a/npm/metrics/prometheus-metrics.go b/npm/metrics/prometheus-metrics.go index 3e26db0b43..9e5bee5820 100644 --- a/npm/metrics/prometheus-metrics.go +++ b/npm/metrics/prometheus-metrics.go @@ -62,8 +62,8 @@ const ( delta99th float64 = 0.001 // controller workqueue metrics - podEventCountName = "pod_event_count" - podEventCountHelp = "The total number of pod events ever added to the controller workqueue" + podEventTotalName = "pod_event_total" + podEventTotalHelp = "The total number of pod events ever added to the controller workqueue" ) // Gauge metrics have the methods Inc(), Dec(), and Set(float64) @@ -99,7 +99,7 @@ var ( // controller workqueue metrics podEventCount *prometheus.CounterVec - podEventCountLabels = []string{operationLabel} + podEventTotalLabels = []string{operationLabel} ) type RegistryType string @@ -272,11 +272,11 @@ func newPodEventCount() *prometheus.CounterVec { prometheus.CounterOpts{ Namespace: namespace, Subsystem: controllerPrefix, - Name: podEventCountName, - Help: podEventCountHelp, + Name: podEventTotalName, + Help: podEventTotalHelp, }, - podEventCountLabels, + podEventTotalLabels, ) - register(counter, podEventCountName, ClusterMetrics) + register(counter, podEventTotalName, ClusterMetrics) return counter } diff --git a/npm/metrics/prometheus-values.go b/npm/metrics/prometheus-values.go index 65c2f5fcac..8991d11d12 100644 --- a/npm/metrics/prometheus-values.go +++ b/npm/metrics/prometheus-values.go @@ -35,7 +35,7 @@ func getCountValue(collector prometheus.Collector) (int, error) { return int(dtoMetric.Summary.GetSampleCount()), nil } -func getCounterValue(collector prometheus.Collector) (int, error) { +func getTotal(collector prometheus.Collector) (int, error) { dtoMetric, err := getDTOMetric(collector) if err != nil { return 0, err @@ -51,12 +51,12 @@ func getCountVecValue(summaryVecMetric *prometheus.SummaryVec, labels prometheus return getCountValue(collector) } -func getCounterVecValue(counterVecMetric *prometheus.CounterVec, labels prometheus.Labels) (int, error) { +func getTotalVecValue(counterVecMetric *prometheus.CounterVec, labels prometheus.Labels) (int, error) { collector, ok := counterVecMetric.With(labels).(prometheus.Collector) if !ok { return 0, errNotCollector } - return getCounterValue(collector) + return getTotal(collector) } func getCRUDExecTimeLabels(op OperationKind, hadError bool) prometheus.Labels { diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 2bc573ea77..b32f0a5e54 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -105,9 +105,8 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo if !hasValidPodIP(podObj) { if eventType == addEvent { return key, needSync - } else { - op = metrics.UpdateWithEmptyIPOp } + op = metrics.UpdateWithEmptyIPOp } if isHostNetworkPod(podObj) { @@ -122,7 +121,7 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } - metrics.IncPodEventCount(op) + metrics.IncPodEventTotal(op) needSync = true return key, needSync } From b0c5bc0765c98d64abda606ca339da8e5c5b789e Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Wed, 17 Aug 2022 17:31:03 -0700 Subject: [PATCH 07/18] handle pod assigned to wrong endpoint edge case --- .../controllers/v2/podController.go | 2 +- npm/pkg/dataplane/dataplane_windows.go | 64 +++++++++++++++---- npm/pkg/dataplane/types.go | 20 ++++-- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index b32f0a5e54..0728e068ab 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -553,7 +553,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { } var err error - cachedPodMetadata := dataplane.NewPodMetadata(cachedNpmPodKey, cachedNpmPod.PodIP, "") + cachedPodMetadata := dataplane.NewPodMetadataMarkedForDelete(cachedNpmPodKey, cachedNpmPod.PodIP, "") // Delete the pod from its namespace's ipset. // note: NodeName empty is not going to call update pod if err = c.dp.RemoveFromSets( diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index ea6b55888d..fba9047322 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -20,7 +20,7 @@ const ( var ( errPolicyModeUnsupported = errors.New("only IPSet policy mode is supported") - errMismanagedPodKey = errors.New("the pod key was not managed correctly when refreshing pod endpoints") + errMismanagedPodKey = errors.New("the endpoint corresponds to a different pod") ) // initializeDataPlane will help gather network and endpoint details @@ -105,9 +105,10 @@ func (dp *DataPlane) shouldUpdatePod() bool { // 2. Will check for existing applicable network policies and applies it on endpoint func (dp *DataPlane) updatePod(pod *updateNPMPod) error { klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) - // Check if pod is part of this node - if pod.NodeName != dp.nodeName { - klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]", dp.nodeName, pod.NodeName) + if pod.NodeName != dp.nodeName && !pod.markedForDelete { + // Ignore updates if the pod is not part of this node. + // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. + klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey) return nil } @@ -122,20 +123,57 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { if !ok { // ignore this err and pod endpoint will be deleted in ApplyDP // if the endpoint is not found, it means the pod is not part of this node or pod got deleted. - klog.Warningf("[DataPlane] did not find endpoint with IPaddress %s", pod.PodIP) + klog.Warningf("[DataPlane] did not find endpoint with IPaddress %s for pod %s", pod.PodIP, pod.PodKey) return nil } - if endpoint.podKey == unspecifiedPodKey { - // while refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key - if endpoint.isStalePodKey(pod.PodKey) { - // NOTE: if a pod restarts and takes up its previous IP, then its endpoint would be new and this branch would be taken. - // Updates to this pod would not occur. Pod IPs are expected to change on restart though. - // See: https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change - // If a pod does restart and take up its previous IP, then the pod can be deleted/restarted to mitigate this problem. - klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted", pod.PodKey) + // While refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key. + // Additionally, a pod key may be stale if the pod was wrongly assigned to the endpoint for this scenario: + // 1. pod A previously had IP i and EP x + // 2. pod A restarts w/ no ip AND NPM restarts AND pod B comes up with the same IP i and EP y + // 3. controller processes an update event for pod A with IP i before the update event for pod B with IP i, so pod A is wrongly assigned to EP y + if endpoint.isStalePodKey(pod.PodKey) { + // NOTE: if a pod restarts and takes up its previous IP, then its endpoint would be new and this branch would be taken. + // Updates to this pod would not occur. Pod IPs are expected to change on restart though. + // See: https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change + // If a pod does restart and take up its previous IP, then the pod can be deleted/restarted to mitigate this problem. + klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.ID) + return nil + } + + // handle scenario where pod marked for delete + if pod.markedForDelete { + // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. + if endpoint.podKey != pod.PodKey { + klog.Infof( + "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", + pod.PodKey, endpoint.ID, endpoint.PodKey) return nil } + + endpoint.stalePodKey = &staleKey{ + key: ep.PodKey, + timestamp: time.Now().Unix(), + } + endpoint.podKey = unspecifiedPodKey + + // remove all policies on the endpoint + for policyKey := range endpoint.netPolReference { + // Delete the network policy + endpointList := map[string]string{ + endpoint.ip: endpoint.id, + } + err := dp.policyMgr.RemovePolicy(policyKey, endpointList) + if err != nil { + klog.Infof("[DataPlane] remove policy unsuccessful for pod marked for delete. policy key: %s. endpoint ID: %s. pod key: %s", policyKey, endpoint.id, pod.PodKey) + } + delete(endpoint.netPolReference, policyKey) + } + + return nil + } + + if endpoint.podKey == unspecifiedPodKey { endpoint.podKey = pod.PodKey } else if pod.PodKey != endpoint.podKey { return fmt.Errorf("pod key mismatch. Expected: %s, Actual: %s. Error: [%w]", pod.PodKey, endpoint.podKey, errMismanagedPodKey) diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index cab116ced1..d6da0212ab 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -37,19 +37,27 @@ type updateNPMPod struct { // todo definitely requires further optimization between the intersection // of types, PodMetadata, NpmPod and corev1.pod type PodMetadata struct { - PodKey string - PodIP string - NodeName string + PodKey string + PodIP string + NodeName string + MarkedForDelete bool } func NewPodMetadata(podKey, podIP, nodeName string) *PodMetadata { return &PodMetadata{ - PodKey: podKey, - PodIP: podIP, - NodeName: nodeName, + PodKey: podKey, + PodIP: podIP, + NodeName: nodeName, + MarkedForDelete: false, } } +func NewPodMetadataMarkedForDelete(podKey, podIP, nodeName string) *PodMetadata { + pm := NewPodMetadata(podKey, podIP, nodeName) + pm.MarkedForDelete = true + return pm +} + func newUpdateNPMPod(podMetadata *PodMetadata) *updateNPMPod { return &updateNPMPod{ PodMetadata: podMetadata, From 5e708cebb17ff70fc0572168ff45103531b5c1da Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Wed, 17 Aug 2022 17:46:12 -0700 Subject: [PATCH 08/18] log and update comment --- npm/pkg/dataplane/dataplane.go | 1 + npm/pkg/dataplane/dataplane_windows.go | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index db1c99dded..4fa02d1cbd 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -185,6 +185,7 @@ func (dp *DataPlane) ApplyDataPlane() error { } if dp.shouldUpdatePod() { + // lock for podKey, pod := range dp.updatePodCache { err := dp.updatePod(pod) if err != nil { diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index fba9047322..8a7e92a0cb 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -143,14 +143,20 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { // handle scenario where pod marked for delete if pod.markedForDelete { - // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. + // From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. + // Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. + // Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter the situation numbered above. if endpoint.podKey != pod.PodKey { + // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof( "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", pod.PodKey, endpoint.ID, endpoint.PodKey) return nil } + msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.ID) + metrics.SendLog(util.DaemonDataplaneID, msg, metrics.PrintLog) + endpoint.stalePodKey = &staleKey{ key: ep.PodKey, timestamp: time.Now().Unix(), From f384257373b3cf5e66d62ce494a6724ad51fb689 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 18 Aug 2022 16:25:33 -0700 Subject: [PATCH 09/18] UTs and fixed named port + build --- .../controllers/v2/podController.go | 15 ++- .../controllers/v2/podController_test.go | 126 ++++++++++++++++-- npm/pkg/dataplane/dataplane_windows.go | 4 +- 3 files changed, 127 insertions(+), 18 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 0728e068ab..ecb80bf6ad 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -30,8 +30,9 @@ import ( type NamedPortOperation string const ( - deleteNamedPort NamedPortOperation = "del" - addNamedPort NamedPortOperation = "add" + deletePodAndNamedPort NamedPortOperation = "del-pod-and-namedport" + deleteNamedPort NamedPortOperation = "del" + addNamedPort NamedPortOperation = "add" addEvent string = "ADD" updateEvent string = "UPDATE" @@ -579,7 +580,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { // Delete pod's named ports from its ipset. Need to pass true in the manageNamedPortIpsets function call if err = c.manageNamedPortIpsets( - cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil { + cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deletePodAndNamedPort); err != nil { return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from named port ipset with err: %w", err) } @@ -610,16 +611,22 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p namedPortIpsetEntry := fmt.Sprintf("%s,%s%d", podIP, protocol, port.ContainerPort) // nodename in NewPodMetadata is nil so UpdatePod is ignored - podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) switch namedPortOperation { case deleteNamedPort: + podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to remove from set when deleting named port with err %w", err) } case addNamedPort: + podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) if err := c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to add to set when deleting named port with err %w", err) } + case deletePodAndNamedPort: + podMetadata := dataplane.NewPodMetadataMarkedForDelete(podKey, namedPortIpsetEntry, nodeName) + if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { + return fmt.Errorf("failed to remove from set when deleting pod and named port with err %w", err) + } } } diff --git a/npm/pkg/controlplane/controllers/v2/podController_test.go b/npm/pkg/controlplane/controllers/v2/podController_test.go index 5beb3e592f..f130e08739 100644 --- a/npm/pkg/controlplane/controllers/v2/podController_test.go +++ b/npm/pkg/controlplane/controllers/v2/podController_test.go @@ -422,12 +422,13 @@ func TestDeletePod(t *testing.T) { Return(nil).Times(1) dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") + dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) @@ -535,12 +536,13 @@ func TestDeletePodWithTombstoneAfterAddingPod(t *testing.T) { Return(nil).Times(1) dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") + dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) @@ -607,6 +609,96 @@ func TestLabelUpdatePod(t *testing.T) { checkNpmPodWithInput("TestLabelUpdatePod", f, newPodObj) } +func TestEmptyIPUpdate(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + oldPodObj := createPod("test-pod", "test-namespace", "0", "1.2.3.4", labels, NonHostNetwork, corev1.PodRunning) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, oldPodObj) + f.kubeobjects = append(f.kubeobjects, oldPodObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + newPodObj := oldPodObj.DeepCopy() + // oldPodObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldPodObj.ResourceVersion) + newPodObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + // oldPodObj PodIP is "1.2.3.4" + newPodObj.Status.PodIP = "" + // Add pod section + mockIPSets := []*ipsets.IPSetMetadata{ + ipsets.NewIPSetMetadata("test-namespace", ipsets.Namespace), + ipsets.NewIPSetMetadata("app", ipsets.KeyLabelOfPod), + ipsets.NewIPSetMetadata("app:test-pod", ipsets.KeyValueLabelOfPod), + } + podMetadata1 := dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4", "") + + dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) + // Delete pod section + deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") + dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + // New IP Pod add fails + + updatePod(t, f, oldPodObj, newPodObj) + + testCases := []expectedValues{ + {0, 1, 0, podPromVals{1, 1, 0}}, + } + checkPodTestResult("TestEmptyIPUpdate", f, testCases) +} + +func TestEmptyIPAdd(t *testing.T) { + labels := map[string]string{ + "app": "test-pod", + } + podObj := createPod("test-pod", "test-namespace", "0", "", labels, NonHostNetwork, corev1.PodRunning) + podKey := getKey(podObj, t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + f := newFixture(t, dp) + f.podLister = append(f.podLister, podObj) + f.kubeobjects = append(f.kubeobjects, podObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newPodController(stopCh) + + addPod(t, f, podObj) + testCases := []expectedValues{ + {0, 0, 0, podPromVals{0, 0, 0}}, + } + checkPodTestResult("TestEmptyIPAdd", f, testCases) + + if _, exists := f.podController.podMap[podKey]; exists { + t.Error("TestEmptyIPAdd failed @ cached pod obj exists check") + } +} + func TestIPAddressUpdatePod(t *testing.T) { labels := map[string]string{ "app": "test-pod", @@ -649,12 +741,13 @@ func TestIPAddressUpdatePod(t *testing.T) { Return(nil).Times(1) dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") + dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) // New IP Pod add @@ -718,12 +811,13 @@ func TestPodStatusUpdatePod(t *testing.T) { Return(nil).Times(1) dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) + deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") + dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) @@ -776,6 +870,14 @@ func TestHasValidPodIP(t *testing.T) { if ok := hasValidPodIP(podObj); !ok { t.Errorf("TestisValidPod failed @ isValidPod") } + + podObj = &corev1.Pod{ + Status: corev1.PodStatus{ + Phase: "Running", + PodIP: "", + }, + } + require.False(t, hasValidPodIP(podObj)) } func TestIsCompletePod(t *testing.T) { diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index 8a7e92a0cb..ce13a0641b 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -105,7 +105,7 @@ func (dp *DataPlane) shouldUpdatePod() bool { // 2. Will check for existing applicable network policies and applies it on endpoint func (dp *DataPlane) updatePod(pod *updateNPMPod) error { klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) - if pod.NodeName != dp.nodeName && !pod.markedForDelete { + if pod.NodeName != dp.nodeName && !pod.MarkedforDelete { // Ignore updates if the pod is not part of this node. // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey) @@ -142,7 +142,7 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { } // handle scenario where pod marked for delete - if pod.markedForDelete { + if pod.MarkedforDelete { // From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. // Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. // Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter the situation numbered above. From 094069fdc48e9a43ac4305260603c585c153027a Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 18 Aug 2022 16:37:27 -0700 Subject: [PATCH 10/18] reset entire endpoint regardless of cache --- npm/pkg/dataplane/dataplane_windows.go | 13 +++---------- npm/pkg/dataplane/policies/policymanager.go | 7 +++++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index ce13a0641b..95a614f10e 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -164,17 +164,10 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { endpoint.podKey = unspecifiedPodKey // remove all policies on the endpoint - for policyKey := range endpoint.netPolReference { - // Delete the network policy - endpointList := map[string]string{ - endpoint.ip: endpoint.id, - } - err := dp.policyMgr.RemovePolicy(policyKey, endpointList) - if err != nil { - klog.Infof("[DataPlane] remove policy unsuccessful for pod marked for delete. policy key: %s. endpoint ID: %s. pod key: %s", policyKey, endpoint.id, pod.PodKey) - } - delete(endpoint.netPolReference, policyKey) + if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil { + klog.Infof("[DataPlane] resetting endpoint policies unsuccessful for pod marked for delete. endpoint ID: %s. pod key: %s", endpoint.id, pod.PodKey) } + endpoint.netPolReference = make(map[string]string) return nil } diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index a1ae22bcf6..829c36da8c 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -66,6 +66,13 @@ func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManag } } +func (pMgr *PolicyManager) ResetEndpoint(epID string) error { + if util.IsWindowsDP() { + return pMgr.bootup([]string{epID}) + } + return nil +} + func (pMgr *PolicyManager) Bootup(epIDs []string) error { metrics.ResetNumACLRules() if err := pMgr.bootup(epIDs); err != nil { From d15462311b9e58dabbcf1247d2ce032dba6c2f72 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 18 Aug 2022 16:38:24 -0700 Subject: [PATCH 11/18] remove comment in dp.go --- npm/pkg/dataplane/dataplane.go | 1 - 1 file changed, 1 deletion(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 4fa02d1cbd..db1c99dded 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -185,7 +185,6 @@ func (dp *DataPlane) ApplyDataPlane() error { } if dp.shouldUpdatePod() { - // lock for podKey, pod := range dp.updatePodCache { err := dp.updatePod(pod) if err != nil { From 31ac38641670e871c06576f29226deb61e9835a0 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 18 Aug 2022 16:54:32 -0700 Subject: [PATCH 12/18] fix windows build issues --- npm/pkg/dataplane/dataplane_windows.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index 95a614f10e..5e3b222483 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" "github.com/Azure/azure-container-networking/npm/util" npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" @@ -105,7 +106,7 @@ func (dp *DataPlane) shouldUpdatePod() bool { // 2. Will check for existing applicable network policies and applies it on endpoint func (dp *DataPlane) updatePod(pod *updateNPMPod) error { klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) - if pod.NodeName != dp.nodeName && !pod.MarkedforDelete { + if pod.NodeName != dp.nodeName && !pod.MarkedForDelete { // Ignore updates if the pod is not part of this node. // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey) @@ -137,12 +138,12 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { // Updates to this pod would not occur. Pod IPs are expected to change on restart though. // See: https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change // If a pod does restart and take up its previous IP, then the pod can be deleted/restarted to mitigate this problem. - klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.ID) + klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.id) return nil } // handle scenario where pod marked for delete - if pod.MarkedforDelete { + if pod.MarkedForDelete { // From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. // Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. // Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter the situation numbered above. @@ -150,15 +151,15 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof( "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", - pod.PodKey, endpoint.ID, endpoint.PodKey) + pod.PodKey, endpoint.id, endpoint.podKey) return nil } - msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.ID) + msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.id) metrics.SendLog(util.DaemonDataplaneID, msg, metrics.PrintLog) endpoint.stalePodKey = &staleKey{ - key: ep.PodKey, + key: endpoint.podKey, timestamp: time.Now().Unix(), } endpoint.podKey = unspecifiedPodKey @@ -167,7 +168,7 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil { klog.Infof("[DataPlane] resetting endpoint policies unsuccessful for pod marked for delete. endpoint ID: %s. pod key: %s", endpoint.id, pod.PodKey) } - endpoint.netPolReference = make(map[string]string) + endpoint.netPolReference = make(map[string]struct{}) return nil } From b6287cf38b97c37415c1e78d9349b261885eca76 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 19 Aug 2022 13:07:51 -0700 Subject: [PATCH 13/18] skip refreshing endpoints and address comments --- .../controllers/v2/podController.go | 4 +- npm/pkg/dataplane/dataplane_windows.go | 136 +++++++++++++----- npm/pkg/dataplane/types.go | 10 +- 3 files changed, 107 insertions(+), 43 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index ecb80bf6ad..7327e37f54 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -355,9 +355,9 @@ func (c *PodController) syncAddedPod(podObj *corev1.Pod) error { podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP) if !util.IsIPV4(podObj.Status.PodIP) { - msg := fmt.Sprintf("[syncAddedPod] Error: ADD POD [%s/%s/%s/%+v] failed as the PodIP is not valid ipv4 address. ip: [%s]", podObj.Namespace, + msg := fmt.Sprintf("[syncAddedPod] warning: ADD POD [%s/%s/%s/%+v] ignored as the PodIP is not valid ipv4 address. ip: [%s]", podObj.Namespace, podObj.Name, podObj.Spec.NodeName, podObj.Labels, podObj.Status.PodIP) - metrics.SendErrorLogAndMetric(util.PodID, msg) + metrics.SendLog(util.PodID, msg, metrics.PrintLog) // return nil so that we don't requeue. // Wait until an update event comes from API Server where the IP is valid e.g. if the IP is empty. // There may be latency in receiving the update event versus retrying on our own, diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index 5e3b222483..16d0c6c42a 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -103,10 +103,16 @@ func (dp *DataPlane) shouldUpdatePod() bool { // updatePod has two responsibilities in windows // 1. Will call into dataplane and updates endpoint references of this pod. -// 2. Will check for existing applicable network policies and applies it on endpoint +// 2. Will check for existing applicable network policies and applies it on endpoint. +// Assumptions: +// Not possible to know a pod's IP before endpoints are refreshed (i.e. IP has an endpoint ID switch means that the pod assigned to the first ID is dead) func (dp *DataPlane) updatePod(pod *updateNPMPod) error { + if pod.isMarkedForDelete() { + return dp.updatePodForDelete(pod) + } + klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) - if pod.NodeName != dp.nodeName && !pod.MarkedForDelete { + if pod.NodeName != dp.nodeName { // Ignore updates if the pod is not part of this node. // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey) @@ -129,47 +135,12 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { } // While refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key. - // Additionally, a pod key may be stale if the pod was wrongly assigned to the endpoint for this scenario: - // 1. pod A previously had IP i and EP x - // 2. pod A restarts w/ no ip AND NPM restarts AND pod B comes up with the same IP i and EP y - // 3. controller processes an update event for pod A with IP i before the update event for pod B with IP i, so pod A is wrongly assigned to EP y if endpoint.isStalePodKey(pod.PodKey) { // NOTE: if a pod restarts and takes up its previous IP, then its endpoint would be new and this branch would be taken. // Updates to this pod would not occur. Pod IPs are expected to change on restart though. // See: https://stackoverflow.com/questions/52362514/when-will-the-kubernetes-pod-ip-change // If a pod does restart and take up its previous IP, then the pod can be deleted/restarted to mitigate this problem. - klog.Infof("ignoring pod update since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.id) - return nil - } - - // handle scenario where pod marked for delete - if pod.MarkedForDelete { - // From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. - // Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. - // Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter the situation numbered above. - if endpoint.podKey != pod.PodKey { - // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. - klog.Infof( - "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", - pod.PodKey, endpoint.id, endpoint.podKey) - return nil - } - - msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.id) - metrics.SendLog(util.DaemonDataplaneID, msg, metrics.PrintLog) - - endpoint.stalePodKey = &staleKey{ - key: endpoint.podKey, - timestamp: time.Now().Unix(), - } - endpoint.podKey = unspecifiedPodKey - - // remove all policies on the endpoint - if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil { - klog.Infof("[DataPlane] resetting endpoint policies unsuccessful for pod marked for delete. endpoint ID: %s. pod key: %s", endpoint.id, pod.PodKey) - } - endpoint.netPolReference = make(map[string]struct{}) - + klog.Infof("[DataPlane] ignoring pod update since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.id) return nil } @@ -257,6 +228,94 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { return nil } +// updatePodForDelete resets the endpoint corresponding to the pod's IP. +// This functionality is required for the edge case below. +// Assumptions: +// Not possible to know a pod's IP before endpoints are refreshed (i.e. IP has an endpoint ID switch means that the pod assigned to the first ID is dead) +/* + Notes on the edge case in memory-starved Windows Server '22 where: + - pod A previously had IP and EP x + - around same time: + - pod A restarts + - NPM restarts + - pod B comes up with same IP and EP y + - controller events can be jumbled e.g. an update event for pod A w/ its old IP can happen before the pod B create event + + To start, have a new EP with an unspecified pod key: + 1. B --> A w/ IP --> A's cleanup: + - in updatePod(), proceed as usual for B + - in updatePod(), get a mismanaged err for A, which is good. Requeued in controller + - eventually, the pod will be cleaned up + - A's cleanup should be ignored in updatePodForDelete(), or else the controller will requeue + 2. A w/ IP --> B comes up --> A's cleanup: + - in updatePod(), A's policies applied to EP + - in updatePod(), get a get a mismanaged err for B. Requeued in controller + - in updatePodForDelete(), A's cleanup triggers 1) policies removed and 2) endpoint marked unspecified with A stale + - on B retry, all ipsets are in the podupdatecache + - in updatePod(), proceed as usual for B + 3. A w/ IP--> A's cleanup --> B comes up: + - in updatePod(), A's policies applied to EP + - in updatePodForDelete(), A's cleanup triggers 1) policies removed and 2) endpoint marked unspecified with A stale + - in updatePod(), proceed as usual for B + + From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. + Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. + Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter number 2 or 3. +*/ +func (dp *DataPlane) updatePodForDelete(pod *updateNPMPod) error { + // No need to make compute-intensive refreshAllPodEndpoints() call. + // Instead, only get the HNS endpoint within ResetEndpoint(). + // This function will handle the case where the endpoint doesn't exist anymore, + // and we will ignore the reset if the pod key is stale (the endpoint belongs to another pod), + // i.e. an updatePod() call came in for another Pod of the same IP. + klog.Infof("[DataPlane] updatePodForDelete called for Pod Key %s", pod.PodKey) + + // Check if pod is already present in cache + endpoint, ok := dp.endpointCache[pod.PodIP] + if !ok { + // ignore this err and pod endpoint will be deleted in ApplyDP + // if the endpoint is not found, it means the pod is not part of this node or pod got deleted. + klog.Warningf("[DataPlane] for pod marked for delete, did not find endpoint with IPaddress %s for pod %s", pod.PodIP, pod.PodKey) + return nil + } + + // While refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key. + // In this code path, a pod key may be stale if the pod was wrongly assigned to the endpoint for this scenario: + // 1. pod A previously had IP i and EP x + // 2. pod A restarts w/ no ip AND NPM restarts AND pod B comes up with the same IP i and EP y + // 3. controller processes an update event for pod A with IP i before the update event for pod B with IP i, so pod A is wrongly assigned to EP y + if endpoint.isStalePodKey(pod.PodKey) { + // this check is technically covered by the podKey mismatch check below, assuming podKey can never equal staleKey + klog.Infof("[DataPlane] ignoring pod marked for delete since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.id) + return nil + } + + if endpoint.podKey != pod.PodKey { + // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. + klog.Infof( + "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", + pod.PodKey, endpoint.id, endpoint.podKey) + return nil + } + + msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.id) + metrics.SendLog(util.DaemonDataplaneID, msg, metrics.PrintLog) + + endpoint.stalePodKey = &staleKey{ + key: endpoint.podKey, + timestamp: time.Now().Unix(), + } + endpoint.podKey = unspecifiedPodKey + + // remove all policies on the endpoint + if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil { + klog.Warningf("[DataPlane] warning: resetting endpoint policies unsuccessful for pod marked for delete. endpoint ID: %s. pod key: %s", endpoint.id, pod.PodKey) + } + endpoint.netPolReference = make(map[string]struct{}) + + return nil +} + func (dp *DataPlane) getSelectorIPSets(policy *policies.NPMNetworkPolicy) map[string]struct{} { selectorIpSets := make(map[string]struct{}) for _, ipset := range policy.PodSelectorIPSets { @@ -337,6 +396,7 @@ func (dp *DataPlane) refreshAllPodEndpoints() error { npmEP := newNPMEndpoint(&endpoint) if oldNPMEP.podKey == unspecifiedPodKey { klog.Infof("updating endpoint cache since endpoint changed for IP which never had a pod key. new endpoint: %s, old endpoint: %s, ip: %s", npmEP.id, oldNPMEP.id, npmEP.ip) + npmEP.stalePodKey = oldNPMEP.stalePodKey dp.endpointCache[ip] = npmEP } else { npmEP.stalePodKey = &staleKey{ diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index d6da0212ab..ed4ab21246 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -40,7 +40,7 @@ type PodMetadata struct { PodKey string PodIP string NodeName string - MarkedForDelete bool + markedForDelete bool } func NewPodMetadata(podKey, podIP, nodeName string) *PodMetadata { @@ -48,16 +48,20 @@ func NewPodMetadata(podKey, podIP, nodeName string) *PodMetadata { PodKey: podKey, PodIP: podIP, NodeName: nodeName, - MarkedForDelete: false, + markedForDelete: false, } } func NewPodMetadataMarkedForDelete(podKey, podIP, nodeName string) *PodMetadata { pm := NewPodMetadata(podKey, podIP, nodeName) - pm.MarkedForDelete = true + pm.markedForDelete = true return pm } +func (pm *PodMetadata) isMarkedForDelete() bool { + return pm.markedForDelete +} + func newUpdateNPMPod(podMetadata *PodMetadata) *updateNPMPod { return &updateNPMPod{ PodMetadata: podMetadata, From 8f4fd49ae6e07ece07d710264184fe3f5c0d9aad Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Mon, 22 Aug 2022 15:55:39 -0700 Subject: [PATCH 14/18] only sync empty ip if pod running. add tmp log --- npm/pkg/controlplane/controllers/v2/podController.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 7327e37f54..920af3e434 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -104,9 +104,11 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo op := eventOperations[eventType] if !hasValidPodIP(podObj) { - if eventType == addEvent { + if eventType == addEvent || podObj.Status.Phase != corev1.PodRunning { return key, needSync } + klog.Infof("[needSync] adding pod with empty IP. pod: [%+v]. status: [%+v]. conditions: [%+v]. podIPs: [%+v]. InitContainerStatuses: [%+v]. ContainerStatuses: [%+v]. EphemeralContainerStatuses: [%+v]", + podObj, podObj.Status, podObj.Status.Conditions, podObj.Status.PodIPs, podObj.Status.InitContainerStatuses, podObj.Status.ContainerStatuses, podObj.Status.EphemeralContainerStatuses) op = metrics.UpdateWithEmptyIPOp } From 398c99e4d1d2fbbf013a4429a68bb065e79215be Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 15 Dec 2022 14:17:59 -0800 Subject: [PATCH 15/18] undo special pod delete logic --- .../controllers/v2/podController.go | 17 +-- .../controllers/v2/podController_test.go | 36 +++--- npm/pkg/dataplane/dataplane_windows.go | 117 +++--------------- npm/pkg/dataplane/policies/policymanager.go | 7 -- npm/pkg/dataplane/types.go | 24 +--- 5 files changed, 46 insertions(+), 155 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 0c619d3526..436003766e 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -30,9 +30,8 @@ import ( type NamedPortOperation string const ( - deletePodAndNamedPort NamedPortOperation = "del-pod-and-namedport" - deleteNamedPort NamedPortOperation = "del" - addNamedPort NamedPortOperation = "add" + deleteNamedPort NamedPortOperation = "del" + addNamedPort NamedPortOperation = "add" addEvent string = "ADD" updateEvent string = "UPDATE" @@ -556,7 +555,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { } var err error - cachedPodMetadata := dataplane.NewPodMetadataMarkedForDelete(cachedNpmPodKey, cachedNpmPod.PodIP, "") + cachedPodMetadata := dataplane.NewPodMetadata(cachedNpmPodKey, cachedNpmPod.PodIP, "") // Delete the pod from its namespace's ipset. // note: NodeName empty is not going to call update pod if err = c.dp.RemoveFromSets( @@ -582,7 +581,7 @@ func (c *PodController) cleanUpDeletedPod(cachedNpmPodKey string) error { // Delete pod's named ports from its ipset. Need to pass true in the manageNamedPortIpsets function call if err = c.manageNamedPortIpsets( - cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deletePodAndNamedPort); err != nil { + cachedNpmPod.ContainerPorts, cachedNpmPodKey, cachedNpmPod.PodIP, "", deleteNamedPort); err != nil { return fmt.Errorf("[cleanUpDeletedPod] Error: failed to delete pod from named port ipset with err: %w", err) } @@ -615,22 +614,16 @@ func (c *PodController) manageNamedPortIpsets(portList []corev1.ContainerPort, p namedPortIpsetEntry := fmt.Sprintf("%s,%s%d", podIP, protocol, port.ContainerPort) // nodename in NewPodMetadata is nil so UpdatePod is ignored + podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) switch namedPortOperation { case deleteNamedPort: - podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to remove from set when deleting named port with err %w", err) } case addNamedPort: - podMetadata := dataplane.NewPodMetadata(podKey, namedPortIpsetEntry, nodeName) if err := c.dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { return fmt.Errorf("failed to add to set when deleting named port with err %w", err) } - case deletePodAndNamedPort: - podMetadata := dataplane.NewPodMetadataMarkedForDelete(podKey, namedPortIpsetEntry, nodeName) - if err := c.dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata(port.Name, ipsets.NamedPorts)}, podMetadata); err != nil { - return fmt.Errorf("failed to remove from set when deleting pod and named port with err %w", err) - } } } diff --git a/npm/pkg/controlplane/controllers/v2/podController_test.go b/npm/pkg/controlplane/controllers/v2/podController_test.go index 9d843e82ea..88a9c37a52 100644 --- a/npm/pkg/controlplane/controllers/v2/podController_test.go +++ b/npm/pkg/controlplane/controllers/v2/podController_test.go @@ -429,15 +429,13 @@ func TestDeletePod(t *testing.T) { } dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") - dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) if !util.IsWindowsDP() { - dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) } @@ -547,14 +545,13 @@ func TestDeletePodWithTombstoneAfterAddingPod(t *testing.T) { } dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") - dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) if !util.IsWindowsDP() { dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) } @@ -665,13 +662,12 @@ func TestEmptyIPUpdate(t *testing.T) { Return(nil).Times(1) dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") - dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) // New IP Pod add fails @@ -757,14 +753,13 @@ func TestIPAddressUpdatePod(t *testing.T) { } dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") - dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) if !util.IsWindowsDP() { dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) } @@ -832,14 +827,13 @@ func TestPodStatusUpdatePod(t *testing.T) { } dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section - deleteMetadata := dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4", "") - dp.EXPECT().RemoveFromSets(mockIPSets[:1], deleteMetadata).Return(nil).Times(1) - dp.EXPECT().RemoveFromSets(mockIPSets[1:], deleteMetadata).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) + dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) if !util.IsWindowsDP() { dp.EXPECT(). RemoveFromSets( []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadataMarkedForDelete("test-namespace/test-pod", "1.2.3.4,8080", ""), + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), ). Return(nil).Times(1) } diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index 57bd378b88..c6d5d5c44d 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" "github.com/Azure/azure-container-networking/npm/util" npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" @@ -114,17 +113,30 @@ func (dp *DataPlane) shouldUpdatePod() bool { // updatePod has two responsibilities in windows // 1. Will call into dataplane and updates endpoint references of this pod. // 2. Will check for existing applicable network policies and applies it on endpoint. -// Assumptions: -// Not possible to know a pod's IP before endpoints are refreshed (i.e. IP has an endpoint ID switch means that the pod assigned to the first ID is dead) -func (dp *DataPlane) updatePod(pod *updateNPMPod) error { - if pod.isMarkedForDelete() { - return dp.updatePodForDelete(pod) - } +/* + FIXME: there is a rare edge case for memory-starved WS22 nodes: + - pod A previously had an IP k and EP x + - around same time: + - pod A restarts + - NPM restarts + - pod B comes up with IP k and EP y + + Controller events can be jumbled. The possible sequences are: + 1. Pod A create --> Pod A cleanup --> Pod B create + 2. Pod A create --> Pod B create --> Pod A cleanup + 3. Pod B create --> Pod A create --> Pod A cleanup + + 1 and 2 are accounted for, but 3 is a weird edge case that can happen under the above scenario. + The fix can be to reset all ACLs on the Endpoint, then add back Pod B's ACLs. + If a Pod encountered this issue, correct connectivity could be restored by bumping the Pod. + + TODO: one other note. It would be good to replace stalePodKey behavior since it is complex. +*/ +func (dp *DataPlane) updatePod(pod *updateNPMPod) error { klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) if pod.NodeName != dp.nodeName { // Ignore updates if the pod is not part of this node. - // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. klog.Infof("[DataPlane] ignoring update pod as expected Node: [%s] got: [%s]. pod: [%s]", dp.nodeName, pod.NodeName, pod.PodKey) return nil } @@ -262,94 +274,6 @@ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { return nil } -// updatePodForDelete resets the endpoint corresponding to the pod's IP. -// This functionality is required for the edge case below. -// Assumptions: -// Not possible to know a pod's IP before endpoints are refreshed (i.e. IP has an endpoint ID switch means that the pod assigned to the first ID is dead) -/* - Notes on the edge case in memory-starved Windows Server '22 where: - - pod A previously had IP and EP x - - around same time: - - pod A restarts - - NPM restarts - - pod B comes up with same IP and EP y - - controller events can be jumbled e.g. an update event for pod A w/ its old IP can happen before the pod B create event - - To start, have a new EP with an unspecified pod key: - 1. B --> A w/ IP --> A's cleanup: - - in updatePod(), proceed as usual for B - - in updatePod(), get a mismanaged err for A, which is good. Requeued in controller - - eventually, the pod will be cleaned up - - A's cleanup should be ignored in updatePodForDelete(), or else the controller will requeue - 2. A w/ IP --> B comes up --> A's cleanup: - - in updatePod(), A's policies applied to EP - - in updatePod(), get a get a mismanaged err for B. Requeued in controller - - in updatePodForDelete(), A's cleanup triggers 1) policies removed and 2) endpoint marked unspecified with A stale - - on B retry, all ipsets are in the podupdatecache - - in updatePod(), proceed as usual for B - 3. A w/ IP--> A's cleanup --> B comes up: - - in updatePod(), A's policies applied to EP - - in updatePodForDelete(), A's cleanup triggers 1) policies removed and 2) endpoint marked unspecified with A stale - - in updatePod(), proceed as usual for B - - From looking at logs, it seems most likely that HNS endpoints are always updated before we receive/process a pod deletion in the controller. - Therefore, we should never (or at least rarely) try to delete policies off of an endpoint that is getting destroyed. - Instead, if the pod is marked for delete, we would likely only reach this code path if we encounter number 2 or 3. -*/ -func (dp *DataPlane) updatePodForDelete(pod *updateNPMPod) error { - // No need to make compute-intensive refreshAllPodEndpoints() call. - // Instead, only get the HNS endpoint within ResetEndpoint(). - // This function will handle the case where the endpoint doesn't exist anymore, - // and we will ignore the reset if the pod key is stale (the endpoint belongs to another pod), - // i.e. an updatePod() call came in for another Pod of the same IP. - klog.Infof("[DataPlane] updatePodForDelete called for Pod Key %s", pod.PodKey) - - // Check if pod is already present in cache - endpoint, ok := dp.endpointCache.cache[pod.PodIP] - if !ok { - // ignore this err and pod endpoint will be deleted in ApplyDP - // if the endpoint is not found, it means the pod is not part of this node or pod got deleted. - klog.Warningf("[DataPlane] for pod marked for delete, did not find endpoint with IPaddress %s for pod %s", pod.PodIP, pod.PodKey) - return nil - } - - // While refreshing pod endpoints, newly discovered endpoints are given an unspecified pod key. - // In this code path, a pod key may be stale if the pod was wrongly assigned to the endpoint for this scenario: - // 1. pod A previously had IP i and EP x - // 2. pod A restarts w/ no ip AND NPM restarts AND pod B comes up with the same IP i and EP y - // 3. controller processes an update event for pod A with IP i before the update event for pod B with IP i, so pod A is wrongly assigned to EP y - if endpoint.isStalePodKey(pod.PodKey) { - // this check is technically covered by the podKey mismatch check below, assuming podKey can never equal staleKey - klog.Infof("[DataPlane] ignoring pod marked for delete since pod with key %s is stale and likely was deleted for endpoint %s", pod.PodKey, endpoint.id) - return nil - } - - if endpoint.podKey != pod.PodKey { - // If the pod is marked for delete, then the pod is on the node if and only if the endpoint's pod key equals this pod key. - klog.Infof( - "[DataPlane] ignoring update pod since pod is marked for delete and the pod isn't assigned to this endpoint. pod: %s. endpoint ID: %s. endpoint pod key: %s", - pod.PodKey, endpoint.id, endpoint.podKey) - return nil - } - - msg := fmt.Sprintf("[DataPlane] deleting pod and cleaning up policies from endpoint. pod: %s. endpoint: %s", pod.PodKey, endpoint.id) - metrics.SendLog(util.DaemonDataplaneID, msg, metrics.PrintLog) - - endpoint.stalePodKey = &staleKey{ - key: endpoint.podKey, - timestamp: time.Now().Unix(), - } - endpoint.podKey = unspecifiedPodKey - - // remove all policies on the endpoint - if err := dp.policyMgr.ResetEndpoint(endpoint.id); err != nil { - klog.Warningf("[DataPlane] warning: resetting endpoint policies unsuccessful for pod marked for delete. endpoint ID: %s. pod key: %s", endpoint.id, pod.PodKey) - } - endpoint.netPolReference = make(map[string]struct{}) - - return nil -} - func (dp *DataPlane) getSelectorIPSets(policy *policies.NPMNetworkPolicy) map[string]struct{} { selectorIpSets := make(map[string]struct{}) for _, ipset := range policy.PodSelectorIPSets { @@ -469,7 +393,6 @@ func (dp *DataPlane) refreshPodEndpoints() error { npmEP := newNPMEndpoint(endpoint) if oldNPMEP.podKey == unspecifiedPodKey { klog.Infof("updating endpoint cache since endpoint changed for IP which never had a pod key. new endpoint: %s, old endpoint: %s, ip: %s", npmEP.id, oldNPMEP.id, npmEP.ip) - npmEP.stalePodKey = oldNPMEP.stalePodKey dp.endpointCache.cache[ip] = npmEP } else { npmEP.stalePodKey = &staleKey{ diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index 152a76c9c0..775bfd44ac 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -73,13 +73,6 @@ func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManag } } -func (pMgr *PolicyManager) ResetEndpoint(epID string) error { - if util.IsWindowsDP() { - return pMgr.bootup([]string{epID}) - } - return nil -} - func (pMgr *PolicyManager) Bootup(epIDs []string) error { metrics.ResetNumACLRules() if err := pMgr.bootup(epIDs); err != nil { diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 87fb1f55f5..d8fd26afc3 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -40,31 +40,19 @@ type updateNPMPod struct { // todo definitely requires further optimization between the intersection // of types, PodMetadata, NpmPod and corev1.pod type PodMetadata struct { - PodKey string - PodIP string - NodeName string - markedForDelete bool + PodKey string + PodIP string + NodeName string } func NewPodMetadata(podKey, podIP, nodeName string) *PodMetadata { return &PodMetadata{ - PodKey: podKey, - PodIP: podIP, - NodeName: nodeName, - markedForDelete: false, + PodKey: podKey, + PodIP: podIP, + NodeName: nodeName, } } -func NewPodMetadataMarkedForDelete(podKey, podIP, nodeName string) *PodMetadata { - pm := NewPodMetadata(podKey, podIP, nodeName) - pm.markedForDelete = true - return pm -} - -func (pm *PodMetadata) isMarkedForDelete() bool { - return pm.markedForDelete -} - func (p *PodMetadata) Namespace() string { return strings.Split(p.PodKey, "/")[0] } From 56f64bb0e4d0d62dc73451b13129e5951a7ce0b1 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 16 Dec 2022 14:02:37 -0800 Subject: [PATCH 16/18] reference GH issue --- npm/pkg/dataplane/dataplane_windows.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/npm/pkg/dataplane/dataplane_windows.go b/npm/pkg/dataplane/dataplane_windows.go index c6d5d5c44d..5f1bc4b557 100644 --- a/npm/pkg/dataplane/dataplane_windows.go +++ b/npm/pkg/dataplane/dataplane_windows.go @@ -114,24 +114,8 @@ func (dp *DataPlane) shouldUpdatePod() bool { // 1. Will call into dataplane and updates endpoint references of this pod. // 2. Will check for existing applicable network policies and applies it on endpoint. /* - FIXME: there is a rare edge case for memory-starved WS22 nodes: - - pod A previously had an IP k and EP x - - around same time: - - pod A restarts - - NPM restarts - - pod B comes up with IP k and EP y - - Controller events can be jumbled. The possible sequences are: - 1. Pod A create --> Pod A cleanup --> Pod B create - 2. Pod A create --> Pod B create --> Pod A cleanup - 3. Pod B create --> Pod A create --> Pod A cleanup - - 1 and 2 are accounted for, but 3 is a weird edge case that can happen under the above scenario. - The fix can be to reset all ACLs on the Endpoint, then add back Pod B's ACLs. - - If a Pod encountered this issue, correct connectivity could be restored by bumping the Pod. - - TODO: one other note. It would be good to replace stalePodKey behavior since it is complex. + FIXME: see https://github.com/Azure/azure-container-networking/issues/1729 + TODO: it would be good to replace stalePodKey behavior since it is complex. */ func (dp *DataPlane) updatePod(pod *updateNPMPod) error { klog.Infof("[DataPlane] updatePod called for Pod Key %s", pod.PodKey) From 05ab431409da24aea6f611db3bdfa59640960f0d Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 16 Dec 2022 14:16:05 -0800 Subject: [PATCH 17/18] fix Windows UTs --- .../controllers/v2/podController_test.go | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/npm/pkg/controlplane/controllers/v2/podController_test.go b/npm/pkg/controlplane/controllers/v2/podController_test.go index 88a9c37a52..d1ae9c5c02 100644 --- a/npm/pkg/controlplane/controllers/v2/podController_test.go +++ b/npm/pkg/controlplane/controllers/v2/podController_test.go @@ -654,23 +654,27 @@ func TestEmptyIPUpdate(t *testing.T) { dp.EXPECT().AddToLists([]*ipsets.IPSetMetadata{kubeAllNamespaces}, mockIPSets[:1]).Return(nil).Times(1) dp.EXPECT().AddToSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) dp.EXPECT().AddToSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) - dp.EXPECT(). - AddToSets( - []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), - ). - Return(nil).Times(1) + if !util.IsWindowsDP() { + dp.EXPECT(). + AddToSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + } dp.EXPECT().ApplyDataPlane().Return(nil).Times(2) // Delete pod section dp.EXPECT().RemoveFromSets(mockIPSets[:1], podMetadata1).Return(nil).Times(1) dp.EXPECT().RemoveFromSets(mockIPSets[1:], podMetadata1).Return(nil).Times(1) - dp.EXPECT(). - RemoveFromSets( - []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, - dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), - ). - Return(nil).Times(1) - // New IP Pod add fails + if !util.IsWindowsDP() { + dp.EXPECT(). + RemoveFromSets( + []*ipsets.IPSetMetadata{ipsets.NewIPSetMetadata("app:test-pod", ipsets.NamedPorts)}, + dataplane.NewPodMetadata("test-namespace/test-pod", "1.2.3.4,8080", ""), + ). + Return(nil).Times(1) + } + // since the new IP is invalid, adding the new Pod object is ignored updatePod(t, f, oldPodObj, newPodObj) From d76ae6a81c9e1f471a0b3907e9b5d7e7900d1a71 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 16 Dec 2022 14:13:02 -0800 Subject: [PATCH 18/18] remove prometheus metrics and a log --- npm/metrics/pods.go | 14 --------- npm/metrics/pods_test.go | 17 +---------- npm/metrics/prometheus-metrics.go | 29 +------------------ npm/metrics/prometheus-values.go | 16 ---------- .../controllers/v2/podController.go | 21 +++----------- 5 files changed, 6 insertions(+), 91 deletions(-) diff --git a/npm/metrics/pods.go b/npm/metrics/pods.go index 55c8bf6160..09d8ec5a05 100644 --- a/npm/metrics/pods.go +++ b/npm/metrics/pods.go @@ -1,7 +1,5 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" - // RecordControllerPodExecTime adds an observation of pod exec time for the specified operation (unless the operation is NoOp). // The execution time is from the timer's start until now. func RecordControllerPodExecTime(timer *Timer, op OperationKind, hadError bool) { @@ -13,15 +11,3 @@ func RecordControllerPodExecTime(timer *Timer, op OperationKind, hadError bool) func GetControllerPodExecCount(op OperationKind, hadError bool) (int, error) { return getCountVecValue(controllerPodExecTime, getCRUDExecTimeLabels(op, hadError)) } - -func IncPodEventTotal(op OperationKind) { - podEventCount.With(getPodEventTotalLabels(op)).Inc() -} - -func getPodEventTotal(op OperationKind) (int, error) { - return getTotalVecValue(podEventCount, getPodEventTotalLabels(op)) -} - -func getPodEventTotalLabels(op OperationKind) prometheus.Labels { - return prometheus.Labels{operationLabel: string(op)} -} diff --git a/npm/metrics/pods_test.go b/npm/metrics/pods_test.go index dd5a8f55af..ffcfbd8600 100644 --- a/npm/metrics/pods_test.go +++ b/npm/metrics/pods_test.go @@ -1,11 +1,6 @@ package metrics -import ( - "testing" - - "github.com/Azure/azure-container-networking/npm/metrics/promutil" - "github.com/stretchr/testify/require" -) +import "testing" func TestRecordControllerPodExecTime(t *testing.T) { testStopAndRecordCRUDExecTime(t, &crudExecMetric{ @@ -13,13 +8,3 @@ func TestRecordControllerPodExecTime(t *testing.T) { GetControllerPodExecCount, }) } - -func TestIncPodEventTotal(t *testing.T) { - InitializeAll() - for _, op := range []OperationKind{CreateOp, UpdateOp, DeleteOp, UpdateWithEmptyIPOp} { - IncPodEventTotal(op) - val, err := getPodEventTotal(op) - promutil.NotifyIfErrors(t, err) - require.Equal(t, 1, val, "expected metric count to be incremented for op: %s", op) - } -} diff --git a/npm/metrics/prometheus-metrics.go b/npm/metrics/prometheus-metrics.go index 9e5bee5820..73c63b3eda 100644 --- a/npm/metrics/prometheus-metrics.go +++ b/npm/metrics/prometheus-metrics.go @@ -60,16 +60,12 @@ const ( delta90th float64 = 0.01 quantil99th float64 = 0.99 delta99th float64 = 0.001 - - // controller workqueue metrics - podEventTotalName = "pod_event_total" - podEventTotalHelp = "The total number of pod events ever added to the controller workqueue" ) // Gauge metrics have the methods Inc(), Dec(), and Set(float64) // Summary metrics have the method Observe(float64) // For any Vector metric, you can call With(prometheus.Labels) before the above methods -// e.g. SomeGaugeVec.With(prometheus.Labels{label1: val1, label2: val2, ...).Dec() +// e.g. SomeGaugeVec.With(prometheus.Labels{label1: val1, label2: val2, ...).Dec() var ( nodeRegistry = prometheus.NewRegistry() clusterRegistry = prometheus.NewRegistry() @@ -96,10 +92,6 @@ var ( controllerPodExecTime *prometheus.SummaryVec controllerNamespaceExecTime *prometheus.SummaryVec controllerExecTimeLabels = []string{operationLabel, hadErrorLabel} - - // controller workqueue metrics - podEventCount *prometheus.CounterVec - podEventTotalLabels = []string{operationLabel} ) type RegistryType string @@ -116,8 +108,6 @@ const ( UpdateOp OperationKind = "update" DeleteOp OperationKind = "delete" NoOp OperationKind = "noop" - // UpdateWithEmptyIPOp is intended to be used for the PodEvent counter only - UpdateWithEmptyIPOp OperationKind = "update-with-empty-ip" ) func (op OperationKind) isValid() bool { @@ -181,9 +171,6 @@ func initializeControllerMetrics() { // CLUSTER METRICS numPolicies = createClusterGauge(numPoliciesName, numPoliciesHelp) - // controller workqueue metrics - podEventCount = newPodEventCount() - // NODE METRICS addPolicyExecTime = createNodeSummaryVec(addPolicyExecTimeName, "", addPolicyExecTimeHelp, addPolicyExecTimeLabels) @@ -266,17 +253,3 @@ func createNodeSummaryVec(name, subsystem, helpMessage string, labels []string) func createControllerExecTimeSummaryVec(name, helpMessage string) *prometheus.SummaryVec { return createNodeSummaryVec(name, controllerPrefix, helpMessage, controllerExecTimeLabels) } - -func newPodEventCount() *prometheus.CounterVec { - counter := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: controllerPrefix, - Name: podEventTotalName, - Help: podEventTotalHelp, - }, - podEventTotalLabels, - ) - register(counter, podEventTotalName, ClusterMetrics) - return counter -} diff --git a/npm/metrics/prometheus-values.go b/npm/metrics/prometheus-values.go index 8991d11d12..53a265154b 100644 --- a/npm/metrics/prometheus-values.go +++ b/npm/metrics/prometheus-values.go @@ -35,14 +35,6 @@ func getCountValue(collector prometheus.Collector) (int, error) { return int(dtoMetric.Summary.GetSampleCount()), nil } -func getTotal(collector prometheus.Collector) (int, error) { - dtoMetric, err := getDTOMetric(collector) - if err != nil { - return 0, err - } - return int(dtoMetric.Counter.GetValue()), nil -} - func getCountVecValue(summaryVecMetric *prometheus.SummaryVec, labels prometheus.Labels) (int, error) { collector, ok := summaryVecMetric.With(labels).(prometheus.Collector) if !ok { @@ -51,14 +43,6 @@ func getCountVecValue(summaryVecMetric *prometheus.SummaryVec, labels prometheus return getCountValue(collector) } -func getTotalVecValue(counterVecMetric *prometheus.CounterVec, labels prometheus.Labels) (int, error) { - collector, ok := counterVecMetric.With(labels).(prometheus.Collector) - if !ok { - return 0, errNotCollector - } - return getTotal(collector) -} - func getCRUDExecTimeLabels(op OperationKind, hadError bool) prometheus.Labels { hadErrorVal := "false" if hadError { diff --git a/npm/pkg/controlplane/controllers/v2/podController.go b/npm/pkg/controlplane/controllers/v2/podController.go index 436003766e..f7d92119fa 100644 --- a/npm/pkg/controlplane/controllers/v2/podController.go +++ b/npm/pkg/controlplane/controllers/v2/podController.go @@ -37,14 +37,7 @@ const ( updateEvent string = "UPDATE" ) -var ( - kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace} - - eventOperations = map[string]metrics.OperationKind{ - addEvent: metrics.CreateOp, - updateEvent: metrics.UpdateOp, - } -) +var kubeAllNamespaces = &ipsets.IPSetMetadata{Name: util.KubeAllNamespacesFlag, Type: ipsets.KeyLabelOfNamespace} type PodController struct { podLister corelisters.PodLister @@ -101,14 +94,9 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } - op := eventOperations[eventType] - if !hasValidPodIP(podObj) { - if eventType == addEvent || podObj.Status.Phase != corev1.PodRunning { - return key, needSync - } - klog.Infof("[needSync] adding pod with empty IP. pod: [%+v]. status: [%+v]. conditions: [%+v]. podIPs: [%+v]. InitContainerStatuses: [%+v]. ContainerStatuses: [%+v]. EphemeralContainerStatuses: [%+v]", - podObj, podObj.Status, podObj.Status.Conditions, podObj.Status.PodIPs, podObj.Status.InitContainerStatuses, podObj.Status.ContainerStatuses, podObj.Status.EphemeralContainerStatuses) - op = metrics.UpdateWithEmptyIPOp + // should enqueue updates for Pods with an empty IP if they are also Running + if !hasValidPodIP(podObj) && (eventType == addEvent || podObj.Status.Phase != corev1.PodRunning) { + return key, needSync } if isHostNetworkPod(podObj) { @@ -123,7 +111,6 @@ func (c *PodController) needSync(eventType string, obj interface{}) (string, boo return key, needSync } - metrics.IncPodEventTotal(op) needSync = true return key, needSync }