Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions npm/npm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/npm/iptm"
"github.com/Azure/azure-container-networking/npm/util"
"github.com/Azure/azure-container-networking/telemetry"
corev1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +28,9 @@ const (
hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport"
contentType = "application/json"
telemetryRetryWaitTimeInSeconds = 60
restoreRetryWaitTimeInSeconds = 5
restoreMaxRetries = 10
backupWaitTimeInSeconds = 60
)

// NetworkPolicyManager contains informers for pod, namespace and networkpolicy.
Expand Down Expand Up @@ -101,11 +105,45 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg strin
return npMgr.reportManager.SendReport(telemetryBuffer)
}

// Run starts shared informers and waits for the shared informer cache to sync.
func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
// restore restores iptables from backup file
func (npMgr *NetworkPolicyManager) restore() {
iptMgr := iptm.NewIptablesManager()
var err error
for i := 0; i < restoreMaxRetries; i++ {
if err = iptMgr.Restore(util.IptablesConfigFile); err == nil {
return
}

time.Sleep(restoreRetryWaitTimeInSeconds * time.Second)
}

log.Printf("Timeout restoring Azure-NPM states")
panic(err.Error)
}

// backup takes snapshots of iptables filter table and saves it periodically.
func (npMgr *NetworkPolicyManager) backup() {
iptMgr := iptm.NewIptablesManager()
var err error
for {
time.Sleep(backupWaitTimeInSeconds * time.Second)

if err = iptMgr.Save(util.IptablesConfigFile); err != nil {
log.Printf("Error backing up Azure-NPM states")
}
}
}

// Start starts shared informers and waits for the shared informer cache to sync.
func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error {
// Starts all informers manufactured by npMgr's informerFactory.
npMgr.informerFactory.Start(stopCh)

// Failure detected. Needs to restore Azure-NPM related iptables entries.
if util.Exists(util.IptablesConfigFile) {
npMgr.restore()
}

// Wait for the initial sync of local cache.
if !cache.WaitForCacheSync(stopCh, npMgr.podInformer.Informer().HasSynced) {
return fmt.Errorf("Pod informer failed to sync")
Expand All @@ -119,6 +157,8 @@ func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error {
return fmt.Errorf("Namespace informer failed to sync")
}

go npMgr.backup()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion npm/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
factory := informers.NewSharedInformerFactory(clientset, time.Hour*24)

npMgr := npm.NewNetworkPolicyManager(clientset, factory, version)
err = npMgr.Run(wait.NeverStop)
err = npMgr.Start(wait.NeverStop)
if err != nil {
log.Printf("[Azure-NPM] npm failed with error %v.", err)
panic(err.Error)
Expand Down
12 changes: 12 additions & 0 deletions npm/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package util
import (
"fmt"
"hash/fnv"
"os"
"strconv"
"strings"

Expand All @@ -14,6 +15,17 @@ import (
// IsNewNwPolicyVerFlag indicates if the current kubernetes version is newer than 1.11 or not
var IsNewNwPolicyVerFlag = false

// Exists reports whether the named file or directory exists.
func Exists(filePath string) bool {
if _, err := os.Stat(filePath); err == nil {
return true
} else if !os.IsNotExist(err) {
return true
}

return false
}

// GetClusterID retrieves cluster ID through node name. (Azure-specific)
func GetClusterID(nodeName string) string {
s := strings.Split(nodeName, "-")
Expand Down