diff --git a/go.mod b/go.mod index 9680bdb2d9..569f9a778a 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( k8s.io/api v0.18.2 k8s.io/apimachinery v0.18.2 k8s.io/client-go v0.18.2 + k8s.io/klog v1.0.0 sigs.k8s.io/controller-runtime v0.6.0 software.sslmate.com/src/go-pkcs12 v0.0.0-20201102150903-66718f75db0e // indirect ) diff --git a/npm/metrics/prometheus_metrics.go b/npm/metrics/prometheus_metrics.go index 717db27312..852b5db6bd 100644 --- a/npm/metrics/prometheus_metrics.go +++ b/npm/metrics/prometheus_metrics.go @@ -61,6 +61,11 @@ var nodeLevelRegistry = prometheus.NewRegistry() var clusterLevelRegistry = prometheus.NewRegistry() var haveInitialized = false +func ReInitializeAllMetrics() { + haveInitialized = false + InitializeAll() +} + // InitializeAll creates all the Prometheus Metrics. The metrics will be nil before this method is called. func InitializeAll() { if !haveInitialized { diff --git a/npm/networkPolicyController.go b/npm/networkPolicyController.go new file mode 100644 index 0000000000..1f3af4c32a --- /dev/null +++ b/npm/networkPolicyController.go @@ -0,0 +1,488 @@ +// Copyright 2018 Microsoft. All rights reserved. +// MIT License +package npm + +import ( + "fmt" + "strconv" + "time" + + "github.com/Azure/azure-container-networking/npm/ipsm" + "github.com/Azure/azure-container-networking/npm/metrics" + "github.com/Azure/azure-container-networking/npm/util" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + networkinginformers "k8s.io/client-go/informers/networking/v1" + "k8s.io/client-go/kubernetes" + netpollister "k8s.io/client-go/listers/networking/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +// IsSafeCleanUpAzureNpmChain is used to indicate whether default Azure NPM chain can be safely deleted or not. 
+type IsSafeCleanUpAzureNpmChain bool + +const ( + SafeToCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain = true + unSafeToCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain = false +) + +type networkPolicyController struct { + clientset kubernetes.Interface + netPolLister netpollister.NetworkPolicyLister + netPolListerSynced cache.InformerSynced + workqueue workqueue.RateLimitingInterface + // (TODO): networkPolController does not need to have whole NetworkPolicyManager pointer. Need to improve it + npMgr *NetworkPolicyManager + // flag to indicate default Azure NPM chain is created or not + isAzureNpmChainCreated bool +} + +func NewNetworkPolicyController(npInformer networkinginformers.NetworkPolicyInformer, clientset kubernetes.Interface, npMgr *NetworkPolicyManager) *networkPolicyController { + netPolController := &networkPolicyController{ + clientset: clientset, + netPolLister: npInformer.Lister(), + netPolListerSynced: npInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "NetworkPolicy"), + npMgr: npMgr, + isAzureNpmChainCreated: false, + } + + npInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: netPolController.addNetworkPolicy, + UpdateFunc: netPolController.updateNetworkPolicy, + DeleteFunc: netPolController.deleteNetworkPolicy, + }, + ) + return netPolController +} + +// getNetworkPolicyKey returns namespace/name of network policy object if it is valid network policy object and has valid namespace/name. +// If not, it returns error. 
+func (c *networkPolicyController) getNetworkPolicyKey(obj interface{}) (string, error) { + var key string + _, ok := obj.(*networkingv1.NetworkPolicy) + if !ok { + return key, fmt.Errorf("cannot cast obj (%v) to network policy obj", obj) + } + + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + return key, fmt.Errorf("error due to %s", err) + } + + return key, nil +} + +func (c *networkPolicyController) addNetworkPolicy(obj interface{}) { + netPolkey, err := c.getNetworkPolicyKey(obj) + if err != nil { + utilruntime.HandleError(err) + return + } + + c.workqueue.Add(netPolkey) +} + +func (c *networkPolicyController) updateNetworkPolicy(old, new interface{}) { + netPolkey, err := c.getNetworkPolicyKey(new) + if err != nil { + utilruntime.HandleError(err) + return + } + + // new network policy object is already checked validation by calling getNetworkPolicyKey function. + newNetPol, _ := new.(*networkingv1.NetworkPolicy) + oldNetPol, ok := old.(*networkingv1.NetworkPolicy) + if ok { + if oldNetPol.ResourceVersion == newNetPol.ResourceVersion { + // Periodic resync will send update events for all known network plicies. + // Two different versions of the same network policy will always have different RVs. + return + } + } + + c.npMgr.Lock() + cachedNetPolObj, netPolExists := c.npMgr.RawNpMap[netPolkey] + c.npMgr.Unlock() + if netPolExists { + // if network policy does not have different states against lastly applied states stored in cachedNetPolObj, + // netPolController does not need to reconcile this update. + // in this updateNetworkPolicy event, newNetPol was updated with states which netPolController does not need to reconcile. + if isSameNetworkPolicy(cachedNetPolObj, newNetPol) { + return + } + } + + c.workqueue.Add(netPolkey) +} + +func (c *networkPolicyController) deleteNetworkPolicy(obj interface{}) { + _, ok := obj.(*networkingv1.NetworkPolicy) + // DeleteFunc gets the final state of the resource (if it is known). 
+ // Otherwise, it gets an object of type DeletedFinalStateUnknown. + // This can happen if the watch is closed and misses the delete event and + // the controller doesn't notice the deletion until the subsequent re-list + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type: %v", obj) + utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return + } + + if _, ok = tombstone.Obj.(*networkingv1.NetworkPolicy); !ok { + metrics.SendErrorLogAndMetric(util.NetpolID, "[NETPOL DELETE EVENT] Received unexpected object type (error decoding object tombstone, invalid type): %v", obj) + utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) + return + } + } + + var netPolkey string + var err error + if netPolkey, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + utilruntime.HandleError(err) + return + } + + // (TODO): need to decouple this lock from npMgr if possible + c.npMgr.Lock() + _, netPolExists := c.npMgr.RawNpMap[netPolkey] + c.npMgr.Unlock() + // If a network policy object is not in the RawNpMap, do not need to clean-up states for the network policy + // since netPolController did not apply for any states for the network policy + if !netPolExists { + return + } + + c.workqueue.Add(netPolkey) +} + +func (c *networkPolicyController) Run(threadiness int, stopCh <-chan struct{}) error { + defer utilruntime.HandleCrash() + defer c.workqueue.ShutDown() + + klog.Infof("Starting Network Policy %d worker(s)", threadiness) + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Infof("Started Network Policy %d worker(s)", threadiness) + <-stopCh + klog.Info("Shutting down Network Policy workers") + + return nil +} + +func (c *networkPolicyController) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *networkPolicyController) 
processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.workqueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncNetPol, passing it the namespace/name string of the + // network policy resource to be synced. + // TODO : may consider using "c.queue.AddAfter(key, *requeueAfter)" according to error type later + if err := c.syncNetPol(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + metrics.SendErrorLogAndMetric(util.NetpolID, "syncNetPol error due to %v", err) + return true + } + + return true +} + +// syncNetPol compares the actual state with the desired, and attempts to converge the two. 
+func (c *networkPolicyController) syncNetPol(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + // Get the network policy resource with this namespace/name + netPolObj, err := c.netPolLister.NetworkPolicies(namespace).Get(name) + + // (TODO): Reduce scope of lock later + c.npMgr.Lock() + defer c.npMgr.Unlock() + + if err != nil { + if errors.IsNotFound(err) { + klog.Infof("Network Policy %s is not found, may be it is deleted", key) + // netPolObj is not found, but should need to check the RawNpMap cache with key. + // cleanUpNetworkPolicy method will take care of the deletion of a cached network policy if the cached network policy exists with key in our RawNpMap cache. + err = c.cleanUpNetworkPolicy(key, SafeToCleanUpAzureNpmChain) + if err != nil { + return fmt.Errorf("[syncNetPol] Error: %v when network policy is not found\n", err) + } + return err + } + return err + } + + // If DeletionTimestamp of the netPolObj is set, start cleaning up lastly applied states. 
+ // This is early cleaning up process from updateNetPol event + if netPolObj.ObjectMeta.DeletionTimestamp != nil || netPolObj.ObjectMeta.DeletionGracePeriodSeconds != nil { + err = c.cleanUpNetworkPolicy(key, SafeToCleanUpAzureNpmChain) + if err != nil { + return fmt.Errorf("Error: %v when ObjectMeta.DeletionTimestamp field is set\n", err) + } + return nil + } + + err = c.syncAddAndUpdateNetPol(netPolObj) + if err != nil { + return fmt.Errorf("[syncNetPol] Error due to %s\n", err.Error()) + } + + return nil +} + +// initializeDefaultAzureNpmChain install default rules for kube-system and iptables +func (c *networkPolicyController) initializeDefaultAzureNpmChain() error { + if c.isAzureNpmChainCreated { + return nil + } + + ipsMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].IpsMgr + iptMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].iptMgr + if err := ipsMgr.CreateSet(util.KubeSystemFlag, append([]string{util.IpsetNetHashFlag})); err != nil { + return fmt.Errorf("[initializeDefaultAzureNpmChain] Error: failed to initialize kube-system ipset with err %s", err) + } + if err := iptMgr.InitNpmChains(); err != nil { + return fmt.Errorf("[initializeDefaultAzureNpmChain] Error: failed to initialize azure-npm chains with err %s", err) + } + + c.isAzureNpmChainCreated = true + return nil +} + +// syncAddAndUpdateNetPol handles a new network policy or an updated network policy object triggered by add and update events +func (c *networkPolicyController) syncAddAndUpdateNetPol(netPolObj *networkingv1.NetworkPolicy) error { + // This timer measures execution time to run this function regardless of success or failure cases + timer := metrics.StartNewTimer() + defer timer.StopAndRecord(metrics.AddPolicyExecTime) + + var err error + netpolKey, err := cache.MetaNamespaceKeyFunc(netPolObj) + if err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: while running MetaNamespaceKeyFunc err: %s", err) + } + + // Start reconciling loop to eventually meet cached states 
against the desired states from network policy. + // #1 If a new network policy is created, the network policy is not in RawNPMap. + // start translating policy and install translated ipset and iptables rules into kernel + // #2 If a network policy with -- is applied before and two network policy are the same object (same UID), + // first delete the applied network policy, then start translating policy and install translated ipset and iptables rules into kernel + // #3 If a network policy with -- is applied before and two network policy are the different object (different UID) due to missing some events for the old object + // first delete the applied network policy, then start translating policy and install translated ipset and iptables rules into kernel + // To deal with all three cases, we first delete network policy if possible, then install translated rules into kernel. + // (TODO): can optimize logic more to reduce computations. For example, apply only difference if possible like podController + + // Do not need to clean up default Azure NPM chain in deleteNetworkPolicy function, if network policy object is applied soon. + // So, avoid extra overhead to install default Azure NPM chain in initializeDefaultAzureNpmChain function. + // To achieve it, use flag unSafeToCleanUpAzureNpmChain to indicate that the default Azure NPM chain cannot be deleted. + // delete existing network policy + err = c.cleanUpNetworkPolicy(netpolKey, unSafeToCleanUpAzureNpmChain) + if err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to deleteNetworkPolicy due to %s", err) + } + + // Install this default rules for kube-system and azure-npm chains if they are not initilized. + // Execute initializeDefaultAzureNpmChain function first before actually starting processing network policy object. 
+ if err = c.initializeDefaultAzureNpmChain(); err != nil { + return fmt.Errorf("[syncNetPol] Error: due to %v", err) + } + + // Cache network object first before applying ipsets and iptables. + // If error happens while applying ipsets and iptables, + // the key is re-queued in workqueue and process this function again, which eventually meets desired states of network policy + c.npMgr.RawNpMap[netpolKey] = netPolObj + metrics.NumPolicies.Inc() + + ipsMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].IpsMgr + iptMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].iptMgr + sets, namedPorts, lists, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(netPolObj) + for _, set := range sets { + klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set)) + if err = ipsMgr.CreateSet(set, append([]string{util.IpsetNetHashFlag})); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset %s with err: %v", set, err) + } + } + for _, set := range namedPorts { + klog.Infof("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set)) + if err = ipsMgr.CreateSet(set, append([]string{util.IpsetIPPortHashFlag})); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset named port %s with err: %v", set, err) + } + } + for _, list := range lists { + if err = ipsMgr.CreateList(list); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: creating ipset list %s with err: %v", list, err) + } + } + + if err = c.createCidrsRule("in", netPolObj.ObjectMeta.Name, netPolObj.ObjectMeta.Namespace, ingressIPCidrs, ipsMgr); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: createCidrsRule in due to %v", err) + } + + if err = c.createCidrsRule("out", netPolObj.ObjectMeta.Name, netPolObj.ObjectMeta.Namespace, egressIPCidrs, ipsMgr); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: createCidrsRule out due to %v", err) + } + + for _, iptEntry := range iptEntries { + if err = 
iptMgr.Add(iptEntry); err != nil { + return fmt.Errorf("[syncAddAndUpdateNetPol] Error: failed to apply iptables rule. Rule: %+v with err: %v", iptEntry, err) + } + } + + return nil +} + +// DeleteNetworkPolicy handles deleting network policy based on netPolKey. +func (c *networkPolicyController) cleanUpNetworkPolicy(netPolKey string, isSafeCleanUpAzureNpmChain IsSafeCleanUpAzureNpmChain) error { + cachedNetPolObj, cachedNetPolObjExists := c.npMgr.RawNpMap[netPolKey] + // if there is no applied network policy with the netPolKey, do not need to clean up process. + if !cachedNetPolObjExists { + return nil + } + + ipsMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].IpsMgr + iptMgr := c.npMgr.NsMap[util.KubeAllNamespacesFlag].iptMgr + // translate policy from "cachedNetPolObj" + _, _, _, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(cachedNetPolObj) + + var err error + // delete iptables entries + for _, iptEntry := range iptEntries { + if err = iptMgr.Delete(iptEntry); err != nil { + return fmt.Errorf("[cleanUpNetworkPolicy] Error: failed to apply iptables rule. 
Rule: %+v with err: %v", iptEntry, err) + } + } + + // delete ipset list related to ingress CIDRs + if err = c.removeCidrsRule("in", cachedNetPolObj.Name, cachedNetPolObj.Namespace, ingressIPCidrs, ipsMgr); err != nil { + return fmt.Errorf("[cleanUpNetworkPolicy] Error: removeCidrsRule in due to %v", err) + } + + // delete ipset list related to egress CIDRs + if err = c.removeCidrsRule("out", cachedNetPolObj.Name, cachedNetPolObj.Namespace, egressIPCidrs, ipsMgr); err != nil { + return fmt.Errorf("[cleanUpNetworkPolicy] Error: removeCidrsRule out due to %v", err) + } + + // Sucess to clean up ipset and iptables operations in kernel and delete the cached network policy from RawNpMap + delete(c.npMgr.RawNpMap, netPolKey) + metrics.NumPolicies.Dec() + + // If there is no cached network policy in RawNPMap anymore and no immediate network policy to process, start cleaning up default azure npm chains + // However, UninitNpmChains function is failed which left failed states and will not retry, but functionally it is ok. + // (TODO): Ideally, need to decouple cleaning-up default azure npm chains from "network policy deletion" event. + if isSafeCleanUpAzureNpmChain && len(c.npMgr.RawNpMap) == 0 { + // Even though UninitNpmChains function returns error, isAzureNpmChainCreated sets up false. + // So, when a new network policy is added, the "default Azure NPM chain" can be installed. 
+ c.isAzureNpmChainCreated = false + if err = iptMgr.UninitNpmChains(); err != nil { + utilruntime.HandleError(fmt.Errorf("Error: failed to uninitialize azure-npm chains with err: %s", err)) + return nil + } + } + + return nil +} + +func (c *networkPolicyController) createCidrsRule(ingressOrEgress, policyName, ns string, ipsetEntries [][]string, ipsMgr *ipsm.IpsetManager) error { + spec := append([]string{util.IpsetNetHashFlag, util.IpsetMaxelemName, util.IpsetMaxelemNum}) + + for i, ipCidrSet := range ipsetEntries { + if ipCidrSet == nil || len(ipCidrSet) == 0 { + continue + } + setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + ingressOrEgress + klog.Infof("Creating set: %v, hashedSet: %v", setName, util.GetHashedName(setName)) + if err := ipsMgr.CreateSet(setName, spec); err != nil { + return fmt.Errorf("[createCidrsRule] Error: creating ipset %s with err: %v", ipCidrSet, err) + } + for _, ipCidrEntry := range util.DropEmptyFields(ipCidrSet) { + // Ipset doesn't allow 0.0.0.0/0 to be added. 
A general solution is split 0.0.0.0/1 in half which convert to + // 1.0.0.0/1 and 128.0.0.0/1 + if ipCidrEntry == "0.0.0.0/0" { + splitEntry := [2]string{"1.0.0.0/1", "128.0.0.0/1"} + for _, entry := range splitEntry { + if err := ipsMgr.AddToSet(setName, entry, util.IpsetNetHashFlag, ""); err != nil { + return fmt.Errorf("[createCidrsRule] adding ip cidrs %s into ipset %s with err: %v", entry, ipCidrSet, err) + } + } + } else { + if err := ipsMgr.AddToSet(setName, ipCidrEntry, util.IpsetNetHashFlag, ""); err != nil { + return fmt.Errorf("[createCidrsRule] adding ip cidrs %s into ipset %s with err: %v", ipCidrEntry, ipCidrSet, err) + } + } + } + } + + return nil +} + +func (c *networkPolicyController) removeCidrsRule(ingressOrEgress, policyName, ns string, ipsetEntries [][]string, ipsMgr *ipsm.IpsetManager) error { + for i, ipCidrSet := range ipsetEntries { + if ipCidrSet == nil || len(ipCidrSet) == 0 { + continue + } + setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + ingressOrEgress + klog.Infof("Delete set: %v, hashedSet: %v", setName, util.GetHashedName(setName)) + if err := ipsMgr.DeleteSet(setName); err != nil { + return fmt.Errorf("[removeCidrsRule] deleting ipset %s with err: %v", ipCidrSet, err) + } + } + + return nil +} + +// GetProcessedNPKey will return netpolKey +// (TODO): will use this function when optimizing management of multiple network policies with merging and deducting multiple network policies. +// func (c *networkPolicyController) getProcessedNPKey(netPolObj *networkingv1.NetworkPolicy) string { +// // hashSelector will never be empty +// // (TODO): what if PodSelector is [] or nothing? - make the Unit test for this +// hashedPodSelector := HashSelector(&netPolObj.Spec.PodSelector) + +// // (TODO): any chance to have namespace has zero length? 
+// if len(netPolObj.GetNamespace()) > 0 { +// hashedPodSelector = netPolObj.GetNamespace() + "/" + hashedPodSelector +// } +// return util.GetNSNameWithPrefix(hashedPodSelector) +// } diff --git a/npm/networkPolicyController_test.go b/npm/networkPolicyController_test.go new file mode 100644 index 0000000000..a1904e7dee --- /dev/null +++ b/npm/networkPolicyController_test.go @@ -0,0 +1,366 @@ +// Copyright 2018 Microsoft. All rights reserved. +// MIT License +package npm + +import ( + "fmt" + "strconv" + "testing" + + "github.com/Azure/azure-container-networking/npm/ipsm" + "github.com/Azure/azure-container-networking/npm/iptm" + "github.com/Azure/azure-container-networking/npm/metrics" + "github.com/Azure/azure-container-networking/npm/metrics/promutil" + + corev1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" +) + +type netPolFixture struct { + t *testing.T + + kubeclient *k8sfake.Clientset + // Objects to put in the store. + netPolLister []*networkingv1.NetworkPolicy + // (TODO) Actions expected to happen on the client. Will use this to check action. + kubeactions []core.Action + // Objects from here preloaded into NewSimpleFake. 
+ kubeobjects []runtime.Object + + // (TODO) will remove npMgr if possible + npMgr *NetworkPolicyManager + ipsMgr *ipsm.IpsetManager + iptMgr *iptm.IptablesManager + + netPolController *networkPolicyController + kubeInformer kubeinformers.SharedInformerFactory + + // to test whether unnecessary enqueuing event into workqueue was correctly filtered in eventhandler code of network policy controller + isEnqueueEventIntoWorkQueue bool +} + +func newNetPolFixture(t *testing.T) *netPolFixture { + f := &netPolFixture{ + t: t, + netPolLister: []*networkingv1.NetworkPolicy{}, + kubeobjects: []runtime.Object{}, + npMgr: newNPMgr(t), + ipsMgr: ipsm.NewIpsetManager(), + iptMgr: iptm.NewIptablesManager(), + isEnqueueEventIntoWorkQueue: true, + } + + f.npMgr.RawNpMap = make(map[string]*networkingv1.NetworkPolicy) + + // While running "make test-all", metrics hold states which was executed in previous unit test. + // (TODO): Need to fix to remove this fundamental dependency + metrics.ReInitializeAllMetrics() + + return f +} + +func (f *netPolFixture) newNetPolController(stopCh chan struct{}) { + f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...) 
+ f.kubeInformer = kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc()) + + f.netPolController = NewNetworkPolicyController(f.kubeInformer.Networking().V1().NetworkPolicies(), f.kubeclient, f.npMgr) + f.netPolController.netPolListerSynced = alwaysReady + + for _, netPol := range f.netPolLister { + f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Add(netPol) + } + + f.kubeInformer.Start(stopCh) +} + +func (f *netPolFixture) saveIpTables(iptablesConfigFile string) { + if err := f.iptMgr.Save(iptablesConfigFile); err != nil { + f.t.Errorf("Failed to save iptables rules") + } +} + +func (f *netPolFixture) restoreIpTables(iptablesConfigFile string) { + if err := f.iptMgr.Restore(iptablesConfigFile); err != nil { + f.t.Errorf("Failed to restore iptables rules") + } +} + +func (f *netPolFixture) saveIpSet(ipsetConfigFile string) { + // call /sbin/ipset save -file /var/log/ipset-test.conf + f.t.Logf("Start storing ipset to %s", ipsetConfigFile) + if err := f.ipsMgr.Save(ipsetConfigFile); err != nil { + f.t.Errorf("Failed to save ipsets") + } +} +func (f *netPolFixture) restoreIpSet(ipsetConfigFile string) { + // call /sbin/ipset restore -file /var/log/ipset-test.conf + f.t.Logf("Start re-storing ipset to %s", ipsetConfigFile) + if err := f.ipsMgr.Restore(ipsetConfigFile); err != nil { + f.t.Errorf("failed to restore ipsets") + } +} + +// (TODO): make createNetPol flexible +func createNetPol() *networkingv1.NetworkPolicy { + tcp := corev1.ProtocolTCP + port8000 := intstr.FromInt(8000) + return &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-ingress", + Namespace: "test-nwpolicy", + }, + Spec: networkingv1.NetworkPolicySpec{ + Ingress: []networkingv1.NetworkPolicyIngressRule{ + networkingv1.NetworkPolicyIngressRule{ + From: []networkingv1.NetworkPolicyPeer{ + networkingv1.NetworkPolicyPeer{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + }, + 
networkingv1.NetworkPolicyPeer{ + IPBlock: &networkingv1.IPBlock{ + CIDR: "0.0.0.0/0", + }, + }, + }, + Ports: []networkingv1.NetworkPolicyPort{{ + Protocol: &tcp, + Port: &port8000, + }}, + }, + }, + Egress: []networkingv1.NetworkPolicyEgressRule{ + networkingv1.NetworkPolicyEgressRule{ + To: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + }}, + Ports: []networkingv1.NetworkPolicyPort{{ + Protocol: &tcp, + Port: &intstr.IntOrString{StrVal: "8000"}, // namedPort + }}, + }, + }, + }, + } +} + +func addNetPol(t *testing.T, f *netPolFixture, netPolObj *networkingv1.NetworkPolicy) { + // simulate "network policy" add event and add network policy object to sharedInformer cache + f.netPolController.addNetworkPolicy(netPolObj) + + if f.netPolController.workqueue.Len() == 0 { + f.isEnqueueEventIntoWorkQueue = false + return + } + + f.netPolController.processNextWorkItem() +} + +func deleteNetPol(t *testing.T, f *netPolFixture, netPolObj *networkingv1.NetworkPolicy) { + addNetPol(t, f, netPolObj) + t.Logf("Complete adding network policy event") + + // simulate network policy deletion event and delete network policy object from sharedInformer cache + f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Delete(netPolObj) + f.netPolController.deleteNetworkPolicy(netPolObj) + + if f.netPolController.workqueue.Len() == 0 { + f.isEnqueueEventIntoWorkQueue = false + return + } + + f.netPolController.processNextWorkItem() +} + +func updateNetPol(t *testing.T, f *netPolFixture, oldNetPolObj, netNetPolObj *networkingv1.NetworkPolicy) { + addNetPol(t, f, oldNetPolObj) + t.Logf("Complete adding network policy event") + + // simulate network policy update event and update the network policy to shared informer's cache + f.kubeInformer.Networking().V1().NetworkPolicies().Informer().GetIndexer().Update(netNetPolObj) + f.netPolController.updateNetworkPolicy(oldNetPolObj, netNetPolObj) + + 
if f.netPolController.workqueue.Len() == 0 { + f.isEnqueueEventIntoWorkQueue = false + return + } + + f.netPolController.processNextWorkItem() +} + +type expectedNetPolValues struct { + expectedLenOfNsMap int + expectedLenOfRawNpMap int + expectedLenOfWorkQueue int + expectedIsAzureNpmChainCreated bool + expectedEnqueueEventIntoWorkQueue bool + // prometheus metrics + expectedNumPoliciesMetric int + expectedNumPoliciesMetricError error + expectedCountOfAddPolicyExecTimeMetric int + expectedCountOfAddPolicyExecTimeMetricError error +} + +func checkNetPolTestResult(testName string, f *netPolFixture, testCases []expectedNetPolValues) { + for _, test := range testCases { + if got := len(f.npMgr.NsMap); got != test.expectedLenOfNsMap { + f.t.Errorf("npMgr namespace map length = %d, want %d", got, test.expectedLenOfNsMap) + } + + if got := len(f.netPolController.npMgr.RawNpMap); got != test.expectedLenOfRawNpMap { + f.t.Errorf("Raw NetPol Map length = %d, want %d", got, test.expectedLenOfRawNpMap) + } + + if got := f.netPolController.workqueue.Len(); got != test.expectedLenOfWorkQueue { + f.t.Errorf("Workqueue length = %d, want %d", got, test.expectedLenOfWorkQueue) + } + + if got := f.netPolController.isAzureNpmChainCreated; got != test.expectedIsAzureNpmChainCreated { + f.t.Errorf("isAzureNpmChainCreated %v, want %v", got, test.expectedIsAzureNpmChainCreated) + } + + if got := f.isEnqueueEventIntoWorkQueue; got != test.expectedEnqueueEventIntoWorkQueue { + f.t.Errorf("isEnqueueEventIntoWorkQueue %v, want %v", got, test.expectedEnqueueEventIntoWorkQueue) + } + + // Check prometheus metrics + expectedNumPoliciesMetrics, expectedNumPoliciesMetricsError := promutil.GetValue(metrics.NumPolicies) + if expectedNumPoliciesMetrics != test.expectedNumPoliciesMetric { + f.t.Errorf("NumPolicies metric length = %d, want %d", expectedNumPoliciesMetrics, test.expectedNumPoliciesMetric) + } + if expectedNumPoliciesMetricsError != test.expectedNumPoliciesMetricError { + 
f.t.Errorf("NumPolicies metric error = %s, want %s", expectedNumPoliciesMetricsError, test.expectedNumPoliciesMetricError) + } + + expectedCountOfAddPolicyExecTimeMetric, expectedCountOfAddPolicyExecTimeMetricError := promutil.GetCountValue(metrics.AddPolicyExecTime) + if expectedCountOfAddPolicyExecTimeMetric != test.expectedCountOfAddPolicyExecTimeMetric { + f.t.Errorf("CountOfAddPolicyExecTime metric length = %d, want %d", expectedCountOfAddPolicyExecTimeMetric, test.expectedCountOfAddPolicyExecTimeMetric) + } + if expectedCountOfAddPolicyExecTimeMetricError != test.expectedCountOfAddPolicyExecTimeMetricError { + f.t.Errorf("CountOfAddPolicyExecTime metric error = %s, want %s", expectedCountOfAddPolicyExecTimeMetricError, test.expectedCountOfAddPolicyExecTimeMetricError) + } + } +} + +func TestAddMultipleNetworkPolicies(t *testing.T) { + netPolObj1 := createNetPol() + + // deep copy netPolObj1 and change namespace, name, and porttype (to namedPort) since current createNetPol is not flexble. 
+ netPolObj2 := netPolObj1.DeepCopy() + netPolObj2.Namespace = fmt.Sprintf("%s-new", netPolObj1.Namespace) + netPolObj2.Name = fmt.Sprintf("%s-new", netPolObj1.Name) + // namedPort + netPolObj2.Spec.Ingress[0].Ports[0].Port = &intstr.IntOrString{StrVal: fmt.Sprintf("%s", netPolObj2.Name)} + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj1, netPolObj2) + f.kubeobjects = append(f.kubeobjects, netPolObj1, netPolObj2) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNetPolController(stopCh) + + addNetPol(t, f, netPolObj1) + addNetPol(t, f, netPolObj2) + + testCases := []expectedNetPolValues{ + {1, 2, 0, true, true, 2, nil, 2, nil}, + } + checkNetPolTestResult("TestAddMultipleNetPols", f, testCases) +} + +func TestAddNetworkPolicy(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNetPolController(stopCh) + + addNetPol(t, f, netPolObj) + testCases := []expectedNetPolValues{ + {1, 1, 0, true, true, 1, nil, 1, nil}, + } + + checkNetPolTestResult("TestAddNetPol", f, testCases) +} + +func TestDeleteNetworkPolicy(t *testing.T) { + netPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, netPolObj) + f.kubeobjects = append(f.kubeobjects, netPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNetPolController(stopCh) + + deleteNetPol(t, f, netPolObj) + testCases := []expectedNetPolValues{ + {1, 0, 0, false, true, 0, nil, 1, nil}, + } + checkNetPolTestResult("TestDelNetPol", f, testCases) +} + +// this unit test is for the case where states of network policy are changed, but network policy controller does not need to reconcile. +// Check it with expectedEnqueueEventIntoWorkQueue variable. 
+func TestUpdateNetworkPolicy(t *testing.T) { + oldNetPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, oldNetPolObj) + f.kubeobjects = append(f.kubeobjects, oldNetPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNetPolController(stopCh) + + newNetPolObj := oldNetPolObj.DeepCopy() + // oldNetPolObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldNetPolObj.ResourceVersion) + newNetPolObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + + updateNetPol(t, f, oldNetPolObj, newNetPolObj) + testCases := []expectedNetPolValues{ + {1, 1, 0, true, false, 1, nil, 1, nil}, + } + checkNetPolTestResult("TestUpdateNetPol", f, testCases) +} + +func TestLabelUpdateNetworkPolicy(t *testing.T) { + oldNetPolObj := createNetPol() + + f := newNetPolFixture(t) + f.netPolLister = append(f.netPolLister, oldNetPolObj) + f.kubeobjects = append(f.kubeobjects, oldNetPolObj) + stopCh := make(chan struct{}) + defer close(stopCh) + f.newNetPolController(stopCh) + + newNetPolObj := oldNetPolObj.DeepCopy() + // update podSelector in a new network policy field + newNetPolObj.Spec.PodSelector = metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "test", + "new": "test", + }, + } + // oldNetPolObj.ResourceVersion value is "0" + newRV, _ := strconv.Atoi(oldNetPolObj.ResourceVersion) + newNetPolObj.ResourceVersion = fmt.Sprintf("%d", newRV+1) + updateNetPol(t, f, oldNetPolObj, newNetPolObj) + + testCases := []expectedNetPolValues{ + {1, 1, 0, true, true, 1, nil, 2, nil}, + } + checkNetPolTestResult("TestUpdateNetPol", f, testCases) +} diff --git a/npm/npm.go b/npm/npm.go index 7efd6bf049..658b1402a5 100644 --- a/npm/npm.go +++ b/npm/npm.go @@ -49,16 +49,16 @@ type NetworkPolicyManager struct { podController *podController nsInformer coreinformers.NamespaceInformer - npInformer networkinginformers.NetworkPolicyInformer nameSpaceController *nameSpaceController - NodeName string - NsMap map[string]*Namespace - PodMap 
map[string]*NpmPod // Key is / - RawNpMap map[string]*networkingv1.NetworkPolicy // Key is ns-/ - ProcessedNpMap map[string]*networkingv1.NetworkPolicy // Key is ns-/ - isAzureNpmChainCreated bool - isSafeToCleanUpAzureNpmChain bool + npInformer networkinginformers.NetworkPolicyInformer + netPolController *networkPolicyController + + NodeName string + NsMap map[string]*Namespace // Key is ns- + PodMap map[string]*NpmPod // Key is / + RawNpMap map[string]*networkingv1.NetworkPolicy // Key is / + ProcessedNpMap map[string]*networkingv1.NetworkPolicy // Key is / clusterState telemetry.ClusterState version string @@ -187,10 +187,10 @@ func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error { return fmt.Errorf("Network policy informer failed to sync") } - // TODO: any dependency among below functions? - // start pod controller after synced + // start controllers after synced go npMgr.podController.Run(threadness, stopCh) go npMgr.nameSpaceController.Run(threadness, stopCh) + go npMgr.netPolController.Run(threadness, stopCh) go npMgr.reconcileChains() go npMgr.backup() @@ -234,18 +234,16 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in } npMgr := &NetworkPolicyManager{ - clientset: clientset, - informerFactory: informerFactory, - podInformer: podInformer, - nsInformer: nsInformer, - npInformer: npInformer, - NodeName: os.Getenv("HOSTNAME"), - NsMap: make(map[string]*Namespace), - PodMap: make(map[string]*NpmPod), - RawNpMap: make(map[string]*networkingv1.NetworkPolicy), - ProcessedNpMap: make(map[string]*networkingv1.NetworkPolicy), - isAzureNpmChainCreated: false, - isSafeToCleanUpAzureNpmChain: false, + clientset: clientset, + informerFactory: informerFactory, + podInformer: podInformer, + nsInformer: nsInformer, + npInformer: npInformer, + NodeName: os.Getenv("HOSTNAME"), + NsMap: make(map[string]*Namespace), + PodMap: make(map[string]*NpmPod), + RawNpMap: make(map[string]*networkingv1.NetworkPolicy), + ProcessedNpMap: 
make(map[string]*networkingv1.NetworkPolicy), clusterState: telemetry.ClusterState{ PodCount: 0, NsCount: 0, @@ -271,53 +269,8 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in // create NameSpace controller npMgr.nameSpaceController = NewNameSpaceController(nsInformer, clientset, npMgr) - npInformer.Informer().AddEventHandler( - // Network policy event handlers - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - networkPolicyObj, ok := obj.(*networkingv1.NetworkPolicy) - if !ok { - metrics.SendErrorLogAndMetric(util.NpmID, "ADD Network Policy: Received unexpected object type: %v", obj) - return - } - npMgr.Lock() - npMgr.AddNetworkPolicy(networkPolicyObj) - npMgr.Unlock() - }, - UpdateFunc: func(old, new interface{}) { - oldNetworkPolicyObj, ok := old.(*networkingv1.NetworkPolicy) - if !ok { - metrics.SendErrorLogAndMetric(util.NpmID, "UPDATE Network Policy: Received unexpected old object type: %v", oldNetworkPolicyObj) - return - } - newNetworkPolicyObj, ok := new.(*networkingv1.NetworkPolicy) - if !ok { - metrics.SendErrorLogAndMetric(util.NpmID, "UPDATE Network Policy: Received unexpected new object type: %v", newNetworkPolicyObj) - return - } - npMgr.Lock() - npMgr.UpdateNetworkPolicy(oldNetworkPolicyObj, newNetworkPolicyObj) - npMgr.Unlock() - }, - DeleteFunc: func(obj interface{}) { - networkPolicyObj, ok := obj.(*networkingv1.NetworkPolicy) - if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - metrics.SendErrorLogAndMetric(util.NpmID, "DELETE Network Policy: Received unexpected object type: %v", obj) - return - } - if networkPolicyObj, ok = tombstone.Obj.(*networkingv1.NetworkPolicy); !ok { - metrics.SendErrorLogAndMetric(util.NpmID, "DELETE Network Policy: Received unexpected object type: %v", obj) - return - } - } - npMgr.Lock() - npMgr.DeleteNetworkPolicy(networkPolicyObj) - npMgr.Unlock() - }, - }, - ) + // create network policy controller + npMgr.netPolController = 
NewNetworkPolicyController(npInformer, clientset, npMgr) return npMgr } diff --git a/npm/nwpolicy.go b/npm/nwpolicy.go deleted file mode 100644 index e03ef79049..0000000000 --- a/npm/nwpolicy.go +++ /dev/null @@ -1,298 +0,0 @@ -// Copyright 2018 Microsoft. All rights reserved. -// MIT License -package npm - -import ( - "fmt" - "strconv" - - "github.com/Azure/azure-container-networking/log" - "github.com/Azure/azure-container-networking/npm/ipsm" - "github.com/Azure/azure-container-networking/npm/iptm" - "github.com/Azure/azure-container-networking/npm/metrics" - "github.com/Azure/azure-container-networking/npm/util" - networkingv1 "k8s.io/api/networking/v1" -) - -// GetNetworkPolicyKey will return netpolKey -func GetNetworkPolicyKey(npObj *networkingv1.NetworkPolicy) string { - netpolKey, err := util.GetObjKeyFunc(npObj) - if err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[GetNetworkPolicyKey] Error: while running MetaNamespaceKeyFunc err: %s", err) - return "" - } - if len(netpolKey) == 0 { - return "" - } - return util.GetNSNameWithPrefix(netpolKey) -} - -// GetProcessedNPKey will return netpolKey -func GetProcessedNPKey(npObj *networkingv1.NetworkPolicy, hashSelector string) string { - // hashSelector will never be empty - netpolKey := hashSelector - if len(npObj.GetNamespace()) > 0 { - netpolKey = npObj.GetNamespace() + "/" + netpolKey - } - return util.GetNSNameWithPrefix(netpolKey) -} - -func (npMgr *NetworkPolicyManager) canCleanUpNpmChains() bool { - if !npMgr.isSafeToCleanUpAzureNpmChain { - return false - } - - if len(npMgr.ProcessedNpMap) > 0 { - return false - } - - return true -} - -func (npMgr *NetworkPolicyManager) policyExists(npObj *networkingv1.NetworkPolicy) bool { - npKey := GetNetworkPolicyKey(npObj) - if npKey == "" { - return false - } - - np, exists := npMgr.RawNpMap[npKey] - if !exists { - return false - } - - if !util.CompareResourceVersions(np.ObjectMeta.ResourceVersion, npObj.ObjectMeta.ResourceVersion) { - log.Logf("Cached 
Network Policy has larger ResourceVersion number than new Obj. Name: %s Cached RV: %d New RV: %d\n", - npObj.ObjectMeta.Name, - np.ObjectMeta.ResourceVersion, - npObj.ObjectMeta.ResourceVersion, - ) - return true - } - - if isSamePolicy(np, npObj) { - return true - } - - return false -} - -// AddNetworkPolicy handles adding network policy to iptables. -func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkPolicy) error { - var ( - err error - npNs = util.GetNSNameWithPrefix(npObj.ObjectMeta.Namespace) - npName = npObj.ObjectMeta.Name - allNs = npMgr.NsMap[util.KubeAllNamespacesFlag] - timer = metrics.StartNewTimer() - hashedSelector = HashSelector(&npObj.Spec.PodSelector) - npKey = GetNetworkPolicyKey(npObj) - npProcessedKey = GetProcessedNPKey(npObj, hashedSelector) - ) - - log.Logf("NETWORK POLICY CREATING: NameSpace%s, Name:%s", npNs, npName) - - if npKey == "" { - err = fmt.Errorf("[AddNetworkPolicy] Error: npKey is empty for %s network policy in %s", npName, npNs) - metrics.SendErrorLogAndMetric(util.NetpolID, err.Error()) - return err - } - - if npMgr.policyExists(npObj) { - return nil - } - - if !npMgr.isAzureNpmChainCreated { - if err = allNs.IpsMgr.CreateSet(util.KubeSystemFlag, append([]string{util.IpsetNetHashFlag})); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: failed to initialize kube-system ipset with err %s", err) - return err - } - - if err = allNs.iptMgr.InitNpmChains(); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: failed to initialize azure-npm chains with err %s", err) - return err - } - - npMgr.isAzureNpmChainCreated = true - } - - var ( - addedPolicy *networkingv1.NetworkPolicy - sets, namedPorts, lists []string - ingressIPCidrs, egressIPCidrs [][]string - iptEntries []*iptm.IptEntry - ipsMgr = allNs.IpsMgr - ) - - // Remove the existing policy from processed (merged) network policy map - if oldPolicy, oldPolicyExists := 
npMgr.RawNpMap[npKey]; oldPolicyExists { - npMgr.isSafeToCleanUpAzureNpmChain = false - npMgr.DeleteNetworkPolicy(oldPolicy) - npMgr.isSafeToCleanUpAzureNpmChain = true - } - - // Add (merge) the new policy with others who apply to the same pods - if oldPolicy, oldPolicyExists := npMgr.ProcessedNpMap[npProcessedKey]; oldPolicyExists { - addedPolicy, err = addPolicy(oldPolicy, npObj) - if err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: adding policy %s to %s with err: %v", npName, oldPolicy.ObjectMeta.Name, err) - return err - } - } - - if addedPolicy != nil { - npMgr.ProcessedNpMap[npProcessedKey] = addedPolicy - } else { - npMgr.ProcessedNpMap[npProcessedKey] = npObj - } - - sets, namedPorts, lists, ingressIPCidrs, egressIPCidrs, iptEntries = translatePolicy(npObj) - for _, set := range sets { - log.Logf("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set)) - if err = ipsMgr.CreateSet(set, append([]string{util.IpsetNetHashFlag})); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: creating ipset %s with err: %v", set, err) - return err - } - } - for _, set := range namedPorts { - log.Logf("Creating set: %v, hashedSet: %v", set, util.GetHashedName(set)) - if err = ipsMgr.CreateSet(set, append([]string{util.IpsetIPPortHashFlag})); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: creating ipset named port %s with err: %v", set, err) - return err - } - } - for _, list := range lists { - if err = ipsMgr.CreateList(list); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: creating ipset list %s with err: %v", list, err) - return err - } - } - - createCidrsRule("in", npObj.ObjectMeta.Name, npObj.ObjectMeta.Namespace, ingressIPCidrs, ipsMgr) - createCidrsRule("out", npObj.ObjectMeta.Name, npObj.ObjectMeta.Namespace, egressIPCidrs, ipsMgr) - iptMgr := allNs.iptMgr - for _, iptEntry := range iptEntries { - if err = 
iptMgr.Add(iptEntry); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[AddNetworkPolicy] Error: failed to apply iptables rule. Rule: %+v with err: %v", iptEntry, err) - return err - } - } - npMgr.RawNpMap[npKey] = npObj - - metrics.NumPolicies.Inc() - timer.StopAndRecord(metrics.AddPolicyExecTime) - - return nil -} - -// UpdateNetworkPolicy handles updateing network policy in iptables. -func (npMgr *NetworkPolicyManager) UpdateNetworkPolicy(oldNpObj *networkingv1.NetworkPolicy, newNpObj *networkingv1.NetworkPolicy) error { - if newNpObj.ObjectMeta.DeletionTimestamp == nil && newNpObj.ObjectMeta.DeletionGracePeriodSeconds == nil { - log.Logf("NETWORK POLICY UPDATING") - return npMgr.AddNetworkPolicy(newNpObj) - } - - return nil -} - -// DeleteNetworkPolicy handles deleting network policy from iptables. -func (npMgr *NetworkPolicyManager) DeleteNetworkPolicy(npObj *networkingv1.NetworkPolicy) error { - var ( - err error - allNs = npMgr.NsMap[util.KubeAllNamespacesFlag] - hashedSelector = HashSelector(&npObj.Spec.PodSelector) - npKey = GetNetworkPolicyKey(npObj) - npProcessedKey = GetProcessedNPKey(npObj, hashedSelector) - ) - - npNs, npName := util.GetNSNameWithPrefix(npObj.ObjectMeta.Namespace), npObj.ObjectMeta.Name - log.Logf("NETWORK POLICY DELETING: Namespace: %s, Name:%s", npNs, npName) - - if npKey == "" { - err = fmt.Errorf("[AddNetworkPolicy] Error: npKey is empty for %s network policy in %s", npName, npNs) - metrics.SendErrorLogAndMetric(util.NetpolID, err.Error()) - return err - } - - _, _, _, ingressIPCidrs, egressIPCidrs, iptEntries := translatePolicy(npObj) - - iptMgr := allNs.iptMgr - for _, iptEntry := range iptEntries { - if err = iptMgr.Delete(iptEntry); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[DeleteNetworkPolicy] Error: failed to apply iptables rule. 
Rule: %+v with err: %v", iptEntry, err) - return err - } - } - - removeCidrsRule("in", npObj.ObjectMeta.Name, npObj.ObjectMeta.Namespace, ingressIPCidrs, allNs.IpsMgr) - removeCidrsRule("out", npObj.ObjectMeta.Name, npObj.ObjectMeta.Namespace, egressIPCidrs, allNs.IpsMgr) - - if oldPolicy, oldPolicyExists := npMgr.ProcessedNpMap[npProcessedKey]; oldPolicyExists { - deductedPolicy, err := deductPolicy(oldPolicy, npObj) - if err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[DeleteNetworkPolicy] Error: deducting policy %s from %s with err: %v", npName, oldPolicy.ObjectMeta.Name, err) - return err - } - - if deductedPolicy == nil { - delete(npMgr.ProcessedNpMap, npProcessedKey) - } else { - npMgr.ProcessedNpMap[npProcessedKey] = deductedPolicy - } - } - - if npMgr.canCleanUpNpmChains() { - npMgr.isAzureNpmChainCreated = false - if err = iptMgr.UninitNpmChains(); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[DeleteNetworkPolicy] Error: failed to uninitialize azure-npm chains with err: %s", err) - return err - } - } - delete(npMgr.RawNpMap, npKey) - - metrics.NumPolicies.Dec() - - return nil -} - -func createCidrsRule(ingressOrEgress, policyName, ns string, ipsetEntries [][]string, ipsMgr *ipsm.IpsetManager) { - spec := append([]string{util.IpsetNetHashFlag, util.IpsetMaxelemName, util.IpsetMaxelemNum}) - for i, ipCidrSet := range ipsetEntries { - if ipCidrSet == nil || len(ipCidrSet) == 0 { - continue - } - setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + ingressOrEgress - log.Logf("Creating set: %v, hashedSet: %v", setName, util.GetHashedName(setName)) - if err := ipsMgr.CreateSet(setName, spec); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[createCidrsRule] Error: creating ipset %s with err: %v", ipCidrSet, err) - } - for _, ipCidrEntry := range util.DropEmptyFields(ipCidrSet) { - // Ipset doesn't allow 0.0.0.0/0 to be added. 
A general solution is split 0.0.0.0/1 in half which convert to - // 1.0.0.0/1 and 128.0.0.0/1 - if ipCidrEntry == "0.0.0.0/0" { - splitEntry := [2]string{"1.0.0.0/1", "128.0.0.0/1"} - for _, entry := range splitEntry { - if err := ipsMgr.AddToSet(setName, entry, util.IpsetNetHashFlag, ""); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[createCidrsRule] adding ip cidrs %s into ipset %s with err: %v", entry, ipCidrSet, err) - } - } - } else { - if err := ipsMgr.AddToSet(setName, ipCidrEntry, util.IpsetNetHashFlag, ""); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[createCidrsRule] adding ip cidrs %s into ipset %s with err: %v", ipCidrEntry, ipCidrSet, err) - } - } - } - } -} - -func removeCidrsRule(ingressOrEgress, policyName, ns string, ipsetEntries [][]string, ipsMgr *ipsm.IpsetManager) { - for i, ipCidrSet := range ipsetEntries { - if ipCidrSet == nil || len(ipCidrSet) == 0 { - continue - } - setName := policyName + "-in-ns-" + ns + "-" + strconv.Itoa(i) + ingressOrEgress - log.Logf("Delete set: %v, hashedSet: %v", setName, util.GetHashedName(setName)) - if err := ipsMgr.DeleteSet(setName); err != nil { - metrics.SendErrorLogAndMetric(util.NetpolID, "[removeCidrsRule] deleting ipset %s with err: %v", ipCidrSet, err) - } - } -} diff --git a/npm/nwpolicy_test.go b/npm/nwpolicy_test.go deleted file mode 100644 index 256d0556f8..0000000000 --- a/npm/nwpolicy_test.go +++ /dev/null @@ -1,364 +0,0 @@ -// Copyright 2018 Microsoft. All rights reserved. 
-// MIT License -package npm - -import ( - "testing" - - "github.com/Azure/azure-container-networking/npm/ipsm" - "github.com/Azure/azure-container-networking/npm/iptm" - "github.com/Azure/azure-container-networking/npm/metrics" - "github.com/Azure/azure-container-networking/npm/metrics/promutil" - "github.com/Azure/azure-container-networking/npm/util" - - corev1 "k8s.io/api/core/v1" - networkingv1 "k8s.io/api/networking/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" -) - -func TestAddNetworkPolicy(t *testing.T) { - npMgr := &NetworkPolicyManager{ - NsMap: make(map[string]*Namespace), - PodMap: make(map[string]*NpmPod), - RawNpMap: make(map[string]*networkingv1.NetworkPolicy), - ProcessedNpMap: make(map[string]*networkingv1.NetworkPolicy), - TelemetryEnabled: false, - } - - allNs, err := newNs(util.KubeAllNamespacesFlag) - if err != nil { - panic(err.Error) - } - npMgr.NsMap[util.KubeAllNamespacesFlag] = allNs - - iptMgr := iptm.NewIptablesManager() - if err := iptMgr.Save(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ iptMgr.Save") - } - - ipsMgr := ipsm.NewIpsetManager() - if err := ipsMgr.Save(util.IpsetTestConfigFile); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ ipsMgr.Save") - } - - // Create ns-kube-system set - if err := ipsMgr.CreateSet("ns-"+util.KubeSystemFlag, append([]string{util.IpsetNetHashFlag})); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ ipsMgr.CreateSet, adding kube-system set%+v", err) - } - - defer func() { - if err := iptMgr.Restore(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ iptMgr.Restore") - } - - if err := ipsMgr.Restore(util.IpsetTestConfigFile); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ ipsMgr.Restore") - } - }() - - tcp := corev1.ProtocolTCP - port8000 := intstr.FromInt(8000) - allowIngress := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-ingress", - 
Namespace: "test-nwpolicy", - }, - Spec: networkingv1.NetworkPolicySpec{ - Ingress: []networkingv1.NetworkPolicyIngressRule{ - networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{ - networkingv1.NetworkPolicyPeer{ - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "test"}, - }, - }, - networkingv1.NetworkPolicyPeer{ - IPBlock: &networkingv1.IPBlock{ - CIDR: "0.0.0.0/0", - }, - }, - }, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: &tcp, - Port: &port8000, - }}, - }, - }, - }, - } - - gaugeVal, err1 := promutil.GetValue(metrics.NumPolicies) - countVal, err2 := promutil.GetCountValue(metrics.AddPolicyExecTime) - - npMgr.Lock() - if err := npMgr.AddNetworkPolicy(allowIngress); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ allowIngress AddNetworkPolicy") - t.Errorf("Error: %v", err) - } - npMgr.Unlock() - - ipsMgr = npMgr.NsMap[util.KubeAllNamespacesFlag].IpsMgr - - // Check whether 0.0.0.0/0 got translated to 1.0.0.0/1 and 128.0.0.0/1 - if !ipsMgr.Exists("allow-ingress-in-ns-test-nwpolicy-0in", "1.0.0.0/1", util.IpsetNetHashFlag) { - t.Errorf("TestDeleteFromSet failed @ ipsMgr.AddToSet") - } - - if !ipsMgr.Exists("allow-ingress-in-ns-test-nwpolicy-0in", "128.0.0.0/1", util.IpsetNetHashFlag) { - t.Errorf("TestDeleteFromSet failed @ ipsMgr.AddToSet") - } - - allowEgress := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-egress", - Namespace: "test-nwpolicy", - }, - Spec: networkingv1.NetworkPolicySpec{ - Egress: []networkingv1.NetworkPolicyEgressRule{ - networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{{ - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "test"}, - }, - }}, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: &tcp, - Port: &port8000, - }}, - }, - }, - }, - } - - npMgr.Lock() - if err := npMgr.AddNetworkPolicy(allowEgress); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ allowEgress 
AddNetworkPolicy") - t.Errorf("Error: %v", err) - } - npMgr.Unlock() - - newGaugeVal, err3 := promutil.GetValue(metrics.NumPolicies) - newCountVal, err4 := promutil.GetCountValue(metrics.AddPolicyExecTime) - promutil.NotifyIfErrors(t, err1, err2, err3, err4) - if newGaugeVal != gaugeVal+2 { - t.Errorf("Change in policy number didn't register in prometheus") - } - if newCountVal != countVal+2 { - t.Errorf("Execution time didn't register in prometheus") - } -} - -func TestUpdateNetworkPolicy(t *testing.T) { - npMgr := &NetworkPolicyManager{ - NsMap: make(map[string]*Namespace), - PodMap: make(map[string]*NpmPod), - RawNpMap: make(map[string]*networkingv1.NetworkPolicy), - ProcessedNpMap: make(map[string]*networkingv1.NetworkPolicy), - TelemetryEnabled: false, - } - - allNs, err := newNs(util.KubeAllNamespacesFlag) - if err != nil { - panic(err.Error) - } - npMgr.NsMap[util.KubeAllNamespacesFlag] = allNs - - iptMgr := iptm.NewIptablesManager() - if err := iptMgr.Save(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ iptMgr.Save") - } - - ipsMgr := ipsm.NewIpsetManager() - if err := ipsMgr.Save(util.IpsetTestConfigFile); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ ipsMgr.Save") - } - - defer func() { - if err := iptMgr.Restore(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ iptMgr.Restore") - } - - if err := ipsMgr.Restore(util.IpsetTestConfigFile); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ ipsMgr.Restore") - } - }() - - // Create ns-kube-system set - if err := ipsMgr.CreateSet("ns-"+util.KubeSystemFlag, append([]string{util.IpsetNetHashFlag})); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ ipsMgr.CreateSet, adding kube-system set%+v", err) - } - - tcp, udp := corev1.ProtocolTCP, corev1.ProtocolUDP - allowIngress := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-ingress", - Namespace: "test-nwpolicy", - }, - Spec: 
networkingv1.NetworkPolicySpec{ - Ingress: []networkingv1.NetworkPolicyIngressRule{ - networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{{ - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "test"}, - }, - }}, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: &tcp, - Port: &intstr.IntOrString{ - StrVal: "8000", - }, - }}, - }, - }, - }, - } - - allowEgress := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-egress", - Namespace: "test-nwpolicy", - }, - Spec: networkingv1.NetworkPolicySpec{ - Egress: []networkingv1.NetworkPolicyEgressRule{ - networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{{ - NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"ns": "test"}, - }, - }}, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: &udp, - Port: &intstr.IntOrString{ - StrVal: "8001", - }, - }}, - }, - }, - }, - } - - npMgr.Lock() - if err := npMgr.AddNetworkPolicy(allowIngress); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ AddNetworkPolicy") - } - - if err := npMgr.UpdateNetworkPolicy(allowIngress, allowEgress); err != nil { - t.Errorf("TestUpdateNetworkPolicy failed @ UpdateNetworkPolicy") - } - npMgr.Unlock() -} - -func TestDeleteNetworkPolicy(t *testing.T) { - npMgr := &NetworkPolicyManager{ - NsMap: make(map[string]*Namespace), - PodMap: make(map[string]*NpmPod), - RawNpMap: make(map[string]*networkingv1.NetworkPolicy), - ProcessedNpMap: make(map[string]*networkingv1.NetworkPolicy), - TelemetryEnabled: false, - } - - allNs, err := newNs(util.KubeAllNamespacesFlag) - if err != nil { - panic(err.Error) - } - npMgr.NsMap[util.KubeAllNamespacesFlag] = allNs - - iptMgr := iptm.NewIptablesManager() - if err := iptMgr.Save(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestDeleteNetworkPolicy failed @ iptMgr.Save") - } - - ipsMgr := ipsm.NewIpsetManager() - if err := ipsMgr.Save(util.IpsetTestConfigFile); err != nil 
{ - t.Errorf("TestDeleteNetworkPolicy failed @ ipsMgr.Save") - } - - defer func() { - if err := iptMgr.Restore(util.IptablesTestConfigFile); err != nil { - t.Errorf("TestDeleteNetworkPolicy failed @ iptMgr.Restore") - } - - if err := ipsMgr.Restore(util.IpsetTestConfigFile); err != nil { - t.Errorf("TestDeleteNetworkPolicy failed @ ipsMgr.Restore") - } - }() - - // Create ns-kube-system set - if err := ipsMgr.CreateSet("ns-"+util.KubeSystemFlag, append([]string{util.IpsetNetHashFlag})); err != nil { - t.Errorf("TestDeleteNetworkPolicy failed @ ipsMgr.CreateSet, adding kube-system set%+v", err) - } - - tcp := corev1.ProtocolTCP - allow := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-ingress", - Namespace: "test-nwpolicy", - }, - Spec: networkingv1.NetworkPolicySpec{ - Ingress: []networkingv1.NetworkPolicyIngressRule{ - networkingv1.NetworkPolicyIngressRule{ - From: []networkingv1.NetworkPolicyPeer{{ - PodSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "test"}, - }, - }}, - Ports: []networkingv1.NetworkPolicyPort{{ - Protocol: &tcp, - Port: &intstr.IntOrString{ - StrVal: "8000", - }, - }}, - }, - }, - }, - } - - npMgr.Lock() - if err := npMgr.AddNetworkPolicy(allow); err != nil { - t.Errorf("TestAddNetworkPolicy failed @ AddNetworkPolicy") - } - - gaugeVal, err1 := promutil.GetValue(metrics.NumPolicies) - - if err := npMgr.DeleteNetworkPolicy(allow); err != nil { - t.Errorf("TestDeleteNetworkPolicy failed @ DeleteNetworkPolicy") - } - npMgr.Unlock() - - newGaugeVal, err2 := promutil.GetValue(metrics.NumPolicies) - promutil.NotifyIfErrors(t, err1, err2) - if newGaugeVal != gaugeVal-1 { - t.Errorf("Change in policy number didn't register in prometheus") - } -} -func TestGetNetworkPolicyKey(t *testing.T) { - npObj := &networkingv1.NetworkPolicy{ - ObjectMeta: metav1.ObjectMeta{ - Name: "allow-egress", - Namespace: "test-nwpolicy", - }, - Spec: networkingv1.NetworkPolicySpec{ - Egress: 
[]networkingv1.NetworkPolicyEgressRule{ - networkingv1.NetworkPolicyEgressRule{ - To: []networkingv1.NetworkPolicyPeer{{ - NamespaceSelector: &metav1.LabelSelector{ - MatchLabels: map[string]string{"ns": "test"}, - }, - }}, - }, - }, - }, - } - - netpolKey := GetNetworkPolicyKey(npObj) - - if netpolKey == "" { - t.Errorf("TestGetNetworkPolicyKey failed @ netpolKey length check %s", netpolKey) - } - - expectedKey := util.GetNSNameWithPrefix("test-nwpolicy/allow-egress") - if netpolKey != expectedKey { - t.Errorf("TestGetNetworkPolicyKey failed @ netpolKey did not match expected value %s", netpolKey) - } -} diff --git a/npm/parsePolicy.go b/npm/parsePolicy.go index 43d1d31d59..17f7c482b1 100644 --- a/npm/parsePolicy.go +++ b/npm/parsePolicy.go @@ -10,6 +10,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// isSameNetworkPolicy compares every field of two network policies that the network policy controller cares about, including their names. +func isSameNetworkPolicy(old, new *networkingv1.NetworkPolicy) bool { + if old.ObjectMeta.Name != new.ObjectMeta.Name { + return false + } + return isSamePolicy(old, new) +} + +// (TODO): isSamePolicy does not compare the names of the two network policies, because it tries to reduce the number of rules when the three conditions below are the same. +// Will optimize networkPolicyController code with addPolicy and deductPolicy functions if possible. Refer to https://github.com/Azure/azure-container-networking/pull/390 func isSamePolicy(old, new *networkingv1.NetworkPolicy) bool { if !reflect.DeepEqual(old.TypeMeta, new.TypeMeta) { return false @@ -27,9 +37,8 @@ func isSamePolicy(old, new *networkingv1.NetworkPolicy) bool { } // addPolicy merges policies based on labels. +// if namespace matches && podSelector matches, then merge two network policies. Otherwise, return as is. func addPolicy(old, new *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) { - // if namespace matches && podSelector matches, then merge - // else return as is. 
if !reflect.DeepEqual(old.TypeMeta, new.TypeMeta) { return nil, fmt.Errorf("Old and new networkpolicies don't have the same TypeMeta") } @@ -63,6 +72,7 @@ func addPolicy(old, new *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolic } } + // (TODO): It seems ingress and egress may have duplicated fields, but in translatePolicy, it seems duplicated fields are removed. ingress := append(old.Spec.Ingress, new.Spec.Ingress...) egress := append(old.Spec.Egress, new.Spec.Egress...) addedPolicy.Spec.Ingress = ingress