65 changes: 65 additions & 0 deletions BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
@@ -740,6 +740,71 @@ public void VerifyHitsWithForegroundScheduler()
VerifyHits(iterations: iterations + dropped, minSamples: iterations);
}

[Fact]
public void VerifyMisses()
{
cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler(), EqualityComparer<int>.Default,
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));

int iterations = 100000;
Func<int, int> func = x => x;

var start = Stopwatch.GetTimestamp();

for (int i = 0; i < iterations; i++)
{
cache.GetOrAdd(i, func);
}

var end = Stopwatch.GetTimestamp();

cache.PendingMaintenance();

var totalTicks = end - start;
var timeMs = ((double)totalTicks / Stopwatch.Frequency) * 1000.0;
var timePerOp = timeMs / (double)iterations;
var timeNs = timePerOp * 1000000.0; // milliseconds per op converted to nanoseconds per op
var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations * 100;

this.output.WriteLine($"Elapsed {timeMs}ms - {timeNs}ns/op");
this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");

cache.Metrics.Value.Misses.Should().Be(iterations);
}

[Fact]
public async Task ThreadedVerifyMisses()
{
// buffer size is 1, this will cause dropped writes on some threads where the buffer is full
cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler(), EqualityComparer<int>.Default,
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));

int threads = 4;
int iterations = 100000;

await Threaded.Run(threads, i =>
{
Func<int, int> func = x => x;

int start = i * iterations;

for (int j = start; j < start + iterations; j++)
{
cache.GetOrAdd(j, func);
}
});

var samplePercent = this.cache.Metrics.Value.Misses / (double)iterations / threads * 100;

this.output.WriteLine($"Cache misses {this.cache.Metrics.Value.Misses} (sampled {samplePercent}%)");
this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");

cache.Metrics.Value.Misses.Should().Be(iterations * threads);
cache.Count.Should().BeCloseTo(20, 1);
}

private void VerifyHits(int iterations, int minSamples)
{
Func<int, int> func = x => x;
10 changes: 8 additions & 2 deletions BitFaster.Caching.UnitTests/Threaded.cs
@@ -9,17 +9,23 @@ namespace BitFaster.Caching.UnitTests
{
public class Threaded
{
public static async Task Run(int threadCount, Action action)
public static Task Run(int threadCount, Action action)
{
return Run(threadCount, i => action());
}

public static async Task Run(int threadCount, Action<int> action)
{
var tasks = new Task[threadCount];
ManualResetEvent mre = new ManualResetEvent(false);

for (int i = 0; i < threadCount; i++)
{
int run = i;
tasks[i] = Task.Run(() =>
{
mre.WaitOne();
action();
action(run);
});
}

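The tail of Threaded.Run is collapsed in this view; presumably it releases the ManualResetEvent and awaits all tasks. Below is a minimal, self-contained sketch of such a start-gate runner. The Set/WhenAll tail and the ThreadedSketch wrapper are assumptions for illustration, not lines copied from the repository.

using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadedSketch
{
    public static async Task Run(int threadCount, Action<int> action)
    {
        var tasks = new Task[threadCount];
        var mre = new ManualResetEvent(false);

        for (int i = 0; i < threadCount; i++)
        {
            int run = i;                 // capture the loop index for this task
            tasks[i] = Task.Run(() =>
            {
                mre.WaitOne();           // block until every task has been queued
                action(run);
            });
        }

        mre.Set();                       // hypothetical: release all tasks at once
        await Task.WhenAll(tasks);       // hypothetical: wait for all tasks to finish
    }
}

Queueing every task behind the gate before releasing it maximizes concurrent access to the cache, which is what ThreadedVerifyMisses uses to exercise concurrent GetOrAdd calls against the tiny write buffer.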
42 changes: 28 additions & 14 deletions BitFaster.Caching/Lfu/ConcurrentLfu.cs
@@ -34,7 +34,7 @@ namespace BitFaster.Caching.Lfu
[DebuggerDisplay("Count = {Count}/{Capacity}")]
public sealed class ConcurrentLfu<K, V> : ICache<K, V>, IAsyncCache<K, V>, IBoundedPolicy
{
private const int MaxWriteBufferRetries = 16;
private const int MaxWriteBufferRetries = 64;

private readonly ConcurrentDictionary<K, LfuNode<K, V>> dictionary;

@@ -308,8 +308,6 @@ private static void TakeCandidatesInLruOrder(LfuNodeList<K, V> lru, List<LfuNode

private void AfterWrite(LfuNode<K, V> node)
{
var spinner = new SpinWait();

for (int i = 0; i < MaxWriteBufferRetries; i++)
{
if (writeBuffer.TryAdd(node) == BufferStatus.Success)
@@ -319,19 +317,31 @@ private void AfterWrite(LfuNode<K, V> node)
}

TryScheduleDrain();

spinner.SpinOnce();
}

lock (this.maintenanceLock)
{
// aggressively try to exit the lock early before doing full maintenance
var status = BufferStatus.Contended;
while (status != BufferStatus.Full)
{
status = writeBuffer.TryAdd(node);

if (status == BufferStatus.Success)
{
ScheduleAfterWrite();
return;
}
}

// if the write was dropped from the buffer, explicitly pass it to maintenance
Maintenance(node);
}
}

private void ScheduleAfterWrite()
{
var spinner = new SpinWait();
while (true)
{
switch (this.drainStatus.Status())
@@ -352,6 +362,7 @@ private void ScheduleAfterWrite()
case DrainStatus.ProcessingToRequired:
return;
}
spinner.SpinOnce();
}
}
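A simplified, self-contained illustration of the AfterWrite path above: a bounded number of lock-free attempts to buffer the write, then a lock-protected slow path that either buffers the node or applies it inline so the update is never silently dropped. The bounded Channel and the Drain/Apply helpers are stand-ins for the real writeBuffer, TryScheduleDrain and Maintenance, not the library's API.

using System.Threading.Channels;

public class AfterWriteSketch
{
    private const int MaxRetries = 64;                       // mirrors MaxWriteBufferRetries above
    private readonly Channel<int> buffer = Channel.CreateBounded<int>(4);
    private readonly object maintenanceLock = new object();

    public void AfterWrite(int item)
    {
        for (int i = 0; i < MaxRetries; i++)
        {
            if (buffer.Writer.TryWrite(item))
            {
                return;                                      // fast path: buffered for later maintenance
            }
            Drain();                                         // stand-in for TryScheduleDrain()
        }

        lock (maintenanceLock)
        {
            // last attempt to buffer; if the buffer is still full, apply the write inline
            if (!buffer.Writer.TryWrite(item))
            {
                Apply(item);                                 // stand-in for Maintenance(droppedWrite)
            }
        }
    }

    private void Drain()
    {
        while (buffer.Reader.TryRead(out var item)) Apply(item);
    }

    private void Apply(int item)
    {
        // policy bookkeeping for a single write would go here
    }
}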

@@ -424,29 +435,32 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
var localDrainBuffer = RentDrainBuffer();

// extract to a buffer before doing bookkeeping work, ~2x faster
var count = readBuffer.DrainTo(localDrainBuffer);
int readCount = readBuffer.DrainTo(localDrainBuffer);

for (int i = 0; i < count; i++)
for (int i = 0; i < readCount; i++)
{
this.cmSketch.Increment(localDrainBuffer[i].Key);
}

for (int i = 0; i < count; i++)
for (int i = 0; i < readCount; i++)
{
OnAccess(localDrainBuffer[i]);
}

var wasDrained = count == 0;
count = this.writeBuffer.DrainTo(localDrainBuffer);
int writeCount = this.writeBuffer.DrainTo(localDrainBuffer);

for (int i = 0; i < count; i++)
for (int i = 0; i < writeCount; i++)
{
OnWrite(localDrainBuffer[i]);
}

// we are done only when both buffers are empty
var done = readCount == 0 & writeCount == 0;

if (droppedWrite != null)
{
OnWrite(droppedWrite);
done = true;
}

ReturnDrainBuffer(localDrainBuffer);
@@ -458,14 +472,14 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
// Reset to idle if either
// 1. We drained both input buffers (all work done)
// 2. or the scheduler is foreground (we don't run continuously on the foreground)
if ((wasDrained || !scheduler.IsBackground) &&
if ((done || !scheduler.IsBackground) &&
(this.drainStatus.Status() != DrainStatus.ProcessingToIdle ||
!this.drainStatus.Cas(DrainStatus.ProcessingToIdle, DrainStatus.Idle)))
{
this.drainStatus.Set(DrainStatus.Required);
}

return wasDrained;
return done;
}

private void OnAccess(LfuNode<K, V> node)
@@ -595,7 +609,7 @@ private void EvictFromMain(LfuNode<K, V> candidate)
while (this.windowLru.Count + this.probationLru.Count + this.protectedLru.Count > this.Capacity)
{
// bail when we run out of options
if (candidate == null || victim == null || victim == candidate)
if (candidate == null | victim == null | victim == candidate)
{
break;
}
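On the EvictFromMain change from '||' to '|': for bool operands '|' evaluates both sides unconditionally instead of short-circuiting, which removes conditional branches from the eviction loop; treating that as the motivation is an assumption, it is not stated in this diff. A minimal, hypothetical example of the equivalent check:

// '==' binds tighter than '|', so this reads as (candidate == null) | (victim == null) | ReferenceEquals(candidate, victim).
static bool ShouldBail(object candidate, object victim)
{
    return candidate == null | victim == null | ReferenceEquals(candidate, victim);
}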