From b58a16fae66105986cca1ac3bb792ab99ad5dd70 Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Thu, 28 Mar 2019 10:17:17 -0700 Subject: [PATCH 1/3] Spawn telemetry buffer in a separate process instead of goroutine. --- cni/network/plugin/main.go | 34 ++++------------------------------ cns/service/main.go | 11 ++++++----- telemetry/cnstelemetry.go | 25 +++++++------------------ telemetry/telemetry.go | 8 +++++--- telemetry/telemetrybuffer.go | 35 +++++++++++++++++++++++++++++++++++ 5 files changed, 57 insertions(+), 56 deletions(-) diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 5e1ae34ec0..2eb9b8cc30 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -22,11 +22,9 @@ import ( ) const ( - hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport" - ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1" - pluginName = "CNI" - telemetryNumRetries = 5 - telemetryWaitTimeInMilliseconds = 200 + hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport" + ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1" + pluginName = "CNI" ) // Version is populated by make during build. @@ -135,30 +133,6 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) { return isupdate, nil } -func connectToTelemetryService(tb *telemetry.TelemetryBuffer) { - path := fmt.Sprintf("%v/%v", telemetry.CniInstallDir, telemetry.TelemetryServiceProcessName) - args := []string{"-d", telemetry.CniInstallDir} - - for attempt := 0; attempt < 2; attempt++ { - if err := tb.Connect(); err != nil { - log.Printf("Connection to telemetry socket failed: %v", err) - tb.Cleanup(telemetry.FdName) - - if isExists, _ := common.CheckIfFileExists(path); !isExists { - log.Printf("Skip starting telemetry service as file didn't exist") - return - } - - telemetry.StartTelemetryService(path, args) - telemetry.WaitForTelemetrySocket(telemetryNumRetries, telemetryWaitTimeInMilliseconds) - } else { - tb.Connected = true - log.Printf("Connected to telemetry service") - return - } - } -} - // Main is the entry point for CNI network plugin. func main() { @@ -196,7 +170,7 @@ func main() { } tb := telemetry.NewTelemetryBuffer("") - connectToTelemetryService(tb) + tb.ConnectToTelemetryService() defer tb.Close() t := time.Now() diff --git a/cns/service/main.go b/cns/service/main.go index 94946de48d..6b8c48af9d 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -17,6 +17,7 @@ import ( "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" "github.com/Azure/azure-container-networking/store" + "github.com/Azure/azure-container-networking/telemetry" ) const ( @@ -168,7 +169,7 @@ func main() { ipamQueryInterval, _ := acn.GetArg(acn.OptIpamQueryInterval).(int) stopcnm = acn.GetArg(acn.OptStopAzureVnet).(bool) vers := acn.GetArg(acn.OptVersion).(bool) - // reportToHostInterval := acn.GetArg(acn.OptReportToHostInterval).(int) + reportToHostInterval := acn.GetArg(acn.OptReportToHostInterval).(int) if vers { printVersion() @@ -231,10 +232,10 @@ func main() { // Start CNS. if httpRestService != nil { - // go telemetry.SendCnsTelemetry(reportToHostInterval, - // reports, - // httpRestService.(*restserver.HTTPRestService), - // telemetryStopProcessing) + go telemetry.SendCnsTelemetry(reportToHostInterval, + reports, + httpRestService.(*restserver.HTTPRestService), + telemetryStopProcessing) err = httpRestService.Start(&config) if err != nil { log.Errorf("Failed to start CNS, err:%v.\n", err) diff --git a/telemetry/cnstelemetry.go b/telemetry/cnstelemetry.go index a554def7f7..cdc07e38d8 100644 --- a/telemetry/cnstelemetry.go +++ b/telemetry/cnstelemetry.go @@ -26,17 +26,10 @@ const ( func SendCnsTelemetry(interval int, reports chan interface{}, service *restserver.HTTPRestService, telemetryStopProcessing chan bool) { CONNECT: - telemetryBuffer := NewTelemetryBuffer("") - err := telemetryBuffer.StartServer() - if err == nil || telemetryBuffer.FdExists { - if err := telemetryBuffer.Connect(); err != nil { - log.Printf("[CNS-Telemetry] Failed to establish telemetry manager connection.") - time.Sleep(time.Second * retryWaitTimeInSeconds) - goto CONNECT - } - - go telemetryBuffer.BufferAndPushData(time.Duration(0)) - + tb := NewTelemetryBuffer("") + tb.ConnectToTelemetryService() + if tb.Connected { + go tb.BufferAndPushData(time.Duration(0)) heartbeat := time.NewTicker(time.Minute * heartbeatIntervalInMinutes).C reportMgr := ReportManager{ ContentType: ContentType, @@ -63,7 +56,7 @@ CONNECT: reflect.ValueOf(reportMgr.Report).Elem().FieldByName("EventMessage").SetString(msg.(string)) case <-telemetryStopProcessing: - telemetryBuffer.Cancel() + tb.Cancel() return } @@ -79,16 +72,12 @@ CONNECT: report, err := reportMgr.ReportToBytes() if err == nil { // If write fails, try to re-establish connections as server/client - if _, err = telemetryBuffer.Write(report); err != nil { + if _, err = tb.Write(report); err != nil { log.Printf("[CNS-Telemetry] Telemetry write failed: %v", err) - telemetryBuffer.Cancel() + tb.Cancel() goto CONNECT } } } - } else { - log.Printf("[CNS-Telemetry] Failed to start telemetry manager server.") - time.Sleep(time.Second * retryWaitTimeInSeconds) - goto CONNECT } } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index ce410b7704..5c0affb4bf 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -26,9 +26,11 @@ const ( NPMTelemetryFile = platform.NPMRuntimePath + "AzureNPMTelemetry.json" // CNITelemetryFile Path. CNITelemetryFile = platform.CNIRuntimePath + "AzureCNITelemetry.json" - - metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" - ContentType = "application/json" + // ContentType of JSON + ContentType = "application/json" + metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" + telemetryNumRetries = 5 + telemetryWaitTimeInMilliseconds = 200 ) // OS Details structure. diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 36cb62de21..4c72b65beb 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -11,6 +11,8 @@ import ( "io/ioutil" "net" "net/http" + "os" + "path/filepath" "strings" "sync" "time" @@ -464,3 +466,36 @@ func ReadConfigFile(filePath string) (TelemetryConfig, error) { return config, err } + +// ConnectToTelemetryService - Attempt to spawn telemetry process if it's not already running. +func (tb *TelemetryBuffer) ConnectToTelemetryService() { + var path string + var args []string + + path = fmt.Sprintf("%v/%v", CniInstallDir, TelemetryServiceProcessName) + if exists, _ := common.CheckIfFileExists(path); !exists { + ex, _ := os.Executable() + exDir := filepath.Dir(ex) + path = fmt.Sprintf("%v/%v", exDir, TelemetryServiceProcessName) + if exists, _ = common.CheckIfFileExists(path); !exists { + log.Printf("Skip starting telemetry service as file didn't exist") + return + } + args = []string{"-d", exDir} + } else { + args = []string{"-d", CniInstallDir} + } + + for attempt := 0; attempt < 2; attempt++ { + if err := tb.Connect(); err != nil { + log.Printf("Connection to telemetry socket failed: %v", err) + tb.Cleanup(FdName) + StartTelemetryService(path, args) + WaitForTelemetrySocket(telemetryNumRetries, telemetryWaitTimeInMilliseconds) + } else { + tb.Connected = true + log.Printf("Connected to telemetry service") + return + } + } +} From 5a986ff7f34e74403100b5f7ba683a25ad7b9beb Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Fri, 29 Mar 2019 13:30:00 -0700 Subject: [PATCH 2/3] Adding an option to disable telemetry. --- cns/service/main.go | 10 +++++++++- common/config.go | 4 ++++ log/logger.go | 8 +++++--- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/cns/service/main.go b/cns/service/main.go index 6b8c48af9d..388237afaf 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -143,6 +143,13 @@ var args = acn.ArgumentList{ Type: "string", DefaultValue: platform.K8SNetConfigPath + string(os.PathSeparator) + defaultCNINetworkConfigFileName, }, + { + Name: acn.OptTelemetry, + Shorthand: acn.OptTelemetryAlias, + Description: "Set to false to disable telemetry", + Type: "bool", + DefaultValue: true, + }, } // Prints description and version information. @@ -198,7 +205,8 @@ func main() { return } - if logger := log.GetStd(); logger != nil { + // Set-up channel for CNS telemetry if it's enabled (enabled by default) + if logger := log.GetStd(); logger != nil && acn.GetArg(acn.OptTelemetry).(bool) { logger.SetChannel(reports) } diff --git a/common/config.go b/common/config.go index 839c1998ff..3206ac678f 100644 --- a/common/config.go +++ b/common/config.go @@ -71,4 +71,8 @@ const ( // Telemetry config Location OptTelemetryConfigDir = "telemetry-config-file" OptTelemetryConfigDirAlias = "d" + + // Disable Telemetry + OptTelemetry = "telemetry" + OptTelemetryAlias = "dt" ) diff --git a/log/logger.go b/log/logger.go index e321cea6cf..e4bcd954bc 100644 --- a/log/logger.go +++ b/log/logger.go @@ -216,7 +216,9 @@ func (logger *Logger) Debugf(format string, args ...interface{}) { // Errorf logs a formatted string at info level and sends the string to TelemetryBuffer. func (logger *Logger) Errorf(format string, args ...interface{}) { logger.Printf(format, args...) - // go func() { - // logger.reports <- fmt.Sprintf(format, args...) - // }() + go func() { + if logger.reports != nil { + logger.reports <- fmt.Sprintf(format, args...) + } + }() } From 4fd8e2b13c7efbbe9442474204f1839f8181aad5 Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Tue, 2 Apr 2019 09:08:35 -0700 Subject: [PATCH 3/3] Addressing some of Tamilmani's comments. --- cni/network/plugin/main.go | 10 ++++++---- cns/service/main.go | 22 +++++++++------------ telemetry/cnstelemetry.go | 19 +++++++++--------- telemetry/telemetry.go | 6 ++---- telemetry/telemetrybuffer.go | 37 +++++++++++++++++++----------------- 5 files changed, 47 insertions(+), 47 deletions(-) diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 2eb9b8cc30..4531f2203a 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -22,9 +22,11 @@ import ( ) const ( - hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport" - ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1" - pluginName = "CNI" + hostNetAgentURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=cnireport" + ipamQueryURL = "http://168.63.129.16/machine/plugins?comp=nmagent&type=getinterfaceinfov1" + pluginName = "CNI" + telemetryNumRetries = 5 + telemetryWaitTimeInMilliseconds = 200 ) // Version is populated by make during build. @@ -170,7 +172,7 @@ func main() { } tb := telemetry.NewTelemetryBuffer("") - tb.ConnectToTelemetryService() + tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) defer tb.Close() t := time.Now() diff --git a/cns/service/main.go b/cns/service/main.go index 388237afaf..d8e1dfc084 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -122,13 +122,6 @@ var args = acn.ArgumentList{ Type: "bool", DefaultValue: false, }, - { - Name: acn.OptReportToHostInterval, - Shorthand: acn.OptReportToHostIntervalAlias, - Description: "Set interval in ms to report to host", - Type: "int", - DefaultValue: "60000", - }, { Name: acn.OptCNIPath, Shorthand: acn.OptCNIPathAlias, @@ -176,7 +169,7 @@ func main() { ipamQueryInterval, _ := acn.GetArg(acn.OptIpamQueryInterval).(int) stopcnm = acn.GetArg(acn.OptStopAzureVnet).(bool) vers := acn.GetArg(acn.OptVersion).(bool) - reportToHostInterval := acn.GetArg(acn.OptReportToHostInterval).(int) + telemetryEnabled := acn.GetArg(acn.OptTelemetry).(bool) if vers { printVersion() @@ -206,7 +199,7 @@ func main() { } // Set-up channel for CNS telemetry if it's enabled (enabled by default) - if logger := log.GetStd(); logger != nil && acn.GetArg(acn.OptTelemetry).(bool) { + if logger := log.GetStd(); logger != nil && telemetryEnabled { logger.SetChannel(reports) } @@ -240,10 +233,13 @@ func main() { // Start CNS. if httpRestService != nil { - go telemetry.SendCnsTelemetry(reportToHostInterval, - reports, - httpRestService.(*restserver.HTTPRestService), - telemetryStopProcessing) + if telemetryEnabled { + go telemetry.SendCnsTelemetry( + reports, + httpRestService.(*restserver.HTTPRestService), + telemetryStopProcessing) + } + err = httpRestService.Start(&config) if err != nil { log.Errorf("Failed to start CNS, err:%v.\n", err) diff --git a/telemetry/cnstelemetry.go b/telemetry/cnstelemetry.go index cdc07e38d8..271e0c51de 100644 --- a/telemetry/cnstelemetry.go +++ b/telemetry/cnstelemetry.go @@ -16,20 +16,21 @@ import ( const ( // CNSTelemetryFile - telemetry file path. - CNSTelemetryFile = platform.CNSRuntimePath + "AzureCNSTelemetry.json" - errorcodePrefix = 5 - heartbeatIntervalInMinutes = 30 - retryWaitTimeInSeconds = 60 + CNSTelemetryFile = platform.CNSRuntimePath + "AzureCNSTelemetry.json" + errorcodePrefix = 5 + heartbeatIntervalInMinutes = 30 + retryWaitTimeInSeconds = 60 + telemetryNumRetries = 5 + telemetryWaitTimeInMilliseconds = 200 ) // SendCnsTelemetry - handles cns telemetry reports -func SendCnsTelemetry(interval int, reports chan interface{}, service *restserver.HTTPRestService, telemetryStopProcessing chan bool) { +func SendCnsTelemetry(reports chan interface{}, service *restserver.HTTPRestService, telemetryStopProcessing chan bool) { CONNECT: tb := NewTelemetryBuffer("") - tb.ConnectToTelemetryService() + tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) if tb.Connected { - go tb.BufferAndPushData(time.Duration(0)) heartbeat := time.NewTicker(time.Minute * heartbeatIntervalInMinutes).C reportMgr := ReportManager{ ContentType: ContentType, @@ -56,7 +57,7 @@ CONNECT: reflect.ValueOf(reportMgr.Report).Elem().FieldByName("EventMessage").SetString(msg.(string)) case <-telemetryStopProcessing: - tb.Cancel() + tb.Close() return } @@ -74,7 +75,7 @@ CONNECT: // If write fails, try to re-establish connections as server/client if _, err = tb.Write(report); err != nil { log.Printf("[CNS-Telemetry] Telemetry write failed: %v", err) - tb.Cancel() + tb.Close() goto CONNECT } } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 5c0affb4bf..a232da0815 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -27,10 +27,8 @@ const ( // CNITelemetryFile Path. CNITelemetryFile = platform.CNIRuntimePath + "AzureCNITelemetry.json" // ContentType of JSON - ContentType = "application/json" - metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" - telemetryNumRetries = 5 - telemetryWaitTimeInMilliseconds = 200 + ContentType = "application/json" + metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" ) // OS Details structure. diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 4c72b65beb..87ddd2a14a 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -468,10 +468,24 @@ func ReadConfigFile(filePath string) (TelemetryConfig, error) { } // ConnectToTelemetryService - Attempt to spawn telemetry process if it's not already running. -func (tb *TelemetryBuffer) ConnectToTelemetryService() { - var path string - var args []string +func (tb *TelemetryBuffer) ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds int) { + path, dir := getTelemetryServiceDirectory() + args := []string{"-d", dir} + for attempt := 0; attempt < 2; attempt++ { + if err := tb.Connect(); err != nil { + log.Printf("Connection to telemetry socket failed: %v", err) + tb.Cleanup(FdName) + StartTelemetryService(path, args) + WaitForTelemetrySocket(telemetryNumRetries, time.Duration(telemetryWaitTimeInMilliseconds)) + } else { + tb.Connected = true + log.Printf("Connected to telemetry service") + return + } + } +} +func getTelemetryServiceDirectory() (path string, dir string) { path = fmt.Sprintf("%v/%v", CniInstallDir, TelemetryServiceProcessName) if exists, _ := common.CheckIfFileExists(path); !exists { ex, _ := os.Executable() @@ -481,21 +495,10 @@ func (tb *TelemetryBuffer) ConnectToTelemetryService() { log.Printf("Skip starting telemetry service as file didn't exist") return } - args = []string{"-d", exDir} + dir = exDir } else { - args = []string{"-d", CniInstallDir} + dir = CniInstallDir } - for attempt := 0; attempt < 2; attempt++ { - if err := tb.Connect(); err != nil { - log.Printf("Connection to telemetry socket failed: %v", err) - tb.Cleanup(FdName) - StartTelemetryService(path, args) - WaitForTelemetrySocket(telemetryNumRetries, telemetryWaitTimeInMilliseconds) - } else { - tb.Connected = true - log.Printf("Connected to telemetry service") - return - } - } + return }