From e5db13384d4cf368c2e78ce58503c12016bf6ec4 Mon Sep 17 00:00:00 2001 From: Andrew Bell <115623869+andybharness@users.noreply.github.com> Date: Thu, 28 Mar 2024 17:43:36 +0000 Subject: [PATCH] [FFM-11089] - Cache & exception handling improvements (#112) * [FFM-11089] - Cache & exception handling improvements What - Improve cache locking and synchronization - Improve stream error handling - Increase capacity of dictionary - Additional cache recovery checks Why We're getting reports of cache not being setup correctly in the field. Testing Unit + Manual + Testgrid --- client/api/PollingProcessor.cs | 12 +- client/api/Repository.cs | 172 ++++++++++++------ client/connector/EventSource.cs | 1 + ff-netF48-server-sdk.csproj | 6 +- tests/ff-server-sdk-test/EvaluatorTest.cs | 2 +- .../StorageRepositoryTest.cs | 91 +++++++++ tests/ff-server-sdk-test/api/CfClientTest.cs | 67 +++++++ 7 files changed, 285 insertions(+), 66 deletions(-) create mode 100644 tests/ff-server-sdk-test/StorageRepositoryTest.cs diff --git a/client/api/PollingProcessor.cs b/client/api/PollingProcessor.cs index 32b3b49..e5734cd 100644 --- a/client/api/PollingProcessor.cs +++ b/client/api/PollingProcessor.cs @@ -116,12 +116,8 @@ private async Task ProcessFlags() logger.LogDebug("Fetching flags started"); var flags = await this.connector.GetFlags(); logger.LogDebug("Fetching flags finished"); - - foreach (var item in flags.ToArray()) - { - repository.SetFlag(item.Feature, item); - } + repository.SetFlags(flags); } catch (Exception ex) { @@ -136,11 +132,7 @@ private async Task ProcessSegments() logger.LogDebug("Fetching segments started"); IEnumerable segments = await connector.GetSegments(); logger.LogDebug("Fetching segments finished"); - - foreach (Segment item in segments.ToArray()) - { - repository.SetSegment(item.Identifier, item); - } + repository.SetSegments(segments); logger.LogDebug("Loaded {SegmentRuleCount}", segments.Count()); } diff --git a/client/api/Repository.cs b/client/api/Repository.cs index d8c76f8..9673a57 100644 --- a/client/api/Repository.cs +++ b/client/api/Repository.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Runtime.CompilerServices; +using System.Threading; using io.harness.cfsdk.client.cache; using io.harness.cfsdk.HarnessOpenAPIService; using Microsoft.Extensions.Logging; @@ -21,6 +23,8 @@ internal interface IRepository void SetFlag(string identifier, FeatureConfig featureConfig); void SetSegment(string identifier, Segment segment); + void SetFlags(IEnumerable flags); + void SetSegments(IEnumerable segments); FeatureConfig GetFlag(string identifier); Segment GetSegment(string identifier); @@ -34,12 +38,14 @@ internal interface IRepository internal class StorageRepository : IRepository { + private readonly ReaderWriterLockSlim rwLock; private readonly ILogger logger; private readonly ICache cache; private readonly IStore store; - private readonly IRepositoryCallback callback; + private readonly IRepositoryCallback callback; // avoid calling callbacks inside rwLocks! public StorageRepository(ICache cache, IStore store, IRepositoryCallback callback, ILoggerFactory loggerFactory) { + this.rwLock = new ReaderWriterLockSlim(); this.cache = cache; this.store = store; this.callback = callback; @@ -59,58 +65,80 @@ public Segment GetSegment(string identifier) } IEnumerable IRepository.FindFlagsBySegment(string segment) { - List features = new List(); - ICollection keys = this.store != null ? this.store.Keys() : this.cache.Keys(); - foreach( string key in keys) + rwLock.EnterReadLock(); + try { - FeatureConfig flag = GetFlag(key); - if(flag != null && flag.Rules != null) + List features = new List(); + ICollection keys = this.store != null ? this.store.Keys() : this.cache.Keys(); + foreach (string key in keys) { - foreach( ServingRule rule in flag.Rules) + FeatureConfig flag = GetFlag(key); + if (flag != null && flag.Rules != null) { - foreach (Clause clause in rule.Clauses) + foreach (ServingRule rule in flag.Rules) { - if(clause.Op.Equals("segmentMatch") && clause.Values.Contains(segment)) + foreach (Clause clause in rule.Clauses) { - features.Add(flag.Feature); + if (clause.Op.Equals("segmentMatch") && clause.Values.Contains(segment)) + { + features.Add(flag.Feature); + } } } } } - + return features; + } + finally + { + rwLock.ExitReadLock(); } - return features; } public void DeleteFlag(string identifier) { - string key = FlagKey(identifier); - if (store != null) + rwLock.EnterWriteLock(); + try { - logger.LogDebug("Flag {identifier} successfully deleted from store", identifier); - store.Delete(key); + string key = FlagKey(identifier); + if (store != null) + { + logger.LogDebug("Flag {identifier} successfully deleted from store", identifier); + store.Delete(key); + } + + this.cache.Delete(key); + logger.LogDebug("Flag {identifier} successfully deleted from cache", identifier); + } - this.cache.Delete(key); - logger.LogDebug("Flag {identifier} successfully deleted from cache", identifier); - if (this.callback != null) + finally { - this.callback.OnFlagDeleted(identifier); + rwLock.ExitWriteLock(); } + + this.callback?.OnFlagDeleted(identifier); } public void DeleteSegment(string identifier) { - string key = SegmentKey(identifier); - if (store != null) + rwLock.EnterWriteLock(); + try { - logger.LogDebug("Segment {identifier} successfully deleted from store", identifier); - store.Delete(key); + string key = SegmentKey(identifier); + if (store != null) + { + logger.LogDebug("Segment {identifier} successfully deleted from store", identifier); + store.Delete(key); + } + + this.cache.Delete(key); + logger.LogDebug("Segment {identifier} successfully deleted from cache", identifier); } - this.cache.Delete(key); - logger.LogDebug("Segment {identifier} successfully deleted from cache", identifier); - if (this.callback != null) + finally { - this.callback.OnSegmentDeleted(identifier); + rwLock.ExitWriteLock(); } + + this.callback?.OnSegmentDeleted(identifier); } private T GetCache(string key, bool updateCache) { @@ -129,6 +157,7 @@ private T GetCache(string key, bool updateCache) } return (T)item; } + private FeatureConfig GetFlag( string identifer, bool updateCache) { string key = FlagKey(identifer); @@ -141,38 +170,80 @@ private Segment GetSegment(string identifer, bool updateCache) } void IRepository.SetFlag(string identifier, FeatureConfig featureConfig) { - FeatureConfig current = GetFlag(identifier, false); - // Update stored value in case if server returned newer version, - // or if version is equal 0 (or doesn't exist) - if( current != null && featureConfig.Version != 0 && current.Version >= featureConfig.Version ) + rwLock.EnterWriteLock(); + try { - logger.LogTrace("Flag {identifier} already exists", identifier); - return; - } - - Update(identifier, FlagKey(identifier), featureConfig); + FeatureConfig current = GetFlag(identifier, false); + // Update stored value in case if server returned newer version, + // or if version is equal 0 (or doesn't exist) + if (current != null && featureConfig.Version != 0 && current.Version >= featureConfig.Version) + { + logger.LogTrace("Flag {identifier} already exists", identifier); + return; + } - if (this.callback != null) + Update(identifier, FlagKey(identifier), featureConfig); + } + finally { - this.callback.OnFlagStored(identifier); + rwLock.ExitWriteLock(); } + + this.callback?.OnFlagStored(identifier); } void IRepository.SetSegment(string identifier, Segment segment) { - Segment current = GetSegment(identifier, false); - // Update stored value in case if server returned newer version, - // or if version is equal 0 (or doesn't exist) - if (current != null && segment.Version != 0 && current.Version >= segment.Version) + rwLock.EnterWriteLock(); + try { - logger.LogTrace("Segment {identifier} already exists", identifier); - return; + Segment current = GetSegment(identifier, false); + // Update stored value in case if server returned newer version, + // or if version is equal 0 (or doesn't exist) + if (current != null && segment.Version != 0 && current.Version >= segment.Version) + { + logger.LogTrace("Segment {identifier} already exists", identifier); + return; + } + + Update(identifier, SegmentKey(identifier), segment); + } + finally + { + rwLock.ExitWriteLock(); } - Update(identifier, SegmentKey(identifier), segment); + this.callback?.OnSegmentStored(identifier); + } + + public void SetFlags(IEnumerable flags) + { + rwLock.EnterWriteLock(); + try + { + foreach (var item in flags) + { + Update(item.Feature, FlagKey(item.Feature), item); + } + } + finally + { + rwLock.ExitWriteLock(); + } + } - if (this.callback != null) + public void SetSegments(IEnumerable segments) + { + rwLock.EnterWriteLock(); + try + { + foreach (var item in segments) + { + Update(item.Identifier, SegmentKey(item.Identifier), item); + } + } + finally { - this.callback.OnSegmentStored(identifier); + rwLock.ExitWriteLock(); } } @@ -193,10 +264,7 @@ private void Update(string identifier, string key, Object value) public void Close() { - if(this.store != null) - { - this.store.Close(); - } + this.store?.Close(); } } } diff --git a/client/connector/EventSource.cs b/client/connector/EventSource.cs index 4a93aa4..049f4a9 100644 --- a/client/connector/EventSource.cs +++ b/client/connector/EventSource.cs @@ -110,6 +110,7 @@ private async Task StartStreaming() } finally { + Thread.Sleep(TimeSpan.FromSeconds(10)); callback.OnStreamDisconnected(); } diff --git a/ff-netF48-server-sdk.csproj b/ff-netF48-server-sdk.csproj index f95dc3a..e1698b6 100644 --- a/ff-netF48-server-sdk.csproj +++ b/ff-netF48-server-sdk.csproj @@ -8,10 +8,10 @@ ff-dotnet-server-sdk io.harness.cfsdk false - 1.6.4 + 1.6.6 true - 1.6.5 - 1.6.5 + 1.6.6 + 1.6.6 support@harness.io Copyright © 2024 https://harness.io/icon-ff.svg diff --git a/tests/ff-server-sdk-test/EvaluatorTest.cs b/tests/ff-server-sdk-test/EvaluatorTest.cs index de3efa2..3fec680 100644 --- a/tests/ff-server-sdk-test/EvaluatorTest.cs +++ b/tests/ff-server-sdk-test/EvaluatorTest.cs @@ -56,7 +56,7 @@ static EvaluatorTest() var listener = new EvaluatorListener(); cache = new FeatureSegmentCache(); repository = new StorageRepository(cache, null, null, loggerFactory); - evaluator = new Evaluator(repository, listener, loggerFactory, true); + evaluator = new Evaluator(repository, listener, loggerFactory, true, null, null); } private static void LoadSegments(List segments) diff --git a/tests/ff-server-sdk-test/StorageRepositoryTest.cs b/tests/ff-server-sdk-test/StorageRepositoryTest.cs new file mode 100644 index 0000000..50664f2 --- /dev/null +++ b/tests/ff-server-sdk-test/StorageRepositoryTest.cs @@ -0,0 +1,91 @@ +using System.Collections.Generic; +using System.Linq; +using io.harness.cfsdk.client.api; +using io.harness.cfsdk.client.cache; +using io.harness.cfsdk.HarnessOpenAPIService; +using Microsoft.Extensions.Logging; +using NUnit.Framework; + +namespace ff_server_sdk_test +{ + public class StorageRepositoryTest + { + [Test] + public void TestRepository() + { + var cache = new FeatureSegmentCache(); + IStore store = null; + IRepositoryCallback callback = null; + + var factory = LoggerFactory.Create(builder => + { + builder + .AddFilter("Microsoft", LogLevel.Warning) + .AddFilter("System", LogLevel.Warning) + .AddConsole(); + }); + + IRepository repo = new StorageRepository(cache, store, callback, factory); + + // Flags that don't exist + + var result = repo.GetFlag("i_do_not_exist"); + Assert.IsNull(result); + var segmentResult = repo.GetSegment("i_do_not_exist2"); + Assert.IsNull(segmentResult); + + // Set/GetFlags + + var flag = new FeatureConfig() + { + Feature = "ident" + }; + repo.SetFlag("flag1", flag); + var getFlagResult = repo.GetFlag("flag1"); + Assert.IsNotNull(getFlagResult); + + var flag2 = new FeatureConfig() + { + Feature = "ident2" + }; + repo.SetFlags(new List() {flag2, flag2}); + + // Set/GetSegment + + var segment = new Segment() + { + Identifier = "segmentIdent" + }; + repo.SetSegment("segment1", segment); + repo.SetSegment("segment1", segment); + var getSegmentResult = repo.GetSegment("segment1"); + Assert.IsNotNull(getSegmentResult); + + var segment2 = new Segment() + { + Identifier = "ident2" + }; + repo.SetSegments(new List() {segment2, segment2}); + + // iteration + + var foundSegments = repo.FindFlagsBySegment("segment1"); + Assert.IsNotNull(foundSegments); + //Assert.IsTrue(foundSegments.Count() == 1); + + // Deletes + + repo.DeleteFlag("flag1"); + getFlagResult = repo.GetFlag("flag1"); + Assert.IsNull(getFlagResult); + + repo.DeleteSegment("segment1"); + getSegmentResult = repo.GetSegment("segment1"); + Assert.IsNull(getSegmentResult); + + + + repo.Close(); + } + } +} \ No newline at end of file diff --git a/tests/ff-server-sdk-test/api/CfClientTest.cs b/tests/ff-server-sdk-test/api/CfClientTest.cs index 57029cf..09aeb2e 100644 --- a/tests/ff-server-sdk-test/api/CfClientTest.cs +++ b/tests/ff-server-sdk-test/api/CfClientTest.cs @@ -8,6 +8,7 @@ using WireMock.Server; using WireMock.Settings; using NUnit.Framework; +using WireMock; using WireMock.Logging; @@ -90,6 +91,72 @@ public void StopMockServer() Assert.That(result, Is.EqualTo("on"), "did not get correct flag state"); } + + [Test] + public void ShouldPopulateCacheIfStreamFails() + { + server + .Given(Request.Create().WithPath("/api/1.0/client/auth").UsingPost()) + .RespondWith(MakeAuthResponse()); + + server + .Given(Request.Create() + .WithPath("/api/1.0/stream").UsingGet()) + .RespondWith(Response.Create() + .WithStatusCode(500) + .WithFault(FaultType.MALFORMED_RESPONSE_CHUNK) + .WithBody("{malformed stream}}}}}")); + + server + .Given(Request.Create() + .WithPath("/api/1.0/client/env/00000000-0000-0000-0000-000000000000/feature-configs").UsingGet()) + .RespondWith(Response.Create() + .WithStatusCode(200) + .WithBody(MakeFeatureConfigBodyWithVariationToTargetMapSetToNull())); + + server + .Given(Request.Create() + .WithPath("/api/1.0/client/env/00000000-0000-0000-0000-000000000000/target-segments").UsingGet()) + .RespondWith(Response.Create() + .WithStatusCode(200) + .WithBody(MakeTargetSegmentsBody())); + + var target = + Target.builder() + .Name("CfClientTest") + .Attributes(new Dictionary { { "attr", "val" } }) + .Identifier("CfClientTest") + .build(); + + Console.WriteLine("Running at " + server.Url); + + var client = new CfClient("dummy api key", Config.Builder() + .debug(true) + .SetStreamEnabled(true) + .SetAnalyticsEnabled(false) + .ConfigUrl(server.Url + "/api/1.0") + .Build()); + + CountdownEvent initLatch = new CountdownEvent(1); + + client.InitializationCompleted += (sender, e) => + { + Console.WriteLine("Initialization Completed"); + initLatch.Signal(); + }; + + var success = client.WaitForInitialization(10_000); + Assert.IsTrue(success, "timeout while waiting for WaitForInitialization()"); + + Thread.Sleep(2000); + + var ok = initLatch.Wait(TimeSpan.FromMinutes(2)); + Assert.That(ok, Is.True, "failed to init in time"); + + var result = client.stringVariation("FeatureWithVariationToTargetMapSetAsNull", target, "failed"); + Assert.That(result, Is.EqualTo("on"), "did not get correct flag state"); + } + [Test] public void ShouldNotThrowErrorIfTargetToVariationMapNotPopulated() {