Skip to content

Commit

Permalink
FFM-11360 Use target-segments v2-rule parameter / Swap to atomic wrap…
Browse files Browse the repository at this point in the history
…per in analytics (#157)
  • Loading branch information
erdirowlands committed May 2, 2024
1 parent 57d3bd4 commit d2fb709
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 91 deletions.
18 changes: 9 additions & 9 deletions analyticsservice/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
variationValueAttribute string = "featureValue"
targetAttribute string = "target"
sdkVersionAttribute string = "SDK_VERSION"
SdkVersion string = "0.1.22"
SdkVersion string = "0.1.23"
sdkTypeAttribute string = "SDK_TYPE"
sdkType string = "server"
sdkLanguageAttribute string = "SDK_LANGUAGE"
Expand Down Expand Up @@ -59,8 +59,8 @@ type AnalyticsService struct {
evaluationAnalytics SafeAnalyticsCache[string, analyticsEvent]
targetAnalytics SafeAnalyticsCache[string, evaluation.Target]
seenTargets SafeAnalyticsCache[string, bool]
logEvaluationLimitReached int32
logTargetLimitReached int32
logEvaluationLimitReached atomic.Bool
logTargetLimitReached atomic.Bool
timeout time.Duration
logger logger.Logger
metricsClient metricsclient.ClientWithResponsesInterface
Expand Down Expand Up @@ -136,9 +136,9 @@ func (as *AnalyticsService) listener() {
as.evaluationAnalytics.set(analyticsKey, ad)
}
} else {
if atomic.LoadInt32(&as.logEvaluationLimitReached) == 0 {
if !as.logEvaluationLimitReached.Load() {
as.logger.Warnf("%s Evaluation analytic cache reached max size, remaining evaluation metrics for this analytics interval will not be sent", sdk_codes.EvaluationMetricsMaxSizeReached)
atomic.StoreInt32(&as.logEvaluationLimitReached, 1)
as.logEvaluationLimitReached.Store(true)
}
}

Expand All @@ -161,9 +161,9 @@ func (as *AnalyticsService) listener() {
if as.targetAnalytics.size() < maxTargetEntries {
as.targetAnalytics.set(ad.target.Identifier, *ad.target)
} else {
if atomic.LoadInt32(&as.logTargetLimitReached) == 0 {
if !as.logTargetLimitReached.Load() {
as.logger.Warnf("%s Target analytics cache reached max size, remaining target metrics for this analytics interval will not be sent", sdk_codes.TargetMetricsMaxSizeReached)
atomic.StoreInt32(&as.logTargetLimitReached, 1)
as.logTargetLimitReached.Store(true)
}
}
}
Expand Down Expand Up @@ -212,8 +212,8 @@ func (as *AnalyticsService) sendDataAndResetCache(ctx context.Context) {
as.targetAnalytics = newSafeTargetAnalytics()

// Reset flags that keep track of cache limits being reached for logging purposes
atomic.StoreInt32(&as.logEvaluationLimitReached, 0)
atomic.StoreInt32(&as.logTargetLimitReached, 0)
as.logEvaluationLimitReached.Store(false)
as.logTargetLimitReached.Store(false)

metricData := make([]metricsclient.MetricsData, 0, evaluationAnalyticsClone.size())
targetData := make([]metricsclient.TargetData, 0, targetAnalyticsClone.size())
Expand Down
8 changes: 8 additions & 0 deletions apiconfig/api_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package apiconfig

import "github.com/harness/ff-golang-server-sdk/rest"

// ApiConfiguration is a type that provides the required configuration for requests
type ApiConfiguration interface {
GetSegmentRulesV2QueryParam() *rest.SegmentRulesV2QueryParam
}
7 changes: 5 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (c *CfClient) streamConnect(ctx context.Context) {
sseClient.Connection = c.config.httpClient

conn := stream.NewSSEClient(c.sdkKey, c.token, sseClient, c.repository, c.api, c.config.Logger,
c.config.eventStreamListener, c.config.proxyMode, c.streamConnectedChan, c.streamDisconnectedChan)
c.config.eventStreamListener, c.config.proxyMode, c.streamConnectedChan, c.streamDisconnectedChan, c.config.apiConfig)

// Connect kicks off a goroutine that attempts to establish a stream connection
// while this is happening we set streamConnectedBool to true - if any errors happen
Expand Down Expand Up @@ -558,7 +558,10 @@ func (c *CfClient) retrieveSegments(ctx context.Context) error {
c.mux.RLock()
defer c.mux.RUnlock()
c.config.Logger.Info("Retrieving segments started")
segments, err := c.api.GetAllSegmentsWithResponse(ctx, c.environmentID, nil)
requestParams := &rest.GetAllSegmentsParams{
Rules: c.config.apiConfig.GetSegmentRulesV2QueryParam(),
}
segments, err := c.api.GetAllSegmentsWithResponse(ctx, c.environmentID, requestParams)
if err != nil {
// log
return err
Expand Down
22 changes: 19 additions & 3 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@ package client

import (
"fmt"
"net/http"
"os"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/harness/ff-golang-server-sdk/cache"
"github.com/harness/ff-golang-server-sdk/evaluation"
"github.com/harness/ff-golang-server-sdk/logger"
"github.com/harness/ff-golang-server-sdk/rest"
"github.com/harness/ff-golang-server-sdk/storage"
"github.com/harness/ff-golang-server-sdk/stream"
"github.com/harness/ff-golang-server-sdk/types"
"github.com/hashicorp/go-retryablehttp"
"net/http"
"os"
"time"
)

type config struct {
Expand All @@ -35,6 +37,15 @@ type config struct {
authRetryStrategy *backoff.ExponentialBackOff
streamingRetryStrategy *backoff.ExponentialBackOff
sleeper types.Sleeper
apiConfig *apiConfiguration
}

type apiConfiguration struct {
segmentRulesV2QueryParam rest.SegmentRulesV2QueryParam
}

func (a *apiConfiguration) GetSegmentRulesV2QueryParam() *rest.SegmentRulesV2QueryParam {
return &a.segmentRulesV2QueryParam
}

func newDefaultConfig(log logger.Logger) *config {
Expand Down Expand Up @@ -71,6 +82,10 @@ func newDefaultConfig(log logger.Logger) *config {
return resp, fmt.Errorf(message)
}

apiConfig := &apiConfiguration{
segmentRulesV2QueryParam: "v2",
}

return &config{
url: "https://config.ff.harness.io/api/1.0",
eventsURL: "https://events.ff.harness.io/api/1.0",
Expand All @@ -89,6 +104,7 @@ func newDefaultConfig(log logger.Logger) *config {
authRetryStrategy: getDefaultExpBackoff(),
streamingRetryStrategy: getDefaultExpBackoff(),
sleeper: &types.RealClock{},
apiConfig: apiConfig,
}
}

Expand Down
15 changes: 14 additions & 1 deletion resources/client-v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ paths:
schema:
type: string
- $ref: '#/components/parameters/clusterQueryOptionalParam'
- $ref: '#/components/parameters/segmentRulesV2QueryParam'
security:
- BearerAuth: []
responses:
Expand Down Expand Up @@ -127,6 +128,7 @@ paths:
schema:
type: string
- $ref: '#/components/parameters/clusterQueryOptionalParam'
- $ref: '#/components/parameters/segmentRulesV2QueryParam'
security:
- BearerAuth: []
responses:
Expand Down Expand Up @@ -952,6 +954,17 @@ components:
description: Unique identifier for the cluster for the account
schema:
type: string
segmentRulesV2QueryParam:
name: rules
in: query
required: false
description: >-
When set to rules=v2 will return AND rule compatible serving_rules
field. When not set or set to any other value will return old rules
field only compatible with OR rules.
allowEmptyValue: true
schema:
type: string
environmentPathParam:
name: environmentUUID
in: path
Expand Down Expand Up @@ -1009,4 +1022,4 @@ components:
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
$ref: '#/components/schemas/Error'
2 changes: 1 addition & 1 deletion resources/metrics-v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,4 @@ components:
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
$ref: '#/components/schemas/Error'

0 comments on commit d2fb709

Please sign in to comment.