From b8696590b33f5dbec165bef7cefdeaf70fd5ec7e Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 22 Jun 2023 12:09:55 -0700 Subject: [PATCH 1/4] fix: lock while adding policy in bootup phase --- npm/pkg/dataplane/dataplane.go | 81 ++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 0ab6a76bd6..bef9139d5d 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -276,24 +276,21 @@ func (dp *DataPlane) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []* // and accordingly makes changes in dataplane. This function helps emulate a single call to // dataplane instead of multiple ipset operations calls ipset operations calls to dataplane func (dp *DataPlane) ApplyDataPlane() error { - if dp.applyInBackground { - return dp.incrementBatchAndApplyIfNeeded(contextApplyDP) + if !dp.applyInBackground { + return dp.applyDataPlaneNow(contextApplyDP) } - return dp.applyDataPlaneNow(contextApplyDP) -} - -func (dp *DataPlane) incrementBatchAndApplyIfNeeded(context string) error { + // increment batch and apply if needed dp.applyInfo.Lock() dp.applyInfo.numBatches++ newCount := dp.applyInfo.numBatches dp.applyInfo.Unlock() - klog.Infof("[DataPlane] [%s] new batch count: %d", context, newCount) + klog.Infof("[DataPlane] [%s] new batch count: %d", contextApplyDP, newCount) if newCount >= dp.ApplyMaxBatches { - klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", context, newCount) - return dp.applyDataPlaneNow(context) + klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextApplyDP, newCount) + return dp.applyDataPlaneNow(contextApplyDP) } return nil @@ -380,32 +377,61 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error { return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err) } - var endpointList map[string]string - if dp.inBootupPhase() { + inBootupPhase := false + if dp.applyInBackground { + dp.applyInfo.Lock() + inBootupPhase = dp.applyInfo.inBootupPhase + if inBootupPhase { + // keep holding the lock to block FinishBootupPhase() and prevent PodController from + // coming back online and causing race issues from updatePod() within applyDataPlaneNow() + defer dp.applyInfo.Unlock() + } else { + dp.applyInfo.Unlock() + } + } + + if inBootupPhase { // During bootup phase, the Pod controller will not be running. // We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet. // Ideally, we get all NetworkPolicies in the cache before the Pod controller starts - err = dp.incrementBatchAndApplyIfNeeded(contextAddNetPol) - if err != nil { - return err - } - } else { - err = dp.applyDataPlaneNow(contextAddNetPol) - if err != nil { - return err + + // increment batch and apply if needed + dp.applyInfo.numBatches++ + newCount := dp.applyInfo.numBatches + klog.Infof("[DataPlane] [bootup] [%s] new batch count: %d", contextAddNetPol, newCount) + if newCount >= dp.ApplyMaxBatches { + klog.Infof("[DataPlane] [bootup] [%s] applying now since reached maximum batch count: %d", contextAddNetPol, newCount) + err = dp.applyDataPlaneNow(contextAddNetPol) + if err != nil { + return err + } } - endpointList, err = dp.getEndpointsToApplyPolicy(policy) + err = dp.policyMgr.AddPolicy(policy, nil) if err != nil { - return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err) + return fmt.Errorf("[DataPlane] [bootup] error while adding policy: %w", err) } + + return nil + } + + // standard, non-bootup phase + err = dp.applyDataPlaneNow(contextAddNetPol) + if err != nil { + return err + } + + var endpointList map[string]string + endpointList, err = dp.getEndpointsToApplyPolicy(policy) + if err != nil { + return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err) } - // endpointList will be empty if in bootup phase err = dp.policyMgr.AddPolicy(policy, endpointList) if err != nil { return fmt.Errorf("[DataPlane] error while adding policy: %w", err) } + return nil } @@ -582,14 +608,3 @@ func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, n } return nil } - -func (dp *DataPlane) inBootupPhase() bool { - if !dp.applyInBackground { - return false - } - - dp.applyInfo.Lock() - defer dp.applyInfo.Unlock() - - return dp.applyInfo.inBootupPhase -} From 9d5b887a5e02e17b76d8156fae9a1f0051cd9b7d Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 22 Jun 2023 12:11:53 -0700 Subject: [PATCH 2/4] test: fix UT to model true pod controller behavior --- npm/pkg/dataplane/dataplane-test-cases_windows_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go index 72b16daacf..82741f7697 100644 --- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go +++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go @@ -2224,15 +2224,13 @@ func getAllMultiJobTests() []*MultiJobTestCase { { Description: "create namespaces, pods, and a policy which applies to a pod", Jobs: map[string][]*Action{ - "finish_bootup_phase": { - FinishBootupPhase(), - }, "namespace_controller": { CreateNamespace("x", map[string]string{"k1": "v1"}), CreateNamespace("y", map[string]string{"k2": "v2"}), ApplyDP(), }, "pod_controller": { + FinishBootupPhase(), CreatePod("x", "a", ip1, thisNode, map[string]string{"k1": "v1"}), CreatePod("y", "a", ip2, otherNode, map[string]string{"k1": "v1"}), ApplyDP(), From 1aeaa6afb751fd8b2435eae5f7972a50728ce5ab Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 22 Jun 2023 12:19:07 -0700 Subject: [PATCH 3/4] fix: prevent deadlock --- npm/pkg/dataplane/dataplane.go | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index bef9139d5d..64e48177fe 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -18,10 +18,11 @@ import ( const ( reconcileDuration = time.Duration(5 * time.Minute) - contextBackground = "BACKGROUND" - contextApplyDP = "APPLY-DP" - contextAddNetPol = "ADD-NETPOL" - contextDelNetPol = "DEL-NETPOL" + contextBackground = "BACKGROUND" + contextApplyDP = "APPLY-DP" + contextAddNetPol = "ADD-NETPOL" + contextAddNetPolBootup = "BOOTUP-ADD-NETPOL" + contextDelNetPol = "DEL-NETPOL" ) var ErrInvalidApplyConfig = errors.New("invalid apply config") @@ -280,7 +281,7 @@ func (dp *DataPlane) ApplyDataPlane() error { return dp.applyDataPlaneNow(contextApplyDP) } - // increment batch and apply if needed + // increment batch and apply dataplane if needed dp.applyInfo.Lock() dp.applyInfo.numBatches++ newCount := dp.applyInfo.numBatches @@ -395,21 +396,25 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error { // We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet. // Ideally, we get all NetworkPolicies in the cache before the Pod controller starts - // increment batch and apply if needed + // increment batch and apply IPSets if needed dp.applyInfo.numBatches++ newCount := dp.applyInfo.numBatches - klog.Infof("[DataPlane] [bootup] [%s] new batch count: %d", contextAddNetPol, newCount) + klog.Infof("[DataPlane] [%s] new batch count: %d", contextAddNetPolBootup, newCount) if newCount >= dp.ApplyMaxBatches { - klog.Infof("[DataPlane] [bootup] [%s] applying now since reached maximum batch count: %d", contextAddNetPol, newCount) - err = dp.applyDataPlaneNow(contextAddNetPol) + klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount) + klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup) + err := dp.ipsetMgr.ApplyIPSets() if err != nil { - return err + return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err) } + klog.Infof("[DataPlane] [%s] finished applying ipsets", contextAddNetPolBootup) + + dp.applyInfo.numBatches = 0 } err = dp.policyMgr.AddPolicy(policy, nil) if err != nil { - return fmt.Errorf("[DataPlane] [bootup] error while adding policy: %w", err) + return fmt.Errorf("[DataPlane] [%s] error while adding policy: %w", contextAddNetPolBootup, err) } return nil From 153f90ccb2b5d5b93183b12e84ba651428d38c85 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Thu, 22 Jun 2023 14:06:32 -0700 Subject: [PATCH 4/4] style: lint --- npm/pkg/dataplane/dataplane.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 64e48177fe..8164a06027 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -403,7 +403,7 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error { if newCount >= dp.ApplyMaxBatches { klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount) klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup) - err := dp.ipsetMgr.ApplyIPSets() + err = dp.ipsetMgr.ApplyIPSets() if err != nil { return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err) }