Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FFM-11022 Flag/Group cache recovery #111

Merged
merged 13 commits into from Mar 27, 2024
Merged
13 changes: 12 additions & 1 deletion client/api/Config.cs
Expand Up @@ -22,10 +22,13 @@ public class Config
internal bool streamEnabled = true;
internal int evaluationMetricsMaxSize = 10000;
internal int targetMetricsMaxSize = 100000;
internal int cacheRecoveryTimeoutInMs = 5000;



public Config(string configUrl, string eventUrl, bool streamEnabled, int pollIntervalInSeconds,
bool analyticsEnabled, int frequency, int targetMetricsMaxSize, int connectionTimeout, int readTimeout,
int writeTimeout, bool debug, long metricsServiceAcceptableDuration)
int writeTimeout, bool debug, long metricsServiceAcceptableDuration, int cacheRecoveryTimeoutInMs)
{
this.configUrl = configUrl;
this.eventUrl = eventUrl;
Expand All @@ -39,6 +42,7 @@ public class Config
this.writeTimeout = writeTimeout;
this.debug = debug;
this.metricsServiceAcceptableDuration = metricsServiceAcceptableDuration;
this.cacheRecoveryTimeoutInMs = cacheRecoveryTimeoutInMs;
}

public Config()
Expand All @@ -64,6 +68,7 @@ public Config()

public int TargetMetricsMaxSize => targetMetricsMaxSize;
public int EvaluationMetricsMaxSize => evaluationMetricsMaxSize;
public int CacheRecoveryTimeoutInMs => cacheRecoveryTimeoutInMs;


/**
Expand Down Expand Up @@ -189,6 +194,12 @@ public ConfigBuilder debug(bool debug)
configtobuild.debug = debug;
return this;
}

public ConfigBuilder SetCacheRecoveryTimeout(int timeoutMilliseconds)
{
configtobuild.cacheRecoveryTimeoutInMs = timeoutMilliseconds;
return this;
}

/// <summary>
/// <para>
Expand Down
35 changes: 33 additions & 2 deletions client/api/Evaluator.cs
Expand Up @@ -33,15 +33,20 @@ internal class Evaluator : IEvaluator
private readonly ILogger<Evaluator> logger;
private readonly ILoggerFactory loggerFactory;
private readonly IRepository repository;
private readonly IPollingProcessor poller;
private readonly Config config;


public Evaluator(IRepository repository, IEvaluatorCallback callback, ILoggerFactory loggerFactory,
bool isAnalyticsEnabled)
bool isAnalyticsEnabled, IPollingProcessor poller, Config config)
{
this.repository = repository;
this.callback = callback;
logger = loggerFactory.CreateLogger<Evaluator>();
this.loggerFactory = loggerFactory;
IsAnalyticsEnabled = isAnalyticsEnabled;
this.poller = poller;
this.config = config;
}

public bool BoolVariation(string key, Target target, bool defaultValue)
Expand Down Expand Up @@ -85,8 +90,34 @@ public string StringVariation(string key, Target target, string defaultValue)
private Variation EvaluateVariation(string key, Target target, FeatureConfigKind kind)
{
var featureConfig = repository.GetFlag(key);
if (featureConfig == null || featureConfig.Kind != kind)
if (featureConfig == null)
{
logger.LogWarning(
"Unable to find flag {Key} in cache, refreshing flag cache and retrying evaluation ",
key);
var refreshSuccess = poller.RefreshFlags(TimeSpan.FromSeconds(config.CacheRecoveryTimeoutInMs));

if (refreshSuccess)
// Re-attempt to fetch the feature config after the refresh
featureConfig = repository.GetFlag(key);

// If still not found or doesn't match the kind, return null to indicate failure
if (featureConfig == null)
{
logger.LogError(
"Failed to find flag {Key} in cache even after attempting a refresh. Check flag exists in project",
key);
return null;
}
}

if (featureConfig.Kind != kind)
{
logger.LogWarning(
"Requested variation {Kind} does not match flag {Key} which is of type {featureConfigKind}",
kind, key, featureConfig.Kind);
return null;
}

var prerequisites = featureConfig.Prerequisites;
if (prerequisites != null && prerequisites.Count > 0)
Expand Down
126 changes: 107 additions & 19 deletions client/api/InnerClient.cs
Expand Up @@ -33,6 +33,7 @@ internal class InnerClient :
private IEvaluator evaluator;
private MetricsProcessor metric;
private IConnector connector;
private Config config;

public event EventHandler InitializationCompleted;
public event EventHandler<string> EvaluationChanged;
Expand All @@ -50,6 +51,7 @@ public InnerClient(string apiKey, Config config, CfClient parent, ILoggerFactory
this.loggerFactory = loggerFactory;
this.parent = parent;
this.logger = loggerFactory.CreateLogger<InnerClient>();
this.config = config;
Initialize(apiKey, config);
}

Expand Down Expand Up @@ -81,7 +83,7 @@ public void Initialize(IConnector connector, Config config)
this.repository = new StorageRepository(config.Cache, config.Store, this, loggerFactory);
this.polling = new PollingProcessor(connector, this.repository, config, this, loggerFactory);
this.update = new UpdateProcessor(connector, this.repository, config, this, loggerFactory);
this.evaluator = new Evaluator(this.repository, this, loggerFactory, config.analyticsEnabled);
this.evaluator = new Evaluator(this.repository, this, loggerFactory, config.analyticsEnabled, polling, config);
// Since 1.4.2, we enable the global target for evaluation metrics.
this.metric = new MetricsProcessor(config, evaluationAnalyticsCache, targetAnalyticsCache, new AnalyticsPublisherService(connector, evaluationAnalyticsCache, targetAnalyticsCache, loggerFactory), loggerFactory, true);
Start();
Expand Down Expand Up @@ -213,38 +215,82 @@ private void OnNotifyEvaluationChanged(string identifier)
EvaluationChanged?.Invoke(parent, identifier);
}

public bool BoolVariation(string key, dto.Target target, bool defaultValue)
public bool BoolVariation(string key, Target target, bool defaultValue)
{
try
{
return evaluator.BoolVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException ex)
{
logger.LogError(ex, "Invalid cache state detected when evaluating boolean variation for flag {Key}",
logger.LogWarning(ex,
"Invalid cache state detected when evaluating boolean variation for flag {Key}, refreshing cache and retrying evaluation ",
key);
LogEvaluationFailureError(FeatureConfigKind.Boolean, key, target, defaultValue.ToString());
polling.TriggerProcessSegments();
return defaultValue;
// Attempt to refresh cache
var success = polling.RefreshFlagsAndSegments(TimeSpan.FromMilliseconds(2000));

// If the refresh has failed or exceeded the timout, return default variation
if (!success)
{
logger.LogError(ex,
"Refreshing cache for boolean variation for flag {Key} failed, returning default variation ",
key);
LogEvaluationFailureError(FeatureConfigKind.Boolean, key, target, defaultValue.ToString());
return defaultValue;
}

try
{
return evaluator.BoolVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException)
{
logger.LogError(ex,
"Attempted re-evaluation of boolean variation for flag {Key} after refreshing cache failed due to invalid cache state, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.Boolean, key, target, defaultValue.ToString());
return defaultValue;
}
}
}

public string StringVariation(string key, dto.Target target, string defaultValue)
public string StringVariation(string key, Target target, string defaultValue)
{
try
{
return evaluator.StringVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException ex)
{
logger.LogError(ex, "Invalid cache state detected when evaluating string variation for flag {Key}",
logger.LogWarning(ex,
"Invalid cache state detected when evaluating string variation for flag {Key}, refreshing cache and retrying evaluation",
key);
LogEvaluationFailureError(FeatureConfigKind.String, key, target, defaultValue);
polling.TriggerProcessSegments();
return defaultValue;
var success = polling.RefreshFlagsAndSegments(TimeSpan.FromSeconds(config.CacheRecoveryTimeoutInMs));
if (!success)
{
logger.LogError(
"Refreshing cache for string variation for flag {Key} failed, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.String, key, target, defaultValue);
return defaultValue;
}

try
{
return evaluator.StringVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException)
{
logger.LogWarning(
"Attempted re-evaluation of string variation for flag {Key} after refreshing cache failed due to invalid cache state, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.String, key, target, defaultValue);
return defaultValue;
}
}
}


public double NumberVariation(string key, Target target, double defaultValue)
{
try
Expand All @@ -253,14 +299,35 @@ public double NumberVariation(string key, Target target, double defaultValue)
}
catch (InvalidCacheStateException ex)
{
logger.LogError(ex, "Invalid cache state detected when evaluating number variation for flag {Key}",
logger.LogWarning(ex,
"Invalid cache state detected when evaluating number variation for flag {Key}, refreshing cache and retrying evaluation",
key);
LogEvaluationFailureError(FeatureConfigKind.Int, key, target, defaultValue.ToString());
polling.TriggerProcessSegments();
return defaultValue;
var success = polling.RefreshFlagsAndSegments(TimeSpan.FromSeconds(config.CacheRecoveryTimeoutInMs));
if (!success)
{
logger.LogError(
"Refreshing cache for number variation for flag {Key} failed, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.Int, key, target, defaultValue.ToString());
return defaultValue;
}

try
{
return evaluator.NumberVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException)
{
logger.LogWarning(
"Attempted re-evaluation of number variation for flag {Key} after refreshing cache failed due to invalid cache state, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.Int, key, target, defaultValue.ToString());
return defaultValue;
}
}
}


public JObject JsonVariation(string key, Target target, JObject defaultValue)
{
try
Expand All @@ -269,14 +336,35 @@ public JObject JsonVariation(string key, Target target, JObject defaultValue)
}
catch (InvalidCacheStateException ex)
{
logger.LogError(ex, "Invalid cache state detected when evaluating json variation for flag {Key}",
logger.LogWarning(ex,
"Invalid cache state detected when evaluating json variation for flag {Key}, refreshing cache and retrying evaluation",
key);
LogEvaluationFailureError(FeatureConfigKind.Json, key, target, defaultValue.ToString());
polling.TriggerProcessSegments();
return defaultValue;
var success = polling.RefreshFlagsAndSegments(TimeSpan.FromSeconds(config.CacheRecoveryTimeoutInMs));
if (!success)
{
logger.LogError(
"Refreshing cache for json variation for flag {Key} failed, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.Json, key, target, defaultValue.ToString());
return defaultValue;
}

try
{
return evaluator.JsonVariation(key, target, defaultValue);
}
catch (InvalidCacheStateException)
{
logger.LogWarning(
"Attempted re-evaluation of json variation for flag {Key} after refreshing cache failed due to invalid cache state, returning default variation",
key);
LogEvaluationFailureError(FeatureConfigKind.Json, key, target, defaultValue.ToString());
return defaultValue;
}
}
}


public void Close()
{
this.connector?.Close();
Expand Down
71 changes: 61 additions & 10 deletions client/api/PollingProcessor.cs
Expand Up @@ -30,7 +30,11 @@ internal interface IPollingProcessor
/// </summary>
void Start();

void TriggerProcessSegments();
bool RefreshSegments(TimeSpan timeout);
bool RefreshFlags(TimeSpan timeout);

bool RefreshFlagsAndSegments(TimeSpan timeout);

}

/// <summary>
Expand Down Expand Up @@ -133,18 +137,65 @@ private async Task ProcessSegments()
throw;
}
}

public bool RefreshFlagsAndSegments(TimeSpan timeout)
{
var processSegmentsTask = Task.Run(async () => await ProcessSegments());
var processFlagsTask = Task.Run(async () => await ProcessFlags());

try
{
// Await both tasks to complete within the timeout
if (Task.WaitAll(new[] { processSegmentsTask, processFlagsTask }, timeout)) return true;

logger.LogWarning("Refreshing flags and groups did not complete within the specified timeout");
return false;
}
catch (Exception ex)
{
logger.LogError(ex, "Exception occurred while refreshing flags and groups");
return false;
}
}

public void TriggerProcessSegments()
public bool RefreshSegments(TimeSpan timeout)
{
try
{
var task = Task.Run(async () => await ProcessSegments());
if (task.Wait(timeout))
{
return true;
}

logger.LogWarning("RefreshSegments did not complete within the specified timeout");
return false;
}
catch (Exception ex)
{
logger.LogError(ex, "Exception occurred while trying to refresh groups");
return false;
}
}

public bool RefreshFlags(TimeSpan timeout)
{
Task.Run(async () => await ProcessSegments())
.ContinueWith(task =>
try
{
var task = Task.Run(async () => await ProcessFlags());
if (task.Wait(timeout))
{
if (task.Exception != null)
{
// Handle exceptions from ProcessSegments
logger.LogError(task.Exception, "Error occurred in TriggerProcessSegments");
}
});
return true;
}

logger.LogWarning("RefreshFlags did not complete within the specified timeout");
return false;
}
catch (Exception ex)
{
logger.LogError(ex, "Exception occurred while trying to refresh flags");
return false;
}
}


Expand Down
4 changes: 2 additions & 2 deletions ff-netF48-server-sdk.csproj
Expand Up @@ -10,8 +10,8 @@
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Version>1.6.4</Version>
<PackOnBuild>true</PackOnBuild>
<PackageVersion>1.6.4</PackageVersion>
<AssemblyVersion>1.6.4</AssemblyVersion>
<PackageVersion>1.6.5</PackageVersion>
<AssemblyVersion>1.6.5</AssemblyVersion>
<Authors>support@harness.io</Authors>
<Copyright>Copyright © 2024</Copyright>
<PackageIconUrl>https://harness.io/icon-ff.svg</PackageIconUrl>
Expand Down