Skip to content
6 changes: 4 additions & 2 deletions npm/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
k8sServerVersion := k8sServerVersion(clientset)

var dp dataplane.GenericDataplane
stopChannel := wait.NeverStop
if config.Toggles.EnableV2NPM {
dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg)
dp, err = dataplane.NewDataPlane(npm.GetNodeName(), common.NewIOShim(), npmV2DataplaneCfg, stopChannel)
if err != nil {
return fmt.Errorf("failed to create dataplane with error %w", err)
}
dp.RunPeriodicTasks()
}
npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion)
err = metrics.CreateTelemetryHandle(version, npm.GetAIMetadata())
Expand All @@ -152,7 +154,7 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {

go restserver.NPMRestServerListenAndServe(config, npMgr)

if err = npMgr.Start(config, wait.NeverStop); err != nil {
if err = npMgr.Start(config, stopChannel); err != nil {
metrics.SendErrorLogAndMetric(util.NpmID, "Failed to start NPM due to %+v", err)
return fmt.Errorf("failed to start with err: %w", err)
}
Expand Down
18 changes: 12 additions & 6 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
}

type DataPlane struct {
*Config
policyMgr *policies.PolicyManager
ipsetMgr *ipsets.IPSetManager
networkID string
Expand All @@ -35,7 +36,7 @@ type DataPlane struct {
endpointCache map[string]*NPMEndpoint
ioShim *common.IOShim
updatePodCache map[string]*updateNPMPod
*Config
stopChannel <-chan struct{}
}

type NPMEndpoint struct {
Expand All @@ -48,16 +49,17 @@ type NPMEndpoint struct {
NetPolReference map[string]struct{}
}

func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config) (*DataPlane, error) {
func NewDataPlane(nodeName string, ioShim *common.IOShim, cfg *Config, stopChannel <-chan struct{}) (*DataPlane, error) {
metrics.InitializeAll()
dp := &DataPlane{
Config: cfg,
policyMgr: policies.NewPolicyManager(ioShim, cfg.PolicyManagerCfg),
ipsetMgr: ipsets.NewIPSetManager(cfg.IPSetManagerCfg, ioShim),
endpointCache: make(map[string]*NPMEndpoint),
nodeName: nodeName,
ioShim: ioShim,
updatePodCache: make(map[string]*updateNPMPod),
Config: cfg,
stopChannel: stopChannel,
}

err := dp.ResetDataPlane()
Expand All @@ -74,14 +76,18 @@ func (dp *DataPlane) InitializeDataPlane() error {
return nil
}

// ResetDataPlane helps in cleaning up dataplane sets and policies programmed
// by NPM, returning a clean slate
// ResetDataPlane cleans the NPM sets and policies in the dataplane and performs initialization.
// TODO rename this function to BootupDataplane
func (dp *DataPlane) ResetDataPlane() error {
// TODO rename this function to BootupDataplane
// NOTE: used to create an all-namespaces set, but there's no need since it will be created by the control plane
return dp.bootupDataPlane()
}

// RunPeriodicTasks runs periodic tasks. Should only be called once.
// It passes the stop channel stored on the DataPlane to the policy manager's Reconcile.
// NOTE(review): Reconcile presumably schedules the background work and returns
// without blocking — confirm in the policies package.
func (dp *DataPlane) RunPeriodicTasks() {
dp.policyMgr.Reconcile(dp.stopChannel)
}

// GetIPSet returns whatever the IPSet manager's GetIPSet returns for setName.
func (dp *DataPlane) GetIPSet(setName string) *ipsets.IPSet {
return dp.ipsetMgr.GetIPSet(setName)
}
Expand Down
12 changes: 6 additions & 6 deletions npm/pkg/dataplane/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestNewDataPlane(t *testing.T) {
calls := getBootupTestCalls()
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)
assert.NotNil(t, dp)
}
Expand All @@ -95,7 +95,7 @@ func TestCreateAndDeleteIpSets(t *testing.T) {
calls := getBootupTestCalls()
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)
assert.NotNil(t, dp)
setsTocreate := []*ipsets.IPSetMetadata{
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestAddToSet(t *testing.T) {
calls := getBootupTestCalls()
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)

setsTocreate := []*ipsets.IPSetMetadata{
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestApplyPolicy(t *testing.T) {
calls := append(getBootupTestCalls(), getAddPolicyTestCallsForDP(&testPolicyobj)...)
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)

err = dp.AddPolicy(&testPolicyobj)
Expand All @@ -215,7 +215,7 @@ func TestRemovePolicy(t *testing.T) {
calls = append(calls, getRemovePolicyTestCallsForDP(&testPolicyobj)...)
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)

err = dp.AddPolicy(&testPolicyobj)
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestUpdatePolicy(t *testing.T) {
}
ioshim := common.NewMockIOShim(calls)
defer ioshim.VerifyCalls(t, calls)
dp, err := NewDataPlane("testnode", ioshim, dpCfg)
dp, err := NewDataPlane("testnode", ioshim, dpCfg, nil)
require.NoError(t, err)

err = dp.AddPolicy(&testPolicyobj)
Expand Down
4 changes: 3 additions & 1 deletion npm/pkg/dataplane/dpshim/dpshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DPShim struct {
outChannel chan *protos.Events
}

func NewDPSim(outChannel chan *protos.Events) *DPShim {
func NewDPShim(outChannel chan *protos.Events) *DPShim {
return &DPShim{outChannel: outChannel}
}

Expand All @@ -24,6 +24,8 @@ func (dp *DPShim) ResetDataPlane() error {
return nil
}

// RunPeriodicTasks is a no-op for DPShim.
func (dp *DPShim) RunPeriodicTasks() {}

// GetIPSet always returns nil for DPShim; setName is ignored.
func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet {
return nil
}
Expand Down
14 changes: 13 additions & 1 deletion npm/pkg/dataplane/mocks/genericdataplane_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 52 additions & 14 deletions npm/pkg/dataplane/policies/chain-management_linux.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package policies

// This file contains code for booting up and reconciling iptables

import (
"errors"
"fmt"
Expand Down Expand Up @@ -64,7 +66,25 @@ type staleChains struct {
}

func newStaleChains() *staleChains {
return &staleChains{make(map[string]struct{})}
return &staleChains{
chainsToCleanup: make(map[string]struct{}),
}
}

// forceLock stops reconciling if it is running, and then locks the reconcileManager.
// The signal is sent on releaseLockSignal before Lock is called, and that order matters:
// cleanupChains polls releaseLockSignal between chain deletions and stops early when it
// receives the signal, so the lock can be acquired here without waiting for the whole cleanup.
// NOTE(review): the send presumably does not block when reconciling is idle because
// releaseLockSignal is buffered (the unit tests expect len(releaseLockSignal) == 1 right
// after forceLock) — confirm at the channel's declaration.
// Every call should be paired with forceUnlock.
func (rm *reconcileManager) forceLock() {
rm.releaseLockSignal <- struct{}{}
rm.Lock()
}

// forceUnlock makes sure that the releaseLockSignal channel is empty (in case reconciling
// wasn't running when forceLock was called), and then unlocks the reconcileManager.
// At most one pending signal is discarded.
func (rm *reconcileManager) forceUnlock() {
// Non-blocking receive: drop a leftover signal if there is one, otherwise fall through.
select {
case <-rm.releaseLockSignal:
default:
}
rm.Unlock()
}

// Adds the chain if it isn't one of the iptablesAzureChains.
Expand Down Expand Up @@ -128,6 +148,11 @@ func isBaseChain(chain string) bool {
func (pMgr *PolicyManager) bootup(_ []string) error {
klog.Infof("booting up iptables Azure chains")

// Stop reconciling so we don't contend for iptables, and so we don't update the staleChains at the same time as reconcile()
// Reconciling would only be happening if this function were called to reset iptables well into the azure-npm pod lifecycle.
pMgr.reconcileManager.forceLock()
defer pMgr.reconcileManager.forceUnlock()

// 1. delete the deprecated jump to AZURE-NPM
deprecatedErrCode, deprecatedErr := pMgr.runIPTablesCommand(util.IptablesDeletionFlag, deprecatedJumpFromForwardToAzureChainArgs...)
if deprecatedErr == nil {
Expand Down Expand Up @@ -166,13 +191,16 @@ func (pMgr *PolicyManager) bootup(_ []string) error {
}

// reconcile does the following:
// - cleans up stale policy chains
// - creates the jump rule from FORWARD chain to AZURE-NPM chain (if it does not exist) and makes sure it's after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist).
// - cleans up stale policy chains. It can be forced to stop this process if reconcileManager.forceLock() is called.
func (pMgr *PolicyManager) reconcile() {
klog.Infof("repositioning azure chain jump rule")
if err := pMgr.positionAzureChainJumpRule(); err != nil {
klog.Errorf("failed to reconcile jump rule to Azure-NPM due to %s", err.Error())
}

pMgr.reconcileManager.Lock()
defer pMgr.reconcileManager.Unlock()
staleChains := pMgr.staleChains.emptyAndGetAll()
klog.Infof("cleaning up these stale chains: %+v", staleChains)
if err := pMgr.cleanupChains(staleChains); err != nil {
Expand All @@ -182,19 +210,29 @@ func (pMgr *PolicyManager) reconcile() {

// cleanupChains deletes all the chains in the given list.
// If a chain fails to delete and it isn't one of the iptablesAzureChains, then it is added to the staleChains.
// have to use slice argument for deterministic behavior for ioshim in UTs
// This is a separate function with a slice argument so that UTs can have deterministic behavior for ioshim.
func (pMgr *PolicyManager) cleanupChains(chains []string) error {
var aggregateError error
for _, chain := range chains {
errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain)
if err != nil && errCode != doesNotExistErrorCode {
// add to staleChains if it's not one of the iptablesAzureChains
pMgr.staleChains.add(chain)
currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err)
if aggregateError == nil {
aggregateError = npmerrors.SimpleError(currentErrString)
} else {
aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError)
deleteLoop:
for k, chain := range chains {
select {
case <-pMgr.reconcileManager.releaseLockSignal:
// if reconcileManager.forceLock() was called, then stop deleting stale chains so that reconcileManager can be unlocked right away
for j := k; j < len(chains); j++ {
pMgr.staleChains.add(chains[j])
}
break deleteLoop
default:
errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain)
if err != nil && errCode != doesNotExistErrorCode {
// add to staleChains if it's not one of the iptablesAzureChains
pMgr.staleChains.add(chain)
currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err)
if aggregateError == nil {
aggregateError = npmerrors.SimpleError(currentErrString)
} else {
aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError)
}
}
}
}
Expand Down Expand Up @@ -232,7 +270,6 @@ func (pMgr *PolicyManager) runIPTablesCommand(operationFlag string, args ...stri
// Writes the restore file for bootup, and marks the following as stale: deprecated chains and old v2 policy chains.
// This is a separate function to help with UTs.
func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) *ioutil.FileCreator {
pMgr.staleChains.empty()
chainsToCreate := make([]string, 0, len(iptablesAzureChains))
for _, chain := range iptablesAzureChains {
_, exists := currentChains[chain]
Expand All @@ -244,6 +281,7 @@ func (pMgr *PolicyManager) creatorForBootup(currentChains map[string]struct{}) *
// Step 2.1 in bootup() comment: cleanup old NPM chains, and configure base chains and their rules
// To leave NPM deactivated, don't specify any rules for AZURE-NPM chain.
creator := pMgr.newCreatorWithChains(chainsToCreate)
pMgr.staleChains.empty()
for chain := range currentChains {
creator.AddLine("", nil, fmt.Sprintf("-F %s", chain))
// Step 2.2 in bootup() comment: delete deprecated chains and old v2 policy chains in the background
Expand Down
44 changes: 44 additions & 0 deletions npm/pkg/dataplane/policies/chain-management_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,50 @@ Chain AZURE-NPM-ACCEPT (1 references)
`
)

func TestStaleChainsForceLock(t *testing.T) {
testChains := []string{}
for i := 0; i < 100000; i++ {
testChains = append(testChains, fmt.Sprintf("test-chain-%d", i))
}
calls := []testutils.TestCmd{}
for _, chain := range testChains {
calls = append(calls, getFakeDestroyCommand(chain))
}
ioshim := common.NewMockIOShim(calls)
// don't verify calls because there shouldn't be as many commands as we create if forceLock works properly
pMgr := NewPolicyManager(ioshim, ipsetConfig)

start := make(chan struct{}, 1)
done := make(chan struct{}, 1)
go func() {
pMgr.reconcileManager.Lock()
defer pMgr.reconcileManager.Unlock()
start <- struct{}{}
require.NoError(t, pMgr.cleanupChains(testChains))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the cleanupChains function, can we move

errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain)
if err != nil && errCode != doesNotExistErrorCode {
// add to staleChains if it's not one of the iptablesAzureChains
pMgr.staleChains.add(chain)
currentErrString := fmt.Sprintf("failed to clean up chain %s with err [%v]", chain, err)
if aggregateError == nil {
aggregateError = npmerrors.SimpleError(currentErrString)
} else {
aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError)
}
}
to the default case in the select loop above it?

done <- struct{}{}
}()
<-start
pMgr.reconcileManager.forceLock()
<-done
// the releaseLockSignal should be empty, there should be some stale chains, and staleChains should be unlockable
fmt.Println("weren't able to delete this many chains:", len(pMgr.staleChains.chainsToCleanup))
require.NotEqual(t, 0, len(pMgr.staleChains.chainsToCleanup), "stale chains should not be empty")
require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty")
pMgr.reconcileManager.Unlock()
}

// TestStaleChainsForceUnlock covers forceLock followed by forceUnlock when no cleanup is running:
// the signal that forceLock leaves in releaseLockSignal must be drained by forceUnlock, and the
// reconcileManager must be lockable again afterwards.
func TestStaleChainsForceUnlock(t *testing.T) {
// no commands are expected to go through the mock ioshim in this test
ioshim := common.NewMockIOShim(nil)
defer ioshim.VerifyCalls(t, nil)
pMgr := NewPolicyManager(ioshim, ipsetConfig)
pMgr.reconcileManager.forceLock()
// nothing consumed the signal, so it is still sitting in the channel
require.Equal(t, 1, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be non-empty")
pMgr.reconcileManager.forceUnlock()
// the releaseLockSignal should be empty and staleChains should be lockable
require.Equal(t, 0, len(pMgr.reconcileManager.releaseLockSignal), "releaseLockSignal should be empty")
// this Lock call would not return if forceUnlock had left the lock held
pMgr.reconcileManager.Lock()
}

func TestStaleChainsAddAndRemove(t *testing.T) {
ioshim := common.NewMockIOShim(nil)
defer ioshim.VerifyCalls(t, nil)
Expand Down
Loading