Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ type NetworkPolicyManager struct {
namespaceControllerV2 *controllersv2.NamespaceController
npmNamespaceCacheV2 *controllersv2.NpmNamespaceCache

npInformer networkinginformers.NetworkPolicyInformer
netPolController *networkPolicyController
npInformer networkinginformers.NetworkPolicyInformer
netPolControllerV1 *controllersv1.NetworkPolicyController

// ipsMgr are shared in all controllers. Thus, only one ipsMgr is created for simple management
// and uses lock to avoid unintentional race condictions in IpsetManager.
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewNetworkPolicyManager(config npmconfig.Config,
// create NameSpace controller
npMgr.namespaceControllerV1 = controllersv1.NewNameSpaceController(npMgr.nsInformer, npMgr.ipsMgr, npMgr.npmNamespaceCacheV1)
// create network policy controller
npMgr.netPolController = NewNetworkPolicyController(npMgr.npInformer, npMgr.ipsMgr)
npMgr.netPolControllerV1 = controllersv1.NewNetworkPolicyController(npMgr.npInformer, npMgr.ipsMgr)

return npMgr
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (npMgr *NetworkPolicyManager) SendClusterMetrics() {
lenOfNsMap := len(npMgr.npmNamespaceCacheV1.NsMap)
nsCount.Value = float64(lenOfNsMap - 1)

lenOfRawNpMap := npMgr.netPolController.lengthOfRawNpMap()
lenOfRawNpMap := npMgr.netPolControllerV1.LengthOfRawNpMap()
nwPolicyCount.Value += float64(lenOfRawNpMap)

lenOfPodMap := npMgr.podControllerV1.LengthOfPodMap()
Expand All @@ -219,7 +219,7 @@ func (npMgr *NetworkPolicyManager) SendClusterMetrics() {
// Start starts shared informers and waits for the shared informer cache to sync.
func (npMgr *NetworkPolicyManager) Start(config npmconfig.Config, stopCh <-chan struct{}) error {
// Do initialization of data plane before starting syncup of each controller to avoid heavy call to api-server
if err := npMgr.netPolController.resetDataPlane(); err != nil {
if err := npMgr.netPolControllerV1.ResetDataPlane(); err != nil {
return fmt.Errorf("Failed to initialized data plane")
}

Expand All @@ -242,16 +242,17 @@ func (npMgr *NetworkPolicyManager) Start(config npmconfig.Config, stopCh <-chan
if config.Toggles.EnableV2Controllers {
go npMgr.podControllerV2.Run(stopCh)
go npMgr.namespaceControllerV2.Run(stopCh)
go npMgr.netPolController.Run(stopCh)
go npMgr.netPolController.runPeriodicTasks(stopCh)
// TODO add in netpol controller v2
// go npMgr.netPolControllerV1.Run(stopCh)
// go npMgr.netPolControllerV1.RunPeriodicTasks(stopCh)
return nil
}

// start controllers after synced
go npMgr.podControllerV1.Run(stopCh)
go npMgr.namespaceControllerV1.Run(stopCh)
go npMgr.netPolController.Run(stopCh)
go npMgr.netPolController.runPeriodicTasks(stopCh)
go npMgr.netPolControllerV1.Run(stopCh)
go npMgr.netPolControllerV1.RunPeriodicTasks(stopCh)

return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package npm
package controllers

import (
"fmt"
Expand Down Expand Up @@ -31,7 +31,7 @@ const (
unSafeToCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain = false
)

type networkPolicyController struct {
type NetworkPolicyController struct {
netPolLister netpollister.NetworkPolicyLister
workqueue workqueue.RateLimitingInterface
rawNpMap map[string]*networkingv1.NetworkPolicy // Key is <nsname>/<policyname>
Expand All @@ -43,8 +43,8 @@ type networkPolicyController struct {
iptMgr *iptm.IptablesManager
}

func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, ipsMgr *ipsm.IpsetManager) *networkPolicyController {
netPolController := &networkPolicyController{
func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, ipsMgr *ipsm.IpsetManager) *NetworkPolicyController {
netPolController := &NetworkPolicyController{
netPolLister: npInformer.Lister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NetworkPolicy"),
rawNpMap: make(map[string]*networkingv1.NetworkPolicy),
Expand All @@ -66,7 +66,7 @@ func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInfo

// initializeDataPlane do all initialization tasks for data plane
// TODO(jungukcho) Need to refactor UninitNpmChains since it assumes it has already AZURE-NPM chains
func (c *networkPolicyController) resetDataPlane() error {
func (c *NetworkPolicyController) ResetDataPlane() error {
klog.Infof("Initiailize data plane. Clean up Azure-NPM chains and start reconcile iptables")

// TODO(jungukcho): will clean-up error handling codes to initialize iptables and ipset in a separate PR
Expand All @@ -85,18 +85,18 @@ func (c *networkPolicyController) resetDataPlane() error {
return nil
}

func (c *networkPolicyController) runPeriodicTasks(stopCh <-chan struct{}) {
func (c *NetworkPolicyController) RunPeriodicTasks(stopCh <-chan struct{}) {
// (TODO): Check any side effects
c.iptMgr.ReconcileIPTables(stopCh)
}

func (c *networkPolicyController) lengthOfRawNpMap() int {
func (c *NetworkPolicyController) LengthOfRawNpMap() int {
return len(c.rawNpMap)
}

// getNetworkPolicyKey returns namespace/name of network policy object if it is valid network policy object and has valid namespace/name.
// If not, it returns error.
func (c *networkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) {
func (c *NetworkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) {
var key string
_, ok := obj.(*networkingv1.NetworkPolicy)
if !ok {
Expand All @@ -111,7 +111,7 @@ func (c *networkPolicyController) getNetworkPolicyKey(obj interface{}) (string,
return key, nil
}

func (c *networkPolicyController) addNetworkPolicy(obj interface{}) {
func (c *NetworkPolicyController) addNetworkPolicy(obj interface{}) {
netPolkey, err := c.getNetworkPolicyKey(obj)
if err != nil {
utilruntime.HandleError(err)
Expand All @@ -121,15 +121,15 @@ func (c *networkPolicyController) addNetworkPolicy(obj interface{}) {
c.workqueue.Add(netPolkey)
}

func (c *networkPolicyController) updateNetworkPolicy(old, new interface{}) {
netPolkey, err := c.getNetworkPolicyKey(new)
func (c *NetworkPolicyController) updateNetworkPolicy(old, newnetpol interface{}) {
netPolkey, err := c.getNetworkPolicyKey(newnetpol)
if err != nil {
utilruntime.HandleError(err)
return
}

// new network policy object is already checked validation by calling getNetworkPolicyKey function.
newNetPol, _ := new.(*networkingv1.NetworkPolicy)
newNetPol, _ := newnetpol.(*networkingv1.NetworkPolicy)
oldNetPol, ok := old.(*networkingv1.NetworkPolicy)
if ok {
if oldNetPol.ResourceVersion == newNetPol.ResourceVersion {
Expand All @@ -142,7 +142,7 @@ func (c *networkPolicyController) updateNetworkPolicy(old, new interface{}) {
c.workqueue.Add(netPolkey)
}

func (c *networkPolicyController) deleteNetworkPolicy(obj interface{}) {
func (c *NetworkPolicyController) deleteNetworkPolicy(obj interface{}) {
netPolObj, ok := obj.(*networkingv1.NetworkPolicy)
// DeleteFunc gets the final state of the resource (if it is known).
// Otherwise, it gets an object of type DeletedFinalStateUnknown.
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *networkPolicyController) deleteNetworkPolicy(obj interface{}) {
c.workqueue.Add(netPolkey)
}

func (c *networkPolicyController) Run(stopCh <-chan struct{}) {
func (c *NetworkPolicyController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

Expand All @@ -185,12 +185,12 @@ func (c *networkPolicyController) Run(stopCh <-chan struct{}) {
klog.Info("Shutting down Network Policy workers")
}

func (c *networkPolicyController) runWorker() {
func (c *NetworkPolicyController) runWorker() {
for c.processNextWorkItem() {
}
}

func (c *networkPolicyController) processNextWorkItem() bool {
func (c *NetworkPolicyController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()

if shutdown {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (c *networkPolicyController) processNextWorkItem() bool {
}

// syncNetPol compares the actual state with the desired, and attempts to converge the two.
func (c *networkPolicyController) syncNetPol(key string) error {
func (c *NetworkPolicyController) syncNetPol(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
Expand Down Expand Up @@ -286,7 +286,7 @@ func (c *networkPolicyController) syncNetPol(key string) error {
}

// initializeDefaultAzureNpmChain install default rules for kube-system and iptables
func (c *networkPolicyController) initializeDefaultAzureNpmChain() error {
func (c *NetworkPolicyController) initializeDefaultAzureNpmChain() error {
if c.isAzureNpmChainCreated {
return nil
}
Expand All @@ -303,7 +303,7 @@ func (c *networkPolicyController) initializeDefaultAzureNpmChain() error {
}

// syncAddAndUpdateNetPol handles a new network policy or an updated network policy object triggered by add and update events
func (c *networkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) error {
func (c *NetworkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) error {
prometheusTimer := metrics.StartNewTimer()
defer metrics.RecordPolicyExecTime(prometheusTimer) // record execution time regardless of failure

Expand Down Expand Up @@ -395,7 +395,7 @@ func (c *networkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1
}

// DeleteNetworkPolicy handles deleting network policy based on netPolKey.
func (c *networkPolicyController) cleanUpNetworkPolicy(netPolKey string, isSafeCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain) error {
func (c *NetworkPolicyController) cleanUpNetworkPolicy(netPolKey string, isSafeCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain) error {
cachedNetPolObj, cachedNetPolObjExists := c.rawNpMap[netPolKey]
// if there is no applied network policy with the netPolKey, do not need to clean up process.
if !cachedNetPolObjExists {
Expand Down Expand Up @@ -455,7 +455,7 @@ func (c *networkPolicyController) cleanUpNetworkPolicy(netPolKey string, isSafeC
}

// (TODO) do not need to ipsMgr parameter
func (c *networkPolicyController) createCidrsRule(direction, policyName, ns string, ipsets [][]string) error {
func (c *NetworkPolicyController) createCidrsRule(direction, policyName, ns string, ipsets [][]string) error {
spec := []string{util.IpsetNetHashFlag, util.IpsetMaxelemName, util.IpsetMaxelemNum}

for i, ipCidrSet := range ipsets {
Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *networkPolicyController) createCidrsRule(direction, policyName, ns stri
return nil
}

func (c *networkPolicyController) removeCidrsRule(direction, policyName, ns string, ipsets [][]string) error {
func (c *NetworkPolicyController) removeCidrsRule(direction, policyName, ns string, ipsets [][]string) error {
for i, ipCidrSet := range ipsets {
if len(ipCidrSet) == 0 {
continue
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package npm
package controllers

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/Azure/azure-container-networking/npm/ipsm"
"github.com/Azure/azure-container-networking/npm/metrics"
Expand All @@ -25,8 +24,6 @@ import (
"k8s.io/utils/exec"
)

var noResyncPeriodFunc = func() time.Duration { return 0 }

type netPolFixture struct {
t *testing.T

Expand All @@ -38,7 +35,7 @@ type netPolFixture struct {
kubeobjects []runtime.Object

ipsMgr *ipsm.IpsetManager
netPolController *networkPolicyController
netPolController *NetworkPolicyController
kubeInformer kubeinformers.SharedInformerFactory
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package npm
package controllers

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package npm
package controllers

import (
"reflect"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package npm
package controllers

import (
"container/heap"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package npm
package controllers

import (
"container/heap"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License
package npm
package controllers

import (
"sort"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package npm
package controllers

import (
"encoding/json"
"io/ioutil"
"path/filepath"
"reflect"
"testing"

Expand All @@ -15,6 +16,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
)

const testPolicyDir = "../../../../"

func TestCraftPartialIptEntrySpecFromPort(t *testing.T) {
portRule := networkingv1.NetworkPolicyPort{}

Expand Down Expand Up @@ -1112,7 +1115,8 @@ func TestTranslateEgress(t *testing.T) {

func readPolicyYaml(policyYaml string) (*networkingv1.NetworkPolicy, error) {
decode := scheme.Codecs.UniversalDeserializer().Decode
b, err := ioutil.ReadFile(policyYaml)
policyYamlLocation := filepath.Join(testPolicyDir, policyYaml)
b, err := ioutil.ReadFile(policyYamlLocation)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2711,6 +2715,9 @@ func TestAllowAppFrontendToTCPPort53UDPPort53Policy(t *testing.T) {

func TestComplexPolicy(t *testing.T) {
k8sExamplePolicy, err := readPolicyYaml("testpolicies/complex-policy.yaml")
if err != nil {
t.Fatal(err)
}
k8sExamplePolicyDiffOrder, err := readPolicyYaml("testpolicies/complex-policy-diff-order.yaml")
if err != nil {
t.Fatal(err)
Expand Down
Loading