Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFM-11212 Metrics enhancements #150

Merged
merged 44 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6ed8dab
FFM-11212 Start splitting metrics caches
erdirowlands Apr 16, 2024
1c25d71
FFM-11212 Start splitting metrics caches
erdirowlands Apr 16, 2024
125f743
FFM-11212 Start splitting metrics caches
erdirowlands Apr 16, 2024
133d5c5
FFM-11212 Use separate locks per map
erdirowlands Apr 16, 2024
b0d66f1
FFM-11212 Comments for clarity
erdirowlands Apr 16, 2024
1b56990
FFM-11212 Early return
erdirowlands Apr 16, 2024
a96c107
FFM-11212 Change return to continue
erdirowlands Apr 16, 2024
690d8f4
FFM-11212 Init locks
erdirowlands Apr 16, 2024
0f1512d
FFM-11212 Rename and reorg
erdirowlands Apr 16, 2024
973f81d
FFM-11212 Copy and clear targets
erdirowlands Apr 16, 2024
c29b8e7
FFM-11212 Refactor sendDataAndResetCache
erdirowlands Apr 16, 2024
6e1bbf8
FFM-11212 Rename key function
erdirowlands Apr 16, 2024
184ca14
FFM-11212 Delete unused func
erdirowlands Apr 16, 2024
682bd92
FFM-11212 Tweak comment
erdirowlands Apr 16, 2024
8d27fe5
FFM-11212 Tweak comment
erdirowlands Apr 16, 2024
089f0c5
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
3a9e269
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
d9f7a8c
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
759d9e2
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
651c880
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
eb35ce7
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
f0b9372
FFM-11212 New test for listener
erdirowlands Apr 16, 2024
98714d8
FFM-11212 New test for listener
erdirowlands Apr 17, 2024
0286d67
FFM-11212 Remove reduandant paranthesis
erdirowlands Apr 17, 2024
94b6fbc
FFM-11212 New test for listener
erdirowlands Apr 17, 2024
f82e13e
FFM-11212 Don't use pointer interface for metrics api
erdirowlands Apr 17, 2024
7e27d11
FFM-11212 Don't store anon targets
erdirowlands Apr 17, 2024
99761db
FFM-11212 Test for anonymous target
erdirowlands Apr 17, 2024
1184137
FFM-11212 Test for nil target
erdirowlands Apr 17, 2024
1ce0fc8
FFM-11212 Remove test for send data
erdirowlands Apr 17, 2024
dee51f1
FFM-11212 Implement cache limits
erdirowlands Apr 17, 2024
fbf3caa
FFM-11212 Implement cache limits
erdirowlands Apr 17, 2024
132b37c
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
4a262bb
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
644c9a3
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
28ee9e4
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
51ffe93
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
220e859
FFM-11212 Encapsulate maps and write tests
erdirowlands Apr 18, 2024
196154a
FFM-11212 Add copy method
erdirowlands Apr 18, 2024
4ea3703
FFM-11212 Add copy method
erdirowlands Apr 18, 2024
bc112ec
FFM-11212 Add iterate methpd
erdirowlands Apr 18, 2024
58257d3
FFM-11212 Update tests
erdirowlands Apr 18, 2024
90d26aa
FFM-11212 Update tests
erdirowlands Apr 18, 2024
7892eb4
FFM-11212 Update tests
erdirowlands Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 107 additions & 125 deletions analyticsservice/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strconv"
"sync"
"time"

"github.com/harness/ff-golang-server-sdk/sdk_codes"
Expand All @@ -32,8 +31,20 @@ const (
sdkLanguageAttribute string = "SDK_LANGUAGE"
sdkLanguage string = "go"
globalTarget string = "global"
maxAnalyticsEntries int = 10000
maxTargetEntries int = 100000
)

// SafeAnalyticsCache is a type that provides thread safe access to maps used by analytics
type SafeAnalyticsCache[K comparable, V any] interface {
set(key K, value V)
get(key K) (V, bool)
delete(key K)
size() int
clear()
iterate(func(K, V))
}

type analyticsEvent struct {
target *evaluation.Target
featureConfig *rest.FeatureConfig
Expand All @@ -43,13 +54,14 @@ type analyticsEvent struct {

// AnalyticsService provides a way to cache and send analytics to the server
type AnalyticsService struct {
mx *sync.Mutex
analyticsChan chan analyticsEvent
analyticsData map[string]analyticsEvent
timeout time.Duration
logger logger.Logger
metricsClient *metricsclient.ClientWithResponsesInterface
environmentID string
analyticsChan chan analyticsEvent
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeAnalyticsCache[string, bool]
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
environmentID string
}

// NewAnalyticsService creates and starts a analytics service to send data to the client
Expand All @@ -61,19 +73,20 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics
serviceTimeout = 1 * time.Hour
}
as := AnalyticsService{
mx: &sync.Mutex{},
analyticsChan: make(chan analyticsEvent),
analyticsData: map[string]analyticsEvent{},
timeout: serviceTimeout,
logger: logger,
analyticsChan: make(chan analyticsEvent),
evaluationAnalytics: newSafeEvaluationAnalytics(),
targetAnalytics: newSafeTargetAnalytics(),
seenTargets: newSafeSeenTargets(),
timeout: serviceTimeout,
logger: logger,
}
go as.listener()

return &as
}

// Start starts the client and timer to send analytics
func (as *AnalyticsService) Start(ctx context.Context, client *metricsclient.ClientWithResponsesInterface, environmentID string) {
func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.ClientWithResponsesInterface, environmentID string) {
as.logger.Infof("%s Metrics started", sdk_codes.MetricsStarted)
as.metricsClient = client
as.environmentID = environmentID
Expand Down Expand Up @@ -106,18 +119,44 @@ func (as *AnalyticsService) PushToQueue(featureConfig *rest.FeatureConfig, targe
func (as *AnalyticsService) listener() {
as.logger.Info("Analytics cache successfully initialized")
for ad := range as.analyticsChan {
key := getEventSummaryKey(ad)
analyticsKey := getEvaluationAnalyticKey(ad)

// Check if we've hit capacity for evaluations
if as.evaluationAnalytics.size() < maxAnalyticsEntries {
// Update evaluation metrics
analytic, ok := as.evaluationAnalytics.get(analyticsKey)
if !ok {
ad.count = 1
as.evaluationAnalytics.set(analyticsKey, ad)
} else {
ad.count = analytic.count + 1
as.evaluationAnalytics.set(analyticsKey, ad)
}
} else {
as.logger.Warnf("%s Evaluation analytic cache reached max size, remaining evaluation metrics for this analytics interval will not be sent", sdk_codes.EvaluationMetricsMaxSizeReached)
}

// Check if target is nil or anonymous
if ad.target == nil || (ad.target.Anonymous != nil && *ad.target.Anonymous) {
continue
}

as.mx.Lock()
analytic, ok := as.analyticsData[key]
if !ok {
ad.count = 1
as.analyticsData[key] = ad
// Check if target has been seen
_, seen := as.seenTargets.get(ad.target.Identifier)

if seen {
continue
}

// Update seen targets
as.seenTargets.set(ad.target.Identifier, true)

// Update target metrics
if as.targetAnalytics.size() < maxTargetEntries {
as.targetAnalytics.set(ad.target.Identifier, *ad.target)
} else {
ad.count = (analytic.count + 1)
as.analyticsData[key] = ad
as.logger.Warnf("%s Target analytics cache reached max size, remaining target metrics for this analytics interval will not be sent", sdk_codes.TargetMetricsMaxSizeReached)
}
as.mx.Unlock()
}
}

Expand Down Expand Up @@ -150,116 +189,75 @@ func convertInterfaceToString(i interface{}) string {
}

func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
as.mx.Lock()
// copy cache to send to server
analyticsData := as.analyticsData
// clear cache. As metrics is secondary to the flags, we do it this way
// so it doesn't effect the performance of our users code. Even if it means
// we lose metrics the odd time.
as.analyticsData = map[string]analyticsEvent{}
as.mx.Unlock()

metricData := make([]metricsclient.MetricsData, 0, len(as.analyticsData))
targetData := map[string]metricsclient.TargetData{}

for _, analytic := range analyticsData {
if analytic.target != nil {
if analytic.target.Anonymous == nil || !*analytic.target.Anonymous {
targetAttributes := make([]metricsclient.KeyValue, 0)
if analytic.target.Attributes != nil {
targetAttributes = make([]metricsclient.KeyValue, 0, len(*analytic.target.Attributes))
for key, value := range *analytic.target.Attributes {
v := convertInterfaceToString(value)
kv := metricsclient.KeyValue{
Key: key,
Value: v,
}
targetAttributes = append(targetAttributes, kv)
}

}

targetName := analytic.target.Identifier
if analytic.target.Name != "" {
targetName = analytic.target.Name
}

td := metricsclient.TargetData{
Name: targetName,
Identifier: analytic.target.Identifier,
Attributes: targetAttributes,
}
targetData[analytic.target.Identifier] = td
}
}

// Clone and reset the evaluation analytics cache to minimise the duration
// for which locks are held, so that metrics processing does not affect flag evaluations performance.
// Although this might occasionally result in the loss of some metrics during periods of high load,
// it is an acceptable tradeoff to prevent extended lock periods that could degrade user code.
evaluationAnalyticsClone := as.evaluationAnalytics

as.evaluationAnalytics = newSafeEvaluationAnalytics()

// Clone and reset target analytics cache for same reason.
targetAnalyticsClone := as.targetAnalytics
as.targetAnalytics = newSafeTargetAnalytics()

metricData := make([]metricsclient.MetricsData, 0, evaluationAnalyticsClone.size())
targetData := make([]metricsclient.TargetData, 0, targetAnalyticsClone.size())

// Process evaluation metrics
evaluationAnalyticsClone.iterate(func(key string, analytic analyticsEvent) {
metricAttributes := []metricsclient.KeyValue{
{
Key: featureIdentifierAttribute,
Value: analytic.featureConfig.Feature,
},
{
Key: featureNameAttribute,
Value: analytic.featureConfig.Feature,
},
{
Key: variationIdentifierAttribute,
Value: analytic.variation.Identifier,
},
{
Key: variationValueAttribute,
Value: analytic.variation.Value,
},
{
Key: sdkTypeAttribute,
Value: sdkType,
},
{
Key: sdkLanguageAttribute,
Value: sdkLanguage,
},
{
Key: sdkVersionAttribute,
Value: SdkVersion,
},
{Key: featureIdentifierAttribute, Value: analytic.featureConfig.Feature},
{Key: featureNameAttribute, Value: analytic.featureConfig.Feature},
{Key: variationIdentifierAttribute, Value: analytic.variation.Identifier},
{Key: variationValueAttribute, Value: analytic.variation.Value},
{Key: sdkTypeAttribute, Value: sdkType},
{Key: sdkLanguageAttribute, Value: sdkLanguage},
{Key: sdkVersionAttribute, Value: SdkVersion},
{Key: targetAttribute, Value: globalTarget},
}

metricAttributes = append(metricAttributes, metricsclient.KeyValue{
Key: targetAttribute,
Value: globalTarget,
})

md := metricsclient.MetricsData{
Timestamp: time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)),
Count: analytic.count,
MetricsType: metricsclient.MetricsDataMetricsType(ffMetricType),
Attributes: metricAttributes,
}
metricData = append(metricData, md)
}
})

// if targets data is empty we just send nil
var targetDataPayload *[]metricsclient.TargetData = nil
if len(targetData) > 0 {
targetDataPayload = targetDataMapToArray(targetData)
}
// Process target metrics
targetAnalyticsClone.iterate(func(key string, target evaluation.Target) {
targetAttributes := make([]metricsclient.KeyValue, 0)
for key, value := range *target.Attributes {
targetAttributes = append(targetAttributes, metricsclient.KeyValue{Key: key, Value: convertInterfaceToString(value)})
}

td := metricsclient.TargetData{
Identifier: target.Identifier,
Name: target.Name,
Attributes: targetAttributes,
}
targetData = append(targetData, td)
})

analyticsPayload := metricsclient.PostMetricsJSONRequestBody{
MetricsData: &metricData,
TargetData: targetDataPayload,
TargetData: &targetData,
}

if as.metricsClient != nil {
emptyMetricsData := analyticsPayload.MetricsData == nil || len(*analyticsPayload.MetricsData) == 0
emptyTargetData := analyticsPayload.TargetData == nil || len(*analyticsPayload.TargetData) == 0
emptyMetricsData := len(metricData) == 0
emptyTargetData := len(targetData) == 0

// if we have no metrics to send skip the post request
if emptyMetricsData && emptyTargetData {
as.logger.Debug("No metrics or target data to send")
return
}

mClient := *as.metricsClient
mClient := as.metricsClient

jsonData, err := json.Marshal(analyticsPayload)
if err != nil {
Expand Down Expand Up @@ -287,22 +285,6 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
}
}

//func getEventKey(event analyticsEvent) string {
// targetIdentifier := ""
// if event.target != nil {
// targetIdentifier = event.target.Identifier
// }
// return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, targetIdentifier)
//}

func getEventSummaryKey(event analyticsEvent) string {
func getEvaluationAnalyticKey(event analyticsEvent) string {
return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget)
}

func targetDataMapToArray(targetMap map[string]metricsclient.TargetData) *[]metricsclient.TargetData {
targetDataArray := make([]metricsclient.TargetData, 0, len(targetMap))
for _, targetData := range targetMap {
targetDataArray = append(targetDataArray, targetData)
}
return &targetDataArray
}
Loading
Loading