diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 6845f8a176..561c4fc8a6 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -133,6 +133,52 @@ func handleIfCniUpdate(update func(*skel.CmdArgs) error) (bool, error) { return isupdate, nil } +// startTelemetryService - Kills if any telemetry service runs and start new telemetry service +func startTelemetryService(path string) error { + platform.KillProcessByName(telemetry.TelemetryServiceProcessName) + + log.Printf("[cni] Starting telemetry service process") + + if err := common.StartProcess(path); err != nil { + log.Printf("[Telemetry] Failed to start telemetry service process :%v", err) + return err + } + + log.Printf("[cni] Telemetry service started") + + for attempt := 0; attempt < 5; attempt++ { + if telemetry.SockExists() { + break + } + + time.Sleep(200 * time.Millisecond) + } + + return nil +} + +func connectToTelemetryService(tb *telemetry.TelemetryBuffer) { + path := fmt.Sprintf("%v/%v", telemetry.CniInstallDir, telemetry.TelemetryServiceProcessName) + + for attempt := 0; attempt < 2; attempt++ { + if err := tb.Connect(); err != nil { + log.Printf("Connection to telemetry socket failed: %v", err) + tb.Cleanup(telemetry.FdName) + + if isExists, _ := common.CheckIfFileExists(path); !isExists { + log.Printf("Skip starting telemetry service as file didn't exist") + return + } + + startTelemetryService(path) + } else { + tb.Connected = true + log.Printf("Connected to telemetry service") + return + } + } +} + // Main is the entry point for CNI network plugin. func main() { @@ -170,20 +216,7 @@ func main() { } tb := telemetry.NewTelemetryBuffer("") - - for attempt := 0; attempt < 2; attempt++ { - err = tb.Connect() - if err != nil { - log.Printf("Connection to telemetry socket failed: %v", err) - tb.Cleanup(telemetry.FdName) - telemetry.StartTelemetryService() - } else { - tb.Connected = true - log.Printf("Connected to telemetry service") - break - } - } - + connectToTelemetryService(tb) defer tb.Close() t := time.Now() diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 488044af34..894e9e027f 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/Azure/azure-container-networking/common" + "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/platform" ) @@ -236,29 +237,16 @@ func (report *NPMReport) GetReport(clusterID, nodeName, npmVersion, kubernetesVe // SendReport will send telemetry report to HostNetAgent. func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error { var err error - if tb != nil && tb.Connected { - telemetryLogger.Printf("[Telemetry] Going to send Telemetry report to hostnetagent") - - switch reportMgr.Report.(type) { - case *CNIReport: - telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*CNIReport)) - case *NPMReport: - telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*NPMReport)) - case *DNCReport: - telemetryLogger.Printf("[Telemetry] %+v", reportMgr.Report.(*DNCReport)) - default: - telemetryLogger.Printf("[Telemetry] Invalid report type") - } + var report []byte - report, err := reportMgr.ReportToBytes() + if tb != nil && tb.Connected { + report, err = reportMgr.ReportToBytes() if err == nil { // If write fails, try to re-establish connections as server/client if _, err = tb.Write(report); err != nil { tb.Cancel() } } - } else { - err = fmt.Errorf("Not connected to telemetry server or tb is nil") } return err @@ -284,13 +272,12 @@ func (reportMgr *ReportManager) SetReportState(telemetryFile string) error { _, err = f.Write(reportBytes) if err != nil { - telemetryLogger.Printf("[Telemetry] Error while writing to file %v", err) + log.Printf("[Telemetry] Error while writing to file %v", err) return fmt.Errorf("[Telemetry] Error while writing to file %v", err) } // set IsNewInstance in report reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(false) - telemetryLogger.Printf("[Telemetry] SetReportState succeeded") return nil } @@ -298,7 +285,7 @@ func (reportMgr *ReportManager) SetReportState(telemetryFile string) error { func (reportMgr *ReportManager) GetReportState(telemetryFile string) bool { // try to set IsNewInstance in report if _, err := os.Stat(telemetryFile); os.IsNotExist(err) { - telemetryLogger.Printf("[Telemetry] File not exist %v", telemetryFile) + log.Printf("[Telemetry] File not exist %v", telemetryFile) reflect.ValueOf(reportMgr.Report).Elem().FieldByName("IsNewInstance").SetBool(true) return false } @@ -430,7 +417,10 @@ func (report *CNIReport) GetOrchestratorDetails() { } // ReportToBytes - returns the report bytes -func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) { +func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) { + var err error + var report []byte + switch reportMgr.Report.(type) { case *CNIReport: case *NPMReport: @@ -440,9 +430,10 @@ func (reportMgr *ReportManager) ReportToBytes() (report []byte, err error) { err = fmt.Errorf("[Telemetry] Invalid report type") } - if err == nil { - report, err = json.Marshal(reportMgr.Report) + if err != nil { + return []byte{}, err } - return + report, err = json.Marshal(reportMgr.Report) + return report, err } diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index a12703d107..639585f1b3 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -94,6 +94,10 @@ func TestMain(m *testing.M) { reportManager.ContentType = "application/json" reportManager.Report = &CNIReport{} + if err := InitTelemetryLogger(); err == nil { + defer CloseTelemetryLogger() + } + tb = NewTelemetryBuffer(hostAgentUrl) err = tb.StartServer() if err == nil { @@ -158,6 +162,14 @@ func TestSendTelemetry(t *testing.T) { if err != nil { t.Errorf("SendTelemetry failed due to %v", err) } + + i := 3 + rpMgr := &ReportManager{} + rpMgr.Report = &i + err = rpMgr.SendReport(tb) + if err == nil { + t.Errorf("SendTelemetry not failed for incorrect report type") + } } func TestReceiveTelemetryData(t *testing.T) { @@ -214,6 +226,10 @@ func TestClientCloseTelemetryConnection(t *testing.T) { go tb.BufferAndPushData(0) } + if !SockExists() { + t.Errorf("telemetry sock doesn't exist") + } + // create client telemetrybuffer and connect to server tb1 := NewTelemetryBuffer(hostAgentUrl) if err := tb1.Connect(); err != nil { diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 9ee8a6b948..d88e473277 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -15,9 +15,7 @@ import ( "sync" "time" - "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/log" - "github.com/Azure/azure-container-networking/platform" ) // FdName - file descriptor name @@ -30,7 +28,7 @@ const ( FdName = "azure-vnet-telemetry" Delimiter = '\n' azureHostReportURL = "http://168.63.129.16/machine/plugins?comp=netagent&type=payload" - DefaultInterval = 10 * time.Second + DefaultInterval = 30 * time.Second logName = "azure-vnet-telemetry" MaxPayloadSize uint16 = 65535 dnc = "DNC" @@ -64,6 +62,14 @@ type Payload struct { CNSReports []CNSReport } +func InitTelemetryLogger() error { + return telemetryLogger.SetTarget(log.TargetLogfile) +} + +func CloseTelemetryLogger() { + telemetryLogger.Close() +} + // NewTelemetryBuffer - create a new TelemetryBuffer func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { var tb TelemetryBuffer @@ -80,11 +86,6 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { tb.payload.NPMReports = make([]NPMReport, 0) tb.payload.CNSReports = make([]CNSReport, 0) - err := telemetryLogger.SetTarget(log.TargetLogfile) - if err != nil { - fmt.Printf("Failed to configure logging: %v\n", err) - } - return &tb } @@ -107,6 +108,8 @@ func (tb *TelemetryBuffer) StartServer() error { return err } + InitTelemetryLogger() + telemetryLogger.Printf("Telemetry service started") // Spawn server goroutine to handle incoming connections go func() { @@ -251,7 +254,6 @@ func (tb *TelemetryBuffer) Cancel() { // Close - close all connections func (tb *TelemetryBuffer) Close() { if tb.client != nil { - telemetryLogger.Printf("client close") tb.client.Close() tb.client = nil } @@ -259,7 +261,7 @@ func (tb *TelemetryBuffer) Close() { if tb.listener != nil { telemetryLogger.Printf("server close") tb.listener.Close() - tb.listener = nil + CloseTelemetryLogger() } tb.mutex.Lock() @@ -267,7 +269,6 @@ func (tb *TelemetryBuffer) Close() { for _, conn := range tb.connections { if conn != nil { - telemetryLogger.Printf("connection close as server closed") conn.Close() } } @@ -390,11 +391,12 @@ func getHostMetadata() (Metadata, error) { if err == nil { var metadata Metadata if err = json.Unmarshal(content, &metadata); err == nil { - telemetryLogger.Printf("[Telemetry] Returning hostmetadata from state") return metadata, nil } } + telemetryLogger.Printf("[Telemetry] Request metadata from wireserver") + req, err := http.NewRequest("GET", metadataURL, nil) if err != nil { return Metadata{}, err @@ -424,27 +426,3 @@ func getHostMetadata() (Metadata, error) { return metareport.Metadata, err } - -// StartTelemetryService - Kills if any telemetry service runs and start new telemetry service -func StartTelemetryService() error { - platform.KillProcessByName(telemetryServiceProcessName) - - telemetryLogger.Printf("[Telemetry] Starting telemetry service process") - path := fmt.Sprintf("%v/%v", cniInstallDir, telemetryServiceProcessName) - if err := common.StartProcess(path); err != nil { - telemetryLogger.Printf("[Telemetry] Failed to start telemetry service process :%v", err) - return err - } - - telemetryLogger.Printf("[Telemetry] Telemetry service started") - - for attempt := 0; attempt < 5; attempt++ { - if checkIfSockExists() { - break - } - - time.Sleep(200 * time.Millisecond) - } - - return nil -} diff --git a/telemetry/telemetrybuffer_linux.go b/telemetry/telemetrybuffer_linux.go index ba6cced8b1..b5a68bc33a 100644 --- a/telemetry/telemetrybuffer_linux.go +++ b/telemetry/telemetrybuffer_linux.go @@ -10,9 +10,9 @@ import ( ) const ( - fdTemplate = "/tmp/%s.sock" - telemetryServiceProcessName = "azure-vnet-telemetry" - cniInstallDir = "/opt/cni/bin" + fdTemplate = "/var/run/%s.sock" + TelemetryServiceProcessName = "azure-vnet-telemetry" + CniInstallDir = "/opt/cni/bin" metadataFile = "/tmp/azuremetadata.json" ) @@ -41,7 +41,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error { return os.Remove(fmt.Sprintf(fdTemplate, name)) } -func checkIfSockExists() bool { +func SockExists() bool { if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) { return true } diff --git a/telemetry/telemetrybuffer_windows.go b/telemetry/telemetrybuffer_windows.go index a47efc0052..372bf03f07 100644 --- a/telemetry/telemetrybuffer_windows.go +++ b/telemetry/telemetrybuffer_windows.go @@ -12,8 +12,8 @@ import ( const ( fdTemplate = "\\\\.\\pipe\\%s" - telemetryServiceProcessName = "azure-vnet-telemetry.exe" - cniInstallDir = "c:\\k\\azurecni\\bin" + TelemetryServiceProcessName = "azure-vnet-telemetry.exe" + CniInstallDir = "c:\\k\\azurecni\\bin" metadataFile = "azuremetadata.json" ) @@ -43,7 +43,7 @@ func (tb *TelemetryBuffer) Cleanup(name string) error { } // Check if telemetry unix domain socket exists -func checkIfSockExists() bool { +func SockExists() bool { if _, err := os.Stat(fmt.Sprintf(fdTemplate, FdName)); !os.IsNotExist(err) { return true }