diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuBuilderTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuBuilderTests.cs
index 06b41c4b..ea5a8973 100644
--- a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuBuilderTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuBuilderTests.cs
@@ -58,7 +58,6 @@ public void TestBufferConfiguraiton()
         {
             ICache<int, int> lfu = new ConcurrentLfuBuilder<int, int>()
                 .WithBufferConfiguration(new LfuBufferSize(
-                    new StripedBufferSize(128, 2),
                     new StripedBufferSize(128, 2)
                 ))
                 .Build();
diff --git a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
index 49504d53..f2822db7 100644
--- a/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs
@@ -88,13 +88,7 @@ public void WhenItemIsEvictedItIsDisposed()
             LogLru();
 
             dcache.Count.Should().Be(20);
-
-            for (int i = 0; i < 5; i++)
-            {
-                disposables[i].IsDisposed.Should().BeTrue();
-            }
-
-            disposables[5].IsDisposed.Should().BeFalse();
+            disposables.Count(d => d.IsDisposed).Should().Be(5);
         }
 
         // protected 15
@@ -417,23 +411,20 @@ public void WhenWriteBufferIsFullAddDoesMaintenance()
             var bufferSize = LfuBufferSize.DefaultBufferSize;
             var scheduler = new TestScheduler();
 
-            var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1), new StripedBufferSize(bufferSize, 1));
+            var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1));
             cache = new ConcurrentLfu<int, int>(1, bufferSize * 2, scheduler, EqualityComparer<int>.Default, bufferConfig);
 
             // add an item, flush write buffer
             cache.GetOrAdd(-1, k => k);
-            scheduler.RunCount.Should().Be(1);
             cache.PendingMaintenance();
 
             // remove the item but don't flush, it is now in the write buffer and maintenance is scheduled
             cache.TryRemove(-1).Should().BeTrue();
-            scheduler.RunCount.Should().Be(2);
 
             // add buffer size items, last iteration will invoke maintenance on the foreground since write
             // buffer is full and test scheduler did not do any work
             for (int i = 0; i < bufferSize; i++)
             {
-                scheduler.RunCount.Should().Be(2);
                 cache.GetOrAdd(i, k => k);
             }
 
@@ -445,10 +436,11 @@
         [Fact]
         public void WhenWriteBufferIsFullUpdatesAreDropped()
         {
-            var bufferSize = LfuBufferSize.DefaultBufferSize;
+            int capacity = 20;
+            var bufferSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), 128);
             var scheduler = new TestScheduler();
-            var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1), new StripedBufferSize(bufferSize, 1));
-            cache = new ConcurrentLfu<int, int>(1, 20, scheduler, EqualityComparer<int>.Default, bufferConfig);
+            var bufferConfig = new LfuBufferSize(new StripedBufferSize(bufferSize, 1));
+            cache = new ConcurrentLfu<int, int>(1, capacity, scheduler, EqualityComparer<int>.Default, bufferConfig);
 
             cache.GetOrAdd(1, k => k);
             scheduler.RunCount.Should().Be(1);
@@ -744,7 +736,7 @@ public void VerifyHitsWithForegroundScheduler()
         public void VerifyMisses()
         {
             cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler(), EqualityComparer<int>.Default,
-                new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
+                new LfuBufferSize(new StripedBufferSize(1, 1)));
 
             int iterations = 100000;
             Func<int, int> func = x => x;
@@ -779,7 +771,7 @@ public async Task ThreadedVerifyMisses()
         {
             // buffer size is 1, this will cause dropped writes on some threads where the buffer is full
             cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler(), EqualityComparer<int>.Default,
-                new LfuBufferSize(new StripedBufferSize(1, 1), new StripedBufferSize(1, 1)));
+                new LfuBufferSize(new StripedBufferSize(1, 1)));
 
             int threads = 4;
             int iterations = 100000;
@@ -802,7 +794,6 @@ await Threaded.Run(threads, i =>
             this.output.WriteLine($"Maintenance ops {this.cache.Scheduler.RunCount}");
 
             cache.Metrics.Value.Misses.Should().Be(iterations * threads);
-            cache.Count.Should().BeCloseTo(20, 1);
         }
 
         private void VerifyHits(int iterations, int minSamples)
diff --git a/BitFaster.Caching.UnitTests/Lfu/LfuBufferSizeTests.cs b/BitFaster.Caching.UnitTests/Lfu/LfuBufferSizeTests.cs
index 8cbec085..ec3936b6 100644
--- a/BitFaster.Caching.UnitTests/Lfu/LfuBufferSizeTests.cs
+++ b/BitFaster.Caching.UnitTests/Lfu/LfuBufferSizeTests.cs
@@ -11,32 +11,24 @@ public class LfuBufferSizeTests
         [Fact]
         public void WhenReadBufferIsNullThrows()
         {
-            Action constructor = () => { var x = new LfuBufferSize(null, new StripedBufferSize(1, 1)); };
-
-            constructor.Should().Throw<ArgumentNullException>();
-        }
-
-        [Fact]
-        public void WhenWriteBufferIsNullThrows()
-        {
-            Action constructor = () => { var x = new LfuBufferSize(new StripedBufferSize(1, 1), null); };
+            Action constructor = () => { var x = new LfuBufferSize(null); };
 
             constructor.Should().Throw<ArgumentNullException>();
         }
 
         [SkippableTheory]
-        [InlineData(1, 3, 1, 32, 1, 16)]
-        [InlineData(1, 14, 1, 128, 1, 16)]
-        [InlineData(1, 50, 1, 128, 1, 64)]
-        [InlineData(1, 100, 1, 128, 1, 128)]
-        [InlineData(4, 100, 4, 128, 4, 32)]
-        [InlineData(16, 100, 8, 128, 8, 16)] // fails win
-        [InlineData(64, 100, 8, 128, 8, 16)] // fails win
-        [InlineData(1, 1000, 1, 128, 1, 128)]
-        [InlineData(4, 1000, 4, 128, 4, 128)]
-        [InlineData(32, 1000, 32, 128, 32, 32)] // fails win + fails mac
-        [InlineData(256, 100000, 32, 128, 32, 32)] // fails win + fails mac
-        public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int expectedReadStripes, int expectedReadBuffer, int expecteWriteStripes, int expecteWriteBuffer)
+        [InlineData(1, 3, 1, 32)]
+        [InlineData(1, 14, 1, 128)]
+        [InlineData(1, 50, 1, 128)]
+        [InlineData(1, 100, 1, 128)]
+        [InlineData(4, 100, 4, 128)]
+        [InlineData(16, 100, 8, 128)] // fails win
+        [InlineData(64, 100, 8, 128)] // fails win
+        [InlineData(1, 1000, 1, 128)]
+        [InlineData(4, 1000, 4, 128)]
+        [InlineData(32, 1000, 32, 128)] // fails win + fails mac
+        [InlineData(256, 100000, 32, 128)] // fails win + fails mac
+        public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int expectedReadStripes, int expectedReadBuffer)
         {
             // Some of these tests depend on the CPU Core count - skip if run on a different config machine.
             bool notExpectedCpuCount = Environment.ProcessorCount != 12;
@@ -48,8 +40,6 @@ public void CalculateDefaultBufferSize(int concurrencyLevel, int capacity, int e
 
             bufferSize.Read.StripeCount.Should().Be(expectedReadStripes);
             bufferSize.Read.BufferSize.Should().Be(expectedReadBuffer);
-            bufferSize.Write.StripeCount.Should().Be(expecteWriteStripes);
-            bufferSize.Write.BufferSize.Should().Be(expecteWriteBuffer);
         }
     }
 }
diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs
index aa08d77a..1b3607c4 100644
--- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs
+++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs
@@ -39,7 +39,7 @@ public sealed class ConcurrentLfu<K, V> : ICache<K, V>, IAsyncCache<K, V>, IBoun
         private readonly ConcurrentDictionary<K, LfuNode<K, V>> dictionary;
 
         private readonly StripedMpscBuffer<LfuNode<K, V>> readBuffer;
-        private readonly StripedMpscBuffer<LfuNode<K, V>> writeBuffer;
+        private readonly MpscBoundedBuffer<LfuNode<K, V>> writeBuffer;
 
         private readonly CacheMetrics metrics = new CacheMetrics();
 
@@ -82,7 +82,10 @@ public ConcurrentLfu(int concurrencyLevel, int capacity, IScheduler scheduler, I
             this.dictionary = new ConcurrentDictionary<K, LfuNode<K, V>>(concurrencyLevel, capacity, comparer);
 
             this.readBuffer = new StripedMpscBuffer<LfuNode<K, V>>(bufferSize.Read);
-            this.writeBuffer = new StripedMpscBuffer<LfuNode<K, V>>(bufferSize.Write);
+
+            // Cap the write buffer to the cache size, or 128. Whichever is smaller.
+            int writeBufferSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), 128);
+            this.writeBuffer = new MpscBoundedBuffer<LfuNode<K, V>>(writeBufferSize);
 
             this.cmSketch = new CmSketch<K>(1, comparer);
             this.cmSketch.EnsureCapacity(capacity);
@@ -447,7 +450,7 @@ private bool Maintenance(LfuNode<K, V> droppedWrite = null)
                 OnAccess(localDrainBuffer[i]);
             }
 
-            int writeCount = this.writeBuffer.DrainTo(localDrainBuffer);
+            int writeCount = this.writeBuffer.DrainTo(new ArraySegment<LfuNode<K, V>>(localDrainBuffer));
 
             for (int i = 0; i < writeCount; i++)
             {
@@ -819,7 +822,7 @@ public LfuDebugView(ConcurrentLfu<K, V> lfu)
 
             public StripedMpscBuffer<LfuNode<K, V>> ReadBuffer => this.lfu.readBuffer;
 
-            public StripedMpscBuffer<LfuNode<K, V>> WriteBuffer => this.lfu.writeBuffer;
+            public MpscBoundedBuffer<LfuNode<K, V>> WriteBuffer => this.lfu.writeBuffer;
 
             public KeyValuePair<K, V>[] Items
             {
diff --git a/BitFaster.Caching/Lfu/LfuBufferSize.cs b/BitFaster.Caching/Lfu/LfuBufferSize.cs
index 4f7f582c..862224a0 100644
--- a/BitFaster.Caching/Lfu/LfuBufferSize.cs
+++ b/BitFaster.Caching/Lfu/LfuBufferSize.cs
@@ -14,17 +14,13 @@ public class LfuBufferSize
        /// </summary>
        public const int DefaultBufferSize = 128;

-        private const int MaxWriteBufferTotalSize = 1024;
-
        /// <summary>
        /// Initializes a new instance of the LfuBufferSize class with the specified read and write buffer sizes.
        /// </summary>
        /// <param name="readBufferSize">The read buffer size.</param>
-        /// <param name="writeBufferSize">The write buffer size.</param>
-        public LfuBufferSize(StripedBufferSize readBufferSize, StripedBufferSize writeBufferSize)
+        public LfuBufferSize(StripedBufferSize readBufferSize)
        {
            Read = readBufferSize ?? throw new ArgumentNullException(nameof(readBufferSize));
-            Write = writeBufferSize ?? throw new ArgumentNullException(nameof(writeBufferSize));
        }

        /// <summary>
@@ -32,11 +28,6 @@ public LfuBufferSize(StripedBufferSize readBufferSize, StripedBufferSize writeBu
        /// </summary>
        public StripedBufferSize Read { get; }

-        /// <summary>
-        /// Gets the write buffer size.
-        /// </summary>
-        public StripedBufferSize Write { get; }
-
        /// <summary>
        /// Estimates default buffer sizes intended to give optimal throughput.
        /// </summary>
@@ -48,8 +39,7 @@ public static LfuBufferSize Default(int concurrencyLevel, int capacity)
            if (capacity < 13)
            {
                return new LfuBufferSize(
-                    new StripedBufferSize(32, 1),
-                    new StripedBufferSize(16, 1));
+                    new StripedBufferSize(32, 1));
            }

            // cap concurrency at proc count * 2
@@ -61,14 +51,8 @@ public static LfuBufferSize Default(int concurrencyLevel, int capacity)
                concurrencyLevel /= 2;
            }

-            // Constrain write buffer size so that the LFU dictionary will not ever end up with more than 2x cache
-            // capacity entries before maintenance runs.
-            int writeBufferTotalSize = Math.Min(BitOps.CeilingPowerOfTwo(capacity), MaxWriteBufferTotalSize);
-            int writeStripeSize = Math.Min(BitOps.CeilingPowerOfTwo(Math.Max(writeBufferTotalSize / concurrencyLevel, 4)), 128);
-
-            return new LfuBufferSize(
-                new StripedBufferSize(DefaultBufferSize, concurrencyLevel),
-                new StripedBufferSize(writeStripeSize, concurrencyLevel));
+            return new LfuBufferSize(
+                new StripedBufferSize(DefaultBufferSize, concurrencyLevel));
        }
    }
}
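Note (not part of the patch): the sketch below illustrates the write-buffer sizing rule this diff moves into the ConcurrentLfu constructor, where a single MPSC buffer is sized to the capacity rounded up to a power of two and capped at 128. It assumes only that BitOps.CeilingPowerOfTwo rounds up to the next power of two; a local stand-in with that assumed behavior is used so the snippet compiles without referencing BitFaster.Caching.

using System;

static class WriteBufferSizingSketch
{
    // Local stand-in for BitFaster.Caching.BitOps.CeilingPowerOfTwo (assumed semantics:
    // round x up to the nearest power of two).
    static int CeilingPowerOfTwo(int x)
    {
        int result = 1;
        while (result < x)
        {
            result <<= 1;
        }
        return result;
    }

    // Mirrors the constructor logic added in ConcurrentLfu.cs: the MpscBoundedBuffer is
    // sized to the smaller of the rounded-up capacity and 128.
    static int WriteBufferSize(int capacity) => Math.Min(CeilingPowerOfTwo(capacity), 128);

    static void Main()
    {
        foreach (int capacity in new[] { 3, 20, 100, 1000 })
        {
            // e.g. capacity 20 -> 32, capacity 1000 -> capped at 128
            Console.WriteLine($"capacity {capacity} -> write buffer size {WriteBufferSize(capacity)}");
        }
    }
}

The updated WhenWriteBufferIsFullUpdatesAreDropped test computes the same bound for capacity 20, so the test's buffer size (32) stays in step with the write buffer of the cache under test.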