diff --git a/cni/network/network.go b/cni/network/network.go index 9cfced72c7..f134f96561 100644 --- a/cni/network/network.go +++ b/cni/network/network.go @@ -31,6 +31,7 @@ import ( cniSkel "github.com/containernetworking/cni/pkg/skel" cniTypes "github.com/containernetworking/cni/pkg/types" cniTypesCurr "github.com/containernetworking/cni/pkg/types/current" + "github.com/pkg/errors" ) const ( @@ -319,13 +320,14 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error { cnsNetworkConfig *cns.GetNetworkContainerResponse enableInfraVnet bool enableSnatForDns bool + k8sPodName string cniMetric telemetry.AIMetric ) startTime := time.Now() - log.Printf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.", - args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.", + args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)) // Parse network configuration from stdin. nwCfg, err := cni.ParseNetworkConfig(args.StdinData) @@ -376,7 +378,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error { res.Print() } - log.Printf("[cni-net] ADD command completed with result:%+v err:%v.", result, err) + log.Printf("[cni-net] ADD command completed for pod %v with result:%+v err:%v.", k8sPodName, result, err) }() // Parse Pod arguments. @@ -517,16 +519,18 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error { }() } + telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("Allocated IPAddress from ipam:%+v v6:%+v", result, resultV6)) + // Create network if nwInfoErr != nil { // Network does not exist. - log.Printf("[cni-net] Creating network %v.", networkID) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating network %v.", networkID)) if nwInfo, err = plugin.createNetworkInternal(networkID, policies, args, nwCfg, cnsNetworkConfig, subnetPrefix, result, resultV6); err != nil { log.Errorf("Create network failed:%w", err) return err } - log.Printf("[cni-net] Created network %v with subnet %v.", networkID, subnetPrefix.String()) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Created network %v with subnet %v.", networkID, subnetPrefix.String())) } natInfo := getNATInfo(nwCfg.ExecutionMode, options[network.SNATIPKey], nwCfg.MultiTenancy, enableSnatForDns) @@ -553,10 +557,8 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error { return err } - msg := fmt.Sprintf("CNI ADD succeeded : CNI Version %+v, IP:%+v, VlanID: %v, Interfaces:%+v, podname %v, namespace %v", - result.CNIVersion, result.IPs, epInfo.Data[network.VlanIDKey], result.Interfaces, k8sPodName, k8sNamespace) - plugin.setCNIReportDetails(nwCfg, CNI_ADD, msg) - + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("CNI ADD succeeded : IP:%+v, VlanID: %v, podname %v, namespace %v numendpoints:%d", + result.IPs, epInfo.Data[network.VlanIDKey], k8sPodName, k8sNamespace, plugin.nm.GetNumberOfEndpoints("", nwCfg.Name))) return nil } @@ -777,6 +779,7 @@ func (plugin *NetPlugin) createEndpointInternal(opt *createEndpointInternalOpt) } // Create the endpoint. + telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating endpoint %+v.", epInfo)) log.Printf("[cni-net] Creating endpoint %v.", epInfo.Id) err = plugin.nm.CreateEndpoint(cnsclient, opt.nwInfo.Id, &epInfo) if err != nil { @@ -890,7 +893,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { nwCfg *cni.NetworkConfig k8sPodName string k8sNamespace string - networkId string + networkID string nwInfo network.NetworkInfo epInfo *network.EndpointInfo cniMetric telemetry.AIMetric @@ -899,11 +902,11 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { startTime := time.Now() - log.Printf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.", - args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.", + args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)) defer func() { - log.Printf("[cni-net] DEL command completed with err:%v.", err) + log.Printf("[cni-net] DEL command completed for pod %v with err:%v.", k8sPodName, err) }() // Parse network configuration from stdin. @@ -948,10 +951,10 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { switch nwCfg.Ipam.Type { case network.AzureCNS: cnsURL := "http://localhost:" + strconv.Itoa(cnsPort) - cnsClient, er := cnscli.New(cnsURL, defaultRequestTimeout) - if err != nil { - log.Printf("[cni-net] failed to create cns client", networkId, err) - return fmt.Errorf("ailed to create cns client with err %w", er) + cnsClient, cnsErr := cnscli.New(cnsURL, defaultRequestTimeout) + if cnsErr != nil { + log.Printf("[cni-net] failed to create cns client:%v", cnsErr) + return errors.Wrap(cnsErr, "failed to create cns client") } plugin.ipamInvoker = NewCNSInvoker(k8sPodName, k8sNamespace, cnsClient) @@ -960,7 +963,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { } } // Initialize values from network config. - networkId, err = plugin.getNetworkName(k8sPodName, k8sNamespace, args.IfName, nwCfg) + networkID, err = plugin.getNetworkName(k8sPodName, k8sNamespace, args.IfName, nwCfg) // If error is not found error, then we ignore it, to comply with CNI SPEC. if err != nil { @@ -972,40 +975,40 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { } } - endpointId := GetEndpointID(args) - // Query the network. - if nwInfo, err = plugin.nm.GetNetworkInfo(networkId); err != nil { + if nwInfo, err = plugin.nm.GetNetworkInfo(networkID); err != nil { if !nwCfg.MultiTenancy { // attempt to release address associated with this Endpoint id // This is to ensure clean up is done even in failure cases + log.Errorf("[cni-net] Failed to query network: %v", err) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip by ContainerID (network not found):%v", args.ContainerID)) err = plugin.ipamInvoker.Delete(nil, nwCfg, args, nwInfo.Options) if err != nil { - log.Printf("Network not found, attempted to release address with error: %v", err) + log.Printf("Attempted to release address by containerID failed with error: %v", err) } } // Log the error but return success if the endpoint being deleted is not found. - plugin.Errorf("[cni-net] Failed to query network: %v", err) err = nil return err } + endpointID := GetEndpointID(args) // Query the endpoint. - if epInfo, err = plugin.nm.GetEndpointInfo(networkId, endpointId); err != nil { + if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil { if !nwCfg.MultiTenancy { // attempt to release address associated with this Endpoint id // This is to ensure clean up is done even in failure cases - log.Printf("release ip ep not found") + log.Printf("[cni-net] Failed to query endpoint: %v", err) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip by ContainerID (endpoint not found):%v", args.ContainerID)) if err = plugin.ipamInvoker.Delete(nil, nwCfg, args, nwInfo.Options); err != nil { - log.Printf("Endpoint not found, attempted to release address with error: %v", err) + log.Printf("Attempted to release address by containerID failed with error: %v", err) } } // Log the error but return success if the endpoint being deleted is not found. - plugin.Errorf("[cni-net] Failed to query endpoint: %v", err) err = nil return err } @@ -1018,8 +1021,9 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { // schedule send metric before attempting delete defer sendMetricFunc() + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Deleting endpoint:%v", endpointID)) // Delete the endpoint. - if err = plugin.nm.DeleteEndpoint(cnsclient, networkId, endpointId); err != nil { + if err = plugin.nm.DeleteEndpoint(cnsclient, networkID, endpointID); err != nil { err = plugin.Errorf("Failed to delete endpoint: %v", err) return err } @@ -1028,7 +1032,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { log.Printf("epinfo:%+v", epInfo) // Call into IPAM plugin to release the endpoint's addresses. for _, address := range epInfo.IPAddresses { - log.Printf("release ip:%s", address.IP.String()) + telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip:%s", address.IP.String())) err = plugin.ipamInvoker.Delete(&address, nwCfg, args, nwInfo.Options) if err != nil { err = plugin.Errorf("Failed to release address %v with error: %v", address, err) @@ -1045,8 +1049,8 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error { } } - msg = fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace) plugin.setCNIReportDetails(nwCfg, CNI_DEL, msg) + telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace)) return err } diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 5a368a9531..7160334fbd 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -129,8 +129,6 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) { // Main is the entry point for CNI network plugin. func main() { - startTime := time.Now() - // Initialize and parse command line arguments. common.ParseArgs(&args, printVersion) vers := common.GetArg(common.OptVersion).(bool) @@ -281,27 +279,8 @@ func main() { netPlugin.Stop() - // release cni lock - if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil { - log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit) - } - - executionTimeMs := time.Since(startTime).Milliseconds() - if err != nil { reportPluginError(reportManager, tb, err) panic("network plugin execute fatal error") } - - // Report CNI successfully finished execution. - reflect.ValueOf(reportManager.Report).Elem().FieldByName("CniSucceeded").SetBool(true) - reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(executionTimeMs) - - if cniReport.ErrorMessage != "" || cniReport.EventMessage != "" { - if err = reportManager.SendReport(tb); err != nil { - log.Errorf("SendReport failed due to %v", err) - } else { - log.Printf("Sending report succeeded") - } - } } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 223d1fbbff..6e53d222bd 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -5,6 +5,8 @@ package telemetry import ( "encoding/json" + "fmt" + "os" "github.com/Azure/azure-container-networking/aitelemetry" "github.com/Azure/azure-container-networking/common" @@ -154,3 +156,30 @@ func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error { return err } + +// This function for sending CNI metrics to telemetry service +func LogAndSendEvent(tb *TelemetryBuffer, msg string) { + log.Printf(msg) + SendCNIEvent(tb, msg) +} + +func SendCNIEvent(tb *TelemetryBuffer, msg string) { + var err error + var report []byte + + eventMsg := fmt.Sprintf("[%d] %s", os.Getpid(), msg) + cniReport := &CNIReport{ + EventMessage: eventMsg, + } + if tb != nil && tb.Connected { + reportMgr := &ReportManager{Report: cniReport} + report, err = reportMgr.ReportToBytes() + if err == nil { + // If write fails, try to re-establish connections as server/client + if _, err = tb.Write(report); err != nil { + log.Printf("Error writing to telemetry socket:%v", err) + tb.Cancel() + } + } + } +}