Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cns/networkcontainers/networkcontainers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type NetworkContainers struct {

func interfaceExists(iFaceName string) (bool, error) {
_, err := net.InterfaceByName(iFaceName)

if err != nil {
errMsg := fmt.Sprintf("[Azure CNS] Unable to get interface by name %v, %v", iFaceName, err)
log.Printf(errMsg)
Expand Down
38 changes: 36 additions & 2 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 Microsoft. All rights reserved.
// Copyright 2018 Microsoft. All rights reserved.
// MIT License

package telemetry
Expand Down Expand Up @@ -142,6 +142,21 @@ type NPMReport struct {
Metadata Metadata `json:"compute"`
}

// DNCReport structure.
type DNCReport struct {
IsNewInstance bool
CPUUsage string
MemoryUsage string
Processes string
EventMessage string
PartitionKey string
Allocations string
Timestamp string
UUID string
Errorcode string
Metadata Metadata `json:"compute"`
}

// ReportManager structure.
type ReportManager struct {
HostNetAgentURL string
Expand Down Expand Up @@ -210,8 +225,10 @@ func (reportMgr *ReportManager) SendReport() error {
log.Printf("[Telemetry] %+v", reportMgr.Report.(*CNIReport))
case *NPMReport:
log.Printf("[Telemetry] %+v", reportMgr.Report.(*NPMReport))
case *DNCReport:
log.Printf("[Telemetry] %+v", reportMgr.Report.(*DNCReport))
default:
log.Printf("[Telemetry] %+v", reportMgr.Report)
log.Printf("[Telemetry] Invalid report type")
}

httpc := &http.Client{}
Expand Down Expand Up @@ -452,3 +469,20 @@ func (reportMgr *ReportManager) GetHostMetadata() error {

return err
}

// ReportToBytes - returns the report bytes
func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
switch reportMgr.Report.(type) {
case *CNIReport:
case *NPMReport:
case *DNCReport:
default:
err = fmt.Errorf("[Telemetry] Invalid report type")
}

if err == nil {
report, err = json.Marshal(reportMgr.Report)
}

return
}
220 changes: 220 additions & 0 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License

package telemetry

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"time"
)

// FdName - file descriptor name
// Delimiter - delimiter for socket reads/writes
// HostNetAgentURL - host net agent url of type payload
// DefaultDncReportsSize - default DNC report slice size
// DefaultCniReportsSize - default CNI report slice size
// DefaultNpmReportsSize - default NPM report slice size
// DefaultInterval - default interval for sending payload to host
const (
FdName = "azure-telemetry"
Delimiter = '\n'
HostNetAgentURL = "http://169.254.169.254/machine/plugins?comp=netagent&type=payload"
DefaultInterval = 1 * time.Minute
)

// TelemetryBuffer object
type TelemetryBuffer struct {
client net.Conn
listener net.Listener
connections []net.Conn
payload Payload
fdExists bool
connected bool
data chan interface{}
cancel chan bool
}

// Payload object holds the different types of reports
type Payload struct {
DNCReports []DNCReport
CNIReports []CNIReport
NPMReports []NPMReport
}

// NewTelemetryBuffer - create a new TelemetryBuffer
func NewTelemetryBuffer() (*TelemetryBuffer, error) {
var tb TelemetryBuffer
tb.data = make(chan interface{})
tb.cancel = make(chan bool, 1)
tb.connections = make([]net.Conn, 1)
err := tb.Listen(FdName)
if err != nil {
tb.fdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
} else {
// Spawn server goroutine to handle incoming connections
go func() {
for {
// Spawn worker goroutines to communicate with client
conn, err := tb.listener.Accept()
if err == nil {
tb.connections = append(tb.connections, conn)
go func() {
for {
reportStr, err := read(conn)
if err == nil {
var tmp map[string]interface{}
json.Unmarshal(reportStr, &tmp)
if _, ok := tmp["NpmVersion"]; ok {
var npmReport NPMReport
json.Unmarshal([]byte(reportStr), &npmReport)
tb.data <- npmReport
} else if _, ok := tmp["CniSucceeded"]; ok {
var cniReport CNIReport
json.Unmarshal([]byte(reportStr), &cniReport)
tb.data <- cniReport
} else if _, ok := tmp["Allocations"]; ok {
var dncReport DNCReport
json.Unmarshal([]byte(reportStr), &dncReport)
tb.data <- dncReport
}
}
}
}()
}
}
}()
}

err = tb.Dial(FdName)
if err == nil {
tb.connected = true
tb.payload.DNCReports = make([]DNCReport, 0)
tb.payload.CNIReports = make([]CNIReport, 0)
tb.payload.NPMReports = make([]NPMReport, 0)
} else if tb.fdExists {
tb.cleanup(FdName)
}

return &tb, err
}

// Start - start running an instance if it isn't already being run elsewhere
func (tb *TelemetryBuffer) Start(intervalms time.Duration) {
defer tb.close()
if !tb.fdExists && tb.connected {
if intervalms < DefaultInterval {
intervalms = DefaultInterval
}

interval := time.NewTicker(intervalms).C
for {
select {
case <-interval:
// Send payload to host and clear cache when sent successfully
// To-do : if we hit max slice size in payload, write to disk and process the logs on disk on future sends
if err := tb.sendToHost(); err == nil {
tb.payload.reset()
}
case report := <-tb.data:
tb.payload.push(report)
case <-tb.cancel:
goto EXIT
}
}
} else {
<-tb.cancel
}

EXIT:
}

// read - read from the file descriptor
func read(conn net.Conn) (b []byte, err error) {
b, err = bufio.NewReader(conn).ReadBytes(Delimiter)
if err == nil {
b = b[:len(b)-1]
}

return
}

// Write - write to the file descriptor
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
b = append(b, Delimiter)
w := bufio.NewWriter(tb.client)
c, err = w.Write(b)
if err == nil {
w.Flush()
}

return
}

// Cancel - signal to tear down telemetry buffer
func (tb *TelemetryBuffer) Cancel() {
tb.cancel <- true
}

// close - close all connections
func (tb *TelemetryBuffer) close() {
if tb.client != nil {
tb.client.Close()
}

if tb.listener != nil {
tb.listener.Close()
}

for _, conn := range tb.connections {
if conn != nil {
conn.Close()
}
}
}

// sendToHost - send payload to host
func (tb *TelemetryBuffer) sendToHost() error {
httpc := &http.Client{}
var body bytes.Buffer
json.NewEncoder(&body).Encode(tb.payload)
resp, err := httpc.Post(HostNetAgentURL, ContentType, &body)
if err != nil {
return fmt.Errorf("[Telemetry] HTTP Post returned error %v", err)
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("[Telemetry] HTTP Post returned statuscode %d", resp.StatusCode)
}

return nil
}

// push - push the report (x) to corresponding slice
func (pl *Payload) push(x interface{}) {
switch x.(type) {
case DNCReport:
pl.DNCReports = append(pl.DNCReports, x.(DNCReport))
case CNIReport:
pl.CNIReports = append(pl.CNIReports, x.(CNIReport))
case NPMReport:
pl.NPMReports = append(pl.NPMReports, x.(NPMReport))
}
}

// reset - reset payload slices
func (pl *Payload) reset() {
pl.DNCReports = nil
pl.DNCReports = make([]DNCReport, 0)
pl.CNIReports = nil
pl.CNIReports = make([]CNIReport, 0)
pl.NPMReports = nil
pl.NPMReports = make([]NPMReport, 0)
}
39 changes: 39 additions & 0 deletions telemetry/telemetrybuffer_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License

package telemetry

import (
"fmt"
"net"
"os"
)

const (
fdTemplate = "/tmp/%s.sock"
)

// Dial - try to connect to/create a socket with 'name'
func (tb *TelemetryBuffer) Dial(name string) (err error) {
conn, err := net.Dial("unix", fmt.Sprintf(fdTemplate, name))
if err == nil {
tb.client = conn
}

return err
}

// Listen - try to create and listen on socket with 'name'
func (tb *TelemetryBuffer) Listen(name string) (err error) {
conn, err := net.Listen("unix", fmt.Sprintf(fdTemplate, name))
if err == nil {
tb.listener = conn
}

return err
}

// cleanup - manually remove socket
func (tb *TelemetryBuffer) cleanup(name string) error {
return os.Remove(fmt.Sprintf(fdTemplate, name))
}
39 changes: 39 additions & 0 deletions telemetry/telemetrybuffer_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2018 Microsoft. All rights reserved.
// MIT License

package telemetry

import (
"fmt"

"github.com/Microsoft/go-winio"
)

const (
fdTemplate = "\\\\.\\pipe\\%s"
)

// Dial - try to connect to a named pipe with 'name'
func (tb *TelemetryBuffer) Dial(name string) (err error) {
conn, err := winio.DialPipe(fmt.Sprintf(fdTemplate, name), nil)
if err == nil {
tb.client = conn
}

return err
}

// Listen - try to create and listen on named pipe with 'name'
func (tb *TelemetryBuffer) Listen(name string) (err error) {
listener, err := winio.ListenPipe(fmt.Sprintf(fdTemplate, name), nil)
if err == nil {
tb.listener = listener
}

return err
}

// cleanup - stub
func (tb *TelemetryBuffer) cleanup(name string) error {
return nil
}