diff --git a/Makefile b/Makefile index 8c59cdad94..7cc6701158 100644 --- a/Makefile +++ b/Makefile @@ -78,6 +78,8 @@ CNI_MULTITENANCY_BUILD_DIR = $(BUILD_DIR)/cni-multitenancy CNS_BUILD_DIR = $(BUILD_DIR)/cns NPM_BUILD_DIR = $(BUILD_DIR)/npm NPM_TELEMETRY_DIR = $(NPM_BUILD_DIR)/telemetry +CNI_AI_ID = 5515a1eb-b2bc-406a-98eb-ba462e6f0411 +ACN_PACKAGE_PATH = github.com/Azure/azure-container-networking # Containerized build parameters. BUILD_CONTAINER_IMAGE = acn-build @@ -169,7 +171,7 @@ $(CNI_BUILD_DIR)/azure-vnet-ipam$(EXE_EXT): $(CNIFILES) # Build the Azure CNI telemetry plugin. $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT): $(CNIFILES) - go build -v -o $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -s -w" $(CNI_TELEMETRY_DIR)/*.go + go build -v -o $(CNI_BUILD_DIR)/azure-vnet-telemetry$(EXE_EXT) -ldflags "-X main.version=$(VERSION) -X $(ACN_PACKAGE_PATH)/telemetry.aiMetadata=$(CNI_AI_ID) -s -w" $(CNI_TELEMETRY_DIR)/*.go # Build the Azure CNS Service. $(CNS_BUILD_DIR)/azure-cns$(EXE_EXT): $(CNSFILES) diff --git a/aitelemetry/api.go b/aitelemetry/api.go index 3ebe8f6040..d23389edd4 100644 --- a/aitelemetry/api.go +++ b/aitelemetry/api.go @@ -28,6 +28,8 @@ type AIConfig struct { BatchInterval int DisableMetadataRefreshThread bool RefreshTimeout int + GetEnvRetryCount int + GetEnvRetryWaitTimeInSecs int DebugMode bool } diff --git a/aitelemetry/telemetrywrapper.go b/aitelemetry/telemetrywrapper.go index 4d69f85c37..aaad0a3eda 100644 --- a/aitelemetry/telemetrywrapper.go +++ b/aitelemetry/telemetrywrapper.go @@ -1,6 +1,7 @@ package aitelemetry import ( + "fmt" "runtime" "time" @@ -11,14 +12,16 @@ import ( ) const ( - resourceGroupStr = "ResourceGroup" - vmSizeStr = "VMSize" - osVersionStr = "OSVersion" - locationStr = "Region" - appNameStr = "AppName" - subscriptionIDStr = "SubscriptionID" - vmNameStr = "VMName" - defaultTimeout = 10 + resourceGroupStr = "ResourceGroup" + vmSizeStr = "VMSize" + osVersionStr = "OSVersion" + locationStr = "Region" + appNameStr = "AppName" + subscriptionIDStr = "SubscriptionID" + vmNameStr = "VMName" + versionStr = "AppVersion" + azurePublicCloudStr = "AzurePublicCloud" + defaultTimeout = 10 ) var debugMode bool @@ -26,7 +29,7 @@ var debugMode bool func messageListener() appinsights.DiagnosticsMessageListener { if debugMode { return appinsights.NewDiagnosticsMessageListener(func(msg string) error { - debuglog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg) + debugLog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg) return nil }) } @@ -34,7 +37,7 @@ func messageListener() appinsights.DiagnosticsMessageListener { return nil } -func debuglog(format string, args ...interface{}) { +func debugLog(format string, args ...interface{}) { if debugMode { log.Printf(format, args...) } @@ -55,12 +58,12 @@ func getMetadata(th *telemetryHandle) { break } - debuglog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout) + debugLog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout) time.Sleep(time.Duration(th.refreshTimeout) * time.Second) } if err != nil { - debuglog("[AppInsights] Error getting metadata %v", err) + debugLog("[AppInsights] Error getting metadata %v", err) return } @@ -72,7 +75,7 @@ func getMetadata(th *telemetryHandle) { // Save metadata retrieved from wireserver to a file kvs, err := store.NewJsonFileStore(metadataFile) if err != nil { - debuglog("[AppInsights] Error initializing kvs store: %v", err) + debugLog("[AppInsights] Error initializing kvs store: %v", err) return } @@ -80,20 +83,55 @@ func getMetadata(th *telemetryHandle) { err = common.SaveHostMetadata(th.metadata, metadataFile) kvs.Unlock(true) if err != nil { - debuglog("[AppInsights] saving host metadata failed with :%v", err) + debugLog("[AppInsights] saving host metadata failed with :%v", err) } } +func isPublicEnvironment(url string, retryCount, waitTimeInSecs int) (bool, error) { + var ( + cloudName string + err error + ) + + for i := 0; i < retryCount; i++ { + cloudName, err = common.GetAzureCloud(url) + if cloudName == azurePublicCloudStr { + debugLog("[AppInsights] CloudName: %s\n", cloudName) + return true, nil + } else if err == nil { + debugLog("[AppInsights] This is not azure public cloud:%s", cloudName) + return false, fmt.Errorf("Not an azure public cloud: %s", cloudName) + } + + debugLog("GetAzureCloud returned err :%v", err) + time.Sleep(time.Duration(waitTimeInSecs) * time.Second) + } + + return false, err +} + // NewAITelemetry creates telemetry handle with user specified appinsights id. func NewAITelemetry( + azEnvUrl string, id string, aiConfig AIConfig, -) TelemetryHandle { +) (TelemetryHandle, error) { + debugMode = aiConfig.DebugMode + + if id == "" { + debugLog("Empty AI key") + return nil, fmt.Errorf("AI key is empty") + } + + // check if azure instance is in public cloud + isPublic, err := isPublicEnvironment(azEnvUrl, aiConfig.GetEnvRetryCount, aiConfig.GetEnvRetryWaitTimeInSecs) + if !isPublic { + return nil, err + } telemetryConfig := appinsights.NewTelemetryConfiguration(id) telemetryConfig.MaxBatchSize = aiConfig.BatchSize telemetryConfig.MaxBatchInterval = time.Duration(aiConfig.BatchInterval) * time.Second - debugMode = aiConfig.DebugMode th := &telemetryHandle{ client: appinsights.NewTelemetryClientFromConfig(telemetryConfig), @@ -110,7 +148,7 @@ func NewAITelemetry( go getMetadata(th) } - return th + return th, nil } // TrackLog function sends report (trace) to appinsights resource. It overrides few of the existing columns with app information @@ -167,6 +205,7 @@ func (th *telemetryHandle) TrackMetric(metric Metric) { aimetric.Properties[locationStr] = th.metadata.Location aimetric.Properties[subscriptionIDStr] = th.metadata.SubscriptionID aimetric.Properties[vmNameStr] = th.metadata.VMName + aimetric.Properties[versionStr] = th.appVersion } // copy custom dimensions diff --git a/aitelemetry/telemetrywrapper_test.go b/aitelemetry/telemetrywrapper_test.go index baa16faa31..4edebadae6 100644 --- a/aitelemetry/telemetrywrapper_test.go +++ b/aitelemetry/telemetrywrapper_test.go @@ -2,17 +2,33 @@ package aitelemetry import ( "fmt" + "net/http" + "net/url" "os" "path/filepath" "runtime" "testing" + "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" ) -var th TelemetryHandle +var ( + th TelemetryHandle + hostAgentUrl = "localhost:3501" + getCloudResponse = "AzurePublicCloud" + httpURL = "http://" + hostAgentUrl +) func TestMain(m *testing.M) { + log.SetLogDirectory("/var/log/") + log.SetName("testaitelemetry") + log.SetLevel(log.LevelInfo) + err := log.SetTarget(log.TargetLogfile) + if err == nil { + fmt.Printf("TestST LogDir configuration succeeded\n") + } if runtime.GOOS == "linux" { platform.ExecuteCommand("cp metadata_test.json /tmp/azuremetadata.json") @@ -22,6 +38,20 @@ func TestMain(m *testing.M) { platform.ExecuteCommand(cmd) } + hostu, _ := url.Parse("tcp://" + hostAgentUrl) + hostAgent, err := common.NewListener(hostu) + if err != nil { + fmt.Printf("Failed to create agent, err:%v.\n", err) + return + } + + hostAgent.AddHandler("/", handleGetCloud) + err = hostAgent.Start(make(chan error, 1)) + if err != nil { + fmt.Printf("Failed to start agent, err:%v.\n", err) + return + } + exitCode := m.Run() if runtime.GOOS == "linux" { @@ -32,10 +62,17 @@ func TestMain(m *testing.M) { platform.ExecuteCommand(cmd) } + log.Close() os.Exit(exitCode) } +func handleGetCloud(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(getCloudResponse)) +} + func TestEmptyAIKey(t *testing.T) { + var err error + aiConfig := AIConfig{ AppName: "testapp", AppVersion: "v1.0.26", @@ -45,26 +82,29 @@ func TestEmptyAIKey(t *testing.T) { DebugMode: true, DisableMetadataRefreshThread: true, } - th := NewAITelemetry("", aiConfig) - if th == nil { - t.Errorf("Error intializing AI telemetry") + _, err = NewAITelemetry(httpURL, "", aiConfig) + if err == nil { + t.Errorf("Error intializing AI telemetry:%v", err) } - th.Close(10) } func TestNewAITelemetry(t *testing.T) { + var err error + aiConfig := AIConfig{ AppName: "testapp", AppVersion: "v1.0.26", BatchSize: 4096, BatchInterval: 2, RefreshTimeout: 10, + GetEnvRetryCount: 1, + GetEnvRetryWaitTimeInSecs: 2, DebugMode: true, DisableMetadataRefreshThread: true, } - th = NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig) + th, err = NewAITelemetry(httpURL, "00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig) if th == nil { - t.Errorf("Error intializing AI telemetry") + t.Errorf("Error intializing AI telemetry: %v", err) } } @@ -95,6 +135,8 @@ func TestClose(t *testing.T) { } func TestClosewithoutSend(t *testing.T) { + var err error + aiConfig := AIConfig{ AppName: "testapp", AppVersion: "v1.0.26", @@ -102,11 +144,13 @@ func TestClosewithoutSend(t *testing.T) { BatchInterval: 2, DisableMetadataRefreshThread: true, RefreshTimeout: 10, + GetEnvRetryCount: 1, + GetEnvRetryWaitTimeInSecs: 2, } - thtest := NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig) + thtest, err := NewAITelemetry(httpURL, "00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig) if thtest == nil { - t.Errorf("Error intializing AI telemetry") + t.Errorf("Error intializing AI telemetry:%v", err) } thtest.Close(10) diff --git a/cni/network/network.go b/cni/network/network.go index 8510c27d11..5406cb45f5 100644 --- a/cni/network/network.go +++ b/cni/network/network.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cni" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/cnsclient" @@ -61,6 +62,7 @@ type netPlugin struct { *cni.Plugin nm network.NetworkManager report *telemetry.CNIReport + tb *telemetry.TelemetryBuffer } // snatConfiguration contains a bool that determines whether CNI enables snat on host and snat for dns @@ -91,8 +93,9 @@ func NewPlugin(name string, config *common.PluginConfig) (*netPlugin, error) { }, nil } -func (plugin *netPlugin) SetCNIReport(report *telemetry.CNIReport) { +func (plugin *netPlugin) SetCNIReport(report *telemetry.CNIReport, tb *telemetry.TelemetryBuffer) { plugin.report = report + plugin.tb = tb } // Starts the plugin. @@ -188,6 +191,29 @@ func (plugin *netPlugin) getPodInfo(args string) (string, string, error) { return k8sPodName, k8sNamespace, nil } +func SetCustomDimensions(cniMetric *telemetry.AIMetric, nwCfg *cni.NetworkConfig, err error) { + if cniMetric == nil { + log.Errorf("[CNI] Unable to set custom dimension. Report is nil") + return + } + + if err != nil { + cniMetric.Metric.CustomDimensions[telemetry.StatusStr] = telemetry.FailedStr + } else { + cniMetric.Metric.CustomDimensions[telemetry.StatusStr] = telemetry.SucceededStr + } + + if nwCfg != nil { + if nwCfg.MultiTenancy { + cniMetric.Metric.CustomDimensions[telemetry.CNIModeStr] = telemetry.MultiTenancyStr + } else { + cniMetric.Metric.CustomDimensions[telemetry.CNIModeStr] = telemetry.SingleTenancyStr + } + + cniMetric.Metric.CustomDimensions[telemetry.CNINetworkModeStr] = nwCfg.Mode + } +} + func (plugin *netPlugin) setCNIReportDetails(nwCfg *cni.NetworkConfig, opType string, msg string) { if nwCfg.MultiTenancy { plugin.report.Context = "AzureCNIMultitenancy" @@ -220,8 +246,11 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error { enableInfraVnet bool enableSnatForDns bool nwDNSInfo network.DNSInfo + cniMetric telemetry.AIMetric ) + startTime := time.Now() + log.Printf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.", args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData) @@ -245,6 +274,15 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error { plugin.setCNIReportDetails(nwCfg, CNI_ADD, "") defer func() { + operationTimeMs := time.Since(startTime).Milliseconds() + cniMetric.Metric = aitelemetry.Metric{ + Name: telemetry.CNIAddTimeMetricStr, + Value: float64(operationTimeMs), + CustomDimensions: make(map[string]string), + } + SetCustomDimensions(&cniMetric, nwCfg, err) + telemetry.SendCNIMetric(&cniMetric, plugin.tb) + // Add Interfaces to result. if result == nil { result = &cniTypesCurr.Result{} @@ -255,9 +293,7 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error { } result.Interfaces = append(result.Interfaces, iface) - addSnatInterface(nwCfg, result) - // Convert result to the requested CNI version. res, vererr := result.GetAsVersion(nwCfg.CNIVersion) if vererr != nil { @@ -279,6 +315,8 @@ func (plugin *netPlugin) Add(args *cniSkel.CmdArgs) error { return err } + plugin.report.ContainerName = k8sPodName + ":" + k8sNamespace + if nwCfg.MultiTenancy { // Initialize CNSClient cnsclient.InitCnsClient(nwCfg.CNSUrl) @@ -655,8 +693,11 @@ func (plugin *netPlugin) Delete(args *cniSkel.CmdArgs) error { networkId string nwInfo *network.NetworkInfo epInfo *network.EndpointInfo + cniMetric telemetry.AIMetric ) + startTime := time.Now() + log.Printf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.", args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData) @@ -736,6 +777,15 @@ func (plugin *netPlugin) Delete(args *cniSkel.CmdArgs) error { msg := fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace) plugin.setCNIReportDetails(nwCfg, CNI_DEL, msg) + operationTimeMs := time.Since(startTime).Milliseconds() + cniMetric.Metric = aitelemetry.Metric{ + Name: telemetry.CNIDelTimeMetricStr, + Value: float64(operationTimeMs), + CustomDimensions: make(map[string]string), + } + SetCustomDimensions(&cniMetric, nwCfg, nil) + telemetry.SendCNIMetric(&cniMetric, plugin.tb) + return nil } @@ -751,8 +801,11 @@ func (plugin *netPlugin) Update(args *cniSkel.CmdArgs) error { cnsClient *cnsclient.CNSClient orchestratorContext []byte targetNetworkConfig *cns.GetNetworkContainerResponse + cniMetric telemetry.AIMetric ) + startTime := time.Now() + log.Printf("[cni-net] Processing UPDATE command with args {Netns:%v Args:%v Path:%v}.", args.Netns, args.Args, args.Path) @@ -768,6 +821,15 @@ func (plugin *netPlugin) Update(args *cniSkel.CmdArgs) error { plugin.setCNIReportDetails(nwCfg, CNI_UPDATE, "") defer func() { + operationTimeMs := time.Since(startTime).Milliseconds() + cniMetric.Metric = aitelemetry.Metric{ + Name: telemetry.CNIUpdateTimeMetricStr, + Value: float64(operationTimeMs), + CustomDimensions: make(map[string]string), + } + SetCustomDimensions(&cniMetric, nwCfg, err) + telemetry.SendCNIMetric(&cniMetric, plugin.tb) + if result == nil { result = &cniTypesCurr.Result{} } diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 53a7d438e4..8b95a41ff5 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -11,6 +11,7 @@ import ( "reflect" "time" + "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/cni" "github.com/Azure/azure-container-networking/cni/network" "github.com/Azure/azure-container-networking/common" @@ -126,6 +127,8 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) { // Main is the entry point for CNI network plugin. func main() { + startTime := time.Now() + // Initialize and parse command line arguments. acn.ParseArgs(&args, printVersion) vers := acn.GetArg(acn.OptVersion).(bool) @@ -136,8 +139,9 @@ func main() { } var ( - config common.PluginConfig - err error + config common.PluginConfig + err error + cnimetric telemetry.AIMetric ) log.SetName(name) @@ -169,7 +173,6 @@ func main() { } cniReport.GetReport(pluginName, version, ipamQueryURL) - startTime := time.Now().UnixNano() / int64(time.Millisecond) netPlugin, err := network.NewPlugin(name, &config) if err != nil { @@ -177,8 +180,6 @@ func main() { return } - netPlugin.SetCNIReport(cniReport) - // CNI Acquires lock if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err) @@ -214,6 +215,8 @@ func main() { tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) defer tb.Close() + netPlugin.SetCNIReport(cniReport, tb) + t := time.Now() cniReport.Timestamp = t.Format("2006-01-02 15:04:05") @@ -230,9 +233,6 @@ func main() { log.Errorf("Failed to execute network plugin, err:%v.\n", err) } - endTime := time.Now().UnixNano() / int64(time.Millisecond) - reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(int64(endTime - startTime)) - netPlugin.Stop() // release cni lock @@ -240,6 +240,15 @@ func main() { log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) } + executionTimeMs := time.Since(startTime).Milliseconds() + cnimetric.Metric = aitelemetry.Metric{ + Name: telemetry.CNIExecutimeMetricStr, + Value: float64(executionTimeMs), + CustomDimensions: make(map[string]string), + } + network.SetCustomDimensions(&cnimetric, nil, err) + telemetry.SendCNIMetric(&cnimetric, tb) + if err != nil { reportPluginError(reportManager, tb, err) panic("network plugin execute fatal error") @@ -247,6 +256,7 @@ func main() { // Report CNI successfully finished execution. reflect.ValueOf(reportManager.Report).Elem().FieldByName("CniSucceeded").SetBool(true) + reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(executionTimeMs) if err = reportManager.SendReport(tb); err != nil { log.Errorf("SendReport failed due to %v", err) diff --git a/cni/telemetry/service/telemetrymain.go b/cni/telemetry/service/telemetrymain.go index 07cc5e08a3..1a8229e3d9 100644 --- a/cni/telemetry/service/telemetrymain.go +++ b/cni/telemetry/service/telemetrymain.go @@ -8,15 +8,22 @@ import ( "runtime" "time" + "github.com/Azure/azure-container-networking/aitelemetry" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/telemetry" ) const ( - reportToHostIntervalInSeconds = 30 - azureVnetTelemetry = "azure-vnet-telemetry" - configExtension = ".config" + defaultReportToHostIntervalInSecs = 30 + defaultRefreshTimeoutInSecs = 15 + defaultBatchSizeInBytes = 16384 + defaultBatchIntervalInSecs = 15 + defaultGetEnvRetryCount = 2 + defaultGetEnvRetryWaitTimeInSecs = 3 + pluginName = "AzureCNI" + azureVnetTelemetry = "azure-vnet-telemetry" + configExtension = ".config" ) var version string @@ -76,6 +83,32 @@ func printVersion() { fmt.Printf("Version %v\n", version) } +func setTelemetryDefaults(config *telemetry.TelemetryConfig) { + if config.ReportToHostIntervalInSeconds == 0 { + config.ReportToHostIntervalInSeconds = defaultReportToHostIntervalInSecs + } + + if config.RefreshTimeoutInSecs == 0 { + config.RefreshTimeoutInSecs = defaultRefreshTimeoutInSecs + } + + if config.BatchIntervalInSecs == 0 { + config.BatchIntervalInSecs = defaultBatchIntervalInSecs + } + + if config.BatchSizeInBytes == 0 { + config.BatchSizeInBytes = defaultBatchSizeInBytes + } + + if config.GetEnvRetryCount == 0 { + config.GetEnvRetryCount = defaultGetEnvRetryCount + } + + if config.GetEnvRetryWaitTimeInSecs == 0 { + config.GetEnvRetryWaitTimeInSecs = defaultGetEnvRetryWaitTimeInSecs + } +} + func main() { var tb *telemetry.TelemetryBuffer var config telemetry.TelemetryConfig @@ -123,6 +156,10 @@ func main() { log.Logf("read config returned %+v", config) + setTelemetryDefaults(&config) + + log.Logf("Config after setting defaults %+v", config) + // Cleaning up orphan socket if present tbtemp := telemetry.NewTelemetryBuffer("") tbtemp.Cleanup(telemetry.FdName) @@ -131,7 +168,7 @@ func main() { tb = telemetry.NewTelemetryBuffer("") log.Logf("[Telemetry] Starting telemetry server") - err = tb.StartServer() + err = tb.StartServer(config.DisableTelemetryToNetAgent) if err == nil || tb.FdExists { break } @@ -141,11 +178,23 @@ func main() { time.Sleep(time.Millisecond * 200) } - if config.ReportToHostIntervalInSeconds == 0 { - config.ReportToHostIntervalInSeconds = reportToHostIntervalInSeconds + aiConfig := aitelemetry.AIConfig{ + AppName: pluginName, + AppVersion: version, + BatchSize: config.BatchSizeInBytes, + BatchInterval: config.BatchIntervalInSecs, + RefreshTimeout: config.RefreshTimeoutInSecs, + DisableMetadataRefreshThread: config.DisableMetadataThread, + DebugMode: config.DebugMode, + GetEnvRetryCount: config.GetEnvRetryCount, + GetEnvRetryWaitTimeInSecs: config.GetEnvRetryWaitTimeInSecs, } + err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric) + log.Printf("[Telemetry] AI Handle creation status:%v", err) log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds) tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second) + telemetry.CloseAITelemetryHandle() + log.Close() } diff --git a/common/utils.go b/common/utils.go index 8b08a37ba3..d392989f3b 100644 --- a/common/utils.go +++ b/common/utils.go @@ -14,6 +14,7 @@ import ( "net" "net/http" "os" + "strings" "time" "github.com/Azure/azure-container-networking/log" @@ -21,8 +22,9 @@ import ( const ( metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" - httpConnectionTimeout = 10 - headerTimeout = 20 + azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text" + httpConnectionTimeout = 7 + headerTimeout = 7 ) // XmlDocument - Azure host agent XML document format. @@ -288,3 +290,36 @@ func SaveHostMetadata(metadata Metadata, fileName string) error { return err } + +func GetAzureCloud(url string) (string, error) { + if url == "" { + url = azCloudUrl + } + + log.Printf("GetAzureCloud querying url: %s", url) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return "", err + } + + req.Header.Set("Metadata", "True") + + client := InitHttpClient(httpConnectionTimeout, headerTimeout) + resp, err := client.Do(req) + if err != nil { + return "", err + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("Bad http status:%v", resp.Status) + } + + bodyBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + + return strings.TrimSpace(string(bodyBytes)), nil +} diff --git a/telemetry/aiwrapper.go b/telemetry/aiwrapper.go new file mode 100644 index 0000000000..527af04922 --- /dev/null +++ b/telemetry/aiwrapper.go @@ -0,0 +1,82 @@ +// Copyright Microsoft. All rights reserved. +package telemetry + +import ( + "fmt" + "runtime" + + "github.com/Azure/azure-container-networking/aitelemetry" + "github.com/Azure/azure-container-networking/log" +) + +var ( + aiMetadata string + th aitelemetry.TelemetryHandle + gDisableTrace bool + gDisableMetric bool +) + +const ( + // Wait time for AI to gracefully close AI telemetry session + waitTimeInSecs = 10 +) + +func CreateAITelemetryHandle(aiConfig aitelemetry.AIConfig, disableAll, disableMetric, disableTrace bool) error { + var err error + + if disableAll { + log.Printf("Telemetry is disabled") + return fmt.Errorf("Telmetry disabled") + } + + th, err = aitelemetry.NewAITelemetry("", aiMetadata, aiConfig) + if err != nil { + return err + } + + gDisableMetric = disableMetric + gDisableTrace = disableTrace + return nil +} + +func SendAITelemetry(cnireport CNIReport) { + if th == nil || gDisableTrace { + return + } + + var msg string + if cnireport.ErrorMessage != "" { + msg = cnireport.ErrorMessage + } else { + msg = cnireport.EventMessage + } + + report := aitelemetry.Report{ + Message: "CNI:" + msg, + Context: cnireport.ContainerName, + CustomDimensions: make(map[string]string), + } + + report.CustomDimensions[ContextStr] = cnireport.Context + report.CustomDimensions[SubContextStr] = cnireport.SubContext + report.CustomDimensions[VMUptimeStr] = cnireport.VMUptime + report.CustomDimensions[OperationTypeStr] = cnireport.OperationType + report.CustomDimensions[VersionStr] = cnireport.Version + + th.TrackLog(report) +} + +func SendAIMetric(aiMetric AIMetric) { + if th == nil || gDisableMetric { + return + } + + aiMetric.Metric.CustomDimensions[OSTypeStr] = runtime.GOOS + th.TrackMetric(aiMetric.Metric) +} + +func CloseAITelemetryHandle() { + if th != nil { + th.Close(waitTimeInSecs) + } +} diff --git a/telemetry/azure-vnet-telemetry.config b/telemetry/azure-vnet-telemetry.config index dff89c8cf4..daf0c91406 100644 --- a/telemetry/azure-vnet-telemetry.config +++ b/telemetry/azure-vnet-telemetry.config @@ -1,3 +1,8 @@ { - "reportToHostIntervalInSeconds": 30 -} \ No newline at end of file + "reportToHostIntervalInSeconds": 30, + "BatchSizeInBytes":16384, + "BatchIntervalInSecs":15, + "RefreshTimeoutInSecs": 15, + "DisableAll": false, + "DebugMode":false +} diff --git a/telemetry/constants.go b/telemetry/constants.go new file mode 100644 index 0000000000..9fadc585d6 --- /dev/null +++ b/telemetry/constants.go @@ -0,0 +1,29 @@ +// Copyright Microsoft. All rights reserved. + +package telemetry + +const ( + + // Metric Names + CNIExecutimeMetricStr = "CNIExecutionTimeMs" + CNIAddTimeMetricStr = "CNIAddTimeMs" + CNIDelTimeMetricStr = "CNIDelTimeMs" + CNIUpdateTimeMetricStr = "CNIUpdateTimeMs" + + // Dimension Names + ContextStr = "Context" + SubContextStr = "SubContext" + VMUptimeStr = "VMUptime" + OperationTypeStr = "OperationType" + VersionStr = "Version" + StatusStr = "Status" + CNIModeStr = "CNIMode" + CNINetworkModeStr = "CNINetworkMode" + OSTypeStr = "OSType" + + // Values + SucceededStr = "Succeeded" + FailedStr = "Failed" + SingleTenancyStr = "SingleTenancy" + MultiTenancyStr = "MultiTenancy" +) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 397017a142..a774673f5a 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -14,6 +14,7 @@ import ( "reflect" "strings" + "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" @@ -100,6 +101,10 @@ type CNIReport struct { Metadata common.Metadata `json:"compute"` } +type AIMetric struct { + Metric aitelemetry.Metric +} + // Azure CNS Telemetry Report structure. type CNSReport struct { IsNewInstance bool @@ -372,6 +377,7 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) { case *NPMReport: case *DNCReport: case *CNSReport: + case *AIMetric: default: err = fmt.Errorf("[Telemetry] Invalid report type") } @@ -383,3 +389,24 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) { report, err = json.Marshal(reportMgr.Report) return report, err } + +// This function for sending CNI metrics to telemetry service +func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error { + var ( + err error + report []byte + ) + + if tb != nil && tb.Connected { + reportMgr := &ReportManager{Report: cniMetric} + report, err = reportMgr.ReportToBytes() + if err == nil { + // If write fails, try to re-establish connections as server/client + if _, err = tb.Write(report); err != nil { + tb.Cancel() + } + } + } + + return err +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 4df069ea00..463ff2301c 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -106,7 +106,7 @@ func TestMain(m *testing.M) { reportManager.Report = &CNIReport{} tb = NewTelemetryBuffer(hostAgentUrl) - err = tb.StartServer() + err = tb.StartServer(false) if err == nil { go tb.BufferAndPushData(0) } @@ -186,13 +186,6 @@ func TestSendTelemetry(t *testing.T) { } } -func TestReceiveTelemetryData(t *testing.T) { - time.Sleep(300 * time.Millisecond) - if len(tb.buffer.CNIReports) != 1 { - t.Errorf("buffer doesn't contain CNI report") - } -} - func TestCloseTelemetryConnection(t *testing.T) { tb.Cancel() time.Sleep(300 * time.Millisecond) @@ -204,7 +197,7 @@ func TestCloseTelemetryConnection(t *testing.T) { func TestServerCloseTelemetryConnection(t *testing.T) { // create server telemetrybuffer and start server tb = NewTelemetryBuffer(hostAgentUrl) - err := tb.StartServer() + err := tb.StartServer(false) if err == nil { go tb.BufferAndPushData(0) } @@ -235,7 +228,7 @@ func TestServerCloseTelemetryConnection(t *testing.T) { func TestClientCloseTelemetryConnection(t *testing.T) { // create server telemetrybuffer and start server tb = NewTelemetryBuffer(hostAgentUrl) - err := tb.StartServer() + err := tb.StartServer(false) if err == nil { go tb.BufferAndPushData(0) } diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 441f44aec3..6be313867a 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -28,6 +28,17 @@ import ( // TelemetryConfig - telemetry config read by telemetry service type TelemetryConfig struct { ReportToHostIntervalInSeconds time.Duration `json:"reportToHostIntervalInSeconds"` + DisableAll bool + DisableTrace bool + DisableMetric bool + DisableMetadataThread bool + DebugMode bool + DisableTelemetryToNetAgent bool + RefreshTimeoutInSecs int + BatchIntervalInSecs int + BatchSizeInBytes int + GetEnvRetryCount int + GetEnvRetryWaitTimeInSecs int } // FdName - file descriptor name @@ -50,7 +61,10 @@ const ( cni = "CNI" ) -var payloadSize uint16 = 0 +var ( + payloadSize uint16 = 0 + disableTelemetryToNetAgent bool +) // TelemetryBuffer object type TelemetryBuffer struct { @@ -104,7 +118,8 @@ func remove(s []net.Conn, i int) []net.Conn { } // Starts Telemetry server listening on unix domain socket -func (tb *TelemetryBuffer) StartServer() error { +func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error { + disableTelemetryToNetAgent = disableNetAgentChannel err := tb.Listen(FdName) if err != nil { tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied") @@ -136,6 +151,10 @@ func (tb *TelemetryBuffer) StartServer() error { var cniReport CNIReport json.Unmarshal([]byte(reportStr), &cniReport) tb.data <- cniReport + } else if _, ok := tmp["Metric"]; ok { + var aiMetric AIMetric + json.Unmarshal([]byte(reportStr), &aiMetric) + tb.data <- aiMetric } else if _, ok := tmp["Allocations"]; ok { var dncReport DNCReport json.Unmarshal([]byte(reportStr), &dncReport) @@ -279,6 +298,10 @@ func (tb *TelemetryBuffer) Close() { // sendToHost - send buffer to host func (tb *TelemetryBuffer) sendToHost() error { + if disableTelemetryToNetAgent { + return nil + } + buf := Buffer{ DNCReports: make([]DNCReport, 0), CNIReports: make([]CNIReport, 0), @@ -443,7 +466,13 @@ func (buf *Buffer) push(x interface{}) { } cniReport := x.(CNIReport) cniReport.Metadata = metadata + SendAITelemetry(cniReport) buf.CNIReports = append(buf.CNIReports, cniReport) + + case AIMetric: + aiMetric := x.(AIMetric) + SendAIMetric(aiMetric) + case NPMReport: if len(buf.NPMReports) >= MaxNumReports { return