diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 226c2b04cc..6845f8a176 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -184,6 +184,8 @@ func main() { } } + defer tb.Close() + t := time.Now() cniReport.Timestamp = t.Format("2006-01-02 15:04:05") cniReport.GetReport(pluginName, version, ipamQueryURL) @@ -205,7 +207,7 @@ func main() { if err != nil { log.Printf("Failed to create network plugin, err:%v.\n", err) reportPluginError(reportManager, tb, err) - os.Exit(1) + return } netPlugin.SetCNIReport(cniReport) @@ -213,7 +215,7 @@ func main() { if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { log.Printf("Failed to initialize key-value store of network plugin, err:%v.\n", err) reportPluginError(reportManager, tb, err) - os.Exit(1) + return } defer func() { @@ -222,7 +224,7 @@ func main() { } if recover() != nil { - os.Exit(1) + return } }() diff --git a/cni/telemetry/service/telemetrymain.go b/cni/telemetry/service/telemetrymain.go index 87edf9cec7..519c789503 100644 --- a/cni/telemetry/service/telemetrymain.go +++ b/cni/telemetry/service/telemetrymain.go @@ -3,10 +3,8 @@ package main // Entry point of the telemetry service if started by CNI import ( - "fmt" "time" - "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/telemetry" ) @@ -19,28 +17,16 @@ func main() { var tb *telemetry.TelemetryBuffer var err error - log.SetName(azurecnitelemetry) - log.SetLevel(log.LevelInfo) - err = log.SetTarget(log.TargetLogfile) - if err != nil { - fmt.Printf("log settarget failed") - } - - log.Printf("[Telemetry] TelemetryBuffer process started") for { tb = telemetry.NewTelemetryBuffer("") err = tb.StartServer() if err == nil || tb.FdExists { - log.Printf("[Telemetry] Server started") break } tb.Cleanup(telemetry.FdName) - - log.Printf("[Telemetry] Failed to establish telemetry buffer connection.") time.Sleep(time.Millisecond * 200) } tb.BufferAndPushData(reportToHostIntervalInSeconds) - log.Printf("[Telemetry] TelemetryBuffer process exiting") } diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 0223fa71ce..90981aebaa 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -82,7 +82,6 @@ func TestMain(m *testing.M) { } exitCode := m.Run() - tb.Cancel() tb.Cleanup(FdName) os.Exit(exitCode) } @@ -144,6 +143,15 @@ func TestReceiveTelemetryData(t *testing.T) { t.Errorf("payload doesn't contain CNI report") } } + +func TestCloseTelemetryConnection(t *testing.T) { + tb.Cancel() + time.Sleep(300 * time.Millisecond) + if len(tb.connections) != 0 { + t.Errorf("server didn't close connection") + } +} + func TestSetReportState(t *testing.T) { err := reportManager.SetReportState("a.json") if err != nil { diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index d2c3bdf95f..d3689e2ff5 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -68,7 +68,7 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { tb.data = make(chan interface{}) tb.cancel = make(chan bool, 1) - tb.connections = make([]net.Conn, 1) + tb.connections = make([]net.Conn, 0) tb.payload.DNCReports = make([]DNCReport, 0) tb.payload.CNIReports = make([]CNIReport, 0) tb.payload.NPMReports = make([]NPMReport, 0) @@ -82,14 +82,21 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { return &tb } +func remove(s []net.Conn, i int) []net.Conn { + s[i] = s[len(s)-1] + return s[:len(s)-1] +} + // Starts Telemetry server listening on unix domain socket func (tb *TelemetryBuffer) StartServer() error { err := tb.Listen(FdName) if err != nil { tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied") + telemetryLogger.Printf("Listen returns: %v", err.Error()) return err } + telemetryLogger.Printf("Telemetry service started") // Spawn server goroutine to handle incoming connections go func() { for { @@ -121,9 +128,20 @@ func (tb *TelemetryBuffer) StartServer() error { json.Unmarshal([]byte(reportStr), &cnsReport) tb.data <- cnsReport } + } else { + telemetryLogger.Printf("Server closing client connection") + for index, value := range tb.connections { + if value == conn { + conn.Close() + tb.connections = remove(tb.connections, index) + return + } + } } } }() + } else { + return } } }() @@ -144,7 +162,7 @@ func (tb *TelemetryBuffer) Connect() error { // BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { - defer tb.close() + defer tb.Close() if !tb.FdExists { telemetryLogger.Printf("[Telemetry] Buffer telemetry data and send it to host") if intervalms < DefaultInterval { @@ -167,11 +185,13 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer") tb.payload.push(report) case <-tb.cancel: + telemetryLogger.Printf("server cancel event") goto EXIT } } } else { <-tb.cancel + telemetryLogger.Printf("Received cancel event") } EXIT: @@ -204,19 +224,25 @@ func (tb *TelemetryBuffer) Cancel() { tb.cancel <- true } -// close - close all connections -func (tb *TelemetryBuffer) close() { +// Close - close all connections +func (tb *TelemetryBuffer) Close() { if tb.client != nil { + telemetryLogger.Printf("client close") tb.client.Close() + tb.client = nil } if tb.listener != nil { + telemetryLogger.Printf("server close") tb.listener.Close() + tb.listener = nil } - for _, conn := range tb.connections { + for index, conn := range tb.connections { if conn != nil { + telemetryLogger.Printf("connection close") conn.Close() + remove(tb.connections, index) } } }