Skip to content

Commit

Permalink
FFM-11212 Metrics enhancements (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
erdirowlands committed Apr 19, 2024
1 parent dffa98c commit 5d1c3b7
Show file tree
Hide file tree
Showing 9 changed files with 544 additions and 175 deletions.
232 changes: 107 additions & 125 deletions analyticsservice/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

"github.com/harness/ff-golang-server-sdk/sdk_codes"
Expand All @@ -32,8 +31,20 @@ const (
sdkLanguageAttribute string = "SDK_LANGUAGE"
sdkLanguage string = "go"
globalTarget string = "global"
maxAnalyticsEntries int = 10000
maxTargetEntries int = 100000
)

// SafeAnalyticsCache is a type that provides thread safe access to maps used by analytics
type SafeAnalyticsCache[K comparable, V any] interface {
set(key K, value V)
get(key K) (V, bool)
delete(key K)
size() int
clear()
iterate(func(K, V))
}

type analyticsEvent struct {
target *evaluation.Target
featureConfig *rest.FeatureConfig
Expand All @@ -43,13 +54,14 @@ type analyticsEvent struct {

// AnalyticsService provides a way to cache and send analytics to the server
type AnalyticsService struct {
mx *sync.Mutex
analyticsChan chan analyticsEvent
analyticsData map[string]analyticsEvent
timeout time.Duration
logger logger.Logger
metricsClient *metricsclient.ClientWithResponsesInterface
environmentID string
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeAnalyticsCache[string, bool]
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
}

// NewAnalyticsService creates and starts a analytics service to send data to the client
Expand All @@ -61,19 +73,20 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics
serviceTimeout = 1 * time.Hour
}
as := AnalyticsService{
mx: &sync.Mutex{},
analyticsChan: make(chan analyticsEvent),
analyticsData: map[string]analyticsEvent{},
timeout: serviceTimeout,
logger: logger,
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(),
timeout: serviceTimeout,
logger: logger,
}
go as.listener()

return &as
}

// Start starts the client and timer to send analytics
func (as *AnalyticsService) Start(ctx context.Context, client *metricsclient.ClientWithResponsesInterface, environmentID string) {
func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.ClientWithResponsesInterface, environmentID string) {
as.logger.Infof("%s Metrics started", sdk_codes.MetricsStarted)
as.metricsClient = client
as.environmentID = environmentID
Expand Down Expand Up @@ -106,18 +119,44 @@ func (as *AnalyticsService) PushToQueue(featureConfig *rest.FeatureConfig, targe
func (as *AnalyticsService) listener() {
as.logger.Info("Analytics cache successfully initialized")
for ad := range as.analyticsChan {
key := getEventSummaryKey(ad)
analyticsKey := getEvaluationAnalyticKey(ad)

// Check if we've hit capacity for evaluations
if as.evaluationAnalytics.size() < maxAnalyticsEntries {
// Update evaluation metrics
analytic, ok := as.evaluationAnalytics.get(analyticsKey)
if !ok {
ad.count = 1
as.evaluationAnalytics.set(analyticsKey, ad)
} else {
ad.count = analytic.count + 1
as.evaluationAnalytics.set(analyticsKey, ad)
}
} else {
as.logger.Warnf("%s Evaluation analytic cache reached max size, remaining evaluation metrics for this analytics interval will not be sent", sdk_codes.EvaluationMetricsMaxSizeReached)
}

// Check if target is nil or anonymous
if ad.target == nil || (ad.target.Anonymous != nil && *ad.target.Anonymous) {
continue
}

as.mx.Lock()
analytic, ok := as.analyticsData[key]
if !ok {
ad.count = 1
as.analyticsData[key] = ad
// Check if target has been seen
_, seen := as.seenTargets.get(ad.target.Identifier)

if seen {
continue
}

// Update seen targets
as.seenTargets.set(ad.target.Identifier, true)

// Update target metrics
if as.targetAnalytics.size() < maxTargetEntries {
as.targetAnalytics.set(ad.target.Identifier, *ad.target)
} else {
ad.count = (analytic.count + 1)
as.analyticsData[key] = ad
as.logger.Warnf("%s Target analytics cache reached max size, remaining target metrics for this analytics interval will not be sent", sdk_codes.TargetMetricsMaxSizeReached)
}
as.mx.Unlock()
}
}

Expand Down Expand Up @@ -150,116 +189,75 @@ func convertInterfaceToString(i interface{}) string {
}

func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
as.mx.Lock()
// copy cache to send to server
analyticsData := as.analyticsData
// clear cache. As metrics is secondary to the flags, we do it this way
// so it doesn't effect the performance of our users code. Even if it means
// we lose metrics the odd time.
as.analyticsData = map[string]analyticsEvent{}
as.mx.Unlock()

metricData := make([]metricsclient.MetricsData, 0, len(as.analyticsData))
targetData := map[string]metricsclient.TargetData{}

for _, analytic := range analyticsData {
if analytic.target != nil {
if analytic.target.Anonymous == nil || !*analytic.target.Anonymous {
targetAttributes := make([]metricsclient.KeyValue, 0)
if analytic.target.Attributes != nil {
targetAttributes = make([]metricsclient.KeyValue, 0, len(*analytic.target.Attributes))
for key, value := range *analytic.target.Attributes {
v := convertInterfaceToString(value)
kv := metricsclient.KeyValue{
Key: key,
Value: v,
}
targetAttributes = append(targetAttributes, kv)
}

}

targetName := analytic.target.Identifier
if analytic.target.Name != "" {
targetName = analytic.target.Name
}

td := metricsclient.TargetData{
Name: targetName,
Identifier: analytic.target.Identifier,
Attributes: targetAttributes,
}
targetData[analytic.target.Identifier] = td
}
}

// Clone and reset the evaluation analytics cache to minimise the duration
// for which locks are held, so that metrics processing does not affect flag evaluations performance.
// Although this might occasionally result in the loss of some metrics during periods of high load,
// it is an acceptable tradeoff to prevent extended lock periods that could degrade user code.
evaluationAnalyticsClone := as.evaluationAnalytics

as.evaluationAnalytics = newSafeEvaluationAnalytics()

// Clone and reset target analytics cache for same reason.
targetAnalyticsClone := as.targetAnalytics
as.targetAnalytics = newSafeTargetAnalytics()

metricData := make([]metricsclient.MetricsData, 0, evaluationAnalyticsClone.size())
targetData := make([]metricsclient.TargetData, 0, targetAnalyticsClone.size())

// Process evaluation metrics
evaluationAnalyticsClone.iterate(func(key string, analytic analyticsEvent) {
metricAttributes := []metricsclient.KeyValue{
{
Key: featureIdentifierAttribute,
Value: analytic.featureConfig.Feature,
},
{
Key: featureNameAttribute,
Value: analytic.featureConfig.Feature,
},
{
Key: variationIdentifierAttribute,
Value: analytic.variation.Identifier,
},
{
Key: variationValueAttribute,
Value: analytic.variation.Value,
},
{
Key: sdkTypeAttribute,
Value: sdkType,
},
{
Key: sdkLanguageAttribute,
Value: sdkLanguage,
},
{
Key: sdkVersionAttribute,
Value: SdkVersion,
},
{Key: featureIdentifierAttribute, Value: analytic.featureConfig.Feature},
{Key: featureNameAttribute, Value: analytic.featureConfig.Feature},
{Key: variationIdentifierAttribute, Value: analytic.variation.Identifier},
{Key: variationValueAttribute, Value: analytic.variation.Value},
{Key: sdkTypeAttribute, Value: sdkType},
{Key: sdkLanguageAttribute, Value: sdkLanguage},
{Key: sdkVersionAttribute, Value: SdkVersion},
{Key: targetAttribute, Value: globalTarget},
}

metricAttributes = append(metricAttributes, metricsclient.KeyValue{
Key: targetAttribute,
Value: globalTarget,
})

md := metricsclient.MetricsData{
Timestamp: time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)),
Count: analytic.count,
MetricsType: metricsclient.MetricsDataMetricsType(ffMetricType),
Attributes: metricAttributes,
}
metricData = append(metricData, md)
}
})

// if targets data is empty we just send nil
var targetDataPayload *[]metricsclient.TargetData = nil
if len(targetData) > 0 {
targetDataPayload = targetDataMapToArray(targetData)
}
// Process target metrics
targetAnalyticsClone.iterate(func(key string, target evaluation.Target) {
targetAttributes := make([]metricsclient.KeyValue, 0)
for key, value := range *target.Attributes {
targetAttributes = append(targetAttributes, metricsclient.KeyValue{Key: key, Value: convertInterfaceToString(value)})
}

td := metricsclient.TargetData{
Identifier: target.Identifier,
Name: target.Name,
Attributes: targetAttributes,
}
targetData = append(targetData, td)
})

analyticsPayload := metricsclient.PostMetricsJSONRequestBody{
MetricsData: &metricData,
TargetData: targetDataPayload,
TargetData: &targetData,
}

if as.metricsClient != nil {
emptyMetricsData := analyticsPayload.MetricsData == nil || len(*analyticsPayload.MetricsData) == 0
emptyTargetData := analyticsPayload.TargetData == nil || len(*analyticsPayload.TargetData) == 0
emptyMetricsData := len(metricData) == 0
emptyTargetData := len(targetData) == 0

// if we have no metrics to send skip the post request
if emptyMetricsData && emptyTargetData {
as.logger.Debug("No metrics or target data to send")
return
}

mClient := *as.metricsClient
mClient := as.metricsClient

jsonData, err := json.Marshal(analyticsPayload)
if err != nil {
Expand Down Expand Up @@ -287,22 +285,6 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
}
}

//func getEventKey(event analyticsEvent) string {
// targetIdentifier := ""
// if event.target != nil {
// targetIdentifier = event.target.Identifier
// }
// return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, targetIdentifier)
//}

func getEventSummaryKey(event analyticsEvent) string {
func getEvaluationAnalyticKey(event analyticsEvent) string {
return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
}

func targetDataMapToArray(targetMap map[string]metricsclient.TargetData) *[]metricsclient.TargetData {
targetDataArray := make([]metricsclient.TargetData, 0, len(targetMap))
for _, targetData := range targetMap {
targetDataArray = append(targetDataArray, targetData)
}
return &targetDataArray
}
Loading

0 comments on commit 5d1c3b7

Please sign in to comment.