From 2addce3efc4a98d033855e76181f1636c701606e Mon Sep 17 00:00:00 2001 From: Tamilmani Manoharan Date: Tue, 26 Feb 2019 18:54:44 -0800 Subject: [PATCH 1/3] 1. fix for closing telemetry socket in cni 2. fix for closing connection socket if server receives error on read --- cni/network/plugin/main.go | 8 +++++--- cni/telemetry/service/telemetrymain.go | 14 ------------- telemetry/telemetrybuffer.go | 28 +++++++++++++++++++++++--- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/cni/network/plugin/main.go b/cni/network/plugin/main.go index 226c2b04cc..6845f8a176 100644 --- a/cni/network/plugin/main.go +++ b/cni/network/plugin/main.go @@ -184,6 +184,8 @@ func main() { } } + defer tb.Close() + t := time.Now() cniReport.Timestamp = t.Format("2006-01-02 15:04:05") cniReport.GetReport(pluginName, version, ipamQueryURL) @@ -205,7 +207,7 @@ func main() { if err != nil { log.Printf("Failed to create network plugin, err:%v.\n", err) reportPluginError(reportManager, tb, err) - os.Exit(1) + return } netPlugin.SetCNIReport(cniReport) @@ -213,7 +215,7 @@ func main() { if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil { log.Printf("Failed to initialize key-value store of network plugin, err:%v.\n", err) reportPluginError(reportManager, tb, err) - os.Exit(1) + return } defer func() { @@ -222,7 +224,7 @@ func main() { } if recover() != nil { - os.Exit(1) + return } }() diff --git a/cni/telemetry/service/telemetrymain.go b/cni/telemetry/service/telemetrymain.go index 87edf9cec7..519c789503 100644 --- a/cni/telemetry/service/telemetrymain.go +++ b/cni/telemetry/service/telemetrymain.go @@ -3,10 +3,8 @@ package main // Entry point of the telemetry service if started by CNI import ( - "fmt" "time" - "github.com/Azure/azure-container-networking/log" "github.com/Azure/azure-container-networking/telemetry" ) @@ -19,28 +17,16 @@ func main() { var tb *telemetry.TelemetryBuffer var err error - log.SetName(azurecnitelemetry) - log.SetLevel(log.LevelInfo) - err = log.SetTarget(log.TargetLogfile) - if err != nil { - fmt.Printf("log settarget failed") - } - - log.Printf("[Telemetry] TelemetryBuffer process started") for { tb = telemetry.NewTelemetryBuffer("") err = tb.StartServer() if err == nil || tb.FdExists { - log.Printf("[Telemetry] Server started") break } tb.Cleanup(telemetry.FdName) - - log.Printf("[Telemetry] Failed to establish telemetry buffer connection.") time.Sleep(time.Millisecond * 200) } tb.BufferAndPushData(reportToHostIntervalInSeconds) - log.Printf("[Telemetry] TelemetryBuffer process exiting") } diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index d2c3bdf95f..b1af199a74 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -82,14 +82,21 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { return &tb } +func remove(s []net.Conn, i int) []net.Conn { + s[i] = s[len(s)-1] + return s[:len(s)-1] +} + // Starts Telemetry server listening on unix domain socket func (tb *TelemetryBuffer) StartServer() error { err := tb.Listen(FdName) if err != nil { tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied") + telemetryLogger.Printf("Listen returns: %v", err.Error()) return err } + telemetryLogger.Printf("Telemetry service started") // Spawn server goroutine to handle incoming connections go func() { for { @@ -121,6 +128,15 @@ func (tb *TelemetryBuffer) StartServer() error { json.Unmarshal([]byte(reportStr), &cnsReport) tb.data <- cnsReport } + } else { + telemetryLogger.Printf("Server closing client connection") + for index, value := range tb.connections { + if value == conn { + conn.Close() + tb.connections = remove(tb.connections, index) + return + } + } } } }() @@ -144,7 +160,7 @@ func (tb *TelemetryBuffer) Connect() error { // BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { - defer tb.close() + defer tb.Close() if !tb.FdExists { telemetryLogger.Printf("[Telemetry] Buffer telemetry data and send it to host") if intervalms < DefaultInterval { @@ -167,11 +183,13 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) { telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer") tb.payload.push(report) case <-tb.cancel: + telemetryLogger.Printf("server cancel event") goto EXIT } } } else { <-tb.cancel + telemetryLogger.Printf("Received cancel event") } EXIT: @@ -201,21 +219,25 @@ func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { // Cancel - signal to tear down telemetry buffer func (tb *TelemetryBuffer) Cancel() { + telemetryLogger.Printf("Calling cancel") tb.cancel <- true } -// close - close all connections -func (tb *TelemetryBuffer) close() { +// Close - close all connections +func (tb *TelemetryBuffer) Close() { if tb.client != nil { + telemetryLogger.Printf("client close") tb.client.Close() } if tb.listener != nil { + telemetryLogger.Printf("server close") tb.listener.Close() } for _, conn := range tb.connections { if conn != nil { + telemetryLogger.Printf("connection close") conn.Close() } } From a0e355ed2fe0df2c02711e27774387aa17de1e8a Mon Sep 17 00:00:00 2001 From: Tamilmani Manoharan Date: Wed, 27 Feb 2019 17:42:33 -0800 Subject: [PATCH 2/3] added uts and addressed comments --- telemetry/telemetry_test.go | 10 +++++++++- telemetry/telemetrybuffer.go | 10 +++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 0223fa71ce..90981aebaa 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -82,7 +82,6 @@ func TestMain(m *testing.M) { } exitCode := m.Run() - tb.Cancel() tb.Cleanup(FdName) os.Exit(exitCode) } @@ -144,6 +143,15 @@ func TestReceiveTelemetryData(t *testing.T) { t.Errorf("payload doesn't contain CNI report") } } + +func TestCloseTelemetryConnection(t *testing.T) { + tb.Cancel() + time.Sleep(300 * time.Millisecond) + if len(tb.connections) != 0 { + t.Errorf("server didn't close connection") + } +} + func TestSetReportState(t *testing.T) { err := reportManager.SetReportState("a.json") if err != nil { diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index b1af199a74..1fcef9c752 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -68,7 +68,7 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer { tb.data = make(chan interface{}) tb.cancel = make(chan bool, 1) - tb.connections = make([]net.Conn, 1) + tb.connections = make([]net.Conn, 0) tb.payload.DNCReports = make([]DNCReport, 0) tb.payload.CNIReports = make([]CNIReport, 0) tb.payload.NPMReports = make([]NPMReport, 0) @@ -140,6 +140,8 @@ func (tb *TelemetryBuffer) StartServer() error { } } }() + } else { + return } } }() @@ -219,7 +221,6 @@ func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { // Cancel - signal to tear down telemetry buffer func (tb *TelemetryBuffer) Cancel() { - telemetryLogger.Printf("Calling cancel") tb.cancel <- true } @@ -228,17 +229,20 @@ func (tb *TelemetryBuffer) Close() { if tb.client != nil { telemetryLogger.Printf("client close") tb.client.Close() + tb.client = nil } if tb.listener != nil { telemetryLogger.Printf("server close") tb.listener.Close() + tb.listener = nil } - for _, conn := range tb.connections { + for index, conn := range tb.connections { if conn != nil { telemetryLogger.Printf("connection close") conn.Close() + tb.connections[index] = nil } } } From c79b47f234f1c616f8a95a28ed3df6899ff95e36 Mon Sep 17 00:00:00 2001 From: Tamilmani Manoharan Date: Wed, 27 Feb 2019 17:48:06 -0800 Subject: [PATCH 3/3] removed from slice after closing connection --- telemetry/telemetrybuffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 1fcef9c752..d3689e2ff5 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -242,7 +242,7 @@ func (tb *TelemetryBuffer) Close() { if conn != nil { telemetryLogger.Printf("connection close") conn.Close() - tb.connections[index] = nil + remove(tb.connections, index) } } }