Skip to content
Merged
11 changes: 5 additions & 6 deletions npm/iptm/iptm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
)

const (
defaultlockWaitTimeInSeconds string = "60"
iptablesErrDoesNotExist int = 1
reconcileChainTimeInMinutes = 5
minLineNumberStringLength int = 3
iptablesErrDoesNotExist int = 1
reconcileChainTimeInMinutes = 5
minLineNumberStringLength int = 3
)

var (
Expand Down Expand Up @@ -139,7 +138,7 @@ func (iptMgr *IptablesManager) UninitNpmChains() error {
util.IptablesAzureTargetSetsChain,
util.IptablesAzureIngressWrongDropsChain,
)
currentAzureChains, err := ioutil.AllCurrentAzureChains(iptMgr.exec, defaultlockWaitTimeInSeconds)
currentAzureChains, err := ioutil.AllCurrentAzureChains(iptMgr.exec, util.IptablesDefaultWaitTime)
if err != nil {
metrics.SendErrorLogAndMetric(util.IptmID, "Warning: failed to get all current AZURE-NPM chains, so stale v2 chains may exist")
} else {
Expand Down Expand Up @@ -490,7 +489,7 @@ func (iptMgr *IptablesManager) run(entry *IptEntry) (int, error) {
}

if entry.LockWaitTimeInSeconds == "" {
entry.LockWaitTimeInSeconds = defaultlockWaitTimeInSeconds
entry.LockWaitTimeInSeconds = util.IptablesDefaultWaitTime
}

cmdArgs := append([]string{util.IptablesWaitFlag, entry.LockWaitTimeInSeconds, iptMgr.OperationFlag, entry.Chain}, entry.Specs...)
Expand Down
158 changes: 141 additions & 17 deletions npm/pkg/dataplane/ipsets/ipsetmanager_linux.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package ipsets

import (
"errors"
"fmt"
"strings"

"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/parse"
"github.com/Azure/azure-container-networking/npm/util"
npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
"github.com/Azure/azure-container-networking/npm/util/ioutil"
"k8s.io/klog"
utilexec "k8s.io/utils/exec"
)

const (
azureNPMPrefix = "azure-npm-"
ipsetFlushAndDestroyString = "ipset flush && ipset destroy"

azureNPMPrefix = "azure-npm-"
azureNPMRegex = "azure-npm-\\d+"
positiveRefsRegex = "References: [1-9]"
referenceGrepLookBack = "5"
maxLinesToPrint = 10

ipsetCommand = "ipset"
ipsetListFlag = "list"
Expand Down Expand Up @@ -58,6 +67,9 @@ var (
/*
based on ipset list output with azure-npm- prefix, create an ipset restore file where we flush all sets first, then destroy all sets

NOTE: the behavior has changed to run two separate restore files. The first to flush all, the second to destroy all. In between restores,
we determine if there are any sets with leaked ipset reference counts. We ignore destroys for those sets in-line with v1.

overall error handling:
- if flush fails because the set doesn't exist (should never happen because we're listing sets right before), then ignore it and the destroy
- if flush fails otherwise, then add to destroyFailureCount and continue (aborting the destroy too)
Expand Down Expand Up @@ -85,54 +97,113 @@ var (
If a flush fails, we could update the num entries for that set, but that would be a lot of overhead.
*/
// resetIPSets first tries resetWithoutRestore; if that does not succeed, it lists the azure-npm sets,
// flushes them with one ipset restore run, and then destroys them with a second ipset restore run.
// Returns a wrapped error if the listing or either restore run fails.
func (iMgr *IPSetManager) resetIPSets() error {
listCommand := iMgr.ioShim.Exec.Command(ipsetCommand, ipsetListFlag, ipsetNameFlag)
// fast path: resetWithoutRestore returns true when it was able to reset without ipset restore
if success := iMgr.resetWithoutRestore(); success {
return nil
}

// get current NPM ipsets
listNamesCommand := iMgr.ioShim.Exec.Command(ipsetCommand, ipsetListFlag, ipsetNameFlag)
grepCommand := iMgr.ioShim.Exec.Command(ioutil.Grep, azureNPMPrefix)
azureIPSets, haveAzureIPSets, commandError := ioutil.PipeCommandToGrep(listCommand, grepCommand)
// NOTE(review): this log prints azureNPMRegex, but the grep command built above is given azureNPMPrefix — confirm which pattern is intended
klog.Infof("running this command while resetting ipsets: [%s %s %s | %s %s]", ipsetCommand, ipsetListFlag, ipsetNameFlag, ioutil.Grep, azureNPMRegex)
azureIPSets, haveAzureNPMIPSets, commandError := ioutil.PipeCommandToGrep(listNamesCommand, grepCommand)
if commandError != nil {
return npmerrors.SimpleErrorWrapper("failed to run ipset list for resetting IPSets (prometheus metrics may be off now)", commandError)
}
// presumably the second return value reports whether grep matched anything; with no azure-npm sets there is nothing to reset
if !haveAzureIPSets {
if !haveAzureNPMIPSets {
return nil
}
creator, originalNumAzureSets, destroyFailureCount := iMgr.fileCreatorForReset(azureIPSets)

// flush all NPM sets
creator, names, failedNames := iMgr.fileCreatorForFlushAll(azureIPSets)
restoreError := creator.RunCommandWithFile(ipsetCommand, ipsetRestoreFlag)
if restoreError != nil {
klog.Errorf(
"failed to restore ipsets (prometheus metrics may be off now). Had originalNumAzureSets %d and destroyFailureCount %d with err: %v",
originalNumAzureSets, destroyFailureCount, restoreError,
"failed to flush all ipsets (prometheus metrics may be off now). originalNumAzureSets: %d. failed flushes: %+v. err: %v",
len(names), failedNames, restoreError,
)
return npmerrors.SimpleErrorWrapper("failed to run ipset restore for resetting IPSets", restoreError)
return npmerrors.SimpleErrorWrapper("failed to run ipset restore while flushing all for resetting IPSets", restoreError)
}

// destroy all NPM sets
// the sets that failed to flush and the sets reported by setsWithReferences() are handed to the destroy file creator
creator, destroyFailureCount := iMgr.fileCreatorForDestroyAll(names, failedNames, iMgr.setsWithReferences())
destroyError := creator.RunCommandWithFile(ipsetCommand, ipsetRestoreFlag)
if destroyError != nil {
klog.Errorf(
"failed to destroy all ipsets (prometheus metrics may be off now). destroyFailureCount %d. err: %v",
destroyFailureCount, destroyError,
)
return npmerrors.SimpleErrorWrapper("failed to run ipset restore while destroying all for resetting IPSets", destroyError)
}
return nil
}

// resetWithoutRestore attempts to reset all ipsets with a single shell command instead of ipset restore files.
//
// It first pipes the ipset name listing into an anti-match grep for azureNPMPrefix. If that check errors,
// or reports that sets other than NPM's exist, nothing is touched and false is returned so the caller can
// fall back to a per-set reset. Otherwise it runs ipsetFlushAndDestroyString through bash.
//
// Returns true (success) only if the flush-and-destroy command ran without error.
func (iMgr *IPSetManager) resetWithoutRestore() bool {
	listNamesCommand := iMgr.ioShim.Exec.Command(ipsetCommand, ipsetListFlag, ipsetNameFlag)
	grepCommand := iMgr.ioShim.Exec.Command(ioutil.Grep, ioutil.GrepQuietFlag, ioutil.GrepAntiMatchFlag, azureNPMPrefix)
	// no brackets or padding here: the log format below already wraps the command in brackets
	commandString := fmt.Sprintf("%s %s %s | %s %s %s %s", ipsetCommand, ipsetListFlag, ipsetNameFlag, ioutil.Grep, ioutil.GrepQuietFlag, ioutil.GrepAntiMatchFlag, azureNPMPrefix)
	klog.Infof("running this command while resetting ipsets: [%s]", commandString)
	_, haveNonAzureNPMIPSets, commandError := ioutil.PipeCommandToGrep(listNamesCommand, grepCommand)
	if commandError != nil {
		metrics.SendErrorLogAndMetric(util.IpsmID, "failed to determine if there were non-azure sets while resetting. err: %v", commandError)
		return false
	}
	if haveNonAzureNPMIPSets {
		// a blanket flush/destroy would also remove sets that do not belong to NPM
		return false
	}

	flushAndDestroy := iMgr.ioShim.Exec.Command(util.BashCommand, util.BashCommandFlag, ipsetFlushAndDestroyString)
	klog.Infof("running this command while resetting ipsets: [%s %s '%s']", util.BashCommand, util.BashCommandFlag, ipsetFlushAndDestroyString)
	output, err := flushAndDestroy.CombinedOutput()
	if err != nil {
		exitCode := -1
		stdErr := "no stdErr detected"
		var exitError utilexec.ExitError
		if errors.As(err, &exitError) {
			exitCode = exitError.ExitStatus()
			stdErr = strings.TrimSuffix(string(output), "\n")
		}
		// always include err: when the failure is not an ExitError (e.g. the command could not be started),
		// exitCode and stdErr carry no information about what went wrong
		metrics.SendErrorLogAndMetric(util.IpsmID, "failed to flush and destroy ipsets at once. exitCode: %d. stdErr: [%v]. err: %v", exitCode, stdErr, err)
		return false
	}
	return true
}

// this needs to be a separate function because we need to check creator contents in UTs
func (iMgr *IPSetManager) fileCreatorForReset(ipsetListOutput []byte) (*ioutil.FileCreator, int, *int) {
// named returns to appease lint
func (iMgr *IPSetManager) fileCreatorForFlushAll(ipsetListOutput []byte) (creator *ioutil.FileCreator, names []string, failedNames map[string]struct{}) {
destroyFailureCount := 0
creator := ioutil.NewFileCreator(iMgr.ioShim, maxTryCount, ipsetRestoreLineFailurePattern)
names := make([]string, 0)
creator = ioutil.NewFileCreator(iMgr.ioShim, maxTryCount, ipsetRestoreLineFailurePattern)
names = make([]string, 0)
failedNames = make(map[string]struct{}, 0)
readIndex := 0
var line []byte
// flush all the sets and create a list of the sets so we can destroy them
for readIndex < len(ipsetListOutput) {
line, readIndex = parse.Line(readIndex, ipsetListOutput)
hashedSetName := string(line)
if readIndex >= len(ipsetListOutput) {
// parse.Line() will include the newline character for the end of the byte array
hashedSetName = strings.Trim(hashedSetName, "\n")
}
names = append(names, hashedSetName)
// error handlers specific to resetting ipsets
errorHandlers := []*ioutil.LineErrorHandler{
{
Definition: setDoesntExistDefinition,
Method: ioutil.ContinueAndAbortSection,
Method: ioutil.Continue,
Callback: func() {
klog.Infof("[RESET-IPSETS] skipping flush and upcoming destroy for set %s since the set doesn't exist", hashedSetName)
failedNames[hashedSetName] = struct{}{}
},
},
{
Definition: ioutil.AlwaysMatchDefinition,
Method: ioutil.ContinueAndAbortSection,
Method: ioutil.Continue,
Callback: func() {
klog.Errorf("[RESET-IPSETS] marking flush and upcoming destroy for set %s as a failure due to unknown error", hashedSetName)
destroyFailureCount++
failedNames[hashedSetName] = struct{}{}
// TODO mark as a failure
},
},
Expand All @@ -141,8 +212,46 @@ func (iMgr *IPSetManager) fileCreatorForReset(ipsetListOutput []byte) (*ioutil.F
creator.AddLine(sectionID, errorHandlers, ipsetFlushFlag, hashedSetName) // flush set
}

return creator, names, failedNames
}

// setsWithReferences returns the names of azure-npm ipsets that still have a positive reference count.
//
// It pipes the full ipset listing through two greps: the first keeps lines matching positiveRefsRegex
// together with referenceGrepLookBack lines of preceding context, and the second extracts the names
// matching azureNPMRegex from that output.
//
// The result is nil when no such sets are reported (reading from a nil map is safe for callers).
// A failure of the pipeline is logged rather than returned, so callers treat it as "no known references".
func (iMgr *IPSetManager) setsWithReferences() map[string]struct{} {
	listAllCommand := iMgr.ioShim.Exec.Command(ipsetCommand, ipsetListFlag)
	grep1 := iMgr.ioShim.Exec.Command(ioutil.Grep, ioutil.GrepBeforeFlag, referenceGrepLookBack, ioutil.GrepRegexFlag, positiveRefsRegex)
	grep2 := iMgr.ioShim.Exec.Command(ioutil.Grep, ioutil.GrepOnlyMatchingFlag, ioutil.GrepRegexFlag, azureNPMRegex)
	klog.Infof("running this command while resetting ipsets: [%s %s | %s %s %s %s %s | %s %s %s %s]", ipsetCommand, ipsetListFlag,
		ioutil.Grep, ioutil.GrepBeforeFlag, referenceGrepLookBack, ioutil.GrepRegexFlag, positiveRefsRegex,
		ioutil.Grep, ioutil.GrepOnlyMatchingFlag, ioutil.GrepRegexFlag, azureNPMRegex)
	setsWithReferencesBytes, haveRefsStill, err := ioutil.DoublePipeToGrep(listAllCommand, grep1, grep2)

	if !haveRefsStill {
		if err != nil {
			// don't swallow the failure: without this, a broken pipeline is indistinguishable from "no leaked references"
			metrics.SendErrorLogAndMetric(util.IpsmID, "error: failed to determine if any ipsets have leaked reference counts. err: %v", err)
		}
		return nil
	}

	setsWithReferences := readByteLinesToMap(setsWithReferencesBytes)

	// log a bounded sample so the message honors its "max" label no matter how many sets were found
	sample := make([]string, 0, maxLinesToPrint)
	for name := range setsWithReferences {
		if len(sample) == maxLinesToPrint {
			break
		}
		sample = append(sample, name)
	}
	metrics.SendErrorLogAndMetric(util.IpsmID, "error: found leaked reference counts in kernel. ipsets (max %d): %+v. err: %v",
		maxLinesToPrint, sample, err)

	return setsWithReferences
}

// named returns to appease lint
func (iMgr *IPSetManager) fileCreatorForDestroyAll(names []string, failedNames, setsWithReferences map[string]struct{}) (creator *ioutil.FileCreator, failureCount *int) {
failureCountVal := 0
failureCount = &failureCountVal
creator = ioutil.NewFileCreator(iMgr.ioShim, maxTryCount, ipsetRestoreLineFailurePattern)

// destroy all the sets
for _, hashedSetName := range names {
if _, ok := failedNames[hashedSetName]; ok {
klog.Infof("skipping destroy for set %s since it failed to flush", hashedSetName)
continue
}

if _, ok := setsWithReferences[hashedSetName]; ok {
klog.Infof("skipping destroy for set %s since it has leaked reference counts", hashedSetName)
continue
}

hashedSetName := hashedSetName // to appease go lint
errorHandlers := []*ioutil.LineErrorHandler{
// error handlers specific to resetting ipsets
Expand All @@ -151,7 +260,7 @@ func (iMgr *IPSetManager) fileCreatorForReset(ipsetListOutput []byte) (*ioutil.F
Method: ioutil.Continue,
Callback: func() {
klog.Errorf("[RESET-IPSETS] marking destroy for set %s as a failure since the set is in use by a kernel component", hashedSetName)
destroyFailureCount++
failureCountVal++
// TODO mark the set as a failure and reconcile what iptables rule or ipset is referring to it
},
},
Expand All @@ -167,16 +276,16 @@ func (iMgr *IPSetManager) fileCreatorForReset(ipsetListOutput []byte) (*ioutil.F
Method: ioutil.Continue,
Callback: func() {
klog.Errorf("[RESET-IPSETS] marking destroy for set %s as a failure due to unknown error", hashedSetName)
destroyFailureCount++
failureCountVal++
// TODO mark the set as a failure and reconcile what iptables rule or ipset is referring to it
},
},
}
sectionID := sectionID(destroySectionPrefix, hashedSetName)
creator.AddLine(sectionID, errorHandlers, ipsetDestroyFlag, hashedSetName) // destroy set
}
originalNumAzureSets := len(names)
return creator, originalNumAzureSets, &destroyFailureCount

return creator, failureCount
}

/*
Expand Down Expand Up @@ -705,3 +814,18 @@ func (iMgr *IPSetManager) addMemberForApply(creator *ioutil.FileCreator, set *IP
// sectionID builds the identifier of a file-creator section by joining the
// section prefix and the (already prefixed) set name with a hyphen.
func sectionID(prefix, prefixedName string) string {
	// all operands are strings, so Sprint concatenates them without inserting spaces
	return fmt.Sprint(prefix, "-", prefixedName)
}

// readByteLinesToMap splits output into lines using parse.Line and returns the distinct,
// newline-trimmed lines as a set.
//
// Every line is read. The result is not capped, because callers use the set for membership checks
// and a truncated set would silently omit entries. Code that prints the set should bound its own output.
// Lines that are empty after trimming are ignored.
func readByteLinesToMap(output []byte) map[string]struct{} {
	lines := make(map[string]struct{})
	var line []byte
	for readIndex := 0; readIndex < len(output); {
		line, readIndex = parse.Line(readIndex, output)
		// the last line of the buffer may still carry its newline character
		name := strings.Trim(string(line), "\n")
		if name == "" {
			continue
		}
		lines[name] = struct{}{}
	}
	return lines
}
Loading