diff --git a/analyticsservice/analytics.go b/analyticsservice/analytics.go index e3b26b8..e2cca31 100644 --- a/analyticsservice/analytics.go +++ b/analyticsservice/analytics.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "strconv" - "sync" "time" "github.com/harness/ff-golang-server-sdk/sdk_codes" @@ -32,8 +31,20 @@ const ( sdkLanguageAttribute string = "SDK_LANGUAGE" sdkLanguage string = "go" globalTarget string = "global" + maxAnalyticsEntries int = 10000 + maxTargetEntries int = 100000 ) +// SafeAnalyticsCache is a type that provides thread safe access to maps used by analytics +type SafeAnalyticsCache[K comparable, V any] interface { + set(key K, value V) + get(key K) (V, bool) + delete(key K) + size() int + clear() + iterate(func(K, V)) +} + type analyticsEvent struct { target *evaluation.Target featureConfig *rest.FeatureConfig @@ -43,13 +54,14 @@ type analyticsEvent struct { // AnalyticsService provides a way to cache and send analytics to the server type AnalyticsService struct { - mx *sync.Mutex - analyticsChan chan analyticsEvent - analyticsData map[string]analyticsEvent - timeout time.Duration - logger logger.Logger - metricsClient *metricsclient.ClientWithResponsesInterface - environmentID string + analyticsChan chan analyticsEvent + evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent] + targetAnalytics SafeAnalyticsCache[string, evaluation.Target] + seenTargets SafeAnalyticsCache[string, bool] + timeout time.Duration + logger logger.Logger + metricsClient metricsclient.ClientWithResponsesInterface + environmentID string } // NewAnalyticsService creates and starts a analytics service to send data to the client @@ -61,11 +73,12 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics serviceTimeout = 1 * time.Hour } as := AnalyticsService{ - mx: &sync.Mutex{}, - analyticsChan: make(chan analyticsEvent), - analyticsData: map[string]analyticsEvent{}, - timeout: serviceTimeout, - logger: logger, + analyticsChan: make(chan analyticsEvent), + evaluationAnalytics: newSafeEvaluationAnalytics(), + targetAnalytics: newSafeTargetAnalytics(), + seenTargets: newSafeSeenTargets(), + timeout: serviceTimeout, + logger: logger, } go as.listener() @@ -73,7 +86,7 @@ func NewAnalyticsService(timeout time.Duration, logger logger.Logger) *Analytics } // Start starts the client and timer to send analytics -func (as *AnalyticsService) Start(ctx context.Context, client *metricsclient.ClientWithResponsesInterface, environmentID string) { +func (as *AnalyticsService) Start(ctx context.Context, client metricsclient.ClientWithResponsesInterface, environmentID string) { as.logger.Infof("%s Metrics started", sdk_codes.MetricsStarted) as.metricsClient = client as.environmentID = environmentID @@ -106,18 +119,44 @@ func (as *AnalyticsService) PushToQueue(featureConfig *rest.FeatureConfig, targe func (as *AnalyticsService) listener() { as.logger.Info("Analytics cache successfully initialized") for ad := range as.analyticsChan { - key := getEventSummaryKey(ad) + analyticsKey := getEvaluationAnalyticKey(ad) + + // Check if we've hit capacity for evaluations + if as.evaluationAnalytics.size() < maxAnalyticsEntries { + // Update evaluation metrics + analytic, ok := as.evaluationAnalytics.get(analyticsKey) + if !ok { + ad.count = 1 + as.evaluationAnalytics.set(analyticsKey, ad) + } else { + ad.count = analytic.count + 1 + as.evaluationAnalytics.set(analyticsKey, ad) + } + } else { + as.logger.Warnf("%s Evaluation analytic cache reached max size, remaining evaluation metrics for this analytics interval will not be sent", sdk_codes.EvaluationMetricsMaxSizeReached) + } + + // Check if target is nil or anonymous + if ad.target == nil || (ad.target.Anonymous != nil && *ad.target.Anonymous) { + continue + } - as.mx.Lock() - analytic, ok := as.analyticsData[key] - if !ok { - ad.count = 1 - as.analyticsData[key] = ad + // Check if target has been seen + _, seen := as.seenTargets.get(ad.target.Identifier) + + if seen { + continue + } + + // Update seen targets + as.seenTargets.set(ad.target.Identifier, true) + + // Update target metrics + if as.targetAnalytics.size() < maxTargetEntries { + as.targetAnalytics.set(ad.target.Identifier, *ad.target) } else { - ad.count = (analytic.count + 1) - as.analyticsData[key] = ad + as.logger.Warnf("%s Target analytics cache reached max size, remaining target metrics for this analytics interval will not be sent", sdk_codes.TargetMetricsMaxSizeReached) } - as.mx.Unlock() } } @@ -150,85 +189,35 @@ func convertInterfaceToString(i interface{}) string { } func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) { - as.mx.Lock() - // copy cache to send to server - analyticsData := as.analyticsData - // clear cache. As metrics is secondary to the flags, we do it this way - // so it doesn't effect the performance of our users code. Even if it means - // we lose metrics the odd time. - as.analyticsData = map[string]analyticsEvent{} - as.mx.Unlock() - - metricData := make([]metricsclient.MetricsData, 0, len(as.analyticsData)) - targetData := map[string]metricsclient.TargetData{} - - for _, analytic := range analyticsData { - if analytic.target != nil { - if analytic.target.Anonymous == nil || !*analytic.target.Anonymous { - targetAttributes := make([]metricsclient.KeyValue, 0) - if analytic.target.Attributes != nil { - targetAttributes = make([]metricsclient.KeyValue, 0, len(*analytic.target.Attributes)) - for key, value := range *analytic.target.Attributes { - v := convertInterfaceToString(value) - kv := metricsclient.KeyValue{ - Key: key, - Value: v, - } - targetAttributes = append(targetAttributes, kv) - } - - } - - targetName := analytic.target.Identifier - if analytic.target.Name != "" { - targetName = analytic.target.Name - } - - td := metricsclient.TargetData{ - Name: targetName, - Identifier: analytic.target.Identifier, - Attributes: targetAttributes, - } - targetData[analytic.target.Identifier] = td - } - } + // Clone and reset the evaluation analytics cache to minimise the duration + // for which locks are held, so that metrics processing does not affect flag evaluations performance. + // Although this might occasionally result in the loss of some metrics during periods of high load, + // it is an acceptable tradeoff to prevent extended lock periods that could degrade user code. + evaluationAnalyticsClone := as.evaluationAnalytics + + as.evaluationAnalytics = newSafeEvaluationAnalytics() + + // Clone and reset target analytics cache for same reason. + targetAnalyticsClone := as.targetAnalytics + as.targetAnalytics = newSafeTargetAnalytics() + + metricData := make([]metricsclient.MetricsData, 0, evaluationAnalyticsClone.size()) + targetData := make([]metricsclient.TargetData, 0, targetAnalyticsClone.size()) + + // Process evaluation metrics + evaluationAnalyticsClone.iterate(func(key string, analytic analyticsEvent) { metricAttributes := []metricsclient.KeyValue{ - { - Key: featureIdentifierAttribute, - Value: analytic.featureConfig.Feature, - }, - { - Key: featureNameAttribute, - Value: analytic.featureConfig.Feature, - }, - { - Key: variationIdentifierAttribute, - Value: analytic.variation.Identifier, - }, - { - Key: variationValueAttribute, - Value: analytic.variation.Value, - }, - { - Key: sdkTypeAttribute, - Value: sdkType, - }, - { - Key: sdkLanguageAttribute, - Value: sdkLanguage, - }, - { - Key: sdkVersionAttribute, - Value: SdkVersion, - }, + {Key: featureIdentifierAttribute, Value: analytic.featureConfig.Feature}, + {Key: featureNameAttribute, Value: analytic.featureConfig.Feature}, + {Key: variationIdentifierAttribute, Value: analytic.variation.Identifier}, + {Key: variationValueAttribute, Value: analytic.variation.Value}, + {Key: sdkTypeAttribute, Value: sdkType}, + {Key: sdkLanguageAttribute, Value: sdkLanguage}, + {Key: sdkVersionAttribute, Value: SdkVersion}, + {Key: targetAttribute, Value: globalTarget}, } - metricAttributes = append(metricAttributes, metricsclient.KeyValue{ - Key: targetAttribute, - Value: globalTarget, - }) - md := metricsclient.MetricsData{ Timestamp: time.Now().UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond)), Count: analytic.count, @@ -236,22 +225,31 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) { Attributes: metricAttributes, } metricData = append(metricData, md) - } + }) - // if targets data is empty we just send nil - var targetDataPayload *[]metricsclient.TargetData = nil - if len(targetData) > 0 { - targetDataPayload = targetDataMapToArray(targetData) - } + // Process target metrics + targetAnalyticsClone.iterate(func(key string, target evaluation.Target) { + targetAttributes := make([]metricsclient.KeyValue, 0) + for key, value := range *target.Attributes { + targetAttributes = append(targetAttributes, metricsclient.KeyValue{Key: key, Value: convertInterfaceToString(value)}) + } + + td := metricsclient.TargetData{ + Identifier: target.Identifier, + Name: target.Name, + Attributes: targetAttributes, + } + targetData = append(targetData, td) + }) analyticsPayload := metricsclient.PostMetricsJSONRequestBody{ MetricsData: &metricData, - TargetData: targetDataPayload, + TargetData: &targetData, } if as.metricsClient != nil { - emptyMetricsData := analyticsPayload.MetricsData == nil || len(*analyticsPayload.MetricsData) == 0 - emptyTargetData := analyticsPayload.TargetData == nil || len(*analyticsPayload.TargetData) == 0 + emptyMetricsData := len(metricData) == 0 + emptyTargetData := len(targetData) == 0 // if we have no metrics to send skip the post request if emptyMetricsData && emptyTargetData { @@ -259,7 +257,7 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) { return } - mClient := *as.metricsClient + mClient := as.metricsClient jsonData, err := json.Marshal(analyticsPayload) if err != nil { @@ -287,22 +285,6 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) { } } -//func getEventKey(event analyticsEvent) string { -// targetIdentifier := "" -// if event.target != nil { -// targetIdentifier = event.target.Identifier -// } -// return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, targetIdentifier) -//} - -func getEventSummaryKey(event analyticsEvent) string { +func getEvaluationAnalyticKey(event analyticsEvent) string { return fmt.Sprintf("%s-%s-%s-%s", event.featureConfig.Feature, event.variation.Identifier, event.variation.Value, globalTarget) } - -func targetDataMapToArray(targetMap map[string]metricsclient.TargetData) *[]metricsclient.TargetData { - targetDataArray := make([]metricsclient.TargetData, 0, len(targetMap)) - for _, targetData := range targetMap { - targetDataArray = append(targetDataArray, targetData) - } - return &targetDataArray -} diff --git a/analyticsservice/analytics_test.go b/analyticsservice/analytics_test.go index 5eb1aa3..6d48c18 100644 --- a/analyticsservice/analytics_test.go +++ b/analyticsservice/analytics_test.go @@ -1,6 +1,139 @@ package analyticsservice -import "testing" +import ( + "testing" + "time" + + "github.com/harness/ff-golang-server-sdk/evaluation" + "github.com/harness/ff-golang-server-sdk/logger" + "github.com/harness/ff-golang-server-sdk/rest" +) + +func TestListenerHandlesEventsCorrectly(t *testing.T) { + noOpLogger := logger.NewNoOpLogger() + + testCases := []struct { + name string + events []analyticsEvent + expectedEvaluations map[string]int + expectedSeen map[string]bool + expectedTargets map[string]evaluation.Target + }{ + { + name: "Single evaluation", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 1}, + expectedSeen: map[string]bool{"target1": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}}, + }, + { + name: "Two identical evaluations with the same target", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 2}, + expectedSeen: map[string]bool{"target1": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}}, + }, + { + name: "Two identical evaluations with different targets", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target2"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 2}, + expectedSeen: map[string]bool{"target1": true, "target2": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}, "target2": {Identifier: "target2"}}, + }, + { + name: "Two different evaluations with two different targets", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target2"}, featureConfig: &rest.FeatureConfig{Feature: "feature2"}, variation: &rest.Variation{Identifier: "var2", Value: "value2"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 1, "feature2-var2-value2-global": 1}, + expectedSeen: map[string]bool{"target1": true, "target2": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}, "target2": {Identifier: "target2"}}, + }, + { + name: "Three different evaluations with two identical targets", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target2"}, featureConfig: &rest.FeatureConfig{Feature: "feature2"}, variation: &rest.Variation{Identifier: "var2", Value: "value2"}}, + {target: &evaluation.Target{Identifier: "target3"}, featureConfig: &rest.FeatureConfig{Feature: "feature3"}, variation: &rest.Variation{Identifier: "var3", Value: "value3"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 1, "feature2-var2-value2-global": 1, "feature3-var3-value3-global": 1}, + expectedSeen: map[string]bool{"target1": true, "target2": true, "target3": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}, "target2": {Identifier: "target2"}, "target3": {Identifier: "target3"}}, + }, + { + name: "Three different evaluations with two anonymous targets", + events: []analyticsEvent{ + {target: &evaluation.Target{Identifier: "target1"}, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target2", Anonymous: boolPtr(true)}, featureConfig: &rest.FeatureConfig{Feature: "feature2"}, variation: &rest.Variation{Identifier: "var2", Value: "value2"}}, + {target: &evaluation.Target{Identifier: "target3"}, featureConfig: &rest.FeatureConfig{Feature: "feature3"}, variation: &rest.Variation{Identifier: "var3", Value: "value3"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 1, "feature2-var2-value2-global": 1, "feature3-var3-value3-global": 1}, + expectedSeen: map[string]bool{"target1": true, "target3": true}, + expectedTargets: map[string]evaluation.Target{"target1": {Identifier: "target1"}, "target3": {Identifier: "target3"}}, + }, + { + name: "Three different evaluations with one anonymous target and one nil target", + events: []analyticsEvent{ + {target: nil, featureConfig: &rest.FeatureConfig{Feature: "feature1"}, variation: &rest.Variation{Identifier: "var1", Value: "value1"}}, + {target: &evaluation.Target{Identifier: "target2", Anonymous: boolPtr(true)}, featureConfig: &rest.FeatureConfig{Feature: "feature2"}, variation: &rest.Variation{Identifier: "var2", Value: "value2"}}, + {target: &evaluation.Target{Identifier: "target3"}, featureConfig: &rest.FeatureConfig{Feature: "feature3"}, variation: &rest.Variation{Identifier: "var3", Value: "value3"}}, + }, + expectedEvaluations: map[string]int{"feature1-var1-value1-global": 1, "feature2-var2-value2-global": 1, "feature3-var3-value3-global": 1}, + expectedSeen: map[string]bool{"target3": true}, + expectedTargets: map[string]evaluation.Target{"target3": {Identifier: "target3"}}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + service := NewAnalyticsService(1*time.Minute, noOpLogger) + defer close(service.analyticsChan) + + // Start the listener in a goroutine + go service.listener() + + // Send all events for the test case + for _, event := range tc.events { + service.analyticsChan <- event + } + + // Allow some time for the events to be processed + time.Sleep(100 * time.Millisecond) + + // Check evaluation metrics counts + for key, expectedCount := range tc.expectedEvaluations { + analytic, exists := service.evaluationAnalytics.get(key) + if !exists || analytic.count != expectedCount { + t.Errorf("Test %s failed: expected count for key %s is %d, got %d", tc.name, key, expectedCount, analytic.count) + } + } + + // Check target metrics + for targetID, expectedSeen := range tc.expectedSeen { + if _, seen := service.seenTargets.get(targetID); seen != expectedSeen { + t.Errorf("Test %s failed: expected target to be in seen targets cache %s is %v", tc.name, targetID, expectedSeen) + } + } + + // Check target analytics + for targetID, expectedTarget := range tc.expectedTargets { + target, exists := service.targetAnalytics.get(targetID) + if !exists || target.Identifier != expectedTarget.Identifier { + t.Errorf("Test %s failed: expected target to be in target cache %s", tc.name, targetID) + } + } + }) + } +} func Test_convertInterfaceToString(t *testing.T) { testCases := map[string]struct { @@ -52,3 +185,7 @@ func Test_convertInterfaceToString(t *testing.T) { }) } } + +func boolPtr(b bool) *bool { + return &b +} diff --git a/analyticsservice/safe_evaluations_map.go b/analyticsservice/safe_evaluations_map.go new file mode 100644 index 0000000..8669955 --- /dev/null +++ b/analyticsservice/safe_evaluations_map.go @@ -0,0 +1,55 @@ +package analyticsservice + +import ( + "sync" +) + +type safeEvaluationAnalytics struct { + sync.RWMutex + data map[string]analyticsEvent +} + +func newSafeEvaluationAnalytics() SafeAnalyticsCache[string, analyticsEvent] { + return &safeEvaluationAnalytics{ + data: make(map[string]analyticsEvent), + } +} + +func (s *safeEvaluationAnalytics) set(key string, value analyticsEvent) { + s.Lock() + defer s.Unlock() + s.data[key] = value +} + +func (s *safeEvaluationAnalytics) get(key string) (analyticsEvent, bool) { + s.RLock() + defer s.RUnlock() + val, exists := s.data[key] + return val, exists +} + +func (s *safeEvaluationAnalytics) size() int { + s.RLock() + defer s.RUnlock() + return len(s.data) +} + +func (s *safeEvaluationAnalytics) delete(key string) { + s.Lock() + defer s.Unlock() + delete(s.data, key) +} + +func (s *safeEvaluationAnalytics) clear() { + s.Lock() + defer s.Unlock() + s.data = make(map[string]analyticsEvent) +} + +func (s *safeEvaluationAnalytics) iterate(f func(string, analyticsEvent)) { + s.RLock() + defer s.RUnlock() + for key, value := range s.data { + f(key, value) + } +} diff --git a/analyticsservice/safe_maps_test.go b/analyticsservice/safe_maps_test.go new file mode 100644 index 0000000..fb25e1d --- /dev/null +++ b/analyticsservice/safe_maps_test.go @@ -0,0 +1,93 @@ +package analyticsservice + +import ( + "reflect" + "sync" + "testing" + + "github.com/harness/ff-golang-server-sdk/evaluation" +) + +func testMapOperations[K comparable, V any](t *testing.T, mapInstance SafeAnalyticsCache[K, V], testData map[K]V) { + var wg sync.WaitGroup + + // Test concurrent sets and gets + for key, value := range testData { + wg.Add(1) + go func(k K, v V) { + defer wg.Done() + mapInstance.set(k, v) + if got, exists := mapInstance.get(k); !exists || !reflect.DeepEqual(got, v) { + t.Errorf("Concurrent set or get method failed for key %v, expected %v, got %v", k, v, got) + } + }(key, value) + } + wg.Wait() + + // Test concurrent iteration and size + for key := range testData { + wg.Add(1) + go func(k K) { + defer wg.Done() + mapInstance.size() + mapInstance.iterate(func(k K, v V) { + if expected, exists := testData[k]; !exists || !reflect.DeepEqual(v, expected) { + t.Errorf("Iterate failed for key %v, expected %v, got %v", k, expected, v) + } + }) + }(key) + } + wg.Wait() + + // Test concurrent deletes + for key := range testData { + wg.Add(1) + go func(k K) { + defer wg.Done() + mapInstance.delete(k) + if _, exists := mapInstance.get(k); exists { + t.Errorf("Concurrent delete method failed, %v should have been deleted", k) + } + }(key) + } + wg.Wait() +} + +func TestSafeEvaluationAnalytics(t *testing.T) { + s := newSafeEvaluationAnalytics() + testData := map[string]analyticsEvent{ + "event1": {count: 10}, + "event2": {count: 5}, + "event3": {count: 5}, + "event4": {count: 3}, + "event5": {count: 2}, + "event6": {count: 1}, + } + + testMapOperations[string, analyticsEvent](t, s, testData) +} + +func TestSafeTargetAnalytics(t *testing.T) { + s := newSafeTargetAnalytics() + testData := map[string]evaluation.Target{ + "target1": {Identifier: "id1"}, + "target2": {Identifier: "id2"}, + "target3": {Identifier: "id3"}, + "target4": {Identifier: "id4"}, + "target5": {Identifier: "id5"}, + } + + testMapOperations[string, evaluation.Target](t, s, testData) +} + +func TestSafeSeenTargets(t *testing.T) { + s := newSafeSeenTargets() + testData := map[string]bool{ + "target1": true, + "target21": true, + "target3": true, + "target4": true, + } + + testMapOperations[string, bool](t, s, testData) +} diff --git a/analyticsservice/safe_seen_targets_map.go b/analyticsservice/safe_seen_targets_map.go new file mode 100644 index 0000000..94e4a8d --- /dev/null +++ b/analyticsservice/safe_seen_targets_map.go @@ -0,0 +1,55 @@ +package analyticsservice + +import ( + "sync" +) + +type safeSeenTargets struct { + sync.RWMutex + data map[string]bool +} + +func newSafeSeenTargets() SafeAnalyticsCache[string, bool] { + return &safeSeenTargets{ + data: make(map[string]bool), + } +} + +func (s *safeSeenTargets) set(key string, seen bool) { + s.Lock() + defer s.Unlock() + s.data[key] = seen +} + +func (s *safeSeenTargets) get(key string) (bool, bool) { + s.RLock() + defer s.RUnlock() + seen, exists := s.data[key] + return seen, exists +} + +func (s *safeSeenTargets) delete(key string) { + s.Lock() + defer s.Unlock() + delete(s.data, key) +} + +func (s *safeSeenTargets) size() int { + s.RLock() + defer s.RUnlock() + return len(s.data) +} + +func (s *safeSeenTargets) clear() { + s.Lock() + defer s.Unlock() + s.data = make(map[string]bool) +} + +func (s *safeSeenTargets) iterate(f func(string, bool)) { + s.RLock() + defer s.RUnlock() + for key, value := range s.data { + f(key, value) + } +} diff --git a/analyticsservice/safe_target_map.go b/analyticsservice/safe_target_map.go new file mode 100644 index 0000000..f8a634b --- /dev/null +++ b/analyticsservice/safe_target_map.go @@ -0,0 +1,63 @@ +package analyticsservice + +import ( + "sync" + + "github.com/harness/ff-golang-server-sdk/evaluation" +) + +type safeTargetAnalytics struct { + sync.RWMutex + data map[string]evaluation.Target +} + +func newSafeTargetAnalytics() SafeAnalyticsCache[string, evaluation.Target] { + return &safeTargetAnalytics{ + data: make(map[string]evaluation.Target), + } +} + +func (s *safeTargetAnalytics) set(key string, value evaluation.Target) { + s.Lock() + defer s.Unlock() + s.data[key] = value +} + +func (s *safeTargetAnalytics) get(key string) (evaluation.Target, bool) { + s.RLock() + defer s.RUnlock() + val, exists := s.data[key] + return val, exists +} + +func (s *safeTargetAnalytics) size() int { + s.RLock() + defer s.RUnlock() + return len(s.data) +} + +func (s *safeTargetAnalytics) delete(key string) { + s.Lock() + defer s.Unlock() + delete(s.data, key) +} + +func (s *safeTargetAnalytics) clear() { + s.Lock() + defer s.Unlock() + s.data = make(map[string]evaluation.Target) +} + +func (s *safeTargetAnalytics) iterate(f func(string, evaluation.Target)) { + s.RLock() + defer s.RUnlock() + for key, value := range s.data { + f(key, value) + } +} + +func (s *safeTargetAnalytics) iterateUnsafe(f func(string, evaluation.Target)) { + for key, value := range s.data { + f(key, value) + } +} diff --git a/client/client.go b/client/client.go index 3062351..4cfabb5 100644 --- a/client/client.go +++ b/client/client.go @@ -586,7 +586,7 @@ func (c *CfClient) setAnalyticsServiceClient(ctx context.Context) { return } c.config.Logger.Info("Posting analytics data enabled") - c.analyticsService.Start(ctx, &c.metricsApi, c.environmentID) + c.analyticsService.Start(ctx, c.metricsApi, c.environmentID) } // BoolVariation returns the value of a boolean feature flag for a given target. diff --git a/client/client_test.go b/client/client_test.go index beca237..1e97e72 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -4,6 +4,12 @@ import ( "bytes" "encoding/json" "errors" + "io" + "net/http" + "os" + "testing" + "time" + "github.com/cenkalti/backoff/v4" "github.com/harness/ff-golang-server-sdk/dto" "github.com/harness/ff-golang-server-sdk/evaluation" @@ -13,11 +19,6 @@ import ( "github.com/harness/ff-golang-server-sdk/types" "github.com/jarcoal/httpmock" "github.com/stretchr/testify/assert" - "io" - "net/http" - "os" - "testing" - "time" ) const ( @@ -591,24 +592,6 @@ func TestCfClient_DefaultVariationReturned(t *testing.T) { expectedJSON: types.JSON{"a default flagIdentifier": "a default value"}, expectedError: DefaultVariationReturnedError, }, - { - name: "Evaluations with Sync client with valid sdk key and flag not found", - clientFunc: func() (*CfClient, error) { - return newClient(http.DefaultClient, ValidSDKKey, WithWaitForInitialized(true)) - }, - mockResponder: func() { - authSuccessResponse := AuthResponse(200, ValidAuthToken) - registerResponders(authSuccessResponse, TargetSegmentsResponse, FeatureConfigsResponse) - - }, - flagIdentifier: "made up", - expectedBool: false, - expectedString: "a default value", - expectedInt: 45555, - expectedNumber: 45.222, - expectedJSON: types.JSON{"a default flagIdentifier": "a default value"}, - expectedError: DefaultVariationReturnedError, - }, } target := target() diff --git a/sdk_codes/sdk_codes.go b/sdk_codes/sdk_codes.go index 744744b..fbbf003 100644 --- a/sdk_codes/sdk_codes.go +++ b/sdk_codes/sdk_codes.go @@ -4,29 +4,30 @@ package sdk_codes type SDKCode string const ( - InitSuccess SDKCode = "SDKCODE:1000" - InitAuthError SDKCode = "SDKCODE:1001" - InitMissingKey SDKCode = "SDKCODE:1002" - InitWaiting SDKCode = "SDKCODE:1003" - AuthSuccess SDKCode = "SDKCODE:2000" - AuthFailed SDKCode = "SDKCODE:2001" - AuthAttempt SDKCode = "SDKCODE:2002" - AuthExceededRetries SDKCode = "SDKCODE:2003" - CloseStarted SDKCode = "SDKCODE:3000" - CloseSuccess SDKCode = "SDKCODE:3001" - PollStart SDKCode = "SDKCODE:4000" - PollStop SDKCode = "SDKCODE:4001" - StreamStarted SDKCode = "SDKCODE:5000" - StreamDisconnected SDKCode = "SDKCODE:5001" - StreamEvent SDKCode = "SDKCODE:5002" - // StreamRetry TODO it's not clear how the SSE retry mechanism is working. Add this once SSE resilency has been established in FFM-9485 - StreamRetry SDKCode = "SDKCODE:5003" - StreamStop SDKCode = "SDKCODE:5004" - EvaluationSuccess SDKCode = "SDKCODE:6000" - EvaluationFailed SDKCode = "SDKCODE:6001" - MissingBucketBy SDKCode = "SDKCODE:6002" - MetricsStarted SDKCode = "SDKCODE:7000" - MetricsStopped SDKCode = "SDKCODE:7001" - MetricsSendFail SDKCode = "SDKCODE:7002" - MetricsSendSuccess SDKCode = "SDKCODE:7003" + InitSuccess SDKCode = "SDKCODE:1000" + InitAuthError SDKCode = "SDKCODE:1001" + InitMissingKey SDKCode = "SDKCODE:1002" + InitWaiting SDKCode = "SDKCODE:1003" + AuthSuccess SDKCode = "SDKCODE:2000" + AuthFailed SDKCode = "SDKCODE:2001" + AuthAttempt SDKCode = "SDKCODE:2002" + AuthExceededRetries SDKCode = "SDKCODE:2003" + CloseStarted SDKCode = "SDKCODE:3000" + CloseSuccess SDKCode = "SDKCODE:3001" + PollStart SDKCode = "SDKCODE:4000" + PollStop SDKCode = "SDKCODE:4001" + StreamStarted SDKCode = "SDKCODE:5000" + StreamDisconnected SDKCode = "SDKCODE:5001" + StreamEvent SDKCode = "SDKCODE:5002" + StreamRetry SDKCode = "SDKCODE:5003" + StreamStop SDKCode = "SDKCODE:5004" + EvaluationSuccess SDKCode = "SDKCODE:6000" + EvaluationFailed SDKCode = "SDKCODE:6001" + MissingBucketBy SDKCode = "SDKCODE:6002" + MetricsStarted SDKCode = "SDKCODE:7000" + MetricsStopped SDKCode = "SDKCODE:7001" + MetricsSendFail SDKCode = "SDKCODE:7002" + MetricsSendSuccess SDKCode = "SDKCODE:7003" + TargetMetricsMaxSizeReached SDKCode = "SDKCODE:7004" + EvaluationMetricsMaxSizeReached SDKCode = "SDKCODE:7007" )