Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
cniSkel "github.com/containernetworking/cni/pkg/skel"
cniTypes "github.com/containernetworking/cni/pkg/types"
cniTypesCurr "github.com/containernetworking/cni/pkg/types/current"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -319,13 +320,14 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
cnsNetworkConfig *cns.GetNetworkContainerResponse
enableInfraVnet bool
enableSnatForDns bool
k8sPodName string
cniMetric telemetry.AIMetric
)

startTime := time.Now()

log.Printf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing ADD command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData))

// Parse network configuration from stdin.
nwCfg, err := cni.ParseNetworkConfig(args.StdinData)
Expand Down Expand Up @@ -376,7 +378,7 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
res.Print()
}

log.Printf("[cni-net] ADD command completed with result:%+v err:%v.", result, err)
log.Printf("[cni-net] ADD command completed for pod %v with result:%+v err:%v.", k8sPodName, result, err)
}()

// Parse Pod arguments.
Expand Down Expand Up @@ -517,16 +519,18 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
}()
}

telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("Allocated IPAddress from ipam:%+v v6:%+v", result, resultV6))

// Create network
if nwInfoErr != nil {
// Network does not exist.
log.Printf("[cni-net] Creating network %v.", networkID)
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating network %v.", networkID))
if nwInfo, err = plugin.createNetworkInternal(networkID, policies, args, nwCfg, cnsNetworkConfig, subnetPrefix, result, resultV6); err != nil {
log.Errorf("Create network failed:%w", err)
return err
}

log.Printf("[cni-net] Created network %v with subnet %v.", networkID, subnetPrefix.String())
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Created network %v with subnet %v.", networkID, subnetPrefix.String()))
}

natInfo := getNATInfo(nwCfg.ExecutionMode, options[network.SNATIPKey], nwCfg.MultiTenancy, enableSnatForDns)
Expand All @@ -553,10 +557,8 @@ func (plugin *NetPlugin) Add(args *cniSkel.CmdArgs) error {
return err
}

msg := fmt.Sprintf("CNI ADD succeeded : CNI Version %+v, IP:%+v, VlanID: %v, Interfaces:%+v, podname %v, namespace %v",
result.CNIVersion, result.IPs, epInfo.Data[network.VlanIDKey], result.Interfaces, k8sPodName, k8sNamespace)
plugin.setCNIReportDetails(nwCfg, CNI_ADD, msg)

telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("CNI ADD succeeded : IP:%+v, VlanID: %v, podname %v, namespace %v numendpoints:%d",
result.IPs, epInfo.Data[network.VlanIDKey], k8sPodName, k8sNamespace, plugin.nm.GetNumberOfEndpoints("", nwCfg.Name)))
return nil
}

Expand Down Expand Up @@ -777,6 +779,7 @@ func (plugin *NetPlugin) createEndpointInternal(opt *createEndpointInternalOpt)
}

// Create the endpoint.
telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("[cni-net] Creating endpoint %+v.", epInfo))
log.Printf("[cni-net] Creating endpoint %v.", epInfo.Id)
err = plugin.nm.CreateEndpoint(cnsclient, opt.nwInfo.Id, &epInfo)
if err != nil {
Expand Down Expand Up @@ -890,7 +893,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
nwCfg *cni.NetworkConfig
k8sPodName string
k8sNamespace string
networkId string
networkID string
nwInfo network.NetworkInfo
epInfo *network.EndpointInfo
cniMetric telemetry.AIMetric
Expand All @@ -899,11 +902,11 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {

startTime := time.Now()

log.Printf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData)
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("[cni-net] Processing DEL command with args {ContainerID:%v Netns:%v IfName:%v Args:%v Path:%v, StdinData:%s}.",
args.ContainerID, args.Netns, args.IfName, args.Args, args.Path, args.StdinData))

defer func() {
log.Printf("[cni-net] DEL command completed with err:%v.", err)
log.Printf("[cni-net] DEL command completed for pod %v with err:%v.", k8sPodName, err)
}()

// Parse network configuration from stdin.
Expand Down Expand Up @@ -948,10 +951,10 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
switch nwCfg.Ipam.Type {
case network.AzureCNS:
cnsURL := "http://localhost:" + strconv.Itoa(cnsPort)
cnsClient, er := cnscli.New(cnsURL, defaultRequestTimeout)
if err != nil {
log.Printf("[cni-net] failed to create cns client", networkId, err)
return fmt.Errorf("ailed to create cns client with err %w", er)
cnsClient, cnsErr := cnscli.New(cnsURL, defaultRequestTimeout)
if cnsErr != nil {
log.Printf("[cni-net] failed to create cns client:%v", cnsErr)
return errors.Wrap(cnsErr, "failed to create cns client")
}
plugin.ipamInvoker = NewCNSInvoker(k8sPodName, k8sNamespace, cnsClient)

Expand All @@ -960,7 +963,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
}
}
// Initialize values from network config.
networkId, err = plugin.getNetworkName(k8sPodName, k8sNamespace, args.IfName, nwCfg)
networkID, err = plugin.getNetworkName(k8sPodName, k8sNamespace, args.IfName, nwCfg)

// If error is not found error, then we ignore it, to comply with CNI SPEC.
if err != nil {
Expand All @@ -972,40 +975,40 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
}
}

endpointId := GetEndpointID(args)

// Query the network.
if nwInfo, err = plugin.nm.GetNetworkInfo(networkId); err != nil {
if nwInfo, err = plugin.nm.GetNetworkInfo(networkID); err != nil {

if !nwCfg.MultiTenancy {
// attempt to release address associated with this Endpoint id
// This is to ensure clean up is done even in failure cases
log.Errorf("[cni-net] Failed to query network: %v", err)
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip by ContainerID (network not found):%v", args.ContainerID))
err = plugin.ipamInvoker.Delete(nil, nwCfg, args, nwInfo.Options)
if err != nil {
log.Printf("Network not found, attempted to release address with error: %v", err)
log.Printf("Attempted to release address by containerID failed with error: %v", err)
}
}

// Log the error but return success if the endpoint being deleted is not found.
plugin.Errorf("[cni-net] Failed to query network: %v", err)
err = nil
return err
}

endpointID := GetEndpointID(args)
// Query the endpoint.
if epInfo, err = plugin.nm.GetEndpointInfo(networkId, endpointId); err != nil {
if epInfo, err = plugin.nm.GetEndpointInfo(networkID, endpointID); err != nil {

if !nwCfg.MultiTenancy {
// attempt to release address associated with this Endpoint id
// This is to ensure clean up is done even in failure cases
log.Printf("release ip ep not found")
log.Printf("[cni-net] Failed to query endpoint: %v", err)
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip by ContainerID (endpoint not found):%v", args.ContainerID))
if err = plugin.ipamInvoker.Delete(nil, nwCfg, args, nwInfo.Options); err != nil {
log.Printf("Endpoint not found, attempted to release address with error: %v", err)
log.Printf("Attempted to release address by containerID failed with error: %v", err)
}
}

// Log the error but return success if the endpoint being deleted is not found.
plugin.Errorf("[cni-net] Failed to query endpoint: %v", err)
err = nil
return err
}
Expand All @@ -1018,8 +1021,9 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {

// schedule send metric before attempting delete
defer sendMetricFunc()
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Deleting endpoint:%v", endpointID))
// Delete the endpoint.
if err = plugin.nm.DeleteEndpoint(cnsclient, networkId, endpointId); err != nil {
if err = plugin.nm.DeleteEndpoint(cnsclient, networkID, endpointID); err != nil {
err = plugin.Errorf("Failed to delete endpoint: %v", err)
return err
}
Expand All @@ -1028,7 +1032,7 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
log.Printf("epinfo:%+v", epInfo)
// Call into IPAM plugin to release the endpoint's addresses.
for _, address := range epInfo.IPAddresses {
log.Printf("release ip:%s", address.IP.String())
telemetry.LogAndSendEvent(plugin.tb, fmt.Sprintf("Release ip:%s", address.IP.String()))
err = plugin.ipamInvoker.Delete(&address, nwCfg, args, nwInfo.Options)
if err != nil {
err = plugin.Errorf("Failed to release address %v with error: %v", address, err)
Expand All @@ -1045,8 +1049,8 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
}
}

msg = fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace)
plugin.setCNIReportDetails(nwCfg, CNI_DEL, msg)
telemetry.SendCNIEvent(plugin.tb, fmt.Sprintf("CNI DEL succeeded : Released ip %+v podname %v namespace %v", nwCfg.Ipam.Address, k8sPodName, k8sNamespace))

return err
}
Expand Down
21 changes: 0 additions & 21 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {

// Main is the entry point for CNI network plugin.
func main() {
startTime := time.Now()

// Initialize and parse command line arguments.
common.ParseArgs(&args, printVersion)
vers := common.GetArg(common.OptVersion).(bool)
Expand Down Expand Up @@ -281,27 +279,8 @@ func main() {

netPlugin.Stop()

// release cni lock
if errUninit := netPlugin.Plugin.UninitializeKeyValueStore(); errUninit != nil {
log.Errorf("Failed to uninitialize key-value store of network plugin, err:%v.\n", errUninit)
}

executionTimeMs := time.Since(startTime).Milliseconds()

if err != nil {
reportPluginError(reportManager, tb, err)
panic("network plugin execute fatal error")
}

// Report CNI successfully finished execution.
reflect.ValueOf(reportManager.Report).Elem().FieldByName("CniSucceeded").SetBool(true)
reflect.ValueOf(reportManager.Report).Elem().FieldByName("OperationDuration").SetInt(executionTimeMs)

if cniReport.ErrorMessage != "" || cniReport.EventMessage != "" {
if err = reportManager.SendReport(tb); err != nil {
log.Errorf("SendReport failed due to %v", err)
} else {
log.Printf("Sending report succeeded")
}
}
}
29 changes: 29 additions & 0 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package telemetry

import (
"encoding/json"
"fmt"
"os"

"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/common"
Expand Down Expand Up @@ -154,3 +156,30 @@ func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error {

return err
}

// This function for sending CNI metrics to telemetry service
func LogAndSendEvent(tb *TelemetryBuffer, msg string) {
log.Printf(msg)
SendCNIEvent(tb, msg)
}

func SendCNIEvent(tb *TelemetryBuffer, msg string) {
var err error
var report []byte

eventMsg := fmt.Sprintf("[%d] %s", os.Getpid(), msg)
cniReport := &CNIReport{
EventMessage: eventMsg,
}
if tb != nil && tb.Connected {
reportMgr := &ReportManager{Report: cniReport}
report, err = reportMgr.ReportToBytes()
if err == nil {
// If write fails, try to re-establish connections as server/client
if _, err = tb.Write(report); err != nil {
log.Printf("Error writing to telemetry socket:%v", err)
tb.Cancel()
}
}
}
}