Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions npm/pkg/dataplane/dataplane-test-cases_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2224,15 +2224,13 @@ func getAllMultiJobTests() []*MultiJobTestCase {
{
Description: "create namespaces, pods, and a policy which applies to a pod",
Jobs: map[string][]*Action{
"finish_bootup_phase": {
FinishBootupPhase(),
},
"namespace_controller": {
CreateNamespace("x", map[string]string{"k1": "v1"}),
CreateNamespace("y", map[string]string{"k2": "v2"}),
ApplyDP(),
},
"pod_controller": {
FinishBootupPhase(),
CreatePod("x", "a", ip1, thisNode, map[string]string{"k1": "v1"}),
CreatePod("y", "a", ip2, otherNode, map[string]string{"k1": "v1"}),
ApplyDP(),
Expand Down
94 changes: 57 additions & 37 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
const (
reconcileDuration = time.Duration(5 * time.Minute)

contextBackground = "BACKGROUND"
contextApplyDP = "APPLY-DP"
contextAddNetPol = "ADD-NETPOL"
contextDelNetPol = "DEL-NETPOL"
contextBackground = "BACKGROUND"
contextApplyDP = "APPLY-DP"
contextAddNetPol = "ADD-NETPOL"
contextAddNetPolBootup = "BOOTUP-ADD-NETPOL"
contextDelNetPol = "DEL-NETPOL"
)

var ErrInvalidApplyConfig = errors.New("invalid apply config")
Expand Down Expand Up @@ -276,24 +277,21 @@ func (dp *DataPlane) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []*
// and accordingly makes changes in dataplane. This function helps emulate a single call to
// dataplane instead of multiple ipset operations calls ipset operations calls to dataplane
func (dp *DataPlane) ApplyDataPlane() error {
if dp.applyInBackground {
return dp.incrementBatchAndApplyIfNeeded(contextApplyDP)
if !dp.applyInBackground {
return dp.applyDataPlaneNow(contextApplyDP)
}

return dp.applyDataPlaneNow(contextApplyDP)
}

func (dp *DataPlane) incrementBatchAndApplyIfNeeded(context string) error {
// increment batch and apply dataplane if needed
dp.applyInfo.Lock()
dp.applyInfo.numBatches++
newCount := dp.applyInfo.numBatches
dp.applyInfo.Unlock()

klog.Infof("[DataPlane] [%s] new batch count: %d", context, newCount)
klog.Infof("[DataPlane] [%s] new batch count: %d", contextApplyDP, newCount)

if newCount >= dp.ApplyMaxBatches {
klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", context, newCount)
return dp.applyDataPlaneNow(context)
klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextApplyDP, newCount)
return dp.applyDataPlaneNow(contextApplyDP)
}

return nil
Expand Down Expand Up @@ -380,32 +378,65 @@ func (dp *DataPlane) AddPolicy(policy *policies.NPMNetworkPolicy) error {
return fmt.Errorf("[DataPlane] error while adding Rule IPSet references: %w", err)
}

var endpointList map[string]string
if dp.inBootupPhase() {
inBootupPhase := false
if dp.applyInBackground {
dp.applyInfo.Lock()
inBootupPhase = dp.applyInfo.inBootupPhase
if inBootupPhase {
// keep holding the lock to block FinishBootupPhase() and prevent PodController from
// coming back online and causing race issues from updatePod() within applyDataPlaneNow()
defer dp.applyInfo.Unlock()
} else {
dp.applyInfo.Unlock()
}
}

if inBootupPhase {
// During bootup phase, the Pod controller will not be running.
// We don't need to worry about adding Policies to Endpoints, so we don't need IPSets in the kernel yet.
// Ideally, we get all NetworkPolicies in the cache before the Pod controller starts
err = dp.incrementBatchAndApplyIfNeeded(contextAddNetPol)
if err != nil {
return err
}
} else {
err = dp.applyDataPlaneNow(contextAddNetPol)
if err != nil {
return err

// increment batch and apply IPSets if needed
dp.applyInfo.numBatches++
newCount := dp.applyInfo.numBatches
klog.Infof("[DataPlane] [%s] new batch count: %d", contextAddNetPolBootup, newCount)
if newCount >= dp.ApplyMaxBatches {
klog.Infof("[DataPlane] [%s] applying now since reached maximum batch count: %d", contextAddNetPolBootup, newCount)
klog.Infof("[DataPlane] [%s] starting to apply ipsets", contextAddNetPolBootup)
err = dp.ipsetMgr.ApplyIPSets()
if err != nil {
return fmt.Errorf("[DataPlane] [%s] error while applying IPSets: %w", contextAddNetPolBootup, err)
}
klog.Infof("[DataPlane] [%s] finished applying ipsets", contextAddNetPolBootup)

dp.applyInfo.numBatches = 0
}

endpointList, err = dp.getEndpointsToApplyPolicy(policy)
err = dp.policyMgr.AddPolicy(policy, nil)
if err != nil {
return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err)
return fmt.Errorf("[DataPlane] [%s] error while adding policy: %w", contextAddNetPolBootup, err)
}

return nil
}

// standard, non-bootup phase
err = dp.applyDataPlaneNow(contextAddNetPol)
if err != nil {
return err
}

var endpointList map[string]string
endpointList, err = dp.getEndpointsToApplyPolicy(policy)
if err != nil {
return fmt.Errorf("[DataPlane] error while getting endpoints to apply policy after applying dataplane: %w", err)
}

// endpointList will be empty if in bootup phase
err = dp.policyMgr.AddPolicy(policy, endpointList)
if err != nil {
return fmt.Errorf("[DataPlane] error while adding policy: %w", err)
}

return nil
}

Expand Down Expand Up @@ -582,14 +613,3 @@ func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, n
}
return nil
}

func (dp *DataPlane) inBootupPhase() bool {
if !dp.applyInBackground {
return false
}

dp.applyInfo.Lock()
defer dp.applyInfo.Unlock()

return dp.applyInfo.inBootupPhase
}