diff --git a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go index 72b16daacf..82741f7697 100644 --- a/npm/pkg/dataplane/dataplane-test-cases_windows_test.go +++ b/npm/pkg/dataplane/dataplane-test-cases_windows_test.go @@ -2224,15 +2224,13 @@ func getAllMultiJobTests() []*MultiJobTestCase { { Description: "create namespaces, pods, and a policy which applies to a pod", Jobs: map[string][]*Action{ - "finish_bootup_phase": { - FinishBootupPhase(), - }, "namespace_controller": { CreateNamespace("x", map[string]string{"k1": "v1"}), CreateNamespace("y", map[string]string{"k2": "v2"}), ApplyDP(), }, "pod_controller": { + FinishBootupPhase(), CreatePod("x", "a", ip1, thisNode, map[string]string{"k1": "v1"}), CreatePod("y", "a", ip2, otherNode, map[string]string{"k1": "v1"}), ApplyDP(), diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 0ab6a76bd6..8164a06027 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -18,10 +18,11 @@ import ( const ( reconcileDuration = time.Duration(5 * time.Minute) - contextBackground = "BACKGROUND" - contextApplyDP = "APPLY-DP" - contextAddNetPol = "ADD-NETPOL" - contextDelNetPol = "DEL-NETPOL" + contextBackground = "BACKGROUND" + contextApplyDP = "APPLY-DP" + contextAddNetPol = "ADD-NETPOL" + contextAddNetPolBootup = "BOOTUP-ADD-NETPOL" + contextDelNetPol = "DEL-NETPOL" ) var ErrInvalidApplyConfig = errors.New("invalid apply config") @@ -276,24 +277,21 @@ func (dp *DataPlane) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []* // and accordingly makes changes in dataplane. This function helps emulate a single call to // dataplane instead of multiple ipset operations calls ipset operations calls to dataplane func (dp *DataPlane) ApplyDataPlane() error { - if dp.applyInBackground { - return dp.incrementBatchAndApplyIfNeeded(contextApplyDP) + if !dp.applyInBackground { + return dp.applyDataPlaneNow(contextApplyDP) } - return dp.applyDataPlaneNow(contextApplyDP) -} - -func (dp *DataPlane) incrementBatchAndApplyIfNeeded(context string) error { + // increment batch and apply dataplane if needed dp.applyInfo.Lock() dp.applyInfo.numBatches++ newCount := dp.applyInfo.numBatches dp.applyInfo.Unlock() - klog.Infof("[DataPlane] [%s] new batch count: %d", context, newCount) + klog.Infof("[DataPlane] [%s] new batch count: %d", contextApplyDP, newCount) if newCount >= dp.ApplyMaxBatches { - klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", context, newCount) - return dp.applyDataPlaneNow(context) + klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextApplyDP, newCount) + return dp.applyDataPlaneNow(contextApplyDP) } return nil @@ -380,32 +378,65 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error { return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err) } - var endpointList map[string]string - if dp.inBootupPhase() { + inBootupPhase := false + if dp.applyInBackground { + dp.applyInfo.Lock() + inBootupPhase = dp.applyInfo.inBootupPhase + if inBootupPhase { + // keep holding the lock to block FinishBootupPhase() and prevent PodController from + // coming back online and causing race issues from updatePod() within applyDataPlaneNow() + defer dp.applyInfo.Unlock() + } else { + dp.applyInfo.Unlock() + } + } + + if inBootupPhase { // During bootup phase, the Pod controller will not be running. // We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet. // Ideally, we get all NetworkPolicies in the cache before the Pod controller starts - err = dp.incrementBatchAndApplyIfNeeded(contextAddNetPol) - if err != nil { - return err - } - } else { - err = dp.applyDataPlaneNow(contextAddNetPol) - if err != nil { - return err + + // increment batch and apply IPSets if needed + dp.applyInfo.numBatches++ + newCount := dp.applyInfo.numBatches + klog.Infof("[DataPlane] [%s] new batch count: %d", contextAddNetPolBootup, newCount) + if newCount >= dp.ApplyMaxBatches { + klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount) + klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup) + err = dp.ipsetMgr.ApplyIPSets() + if err != nil { + return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err) + } + klog.Infof("[DataPlane] [%s] finished applying ipsets", contextAddNetPolBootup) + + dp.applyInfo.numBatches = 0 } - endpointList, err = dp.getEndpointsToApplyPolicy(policy) + err = dp.policyMgr.AddPolicy(policy, nil) if err != nil { - return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err) + return fmt.Errorf("[DataPlane] [%s] error while adding policy: %w", contextAddNetPolBootup, err) } + + return nil + } + + // standard, non-bootup phase + err = dp.applyDataPlaneNow(contextAddNetPol) + if err != nil { + return err + } + + var endpointList map[string]string + endpointList, err = dp.getEndpointsToApplyPolicy(policy) + if err != nil { + return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err) } - // endpointList will be empty if in bootup phase err = dp.policyMgr.AddPolicy(policy, endpointList) if err != nil { return fmt.Errorf("[DataPlane] error while adding policy: %w", err) } + return nil } @@ -582,14 +613,3 @@ func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, n } return nil } - -func (dp *DataPlane) inBootupPhase() bool { - if !dp.applyInBackground { - return false - } - - dp.applyInfo.Lock() - defer dp.applyInfo.Unlock() - - return dp.applyInfo.inBootupPhase -}