Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func main() {
}
}

defer tb.Close()

t := time.Now()
cniReport.Timestamp = t.Format("2006-01-02 15:04:05")
cniReport.GetReport(pluginName, version, ipamQueryURL)
Expand All @@ -205,15 +207,15 @@ func main() {
if err != nil {
log.Printf("Failed to create network plugin, err:%v.\n", err)
reportPluginError(reportManager, tb, err)
os.Exit(1)
return
}

netPlugin.SetCNIReport(cniReport)

if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
log.Printf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
reportPluginError(reportManager, tb, err)
os.Exit(1)
return
}

defer func() {
Expand All @@ -222,7 +224,7 @@ func main() {
}

if recover() != nil {
os.Exit(1)
return
}
}()

Expand Down
14 changes: 0 additions & 14 deletions cni/telemetry/service/telemetrymain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package main
// Entry point of the telemetry service if started by CNI

import (
"fmt"
"time"

"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/telemetry"
)

Expand All @@ -19,28 +17,16 @@ func main() {
var tb *telemetry.TelemetryBuffer
var err error

log.SetName(azurecnitelemetry)
log.SetLevel(log.LevelInfo)
err = log.SetTarget(log.TargetLogfile)
if err != nil {
fmt.Printf("log settarget failed")
}

log.Printf("[Telemetry] TelemetryBuffer process started")
for {
tb = telemetry.NewTelemetryBuffer("")
err = tb.StartServer()
if err == nil || tb.FdExists {
log.Printf("[Telemetry] Server started")
break
}

tb.Cleanup(telemetry.FdName)

log.Printf("[Telemetry] Failed to establish telemetry buffer connection.")
time.Sleep(time.Millisecond * 200)
}

tb.BufferAndPushData(reportToHostIntervalInSeconds)
log.Printf("[Telemetry] TelemetryBuffer process exiting")
}
10 changes: 9 additions & 1 deletion telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestMain(m *testing.M) {
}

exitCode := m.Run()
tb.Cancel()
tb.Cleanup(FdName)
os.Exit(exitCode)
}
Expand Down Expand Up @@ -144,6 +143,15 @@ func TestReceiveTelemetryData(t *testing.T) {
t.Errorf("payload doesn't contain CNI report")
}
}

func TestCloseTelemetryConnection(t *testing.T) {
tb.Cancel()
time.Sleep(300 * time.Millisecond)
if len(tb.connections) != 0 {
t.Errorf("server didn't close connection")
}
}

func TestSetReportState(t *testing.T) {
err := reportManager.SetReportState("a.json")
if err != nil {
Expand Down
36 changes: 31 additions & 5 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {

tb.data = make(chan interface{})
tb.cancel = make(chan bool, 1)
tb.connections = make([]net.Conn, 1)
tb.connections = make([]net.Conn, 0)
tb.payload.DNCReports = make([]DNCReport, 0)
tb.payload.CNIReports = make([]CNIReport, 0)
tb.payload.NPMReports = make([]NPMReport, 0)
Expand All @@ -82,14 +82,21 @@ func NewTelemetryBuffer(hostReportURL string) *TelemetryBuffer {
return &tb
}

func remove(s []net.Conn, i int) []net.Conn {
s[i] = s[len(s)-1]
return s[:len(s)-1]
}

// Starts Telemetry server listening on unix domain socket
func (tb *TelemetryBuffer) StartServer() error {
err := tb.Listen(FdName)
if err != nil {
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
telemetryLogger.Printf("Listen returns: %v", err.Error())
return err
}

telemetryLogger.Printf("Telemetry service started")
// Spawn server goroutine to handle incoming connections
go func() {
for {
Expand Down Expand Up @@ -121,9 +128,20 @@ func (tb *TelemetryBuffer) StartServer() error {
json.Unmarshal([]byte(reportStr), &cnsReport)
tb.data <- cnsReport
}
} else {
telemetryLogger.Printf("Server closing client connection")
for index, value := range tb.connections {
if value == conn {
conn.Close()
tb.connections = remove(tb.connections, index)
return
}
}
}
}
}()
} else {
return
}
}
}()
Expand All @@ -144,7 +162,7 @@ func (tb *TelemetryBuffer) Connect() error {

// BufferAndPushData - BufferAndPushData running an instance if it isn't already being run elsewhere
func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
defer tb.close()
defer tb.Close()
if !tb.FdExists {
telemetryLogger.Printf("[Telemetry] Buffer telemetry data and send it to host")
if intervalms < DefaultInterval {
Expand All @@ -167,11 +185,13 @@ func (tb *TelemetryBuffer) BufferAndPushData(intervalms time.Duration) {
telemetryLogger.Printf("[Telemetry] Got data..Append it to buffer")
tb.payload.push(report)
case <-tb.cancel:
telemetryLogger.Printf("server cancel event")
goto EXIT
}
}
} else {
<-tb.cancel
telemetryLogger.Printf("Received cancel event")
}

EXIT:
Expand Down Expand Up @@ -204,19 +224,25 @@ func (tb *TelemetryBuffer) Cancel() {
tb.cancel <- true
}

// close - close all connections
func (tb *TelemetryBuffer) close() {
// Close - close all connections
func (tb *TelemetryBuffer) Close() {
if tb.client != nil {
telemetryLogger.Printf("client close")
tb.client.Close()
tb.client = nil
}

if tb.listener != nil {
telemetryLogger.Printf("server close")
tb.listener.Close()
tb.listener = nil
}

for _, conn := range tb.connections {
for index, conn := range tb.connections {
if conn != nil {
telemetryLogger.Printf("connection close")
conn.Close()
remove(tb.connections, index)
}
}
}
Expand Down