Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 47 additions & 14 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,52 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) {
return isupdate, nil
}

// startTelemetryService kills any process named
// telemetry.TelemetryServiceProcessName, launches the binary at path, and then
// polls briefly for the telemetry socket.
//
// It returns the error from common.StartProcess when the launch fails. A
// socket that has not appeared by the end of the polling window is not treated
// as an error; nil is still returned in that case.
func startTelemetryService(path string) error {
	const (
		maxSockChecks = 5
		sockCheckWait = 200 * time.Millisecond
	)

	// Kill any telemetry service process by name before starting a new one.
	platform.KillProcessByName(telemetry.TelemetryServiceProcessName)

	log.Printf("[cni] Starting telemetry service process")

	startErr := common.StartProcess(path)
	if startErr != nil {
		log.Printf("[Telemetry] Failed to start telemetry service process :%v", startErr)
		return startErr
	}

	log.Printf("[cni] Telemetry service started")

	// Sleep between checks until the socket shows up or the checks run out.
	for checks := 0; checks < maxSockChecks && !telemetry.SockExists(); checks++ {
		time.Sleep(sockCheckWait)
	}

	return nil
}

// connectToTelemetryService connects tb to the telemetry service, starting the
// service from telemetry.CniInstallDir when a connection attempt fails.
//
// At most two connection attempts are made. On success tb.Connected is set to
// true and the function returns. After a failed attempt the telemetry socket
// is cleaned up and the service is (re)started before the next attempt. The
// function gives up early when the service binary does not exist or when the
// service could not be started.
func connectToTelemetryService(tb *telemetry.TelemetryBuffer) {
	path := fmt.Sprintf("%v/%v", telemetry.CniInstallDir, telemetry.TelemetryServiceProcessName)

	for attempt := 0; attempt < 2; attempt++ {
		err := tb.Connect()
		if err == nil {
			tb.Connected = true
			log.Printf("Connected to telemetry service")
			return
		}

		log.Printf("Connection to telemetry socket failed: %v", err)
		// Best-effort cleanup of the old socket; its result is intentionally
		// not checked.
		tb.Cleanup(telemetry.FdName)

		isExists, err := common.CheckIfFileExists(path)
		if err != nil {
			// Surface the failure instead of dropping it; whether to proceed
			// is still decided by isExists, as before.
			log.Printf("Failed to check telemetry service binary %v: %v", path, err)
		}
		if !isExists {
			log.Printf("Skip starting telemetry service as file didn't exist")
			return
		}

		if err := startTelemetryService(path); err != nil {
			// startTelemetryService has already logged the failure. Without a
			// running service there is nothing to reconnect to, so stop here
			// rather than killing and relaunching again.
			return
		}
	}
}

// Main is the entry point for CNI network plugin.
func main() {

Expand Down Expand Up @@ -170,20 +216,7 @@ func main() {
}

tb := telemetry.NewTelemetryBuffer("")

for attempt := 0; attempt < 2; attempt++ {
err = tb.Connect()
if err != nil {
log.Printf("Connection to telemetry socket failed: %v", err)
tb.Cleanup(telemetry.FdName)
telemetry.StartTelemetryService()
} else {
tb.Connected = true
log.Printf("Connected to telemetry service")
break
}
}

connectToTelemetryService(tb)
defer tb.Close()

t := time.Now()
Expand Down
37 changes: 14 additions & 23 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"

"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
)

Expand Down Expand Up @@ -236,29 +237,16 @@ func (report *NPMReport) GetReport(clusterID, nodeName, npmVersion, kubernetesVe
// SendReport will send telemetry report to HostNetAgent.
func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error {
var err error
if tb != nil && tb.Connected {
telemetryLogger.Printf("[Telemetry] Going to send Telemetry report to hostnetagent")

switch reportMgr.Report.(type) {
case *CNIReport:
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*CNIReport))
case *NPMReport:
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*NPMReport))
case *DNCReport:
telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*DNCReport))
default:
telemetryLogger.Printf("[Telemetry] Invalid report type")
}
var report []byte

report, err := reportMgr.ReportToBytes()
if tb != nil && tb.Connected {
report, err = reportMgr.ReportToBytes()
if err == nil {
// If write fails, try to re-establish connections as server/client
if _, err = tb.Write(report); err != nil {
tb.Cancel()
}
}
} else {
err = fmt.Errorf("Not connected to telemetry server or tb is nil")
}

return err
Expand All @@ -284,21 +272,20 @@ func (reportMgr *ReportManager) SetReportState(telemetryFile string) error {

_, err = f.Write(reportBytes)
if err != nil {
telemetryLogger.Printf("[Telemetry] Error while writing to file %v", err)
log.Printf("[Telemetry] Error while writing to file %v", err)
return fmt.Errorf("[Telemetry] Error while writing to file %v", err)
}

// set IsNewInstance in report
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(false)
telemetryLogger.Printf("[Telemetry] SetReportState succeeded")
return nil
}

// GetReportState will check if report is sent at least once by checking telemetry file.
func (reportMgr *ReportManager) GetReportState(telemetryFile string) bool {
// try to set IsNewInstance in report
if _, err := os.Stat(telemetryFile); os.IsNotExist(err) {
telemetryLogger.Printf("[Telemetry] File not exist %v", telemetryFile)
log.Printf("[Telemetry] File not exist %v", telemetryFile)
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(true)
return false
}
Expand Down Expand Up @@ -430,7 +417,10 @@ func (report *CNIReport) GetOrchestratorDetails() {
}

// ReportToBytes - returns the report bytes
func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {
var err error
var report []byte

switch reportMgr.Report.(type) {
case *CNIReport:
case *NPMReport:
Expand All @@ -440,9 +430,10 @@ func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) {
err = fmt.Errorf("[Telemetry] Invalid report type")
}

if err == nil {
report, err = json.Marshal(reportMgr.Report)
if err != nil {
return []byte{}, err
}

return
report, err = json.Marshal(reportMgr.Report)
return report, err
}
16 changes: 16 additions & 0 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func TestMain(m *testing.M) {
reportManager.ContentType = "application/json"
reportManager.Report = &CNIReport{}

if err := InitTelemetryLogger(); err == nil {
defer CloseTelemetryLogger()
}

tb = NewTelemetryBuffer(hostAgentUrl)
err = tb.StartServer()
if err == nil {
Expand Down Expand Up @@ -158,6 +162,14 @@ func TestSendTelemetry(t *testing.T) {
if err != nil {
t.Errorf("SendTelemetry failed due to %v", err)
}

i := 3
rpMgr := &ReportManager{}
rpMgr.Report = &i
err = rpMgr.SendReport(tb)
if err == nil {
t.Errorf("SendTelemetry not failed for incorrect report type")
}
}

func TestReceiveTelemetryData(t *testing.T) {
Expand Down Expand Up @@ -214,6 +226,10 @@ func TestClientCloseTelemetryConnection(t *testing.T) {
go tb.BufferAndPushData(0)
}

if !SockExists() {
t.Errorf("telemetry sock doesn't exist")
}

// create client telemetrybuffer and connect to server
tb1 := NewTelemetryBuffer(hostAgentUrl)
if err := tb1.Connect(); err != nil {
Expand Down
50 changes: 14 additions & 36 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"sync"
"time"

"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
)

// FdName - file descriptor name
Expand All @@ -30,7 +28,7 @@ const (
FdName = "azure-vnet-telemetry"
Delimiter = '\n'
azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload"
DefaultInterval = 10 * time.Second
DefaultInterval = 30 * time.Second
logName = "azure-vnet-telemetry"
MaxPayloadSize uint16 = 65535
dnc = "DNC"
Expand Down Expand Up @@ -64,6 +62,14 @@ type Payload struct {
CNSReports []CNSReport
}

// InitTelemetryLogger sets the target of the package's telemetryLogger to
// log.TargetLogfile and returns the error reported by SetTarget, if any.
func InitTelemetryLogger() error {
return telemetryLogger.SetTarget(log.TargetLogfile)
}

// CloseTelemetryLogger closes the package's telemetryLogger by calling its
// Close method.
func CloseTelemetryLogger() {
telemetryLogger.Close()
}

// NewTelemetryBuffer - create a new TelemetryBuffer
func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
var tb TelemetryBuffer
Expand All @@ -80,11 +86,6 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
tb.payload.NPMReports = make([]NPMReport, 0)
tb.payload.CNSReports = make([]CNSReport, 0)

err := telemetryLogger.SetTarget(log.TargetLogfile)
if err != nil {
fmt.Printf("Failed to configure logging: %v\n", err)
}

return &tb
}

Expand All @@ -107,6 +108,8 @@ func (tb *TelemetryBuffer) StartServer() error {
return err
}

InitTelemetryLogger()

telemetryLogger.Printf("Telemetry service started")
// Spawn server goroutine to handle incoming connections
go func() {
Expand Down Expand Up @@ -251,23 +254,21 @@ func (tb *TelemetryBuffer) Cancel() {
// Close - close all connections
func (tb *TelemetryBuffer) Close() {
if tb.client != nil {
telemetryLogger.Printf("client close")
tb.client.Close()
tb.client = nil
}

if tb.listener != nil {
telemetryLogger.Printf("server close")
tb.listener.Close()
tb.listener = nil
CloseTelemetryLogger()
}

tb.mutex.Lock()
defer tb.mutex.Unlock()

for _, conn := range tb.connections {
if conn != nil {
telemetryLogger.Printf("connection close as server closed")
conn.Close()
}
}
Expand Down Expand Up @@ -390,11 +391,12 @@ func getHostMetadata() (Metadata, error) {
if err == nil {
var metadata Metadata
if err = json.Unmarshal(content, &metadata); err == nil {
telemetryLogger.Printf("[Telemetry] Returning hostmetadata from state")
return metadata, nil
}
}

telemetryLogger.Printf("[Telemetry] Request metadata from wireserver")

req, err := http.NewRequest("GET", metadataURL, nil)
if err != nil {
return Metadata{}, err
Expand Down Expand Up @@ -424,27 +426,3 @@ func getHostMetadata() (Metadata, error) {

return metareport.Metadata, err
}

// StartTelemetryService - Kills if any telemetry service runs and start new telemetry service
func StartTelemetryService() error {
platform.KillProcessByName(telemetryServiceProcessName)

telemetryLogger.Printf("[Telemetry] Starting telemetry service process")
path := fmt.Sprintf("%v/%v", cniInstallDir, telemetryServiceProcessName)
if err := common.StartProcess(path); err != nil {
telemetryLogger.Printf("[Telemetry] Failed to start telemetry service process :%v", err)
return err
}

telemetryLogger.Printf("[Telemetry] Telemetry service started")

for attempt := 0; attempt < 5; attempt++ {
if checkIfSockExists() {
break
}

time.Sleep(200 * time.Millisecond)
}

return nil
}
8 changes: 4 additions & 4 deletions telemetry/telemetrybuffer_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
)

const (
fdTemplate = "/tmp/%s.sock"
telemetryServiceProcessName = "azure-vnet-telemetry"
cniInstallDir = "/opt/cni/bin"
fdTemplate = "/var/run/%s.sock"
TelemetryServiceProcessName = "azure-vnet-telemetry"
CniInstallDir = "/opt/cni/bin"
metadataFile = "/tmp/azuremetadata.json"
)

Expand Down Expand Up @@ -41,7 +41,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error {
return os.Remove(fmt.Sprintf(fdTemplate, name))
}

func checkIfSockExists() bool {
func SockExists() bool {
if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) {
return true
}
Expand Down
6 changes: 3 additions & 3 deletions telemetry/telemetrybuffer_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (

const (
fdTemplate = "\\\\.\\pipe\\%s"
telemetryServiceProcessName = "azure-vnet-telemetry.exe"
cniInstallDir = "c:\\k\\azurecni\\bin"
TelemetryServiceProcessName = "azure-vnet-telemetry.exe"
CniInstallDir = "c:\\k\\azurecni\\bin"
metadataFile = "azuremetadata.json"
)

Expand Down Expand Up @@ -43,7 +43,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error {
}

// Check if the telemetry named pipe (see fdTemplate) exists
func checkIfSockExists() bool {
func SockExists() bool {
if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) {
return true
}
Expand Down