From 0b8d5662f0d63bc9ee324d28af3c36e928ccf24d Mon Sep 17 00:00:00 2001 From: Jaeryn Date: Tue, 15 Jun 2021 11:52:17 -0700 Subject: [PATCH] Removing telemetry processed on HostNetAgent --- cni/network/plugin/main.go | 7 +- cni/telemetry/service/telemetrymain.go | 8 +- cnms/service/networkmonitor.go | 2 +- cns/logger/heartbeat.go | 70 ------- cns/logger/log.go | 4 - cns/service/main.go | 7 - log/logger.go | 29 +-- telemetry/telemetry.go | 35 ---- telemetry/telemetry_test.go | 22 +- telemetry/telemetrybuffer.go | 277 ++----------------------- 10 files changed, 42 insertions(+), 419 deletions(-) diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 7f9abdee15..c3eb35cbd2 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -6,12 +6,13 @@ package main import ( "encoding/json" "fmt" - "github.com/Azure/azure-container-networking/nns" "io/ioutil" "os" "reflect" "time" + "github.com/Azure/azure-container-networking/nns" + "github.com/Azure/azure-container-networking/cni" "github.com/Azure/azure-container-networking/cni/network" "github.com/Azure/azure-container-networking/common" @@ -183,7 +184,7 @@ func main() { // CNI Acquires lock if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err) - tb := telemetry.NewTelemetryBuffer("") + tb := telemetry.NewTelemetryBuffer() if tberr := tb.Connect(); tberr == nil { reportPluginError(reportManager, tb, err) tb.Close() @@ -211,7 +212,7 @@ func main() { // Start telemetry process if not already started. This should be done inside lock, otherwise multiple process // end up creating/killing telemetry process results in undesired state. - tb := telemetry.NewTelemetryBuffer("") + tb := telemetry.NewTelemetryBuffer() tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) defer tb.Close() diff --git a/cni/telemetry/service/telemetrymain.go b/cni/telemetry/service/telemetrymain.go index d45b371876..5d1076c642 100644 --- a/cni/telemetry/service/telemetrymain.go +++ b/cni/telemetry/service/telemetrymain.go @@ -157,14 +157,14 @@ func main() { log.Logf("Config after setting defaults %+v", config) // Cleaning up orphan socket if present - tbtemp := telemetry.NewTelemetryBuffer("") + tbtemp := telemetry.NewTelemetryBuffer() tbtemp.Cleanup(telemetry.FdName) for { - tb = telemetry.NewTelemetryBuffer("") + tb = telemetry.NewTelemetryBuffer() log.Logf("[Telemetry] Starting telemetry server") - err = tb.StartServer(config.DisableTelemetryToNetAgent) + err = tb.StartServer() if err == nil || tb.FdExists { break } @@ -189,7 +189,7 @@ func main() { err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric) log.Printf("[Telemetry] AI Handle creation status:%v", err) log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds) - tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second) + tb.PushData() telemetry.CloseAITelemetryHandle() log.Close() diff --git a/cnms/service/networkmonitor.go b/cnms/service/networkmonitor.go index 57069075c6..87f2c9f904 100644 --- a/cnms/service/networkmonitor.go +++ b/cnms/service/networkmonitor.go @@ -135,7 +135,7 @@ func main() { CNIReport: reportManager.Report.(*telemetry.CNIReport), } - tb := telemetry.NewTelemetryBuffer("") + tb := telemetry.NewTelemetryBuffer() tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) defer tb.Close() diff --git a/cns/logger/heartbeat.go b/cns/logger/heartbeat.go index 15ee429f23..bd028cf051 100644 --- a/cns/logger/heartbeat.go +++ b/cns/logger/heartbeat.go @@ -4,29 +4,11 @@ package logger import ( - "reflect" - "regexp" "time" "github.com/Azure/azure-container-networking/aitelemetry" - "github.com/Azure/azure-container-networking/log" - "github.com/Azure/azure-container-networking/platform" - "github.com/Azure/azure-container-networking/telemetry" - "github.com/google/uuid" ) -const ( - // CNSTelemetryFile - telemetry file path. - cnsTelemetryFile = platform.CNSRuntimePath + "AzureCNSTelemetry.json" - errorcodePrefix = 5 - heartbeatIntervalInMinutes = 30 - retryWaitTimeInSeconds = 60 - telemetryNumRetries = 5 - telemetryWaitTimeInMilliseconds = 200 -) - -var codeRegex = regexp.MustCompile(`Code:(\w*)`) - func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) { heartbeat := time.NewTicker(time.Minute * time.Duration(heartbeatIntervalInMins)).C metric := aitelemetry.Metric{ @@ -44,55 +26,3 @@ func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) { } } } - -// SendCnsTelemetry - handles cns telemetry reports -func SendToTelemetryService(reports chan interface{}, telemetryStopProcessing chan bool) { - -CONNECT: - tb := telemetry.NewTelemetryBuffer("") - tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds) - if tb.Connected { - - reportMgr := telemetry.ReportManager{ - ContentType: telemetry.ContentType, - Report: &telemetry.CNSReport{}, - } - - reportMgr.GetReportState(cnsTelemetryFile) - reportMgr.GetKernelVersion() - - for { - select { - case msg := <-reports: - codeStr := codeRegex.FindString(msg.(string)) - if len(codeStr) > errorcodePrefix { - reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Errorcode").SetString(codeStr[errorcodePrefix:]) - } - - reflect.ValueOf(reportMgr.Report).Elem().FieldByName("EventMessage").SetString(msg.(string)) - case <-telemetryStopProcessing: - tb.Close() - return - } - - reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Timestamp").SetString(time.Now().UTC().String()) - if id, err := uuid.NewUUID(); err == nil { - reflect.ValueOf(reportMgr.Report).Elem().FieldByName("UUID").SetString(id.String()) - } - - if !reportMgr.GetReportState(cnsTelemetryFile) { - reportMgr.SetReportState(cnsTelemetryFile) - } - - report, err := reportMgr.ReportToBytes() - if err == nil { - // If write fails, try to re-establish connections as server/client - if _, err = tb.Write(report); err != nil { - log.Logf("[CNS-Telemetry] Telemetry write failed: %v", err) - tb.Close() - goto CONNECT - } - } - } - } -} diff --git a/cns/logger/log.go b/cns/logger/log.go index 306403b5a6..d10df70b36 100644 --- a/cns/logger/log.go +++ b/cns/logger/log.go @@ -51,10 +51,6 @@ func InitAI(aiConfig aitelemetry.AIConfig, disableTraceLogging, disableMetricLog Log.DisableEventLogging = disableEventLogging } -func InitReportChannel(reports chan interface{}) { - Log.logger.SetChannel(reports) -} - // Close CNS and AI telemetry handle func Close() { Log.logger.Close() diff --git a/cns/service/main.go b/cns/service/main.go index fff500df7d..939d600392 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -57,9 +57,6 @@ const ( // Version is populated by make during build. var version string - -// Reports channel -var reports = make(chan interface{}) var telemetryStopProcessing = make(chan bool) var stopheartbeat = make(chan bool) var stopSnapshots = make(chan bool) @@ -436,7 +433,6 @@ func main() { } logger.InitAI(aiConfig, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent) - logger.InitReportChannel(reports) } // Log platform information. @@ -537,7 +533,6 @@ func main() { } if !disableTelemetry { - go logger.SendToTelemetryService(reports, telemetryStopProcessing) go logger.SendHeartBeat(cnsconfig.TelemetrySettings.HeartBeatIntervalInMins, stopheartbeat) go httpRestService.SendNCSnapShotPeriodically(cnsconfig.TelemetrySettings.SnapshotIntervalInMins, stopSnapshots) } @@ -846,8 +841,6 @@ func InitializeCRDState(httpRestService cns.HTTPService, cnsconfig configuration <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C httpRestServiceImplementation.SyncHostNCVersion(rootCxt, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) } - - logger.Printf("[Azure CNS] Exiting SyncHostNCVersion") }() return nil diff --git a/log/logger.go b/log/logger.go index fa73043cc1..485db76841 100644 --- a/log/logger.go +++ b/log/logger.go @@ -53,7 +53,6 @@ type Logger struct { maxFileCount int callCount int directory string - reports chan interface{} mutex *sync.Mutex } @@ -91,11 +90,6 @@ func (logger *Logger) SetLogFileLimits(maxFileSize int, maxFileCount int) { logger.maxFileCount = maxFileCount } -// SetChannel sets the channel for error message reports. -func (logger *Logger) SetChannel(reports chan interface{}) { - logger.reports = reports -} - // Close closes the log stream. func (logger *Logger) Close() { if logger.out != nil { @@ -223,14 +217,7 @@ func (logger *Logger) Printf(format string, args ...interface{}) { return } - logger.mutex.Lock() - logger.logf(format, args...) - logger.mutex.Unlock() - go func() { - if logger.reports != nil { - logger.reports <- fmt.Sprintf(format, args...) - } - }() + logger.Logf(format, args...) } // Debugf logs a formatted string at info level. @@ -239,22 +226,10 @@ func (logger *Logger) Debugf(format string, args ...interface{}) { return } - logger.mutex.Lock() - logger.logf(format, args...) - logger.mutex.Unlock() - go func() { - if logger.reports != nil { - logger.reports <- fmt.Sprintf(format, args...) - } - }() + logger.Logf(format, args...) } // Errorf logs a formatted string at info level and sends the string to TelemetryBuffer. func (logger *Logger) Errorf(format string, args ...interface{}) { logger.Logf(format, args...) - go func() { - if logger.reports != nil { - logger.reports <- fmt.Sprintf(format, args...) - } - }() } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index a774673f5a..0f37a1cf30 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -105,20 +105,6 @@ type AIMetric struct { Metric aitelemetry.Metric } -// Azure CNS Telemetry Report structure. -type CNSReport struct { - IsNewInstance bool - CPUUsage string - MemoryUsage string - Processes string - EventMessage string - DncPartitionKey string - Timestamp string - UUID string - Errorcode string - Metadata common.Metadata `json:"compute"` -} - // ClusterState contains the current kubernetes cluster state. type ClusterState struct { PodCount int @@ -142,24 +128,6 @@ type NPMReport struct { Metadata common.Metadata `json:"compute"` } -// DNCReport structure. -type DNCReport struct { - IsNewInstance bool - CPUUsage string - MemoryUsage string - Processes string - EventMessage string - PartitionKey string - Allocations string - Timestamp string - NumberOfNodes int - NumberOfNCs int - Orchestrator string - ContainerType string - Errorcode string - Metadata common.Metadata `json:"compute"` -} - // ReportManager structure. type ReportManager struct { HostNetAgentURL string @@ -374,9 +342,6 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) { switch reportMgr.Report.(type) { case *CNIReport: - case *NPMReport: - case *DNCReport: - case *CNSReport: case *AIMetric: default: err = fmt.Errorf("[Telemetry] Invalid report type") diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index ff865692c0..397f3ae6ce 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -105,10 +105,10 @@ func TestMain(m *testing.M) { reportManager.ContentType = "application/json" reportManager.Report = &CNIReport{} - tb = NewTelemetryBuffer(hostAgentUrl) - err = tb.StartServer(false) + tb = NewTelemetryBuffer() + err = tb.StartServer() if err == nil { - go tb.BufferAndPushData(0) + go tb.PushData() } if err := tb.Connect(); err != nil { @@ -200,14 +200,14 @@ func TestCloseTelemetryConnection(t *testing.T) { func TestServerCloseTelemetryConnection(t *testing.T) { // create server telemetrybuffer and start server - tb = NewTelemetryBuffer(hostAgentUrl) - err := tb.StartServer(false) + tb = NewTelemetryBuffer() + err := tb.StartServer() if err == nil { - go tb.BufferAndPushData(0) + go tb.PushData() } // create client telemetrybuffer and connect to server - tb1 := NewTelemetryBuffer(hostAgentUrl) + tb1 := NewTelemetryBuffer() if err := tb1.Connect(); err != nil { t.Errorf("connection to telemetry server failed %v", err) } @@ -233,10 +233,10 @@ func TestServerCloseTelemetryConnection(t *testing.T) { func TestClientCloseTelemetryConnection(t *testing.T) { // create server telemetrybuffer and start server - tb = NewTelemetryBuffer(hostAgentUrl) - err := tb.StartServer(false) + tb = NewTelemetryBuffer() + err := tb.StartServer() if err == nil { - go tb.BufferAndPushData(0) + go tb.PushData() } if !SockExists() { @@ -244,7 +244,7 @@ func TestClientCloseTelemetryConnection(t *testing.T) { } // create client telemetrybuffer and connect to server - tb1 := NewTelemetryBuffer(hostAgentUrl) + tb1 := NewTelemetryBuffer() if err := tb1.Connect(); err != nil { t.Errorf("connection to telemetry server failed %v", err) } diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 69d4bf73d7..af941d2bf3 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -5,16 +5,12 @@ package telemetry import ( "bufio" - "bytes" "encoding/json" "fmt" "io/ioutil" - "math/rand" "net" - "net/http" "os" "path/filepath" - "reflect" "strings" "sync" "time" @@ -43,27 +39,12 @@ type TelemetryConfig struct { // FdName - file descriptor name // Delimiter - delimiter for socket reads/writes -// azureHostReportURL - host net agent url of type buffer -// DefaultInterval - default interval for sending buffer to host -// logName - telemetry log name // MaxPayloadSize - max buffer size in bytes const ( - FdName = "azure-vnet-telemetry" - Delimiter = '\n' - azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" - minInterval = 10 * time.Second - logName = "azure-vnet-telemetry" - MaxPayloadSize = 4096 - MaxNumReports = 1000 - dnc = "DNC" - cns = "CNS" - npm = "NPM" - cni = "CNI" -) - -var ( - payloadSize uint16 = 0 - disableTelemetryToNetAgent bool + FdName = "azure-vnet-telemetry" + Delimiter = '\n' + MaxPayloadSize = 4096 + MaxNumReports = 1000 ) // TelemetryBuffer object @@ -72,7 +53,6 @@ type TelemetryBuffer struct { listener net.Listener connections []net.Conn azureHostReportURL string - buffer Buffer FdExists bool Connected bool data chan interface{} @@ -82,27 +62,16 @@ type TelemetryBuffer struct { // Buffer object holds the different types of reports type Buffer struct { - DNCReports []DNCReport CNIReports []CNIReport - NPMReports []NPMReport - CNSReports []CNSReport } // NewTelemetryBuffer - create a new TelemetryBuffer -func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { +func NewTelemetryBuffer() *TelemetryBuffer { var tb TelemetryBuffer - if hostReportURL == "" { - tb.azureHostReportURL = azureHostReportURL - } - tb.data = make(chan interface{}, MaxNumReports) tb.cancel = make(chan bool, 1) tb.connections = make([]net.Conn, 0) - tb.buffer.DNCReports = make([]DNCReport, 0, MaxNumReports) - tb.buffer.CNIReports = make([]CNIReport, 0, MaxNumReports) - tb.buffer.NPMReports = make([]NPMReport, 0, MaxNumReports) - tb.buffer.CNSReports = make([]CNSReport, 0, MaxNumReports) return &tb } @@ -118,8 +87,7 @@ func remove(s []net.Conn, i int) []net.Conn { } // Starts Telemetry server listening on unix domain socket -func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error { - disableTelemetryToNetAgent = disableNetAgentChannel +func (tb *TelemetryBuffer) StartServer() error { err := tb.Listen(FdName) if err != nil { tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied") @@ -143,11 +111,7 @@ func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error { if err == nil { var tmp map[string]interface{} json.Unmarshal(reportStr, &tmp) - if _, ok := tmp["NpmVersion"]; ok { - var npmReport NPMReport - json.Unmarshal([]byte(reportStr), &npmReport) - tb.data <- npmReport - } else if _, ok := tmp["CniSucceeded"]; ok { + if _, ok := tmp["CniSucceeded"]; ok { var cniReport CNIReport json.Unmarshal([]byte(reportStr), &cniReport) tb.data <- cniReport @@ -155,14 +119,6 @@ func (tb *TelemetryBuffer) StartServer(disableNetAgentChannel bool) error { var aiMetric AIMetric json.Unmarshal([]byte(reportStr), &aiMetric) tb.data <- aiMetric - } else if _, ok := tmp["Allocations"]; ok { - var dncReport DNCReport - json.Unmarshal([]byte(reportStr), &dncReport) - tb.data <- dncReport - } else if _, ok := tmp["DncPartitionKey"]; ok { - var cnsReport CNSReport - json.Unmarshal([]byte(reportStr), &cnsReport) - tb.data <- cnsReport } } else { var index int @@ -209,39 +165,22 @@ func (tb *TelemetryBuffer) Connect() error { return err } -// BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere -func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { - defer tb.Close() - if !tb.FdExists { - log.Logf("[Telemetry] Buffer telemetry data and send it to host") - if intervalms < minInterval { - intervalms = minInterval - } - - interval := time.NewTicker(intervalms).C - for { - select { - case <-interval: - // Send buffer to host and clear cache when sent successfully - // To-do : if we hit max slice size in buffer, write to disk and process the logs on disk on future sends - tb.mutex.Lock() - tb.sendToHost() - tb.mutex.Unlock() - case report := <-tb.data: - tb.mutex.Lock() - tb.buffer.push(report) - tb.mutex.Unlock() - case <-tb.cancel: - log.Logf("[Telemetry] server cancel event") - goto EXIT - } +// PushData - PushData running an instance if it isn't already being run elsewhere +func (tb *TelemetryBuffer) PushData() { + for { + select { + case report := <-tb.data: + tb.mutex.Lock() + push(report) + tb.mutex.Unlock() + case <-tb.cancel: + log.Logf("[Telemetry] server cancel event") + goto EXIT } - } else { - <-tb.cancel - log.Logf("[Telemetry] Received cancel event") } EXIT: + tb.Close() } // read - read from the file descriptor @@ -296,145 +235,8 @@ func (tb *TelemetryBuffer) Close() { tb.connections = make([]net.Conn, 0) } -// sendToHost - send buffer to host -func (tb *TelemetryBuffer) sendToHost() error { - if disableTelemetryToNetAgent { - return nil - } - - buf := Buffer{ - DNCReports: make([]DNCReport, 0), - CNIReports: make([]CNIReport, 0), - NPMReports: make([]NPMReport, 0), - CNSReports: make([]CNSReport, 0), - } - - seed := rand.NewSource(time.Now().UnixNano()) - i, payloadSize, maxPayloadSizeReached := rand.New(seed).Intn(reflect.ValueOf(&buf).Elem().NumField()), 0, false - isDNCReportsEmpty, isCNIReportsEmpty, isCNSReportsEmpty, isNPMReportsEmpty := false, false, false, false - for { - // craft payload in a round-robin manner. - switch i % 4 { - case 0: - reportLen := len(tb.buffer.DNCReports) - if reportLen == 0 || isDNCReportsEmpty { - isDNCReportsEmpty = true - break - } - - if reportLen == 1 { - isDNCReportsEmpty = true - } - - report := tb.buffer.DNCReports[0] - if bytes, err := json.Marshal(report); err == nil { - payloadSize += len(bytes) - if payloadSize > MaxPayloadSize { - maxPayloadSizeReached = true - break - } - } - buf.DNCReports = append(buf.DNCReports, report) - tb.buffer.DNCReports = tb.buffer.DNCReports[1:] - case 1: - reportLen := len(tb.buffer.CNIReports) - if reportLen == 0 || isCNIReportsEmpty { - isCNIReportsEmpty = true - break - } - - if reportLen == 1 { - isCNIReportsEmpty = true - } - - report := tb.buffer.CNIReports[0] - if bytes, err := json.Marshal(report); err == nil { - payloadSize += len(bytes) - if payloadSize > MaxPayloadSize { - maxPayloadSizeReached = true - break - } - } - buf.CNIReports = append(buf.CNIReports, report) - tb.buffer.CNIReports = tb.buffer.CNIReports[1:] - case 2: - reportLen := len(tb.buffer.CNSReports) - if reportLen == 0 || isCNSReportsEmpty { - isCNSReportsEmpty = true - break - } - - if reportLen == 1 { - isCNSReportsEmpty = true - } - - report := tb.buffer.CNSReports[0] - if bytes, err := json.Marshal(report); err == nil { - payloadSize += len(bytes) - if payloadSize > MaxPayloadSize { - maxPayloadSizeReached = true - break - } - } - buf.CNSReports = append(buf.CNSReports, report) - tb.buffer.CNSReports = tb.buffer.CNSReports[1:] - case 3: - reportLen := len(tb.buffer.NPMReports) - if reportLen == 0 || isNPMReportsEmpty { - isNPMReportsEmpty = true - break - } - - if reportLen == 1 { - isNPMReportsEmpty = true - } - - report := tb.buffer.NPMReports[0] - if bytes, err := json.Marshal(report); err == nil { - payloadSize += len(bytes) - if payloadSize > MaxPayloadSize { - maxPayloadSizeReached = true - break - } - } - buf.NPMReports = append(buf.NPMReports, report) - tb.buffer.NPMReports = tb.buffer.NPMReports[1:] - } - - if isDNCReportsEmpty && isCNIReportsEmpty && isCNSReportsEmpty && isNPMReportsEmpty { - break - } - - if maxPayloadSizeReached { - break - } - - i++ - } - - httpc := &http.Client{} - var body bytes.Buffer - log.Logf("Sending buffer %+v", buf) - if err := json.NewEncoder(&body).Encode(buf); err != nil { - log.Logf("[Telemetry] Encode buffer error %v", err) - } - resp, err := httpc.Post(tb.azureHostReportURL, ContentType, &body) - log.Logf("[Telemetry] Got response %v", resp) - if err != nil { - return fmt.Errorf("[Telemetry] HTTP Post returned error %v", err) - } - - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("[Telemetry] HTTP Post returned statuscode %d", resp.StatusCode) - } - - return nil -} - // push - push the report (x) to corresponding slice -func (buf *Buffer) push(x interface{}) { +func push(x interface{}) { metadata, err := common.GetHostMetadata(metadataFile) if err != nil { log.Logf("Error getting metadata %v", err) @@ -453,56 +255,17 @@ func (buf *Buffer) push(x interface{}) { } switch x.(type) { - case DNCReport: - if len(buf.DNCReports) >= MaxNumReports { - return - } - dncReport := x.(DNCReport) - dncReport.Metadata = metadata - buf.DNCReports = append(buf.DNCReports, dncReport) case CNIReport: - if len(buf.CNIReports) >= MaxNumReports { - return - } cniReport := x.(CNIReport) cniReport.Metadata = metadata SendAITelemetry(cniReport) - buf.CNIReports = append(buf.CNIReports, cniReport) case AIMetric: aiMetric := x.(AIMetric) SendAIMetric(aiMetric) - - case NPMReport: - if len(buf.NPMReports) >= MaxNumReports { - return - } - npmReport := x.(NPMReport) - npmReport.Metadata = metadata - buf.NPMReports = append(buf.NPMReports, npmReport) - case CNSReport: - if len(buf.CNSReports) >= MaxNumReports { - return - } - cnsReport := x.(CNSReport) - cnsReport.Metadata = metadata - buf.CNSReports = append(buf.CNSReports, cnsReport) } } -// reset - reset buffer slices and sets payloadSize to 0 -func (buf *Buffer) reset() { - buf.DNCReports = nil - buf.DNCReports = make([]DNCReport, 0) - buf.CNIReports = nil - buf.CNIReports = make([]CNIReport, 0) - buf.NPMReports = nil - buf.NPMReports = make([]NPMReport, 0) - buf.CNSReports = nil - buf.CNSReports = make([]CNSReport, 0) - payloadSize = 0 -} - // WaitForTelemetrySocket - Block still pipe/sock created or until max attempts retried func WaitForTelemetrySocket(maxAttempt int, waitTimeInMillisecs time.Duration) { for attempt := 0; attempt < maxAttempt; attempt++ {