Skip to content
28 changes: 19 additions & 9 deletions aitelemetry/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@ type Metric struct {
CustomDimensions map[string]string
}

type AIConfig struct {
AppName string
AppVersion string
BatchSize int
BatchInterval int
DisableMetadataRefreshThread bool
RefreshTimeout int
DebugMode bool
}

// TelmetryHandle holds appinsight handles and metadata
type telemetryHandle struct {
telemetryConfig *appinsights.TelemetryConfiguration
appName string
appVersion string
metadata common.Metadata
diagListener appinsights.DiagnosticsMessageListener
client appinsights.TelemetryClient
enableMetadataRefreshThread bool
refreshTimeout int
rwmutex sync.RWMutex
telemetryConfig *appinsights.TelemetryConfiguration
appName string
appVersion string
metadata common.Metadata
diagListener appinsights.DiagnosticsMessageListener
client appinsights.TelemetryClient
disableMetadataRefreshThread bool
refreshTimeout int
rwmutex sync.RWMutex
}

// Telemetry Interface to send metrics/Logs to appinsights
Expand Down
81 changes: 50 additions & 31 deletions aitelemetry/telemetrywrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,55 @@ const (
vmSizeStr = "VMSize"
osVersionStr = "OSVersion"
locationStr = "Region"
appVersionStr = "Appversion"
appNameStr = "AppName"
subscriptionIDStr = "SubscriptionID"
vmNameStr = "VMName"
defaultTimeout = 10
)

var debugMode bool

func messageListener() appinsights.DiagnosticsMessageListener {
return appinsights.NewDiagnosticsMessageListener(func(msg string) error {
log.Printf("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg)
return nil
})
if debugMode {
return appinsights.NewDiagnosticsMessageListener(func(msg string) error {
debuglog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg)
return nil
})
}

return nil
}

func debuglog(format string, args ...interface{}) {
if debugMode {
log.Printf(format, args...)
}
}

func getMetadata(th *telemetryHandle) {
var metadata common.Metadata
var err error

if th.refreshTimeout < 4 {
th.refreshTimeout = defaultTimeout
}

// check if metadata in memory otherwise initiate wireserver request
for {
metadata, err = common.GetHostMetadata(metadataFile)
if err == nil || !th.enableMetadataRefreshThread {
if err == nil || th.disableMetadataRefreshThread {
break
}

log.Printf("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout)
debuglog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout)
time.Sleep(time.Duration(th.refreshTimeout) * time.Second)
}

if err != nil {
debuglog("[AppInsights] Error getting metadata %v", err)
return
}

//acquire write lock before writing metadata to telemetry handle
th.rwmutex.Lock()
th.metadata = metadata
Expand All @@ -50,46 +72,42 @@ func getMetadata(th *telemetryHandle) {
// Save metadata retrieved from wireserver to a file
kvs, err := store.NewJsonFileStore(metadataFile)
if err != nil {
log.Printf("[AppInsights] Error initializing kvs store: %v", err)
debuglog("[AppInsights] Error initializing kvs store: %v", err)
return
}

kvs.Lock(true)
err = common.SaveHostMetadata(th.metadata, metadataFile)
kvs.Unlock(true)
if err != nil {
log.Printf("[AppInsights] saving host metadata failed with :%v", err)
debuglog("[AppInsights] saving host metadata failed with :%v", err)
}
}

// NewAITelemetry creates telemetry handle with user specified appinsights key.
// NewAITelemetry creates telemetry handle with user specified appinsights id.
func NewAITelemetry(
key string,
appName string,
appVersion string,
batchSize int,
batchInterval int,
enableMetadataRefreshThread bool,
refreshTimeout int,
id string,
aiConfig AIConfig,
) TelemetryHandle {

telemetryConfig := appinsights.NewTelemetryConfiguration(key)
telemetryConfig.MaxBatchSize = batchSize
telemetryConfig.MaxBatchInterval = time.Duration(batchInterval) * time.Second
telemetryConfig := appinsights.NewTelemetryConfiguration(id)
telemetryConfig.MaxBatchSize = aiConfig.BatchSize
telemetryConfig.MaxBatchInterval = time.Duration(aiConfig.BatchInterval) * time.Second
debugMode = aiConfig.DebugMode

th := &telemetryHandle{
client: appinsights.NewTelemetryClientFromConfig(telemetryConfig),
appName: appName,
appVersion: appVersion,
diagListener: messageListener(),
enableMetadataRefreshThread: enableMetadataRefreshThread,
refreshTimeout: refreshTimeout,
client: appinsights.NewTelemetryClientFromConfig(telemetryConfig),
appName: aiConfig.AppName,
appVersion: aiConfig.AppVersion,
diagListener: messageListener(),
disableMetadataRefreshThread: aiConfig.DisableMetadataRefreshThread,
refreshTimeout: aiConfig.RefreshTimeout,
}

if th.enableMetadataRefreshThread {
go getMetadata(th)
} else {
if th.disableMetadataRefreshThread {
getMetadata(th)
} else {
go getMetadata(th)
}

return th
Expand All @@ -104,14 +122,14 @@ func (th *telemetryHandle) TrackLog(report Report) {
//Override few of existing columns with metadata
trace.Tags.User().SetAuthUserId(runtime.GOOS)
trace.Tags.Operation().SetId(report.Context)
trace.Tags.Operation().SetParentId(th.appName)
trace.Tags.Operation().SetParentId(th.appVersion)

// copy app specified custom dimension
for key, value := range report.CustomDimensions {
trace.Properties[key] = value
}

trace.Properties[appVersionStr] = th.appVersion
trace.Properties[appNameStr] = th.appName

// Acquire read lock to read metadata
th.rwmutex.RLock()
Expand Down Expand Up @@ -148,6 +166,7 @@ func (th *telemetryHandle) TrackMetric(metric Metric) {
if metadata.SubscriptionID != "" {
aimetric.Properties[locationStr] = th.metadata.Location
aimetric.Properties[subscriptionIDStr] = th.metadata.SubscriptionID
aimetric.Properties[vmNameStr] = th.metadata.VMName
}

// copy custom dimensions
Expand Down
56 changes: 53 additions & 3 deletions aitelemetry/telemetrywrapper_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package aitelemetry

import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"

Expand All @@ -15,22 +17,52 @@ func TestMain(m *testing.M) {
if runtime.GOOS == "linux" {
platform.ExecuteCommand("cp metadata_test.json /tmp/azuremetadata.json")
} else {
platform.ExecuteCommand("copy metadata_test.json azuremetadata.json")
metadataFile := filepath.FromSlash(os.Getenv("TEMP")) + "\\azuremetadata.json"
cmd := fmt.Sprintf("copy metadata_test.json %s", metadataFile)
platform.ExecuteCommand(cmd)
}

exitCode := m.Run()

if runtime.GOOS == "linux" {
platform.ExecuteCommand("rm /tmp/azuremetadata.json")
} else {
platform.ExecuteCommand("del azuremetadata.json")
metadataFile := filepath.FromSlash(os.Getenv("TEMP")) + "\\azuremetadata.json"
cmd := fmt.Sprintf("del %s", metadataFile)
platform.ExecuteCommand(cmd)
}

os.Exit(exitCode)
}

func TestEmptyAIKey(t *testing.T) {
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
BatchSize: 4096,
BatchInterval: 2,
RefreshTimeout: 10,
DebugMode: true,
DisableMetadataRefreshThread: true,
}
th := NewAITelemetry("", aiConfig)
if th == nil {
t.Errorf("Error intializing AI telemetry")
}
th.Close(10)
}

func TestNewAITelemetry(t *testing.T) {
th = NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", "testapp", "v1.0.26", 4096, 2, false, 10)
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
BatchSize: 4096,
BatchInterval: 2,
RefreshTimeout: 10,
DebugMode: true,
DisableMetadataRefreshThread: true,
}
th = NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
if th == nil {
t.Errorf("Error intializing AI telemetry")
}
Expand Down Expand Up @@ -61,3 +93,21 @@ func TestTrackLog(t *testing.T) {
func TestClose(t *testing.T) {
th.Close(10)
}

func TestClosewithoutSend(t *testing.T) {
aiConfig := AIConfig{
AppName: "testapp",
AppVersion: "v1.0.26",
BatchSize: 4096,
BatchInterval: 2,
DisableMetadataRefreshThread: true,
RefreshTimeout: 10,
}

thtest := NewAITelemetry("00ca2a73-c8d6-4929-a0c2-cf84545ec225", aiConfig)
if thtest == nil {
t.Errorf("Error intializing AI telemetry")
}

thtest.Close(10)
}