Skip to content
74 changes: 72 additions & 2 deletions npm/iptm/iptm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
/*

Part of this file is modified from iptables package from Kuberenetes.
https://github.com/kubernetes/kubernetes/blob/master/pkg/util/iptables

*/
package iptm

import (
"os"
"os/exec"
"syscall"
"time"

"golang.org/x/sys/unix"

"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/npm/util"
"k8s.io/apimachinery/pkg/util/wait"
)

// IptEntry represents an iptables rule.
Expand Down Expand Up @@ -320,7 +330,7 @@ func (iptMgr *IptablesManager) Delete(entry *IptEntry) error {
// Run execute an iptables command to update iptables.
func (iptMgr *IptablesManager) Run(entry *IptEntry) (int, error) {
cmdName := util.Iptables
cmdArgs := append([]string{iptMgr.OperationFlag, entry.Chain}, entry.Specs...)
cmdArgs := append([]string{util.IptablesWaitFlag, iptMgr.OperationFlag, entry.Chain}, entry.Specs...)

cmdOut, err := exec.Command(cmdName, cmdArgs...).Output()
log.Printf("%s\n", string(cmdOut))
Expand All @@ -343,6 +353,17 @@ func (iptMgr *IptablesManager) Save(configFile string) error {
configFile = util.IptablesConfigFile
}

l, err := grabIptablesLocks()
if err != nil {
return err
}

defer func(l *os.File) {
if err = l.Close(); err != nil {
log.Printf("Failed to close iptables locks")
}
}(l)

// create the config file for writing
f, err := os.Create(configFile)
if err != nil {
Expand All @@ -354,7 +375,7 @@ func (iptMgr *IptablesManager) Save(configFile string) error {
cmd := exec.Command(util.IptablesSave)
cmd.Stdout = f
if err := cmd.Start(); err != nil {
log.Printf("Error running iptables-save.\n")
log.Printf("Error running iptables-save.")
return err
}
cmd.Wait()
Expand All @@ -368,6 +389,17 @@ func (iptMgr *IptablesManager) Restore(configFile string) error {
configFile = util.IptablesConfigFile
}

l, err := grabIptablesLocks()
if err != nil {
return err
}

defer func(l *os.File) {
if err = l.Close(); err != nil {
log.Printf("Failed to close iptables locks")
}
}(l)

// open the config file for reading
f, err := os.Open(configFile)
if err != nil {
Expand All @@ -386,3 +418,41 @@ func (iptMgr *IptablesManager) Restore(configFile string) error {

return nil
}

// grabs iptables v1.6 xtable lock
func grabIptablesLocks() (*os.File, error) {
var success bool

l := &os.File{}
defer func(l *os.File) {
// Clean up immediately on failure
if !success {
l.Close()
}
}(l)

// Grab 1.6.x style lock.
l, err := os.OpenFile(util.IptablesLockFile, os.O_CREATE, 0600)
if err != nil {
log.Printf("failed to open iptables lock")
return nil, err
}

if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := grabIptablesFileLock(l); err != nil {
return false, nil
}

return true, nil
}); err != nil {
log.Printf("failed to acquire new iptables lock: %v", err)
return nil, err
}

success = true
return l, nil
}

func grabIptablesFileLock(f *os.File) error {
return unix.Flock(int(f.Fd()), unix.LOCK_EX|unix.LOCK_NB)
}
4 changes: 0 additions & 4 deletions npm/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func (npMgr *NetworkPolicyManager) AddNamespace(nsObj *corev1.Namespace) error {
}
npMgr.nsMap[nsName] = ns

npMgr.clusterState.NsCount++

return nil
}

Expand Down Expand Up @@ -203,7 +201,5 @@ func (npMgr *NetworkPolicyManager) DeleteNamespace(nsObj *corev1.Namespace) erro

delete(npMgr.nsMap, nsName)

npMgr.clusterState.NsCount--

return nil
}
9 changes: 6 additions & 3 deletions npm/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func TestAllNsList(t *testing.T) {

func TestAddNamespace(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down Expand Up @@ -86,7 +87,8 @@ func TestAddNamespace(t *testing.T) {

func TestUpdateNamespace(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down Expand Up @@ -140,7 +142,8 @@ func TestUpdateNamespace(t *testing.T) {

func TestDeleteNamespace(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down
74 changes: 65 additions & 9 deletions npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Azure/azure-container-networking/telemetry"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
Expand All @@ -22,9 +23,10 @@ import (
"k8s.io/client-go/tools/cache"
)

var (
hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
contentType = "application/json"
const (
hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
contentType = "application/json"
telemetryRetryWaitTimeInSeconds = 60
)

// NetworkPolicyManager contains informers for pod, namespace and networkpolicy.
Expand All @@ -44,17 +46,41 @@ type NetworkPolicyManager struct {
clusterState telemetry.ClusterState
reportManager *telemetry.ReportManager

serverVersion *version.Info
serverVersion *version.Info
TelemetryEnabled bool
}

// GetClusterState returns current cluster state.
func (npMgr *NetworkPolicyManager) GetClusterState() telemetry.ClusterState {
pods, err := npMgr.clientset.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
log.Printf("Error Listing pods in GetClusterState")
}

namespaces, err := npMgr.clientset.CoreV1().Namespaces().List(metav1.ListOptions{})
if err != nil {
log.Printf("Error Listing namespaces in GetClusterState")
}

networkpolicies, err := npMgr.clientset.NetworkingV1().NetworkPolicies("").List(metav1.ListOptions{})
if err != nil {
log.Printf("Error Listing networkpolicies in GetClusterState")
}

npMgr.clusterState.PodCount = len(pods.Items)
npMgr.clusterState.NsCount = len(namespaces.Items)
npMgr.clusterState.NwPolicyCount = len(networkpolicies.Items)

return npMgr.clusterState
}

// UpdateAndSendReport updates the npm report then send it.
// This function should only be called when npMgr is locked.
func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg string) error {
if !npMgr.TelemetryEnabled {
return nil
}

clusterState := npMgr.GetClusterState()
v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState")
if v.CanSet() {
Expand All @@ -69,7 +95,10 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg strin
reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("EventMessage").SetString(err.Error())
}

return npMgr.reportManager.SendReport(nil)
var telemetryBuffer *telemetry.TelemetryBuffer
connectToTelemetryServer(telemetryBuffer)

return npMgr.reportManager.SendReport(telemetryBuffer)
}

// Run starts shared informers and waits for the shared informer cache to sync.
Expand All @@ -93,8 +122,33 @@ func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
return nil
}

func connectToTelemetryServer(telemetryBuffer *telemetry.TelemetryBuffer) {
for {
telemetryBuffer = telemetry.NewTelemetryBuffer("")
err := telemetryBuffer.StartServer()
if err == nil || telemetryBuffer.FdExists {
connErr := telemetryBuffer.Connect()
if connErr == nil {
break
}

log.Printf("[NPM-Telemetry] Failed to establish telemetry manager connection.")
time.Sleep(time.Second * telemetryRetryWaitTimeInSeconds)
}
}
}

// RunReportManager starts NPMReportManager and send telemetry periodically.
func (npMgr *NetworkPolicyManager) RunReportManager() {
if !npMgr.TelemetryEnabled {
return
}

var telemetryBuffer *telemetry.TelemetryBuffer
connectToTelemetryServer(telemetryBuffer)

go telemetryBuffer.BufferAndPushData(time.Duration(0))

for {
clusterState := npMgr.GetClusterState()
v := reflect.ValueOf(npMgr.reportManager.Report).Elem().FieldByName("ClusterState")
Expand All @@ -104,11 +158,12 @@ func (npMgr *NetworkPolicyManager) RunReportManager() {
v.FieldByName("NwPolicyCount").SetInt(int64(clusterState.NwPolicyCount))
}

if err := npMgr.reportManager.SendReport(nil); err != nil {
log.Printf("Error sending NPM telemetry report")
if err := npMgr.reportManager.SendReport(telemetryBuffer); err != nil {
log.Printf("[NPM-Telemetry] Error sending NPM telemetry report")
connectToTelemetryServer(telemetryBuffer)
}

time.Sleep(1 * time.Minute)
time.Sleep(5 * time.Minute)
}
}

Expand Down Expand Up @@ -150,7 +205,8 @@ func NewNetworkPolicyManager(clientset *kubernetes.Clientset, informerFactory in
ContentType: contentType,
Report: &telemetry.NPMReport{},
},
serverVersion: serverVersion,
serverVersion: serverVersion,
TelemetryEnabled: true,
}

clusterID := util.GetClusterID(npMgr.nodeName)
Expand Down
4 changes: 0 additions & 4 deletions npm/nwpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddNetworkPolicy(npObj *networkingv1.NetworkP

allNs.npMap[npName] = npObj

npMgr.clusterState.NwPolicyCount++

ns, err := newNs(npNs)
if err != nil {
log.Printf("Error creating namespace %s\n", npNs)
Expand Down Expand Up @@ -141,8 +139,6 @@ func (npMgr *NetworkPolicyManager) DeleteNetworkPolicy(npObj *networkingv1.Netwo

delete(allNs.npMap, npName)

npMgr.clusterState.NwPolicyCount--

if len(allNs.npMap) == 0 {
if err = iptMgr.UninitNpmChains(); err != nil {
log.Printf("Error uninitialize azure-npm chains.\n")
Expand Down
9 changes: 6 additions & 3 deletions npm/nwpolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (

func TestAddNetworkPolicy(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down Expand Up @@ -97,7 +98,8 @@ func TestAddNetworkPolicy(t *testing.T) {

func TestUpdateNetworkPolicy(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down Expand Up @@ -204,7 +206,8 @@ func TestUpdateNetworkPolicy(t *testing.T) {

func TestDeleteNetworkPolicy(t *testing.T) {
npMgr := &NetworkPolicyManager{
nsMap: make(map[string]*namespace),
nsMap: make(map[string]*namespace),
TelemetryEnabled: false,
reportManager: &telemetry.ReportManager{
HostNetAgentURL: hostNetAgentURLForNpm,
ContentType: contentType,
Expand Down
2 changes: 1 addition & 1 deletion npm/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func parseEgress(ns string, targetSets []string, rules []networkingv1.NetworkPol
util.IptablesSetFlag,
util.IptablesMatchSetFlag,
hashedTargetSetName,
util.IptablesDstFlag,
util.IptablesSrcFlag,
util.IptablesJumpFlag,
util.IptablesAzureEgressToNsChain,
},
Expand Down
2 changes: 2 additions & 0 deletions npm/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func main() {
panic(err.Error)
}

// Disable Azure-NPM telemetry for now since it might throttle wireserver.
npMgr.TelemetryEnabled = false
go npMgr.RunReportManager()

select {}
Expand Down
4 changes: 0 additions & 4 deletions npm/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ func (npMgr *NetworkPolicyManager) AddPod(podObj *corev1.Pod) error {
labelKeys = append(labelKeys, labelKey)
}

npMgr.clusterState.PodCount++

ns, err := newNs(podNs)
if err != nil {
log.Printf("Error creating namespace %s\n", podNs)
Expand Down Expand Up @@ -172,7 +170,5 @@ func (npMgr *NetworkPolicyManager) DeletePod(podObj *corev1.Pod) error {
}
}

npMgr.clusterState.PodCount--

return nil
}
Loading