From 215891414a5c388e2400d064fe86a94e39b89ceb Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 7 Jan 2022 16:11:08 -0800 Subject: [PATCH 01/11] call policy reconcile in dataplane --- npm/cmd/start.go | 5 +++-- npm/pkg/dataplane/dataplane.go | 19 +++++++++++++------ npm/pkg/dataplane/dataplane_test.go | 13 +++++++------ npm/pkg/dataplane/policies/policymanager.go | 6 +----- test/integration/npm/main.go | 5 ++++- 5 files changed, 28 insertions(+), 20 deletions(-) diff --git a/npm/cmd/start.go b/npm/cmd/start.go index b18de9af93..3913d21ec6 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -137,8 +137,9 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { k8sServerVersion := k8sServerVersion(clientset) var dp dataplane.GenericDataplane + stopChannel := wait.NeverStop if config.Toggles.EnableV2NPM { - dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg) + dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel) if err != nil { return fmt.Errorf("failed to create dataplane with error %w", err) } @@ -152,7 +153,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { go restserver.NPMRestServerListenAndServe(config, npMgr) - if err = npMgr.Start(config, wait.NeverStop); err != nil { + if err = npMgr.Start(config, stopChannel); err != nil { metrics.SendErrorLogAndMetric(util.NpmID, "Failed to start NPM due to %+v", err) return fmt.Errorf("failed to start with err: %w", err) } diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index b4b6800bbd..f8d19aba5e 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -24,9 +24,12 @@ type PolicyMode string type Config struct { *ipsets.IPSetManagerCfg *policies.PolicyManagerCfg + // helpful for UTs (defaults to false for external packages) + disableGoRoutines bool } type DataPlane struct { + *Config policyMgr *policies.PolicyManager ipsetMgr *ipsets.IPSetManager networkID string @@ -35,7 +38,7 @@ type DataPlane struct { endpointCache map[string]*NPMEndpoint ioShim *common.IOShim updatePodCache map[string]*updateNPMPod - *Config + stopChannel <-chan struct{} } type NPMEndpoint struct { @@ -48,16 +51,17 @@ type NPMEndpoint struct { NetPolReference map[string]struct{} } -func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config) (*DataPlane, error) { +func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChannel <-chan struct{}) (*DataPlane, error) { metrics.InitializeAll() dp := &DataPlane{ + Config: cfg, policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg), ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim), endpointCache: make(map[string]*NPMEndpoint), nodeName: nodeName, ioShim: ioShim, updatePodCache: make(map[string]*updateNPMPod), - Config: cfg, + stopChannel: stopChannel, } err := dp.ResetDataPlane() @@ -65,6 +69,10 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config) (*DataPla klog.Errorf("Failed to reset dataplane: %v", err) return nil, err } + // necessary for UTs because of ioshim + if !dp.disableGoRoutines { + dp.policyMgr.Reconcile(dp.stopChannel) + } return dp, nil } @@ -74,10 +82,9 @@ func (dp *DataPlane) InitializeDataPlane() error { return nil } -// ResetDataPlane helps in cleaning up dataplane sets and policies programmed -// by NPM, returning a clean slate +// ResetDataPlane cleans the NPM sets and policies in the dataplane and performs initialization. +// TODO rename this function to BootupDataplane func (dp *DataPlane) ResetDataPlane() error { - // TODO rename this function to BootupDataplane // NOTE: used to create an all-namespaces set, but there's no need since it will be created by the control plane return dp.bootupDataPlane() } diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index f1a4590800..42cb90850d 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -25,6 +25,7 @@ var ( PolicyManagerCfg: &policies.PolicyManagerCfg{ PolicyMode: policies.IPSetPolicyMode, }, + disableGoRoutines: true, } fakeIPSetRestoreSuccess = testutils.TestCmd{ @@ -84,7 +85,7 @@ func TestNewDataPlane(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) assert.NotNil(t, dp) } @@ -95,7 +96,7 @@ func TestCreateAndDeleteIpSets(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) assert.NotNil(t, dp) setsTocreate := []*ipsets.IPSetMetadata{ @@ -137,7 +138,7 @@ func TestAddToSet(t *testing.T) { calls := getBootupTestCalls() ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) setsTocreate := []*ipsets.IPSetMetadata{ @@ -201,7 +202,7 @@ func TestApplyPolicy(t *testing.T) { calls := append(getBootupTestCalls(), getAddPolicyTestCallsForDP(&testPolicyobj)...) ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) @@ -215,7 +216,7 @@ func TestRemovePolicy(t *testing.T) { calls = append(calls, getRemovePolicyTestCallsForDP(&testPolicyobj)...) ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) @@ -245,7 +246,7 @@ func TestUpdatePolicy(t *testing.T) { } ioshim := common.NewMockIOShim(calls) defer ioshim.VerifyCalls(t, calls) - dp, err := NewDataPlane("testnode", ioshim, dpCfg) + dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil) require.NoError(t, err) err = dp.AddPolicy(&testPolicyobj) diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index 05c6e4c487..4c770eb471 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -58,8 +58,6 @@ func (pMgr *PolicyManager) Bootup(epIDs []string) error { return nil } -// TODO call this function in DP - func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { go func() { ticker := time.NewTicker(time.Minute * time.Duration(reconcileTimeInMinutes)) @@ -70,8 +68,6 @@ func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { case <-stopChannel: return case <-ticker.C: - pMgr.Lock() - defer pMgr.Unlock() pMgr.reconcile() } } @@ -138,7 +134,7 @@ func (pMgr *PolicyManager) RemovePolicy(policyKey string, endpointList map[strin } func (pMgr *PolicyManager) isLastPolicy() bool { - // if change our code to delete more than one policy at once, we can specify numPoliciesToDelete as an argument + // if we change our code to delete more than one policy at once, we can specify numPoliciesToDelete as an argument numPoliciesToDelete := 1 return len(pMgr.policyMap.cache) == numPoliciesToDelete } diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index c7ff556aed..c70a88cb21 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -73,7 +73,7 @@ var ( ) func main() { - dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg) + dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg, nil) panicOnError(err) printAndWait(true) @@ -162,6 +162,9 @@ func main() { printAndWait(true) panicOnError(dp.AddPolicy(policies.TestNetworkPolicies[0])) fmt.Println("AZURE-NPM should have rules now") + + fmt.Println("waiting for reconcile to finish (will be a while if you don't update the reconcile time in policymanager.go") + time.Sleep(10 * time.Minute) } func panicOnError(err error) { From a5dcc10ada4aa2b6edea8f095a40bf0dbd2c08d2 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Jan 2022 15:02:38 -0800 Subject: [PATCH 02/11] lock to staleChains --- .../dataplane/policies/chain-management_linux.go | 15 ++++++++++++--- npm/pkg/dataplane/policies/policymanager.go | 4 +--- .../dataplane/policies/policymanager_linux.go | 16 ++++++++++++---- .../policies/policymanager_linux_test.go | 8 ++++---- test/integration/npm/main.go | 7 ++++--- 5 files changed, 33 insertions(+), 17 deletions(-) diff --git a/npm/pkg/dataplane/policies/chain-management_linux.go b/npm/pkg/dataplane/policies/chain-management_linux.go index 1fd03cbc8a..711c0b7844 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux.go +++ b/npm/pkg/dataplane/policies/chain-management_linux.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ioutil" @@ -61,10 +62,13 @@ var ( type staleChains struct { chainsToCleanup map[string]struct{} + sync.Mutex } func newStaleChains() *staleChains { - return &staleChains{make(map[string]struct{})} + return &staleChains{ + chainsToCleanup: make(map[string]struct{}), + } } // Adds the chain if it isn't one of the iptablesAzureChains. @@ -173,6 +177,9 @@ func (pMgr *PolicyManager) reconcile() { if err := pMgr.positionAzureChainJumpRule(); err != nil { klog.Errorf("failed to reconcile jump rule to Azure-NPM due to %s", err.Error()) } + + pMgr.staleChains.Lock() + defer pMgr.staleChains.Unlock() staleChains := pMgr.staleChains.emptyAndGetAll() klog.Infof("cleaning up these stale chains: %+v", staleChains) if err := pMgr.cleanupChains(staleChains); err != nil { @@ -182,7 +189,7 @@ func (pMgr *PolicyManager) reconcile() { // cleanupChains deletes all the chains in the given list. // If a chain fails to delete and it isn't one of the iptablesAzureChains, then it is added to the staleChains. -// have to use slice argument for deterministic behavior for ioshim in UTs +// This is a separate function for with a slice argument so that UTs can have deterministic behavior for ioshim. func (pMgr *PolicyManager) cleanupChains(chains []string) error { var aggregateError error for _, chain := range chains { @@ -232,7 +239,6 @@ func (pMgr *PolicyManager) runIPTablesCommand(operationFlag string, args ...stri // Writes the restore file for bootup, and marks the following as stale: deprecated chains and old v2 policy chains. // This is a separate function to help with UTs. func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) *ioutil.FileCreator { - pMgr.staleChains.empty() chainsToCreate := make([]string, 0, len(iptablesAzureChains)) for _, chain := range iptablesAzureChains { _, exists := currentChains[chain] @@ -244,11 +250,14 @@ func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) * // Step 2.1 in bootup() comment: cleanup old NPM chains, and configure base chains and their rules // To leave NPM deactivated, don't specify any rules for AZURE-NPM chain. creator := pMgr.newCreatorWithChains(chainsToCreate) + pMgr.staleChains.Lock() + pMgr.staleChains.empty() for chain := range currentChains { creator.AddLine("", nil, fmt.Sprintf("-F %s", chain)) // Step 2.2 in bootup() comment: delete deprecated chains and old v2 policy chains in the background pMgr.staleChains.add(chain) // won't add base chains } + pMgr.staleChains.Unlock() // add AZURE-NPM-INGRESS chain rules ingressDropSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureIngressChain, util.IptablesJumpFlag, util.IptablesDrop} diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index 4c770eb471..dcec158f24 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -2,7 +2,6 @@ package policies import ( "fmt" - "sync" "time" "github.com/Azure/azure-container-networking/common" @@ -37,7 +36,6 @@ type PolicyManager struct { ioShim *common.IOShim staleChains *staleChains *PolicyManagerCfg - sync.Mutex } func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManager { @@ -60,7 +58,7 @@ func (pMgr *PolicyManager) Bootup(epIDs []string) error { func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { go func() { - ticker := time.NewTicker(time.Minute * time.Duration(reconcileTimeInMinutes)) + ticker := time.NewTicker(time.Second * time.Duration(reconcileTimeInMinutes)) defer ticker.Stop() for { diff --git a/npm/pkg/dataplane/policies/policymanager_linux.go b/npm/pkg/dataplane/policies/policymanager_linux.go index cf1163016b..77ca43bd25 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux.go +++ b/npm/pkg/dataplane/policies/policymanager_linux.go @@ -20,8 +20,13 @@ const ( func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[string]string) error { // 1. Add rules for the network policies and activate NPM (if necessary). - chainsToCreate := allChainNames([]*NPMNetworkPolicy{networkPolicy}) + chainsToCreate := chainNames([]*NPMNetworkPolicy{networkPolicy}) creator := pMgr.creatorForNewNetworkPolicies(chainsToCreate, []*NPMNetworkPolicy{networkPolicy}) + + // Lock stale chains so we don't delete chainsToCreate + pMgr.staleChains.Lock() + defer pMgr.staleChains.Unlock() + err := restore(creator) if err != nil { return npmerrors.SimpleErrorWrapper("failed to restore iptables with updated policies", err) @@ -43,7 +48,7 @@ func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[s } // 2. Flush the policy chains and deactivate NPM (if necessary). - chainsToDelete := allChainNames([]*NPMNetworkPolicy{networkPolicy}) + chainsToDelete := chainNames([]*NPMNetworkPolicy{networkPolicy}) creator := pMgr.creatorForRemovingPolicies(chainsToDelete) restoreErr := restore(creator) if restoreErr != nil { @@ -51,6 +56,9 @@ func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[s } // 3. Delete policy chains in the background. + // lock here since stale chains are only affected if we successfully remove policies + pMgr.staleChains.Lock() + defer pMgr.staleChains.Unlock() for _, chain := range chainsToDelete { pMgr.staleChains.add(chain) } @@ -80,8 +88,8 @@ func (pMgr *PolicyManager) creatorForRemovingPolicies(allChainNames []string) *i return creator } -// returns all chain names (ingress and egress policy chain names) -func allChainNames(networkPolicies []*NPMNetworkPolicy) []string { +// returns ingress and egress chain names for the policies +func chainNames(networkPolicies []*NPMNetworkPolicy) []string { chainNames := make([]string, 0) for _, networkPolicy := range networkPolicies { hasIngress, hasEgress := networkPolicy.hasIngressAndEgress() diff --git a/npm/pkg/dataplane/policies/policymanager_linux_test.go b/npm/pkg/dataplane/policies/policymanager_linux_test.go index 5a1b7b00fe..4c9ddf43d5 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux_test.go +++ b/npm/pkg/dataplane/policies/policymanager_linux_test.go @@ -209,7 +209,7 @@ func TestCreatorForAddPolicies(t *testing.T) { // 1. test with activation policies := []*NPMNetworkPolicy{allTestNetworkPolicies[0]} - creator := pMgr.creatorForNewNetworkPolicies(allChainNames(policies), policies) + creator := pMgr.creatorForNewNetworkPolicies(chainNames(policies), policies) actualLines := strings.Split(creator.ToString(), "\n") expectedLines := []string{ "*filter", @@ -236,7 +236,7 @@ func TestCreatorForAddPolicies(t *testing.T) { // 2. test without activation // add a policy to the cache so that we don't activate (the cache doesn't impact creatorForNewNetworkPolicies) require.NoError(t, pMgr.AddPolicy(allTestNetworkPolicies[0], nil)) - creator = pMgr.creatorForNewNetworkPolicies(allChainNames(allTestNetworkPolicies), allTestNetworkPolicies) + creator = pMgr.creatorForNewNetworkPolicies(chainNames(allTestNetworkPolicies), allTestNetworkPolicies) actualLines = strings.Split(creator.ToString(), "\n") expectedLines = []string{ "*filter", @@ -272,7 +272,7 @@ func TestCreatorForRemovePolicies(t *testing.T) { // 1. test without deactivation // hack: the cache is empty (and len(cache) != len(allTestNetworkPolicies)), so shouldDeactivate will be false - creator := pMgr.creatorForRemovingPolicies(allChainNames(allTestNetworkPolicies)) + creator := pMgr.creatorForRemovingPolicies(chainNames(allTestNetworkPolicies)) actualLines := strings.Split(creator.ToString(), "\n") expectedLines := []string{ "*filter", @@ -289,7 +289,7 @@ func TestCreatorForRemovePolicies(t *testing.T) { // add to the cache so that we deactivate policy := TestNetworkPolicies[0] require.NoError(t, pMgr.AddPolicy(policy, nil)) - creator = pMgr.creatorForRemovingPolicies(allChainNames([]*NPMNetworkPolicy{policy})) + creator = pMgr.creatorForRemovingPolicies(chainNames([]*NPMNetworkPolicy{policy})) actualLines = strings.Split(creator.ToString(), "\n") expectedLines = []string{ "*filter", diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index c70a88cb21..750bb32584 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -11,8 +11,9 @@ import ( ) const ( - MaxSleepTime = 2 - includeLists = false + MaxSleepTime = 2 + finalWaitTimeInMinutes = 10 + includeLists = false ) var ( @@ -164,7 +165,7 @@ func main() { fmt.Println("AZURE-NPM should have rules now") fmt.Println("waiting for reconcile to finish (will be a while if you don't update the reconcile time in policymanager.go") - time.Sleep(10 * time.Minute) + time.Sleep(finalWaitTimeInMinutes * time.Minute) } func panicOnError(err error) { From d9f2e1ac35a2edc9b3313ddbedd3eb856a986211 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Jan 2022 16:54:30 -0800 Subject: [PATCH 03/11] allow interruption of deleting stale chains while reconciling --- .../policies/chain-management_linux.go | 34 +++++++++++--- .../policies/chain-management_linux_test.go | 44 +++++++++++++++++++ .../dataplane/policies/policymanager_linux.go | 8 ++-- 3 files changed, 77 insertions(+), 9 deletions(-) diff --git a/npm/pkg/dataplane/policies/chain-management_linux.go b/npm/pkg/dataplane/policies/chain-management_linux.go index 711c0b7844..6acfa24920 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux.go +++ b/npm/pkg/dataplane/policies/chain-management_linux.go @@ -61,16 +61,31 @@ var ( ) type staleChains struct { - chainsToCleanup map[string]struct{} + chainsToCleanup map[string]struct{} + releaseLockSignal chan struct{} sync.Mutex } func newStaleChains() *staleChains { return &staleChains{ - chainsToCleanup: make(map[string]struct{}), + chainsToCleanup: make(map[string]struct{}), + releaseLockSignal: make(chan struct{}, 1), } } +func (s *staleChains) forceLock() { + s.releaseLockSignal <- struct{}{} + s.Lock() +} + +func (s *staleChains) forceUnlock() { + select { + case <-s.releaseLockSignal: + default: + } + s.Unlock() +} + // Adds the chain if it isn't one of the iptablesAzureChains. // This protects against trying to delete any core NPM chain. func (s *staleChains) add(chain string) { @@ -192,7 +207,16 @@ func (pMgr *PolicyManager) reconcile() { // This is a separate function for with a slice argument so that UTs can have deterministic behavior for ioshim. func (pMgr *PolicyManager) cleanupChains(chains []string) error { var aggregateError error - for _, chain := range chains { +deleteLoop: + for k, chain := range chains { + select { + case <-pMgr.staleChains.releaseLockSignal: + for j := k; j < len(chains); j++ { + pMgr.staleChains.add(chains[j]) + } + break deleteLoop + default: + } errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) if err != nil && errCode != doesNotExistErrorCode { // add to staleChains if it's not one of the iptablesAzureChains @@ -250,14 +274,14 @@ func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) * // Step 2.1 in bootup() comment: cleanup old NPM chains, and configure base chains and their rules // To leave NPM deactivated, don't specify any rules for AZURE-NPM chain. creator := pMgr.newCreatorWithChains(chainsToCreate) - pMgr.staleChains.Lock() + pMgr.staleChains.forceLock() pMgr.staleChains.empty() for chain := range currentChains { creator.AddLine("", nil, fmt.Sprintf("-F %s", chain)) // Step 2.2 in bootup() comment: delete deprecated chains and old v2 policy chains in the background pMgr.staleChains.add(chain) // won't add base chains } - pMgr.staleChains.Unlock() + defer pMgr.staleChains.forceUnlock() // add AZURE-NPM-INGRESS chain rules ingressDropSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureIngressChain, util.IptablesJumpFlag, util.IptablesDrop} diff --git a/npm/pkg/dataplane/policies/chain-management_linux_test.go b/npm/pkg/dataplane/policies/chain-management_linux_test.go index 6f032dc863..7315c45c40 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux_test.go +++ b/npm/pkg/dataplane/policies/chain-management_linux_test.go @@ -47,6 +47,50 @@ Chain AZURE-NPM-ACCEPT (1 references) ` ) +func TestStaleChainsForceLock(t *testing.T) { + testChains := []string{} + for i := 0; i < 100000; i++ { + testChains = append(testChains, fmt.Sprintf("test-chain-%d", i)) + } + calls := []testutils.TestCmd{} + for _, chain := range testChains { + calls = append(calls, getFakeDestroyCommand(chain)) + } + ioshim := common.NewMockIOShim(calls) + // don't verify calls because there shouldn't be as many commands as we create if forceLock works properly + pMgr := NewPolicyManager(ioshim, ipsetConfig) + + start := make(chan struct{}, 1) + done := make(chan struct{}, 1) + go func() { + pMgr.staleChains.Lock() + defer pMgr.staleChains.Unlock() + start <- struct{}{} + pMgr.cleanupChains(testChains) + done <- struct{}{} + }() + <-start + pMgr.staleChains.forceLock() + <-done + // the releaseLockSignal should be empty, there should be some stale chains, and staleChains should be unlockable + fmt.Println("weren't able to delete this many chains:", len(pMgr.staleChains.chainsToCleanup)) + require.NotEqual(t, 0, len(pMgr.staleChains.chainsToCleanup), "stale chains should not be empty") + require.Equal(t, 0, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.staleChains.Unlock() +} + +func TestStaleChainsForceUnlock(t *testing.T) { + ioshim := common.NewMockIOShim(nil) + defer ioshim.VerifyCalls(t, nil) + pMgr := NewPolicyManager(ioshim, ipsetConfig) + pMgr.staleChains.forceLock() + require.Equal(t, 1, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be non-empty") + pMgr.staleChains.forceUnlock() + // the releaseLockSignal should be empty and staleChains should be lockable + require.Equal(t, 0, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.staleChains.Lock() +} + func TestStaleChainsAddAndRemove(t *testing.T) { ioshim := common.NewMockIOShim(nil) defer ioshim.VerifyCalls(t, nil) diff --git a/npm/pkg/dataplane/policies/policymanager_linux.go b/npm/pkg/dataplane/policies/policymanager_linux.go index 77ca43bd25..e4b3e60b5b 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux.go +++ b/npm/pkg/dataplane/policies/policymanager_linux.go @@ -24,8 +24,8 @@ func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[stri creator := pMgr.creatorForNewNetworkPolicies(chainsToCreate, []*NPMNetworkPolicy{networkPolicy}) // Lock stale chains so we don't delete chainsToCreate - pMgr.staleChains.Lock() - defer pMgr.staleChains.Unlock() + pMgr.staleChains.forceLock() + defer pMgr.staleChains.forceUnlock() err := restore(creator) if err != nil { @@ -57,8 +57,8 @@ func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[s // 3. Delete policy chains in the background. // lock here since stale chains are only affected if we successfully remove policies - pMgr.staleChains.Lock() - defer pMgr.staleChains.Unlock() + pMgr.staleChains.forceLock() + defer pMgr.staleChains.forceUnlock() for _, chain := range chainsToDelete { pMgr.staleChains.add(chain) } From 7a8322481853d727c124c24caaa809dc1182cc17 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Jan 2022 17:08:32 -0800 Subject: [PATCH 04/11] fix lint --- npm/pkg/dataplane/policies/chain-management_linux_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/pkg/dataplane/policies/chain-management_linux_test.go b/npm/pkg/dataplane/policies/chain-management_linux_test.go index 7315c45c40..9c41091bd8 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux_test.go +++ b/npm/pkg/dataplane/policies/chain-management_linux_test.go @@ -66,7 +66,7 @@ func TestStaleChainsForceLock(t *testing.T) { pMgr.staleChains.Lock() defer pMgr.staleChains.Unlock() start <- struct{}{} - pMgr.cleanupChains(testChains) + require.NoError(t, pMgr.cleanupChains(testChains)) done <- struct{}{} }() <-start From 66be2717bd7c1516ef121ff546cd5456c36fd579 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 11 Jan 2022 17:28:10 -0800 Subject: [PATCH 05/11] switch reconcile period back from seconds to minutes --- npm/pkg/dataplane/policies/policymanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index dcec158f24..170f5d58ac 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -58,7 +58,7 @@ func (pMgr *PolicyManager) Bootup(epIDs []string) error { func (pMgr *PolicyManager) Reconcile(stopChannel <-chan struct{}) { go func() { - ticker := time.NewTicker(time.Second * time.Duration(reconcileTimeInMinutes)) + ticker := time.NewTicker(time.Minute * time.Duration(reconcileTimeInMinutes)) defer ticker.Stop() for { From fea82838d9c7b8c23f20eaca9e8b16789e40a1e6 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Fri, 14 Jan 2022 13:32:44 -0800 Subject: [PATCH 06/11] address comments --- npm/pkg/dataplane/dataplane.go | 6 +-- npm/pkg/dataplane/dataplane_test.go | 2 +- .../policies/chain-management_linux.go | 41 +++++++++++-------- .../policies/chain-management_linux_test.go | 20 ++++----- npm/pkg/dataplane/policies/policymanager.go | 20 ++++++--- .../dataplane/policies/policymanager_linux.go | 20 +++++---- 6 files changed, 64 insertions(+), 45 deletions(-) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index f8d19aba5e..0032f3c087 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -24,8 +24,8 @@ type PolicyMode string type Config struct { *ipsets.IPSetManagerCfg *policies.PolicyManagerCfg - // helpful for UTs (defaults to false for external packages) - disableGoRoutines bool + // defaults to false for external packages + disableReconcileForUTs bool } type DataPlane struct { @@ -70,7 +70,7 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann return nil, err } // necessary for UTs because of ioshim - if !dp.disableGoRoutines { + if !dp.disableReconcileForUTs { dp.policyMgr.Reconcile(dp.stopChannel) } return dp, nil diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index 42cb90850d..1b2b5a9b66 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -25,7 +25,7 @@ var ( PolicyManagerCfg: &policies.PolicyManagerCfg{ PolicyMode: policies.IPSetPolicyMode, }, - disableGoRoutines: true, + disableReconcileForUTs: true, } fakeIPSetRestoreSuccess = testutils.TestCmd{ diff --git a/npm/pkg/dataplane/policies/chain-management_linux.go b/npm/pkg/dataplane/policies/chain-management_linux.go index 6acfa24920..9dccd75906 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux.go +++ b/npm/pkg/dataplane/policies/chain-management_linux.go @@ -1,11 +1,12 @@ package policies +// This file contains code for booting up and reconciling iptables + import ( "errors" "fmt" "strconv" "strings" - "sync" "github.com/Azure/azure-container-networking/npm/metrics" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ioutil" @@ -61,29 +62,29 @@ var ( ) type staleChains struct { - chainsToCleanup map[string]struct{} - releaseLockSignal chan struct{} - sync.Mutex + chainsToCleanup map[string]struct{} } func newStaleChains() *staleChains { return &staleChains{ - chainsToCleanup: make(map[string]struct{}), - releaseLockSignal: make(chan struct{}, 1), + chainsToCleanup: make(map[string]struct{}), } } -func (s *staleChains) forceLock() { - s.releaseLockSignal <- struct{}{} - s.Lock() +// forceLock stops reconciling if it is running, and then locks the reconcileManager +func (rm *reconcileManager) forceLock() { + rm.releaseLockSignal <- struct{}{} + rm.Lock() } -func (s *staleChains) forceUnlock() { +// forceUnlock makes sure that the releaseLockSignal channel is empty (in case reconciling +// wasn't running when forceLock was called), and then unlocks the reconcileManager. +func (rm *reconcileManager) forceUnlock() { select { - case <-s.releaseLockSignal: + case <-rm.releaseLockSignal: default: } - s.Unlock() + rm.Unlock() } // Adds the chain if it isn't one of the iptablesAzureChains. @@ -147,6 +148,11 @@ func isBaseChain(chain string) bool { func (pMgr *PolicyManager) bootup(_ []string) error { klog.Infof("booting up iptables Azure chains") + // Stop reconciling so we don't centend for iptables, and so we don't update the staleChains at the same time as reconcile() + // Reconciling would only be happening if this function were called to reset iptables well into the azure-npm pod lifecycle. + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() + // 1. delete the deprecated jump to AZURE-NPM deprecatedErrCode, deprecatedErr := pMgr.runIPTablesCommand(util.IptablesDeletionFlag, deprecatedJumpFromForwardToAzureChainArgs...) if deprecatedErr == nil { @@ -185,16 +191,16 @@ func (pMgr *PolicyManager) bootup(_ []string) error { } // reconcile does the following: -// - cleans up stale policy chains // - creates the jump rule from FORWARD chain to AZURE-NPM chain (if it does not exist) and makes sure it's after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist). +// - cleans up stale policy chains. It can be forced to stop this process if reconcileManager.forceLock() is called. func (pMgr *PolicyManager) reconcile() { klog.Infof("repositioning azure chain jump rule") if err := pMgr.positionAzureChainJumpRule(); err != nil { klog.Errorf("failed to reconcile jump rule to Azure-NPM due to %s", err.Error()) } - pMgr.staleChains.Lock() - defer pMgr.staleChains.Unlock() + pMgr.reconcileManager.Lock() + defer pMgr.reconcileManager.Unlock() staleChains := pMgr.staleChains.emptyAndGetAll() klog.Infof("cleaning up these stale chains: %+v", staleChains) if err := pMgr.cleanupChains(staleChains); err != nil { @@ -210,7 +216,8 @@ func (pMgr *PolicyManager) cleanupChains(chains []string) error { deleteLoop: for k, chain := range chains { select { - case <-pMgr.staleChains.releaseLockSignal: + case <-pMgr.reconcileManager.releaseLockSignal: + // if reconcileManager.forceLock() was called, then stop deleting stale chains so that reconcileManager can be unlocked right away for j := k; j < len(chains); j++ { pMgr.staleChains.add(chains[j]) } @@ -274,14 +281,12 @@ func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) * // Step 2.1 in bootup() comment: cleanup old NPM chains, and configure base chains and their rules // To leave NPM deactivated, don't specify any rules for AZURE-NPM chain. creator := pMgr.newCreatorWithChains(chainsToCreate) - pMgr.staleChains.forceLock() pMgr.staleChains.empty() for chain := range currentChains { creator.AddLine("", nil, fmt.Sprintf("-F %s", chain)) // Step 2.2 in bootup() comment: delete deprecated chains and old v2 policy chains in the background pMgr.staleChains.add(chain) // won't add base chains } - defer pMgr.staleChains.forceUnlock() // add AZURE-NPM-INGRESS chain rules ingressDropSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureIngressChain, util.IptablesJumpFlag, util.IptablesDrop} diff --git a/npm/pkg/dataplane/policies/chain-management_linux_test.go b/npm/pkg/dataplane/policies/chain-management_linux_test.go index 9c41091bd8..4b3f6c03b5 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux_test.go +++ b/npm/pkg/dataplane/policies/chain-management_linux_test.go @@ -63,32 +63,32 @@ func TestStaleChainsForceLock(t *testing.T) { start := make(chan struct{}, 1) done := make(chan struct{}, 1) go func() { - pMgr.staleChains.Lock() - defer pMgr.staleChains.Unlock() + pMgr.reconcileManager.Lock() + defer pMgr.reconcileManager.Unlock() start <- struct{}{} require.NoError(t, pMgr.cleanupChains(testChains)) done <- struct{}{} }() <-start - pMgr.staleChains.forceLock() + pMgr.reconcileManager.forceLock() <-done // the releaseLockSignal should be empty, there should be some stale chains, and staleChains should be unlockable fmt.Println("weren't able to delete this many chains:", len(pMgr.staleChains.chainsToCleanup)) require.NotEqual(t, 0, len(pMgr.staleChains.chainsToCleanup), "stale chains should not be empty") - require.Equal(t, 0, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be empty") - pMgr.staleChains.Unlock() + require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.reconcileManager.Unlock() } func TestStaleChainsForceUnlock(t *testing.T) { ioshim := common.NewMockIOShim(nil) defer ioshim.VerifyCalls(t, nil) pMgr := NewPolicyManager(ioshim, ipsetConfig) - pMgr.staleChains.forceLock() - require.Equal(t, 1, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be non-empty") - pMgr.staleChains.forceUnlock() + pMgr.reconcileManager.forceLock() + require.Equal(t, 1, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be non-empty") + pMgr.reconcileManager.forceUnlock() // the releaseLockSignal should be empty and staleChains should be lockable - require.Equal(t, 0, len(pMgr.staleChains.releaseLockSignal), "releaseLockSignal should be empty") - pMgr.staleChains.Lock() + require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty") + pMgr.reconcileManager.Lock() } func TestStaleChainsAddAndRemove(t *testing.T) { diff --git a/npm/pkg/dataplane/policies/policymanager.go b/npm/pkg/dataplane/policies/policymanager.go index 170f5d58ac..f72f2d5473 100644 --- a/npm/pkg/dataplane/policies/policymanager.go +++ b/npm/pkg/dataplane/policies/policymanager.go @@ -2,6 +2,7 @@ package policies import ( "fmt" + "sync" "time" "github.com/Azure/azure-container-networking/common" @@ -31,10 +32,16 @@ type PolicyMap struct { cache map[string]*NPMNetworkPolicy } +type reconcileManager struct { + sync.Mutex + releaseLockSignal chan struct{} +} + type PolicyManager struct { - policyMap *PolicyMap - ioShim *common.IOShim - staleChains *staleChains + policyMap *PolicyMap + ioShim *common.IOShim + staleChains *staleChains + reconcileManager *reconcileManager *PolicyManagerCfg } @@ -43,8 +50,11 @@ func NewPolicyManager(ioShim *common.IOShim, cfg *PolicyManagerCfg) *PolicyManag policyMap: &PolicyMap{ cache: make(map[string]*NPMNetworkPolicy), }, - ioShim: ioShim, - staleChains: newStaleChains(), + ioShim: ioShim, + staleChains: newStaleChains(), + reconcileManager: &reconcileManager{ + releaseLockSignal: make(chan struct{}, 1), + }, PolicyManagerCfg: cfg, } } diff --git a/npm/pkg/dataplane/policies/policymanager_linux.go b/npm/pkg/dataplane/policies/policymanager_linux.go index e4b3e60b5b..d309e210a2 100644 --- a/npm/pkg/dataplane/policies/policymanager_linux.go +++ b/npm/pkg/dataplane/policies/policymanager_linux.go @@ -1,5 +1,7 @@ package policies +// This file contains code for the iptables implementation of adding/removing policies. + import ( "fmt" @@ -23,9 +25,9 @@ func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[stri chainsToCreate := chainNames([]*NPMNetworkPolicy{networkPolicy}) creator := pMgr.creatorForNewNetworkPolicies(chainsToCreate, []*NPMNetworkPolicy{networkPolicy}) - // Lock stale chains so we don't delete chainsToCreate - pMgr.staleChains.forceLock() - defer pMgr.staleChains.forceUnlock() + // Stop reconciling so we don't contend for iptables, and so reconcile doesn't delete chainsToCreate. + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() err := restore(creator) if err != nil { @@ -40,6 +42,13 @@ func (pMgr *PolicyManager) addPolicy(networkPolicy *NPMNetworkPolicy, _ map[stri } func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[string]string) error { + chainsToDelete := chainNames([]*NPMNetworkPolicy{networkPolicy}) + creator := pMgr.creatorForRemovingPolicies(chainsToDelete) + + // Stop reconciling so we don't contend for iptables, and so we don't update the staleChains at the same time as reconcile() + pMgr.reconcileManager.forceLock() + defer pMgr.reconcileManager.forceUnlock() + // 1. Delete jump rules from ingress/egress chains to ingress/egress policy chains. // We ought to delete these jump rules here in the foreground since if we add an NP back after deleting, iptables-restore --noflush can add duplicate jump rules. deleteErr := pMgr.deleteOldJumpRulesOnRemove(networkPolicy) @@ -48,17 +57,12 @@ func (pMgr *PolicyManager) removePolicy(networkPolicy *NPMNetworkPolicy, _ map[s } // 2. Flush the policy chains and deactivate NPM (if necessary). - chainsToDelete := chainNames([]*NPMNetworkPolicy{networkPolicy}) - creator := pMgr.creatorForRemovingPolicies(chainsToDelete) restoreErr := restore(creator) if restoreErr != nil { return npmerrors.SimpleErrorWrapper("failed to flush policies", restoreErr) } // 3. Delete policy chains in the background. - // lock here since stale chains are only affected if we successfully remove policies - pMgr.staleChains.forceLock() - defer pMgr.staleChains.forceUnlock() for _, chain := range chainsToDelete { pMgr.staleChains.add(chain) } From e433728d06e25aad10b893d860e5b16684990592 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 18 Jan 2022 14:31:48 -0800 Subject: [PATCH 07/11] address comments --- npm/cmd/start.go | 1 + npm/pkg/dataplane/dataplane.go | 11 +++++----- npm/pkg/dataplane/dataplane_test.go | 1 - .../mocks/genericdataplane_generated.go | 14 ++++++++++++- .../policies/chain-management_linux.go | 20 +++++++++---------- npm/pkg/dataplane/types.go | 1 + test/integration/npm/main.go | 3 ++- 7 files changed, 32 insertions(+), 19 deletions(-) diff --git a/npm/cmd/start.go b/npm/cmd/start.go index 3913d21ec6..bd45424611 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -143,6 +143,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { if err != nil { return fmt.Errorf("failed to create dataplane with error %w", err) } + dp.RunPeriodicTasks() } npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion) err = metrics.CreateTelemetryHandle(version, npm.GetAIMetadata()) diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index 0032f3c087..bbc8bd15c8 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -24,8 +24,6 @@ type PolicyMode string type Config struct { *ipsets.IPSetManagerCfg *policies.PolicyManagerCfg - // defaults to false for external packages - disableReconcileForUTs bool } type DataPlane struct { @@ -69,10 +67,6 @@ func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChann klog.Errorf("Failed to reset dataplane: %v", err) return nil, err } - // necessary for UTs because of ioshim - if !dp.disableReconcileForUTs { - dp.policyMgr.Reconcile(dp.stopChannel) - } return dp, nil } @@ -89,6 +83,11 @@ func (dp *DataPlane) ResetDataPlane() error { return dp.bootupDataPlane() } +// RunPeriodicTasks runs periodic tasks. Should only be called once. +func (dp *DataPlane) RunPeriodicTasks() { + dp.policyMgr.Reconcile(dp.stopChannel) +} + // CreateIPSets takes in a set object and updates local cache with this set func (dp *DataPlane) CreateIPSets(setMetadata []*ipsets.IPSetMetadata) { dp.ipsetMgr.CreateIPSets(setMetadata) diff --git a/npm/pkg/dataplane/dataplane_test.go b/npm/pkg/dataplane/dataplane_test.go index 1b2b5a9b66..b44c5559b1 100644 --- a/npm/pkg/dataplane/dataplane_test.go +++ b/npm/pkg/dataplane/dataplane_test.go @@ -25,7 +25,6 @@ var ( PolicyManagerCfg: &policies.PolicyManagerCfg{ PolicyMode: policies.IPSetPolicyMode, }, - disableReconcileForUTs: true, } fakeIPSetRestoreSuccess = testutils.TestCmd{ diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index 5dd6243e6e..994fcc7f43 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /home/nitishm/github/nitishm/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /home/hunter/dev/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -189,6 +189,18 @@ func (mr *MockGenericDataplaneMockRecorder) ResetDataPlane() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetDataPlane", reflect.TypeOf((*MockGenericDataplane)(nil).ResetDataPlane)) } +// RunPeriodicTasks mocks base method. +func (m *MockGenericDataplane) RunPeriodicTasks() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunPeriodicTasks") +} + +// RunPeriodicTasks indicates an expected call of RunPeriodicTasks. +func (mr *MockGenericDataplaneMockRecorder) RunPeriodicTasks() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunPeriodicTasks", reflect.TypeOf((*MockGenericDataplane)(nil).RunPeriodicTasks)) +} + // UpdatePolicy mocks base method. func (m *MockGenericDataplane) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/policies/chain-management_linux.go b/npm/pkg/dataplane/policies/chain-management_linux.go index 9dccd75906..d617ebca21 100644 --- a/npm/pkg/dataplane/policies/chain-management_linux.go +++ b/npm/pkg/dataplane/policies/chain-management_linux.go @@ -223,16 +223,16 @@ deleteLoop: } break deleteLoop default: - } - errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) - if err != nil && errCode != doesNotExistErrorCode { - // add to staleChains if it's not one of the iptablesAzureChains - pMgr.staleChains.add(chain) - currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err) - if aggregateError == nil { - aggregateError = npmerrors.SimpleError(currentErrString) - } else { - aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError) + errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) + if err != nil && errCode != doesNotExistErrorCode { + // add to staleChains if it's not one of the iptablesAzureChains + pMgr.staleChains.add(chain) + currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err) + if aggregateError == nil { + aggregateError = npmerrors.SimpleError(currentErrString) + } else { + aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError) + } } } } diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 88a48a1165..47b3032823 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,6 +8,7 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error + RunPeriodicTasks() CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index 750bb32584..c9c7127bc1 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -74,7 +74,8 @@ var ( ) func main() { - dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg, nil) + dp, err := dataplane.NewDataPlane(nodeName, common.NewIOShim(), dpCfg, make(chan struct{}, 1)) + dp.RunPeriodicTasks() panicOnError(err) printAndWait(true) From a0d13776d18f6838d235a05808459fc711320206 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 18 Jan 2022 14:57:24 -0800 Subject: [PATCH 08/11] remove RunPeriodicTasks from GenericDP interface --- .../dataplane/mocks/genericdataplane_generated.go | 14 +------------- npm/pkg/dataplane/types.go | 1 - 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index 994fcc7f43..5dd6243e6e 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /home/hunter/dev/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /home/nitishm/github/nitishm/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -189,18 +189,6 @@ func (mr *MockGenericDataplaneMockRecorder) ResetDataPlane() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetDataPlane", reflect.TypeOf((*MockGenericDataplane)(nil).ResetDataPlane)) } -// RunPeriodicTasks mocks base method. -func (m *MockGenericDataplane) RunPeriodicTasks() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "RunPeriodicTasks") -} - -// RunPeriodicTasks indicates an expected call of RunPeriodicTasks. -func (mr *MockGenericDataplaneMockRecorder) RunPeriodicTasks() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunPeriodicTasks", reflect.TypeOf((*MockGenericDataplane)(nil).RunPeriodicTasks)) -} - // UpdatePolicy mocks base method. func (m *MockGenericDataplane) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 47b3032823..88a48a1165 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,7 +8,6 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error - RunPeriodicTasks() CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error From de3d6e20c58c9f48c9c5405070aa947325a8a669 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 18 Jan 2022 14:59:35 -0800 Subject: [PATCH 09/11] fix build error --- npm/cmd/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/cmd/start.go b/npm/cmd/start.go index bd45424611..fb1018870e 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -136,7 +136,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { k8sServerVersion := k8sServerVersion(clientset) - var dp dataplane.GenericDataplane + var dp *dataplane.DataPlane stopChannel := wait.NeverStop if config.Toggles.EnableV2NPM { dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel) From e1e4a3e4f13d1faafd54e0679bd95945d1cb4376 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 18 Jan 2022 15:18:40 -0800 Subject: [PATCH 10/11] Revert "fix build error" This reverts commit de3d6e20c58c9f48c9c5405070aa947325a8a669. --- npm/cmd/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/cmd/start.go b/npm/cmd/start.go index fb1018870e..bd45424611 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -136,7 +136,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { k8sServerVersion := k8sServerVersion(clientset) - var dp *dataplane.DataPlane + var dp dataplane.GenericDataplane stopChannel := wait.NeverStop if config.Toggles.EnableV2NPM { dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel) From 729f7f88f1909414c34ebbbbacfe84aa33c1e0b8 Mon Sep 17 00:00:00 2001 From: Hunter Gregory Date: Tue, 18 Jan 2022 15:24:08 -0800 Subject: [PATCH 11/11] make RunPeriodicTasks an interface method again --- npm/pkg/dataplane/dpshim/dpshim.go | 4 +++- .../dataplane/mocks/genericdataplane_generated.go | 14 +++++++++++++- npm/pkg/dataplane/types.go | 1 + 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/npm/pkg/dataplane/dpshim/dpshim.go b/npm/pkg/dataplane/dpshim/dpshim.go index ab356eba0b..6fc628ef2b 100644 --- a/npm/pkg/dataplane/dpshim/dpshim.go +++ b/npm/pkg/dataplane/dpshim/dpshim.go @@ -12,7 +12,7 @@ type DPShim struct { outChannel chan *protos.Events } -func NewDPSim(outChannel chan *protos.Events) *DPShim { +func NewDPShim(outChannel chan *protos.Events) *DPShim { return &DPShim{outChannel: outChannel} } @@ -24,6 +24,8 @@ func (dp *DPShim) ResetDataPlane() error { return nil } +func (dp *DPShim) RunPeriodicTasks() {} + func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { return nil } diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index a309d6d57a..0c3bfc1352 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /mnt/c/Users/vamsi/Desktop/Microsoft_ws/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /home/hunter/dev/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -203,6 +203,18 @@ func (mr *MockGenericDataplaneMockRecorder) ResetDataPlane() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetDataPlane", reflect.TypeOf((*MockGenericDataplane)(nil).ResetDataPlane)) } +// RunPeriodicTasks mocks base method. +func (m *MockGenericDataplane) RunPeriodicTasks() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RunPeriodicTasks") +} + +// RunPeriodicTasks indicates an expected call of RunPeriodicTasks. +func (mr *MockGenericDataplaneMockRecorder) RunPeriodicTasks() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunPeriodicTasks", reflect.TypeOf((*MockGenericDataplane)(nil).RunPeriodicTasks)) +} + // UpdatePolicy mocks base method. func (m *MockGenericDataplane) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index d24210c240..28e2139a82 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,6 +8,7 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error + RunPeriodicTasks() GetIPSet(setName string) *ipsets.IPSet CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata)