Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ package main
import (
"encoding/json"
"fmt"
"github.com/Azure/azure-container-networking/nns"
"io/ioutil"
"os"
"reflect"
"time"

"github.com/Azure/azure-container-networking/nns"

"github.com/Azure/azure-container-networking/cni"
"github.com/Azure/azure-container-networking/cni/network"
"github.com/Azure/azure-container-networking/common"
Expand Down Expand Up @@ -183,7 +184,7 @@ func main() {
// CNI Acquires lock
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
log.Errorf("Failed to initialize key-value store of network plugin, err:%v.\n", err)
tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
if tberr := tb.Connect(); tberr == nil {
reportPluginError(reportManager, tb, err)
tb.Close()
Expand Down Expand Up @@ -211,7 +212,7 @@ func main() {

// Start telemetry process if not already started. This should be done inside lock, otherwise multiple process
// end up creating/killing telemetry process results in undesired state.
tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Expand Down
8 changes: 4 additions & 4 deletions cni/telemetry/service/telemetrymain.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,14 @@ func main() {
log.Logf("Config after setting defaults %+v", config)

// Cleaning up orphan socket if present
tbtemp := telemetry.NewTelemetryBuffer("")
tbtemp := telemetry.NewTelemetryBuffer()
tbtemp.Cleanup(telemetry.FdName)

for {
tb = telemetry.NewTelemetryBuffer("")
tb = telemetry.NewTelemetryBuffer()

log.Logf("[Telemetry] Starting telemetry server")
err = tb.StartServer(config.DisableTelemetryToNetAgent)
err = tb.StartServer()
if err == nil || tb.FdExists {
break
}
Expand All @@ -189,7 +189,7 @@ func main() {
err = telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric)
log.Printf("[Telemetry] AI Handle creation status:%v", err)
log.Logf("[Telemetry] Report to host for an interval of %d seconds", config.ReportToHostIntervalInSeconds)
tb.BufferAndPushData(config.ReportToHostIntervalInSeconds * time.Second)
tb.PushData()
telemetry.CloseAITelemetryHandle()

log.Close()
Expand Down
2 changes: 1 addition & 1 deletion cnms/service/networkmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func main() {
CNIReport: reportManager.Report.(*telemetry.CNIReport),
}

tb := telemetry.NewTelemetryBuffer("")
tb := telemetry.NewTelemetryBuffer()
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Expand Down
70 changes: 0 additions & 70 deletions cns/logger/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,11 @@
package logger

import (
"reflect"
"regexp"
"time"

"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
"github.com/Azure/azure-container-networking/telemetry"
"github.com/google/uuid"
)

const (
// CNSTelemetryFile - telemetry file path.
cnsTelemetryFile = platform.CNSRuntimePath + "AzureCNSTelemetry.json"
errorcodePrefix = 5
heartbeatIntervalInMinutes = 30
retryWaitTimeInSeconds = 60
telemetryNumRetries = 5
telemetryWaitTimeInMilliseconds = 200
)

var codeRegex = regexp.MustCompile(`Code:(\w*)`)

func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) {
heartbeat := time.NewTicker(time.Minute * time.Duration(heartbeatIntervalInMins)).C
metric := aitelemetry.Metric{
Expand All @@ -44,55 +26,3 @@ func SendHeartBeat(heartbeatIntervalInMins int, stopheartbeat chan bool) {
}
}
}

// SendCnsTelemetry - handles cns telemetry reports
func SendToTelemetryService(reports chan interface{}, telemetryStopProcessing chan bool) {

CONNECT:
tb := telemetry.NewTelemetryBuffer("")
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
if tb.Connected {

reportMgr := telemetry.ReportManager{
ContentType: telemetry.ContentType,
Report: &telemetry.CNSReport{},
}

reportMgr.GetReportState(cnsTelemetryFile)
reportMgr.GetKernelVersion()

for {
select {
case msg := <-reports:
codeStr := codeRegex.FindString(msg.(string))
if len(codeStr) > errorcodePrefix {
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Errorcode").SetString(codeStr[errorcodePrefix:])
}

reflect.ValueOf(reportMgr.Report).Elem().FieldByName("EventMessage").SetString(msg.(string))
case <-telemetryStopProcessing:
tb.Close()
return
}

reflect.ValueOf(reportMgr.Report).Elem().FieldByName("Timestamp").SetString(time.Now().UTC().String())
if id, err := uuid.NewUUID(); err == nil {
reflect.ValueOf(reportMgr.Report).Elem().FieldByName("UUID").SetString(id.String())
}

if !reportMgr.GetReportState(cnsTelemetryFile) {
reportMgr.SetReportState(cnsTelemetryFile)
}

report, err := reportMgr.ReportToBytes()
if err == nil {
// If write fails, try to re-establish connections as server/client
if _, err = tb.Write(report); err != nil {
log.Logf("[CNS-Telemetry] Telemetry write failed: %v", err)
tb.Close()
goto CONNECT
}
}
}
}
}
4 changes: 0 additions & 4 deletions cns/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ func InitAI(aiConfig aitelemetry.AIConfig, disableTraceLogging, disableMetricLog
Log.DisableEventLogging = disableEventLogging
}

func InitReportChannel(reports chan interface{}) {
Log.logger.SetChannel(reports)
}

// Close CNS and AI telemetry handle
func Close() {
Log.logger.Close()
Expand Down
7 changes: 0 additions & 7 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ const (

// Version is populated by make during build.
var version string

// Reports channel
var reports = make(chan interface{})
var telemetryStopProcessing = make(chan bool)
var stopheartbeat = make(chan bool)
var stopSnapshots = make(chan bool)
Expand Down Expand Up @@ -436,7 +433,6 @@ func main() {
}

logger.InitAI(aiConfig, ts.DisableTrace, ts.DisableMetric, ts.DisableEvent)
logger.InitReportChannel(reports)
}

// Log platform information.
Expand Down Expand Up @@ -537,7 +533,6 @@ func main() {
}

if !disableTelemetry {
go logger.SendToTelemetryService(reports, telemetryStopProcessing)
go logger.SendHeartBeat(cnsconfig.TelemetrySettings.HeartBeatIntervalInMins, stopheartbeat)
go httpRestService.SendNCSnapShotPeriodically(cnsconfig.TelemetrySettings.SnapshotIntervalInMins, stopSnapshots)
}
Expand Down Expand Up @@ -846,8 +841,6 @@ func InitializeCRDState(httpRestService cns.HTTPService, cnsconfig configuration
<-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C
httpRestServiceImplementation.SyncHostNCVersion(rootCxt, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs)
}

logger.Printf("[Azure CNS] Exiting SyncHostNCVersion")
}()

return nil
Expand Down
29 changes: 2 additions & 27 deletions log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type Logger struct {
maxFileCount int
callCount int
directory string
reports chan interface{}
mutex *sync.Mutex
}

Expand Down Expand Up @@ -91,11 +90,6 @@ func (logger *Logger) SetLogFileLimits(maxFileSize int, maxFileCount int) {
logger.maxFileCount = maxFileCount
}

// SetChannel sets the channel for error message reports.
func (logger *Logger) SetChannel(reports chan interface{}) {
logger.reports = reports
}

// Close closes the log stream.
func (logger *Logger) Close() {
if logger.out != nil {
Expand Down Expand Up @@ -223,14 +217,7 @@ func (logger *Logger) Printf(format string, args ...interface{}) {
return
}

logger.mutex.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.mutex.Lock()

Do we still need to take Mutex lock?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.mutex.Lock()

missing mutex lock

logger.logf(format, args...)
logger.mutex.Unlock()
go func() {
if logger.reports != nil {
logger.reports <- fmt.Sprintf(format, args...)
}
}()
logger.Logf(format, args...)
}

// Debugf logs a formatted string at info level.
Expand All @@ -239,22 +226,10 @@ func (logger *Logger) Debugf(format string, args ...interface{}) {
return
}

logger.mutex.Lock()
logger.logf(format, args...)
logger.mutex.Unlock()
go func() {
if logger.reports != nil {
logger.reports <- fmt.Sprintf(format, args...)
}
}()
logger.Logf(format, args...)
}

// Errorf logs a formatted string at info level and sends the string to TelemetryBuffer.
func (logger *Logger) Errorf(format string, args ...interface{}) {
logger.Logf(format, args...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutex lock?

go func() {
if logger.reports != nil {
logger.reports <- fmt.Sprintf(format, args...)
}
}()
}
35 changes: 0 additions & 35 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,6 @@ type AIMetric struct {
Metric aitelemetry.Metric
}

// Azure CNS Telemetry Report structure.
type CNSReport struct {
IsNewInstance bool
CPUUsage string
MemoryUsage string
Processes string
EventMessage string
DncPartitionKey string
Timestamp string
UUID string
Errorcode string
Metadata common.Metadata `json:"compute"`
}

// ClusterState contains the current kubernetes cluster state.
type ClusterState struct {
PodCount int
Expand All @@ -142,24 +128,6 @@ type NPMReport struct {
Metadata common.Metadata `json:"compute"`
}

// DNCReport structure.
type DNCReport struct {
IsNewInstance bool
CPUUsage string
MemoryUsage string
Processes string
EventMessage string
PartitionKey string
Allocations string
Timestamp string
NumberOfNodes int
NumberOfNCs int
Orchestrator string
ContainerType string
Errorcode string
Metadata common.Metadata `json:"compute"`
}

// ReportManager structure.
type ReportManager struct {
HostNetAgentURL string
Expand Down Expand Up @@ -374,9 +342,6 @@ func (reportMgr *ReportManager) ReportToBytes() ([]byte, error) {

switch reportMgr.Report.(type) {
case *CNIReport:
case *NPMReport:
case *DNCReport:
case *CNSReport:
case *AIMetric:
default:
err = fmt.Errorf("[Telemetry] Invalid report type")
Expand Down
22 changes: 11 additions & 11 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ func TestMain(m *testing.M) {
reportManager.ContentType = "application/json"
reportManager.Report = &CNIReport{}

tb = NewTelemetryBuffer(hostAgentUrl)
err = tb.StartServer(false)
tb = NewTelemetryBuffer()
err = tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}

if err := tb.Connect(); err != nil {
Expand Down Expand Up @@ -200,14 +200,14 @@ func TestCloseTelemetryConnection(t *testing.T) {

func TestServerCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer(false)
tb = NewTelemetryBuffer()
err := tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}

// create client telemetrybuffer and connect to server
tb1 := NewTelemetryBuffer(hostAgentUrl)
tb1 := NewTelemetryBuffer()
if err := tb1.Connect(); err != nil {
t.Errorf("connection to telemetry server failed %v", err)
}
Expand All @@ -233,18 +233,18 @@ func TestServerCloseTelemetryConnection(t *testing.T) {

func TestClientCloseTelemetryConnection(t *testing.T) {
// create server telemetrybuffer and start server
tb = NewTelemetryBuffer(hostAgentUrl)
err := tb.StartServer(false)
tb = NewTelemetryBuffer()
err := tb.StartServer()
if err == nil {
go tb.BufferAndPushData(0)
go tb.PushData()
}

if !SockExists() {
t.Errorf("telemetry sock doesn't exist")
}

// create client telemetrybuffer and connect to server
tb1 := NewTelemetryBuffer(hostAgentUrl)
tb1 := NewTelemetryBuffer()
if err := tb1.Connect(); err != nil {
t.Errorf("connection to telemetry server failed %v", err)
}
Expand Down
Loading