diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 0223fa71ce..3103e7e49c 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -4,6 +4,7 @@ package telemetry import ( + "bytes" "encoding/json" "fmt" "log" @@ -35,6 +36,28 @@ var ipamQueryResponse = "" + " " + "" +var sampleCniReport = CNIReport{ + IsNewInstance: false, + EventMessage: "[azure-cns] Code:UnknownContainerID {IPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} Routes:[] CnetAddressSpace:[] MultiTenancyInfo:{EncapType: ID:0} PrimaryInterfaceIdentifier: LocalIPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} {ReturnCode:18 Message:NetworkContainer doesn't exist.}}.", + Timestamp: "2019-02-27 17:44:47.319911225 +0000 UTC", + Metadata: Metadata{ + Location: "EastUS2EUAP", + VMName: "k8s-agentpool1-65609007-0", + Offer: "aks", + OsType: "Linux", + PlacementGroupID: "", + PlatformFaultDomain: "0", + PlatformUpdateDomain: "0", + Publisher: "microsoft-aks", + ResourceGroupName: "rghostnetagttest", + Sku: "aks-ubuntu-1604-201811", + SubscriptionID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b", + Tags: "acsengineVersion:v0.25.0;creationSource:acsengine-k8s-agentpool1-65609007-0;orchestrator:Kubernetes:1.10.9;poolName:agentpool1;resourceNameSuffix:65609007", + OSVersion: "2018.11.02", + VMID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b", + VMSize: "Standard_DS2_v2", + KernelVersion: ""}} + func TestMain(m *testing.M) { u, _ := url.Parse("tcp://" + ipamQueryUrl) ipamAgent, err := common.NewListener(u) @@ -155,3 +178,18 @@ func TestSetReportState(t *testing.T) { t.Errorf("Error removing telemetry file due to %v", err) } } + +func TestPayloadCap(t *testing.T) { + // sampleCniReport is ~66 bytes and we're adding 2000 reports here to test that the payload will be capped to 65535 + for i := 0; i < 4; i++ { + for j := 0; j < 500; j++ { + tb.payload.push(sampleCniReport) + } + + var body bytes.Buffer + json.NewEncoder(&body).Encode(tb.payload) + if uint16(body.Len()) > MaxPayloadSize { + t.Fatalf("Payload size exceeded max size of %d", MaxPayloadSize) + } + } +} diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index d2c3bdf95f..a747003258 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -22,20 +22,24 @@ import ( // FdName - file descriptor name // Delimiter - delimiter for socket reads/writes // azureHostReportURL - host net agent url of type payload -// DefaultDncReportsSize - default DNC report slice size -// DefaultCniReportsSize - default CNI report slice size -// DefaultNpmReportsSize - default NPM report slice size // DefaultInterval - default interval for sending payload to host +// logName - telemetry log name +// MaxPayloadSize - max payload size in bytes const ( - FdName = "azure-vnet-telemetry" - Delimiter = '\n' - azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" - DefaultInterval = 60 * time.Second - logName = "azure-vnet-telemetry" - MaxPayloadSize = 2097 + FdName = "azure-vnet-telemetry" + Delimiter = '\n' + azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" + DefaultInterval = 10 * time.Second + logName = "azure-vnet-telemetry" + MaxPayloadSize uint16 = 65535 + dnc = "DNC" + cns = "CNS" + npm = "NPM" + cni = "CNI" ) var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr) +var payloadSize uint16 = 0 // TelemetryBuffer object type TelemetryBuffer struct { @@ -108,7 +112,6 @@ func (tb *TelemetryBuffer) StartServer() error { json.Unmarshal([]byte(reportStr), &npmReport) tb.data <- npmReport } else if _, ok := tmp["CniSucceeded"]; ok { - telemetryLogger.Printf("[Telemetry] Got cni report") var cniReport CNIReport json.Unmarshal([]byte(reportStr), &cniReport) tb.data <- cniReport @@ -157,14 +160,12 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { case <-interval: // Send payload to host and clear cache when sent successfully // To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends - telemetryLogger.Printf("[Telemetry] send data to host") if err := tb.sendToHost(); err == nil { tb.payload.reset() } else { telemetryLogger.Printf("[Telemetry] sending to host failed with error %+v", err) } case report := <-tb.data: - telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer") tb.payload.push(report) case <-tb.cancel: goto EXIT @@ -253,21 +254,21 @@ func (pl *Payload) push(x interface{}) { } } - if pl.len() < MaxPayloadSize { - switch x.(type) { - case DNCReport: + if notExceeded, reportType := pl.payloadCapNotExceeded(x); notExceeded { + switch reportType { + case dnc: dncReport := x.(DNCReport) dncReport.Metadata = metadata pl.DNCReports = append(pl.DNCReports, dncReport) - case CNIReport: + case cni: cniReport := x.(CNIReport) cniReport.Metadata = metadata pl.CNIReports = append(pl.CNIReports, cniReport) - case NPMReport: + case npm: npmReport := x.(NPMReport) npmReport.Metadata = metadata pl.NPMReports = append(pl.NPMReports, npmReport) - case CNSReport: + case cns: cnsReport := x.(CNSReport) cnsReport.Metadata = metadata pl.CNSReports = append(pl.CNSReports, cnsReport) @@ -275,7 +276,7 @@ func (pl *Payload) push(x interface{}) { } } -// reset - reset payload slices +// reset - reset payload slices and sets payloadSize to 0 func (pl *Payload) reset() { pl.DNCReports = nil pl.DNCReports = make([]DNCReport, 0) @@ -285,11 +286,34 @@ func (pl *Payload) reset() { pl.NPMReports = make([]NPMReport, 0) pl.CNSReports = nil pl.CNSReports = make([]CNSReport, 0) + payloadSize = 0 } -// len - get number of payload items -func (pl *Payload) len() int { - return len(pl.CNIReports) + len(pl.CNSReports) + len(pl.DNCReports) + len(pl.NPMReports) +// payloadCapNotExceeded - Returns whether payload cap will be exceeded as a result of adding the new report; and the report type +// If the cap is not exceeded, we update the payload size here. +func (pl *Payload) payloadCapNotExceeded(x interface{}) (notExceeded bool, reportType string) { + var body bytes.Buffer + switch x.(type) { + case DNCReport: + reportType = dnc + json.NewEncoder(&body).Encode(x.(DNCReport)) + case CNIReport: + reportType = cni + json.NewEncoder(&body).Encode(x.(CNIReport)) + case NPMReport: + reportType = npm + json.NewEncoder(&body).Encode(x.(NPMReport)) + case CNSReport: + reportType = cns + json.NewEncoder(&body).Encode(x.(CNSReport)) + } + + updatedPayloadSize := uint16(body.Len()) + payloadSize + if notExceeded = updatedPayloadSize < MaxPayloadSize && payloadSize < updatedPayloadSize; notExceeded { + payloadSize = updatedPayloadSize + } + + return } // saveHostMetadata - save metadata got from wireserver to json file