diff --git a/npm/cmd/start.go b/npm/cmd/start.go index b18de9af93..bd45424611 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -137,11 +137,13 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { k8sServerVersion := k8sServerVersion(clientset) var dp dataplane.GenericDataplane + stopChannel := wait.NeverStop if config.Toggles.EnableV2NPM { - dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg) + dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel) if err != nil { return fmt.Errorf("failed to create dataplane with error %w", err) } + dp.RunPeriodicTasks() } npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion) err = metrics.CreateTelemetryHandle(version, npm.GetAIMetadata()) @@ -152,7 +154,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { go restserver.NPMRestServerListenAndServe(config, npMgr) - if err = npMgr.Start(config, wait.NeverStop); err != nil { + if err = npMgr.Start(config, stopChannel); err != nil { metrics.SendErrorLogAndMetric(util.NpmID, "Failed to start NPM due to %+v", err) return fmt.Errorf("failed to start with err: %w", err) } diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 19ad894c07..d205d38d11 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -27,6 +27,7 @@ type Config struct { } type DataPlane struct { + *Config policyMgr *policies.PolicyManager ipsetMgr *ipsets.IPSetManager networkID string @@ -35,7 +36,7 @@ type DataPlane struct { endpointCache map[string]*NPMEndpoint ioShim *common.IOShim updatePodCache map[string]*updateNPMPod - *Config + stopChannel <-chan struct{} } type NPMEndpoint struct { @@ -48,16 +49,17 @@ type NPMEndpoint struct { NetPolReference map[string]struct{} } -func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config) (*DataPlane, error) { +func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChannel <-chan struct{}) (*DataPlane, error) { metrics.InitializeAll() dp := &DataPlane{ + Config: cfg, policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg), ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim), endpointCache: make(map[string]*NPMEndpoint), nodeName: nodeName, ioShim: ioShim, updatePodCache: make(map[string]*updateNPMPod), - Config: cfg, + stopChannel: stopChannel, } err := dp.ResetDataPlane() @@ -74,14 +76,18 @@ func (dp *DataPlane) InitializeDataPlane() error { return nil } -// ResetDataPlane helps in cleaning up dataplane sets and policies programmed -// by NPM, returning a clean slate +// ResetDataPlane cleans the NPM sets and policies in the dataplane and performs initialization. +// TODO rename this function to BootupDataplane func (dp *DataPlane) ResetDataPlane() error { - // TODO rename this function to BootupDataplane // NOTE: used to create an all-namespaces set, but there's no need since it will be created by the control plane return dp.bootupDataPlane() } +// RunPeriodicTasks runs periodic tasks. Should only be called once. +func (dp *DataPlane) RunPeriodicTasks() { + dp.policyMgr.Reconcile(dp.stopChannel) +} + func (dp *DataPlane) GetIPSet(setName string) *ipsets.IPSet { return dp.ipsetMgr.GetIPSet(setName) } diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index f1a4590800..b44c5559b1 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -84,7 +84,7 @@ func TestNewDataPlane(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) assert.NotNil(t, dp) } @@ -95,7 +95,7 @@ func TestCreateAndDeleteIpSets(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) assert.NotNil(t, dp) setsTocreate := []*ipsets.IPSetMetadata{ @@ -137,7 +137,7 @@ func TestAddToSet(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) setsTocreate := []*ipsets.IPSetMetadata{ @@ -201,7 +201,7 @@ func TestApplyPolicy(t *testing.T) { calls := append(getBootupTestCalls(), getAddPolicyTestCallsForDP(&testPolicyobj)...) ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) @@ -215,7 +215,7 @@ func TestRemovePolicy(t *testing.T) { calls = append(calls, getRemovePolicyTestCallsForDP(&testPolicyobj)...) ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) @@ -245,7 +245,7 @@ func TestUpdatePolicy(t *testing.T) { } ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) diff --git a/npm/pkg/dataplane/dpshim/dpshim.go b/npm/pkg/dataplane/dpshim/dpshim.go index ab356eba0b..6fc628ef2b 100644 --- a/npm/pkg/dataplane/dpshim/dpshim.go +++ b/npm/pkg/dataplane/dpshim/dpshim.go @@ -12,7 +12,7 @@ type DPShim struct { outChannel chan *protos.Events } -func NewDPSim(outChannel chan *protos.Events) *DPShim { +func NewDPShim(outChannel chan *protos.Events) *DPShim { return &DPShim{outChannel: outChannel} } @@ -24,6 +24,8 @@ func (dp *DPShim) ResetDataPlane() error { return nil } +func (dp *DPShim) RunPeriodicTasks() {} + func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { return nil } diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index a309d6d57a..0c3bfc1352 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /mnt/c/Users/vamsi/Desktop/Microsoft_ws/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /home/hunter/dev/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -203,6 +203,18 @@ func (mr *MockGenericDataplaneMockRecorder) ResetDataPlane() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetDataPlane", reflect.TypeOf((*MockGenericDataplane)(nil).ResetDataPlane)) } +// RunPeriodicTasks mocks base method. +func (m *MockGenericDataplane) RunPeriodicTasks() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunPeriodicTasks") +} + +// RunPeriodicTasks indicates an expected call of RunPeriodicTasks. +func (mr *MockGenericDataplaneMockRecorder) RunPeriodicTasks() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunPeriodicTasks", reflect.TypeOf((*MockGenericDataplane)(nil).RunPeriodicTasks)) +} + // UpdatePolicy mocks base method. func (m *MockGenericDataplane) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/policies/chain-management_linux.go b/npm/pkg/dataplane/policies/chain-management_linux.go index 1fd03cbc8a..d617ebca21 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux.go +++ b/npm/pkg/dataplane/policies/chain-management_linux.go @@ -1,5 +1,7 @@ package policies +// This file contains code for booting up and reconciling iptables + import ( "errors" "fmt" @@ -64,7 +66,25 @@ type staleChains struct { } func newStaleChains() *staleChains { - return &staleChains{make(map[string]struct{})} + return &staleChains{ + chainsToCleanup: make(map[string]struct{}), + } +} + +// forceLock stops reconciling if it is running, and then locks the reconcileManager +func (rm *reconcileManager) forceLock() { + rm.releaseLockSignal <- struct{}{} + rm.Lock() +} + +// forceUnlock makes sure that the releaseLockSignal channel is empty (in case reconciling +// wasn't running when forceLock was called), and then unlocks the reconcileManager. +func (rm *reconcileManager) forceUnlock() { + select { + case <-rm.releaseLockSignal: + default: + } + rm.Unlock() } // Adds the chain if it isn't one of the iptablesAzureChains. @@ -128,6 +148,11 @@ func isBaseChain(chain string) bool { func (pMgr *PolicyManager) bootup(_ []string) error { klog.Infof("booting up iptables Azure chains") + // Stop reconciling so we don't centend for iptables, and so we don't update the staleChains at the same time as reconcile() + // Reconciling would only be happening if this function were called to reset iptables well into the azure-npm pod lifecycle. + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() + // 1. delete the deprecated jump to AZURE-NPM deprecatedErrCode, deprecatedErr := pMgr.runIPTablesCommand(util.IptablesDeletionFlag, deprecatedJumpFromForwardToAzureChainArgs...) if deprecatedErr == nil { @@ -166,13 +191,16 @@ func (pMgr *PolicyManager) bootup(_ []string) error { } // reconcile does the following: -// - cleans up stale policy chains // - creates the jump rule from FORWARD chain to AZURE-NPM chain (if it does not exist) and makes sure it's after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist). +// - cleans up stale policy chains. It can be forced to stop this process if reconcileManager.forceLock() is called. func (pMgr *PolicyManager) reconcile() { klog.Infof("repositioning azure chain jump rule") if err := pMgr.positionAzureChainJumpRule(); err != nil { klog.Errorf("failed to reconcile jump rule to Azure-NPM due to %s", err.Error()) } + + pMgr.reconcileManager.Lock() + defer pMgr.reconcileManager.Unlock() staleChains := pMgr.staleChains.emptyAndGetAll() klog.Infof("cleaning up these stale chains: %+v", staleChains) if err := pMgr.cleanupChains(staleChains); err != nil { @@ -182,19 +210,29 @@ func (pMgr *PolicyManager) reconcile() { // cleanupChains deletes all the chains in the given list. // If a chain fails to delete and it isn't one of the iptablesAzureChains, then it is added to the staleChains. -// have to use slice argument for deterministic behavior for ioshim in UTs +// This is a separate function for with a slice argument so that UTs can have deterministic behavior for ioshim. func (pMgr *PolicyManager) cleanupChains(chains []string) error { var aggregateError error - for _, chain := range chains { - errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) - if err != nil && errCode != doesNotExistErrorCode { - // add to staleChains if it's not one of the iptablesAzureChains - pMgr.staleChains.add(chain) - currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err) - if aggregateError == nil { - aggregateError = npmerrors.SimpleError(currentErrString) - } else { - aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError) +deleteLoop: + for k, chain := range chains { + select { + case <-pMgr.reconcileManager.releaseLockSignal: + // if reconcileManager.forceLock() was called, then stop deleting stale chains so that reconcileManager can be unlocked right away + for j := k; j < len(chains); j++ { + pMgr.staleChains.add(chains[j]) + } + break deleteLoop + default: + errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) + if err != nil && errCode != doesNotExistErrorCode { + // add to staleChains if it's not one of the iptablesAzureChains + pMgr.staleChains.add(chain) + currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err) + if aggregateError == nil { + aggregateError = npmerrors.SimpleError(currentErrString) + } else { + aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError) + } } } } @@ -232,7 +270,6 @@ func (pMgr *PolicyManager) runIPTablesCommand(operationFlag string, args ...stri // Writes the restore file for bootup, and marks the following as stale: deprecated chains and old v2 policy chains. // This is a separate function to help with UTs. func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) *ioutil.FileCreator { - pMgr.staleChains.empty() chainsToCreate := make([]string, 0, len(iptablesAzureChains)) for _, chain := range iptablesAzureChains { _, exists := currentChains[chain] @@ -244,6 +281,7 @@ func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) * // Step 2.1 in bootup() comment: cleanup old NPM chains, and configure base chains and their rules // To leave NPM deactivated, don't specify any rules for AZURE-NPM chain. creator := pMgr.newCreatorWithChains(chainsToCreate) + pMgr.staleChains.empty() for chain := range currentChains { creator.AddLine("", nil, fmt.Sprintf("-F %s", chain)) // Step 2.2 in bootup() comment: delete deprecated chains and old v2 policy chains in the background diff --git a/npm/pkg/dataplane/policies/chain-management_linux_test.go b/npm/pkg/dataplane/policies/chain-management_linux_test.go index 6f032dc863..4b3f6c03b5 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux_test.go +++ b/npm/pkg/dataplane/policies/chain-management_linux_test.go @@ -47,6 +47,50 @@ Chain AZURE-NPM-ACCEPT (1 references) ` ) +func TestStaleChainsForceLock(t *testing.T) { + testChains := []string{} + for i := 0; i < 100000; i++ { + testChains = append(testChains, fmt.Sprintf("test-chain-%d", i)) + } + calls := []testutils.TestCmd{} + for _, chain := range testChains { + calls = append(calls, getFakeDestroyCommand(chain)) + } + ioshim := common.NewMockIOShim(calls) + // don't verify calls because there shouldn't be as many commands as we create if forceLock works properly + pMgr := NewPolicyManager(ioshim, ipsetConfig) + + start := make(chan struct{}, 1) + done := make(chan struct{}, 1) + go func() { + pMgr.reconcileManager.Lock() + defer pMgr.reconcileManager.Unlock() + start <- struct{}{} + require.NoError(t, pMgr.cleanupChains(testChains)) + done <- struct{}{} + }() + <-start + pMgr.reconcileManager.forceLock() + <-done + // the releaseLockSignal should be empty, there should be some stale chains, and staleChains should be unlockable + fmt.Println("weren't able to delete this many chains:", len(pMgr.staleChains.chainsToCleanup)) + require.NotEqual(t, 0, len(pMgr.staleChains.chainsToCleanup), "stale chains should not be empty") + require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.reconcileManager.Unlock() +} + +func TestStaleChainsForceUnlock(t *testing.T) { + ioshim := common.NewMockIOShim(nil) + defer ioshim.VerifyCalls(t, nil) + pMgr := NewPolicyManager(ioshim, ipsetConfig) + pMgr.reconcileManager.forceLock() + require.Equal(t, 1, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be non-empty") + pMgr.reconcileManager.forceUnlock() + // the releaseLockSignal should be empty and staleChains should be lockable + require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.reconcileManager.Lock() +} + func TestStaleChainsAddAndRemove(t *testing.T) { ioshim := common.NewMockIOShim(nil) defer ioshim.VerifyCalls(t, nil) diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index 05c6e4c487..f72f2d5473 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -32,12 +32,17 @@ type PolicyMap struct { cache map[string]*NPMNetworkPolicy } +type reconcileManager struct { + sync.Mutex + releaseLockSignal chan struct{} +} + type PolicyManager struct { - policyMap *PolicyMap - ioShim *common.IOShim - staleChains *staleChains + policyMap *PolicyMap + ioShim *common.IOShim + staleChains *staleChains + reconcileManager *reconcileManager *PolicyManagerCfg - sync.Mutex } func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManager { @@ -45,8 +50,11 @@ func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManag policyMap: &PolicyMap{ cache: make(map[string]*NPMNetworkPolicy), }, - ioShim: ioShim, - staleChains: newStaleChains(), + ioShim: ioShim, + staleChains: newStaleChains(), + reconcileManager: &reconcileManager{ + releaseLockSignal: make(chan struct{}, 1), + }, PolicyManagerCfg: cfg, } } @@ -58,8 +66,6 @@ func (pMgr *PolicyManager) Bootup(epIDs []string) error { return nil } -// TODO call this function in DP - func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { go func() { ticker := time.NewTicker(time.Minute * time.Duration(reconcileTimeInMinutes)) @@ -70,8 +76,6 @@ func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { case <-stopChannel: return case <-ticker.C: - pMgr.Lock() - defer pMgr.Unlock() pMgr.reconcile() } } @@ -138,7 +142,7 @@ func (pMgr *PolicyManager) RemovePolicy(policyKey string, endpointList map[strin } func (pMgr *PolicyManager) isLastPolicy() bool { - // if change our code to delete more than one policy at once, we can specify numPoliciesToDelete as an argument + // if we change our code to delete more than one policy at once, we can specify numPoliciesToDelete as an argument numPoliciesToDelete := 1 return len(pMgr.policyMap.cache) == numPoliciesToDelete } diff --git a/npm/pkg/dataplane/policies/policymanager_linux.go b/npm/pkg/dataplane/policies/policymanager_linux.go index cf1163016b..d309e210a2 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux.go +++ b/npm/pkg/dataplane/policies/policymanager_linux.go @@ -1,5 +1,7 @@ package policies +// This file contains code for the iptables implementation of adding/removing policies. + import ( "fmt" @@ -20,8 +22,13 @@ const ( func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[string]string) error { // 1. Add rules for the network policies and activate NPM (if necessary). - chainsToCreate := allChainNames([]*NPMNetworkPolicy{networkPolicy}) + chainsToCreate := chainNames([]*NPMNetworkPolicy{networkPolicy}) creator := pMgr.creatorForNewNetworkPolicies(chainsToCreate, []*NPMNetworkPolicy{networkPolicy}) + + // Stop reconciling so we don't contend for iptables, and so reconcile doesn't delete chainsToCreate. + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() + err := restore(creator) if err != nil { return npmerrors.SimpleErrorWrapper("failed to restore iptables with updated policies", err) @@ -35,6 +42,13 @@ func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[stri } func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[string]string) error { + chainsToDelete := chainNames([]*NPMNetworkPolicy{networkPolicy}) + creator := pMgr.creatorForRemovingPolicies(chainsToDelete) + + // Stop reconciling so we don't contend for iptables, and so we don't update the staleChains at the same time as reconcile() + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() + // 1. Delete jump rules from ingress/egress chains to ingress/egress policy chains. // We ought to delete these jump rules here in the foreground since if we add an NP back after deleting, iptables-restore --noflush can add duplicate jump rules. deleteErr := pMgr.deleteOldJumpRulesOnRemove(networkPolicy) @@ -43,8 +57,6 @@ func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[s } // 2. Flush the policy chains and deactivate NPM (if necessary). - chainsToDelete := allChainNames([]*NPMNetworkPolicy{networkPolicy}) - creator := pMgr.creatorForRemovingPolicies(chainsToDelete) restoreErr := restore(creator) if restoreErr != nil { return npmerrors.SimpleErrorWrapper("failed to flush policies", restoreErr) @@ -80,8 +92,8 @@ func (pMgr *PolicyManager) creatorForRemovingPolicies(allChainNames []string) *i return creator } -// returns all chain names (ingress and egress policy chain names) -func allChainNames(networkPolicies []*NPMNetworkPolicy) []string { +// returns ingress and egress chain names for the policies +func chainNames(networkPolicies []*NPMNetworkPolicy) []string { chainNames := make([]string, 0) for _, networkPolicy := range networkPolicies { hasIngress, hasEgress := networkPolicy.hasIngressAndEgress() diff --git a/npm/pkg/dataplane/policies/policymanager_linux_test.go b/npm/pkg/dataplane/policies/policymanager_linux_test.go index 5a1b7b00fe..4c9ddf43d5 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux_test.go +++ b/npm/pkg/dataplane/policies/policymanager_linux_test.go @@ -209,7 +209,7 @@ func TestCreatorForAddPolicies(t *testing.T) { // 1. test with activation policies := []*NPMNetworkPolicy{allTestNetworkPolicies[0]} - creator := pMgr.creatorForNewNetworkPolicies(allChainNames(policies), policies) + creator := pMgr.creatorForNewNetworkPolicies(chainNames(policies), policies) actualLines := strings.Split(creator.ToString(), "\n") expectedLines := []string{ "*filter", @@ -236,7 +236,7 @@ func TestCreatorForAddPolicies(t *testing.T) { // 2. test without activation // add a policy to the cache so that we don't activate (the cache doesn't impact creatorForNewNetworkPolicies) require.NoError(t, pMgr.AddPolicy(allTestNetworkPolicies[0], nil)) - creator = pMgr.creatorForNewNetworkPolicies(allChainNames(allTestNetworkPolicies), allTestNetworkPolicies) + creator = pMgr.creatorForNewNetworkPolicies(chainNames(allTestNetworkPolicies), allTestNetworkPolicies) actualLines = strings.Split(creator.ToString(), "\n") expectedLines = []string{ "*filter", @@ -272,7 +272,7 @@ func TestCreatorForRemovePolicies(t *testing.T) { // 1. test without deactivation // hack: the cache is empty (and len(cache) != len(allTestNetworkPolicies)), so shouldDeactivate will be false - creator := pMgr.creatorForRemovingPolicies(allChainNames(allTestNetworkPolicies)) + creator := pMgr.creatorForRemovingPolicies(chainNames(allTestNetworkPolicies)) actualLines := strings.Split(creator.ToString(), "\n") expectedLines := []string{ "*filter", @@ -289,7 +289,7 @@ func TestCreatorForRemovePolicies(t *testing.T) { // add to the cache so that we deactivate policy := TestNetworkPolicies[0] require.NoError(t, pMgr.AddPolicy(policy, nil)) - creator = pMgr.creatorForRemovingPolicies(allChainNames([]*NPMNetworkPolicy{policy})) + creator = pMgr.creatorForRemovingPolicies(chainNames([]*NPMNetworkPolicy{policy})) actualLines = strings.Split(creator.ToString(), "\n") expectedLines = []string{ "*filter", diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index d24210c240..28e2139a82 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,6 +8,7 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error + RunPeriodicTasks() GetIPSet(setName string) *ipsets.IPSet CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index 726b02e7fd..f0eba83ef7 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -11,8 +11,9 @@ import ( ) const ( - MaxSleepTime = 2 - includeLists = false + MaxSleepTime = 2 + finalWaitTimeInMinutes = 10 + includeLists = false ) var ( @@ -73,7 +74,8 @@ var ( ) func main() { - dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg) + dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg, make(chan struct{}, 1)) + dp.RunPeriodicTasks() panicOnError(err) printAndWait(true) @@ -120,7 +122,9 @@ func main() { dp.DeleteIPSet(ipsets.TestKVPodSet.Metadata) panicOnError(dp.ApplyDataPlane()) - panicOnError(dp.AddToLists([]*ipsets.IPSetMetadata{ipsets.TestNestedLabelList.Metadata}, []*ipsets.IPSetMetadata{ipsets.TestKVPodSet.Metadata, ipsets.TestNSSet.Metadata})) + if includeLists { + panicOnError(dp.AddToLists([]*ipsets.IPSetMetadata{ipsets.TestNestedLabelList.Metadata}, []*ipsets.IPSetMetadata{ipsets.TestKVPodSet.Metadata, ipsets.TestNSSet.Metadata})) + } printAndWait(true) panicOnError(dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.TestNSSet.Metadata}, podMetadata)) @@ -164,6 +168,9 @@ func main() { printAndWait(true) panicOnError(dp.AddPolicy(policies.TestNetworkPolicies[0])) fmt.Println("AZURE-NPM should have rules now") + + fmt.Println("waiting for reconcile to finish (will be a while if you don't update the reconcile time in policymanager.go") + time.Sleep(finalWaitTimeInMinutes * time.Minute) } func panicOnError(err error) {