diff --git a/npm/namespace.go b/npm/namespace.go index 58f2ccb32b..fc824105cc 100644 --- a/npm/namespace.go +++ b/npm/namespace.go @@ -55,7 +55,7 @@ func (ns *namespace) policyExists(npObj *networkingv1.NetworkPolicy) bool { // InitAllNsList syncs all-namespace ipset list. func (npMgr *NetworkPolicyManager) InitAllNsList() error { allNs := npMgr.nsMap[util.KubeAllNamespacesFlag] - for ns:= range npMgr.nsMap { + for ns := range npMgr.nsMap { if ns == util.KubeAllNamespacesFlag { continue } @@ -88,12 +88,9 @@ func (npMgr *NetworkPolicyManager) UninitAllNsList() error { // AddNamespace handles adding namespace to ipset. func (npMgr *NetworkPolicyManager) AddNamespace(nsObj *corev1.Namespace) error { - npMgr.Lock() - defer npMgr.Unlock() - var err error - nsName, nsLabel := "ns-" + nsObj.ObjectMeta.Name, nsObj.ObjectMeta.Labels + nsName, nsLabel := "ns-"+nsObj.ObjectMeta.Name, nsObj.ObjectMeta.Labels log.Printf("NAMESPACE CREATING: [%s/%v]", nsName, nsLabel) ipsMgr := npMgr.nsMap[util.KubeAllNamespacesFlag].ipsMgr @@ -139,8 +136,8 @@ func (npMgr *NetworkPolicyManager) AddNamespace(nsObj *corev1.Namespace) error { func (npMgr *NetworkPolicyManager) UpdateNamespace(oldNsObj *corev1.Namespace, newNsObj *corev1.Namespace) error { var err error - oldNsNs, oldNsLabel := "ns-" + oldNsObj.ObjectMeta.Name, oldNsObj.ObjectMeta.Labels - newNsNs, newNsLabel := "ns-" + newNsObj.ObjectMeta.Name, newNsObj.ObjectMeta.Labels + oldNsNs, oldNsLabel := "ns-"+oldNsObj.ObjectMeta.Name, oldNsObj.ObjectMeta.Labels + newNsNs, newNsLabel := "ns-"+newNsObj.ObjectMeta.Name, newNsObj.ObjectMeta.Labels log.Printf( "NAMESPACE UPDATING:\n old namespace: [%s/%v]\n new namespace: [%s/%v]", oldNsNs, oldNsLabel, newNsNs, newNsLabel, @@ -161,12 +158,9 @@ func (npMgr *NetworkPolicyManager) UpdateNamespace(oldNsObj *corev1.Namespace, n // DeleteNamespace handles deleting namespace from ipset. func (npMgr *NetworkPolicyManager) DeleteNamespace(nsObj *corev1.Namespace) error { - npMgr.Lock() - defer npMgr.Unlock() - var err error - nsName, nsLabel := "ns-" + nsObj.ObjectMeta.Name, nsObj.ObjectMeta.Labels + nsName, nsLabel := "ns-"+nsObj.ObjectMeta.Name, nsObj.ObjectMeta.Labels log.Printf("NAMESPACE DELETING: [%s/%v]", nsName, nsLabel) _, exists := npMgr.nsMap[nsName] diff --git a/npm/namespace_test.go b/npm/namespace_test.go index 307a9267bc..5d0922f186 100644 --- a/npm/namespace_test.go +++ b/npm/namespace_test.go @@ -3,8 +3,8 @@ package npm import ( - "testing" "os" + "testing" "github.com/Azure/azure-container-networking/npm/iptm" @@ -75,9 +75,11 @@ func TestAddNamespace(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(nsObj); err != nil { t.Errorf("TestAddNamespace @ npMgr.AddNamespace") } + npMgr.Unlock() } func TestUpdateNamespace(t *testing.T) { @@ -121,6 +123,7 @@ func TestUpdateNamespace(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(oldNsObj); err != nil { t.Errorf("TestUpdateNamespace failed @ npMgr.AddNamespace") } @@ -128,6 +131,7 @@ func TestUpdateNamespace(t *testing.T) { if err := npMgr.UpdateNamespace(oldNsObj, newNsObj); err != nil { t.Errorf("TestUpdateNamespace failed @ npMgr.UpdateNamespace") } + npMgr.Unlock() } func TestDeleteNamespace(t *testing.T) { @@ -162,6 +166,7 @@ func TestDeleteNamespace(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(nsObj); err != nil { t.Errorf("TestDeleteNamespace @ npMgr.AddNamespace") } @@ -169,6 +174,7 @@ func TestDeleteNamespace(t *testing.T) { if err := npMgr.DeleteNamespace(nsObj); err != nil { t.Errorf("TestDeleteNamespace @ npMgr.DeleteNamespace") } + npMgr.Unlock() } func TestMain(m *testing.M) { diff --git a/npm/npm.go b/npm/npm.go index eda077eb8b..4591186d40 100644 --- a/npm/npm.go +++ b/npm/npm.go @@ -49,8 +49,8 @@ type NetworkPolicyManager struct { isAzureNpmChainCreated bool isSafeToCleanUpAzureNpmChain bool - clusterState telemetry.ClusterState - version string + clusterState telemetry.ClusterState + version string serverVersion *version.Info TelemetryEnabled bool @@ -93,26 +93,26 @@ func (npMgr *NetworkPolicyManager) SendAiMetrics() { GetEnvRetryCount: 5, GetEnvRetryWaitTimeInSecs: 3, } - - th, err = aitelemetry.NewAITelemetry("", aiMetadata, aiConfig) - heartbeat = time.NewTicker(time.Minute * heartbeatIntervalInMinutes).C + + th, err = aitelemetry.NewAITelemetry("", aiMetadata, aiConfig) + heartbeat = time.NewTicker(time.Minute * heartbeatIntervalInMinutes).C customDimensions = map[string]string{"ClusterID": util.GetClusterID(npMgr.nodeName), - "APIServer": npMgr.serverVersion.String()} + "APIServer": npMgr.serverVersion.String()} podCount = aitelemetry.Metric{ - Name: "PodCount", + Name: "PodCount", CustomDimensions: customDimensions, } nsCount = aitelemetry.Metric{ - Name: "NsCount", + Name: "NsCount", CustomDimensions: customDimensions, } nwPolicyCount = aitelemetry.Metric{ - Name: "NwPolicyCount", + Name: "NwPolicyCount", CustomDimensions: customDimensions, } ) - for i := 0; err != nil && i < 5; i++{ + for i := 0; err != nil && i < 5; i++ { log.Logf("Failed to init AppInsights with err: %+v", err) time.Sleep(time.Minute * 5) th, err = aitelemetry.NewAITelemetry("", aiMetadata, aiConfig) @@ -123,7 +123,7 @@ func (npMgr *NetworkPolicyManager) SendAiMetrics() { defer th.Close(10) - for { + for { <-heartbeat clusterState := npMgr.GetClusterState() podCount.Value = float64(clusterState.PodCount) @@ -206,7 +206,7 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in err error ) - for ticker, start := time.NewTicker(1 * time.Second).C, time.Now(); time.Since(start) < time.Minute * 1; { + for ticker, start := time.NewTicker(1*time.Second).C, time.Now(); time.Since(start) < time.Minute*1; { <-ticker serverVersion, err = clientset.ServerVersion() if err == nil { @@ -239,7 +239,7 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in NsCount: 0, NwPolicyCount: 0, }, - version: npmVersion, + version: npmVersion, serverVersion: serverVersion, TelemetryEnabled: true, } @@ -257,13 +257,19 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in // Pod event handlers cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { + npMgr.Lock() npMgr.AddPod(obj.(*corev1.Pod)) + npMgr.Unlock() }, UpdateFunc: func(old, new interface{}) { + npMgr.Lock() npMgr.UpdatePod(old.(*corev1.Pod), new.(*corev1.Pod)) + npMgr.Unlock() }, DeleteFunc: func(obj interface{}) { + npMgr.Lock() npMgr.DeletePod(obj.(*corev1.Pod)) + npMgr.Unlock() }, }, ) @@ -272,13 +278,19 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in // Namespace event handlers cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { + npMgr.Lock() npMgr.AddNamespace(obj.(*corev1.Namespace)) + npMgr.Unlock() }, UpdateFunc: func(old, new interface{}) { + npMgr.Lock() npMgr.UpdateNamespace(old.(*corev1.Namespace), new.(*corev1.Namespace)) + npMgr.Unlock() }, DeleteFunc: func(obj interface{}) { + npMgr.Lock() npMgr.DeleteNamespace(obj.(*corev1.Namespace)) + npMgr.Unlock() }, }, ) @@ -287,13 +299,19 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in // Network policy event handlers cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { + npMgr.Lock() npMgr.AddNetworkPolicy(obj.(*networkingv1.NetworkPolicy)) + npMgr.Unlock() }, UpdateFunc: func(old, new interface{}) { + npMgr.Lock() npMgr.UpdateNetworkPolicy(old.(*networkingv1.NetworkPolicy), new.(*networkingv1.NetworkPolicy)) + npMgr.Unlock() }, DeleteFunc: func(obj interface{}) { + npMgr.Lock() npMgr.DeleteNetworkPolicy(obj.(*networkingv1.NetworkPolicy)) + npMgr.Unlock() }, }, ) diff --git a/npm/nwpolicy.go b/npm/nwpolicy.go index ae3f3bf630..b4c1e432ae 100644 --- a/npm/nwpolicy.go +++ b/npm/nwpolicy.go @@ -25,9 +25,6 @@ func (npMgr *NetworkPolicyManager) canCleanUpNpmChains() bool { // AddNetworkPolicy handles adding network policy to iptables. func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkPolicy) error { - npMgr.Lock() - defer npMgr.Unlock() - var ( err error ns *namespace @@ -75,9 +72,7 @@ func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkP log.Printf("Error adding policy %s to %s", npName, oldPolicy.ObjectMeta.Name) } npMgr.isSafeToCleanUpAzureNpmChain = false - npMgr.Unlock() npMgr.DeleteNetworkPolicy(oldPolicy) - npMgr.Lock() npMgr.isSafeToCleanUpAzureNpmChain = true } else { ns.processedNpMap[hashedSelector] = npObj @@ -140,9 +135,6 @@ func (npMgr *NetworkPolicyManager) UpdateNetworkPolicy(oldNpObj *networkingv1.Ne // DeleteNetworkPolicy handles deleting network policy from iptables. func (npMgr *NetworkPolicyManager) DeleteNetworkPolicy(npObj *networkingv1.NetworkPolicy) error { - npMgr.Lock() - defer npMgr.Unlock() - var ( err error ns *namespace diff --git a/npm/nwpolicy_test.go b/npm/nwpolicy_test.go index fdd05571fd..38333e906f 100644 --- a/npm/nwpolicy_test.go +++ b/npm/nwpolicy_test.go @@ -61,9 +61,11 @@ func TestAddNetworkPolicy(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(nsObj); err != nil { t.Errorf("TestAddNetworkPolicy @ npMgr.AddNamespace") } + npMgr.Unlock() tcp := corev1.ProtocolTCP port8000 := intstr.FromInt(8000) @@ -82,17 +84,19 @@ func TestAddNetworkPolicy(t *testing.T) { }}, Ports: []networkingv1.NetworkPolicyPort{{ Protocol: &tcp, - Port: &port8000, + Port: &port8000, }}, }, }, }, } + npMgr.Lock() if err := npMgr.AddNetworkPolicy(allowIngress); err != nil { t.Errorf("TestAddNetworkPolicy failed @ allowIngress AddNetworkPolicy") t.Errorf("Error: %v", err) } + npMgr.Unlock() allowEgress := &networkingv1.NetworkPolicy{ ObjectMeta: metav1.ObjectMeta{ @@ -109,17 +113,19 @@ func TestAddNetworkPolicy(t *testing.T) { }}, Ports: []networkingv1.NetworkPolicyPort{{ Protocol: &tcp, - Port: &port8000, + Port: &port8000, }}, }, }, }, } + npMgr.Lock() if err := npMgr.AddNetworkPolicy(allowEgress); err != nil { t.Errorf("TestAddNetworkPolicy failed @ allowEgress AddNetworkPolicy") t.Errorf("Error: %v", err) } + npMgr.Unlock() } func TestUpdateNetworkPolicy(t *testing.T) { @@ -168,9 +174,11 @@ func TestUpdateNetworkPolicy(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(nsObj); err != nil { t.Errorf("TestUpdateNetworkPolicy @ npMgr.AddNamespace") } + npMgr.Unlock() tcp, udp := corev1.ProtocolTCP, corev1.ProtocolUDP allowIngress := &networkingv1.NetworkPolicy{ @@ -221,6 +229,7 @@ func TestUpdateNetworkPolicy(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNetworkPolicy(allowIngress); err != nil { t.Errorf("TestUpdateNetworkPolicy failed @ AddNetworkPolicy") } @@ -228,6 +237,7 @@ func TestUpdateNetworkPolicy(t *testing.T) { if err := npMgr.UpdateNetworkPolicy(allowIngress, allowEgress); err != nil { t.Errorf("TestUpdateNetworkPolicy failed @ UpdateNetworkPolicy") } + npMgr.Unlock() } func TestDeleteNetworkPolicy(t *testing.T) { @@ -276,9 +286,11 @@ func TestDeleteNetworkPolicy(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNamespace(nsObj); err != nil { t.Errorf("TestDeleteNetworkPolicy @ npMgr.AddNamespace") } + npMgr.Unlock() tcp := corev1.ProtocolTCP allow := &networkingv1.NetworkPolicy{ @@ -305,6 +317,7 @@ func TestDeleteNetworkPolicy(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddNetworkPolicy(allow); err != nil { t.Errorf("TestAddNetworkPolicy failed @ AddNetworkPolicy") } @@ -312,4 +325,5 @@ func TestDeleteNetworkPolicy(t *testing.T) { if err := npMgr.DeleteNetworkPolicy(allow); err != nil { t.Errorf("TestDeleteNetworkPolicy failed @ DeleteNetworkPolicy") } + npMgr.Unlock() } diff --git a/npm/pod.go b/npm/pod.go index 3d95728dc3..7252943690 100644 --- a/npm/pod.go +++ b/npm/pod.go @@ -19,9 +19,6 @@ func isSystemPod(podObj *corev1.Pod) bool { // AddPod handles adding pod ip to its label's ipset. func (npMgr *NetworkPolicyManager) AddPod(podObj *corev1.Pod) error { - npMgr.Lock() - defer npMgr.Unlock() - if !isValidPod(podObj) { return nil } @@ -113,9 +110,6 @@ func (npMgr *NetworkPolicyManager) UpdatePod(oldPodObj, newPodObj *corev1.Pod) e // DeletePod handles deleting pod from its label's ipset. func (npMgr *NetworkPolicyManager) DeletePod(podObj *corev1.Pod) error { - npMgr.Lock() - defer npMgr.Unlock() - if !isValidPod(podObj) { return nil } diff --git a/npm/pod_test.go b/npm/pod_test.go index b095e56ac9..ba49a3bf63 100644 --- a/npm/pod_test.go +++ b/npm/pod_test.go @@ -69,9 +69,12 @@ func TestAddPod(t *testing.T) { PodIP: "1.2.3.4", }, } + + npMgr.Lock() if err := npMgr.AddPod(podObj); err != nil { t.Errorf("TestAddPod failed @ AddPod") } + npMgr.Unlock() } func TestUpdatePod(t *testing.T) { @@ -123,6 +126,7 @@ func TestUpdatePod(t *testing.T) { }, } + npMgr.Lock() if err := npMgr.AddPod(oldPodObj); err != nil { t.Errorf("TestUpdatePod failed @ AddPod") } @@ -130,6 +134,7 @@ func TestUpdatePod(t *testing.T) { if err := npMgr.UpdatePod(oldPodObj, newPodObj); err != nil { t.Errorf("TestUpdatePod failed @ UpdatePod") } + npMgr.Unlock() } func TestDeletePod(t *testing.T) { @@ -167,6 +172,8 @@ func TestDeletePod(t *testing.T) { PodIP: "1.2.3.4", }, } + + npMgr.Lock() if err := npMgr.AddPod(podObj); err != nil { t.Errorf("TestDeletePod failed @ AddPod") } @@ -174,4 +181,5 @@ func TestDeletePod(t *testing.T) { if err := npMgr.DeletePod(podObj); err != nil { t.Errorf("TestDeletePod failed @ DeletePod") } + npMgr.Unlock() }