From 6b841ee3545258effd39d6e82ce4b5ef94ab7dd2 Mon Sep 17 00:00:00 2001 From: Yongli Chen Date: Tue, 26 Mar 2019 14:18:18 -0700 Subject: [PATCH 1/3] handle failover --- npm/npm.go | 51 ++++++++++++++++++++++++++++++++++++++++------ npm/plugin/main.go | 2 +- npm/util/util.go | 11 ++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) diff --git a/npm/npm.go b/npm/npm.go index 57f2e307ac..c8f08c763b 100644 --- a/npm/npm.go +++ b/npm/npm.go @@ -3,13 +3,13 @@ package npm import ( - "fmt" "os" "reflect" "sync" "time" "github.com/Azure/azure-container-networking/log" + "github.com/Azure/azure-container-networking/npm/iptm" "github.com/Azure/azure-container-networking/npm/util" "github.com/Azure/azure-container-networking/telemetry" corev1 "k8s.io/api/core/v1" @@ -27,6 +27,9 @@ const ( hostNetAgentURLForNpm = "http://168.63.129.16/machine/plugins?comp=netagent&type=npmreport" contentType = "application/json" telemetryRetryWaitTimeInSeconds = 60 + restoreRetryWaitTimeInSeconds = 5 + restoreMaxRetries = 10 + backupWaitTimeInSeconds = 60 ) // NetworkPolicyManager contains informers for pod, namespace and networkpolicy. @@ -101,24 +104,60 @@ func (npMgr *NetworkPolicyManager) UpdateAndSendReport(err error, eventMsg strin return npMgr.reportManager.SendReport(telemetryBuffer) } -// Run starts shared informers and waits for the shared informer cache to sync. -func (npMgr *NetworkPolicyManager) Run(stopCh <-chan struct{}) error { +// restore restores iptables from backup file +func (npMgr *NetworkPolicyManager) restore() { + iptMgr := iptm.NewIptablesManager() + var err error + for i := 0; i < restoreMaxRetries; i++ { + if err = iptMgr.Restore(util.IptablesConfigFile); err == nil { + return + } + + time.Sleep(restoreRetryWaitTimeInSeconds * time.Second) + } + + log.Printf("Timeout restoring Azure-NPM states") + panic(err.Error) +} + +// backup takes snapshots of iptables filter table and saves it periodically. +func (npMgr *NetworkPolicyManager) backup() { + iptMgr := iptm.NewIptablesManager() + var err error + for { + time.Sleep(backupWaitTimeInSeconds * time.Second) + + if err = iptMgr.Save(util.IptablesConfigFile); err != nil { + log.Printf("Error backing up Azure-NPM states") + } + } +} + +// Start starts shared informers and waits for the shared informer cache to sync. +func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error { // Starts all informers manufactured by npMgr's informerFactory. npMgr.informerFactory.Start(stopCh) + // Failure detected. Needs to restore Azure-NPM related iptables entries. + if util.Exists(util.IptablesConfigFile) { + npMgr.restore() + } + // Wait for the initial sync of local cache. if !cache.WaitForCacheSync(stopCh, npMgr.podInformer.Informer().HasSynced) { - return fmt.Errorf("Pod informer failed to sync") + return log.Errorf("Pod informer failed to sync") } if !cache.WaitForCacheSync(stopCh, npMgr.nsInformer.Informer().HasSynced) { - return fmt.Errorf("Namespace informer failed to sync") + return log.Errorf("Namespace informer failed to sync") } if !cache.WaitForCacheSync(stopCh, npMgr.npInformer.Informer().HasSynced) { - return fmt.Errorf("Namespace informer failed to sync") + return log.Errorf("Namespace informer failed to sync") } + go npMgr.backup() + return nil } diff --git a/npm/plugin/main.go b/npm/plugin/main.go index 7c3cd9c16e..25909bd9b1 100644 --- a/npm/plugin/main.go +++ b/npm/plugin/main.go @@ -57,7 +57,7 @@ func main() { factory := informers.NewSharedInformerFactory(clientset, time.Hour*24) npMgr := npm.NewNetworkPolicyManager(clientset, factory, version) - err = npMgr.Run(wait.NeverStop) + err = npMgr.Start(wait.NeverStop) if err != nil { log.Printf("[Azure-NPM] npm failed with error %v.", err) panic(err.Error) diff --git a/npm/util/util.go b/npm/util/util.go index d523af06c5..5c5a0e8ef7 100644 --- a/npm/util/util.go +++ b/npm/util/util.go @@ -5,6 +5,7 @@ package util import ( "fmt" "hash/fnv" + "os" "strconv" "strings" @@ -14,6 +15,16 @@ import ( // IsNewNwPolicyVerFlag indicates if the current kubernetes version is newer than 1.11 or not var IsNewNwPolicyVerFlag = false +// Exists reports whether the named file or directory exists. +func Exists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} + // GetClusterID retrieves cluster ID through node name. (Azure-specific) func GetClusterID(nodeName string) string { s := strings.Split(nodeName, "-") From 551c37669e9522734a807396b94d5e17df10eec5 Mon Sep 17 00:00:00 2001 From: Yongli Chen Date: Tue, 26 Mar 2019 14:26:10 -0700 Subject: [PATCH 2/3] return errorf on sync failure --- npm/npm.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/npm/npm.go b/npm/npm.go index c8f08c763b..806433d32b 100644 --- a/npm/npm.go +++ b/npm/npm.go @@ -3,6 +3,7 @@ package npm import ( + "fmt" "os" "reflect" "sync" @@ -145,15 +146,15 @@ func (npMgr *NetworkPolicyManager) Start(stopCh <-chan struct{}) error { // Wait for the initial sync of local cache. if !cache.WaitForCacheSync(stopCh, npMgr.podInformer.Informer().HasSynced) { - return log.Errorf("Pod informer failed to sync") + return fmt.Errorf("Pod informer failed to sync") } if !cache.WaitForCacheSync(stopCh, npMgr.nsInformer.Informer().HasSynced) { - return log.Errorf("Namespace informer failed to sync") + return fmt.Errorf("Namespace informer failed to sync") } if !cache.WaitForCacheSync(stopCh, npMgr.npInformer.Informer().HasSynced) { - return log.Errorf("Namespace informer failed to sync") + return fmt.Errorf("Namespace informer failed to sync") } go npMgr.backup() From 32ffd224b3f7ef9e49ebd335f284d4d134fccb19 Mon Sep 17 00:00:00 2001 From: Yongli Chen Date: Tue, 26 Mar 2019 16:59:55 -0700 Subject: [PATCH 3/3] fix Exists() --- npm/util/util.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/npm/util/util.go b/npm/util/util.go index 5c5a0e8ef7..0494273164 100644 --- a/npm/util/util.go +++ b/npm/util/util.go @@ -16,13 +16,14 @@ import ( var IsNewNwPolicyVerFlag = false // Exists reports whether the named file or directory exists. -func Exists(name string) bool { - if _, err := os.Stat(name); err != nil { - if os.IsNotExist(err) { - return false - } +func Exists(filePath string) bool { + if _, err := os.Stat(filePath); err == nil { + return true + } else if !os.IsNotExist(err) { + return true } - return true + + return false } // GetClusterID retrieves cluster ID through node name. (Azure-specific)