Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package telemetry

import (
"bytes"
"encoding/json"
"fmt"
"log"
Expand Down Expand Up @@ -35,6 +36,28 @@ var ipamQueryResponse = "" +
" </Interface>" +
"</Interfaces>"

var sampleCniReport = CNIReport{
IsNewInstance: false,
EventMessage: "[azure-cns] Code:UnknownContainerID {IPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} Routes:[] CnetAddressSpace:[] MultiTenancyInfo:{EncapType: ID:0} PrimaryInterfaceIdentifier: LocalIPConfiguration:{IPSubnet:{IPAddress: PrefixLength:0} DNSServers:[] GatewayIPAddress:} {ReturnCode:18 Message:NetworkContainer doesn't exist.}}.",
Timestamp: "2019-02-27 17:44:47.319911225 +0000 UTC",
Metadata: Metadata{
Location: "EastUS2EUAP",
VMName: "k8s-agentpool1-65609007-0",
Offer: "aks",
OsType: "Linux",
PlacementGroupID: "",
PlatformFaultDomain: "0",
PlatformUpdateDomain: "0",
Publisher: "microsoft-aks",
ResourceGroupName: "rghostnetagttest",
Sku: "aks-ubuntu-1604-201811",
SubscriptionID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b",
Tags: "acsengineVersion:v0.25.0;creationSource:acsengine-k8s-agentpool1-65609007-0;orchestrator:Kubernetes:1.10.9;poolName:agentpool1;resourceNameSuffix:65609007",
OSVersion: "2018.11.02",
VMID: "eff73b63-f38d-4cb5-bad1-21f273c1e36b",
VMSize: "Standard_DS2_v2",
KernelVersion: ""}}

func TestMain(m *testing.M) {
u, _ := url.Parse("tcp://" + ipamQueryUrl)
ipamAgent, err := common.NewListener(u)
Expand Down Expand Up @@ -155,3 +178,18 @@ func TestSetReportState(t *testing.T) {
t.Errorf("Error removing telemetry file due to %v", err)
}
}

func TestPayloadCap(t *testing.T) {
// sampleCniReport is ~66 bytes and we're adding 2000 reports here to test that the payload will be capped to 65535
for i := 0; i < 4; i++ {
for j := 0; j < 500; j++ {
tb.payload.push(sampleCniReport)
}

var body bytes.Buffer
json.NewEncoder(&body).Encode(tb.payload)
if uint16(body.Len()) > MaxPayloadSize {
t.Fatalf("Payload size exceeded max size of %d", MaxPayloadSize)
}
}
}
68 changes: 46 additions & 22 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@ import (
// FdName - file descriptor name
// Delimiter - delimiter for socket reads/writes
// azureHostReportURL - host net agent url of type payload
// DefaultDncReportsSize - default DNC report slice size
// DefaultCniReportsSize - default CNI report slice size
// DefaultNpmReportsSize - default NPM report slice size
// DefaultInterval - default interval for sending payload to host
// logName - telemetry log name
// MaxPayloadSize - max payload size in bytes
const (
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
DefaultInterval = 60 * time.Second
logName = "azure-vnet-telemetry"
MaxPayloadSize = 2097
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
DefaultInterval = 10 * time.Second
logName = "azure-vnet-telemetry"
MaxPayloadSize uint16 = 65535
dnc = "DNC"
cns = "CNS"
npm = "NPM"
cni = "CNI"
)

var telemetryLogger = log.NewLogger(logName, log.LevelInfo, log.TargetStderr)
var payloadSize uint16 = 0

// TelemetryBuffer object
type TelemetryBuffer struct {
Expand Down Expand Up @@ -108,7 +112,6 @@ func (tb *TelemetryBuffer) StartServer() error {
json.Unmarshal([]byte(reportStr), &npmReport)
tb.data <- npmReport
} else if _, ok := tmp["CniSucceeded"]; ok {
telemetryLogger.Printf("[Telemetry] Got cni report")
var cniReport CNIReport
json.Unmarshal([]byte(reportStr), &cniReport)
tb.data <- cniReport
Expand Down Expand Up @@ -157,14 +160,12 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
case <-interval:
// Send payload to host and clear cache when sent successfully
// To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends
telemetryLogger.Printf("[Telemetry] send data to host")
if err := tb.sendToHost(); err == nil {
tb.payload.reset()
} else {
telemetryLogger.Printf("[Telemetry] sending to host failed with error %+v", err)
}
case report := <-tb.data:
telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer")
tb.payload.push(report)
case <-tb.cancel:
goto EXIT
Expand Down Expand Up @@ -253,29 +254,29 @@ func (pl *Payload) push(x interface{}) {
}
}

if pl.len() < MaxPayloadSize {
switch x.(type) {
case DNCReport:
if notExceeded, reportType := pl.payloadCapNotExceeded(x); notExceeded {
switch reportType {
case dnc:
dncReport := x.(DNCReport)
dncReport.Metadata = metadata
pl.DNCReports = append(pl.DNCReports, dncReport)
case CNIReport:
case cni:
cniReport := x.(CNIReport)
cniReport.Metadata = metadata
pl.CNIReports = append(pl.CNIReports, cniReport)
case NPMReport:
case npm:
npmReport := x.(NPMReport)
npmReport.Metadata = metadata
pl.NPMReports = append(pl.NPMReports, npmReport)
case CNSReport:
case cns:
cnsReport := x.(CNSReport)
cnsReport.Metadata = metadata
pl.CNSReports = append(pl.CNSReports, cnsReport)
}
}
}

// reset - reset payload slices
// reset - reset payload slices and sets payloadSize to 0
func (pl *Payload) reset() {
pl.DNCReports = nil
pl.DNCReports = make([]DNCReport, 0)
Expand All @@ -285,11 +286,34 @@ func (pl *Payload) reset() {
pl.NPMReports = make([]NPMReport, 0)
pl.CNSReports = nil
pl.CNSReports = make([]CNSReport, 0)
payloadSize = 0
}

// len - get number of payload items
func (pl *Payload) len() int {
return len(pl.CNIReports) + len(pl.CNSReports) + len(pl.DNCReports) + len(pl.NPMReports)
// payloadCapNotExceeded - Returns whether payload cap will be exceeded as a result of adding the new report; and the report type
// If the cap is not exceeded, we update the payload size here.
func (pl *Payload) payloadCapNotExceeded(x interface{}) (notExceeded bool, reportType string) {
var body bytes.Buffer
switch x.(type) {
case DNCReport:
reportType = dnc
json.NewEncoder(&body).Encode(x.(DNCReport))
case CNIReport:
reportType = cni
json.NewEncoder(&body).Encode(x.(CNIReport))
case NPMReport:
reportType = npm
json.NewEncoder(&body).Encode(x.(NPMReport))
case CNSReport:
reportType = cns
json.NewEncoder(&body).Encode(x.(CNSReport))
}

updatedPayloadSize := uint16(body.Len()) + payloadSize
if notExceeded = updatedPayloadSize < MaxPayloadSize && payloadSize < updatedPayloadSize; notExceeded {
payloadSize = updatedPayloadSize
}

return
}

// saveHostMetadata - save metadata got from wireserver to json file
Expand Down