Skip to content

Commit

Permalink
[FFM-11089] - Cache & exception handling improvements (#112)
Browse files Browse the repository at this point in the history
* [FFM-11089] - Cache & exception handling improvements

What
- Improve cache locking and synchronization
- Improve stream error handling
- Increase capacity of dictionary
- Additional cache recovery checks

Why
We're getting reports of cache not being setup correctly in the field.

Testing
Unit + Manual + Testgrid
  • Loading branch information
andybharness committed Mar 28, 2024
1 parent 5198506 commit e5db133
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 66 deletions.
12 changes: 2 additions & 10 deletions client/api/PollingProcessor.cs
Expand Up @@ -116,12 +116,8 @@ private async Task ProcessFlags()
logger.LogDebug("Fetching flags started");
var flags = await this.connector.GetFlags();
logger.LogDebug("Fetching flags finished");

foreach (var item in flags.ToArray())
{
repository.SetFlag(item.Feature, item);
}

repository.SetFlags(flags);
}
catch (Exception ex)
{
Expand All @@ -136,11 +132,7 @@ private async Task ProcessSegments()
logger.LogDebug("Fetching segments started");
IEnumerable<Segment> segments = await connector.GetSegments();
logger.LogDebug("Fetching segments finished");

foreach (Segment item in segments.ToArray())
{
repository.SetSegment(item.Identifier, item);
}
repository.SetSegments(segments);

logger.LogDebug("Loaded {SegmentRuleCount}", segments.Count());
}
Expand Down
172 changes: 120 additions & 52 deletions client/api/Repository.cs
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using io.harness.cfsdk.client.cache;
using io.harness.cfsdk.HarnessOpenAPIService;
using Microsoft.Extensions.Logging;
Expand All @@ -21,6 +23,8 @@ internal interface IRepository
void SetFlag(string identifier, FeatureConfig featureConfig);
void SetSegment(string identifier, Segment segment);

void SetFlags(IEnumerable<FeatureConfig> flags);
void SetSegments(IEnumerable<Segment> segments);

FeatureConfig GetFlag(string identifier);
Segment GetSegment(string identifier);
Expand All @@ -34,12 +38,14 @@ internal interface IRepository

internal class StorageRepository : IRepository
{
private readonly ReaderWriterLockSlim rwLock;
private readonly ILogger logger;
private readonly ICache cache;
private readonly IStore store;
private readonly IRepositoryCallback callback;
private readonly IRepositoryCallback callback; // avoid calling callbacks inside rwLocks!
public StorageRepository(ICache cache, IStore store, IRepositoryCallback callback, ILoggerFactory loggerFactory)
{
this.rwLock = new ReaderWriterLockSlim();
this.cache = cache;
this.store = store;
this.callback = callback;
Expand All @@ -59,58 +65,80 @@ public Segment GetSegment(string identifier)
}
IEnumerable<string> IRepository.FindFlagsBySegment(string segment)
{
List<string> features = new List<string>();
ICollection<string> keys = this.store != null ? this.store.Keys() : this.cache.Keys();
foreach( string key in keys)
rwLock.EnterReadLock();
try
{
FeatureConfig flag = GetFlag(key);
if(flag != null && flag.Rules != null)
List<string> features = new List<string>();
ICollection<string> keys = this.store != null ? this.store.Keys() : this.cache.Keys();
foreach (string key in keys)
{
foreach( ServingRule rule in flag.Rules)
FeatureConfig flag = GetFlag(key);
if (flag != null && flag.Rules != null)
{
foreach (Clause clause in rule.Clauses)
foreach (ServingRule rule in flag.Rules)
{
if(clause.Op.Equals("segmentMatch") && clause.Values.Contains(segment))
foreach (Clause clause in rule.Clauses)
{
features.Add(flag.Feature);
if (clause.Op.Equals("segmentMatch") && clause.Values.Contains(segment))
{
features.Add(flag.Feature);
}
}
}
}
}

return features;
}
finally
{
rwLock.ExitReadLock();
}
return features;
}
public void DeleteFlag(string identifier)
{
string key = FlagKey(identifier);
if (store != null)
rwLock.EnterWriteLock();
try
{
logger.LogDebug("Flag {identifier} successfully deleted from store", identifier);
store.Delete(key);
string key = FlagKey(identifier);
if (store != null)
{
logger.LogDebug("Flag {identifier} successfully deleted from store", identifier);
store.Delete(key);
}

this.cache.Delete(key);
logger.LogDebug("Flag {identifier} successfully deleted from cache", identifier);

}
this.cache.Delete(key);
logger.LogDebug("Flag {identifier} successfully deleted from cache", identifier);
if (this.callback != null)
finally
{
this.callback.OnFlagDeleted(identifier);
rwLock.ExitWriteLock();
}

this.callback?.OnFlagDeleted(identifier);
}

public void DeleteSegment(string identifier)
{
string key = SegmentKey(identifier);
if (store != null)
rwLock.EnterWriteLock();
try
{
logger.LogDebug("Segment {identifier} successfully deleted from store", identifier);
store.Delete(key);
string key = SegmentKey(identifier);
if (store != null)
{
logger.LogDebug("Segment {identifier} successfully deleted from store", identifier);
store.Delete(key);
}

this.cache.Delete(key);
logger.LogDebug("Segment {identifier} successfully deleted from cache", identifier);
}
this.cache.Delete(key);
logger.LogDebug("Segment {identifier} successfully deleted from cache", identifier);
if (this.callback != null)
finally
{
this.callback.OnSegmentDeleted(identifier);
rwLock.ExitWriteLock();
}

this.callback?.OnSegmentDeleted(identifier);
}
private T GetCache<T>(string key, bool updateCache)
{
Expand All @@ -129,6 +157,7 @@ private T GetCache<T>(string key, bool updateCache)
}
return (T)item;
}

private FeatureConfig GetFlag( string identifer, bool updateCache)
{
string key = FlagKey(identifer);
Expand All @@ -141,38 +170,80 @@ private Segment GetSegment(string identifer, bool updateCache)
}
void IRepository.SetFlag(string identifier, FeatureConfig featureConfig)
{
FeatureConfig current = GetFlag(identifier, false);
// Update stored value in case if server returned newer version,
// or if version is equal 0 (or doesn't exist)
if( current != null && featureConfig.Version != 0 && current.Version >= featureConfig.Version )
rwLock.EnterWriteLock();
try
{
logger.LogTrace("Flag {identifier} already exists", identifier);
return;
}

Update(identifier, FlagKey(identifier), featureConfig);
FeatureConfig current = GetFlag(identifier, false);
// Update stored value in case if server returned newer version,
// or if version is equal 0 (or doesn't exist)
if (current != null && featureConfig.Version != 0 && current.Version >= featureConfig.Version)
{
logger.LogTrace("Flag {identifier} already exists", identifier);
return;
}

if (this.callback != null)
Update(identifier, FlagKey(identifier), featureConfig);
}
finally
{
this.callback.OnFlagStored(identifier);
rwLock.ExitWriteLock();
}

this.callback?.OnFlagStored(identifier);
}
void IRepository.SetSegment(string identifier, Segment segment)
{
Segment current = GetSegment(identifier, false);
// Update stored value in case if server returned newer version,
// or if version is equal 0 (or doesn't exist)
if (current != null && segment.Version != 0 && current.Version >= segment.Version)
rwLock.EnterWriteLock();
try
{
logger.LogTrace("Segment {identifier} already exists", identifier);
return;
Segment current = GetSegment(identifier, false);
// Update stored value in case if server returned newer version,
// or if version is equal 0 (or doesn't exist)
if (current != null && segment.Version != 0 && current.Version >= segment.Version)
{
logger.LogTrace("Segment {identifier} already exists", identifier);
return;
}

Update(identifier, SegmentKey(identifier), segment);
}
finally
{
rwLock.ExitWriteLock();
}

Update(identifier, SegmentKey(identifier), segment);
this.callback?.OnSegmentStored(identifier);
}

public void SetFlags(IEnumerable<FeatureConfig> flags)
{
rwLock.EnterWriteLock();
try
{
foreach (var item in flags)
{
Update(item.Feature, FlagKey(item.Feature), item);
}
}
finally
{
rwLock.ExitWriteLock();
}
}

if (this.callback != null)
public void SetSegments(IEnumerable<Segment> segments)
{
rwLock.EnterWriteLock();
try
{
foreach (var item in segments)
{
Update(item.Identifier, SegmentKey(item.Identifier), item);
}
}
finally
{
this.callback.OnSegmentStored(identifier);
rwLock.ExitWriteLock();
}
}

Expand All @@ -193,10 +264,7 @@ private void Update(string identifier, string key, Object value)

public void Close()
{
if(this.store != null)
{
this.store.Close();
}
this.store?.Close();
}
}
}
1 change: 1 addition & 0 deletions client/connector/EventSource.cs
Expand Up @@ -110,6 +110,7 @@ private async Task StartStreaming()
}
finally
{
Thread.Sleep(TimeSpan.FromSeconds(10));
callback.OnStreamDisconnected();
}

Expand Down
6 changes: 3 additions & 3 deletions ff-netF48-server-sdk.csproj
Expand Up @@ -8,10 +8,10 @@
<PackageId>ff-dotnet-server-sdk</PackageId>
<RootNamespace>io.harness.cfsdk</RootNamespace>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Version>1.6.4</Version>
<Version>1.6.6</Version>
<PackOnBuild>true</PackOnBuild>
<PackageVersion>1.6.5</PackageVersion>
<AssemblyVersion>1.6.5</AssemblyVersion>
<PackageVersion>1.6.6</PackageVersion>
<AssemblyVersion>1.6.6</AssemblyVersion>
<Authors>support@harness.io</Authors>
<Copyright>Copyright © 2024</Copyright>
<PackageIconUrl>https://harness.io/icon-ff.svg</PackageIconUrl>
Expand Down
2 changes: 1 addition & 1 deletion tests/ff-server-sdk-test/EvaluatorTest.cs
Expand Up @@ -56,7 +56,7 @@ static EvaluatorTest()
var listener = new EvaluatorListener();
cache = new FeatureSegmentCache();
repository = new StorageRepository(cache, null, null, loggerFactory);
evaluator = new Evaluator(repository, listener, loggerFactory, true);
evaluator = new Evaluator(repository, listener, loggerFactory, true, null, null);
}

private static void LoadSegments(List<Segment> segments)
Expand Down

0 comments on commit e5db133

Please sign in to comment.