Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 95 additions & 40 deletions npm/pkg/dataplane/policies/chain-management_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ioutil"
Expand All @@ -17,7 +16,6 @@ import (

const (
defaultlockWaitTimeInSeconds string = "60"
reconcileChainTimeInMinutes int = 5

doesNotExistErrorCode int = 1 // Bad rule (does a matching rule exist in that chain?)
couldntLoadTargetErrorCode int = 2 // Couldn't load target `AZURE-NPM-EGRESS':No such file or directory
Expand Down Expand Up @@ -54,6 +52,51 @@ var (
ingressOrEgressPolicyChainPattern = fmt.Sprintf("'Chain %s-\\|Chain %s-'", util.IptablesAzureIngressPolicyChainPrefix, util.IptablesAzureEgressPolicyChainPrefix)
)

type staleChains struct {
chainsToCleanup map[string]struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need locks for this stalechain because two different threads are going to read/write into this, one reconcile thread and the normal pMgr thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's some discussion about this with me and Junguk above. We lock the whole pMgr before Add/Remove Policy, and I lock pMgr before reconcile now too. Besides the staleChains field that is used in all three of these methods, ioshim is also used to make syscalls in all three

}

func newStaleChains() *staleChains {
return &staleChains{make(map[string]struct{})}
}

func (s *staleChains) add(chain string) {
s.chainsToCleanup[chain] = struct{}{}
}

func (s *staleChains) remove(chain string) {
delete(s.chainsToCleanup, chain)
}

func (s *staleChains) emptyAndGetAll() []string {
result := make([]string, len(s.chainsToCleanup))
k := 0
for chain := range s.chainsToCleanup {
result[k] = chain
s.remove(chain)
k++
}
return result
}

func (s *staleChains) empty() {
s.chainsToCleanup = make(map[string]struct{})
}

// A proactive approach to avoid time to install default chains when the first networkpolicy comes again.
// Different from v1, which uninits when there are no policies and initializes when there are policies.
// The dataplane also initializes when it's created, so this keeps the policymanager in-line with that philosophy of having chains initialized at all times.
func (pMgr *PolicyManager) reboot() error {
// TODO for the sake of UTs, need to have a pMgr config specifying whether or not this reboot happens
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need reboot function?
It seems it is called when there is no more networkpolicy.
So, it is ok to call just reset.

Or is it clean-up all and then re-install default NPM chains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya we have reset which cleans up and deletes, and initialize which reinstalls. This reboot will do nothing in Windows

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious.
I guess you decide to do a pro-active approach to avoid time to install default chains when the first networkpolicy comes again while logically only reset is called when there is no more networkpolicy and initialize is called if the first networkpolicy comes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I see your point. In v1 it looks we just reset, and then when the first policy comes in, we initialize.

Given that the current DP design initializes the pMgr on creation, it might be simpler to always have pMgr initialized. Need to consider if there are any security or perf concerns for this approach.

// if err := pMgr.reset(); err != nil {
// return npmerrors.SimpleErrorWrapper("failed to remove NPM chains while rebooting", err)
// }
// if err := pMgr.initialize(); err != nil {
// return npmerrors.SimpleErrorWrapper("failed to initialize NPM chains while rebooting", err)
// }
return nil
}

func (pMgr *PolicyManager) initialize() error {
if err := pMgr.initializeNPMChains(); err != nil {
return npmerrors.SimpleErrorWrapper("failed to initialize NPM chains", err)
Expand All @@ -65,14 +108,15 @@ func (pMgr *PolicyManager) reset() error {
if err := pMgr.removeNPMChains(); err != nil {
return npmerrors.SimpleErrorWrapper("failed to remove NPM chains", err)
}
pMgr.staleChains.empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not look like we are deleting staleChains on reset ? if NPM comes up after a crash, we lost the stalechain cache and we would have to remove them right ? I think we will need to read all existing chains and delete chains with "Azure-NPM" prefix, wdyt ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeNPMChains actually does grep for any policy chains and deletes them. It actually creates chains that might not exist though, and greps for ingress/egress policy chains only. I will change it to grep for anything with that prefix, and then we will only be flushing and deleting chains that already exist

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this in another PR

return nil
}

// initializeNPMChains creates all chains/rules and makes sure the jump from FORWARD chain to
// AZURE-NPM chain is after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist).
func (pMgr *PolicyManager) initializeNPMChains() error {
klog.Infof("Initializing AZURE-NPM chains.")
creator := pMgr.getCreatorForInitChains()
creator := pMgr.creatorForInitChains()
err := restore(creator)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to create chains and rules", err)
Expand Down Expand Up @@ -101,7 +145,7 @@ func (pMgr *PolicyManager) removeNPMChains() error {
}

// flush all chains (will create any chain, including deprecated ones, if they don't exist)
creatorToFlush, chainsToDelete := pMgr.getCreatorAndChainsForReset()
creatorToFlush, chainsToDelete := pMgr.creatorAndChainsForReset()
restoreError := restore(creatorToFlush)
if restoreError != nil {
return npmerrors.SimpleErrorWrapper("failed to flush chains", restoreError)
Expand All @@ -123,26 +167,37 @@ func (pMgr *PolicyManager) removeNPMChains() error {
return nil
}

// ReconcileChains periodically creates the jump rule from FORWARD chain to AZURE-NPM chain (if it d.n.e)
// and makes sure it's after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist).
func (pMgr *PolicyManager) ReconcileChains(stopChannel <-chan struct{}) {
go pMgr.reconcileChains(stopChannel)
// reconcile does the following:
// - cleans up stale policy chains
// - creates the jump rule from FORWARD chain to AZURE-NPM chain (if it does not exist) and makes sure it's after the jumps to KUBE-FORWARD & KUBE-SERVICES chains (if they exist).
func (pMgr *PolicyManager) reconcile() {
if err := pMgr.positionAzureChainJumpRule(); err != nil {
klog.Errorf("failed to reconcile jump rule to Azure-NPM due to %s", err.Error())
}
if err := pMgr.cleanupChains(pMgr.staleChains.emptyAndGetAll()); err != nil {
klog.Errorf("failed to clean up old policy chains with the following error %s", err.Error())
}
}

func (pMgr *PolicyManager) reconcileChains(stopChannel <-chan struct{}) {
ticker := time.NewTicker(time.Minute * time.Duration(reconcileChainTimeInMinutes))
defer ticker.Stop()

for {
select {
case <-stopChannel:
return
case <-ticker.C:
if err := pMgr.positionAzureChainJumpRule(); err != nil {
metrics.SendErrorLogAndMetric(util.NpmID, "Error: failed to reconcile jump rule to Azure-NPM due to %s", err.Error())
// have to use slice argument for deterministic behavior for UTs
func (pMgr *PolicyManager) cleanupChains(chains []string) error {
var aggregateError error
for _, chain := range chains {
errCode, err := pMgr.runIPTablesCommand(util.IptablesDestroyFlag, chain) // TODO run the one that ignores doesNotExistErrorCode
if err != nil && errCode != doesNotExistErrorCode {
pMgr.staleChains.add(chain)
currentErrString := fmt.Sprintf("failed to clean up policy chain %s with err [%v]", chain, err)
if aggregateError == nil {
aggregateError = npmerrors.SimpleError(currentErrString)
} else {
aggregateError = npmerrors.SimpleErrorWrapper(fmt.Sprintf("%s and had previous error", currentErrString), aggregateError)
}
}
}
if aggregateError != nil {
return npmerrors.SimpleErrorWrapper("failed to clean up some policy chains with errors", aggregateError)
}
return nil
}

// this function has a direct comparison in NPM v1 iptables manager (iptm.go)
Expand Down Expand Up @@ -170,8 +225,8 @@ func (pMgr *PolicyManager) runIPTablesCommand(operationFlag string, args ...stri
return 0, nil
}

func (pMgr *PolicyManager) getCreatorForInitChains() *ioutil.FileCreator {
creator := pMgr.getNewCreatorWithChains(iptablesAzureChains)
func (pMgr *PolicyManager) creatorForInitChains() *ioutil.FileCreator {
creator := pMgr.newCreatorWithChains(iptablesAzureChains)

// add AZURE-NPM chain rules
creator.AddLine("", nil, util.IptablesAppendFlag, util.IptablesAzureChain, util.IptablesJumpFlag, util.IptablesAzureIngressChain)
Expand All @@ -180,32 +235,32 @@ func (pMgr *PolicyManager) getCreatorForInitChains() *ioutil.FileCreator {

// add AZURE-NPM-INGRESS chain rules
ingressDropSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureIngressChain, util.IptablesJumpFlag, util.IptablesDrop}
ingressDropSpecs = append(ingressDropSpecs, getOnMarkSpecs(util.IptablesAzureIngressDropMarkHex)...)
ingressDropSpecs = append(ingressDropSpecs, getCommentSpecs(fmt.Sprintf("DROP-ON-INGRESS-DROP-MARK-%s", util.IptablesAzureIngressDropMarkHex))...)
ingressDropSpecs = append(ingressDropSpecs, onMarkSpecs(util.IptablesAzureIngressDropMarkHex)...)
ingressDropSpecs = append(ingressDropSpecs, commentSpecs(fmt.Sprintf("DROP-ON-INGRESS-DROP-MARK-%s", util.IptablesAzureIngressDropMarkHex))...)
creator.AddLine("", nil, ingressDropSpecs...)

// add AZURE-NPM-INGRESS-ALLOW-MARK chain
markIngressAllowSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureIngressAllowMarkChain}
markIngressAllowSpecs = append(markIngressAllowSpecs, getSetMarkSpecs(util.IptablesAzureIngressAllowMarkHex)...)
markIngressAllowSpecs = append(markIngressAllowSpecs, getCommentSpecs(fmt.Sprintf("SET-INGRESS-ALLOW-MARK-%s", util.IptablesAzureIngressAllowMarkHex))...)
markIngressAllowSpecs = append(markIngressAllowSpecs, setMarkSpecs(util.IptablesAzureIngressAllowMarkHex)...)
markIngressAllowSpecs = append(markIngressAllowSpecs, commentSpecs(fmt.Sprintf("SET-INGRESS-ALLOW-MARK-%s", util.IptablesAzureIngressAllowMarkHex))...)
creator.AddLine("", nil, markIngressAllowSpecs...)
creator.AddLine("", nil, util.IptablesAppendFlag, util.IptablesAzureIngressAllowMarkChain, util.IptablesJumpFlag, util.IptablesAzureEgressChain)

// add AZURE-NPM-EGRESS chain rules
egressDropSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureEgressChain, util.IptablesJumpFlag, util.IptablesDrop}
egressDropSpecs = append(egressDropSpecs, getOnMarkSpecs(util.IptablesAzureEgressDropMarkHex)...)
egressDropSpecs = append(egressDropSpecs, getCommentSpecs(fmt.Sprintf("DROP-ON-EGRESS-DROP-MARK-%s", util.IptablesAzureEgressDropMarkHex))...)
egressDropSpecs = append(egressDropSpecs, onMarkSpecs(util.IptablesAzureEgressDropMarkHex)...)
egressDropSpecs = append(egressDropSpecs, commentSpecs(fmt.Sprintf("DROP-ON-EGRESS-DROP-MARK-%s", util.IptablesAzureEgressDropMarkHex))...)
creator.AddLine("", nil, egressDropSpecs...)

jumpOnIngressMatchSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureEgressChain, util.IptablesJumpFlag, util.IptablesAzureAcceptChain}
jumpOnIngressMatchSpecs = append(jumpOnIngressMatchSpecs, getOnMarkSpecs(util.IptablesAzureIngressAllowMarkHex)...)
jumpOnIngressMatchSpecs = append(jumpOnIngressMatchSpecs, getCommentSpecs(fmt.Sprintf("ACCEPT-ON-INGRESS-ALLOW-MARK-%s", util.IptablesAzureIngressAllowMarkHex))...)
jumpOnIngressMatchSpecs = append(jumpOnIngressMatchSpecs, onMarkSpecs(util.IptablesAzureIngressAllowMarkHex)...)
jumpOnIngressMatchSpecs = append(jumpOnIngressMatchSpecs, commentSpecs(fmt.Sprintf("ACCEPT-ON-INGRESS-ALLOW-MARK-%s", util.IptablesAzureIngressAllowMarkHex))...)
creator.AddLine("", nil, jumpOnIngressMatchSpecs...)

// add AZURE-NPM-ACCEPT chain rules
clearSpecs := []string{util.IptablesAppendFlag, util.IptablesAzureAcceptChain}
clearSpecs = append(clearSpecs, getSetMarkSpecs(util.IptablesAzureClearMarkHex)...)
clearSpecs = append(clearSpecs, getCommentSpecs("Clear-AZURE-NPM-MARKS")...)
clearSpecs = append(clearSpecs, setMarkSpecs(util.IptablesAzureClearMarkHex)...)
clearSpecs = append(clearSpecs, commentSpecs("Clear-AZURE-NPM-MARKS")...)
creator.AddLine("", nil, clearSpecs...)
creator.AddLine("", nil, util.IptablesAppendFlag, util.IptablesAzureAcceptChain, util.IptablesJumpFlag, util.IptablesAccept)
creator.AddLine("", nil, util.IptablesRestoreCommit)
Expand All @@ -215,7 +270,7 @@ func (pMgr *PolicyManager) getCreatorForInitChains() *ioutil.FileCreator {
// add/reposition AZURE-NPM chain after KUBE-FORWARD and KUBE-SERVICE chains if they exist
// this function has a direct comparison in NPM v1 iptables manager (iptm.go)
func (pMgr *PolicyManager) positionAzureChainJumpRule() error {
kubeServicesLine, kubeServicesLineNumErr := pMgr.getChainLineNumber(util.IptablesKubeServicesChain)
kubeServicesLine, kubeServicesLineNumErr := pMgr.chainLineNumber(util.IptablesKubeServicesChain)
if kubeServicesLineNumErr != nil {
// not possible to cover this branch currently because of testing limitations for PipeCommandToGrep()
baseErrString := "failed to get index of jump from KUBE-SERVICES chain to FORWARD chain with error"
Expand All @@ -225,7 +280,7 @@ func (pMgr *PolicyManager) positionAzureChainJumpRule() error {

index := kubeServicesLine + 1

// TODO could call getChainLineNumber instead, and say it doesn't exist for lineNum == 0
// TODO could call chainLineNumber instead, and say it doesn't exist for lineNum == 0
jumpRuleErrCode, checkErr := pMgr.runIPTablesCommand(util.IptablesCheckFlag, jumpFromForwardToAzureChainArgs...)
hadCheckError := checkErr != nil && jumpRuleErrCode != doesNotExistErrorCode
if hadCheckError {
Expand All @@ -252,7 +307,7 @@ func (pMgr *PolicyManager) positionAzureChainJumpRule() error {
return nil
}

npmChainLine, npmLineNumErr := pMgr.getChainLineNumber(util.IptablesAzureChain)
npmChainLine, npmLineNumErr := pMgr.chainLineNumber(util.IptablesAzureChain)
if npmLineNumErr != nil {
// not possible to cover this branch currently because of testing limitations for PipeCommandToGrep()
baseErrString := "failed to get index of jump from FORWARD chain to AZURE-NPM chain"
Expand Down Expand Up @@ -293,7 +348,7 @@ func (pMgr *PolicyManager) positionAzureChainJumpRule() error {

// returns 0 if the chain d.n.e.
// this function has a direct comparison in NPM v1 iptables manager (iptm.go)
func (pMgr *PolicyManager) getChainLineNumber(chain string) (int, error) {
func (pMgr *PolicyManager) chainLineNumber(chain string) (int, error) {
// TODO could call this once and use regex instead of grep to cut down on OS calls
listForwardEntriesCommand := pMgr.ioShim.Exec.Command(util.Iptables,
util.IptablesWaitFlag, defaultlockWaitTimeInSeconds, util.IptablesTableFlag, util.IptablesFilterTable,
Expand All @@ -316,20 +371,20 @@ func (pMgr *PolicyManager) getChainLineNumber(chain string) (int, error) {
}

// make this a function for easier testing
func (pMgr *PolicyManager) getCreatorAndChainsForReset() (creator *ioutil.FileCreator, chainsToFlush []string) {
oldPolicyChains, err := pMgr.getPolicyChainNames()
func (pMgr *PolicyManager) creatorAndChainsForReset() (creator *ioutil.FileCreator, chainsToFlush []string) {
oldPolicyChains, err := pMgr.policyChainNames()
if err != nil {
// not possible to cover this branch currently because of testing limitations for PipeCommandToGrep()
metrics.SendErrorLogAndMetric(util.IptmID, "Error: failed to determine NPM ingress/egress policy chains to delete")
}
chainsToFlush = iptablesOldAndNewChains
chainsToFlush = append(chainsToFlush, oldPolicyChains...) // will work even if oldPolicyChains is nil
creator = pMgr.getNewCreatorWithChains(chainsToFlush)
creator = pMgr.newCreatorWithChains(chainsToFlush)
creator.AddLine("", nil, util.IptablesRestoreCommit)
return
}

func (pMgr *PolicyManager) getPolicyChainNames() ([]string, error) {
func (pMgr *PolicyManager) policyChainNames() ([]string, error) {
iptablesListCommand := pMgr.ioShim.Exec.Command(util.Iptables,
util.IptablesWaitFlag, defaultlockWaitTimeInSeconds, util.IptablesTableFlag, util.IptablesFilterTable,
util.IptablesNumericFlag, util.IptablesListFlag,
Expand All @@ -355,7 +410,7 @@ func (pMgr *PolicyManager) getPolicyChainNames() ([]string, error) {
return chainNames, nil
}

func getOnMarkSpecs(mark string) []string {
func onMarkSpecs(mark string) []string {
return []string{
util.IptablesModuleFlag,
util.IptablesMarkVerb,
Expand Down
Loading