diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
index da263c47..082d67c5 100644
--- a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
@@ -740,6 +740,71 @@ public void VerifyHitsWithForegroundScheduler()
             VerifyHits(iterations: iterations + dropped, minSamples: iterations);
         }
 
+        [Fact]
+        public void VerifyMisses()
+        {
+            cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler(), EqualityComparer<int>.Default,
+                new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
+
+            int iterations = 100000;
+            Func<int, int> func = x => x;
+
+            var start = Stopwatch.GetTimestamp();
+
+            for (int i = 0; i < iterations; i++)
+            {
+                cache.GetOrAdd(i, func);
+            }
+
+            var end = Stopwatch.GetTimestamp();
+
+            cache.PendingMaintenance();
+
+            var totalTicks = end - start;
+            var timeMs = ((double)totalTicks / Stopwatch.Frequency) * 1000.0;
+            var timeNs = timeMs / 1000000;
+
+            var timePerOp = timeMs / (double)iterations;
+            var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations * 100;
+
+            this.output.WriteLine($"Elapsed {timeMs}ms - {timeNs}ns/op");
+            this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
+            this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");
+
+            cache.Metrics.Value.Misses.Should().Be(iterations);
+        }
+
+        [Fact]
+        public async Task ThreadedVerifyMisses()
+        {
+            // buffer size is 1, this will cause dropped writes on some threads where the buffer is full
+            cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler(), EqualityComparer<int>.Default,
+                new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
+
+            int threads = 4;
+            int iterations = 100000;
+
+            await Threaded.Run(threads, i =>
+            {
+                Func<int, int> func = x => x;
+
+                int start = i * iterations;
+
+                for (int j = start; j < start + iterations; j++)
+                {
+                    cache.GetOrAdd(j, func);
+                }
+            });
+
+            var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations / threads * 100;
+
+            this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
+            this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");
+
+            cache.Metrics.Value.Misses.Should().Be(iterations * threads);
+            cache.Count.Should().BeCloseTo(20, 1);
+        }
+
         private void VerifyHits(int iterations, int minSamples)
         {
             Func<int, int> func = x => x;
diff --git a/BitFaster.Caching.UnitTests/Threaded.cs b/BitFaster.Caching.UnitTests/Threaded.cs
index 04cc97de..50e3f10b 100644
--- a/BitFaster.Caching.UnitTests/Threaded.cs
+++ b/BitFaster.Caching.UnitTests/Threaded.cs
@@ -9,17 +9,23 @@ namespace BitFaster.Caching.UnitTests
 {
     public class Threaded
     {
-        public static async Task Run(int threadCount, Action action)
+        public static Task Run(int threadCount, Action action)
+        {
+            return Run(threadCount, i => action());
+        }
+
+        public static async Task Run(int threadCount, Action<int> action)
         {
             var tasks = new Task[threadCount];
             ManualResetEvent mre = new ManualResetEvent(false);
 
             for (int i = 0; i < threadCount; i++)
             {
+                int run = i;
                 tasks[i] = Task.Run(() =>
                 {
                     mre.WaitOne();
-                    action();
+                    action(run);
                 });
             }
 
diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs
index 187bcac6..058bdd8e 100644
--- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs
+++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs
@@ -34,7 +34,7 @@ namespace BitFaster.Caching.Lfu
     [DebuggerDisplay("Count = {Count}/{Capacity}")]
     public sealed class ConcurrentLfu<K, V> : ICache<K, V>, IAsyncCache<K, V>, IBoundedPolicy
     {
-        private const int MaxWriteBufferRetries = 16;
+        private const int MaxWriteBufferRetries = 64;
 
         private readonly ConcurrentDictionary<K, LfuNode<K, V>> dictionary;
 
@@ -308,8 +308,6 @@ private static void TakeCandidatesInLruOrder(LfuNodeList<K, V> lru, List<LfuNode<K, V>> candidates, int itemCount)
 
         private void AfterWrite(LfuNode<K, V> node)
         {
-            var spinner = new SpinWait();
-
             for (int i = 0; i < MaxWriteBufferRetries; i++)
             {
                 if (writeBuffer.TryAdd(node) == BufferStatus.Success)
@@ -319,12 +317,23 @@ private void AfterWrite(LfuNode<K, V> node)
                 }
 
                 TryScheduleDrain();
-
-                spinner.SpinOnce();
             }
 
             lock (this.maintenanceLock)
             {
+                // aggressively try to exit the lock early before doing full maintenance
+                var status = BufferStatus.Contended;
+                while (status != BufferStatus.Full)
+                {
+                    status = writeBuffer.TryAdd(node);
+
+                    if (status == BufferStatus.Success)
+                    {
+                        ScheduleAfterWrite();
+                        return;
+                    }
+                }
+
+                // if the write was dropped from the buffer, explicitly pass it to maintenance
                 Maintenance(node);
             }
         }
@@ -332,6 +341,7 @@ private void AfterWrite(LfuNode<K, V> node)
 
         private void ScheduleAfterWrite()
         {
+            var spinner = new SpinWait();
             while (true)
             {
                 switch (this.drainStatus.Status())
@@ -352,6 +362,7 @@ private void ScheduleAfterWrite()
                     case DrainStatus.ProcessingToRequired:
                         return;
                 }
+                spinner.SpinOnce();
             }
         }
 
@@ -424,29 +435,32 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
             var localDrainBuffer = RentDrainBuffer();
 
             // extract to a buffer before doing book keeping work, ~2x faster
-            var count = readBuffer.DrainTo(localDrainBuffer);
+            int readCount = readBuffer.DrainTo(localDrainBuffer);
 
-            for (int i = 0; i < count; i++)
+            for (int i = 0; i < readCount; i++)
             {
                 this.cmSketch.Increment(localDrainBuffer[i].Key);
             }
 
-            for (int i = 0; i < count; i++)
+            for (int i = 0; i < readCount; i++)
             {
                 OnAccess(localDrainBuffer[i]);
             }
 
-            var wasDrained = count == 0;
-            count = this.writeBuffer.DrainTo(localDrainBuffer);
+            int writeCount = this.writeBuffer.DrainTo(localDrainBuffer);
 
-            for (int i = 0; i < count; i++)
+            for (int i = 0; i < writeCount; i++)
             {
                 OnWrite(localDrainBuffer[i]);
             }
 
+            // we are done only when both buffers are empty
+            var done = readCount == 0 & writeCount == 0;
+
             if (droppedWrite != null)
             {
                 OnWrite(droppedWrite);
+                done = true;
             }
 
             ReturnDrainBuffer(localDrainBuffer);
@@ -458,14 +472,14 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
 
             // Reset to idle if either
             // 1. We drained both input buffers (all work done)
             // 2. or scheduler is foreground (since don't run continuously on the foreground)
-            if ((wasDrained || !scheduler.IsBackground) &&
+            if ((done || !scheduler.IsBackground) &&
                 (this.drainStatus.Status() != DrainStatus.ProcessingToIdle ||
                 !this.drainStatus.Cas(DrainStatus.ProcessingToIdle, DrainStatus.Idle)))
             {
                 this.drainStatus.Set(DrainStatus.Required);
             }
 
-            return wasDrained;
+            return done;
         }
 
@@ -595,7 +609,7 @@ private void EvictFromMain(LfuNode<K, V> candidate)
             while (this.windowLru.Count + this.probationLru.Count + this.protectedLru.Count > this.Capacity)
             {
                 // bail when we run out of options
-                if (candidate == null || victim == null || victim == candidate)
+                if (candidate == null | victim == null | victim == candidate)
                 {
                     break;
                 }