Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ public void TestBufferConfiguraiton()
{
ICache<string, int> lfu = new ConcurrentLfuBuilder<string, int>()
.WithBufferConfiguration(new LfuBufferSize(
new StripedBufferSize(128, 2),
new StripedBufferSize(128, 2)
))
.Build();
Expand Down
25 changes: 8 additions & 17 deletions BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ public void WhenItemIsEvictedItIsDisposed()
LogLru();

dcache.Count.Should().Be(20);

for (int i = 0; i < 5; i++)
{
disposables[i].IsDisposed.Should().BeTrue();
}

disposables[5].IsDisposed.Should().BeFalse();
disposables.Count(d => d.IsDisposed).Should().Be(5);
}

// protected 15
Expand Down Expand Up @@ -417,23 +411,20 @@ public void WhenWriteBufferIsFullAddDoesMaintenance()
var bufferSize = LfuBufferSize.DefaultBufferSize;
var scheduler = new TestScheduler();

var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1), new StripedBufferSize(bufferSize, 1));
var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1));
cache = new ConcurrentLfu<int, int>(1, bufferSize * 2, scheduler, EqualityComparer<int>.Default, bufferConfig);

// add an item, flush write buffer
cache.GetOrAdd(-1, k => k);
scheduler.RunCount.Should().Be(1);
cache.PendingMaintenance();

// remove the item but don't flush, it is now in the write buffer and maintenance is scheduled
cache.TryRemove(-1).Should().BeTrue();
scheduler.RunCount.Should().Be(2);

// add buffer size items, last iteration will invoke maintenance on the foreground since write
// buffer is full and test scheduler did not do any work
for (int i = 0; i < bufferSize; i++)
{
scheduler.RunCount.Should().Be(2);
cache.GetOrAdd(i, k => k);
}

Expand All @@ -445,10 +436,11 @@ public void WhenWriteBufferIsFullAddDoesMaintenance()
[Fact]
public void WhenWriteBufferIsFullUpdatesAreDropped()
{
var bufferSize = LfuBufferSize.DefaultBufferSize;
int capacity = 20;
var bufferSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), 128);
var scheduler = new TestScheduler();
var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1), new StripedBufferSize(bufferSize, 1));
cache = new ConcurrentLfu<int, int>(1, 20, scheduler, EqualityComparer<int>.Default, bufferConfig);
var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1));
cache = new ConcurrentLfu<int, int>(1, capacity, scheduler, EqualityComparer<int>.Default, bufferConfig);

cache.GetOrAdd(1, k => k);
scheduler.RunCount.Should().Be(1);
Expand Down Expand Up @@ -744,7 +736,7 @@ public void VerifyHitsWithForegroundScheduler()
public void VerifyMisses()
{
cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler(), EqualityComparer<int>.Default,
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
new LfuBufferSize(new StripedBufferSize(1, 1)));

int iterations = 100000;
Func<int, int> func = x => x;
Expand Down Expand Up @@ -779,7 +771,7 @@ public async Task ThreadedVerifyMisses()
{
// buffer size is 1, this will cause dropped writes on some threads where the buffer is full
cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler(), EqualityComparer<int>.Default,
new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
new LfuBufferSize(new StripedBufferSize(1, 1)));

int threads = 4;
int iterations = 100000;
Expand All @@ -802,7 +794,6 @@ await Threaded.Run(threads, i =>
this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");

cache.Metrics.Value.Misses.Should().Be(iterations * threads);
cache.Count.Should().BeCloseTo(20, 1);
}

private void VerifyHits(int iterations, int minSamples)
Expand Down
36 changes: 13 additions & 23 deletions BitFaster.Caching.UnitTests/Lfu/LfuBufferSizeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,24 @@ public class LfuBufferSizeTests
[Fact]
public void WhenReadBufferIsNullThrows()
{
Action constructor = () => { var x = new LfuBufferSize(null, new StripedBufferSize(1, 1)); };

constructor.Should().Throw<ArgumentNullException>();
}

[Fact]
public void WhenWriteBufferIsNullThrows()
{
Action constructor = () => { var x = new LfuBufferSize(new StripedBufferSize(1, 1), null); };
Action constructor = () => { var x = new LfuBufferSize(null); };

constructor.Should().Throw<ArgumentNullException>();
}

[SkippableTheory]
[InlineData(1, 3, 1, 32, 1, 16)]
[InlineData(1, 14, 1, 128, 1, 16)]
[InlineData(1, 50, 1, 128, 1, 64)]
[InlineData(1, 100, 1, 128, 1, 128)]
[InlineData(4, 100, 4, 128, 4, 32)]
[InlineData(16, 100, 8, 128, 8, 16)] // fails win
[InlineData(64, 100, 8, 128, 8, 16)] // fails win
[InlineData(1, 1000, 1, 128, 1, 128)]
[InlineData(4, 1000, 4, 128, 4, 128)]
[InlineData(32, 1000, 32, 128, 32, 32)] // fails win + fails mac
[InlineData(256, 100000, 32, 128, 32, 32)] // fails win + fails mac
public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int expectedReadStripes, int expectedReadBuffer, int expecteWriteStripes, int expecteWriteBuffer)
[InlineData(1, 3, 1, 32)]
[InlineData(1, 14, 1, 128)]
[InlineData(1, 50, 1, 128)]
[InlineData(1, 100, 1, 128)]
[InlineData(4, 100, 4, 128)]
[InlineData(16, 100, 8, 128)] // fails win
[InlineData(64, 100, 8, 128)] // fails win
[InlineData(1, 1000, 1, 128)]
[InlineData(4, 1000, 4, 128)]
[InlineData(32, 1000, 32, 128)] // fails win + fails mac
[InlineData(256, 100000, 32, 128)] // fails win + fails mac
public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int expectedReadStripes, int expectedReadBuffer)
{
// Some of these tests depend on the CPU Core count - skip if run on a different config machine.
bool notExpectedCpuCount = Environment.ProcessorCount != 12;
Expand All @@ -48,8 +40,6 @@ public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int e

bufferSize.Read.StripeCount.Should().Be(expectedReadStripes);
bufferSize.Read.BufferSize.Should().Be(expectedReadBuffer);
bufferSize.Write.StripeCount.Should().Be(expecteWriteStripes);
bufferSize.Write.BufferSize.Should().Be(expecteWriteBuffer);
}
}
}
11 changes: 7 additions & 4 deletions BitFaster.Caching/Lfu/ConcurrentLfu.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public sealed class ConcurrentLfu<K, V> : ICache<K, V>, IAsyncCache<K, V>, IBoun
private readonly ConcurrentDictionary<K, LfuNode<K, V>> dictionary;

private readonly StripedMpscBuffer<LfuNode<K, V>> readBuffer;
private readonly StripedMpscBuffer<LfuNode<K, V>> writeBuffer;
private readonly MpscBoundedBuffer<LfuNode<K, V>> writeBuffer;

private readonly CacheMetrics metrics = new CacheMetrics();

Expand Down Expand Up @@ -82,7 +82,10 @@ public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler, I
this.dictionary = new ConcurrentDictionary<K, LfuNode<K, V>>(concurrencyLevel, capacity, comparer);

this.readBuffer = new StripedMpscBuffer<LfuNode<K, V>>(bufferSize.Read);
this.writeBuffer = new StripedMpscBuffer<LfuNode<K, V>>(bufferSize.Write);

// Cap the write buffer to the cache size, or 128. Whichever is smaller.
int writeBufferSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), 128);
this.writeBuffer = new MpscBoundedBuffer<LfuNode<K, V>>(writeBufferSize);

this.cmSketch = new CmSketch<K>(1, comparer);
this.cmSketch.EnsureCapacity(capacity);
Expand Down Expand Up @@ -447,7 +450,7 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
OnAccess(localDrainBuffer[i]);
}

int writeCount = this.writeBuffer.DrainTo(localDrainBuffer);
int writeCount = this.writeBuffer.DrainTo(new ArraySegment<LfuNode<K, V>>(localDrainBuffer));

for (int i = 0; i < writeCount; i++)
{
Expand Down Expand Up @@ -819,7 +822,7 @@ public LfuDebugView(ConcurrentLfu<K, V> lfu)

public StripedMpscBuffer<LfuNode<K, V>> ReadBuffer => this.lfu.readBuffer;

public StripedMpscBuffer<LfuNode<K, V>> WriteBuffer => this.lfu.writeBuffer;
public MpscBoundedBuffer<LfuNode<K, V>> WriteBuffer => this.lfu.writeBuffer;

public KeyValuePair<K, V>[] Items
{
Expand Down
22 changes: 3 additions & 19 deletions BitFaster.Caching/Lfu/LfuBufferSize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,20 @@ public class LfuBufferSize
/// </summary>
public const int DefaultBufferSize = 128;

private const int MaxWriteBufferTotalSize = 1024;

/// <summary>
/// Initializes a new instance of the LfuBufferSize class with the specified read and write buffer sizes.
/// </summary>
/// <param name="readBufferSize">The read buffer size.</param>
/// <param name="writeBufferSize">The write buffer size.</param>
public LfuBufferSize(StripedBufferSize readBufferSize, StripedBufferSize writeBufferSize)
public LfuBufferSize(StripedBufferSize readBufferSize)
{
Read = readBufferSize ?? throw new ArgumentNullException(nameof(readBufferSize));
Write = writeBufferSize ?? throw new ArgumentNullException(nameof(writeBufferSize));
}

/// <summary>
/// Gets the read buffer size.
/// </summary>
public StripedBufferSize Read { get; }

/// <summary>
/// Gets the write buffer size.
/// </summary>
public StripedBufferSize Write { get; }

/// <summary>
/// Estimates default buffer sizes intended to give optimal throughput.
/// </summary>
Expand All @@ -48,8 +39,7 @@ public static LfuBufferSize Default(int concurrencyLevel, int capacity)
if (capacity < 13)
{
return new LfuBufferSize(
new StripedBufferSize(32, 1),
new StripedBufferSize(16, 1));
new StripedBufferSize(32, 1));
}

// cap concurrency at proc count * 2
Expand All @@ -61,14 +51,8 @@ public static LfuBufferSize Default(int concurrencyLevel, int capacity)
concurrencyLevel /= 2;
}

// Constrain write buffer size so that the LFU dictionary will not ever end up with more than 2x cache
// capacity entries before maintenance runs.
int writeBufferTotalSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), MaxWriteBufferTotalSize);
int writeStripeSize = Math.Min(BitOps.CeilingPowerOfTwo(Math.Max(writeBufferTotalSize / concurrencyLevel, 4)), 128);

return new LfuBufferSize(
new StripedBufferSize(DefaultBufferSize, concurrencyLevel),
new StripedBufferSize(writeStripeSize, concurrencyLevel));
new StripedBufferSize(DefaultBufferSize, concurrencyLevel));
}
}
}