From 80c6b00edd03b787126dbf34a109f6e36594479d Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Thu, 3 Jan 2019 14:32:27 -0800 Subject: [PATCH 1/3] Limiting the size of our buffered payload to ~2MB --- telemetry/telemetrybuffer.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 8a9214787f..61d1b0464f 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -17,15 +17,14 @@ import ( // FdName - file descriptor name // Delimiter - delimiter for socket reads/writes // HostNetAgentURL - host net agent url of type payload -// DefaultDncReportsSize - default DNC report slice size -// DefaultCniReportsSize - default CNI report slice size -// DefaultNpmReportsSize - default NPM report slice size // DefaultInterval - default interval for sending payload to host +// MaxPayloadSize - max payload size (~2MB) const ( FdName = "azure-telemetry" Delimiter = '\n' HostNetAgentURL = "http://169.254.169.254/machine/plugins?comp=netagent&type=payload" DefaultInterval = 1 * time.Minute + MaxPayloadSize = 2097 ) // TelemetryBuffer object @@ -205,15 +204,32 @@ func (tb *TelemetryBuffer) sendToHost() error { // push - push the report (x) to corresponding slice func (pl *Payload) push(x interface{}) { + truncate := false + if pl.len() > MaxPayloadSize { + truncate = true + } + switch x.(type) { case DNCReport: pl.DNCReports = append(pl.DNCReports, x.(DNCReport)) + if truncate { + pl.DNCReports = pl.DNCReports[1:] + } case CNIReport: pl.CNIReports = append(pl.CNIReports, x.(CNIReport)) + if truncate { + pl.CNIReports = pl.CNIReports[1:] + } case NPMReport: pl.NPMReports = append(pl.NPMReports, x.(NPMReport)) + if truncate { + pl.NPMReports = pl.NPMReports[1:] + } case CNSReport: pl.CNSReports = append(pl.CNSReports, x.(CNSReport)) + if truncate { + pl.CNSReports = pl.CNSReports[1:] + } } } @@ -228,3 +244,8 @@ func (pl *Payload) reset() { pl.CNSReports = nil pl.CNSReports = make([]CNSReport, 0) } + +// len - get number of payload items +func (pl *Payload) len() int { + return len(pl.CNIReports) + len(pl.CNSReports) + len(pl.DNCReports) + len(pl.NPMReports) +} From 8c2b7e1d13c03c44d2b229e7f62b3edc8cd3c6da Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Wed, 27 Feb 2019 11:07:31 -0800 Subject: [PATCH 2/3] Modifying payload cap from 2MB to 65535 bytes. --- telemetry/telemetry_test.go | 38 +++++++++++++++++ telemetry/telemetrybuffer.go | 83 +++++++++++++++++++++--------------- 2 files changed, 87 insertions(+), 34 deletions(-) diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 0223fa71ce..3103e7e49c 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -4,6 +4,7 @@ package telemetry import ( + "bytes" "encoding/json" "fmt" "log" @@ -35,6 +36,28 @@ var ipamQueryResponse = "" + " " + "" +var sampleCniReport = CNIReport{ + IsNewInstance: false, + EventMessage: "[azure-cns] Code:UnknownContainerID {IPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} Routes:[] CnetAddressSpace:[] MultiTenancyInfo:{EncapType: ID:0} PrimaryInterfaceIdentifier: LocalIPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} {ReturnCode:18 Message:NetworkContainer doesn't exist.}}.", + Timestamp: "2019-02-27 17:44:47.319911225 +0000 UTC", + Metadata: Metadata{ + Location: "EastUS2EUAP", + VMName: "k8s-agentpool1-65609007-0", + Offer: "aks", + OsType: "Linux", + PlacementGroupID: "", + PlatformFaultDomain: "0", + PlatformUpdateDomain: "0", + Publisher: "microsoft-aks", + ResourceGroupName: "rghostnetagttest", + Sku: "aks-ubuntu-1604-201811", + SubscriptionID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b", + Tags: "acsengineVersion:v0.25.0;creationSource:acsengine-k8s-agentpool1-65609007-0;orchestrator:Kubernetes:1.10.9;poolName:agentpool1;resourceNameSuffix:65609007", + OSVersion: "2018.11.02", + VMID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b", + VMSize: "Standard_DS2_v2", + KernelVersion: ""}} + func TestMain(m *testing.M) { u, _ := url.Parse("tcp://" + ipamQueryUrl) ipamAgent, err := common.NewListener(u) @@ -155,3 +178,18 @@ func TestSetReportState(t *testing.T) { t.Errorf("Error removing telemetry file due to %v", err) } } + +func TestPayloadCap(t *testing.T) { + // sampleCniReport is ~66 bytes and we're adding 2000 reports here to test that the payload will be capped to 65535 + for i := 0; i < 4; i++ { + for j := 0; j < 500; j++ { + tb.payload.push(sampleCniReport) + } + + var body bytes.Buffer + json.NewEncoder(&body).Encode(tb.payload) + if uint16(body.Len()) > MaxPayloadSize { + t.Fatalf("Payload size exceeded max size of %d", MaxPayloadSize) + } + } +} diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index d2c3bdf95f..a4c9f1a138 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -22,17 +22,20 @@ import ( // FdName - file descriptor name // Delimiter - delimiter for socket reads/writes // azureHostReportURL - host net agent url of type payload -// DefaultDncReportsSize - default DNC report slice size -// DefaultCniReportsSize - default CNI report slice size -// DefaultNpmReportsSize - default NPM report slice size // DefaultInterval - default interval for sending payload to host +// logName - telemetry log name +// MaxPayloadSize - max payload size in bytes const ( - FdName = "azure-vnet-telemetry" - Delimiter = '\n' - azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" - DefaultInterval = 60 * time.Second - logName = "azure-vnet-telemetry" - MaxPayloadSize = 2097 + FdName = "azure-vnet-telemetry" + Delimiter = '\n' + azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" + DefaultInterval = 10 * time.Second + logName = "azure-vnet-telemetry" + MaxPayloadSize uint16 = 65535 + dnc = "DNC" + cns = "CNS" + npm = "NPM" + cni = "CNI" ) var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr) @@ -108,7 +111,6 @@ func (tb *TelemetryBuffer) StartServer() error { json.Unmarshal([]byte(reportStr), &npmReport) tb.data <- npmReport } else if _, ok := tmp["CniSucceeded"]; ok { - telemetryLogger.Printf("[Telemetry] Got cni report") var cniReport CNIReport json.Unmarshal([]byte(reportStr), &cniReport) tb.data <- cniReport @@ -157,14 +159,12 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { case <-interval: // Send payload to host and clear cache when sent successfully // To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends - telemetryLogger.Printf("[Telemetry] send data to host") if err := tb.sendToHost(); err == nil { tb.payload.reset() } else { telemetryLogger.Printf("[Telemetry] sending to host failed with error %+v", err) } case report := <-tb.data: - telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer") tb.payload.push(report) case <-tb.cancel: goto EXIT @@ -253,25 +253,27 @@ func (pl *Payload) push(x interface{}) { } } - if pl.len() < MaxPayloadSize { - switch x.(type) { - case DNCReport: - dncReport := x.(DNCReport) - dncReport.Metadata = metadata - pl.DNCReports = append(pl.DNCReports, dncReport) - case CNIReport: - cniReport := x.(CNIReport) - cniReport.Metadata = metadata - pl.CNIReports = append(pl.CNIReports, cniReport) - case NPMReport: - npmReport := x.(NPMReport) - npmReport.Metadata = metadata - pl.NPMReports = append(pl.NPMReports, npmReport) - case CNSReport: - cnsReport := x.(CNSReport) - cnsReport.Metadata = metadata - pl.CNSReports = append(pl.CNSReports, cnsReport) - } + switch x.(type) { + case DNCReport: + dncReport := x.(DNCReport) + dncReport.Metadata = metadata + pl.DNCReports = append(pl.DNCReports, dncReport) + pl.capPayload(dnc) + case CNIReport: + cniReport := x.(CNIReport) + cniReport.Metadata = metadata + pl.CNIReports = append(pl.CNIReports, cniReport) + pl.capPayload(cni) + case NPMReport: + npmReport := x.(NPMReport) + npmReport.Metadata = metadata + pl.NPMReports = append(pl.NPMReports, npmReport) + pl.capPayload(npm) + case CNSReport: + cnsReport := x.(CNSReport) + cnsReport.Metadata = metadata + pl.CNSReports = append(pl.CNSReports, cnsReport) + pl.capPayload(cns) } } @@ -287,9 +289,22 @@ func (pl *Payload) reset() { pl.CNSReports = make([]CNSReport, 0) } -// len - get number of payload items -func (pl *Payload) len() int { - return len(pl.CNIReports) + len(pl.CNSReports) + len(pl.DNCReports) + len(pl.NPMReports) +// capPayload - get number of payload items +func (pl *Payload) capPayload(reportType string) { + var body bytes.Buffer + json.NewEncoder(&body).Encode(pl) + if uint16(body.Len()) > MaxPayloadSize { + switch reportType { + case dnc: + pl.DNCReports = pl.DNCReports[:len(pl.DNCReports)-1] + case cni: + pl.CNIReports = pl.CNIReports[:len(pl.CNIReports)-1] + case npm: + pl.NPMReports = pl.NPMReports[:len(pl.NPMReports)-1] + case cns: + pl.CNSReports = pl.CNSReports[:len(pl.CNSReports)-1] + } + } } // saveHostMetadata - save metadata got from wireserver to json file From d9e86600cec03faf87cc00b64330f020305c9490 Mon Sep 17 00:00:00 2001 From: Jaeryn Chu Date: Wed, 27 Feb 2019 14:18:16 -0800 Subject: [PATCH 3/3] Modifying how we cap payload size. --- telemetry/telemetrybuffer.go | 81 ++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index a4c9f1a138..a747003258 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -39,6 +39,7 @@ const ( ) var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr) +var payloadSize uint16 = 0 // TelemetryBuffer object type TelemetryBuffer struct { @@ -253,31 +254,29 @@ func (pl *Payload) push(x interface{}) { } } - switch x.(type) { - case DNCReport: - dncReport := x.(DNCReport) - dncReport.Metadata = metadata - pl.DNCReports = append(pl.DNCReports, dncReport) - pl.capPayload(dnc) - case CNIReport: - cniReport := x.(CNIReport) - cniReport.Metadata = metadata - pl.CNIReports = append(pl.CNIReports, cniReport) - pl.capPayload(cni) - case NPMReport: - npmReport := x.(NPMReport) - npmReport.Metadata = metadata - pl.NPMReports = append(pl.NPMReports, npmReport) - pl.capPayload(npm) - case CNSReport: - cnsReport := x.(CNSReport) - cnsReport.Metadata = metadata - pl.CNSReports = append(pl.CNSReports, cnsReport) - pl.capPayload(cns) + if notExceeded, reportType := pl.payloadCapNotExceeded(x); notExceeded { + switch reportType { + case dnc: + dncReport := x.(DNCReport) + dncReport.Metadata = metadata + pl.DNCReports = append(pl.DNCReports, dncReport) + case cni: + cniReport := x.(CNIReport) + cniReport.Metadata = metadata + pl.CNIReports = append(pl.CNIReports, cniReport) + case npm: + npmReport := x.(NPMReport) + npmReport.Metadata = metadata + pl.NPMReports = append(pl.NPMReports, npmReport) + case cns: + cnsReport := x.(CNSReport) + cnsReport.Metadata = metadata + pl.CNSReports = append(pl.CNSReports, cnsReport) + } } } -// reset - reset payload slices +// reset - reset payload slices and sets payloadSize to 0 func (pl *Payload) reset() { pl.DNCReports = nil pl.DNCReports = make([]DNCReport, 0) @@ -287,24 +286,34 @@ func (pl *Payload) reset() { pl.NPMReports = make([]NPMReport, 0) pl.CNSReports = nil pl.CNSReports = make([]CNSReport, 0) + payloadSize = 0 } -// capPayload - get number of payload items -func (pl *Payload) capPayload(reportType string) { +// payloadCapNotExceeded - Returns whether payload cap will be exceeded as a result of adding the new report; and the report type +// If the cap is not exceeded, we update the payload size here. +func (pl *Payload) payloadCapNotExceeded(x interface{}) (notExceeded bool, reportType string) { var body bytes.Buffer - json.NewEncoder(&body).Encode(pl) - if uint16(body.Len()) > MaxPayloadSize { - switch reportType { - case dnc: - pl.DNCReports = pl.DNCReports[:len(pl.DNCReports)-1] - case cni: - pl.CNIReports = pl.CNIReports[:len(pl.CNIReports)-1] - case npm: - pl.NPMReports = pl.NPMReports[:len(pl.NPMReports)-1] - case cns: - pl.CNSReports = pl.CNSReports[:len(pl.CNSReports)-1] - } + switch x.(type) { + case DNCReport: + reportType = dnc + json.NewEncoder(&body).Encode(x.(DNCReport)) + case CNIReport: + reportType = cni + json.NewEncoder(&body).Encode(x.(CNIReport)) + case NPMReport: + reportType = npm + json.NewEncoder(&body).Encode(x.(NPMReport)) + case CNSReport: + reportType = cns + json.NewEncoder(&body).Encode(x.(CNSReport)) + } + + updatedPayloadSize := uint16(body.Len()) + payloadSize + if notExceeded = updatedPayloadSize < MaxPayloadSize && payloadSize < updatedPayloadSize; notExceeded { + payloadSize = updatedPayloadSize } + + return } // saveHostMetadata - save metadata got from wireserver to json file