diff --git a/npm/iptm/iptm.go b/npm/iptm/iptm.go index 8344f39c6e..db797f1d01 100644 --- a/npm/iptm/iptm.go +++ b/npm/iptm/iptm.go @@ -1,12 +1,22 @@ +/* + +Part of this file is modified from iptables package from Kuberenetes. +https://github.com/kubernetes/kubernetes/blob/master/pkg/util/iptables + +*/ package iptm import ( "os" "os/exec" "syscall" + "time" + + "golang.org/x/sys/unix" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/npm/util" + "k8s.io/apimachinery/pkg/util/wait" ) // IptEntry represents an iptables rule. @@ -320,7 +330,7 @@ func (iptMgr *IptablesManager) Delete(entry *IptEntry) error { // Run execute an iptables command to update iptables. func (iptMgr *IptablesManager) Run(entry *IptEntry) (int, error) { cmdName := util.Iptables - cmdArgs := append([]string{iptMgr.OperationFlag, entry.Chain}, entry.Specs...) + cmdArgs := append([]string{util.IptablesWaitFlag, iptMgr.OperationFlag, entry.Chain}, entry.Specs...) cmdOut, err := exec.Command(cmdName, cmdArgs...).Output() log.Printf("%s\n", string(cmdOut)) @@ -343,6 +353,17 @@ func (iptMgr *IptablesManager) Save(configFile string) error { configFile = util.IptablesConfigFile } + l, err := grabIptablesLocks() + if err != nil { + return err + } + + defer func(l *os.File) { + if err = l.Close(); err != nil { + log.Printf("Failed to close iptables locks") + } + }(l) + // create the config file for writing f, err := os.Create(configFile) if err != nil { @@ -354,7 +375,7 @@ func (iptMgr *IptablesManager) Save(configFile string) error { cmd := exec.Command(util.IptablesSave) cmd.Stdout = f if err := cmd.Start(); err != nil { - log.Printf("Error running iptables-save.\n") + log.Printf("Error running iptables-save.") return err } cmd.Wait() @@ -368,6 +389,17 @@ func (iptMgr *IptablesManager) Restore(configFile string) error { configFile = util.IptablesConfigFile } + l, err := grabIptablesLocks() + if err != nil { + return err + } + + defer func(l *os.File) { + if err = l.Close(); err != nil { + log.Printf("Failed to close iptables locks") + } + }(l) + // open the config file for reading f, err := os.Open(configFile) if err != nil { @@ -386,3 +418,41 @@ func (iptMgr *IptablesManager) Restore(configFile string) error { return nil } + +// grabs iptables v1.6 xtable lock +func grabIptablesLocks() (*os.File, error) { + var success bool + + l := &os.File{} + defer func(l *os.File) { + // Clean up immediately on failure + if !success { + l.Close() + } + }(l) + + // Grab 1.6.x style lock. + l, err := os.OpenFile(util.IptablesLockFile, os.O_CREATE, 0600) + if err != nil { + log.Printf("failed to open iptables lock") + return nil, err + } + + if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) { + if err := grabIptablesFileLock(l); err != nil { + return false, nil + } + + return true, nil + }); err != nil { + log.Printf("failed to acquire new iptables lock: %v", err) + return nil, err + } + + success = true + return l, nil +} + +func grabIptablesFileLock(f *os.File) error { + return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB) +} diff --git a/npm/namespace.go b/npm/namespace.go index 9bb855ff20..9ce7548811 100644 --- a/npm/namespace.go +++ b/npm/namespace.go @@ -121,8 +121,6 @@ func (npMgr *NetworkPolicyManager) AddNamespace(nsObj *corev1.Namespace) error { } npMgr.nsMap[nsName] = ns - npMgr.clusterState.NsCount++ - return nil } @@ -203,7 +201,5 @@ func (npMgr *NetworkPolicyManager) DeleteNamespace(nsObj *corev1.Namespace) erro delete(npMgr.nsMap, nsName) - npMgr.clusterState.NsCount-- - return nil } diff --git a/npm/namespace_test.go b/npm/namespace_test.go index bc466cdb97..9cc69176dd 100644 --- a/npm/namespace_test.go +++ b/npm/namespace_test.go @@ -45,7 +45,8 @@ func TestAllNsList(t *testing.T) { func TestAddNamespace(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -86,7 +87,8 @@ func TestAddNamespace(t *testing.T) { func TestUpdateNamespace(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -140,7 +142,8 @@ func TestUpdateNamespace(t *testing.T) { func TestDeleteNamespace(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, diff --git a/npm/npm.go b/npm/npm.go index 32c31ba8f8..57f2e307ac 100644 --- a/npm/npm.go +++ b/npm/npm.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-container-networking/telemetry" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/version" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" @@ -22,9 +23,10 @@ import ( "k8s.io/client-go/tools/cache" ) -var ( - hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport" - contentType = "application/json" +const ( + hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport" + contentType = "application/json" + telemetryRetryWaitTimeInSeconds = 60 ) // NetworkPolicyManager contains informers for pod, namespace and networkpolicy. @@ -44,17 +46,41 @@ type NetworkPolicyManager struct { clusterState telemetry.ClusterState reportManager *telemetry.ReportManager - serverVersion *version.Info + serverVersion *version.Info + TelemetryEnabled bool } // GetClusterState returns current cluster state. func (npMgr *NetworkPolicyManager) GetClusterState() telemetry.ClusterState { + pods, err := npMgr.clientset.CoreV1().Pods("").List(metav1.ListOptions{}) + if err != nil { + log.Printf("Error Listing pods in GetClusterState") + } + + namespaces, err := npMgr.clientset.CoreV1().Namespaces().List(metav1.ListOptions{}) + if err != nil { + log.Printf("Error Listing namespaces in GetClusterState") + } + + networkpolicies, err := npMgr.clientset.NetworkingV1().NetworkPolicies("").List(metav1.ListOptions{}) + if err != nil { + log.Printf("Error Listing networkpolicies in GetClusterState") + } + + npMgr.clusterState.PodCount = len(pods.Items) + npMgr.clusterState.NsCount = len(namespaces.Items) + npMgr.clusterState.NwPolicyCount = len(networkpolicies.Items) + return npMgr.clusterState } // UpdateAndSendReport updates the npm report then send it. // This function should only be called when npMgr is locked. func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg string) error { + if !npMgr.TelemetryEnabled { + return nil + } + clusterState := npMgr.GetClusterState() v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState") if v.CanSet() { @@ -69,7 +95,10 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg strin reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("EventMessage").SetString(err.Error()) } - return npMgr.reportManager.SendReport(nil) + var telemetryBuffer *telemetry.TelemetryBuffer + connectToTelemetryServer(telemetryBuffer) + + return npMgr.reportManager.SendReport(telemetryBuffer) } // Run starts shared informers and waits for the shared informer cache to sync. @@ -93,8 +122,33 @@ func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error { return nil } +func connectToTelemetryServer(telemetryBuffer *telemetry.TelemetryBuffer) { + for { + telemetryBuffer = telemetry.NewTelemetryBuffer("") + err := telemetryBuffer.StartServer() + if err == nil || telemetryBuffer.FdExists { + connErr := telemetryBuffer.Connect() + if connErr == nil { + break + } + + log.Printf("[NPM-Telemetry] Failed to establish telemetry manager connection.") + time.Sleep(time.Second * telemetryRetryWaitTimeInSeconds) + } + } +} + // RunReportManager starts NPMReportManager and send telemetry periodically. func (npMgr *NetworkPolicyManager) RunReportManager() { + if !npMgr.TelemetryEnabled { + return + } + + var telemetryBuffer *telemetry.TelemetryBuffer + connectToTelemetryServer(telemetryBuffer) + + go telemetryBuffer.BufferAndPushData(time.Duration(0)) + for { clusterState := npMgr.GetClusterState() v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState") @@ -104,11 +158,12 @@ func (npMgr *NetworkPolicyManager) RunReportManager() { v.FieldByName("NwPolicyCount").SetInt(int64(clusterState.NwPolicyCount)) } - if err := npMgr.reportManager.SendReport(nil); err != nil { - log.Printf("Error sending NPM telemetry report") + if err := npMgr.reportManager.SendReport(telemetryBuffer); err != nil { + log.Printf("[NPM-Telemetry] Error sending NPM telemetry report") + connectToTelemetryServer(telemetryBuffer) } - time.Sleep(1 * time.Minute) + time.Sleep(5 * time.Minute) } } @@ -150,7 +205,8 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in ContentType: contentType, Report: &telemetry.NPMReport{}, }, - serverVersion: serverVersion, + serverVersion: serverVersion, + TelemetryEnabled: true, } clusterID := util.GetClusterID(npMgr.nodeName) diff --git a/npm/nwpolicy.go b/npm/nwpolicy.go index 2568bb6838..77b009b75e 100644 --- a/npm/nwpolicy.go +++ b/npm/nwpolicy.go @@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkP allNs.npMap[npName] = npObj - npMgr.clusterState.NwPolicyCount++ - ns, err := newNs(npNs) if err != nil { log.Printf("Error creating namespace %s\n", npNs) @@ -141,8 +139,6 @@ func (npMgr *NetworkPolicyManager) DeleteNetworkPolicy(npObj *networkingv1.Netwo delete(allNs.npMap, npName) - npMgr.clusterState.NwPolicyCount-- - if len(allNs.npMap) == 0 { if err = iptMgr.UninitNpmChains(); err != nil { log.Printf("Error uninitialize azure-npm chains.\n") diff --git a/npm/nwpolicy_test.go b/npm/nwpolicy_test.go index d17a1bbbae..1c01d8cc5b 100644 --- a/npm/nwpolicy_test.go +++ b/npm/nwpolicy_test.go @@ -18,7 +18,8 @@ import ( func TestAddNetworkPolicy(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -97,7 +98,8 @@ func TestAddNetworkPolicy(t *testing.T) { func TestUpdateNetworkPolicy(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -204,7 +206,8 @@ func TestUpdateNetworkPolicy(t *testing.T) { func TestDeleteNetworkPolicy(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, diff --git a/npm/parse.go b/npm/parse.go index 83ecdb708b..977256c6a8 100644 --- a/npm/parse.go +++ b/npm/parse.go @@ -573,7 +573,7 @@ func parseEgress(ns string, targetSets []string, rules []networkingv1.NetworkPol util.IptablesSetFlag, util.IptablesMatchSetFlag, hashedTargetSetName, - util.IptablesDstFlag, + util.IptablesSrcFlag, util.IptablesJumpFlag, util.IptablesAzureEgressToNsChain, }, diff --git a/npm/plugin/main.go b/npm/plugin/main.go index 795eeac7fe..7c3cd9c16e 100644 --- a/npm/plugin/main.go +++ b/npm/plugin/main.go @@ -63,6 +63,8 @@ func main() { panic(err.Error) } + // Disable Azure-NPM telemetry for now since it might throttle wireserver. + npMgr.TelemetryEnabled = false go npMgr.RunReportManager() select {} diff --git a/npm/pod.go b/npm/pod.go index 5dba4e8882..5cb4f67f85 100644 --- a/npm/pod.go +++ b/npm/pod.go @@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddPod(podObj *corev1.Pod) error { labelKeys = append(labelKeys, labelKey) } - npMgr.clusterState.PodCount++ - ns, err := newNs(podNs) if err != nil { log.Printf("Error creating namespace %s\n", podNs) @@ -172,7 +170,5 @@ func (npMgr *NetworkPolicyManager) DeletePod(podObj *corev1.Pod) error { } } - npMgr.clusterState.PodCount-- - return nil } diff --git a/npm/pod_test.go b/npm/pod_test.go index 351b459666..6330c1e4ce 100644 --- a/npm/pod_test.go +++ b/npm/pod_test.go @@ -37,7 +37,8 @@ func TestisSystemPod(t *testing.T) { func TestAddPod(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -81,7 +82,8 @@ func TestAddPod(t *testing.T) { func TestUpdatePod(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, @@ -143,7 +145,8 @@ func TestUpdatePod(t *testing.T) { func TestDeletePod(t *testing.T) { npMgr := &NetworkPolicyManager{ - nsMap: make(map[string]*namespace), + nsMap: make(map[string]*namespace), + TelemetryEnabled: false, reportManager: &telemetry.ReportManager{ HostNetAgentURL: hostNetAgentURLForNpm, ContentType: contentType, diff --git a/npm/util/const.go b/npm/util/const.go index 02b2005c6b..880b893d2f 100644 --- a/npm/util/const.go +++ b/npm/util/const.go @@ -23,6 +23,7 @@ const ( IptablesRestore string = "iptables-restore" IptablesConfigFile string = "/var/log/iptables.conf" IptablesTestConfigFile string = "/var/log/iptables-test.conf" + IptablesLockFile string = "/run/xtables.lock" IptablesChainCreationFlag string = "-N" IptablesInsertionFlag string = "-I" IptablesAppendFlag string = "-A" @@ -31,6 +32,7 @@ const ( IptablesCheckFlag string = "-C" IptablesDestroyFlag string = "-X" IptablesJumpFlag string = "-j" + IptablesWaitFlag string = "-w" IptablesAccept string = "ACCEPT" IptablesReject string = "REJECT" IptablesDrop string = "DROP"