Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 214 additions & 11 deletions BitFaster.Caching.UnitTests/Lru/ConcurrentLruTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

using System.Collections.Concurrent;
using System.Reflection;
using System.Runtime.CompilerServices;

namespace BitFaster.Caching.UnitTests.Lru
{
public class ConcurrentLruTests
Expand Down Expand Up @@ -1088,19 +1091,149 @@ public void WhenItemsAreTrimmedAnEventIsFired()
}

[Fact]
public async Task WhenItemsAreScannedInParallelCapacityIsNotExceeded()
public async Task WhenSoakConcurrentGetCacheEndsInConsistentState()
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
lru.GetOrAdd(i + 1, i =>i.ToString());
}
});
for (int i = 0; i < 10; i++)
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
lru.GetOrAdd(i + 1, i =>i.ToString());
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

// allow +/- 1 variance for capacity
lru.Count.Should().BeCloseTo(9, 1);
// allow +/- 1 variance for capacity
lru.Count.Should().BeCloseTo(9, 1);
RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetAsyncCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.RunAsync(4, async () => {
for (int i = 0; i < 100000; i++)
{
await lru.GetOrAddAsync(i + 1, i => Task.FromResult(i.ToString()));
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

// allow +/- 1 variance for capacity
lru.Count.Should().BeCloseTo(9, 1);
RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetWithArgCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
// use the arg overload
lru.GetOrAdd(i + 1, (i, s) => i.ToString(), "Foo");
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

// allow +/- 1 variance for capacity
lru.Count.Should().BeCloseTo(9, 1);
RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetAsyncWithArgCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.RunAsync(4, async () => {
for (int i = 0; i < 100000; i++)
{
// use the arg overload
await lru.GetOrAddAsync(i + 1, (i, s) => Task.FromResult(i.ToString()), "Foo");
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

// allow +/- 1 variance for capacity
lru.Count.Should().BeCloseTo(9, 1);
RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetAndRemoveCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
lru.TryRemove(i + 1);
lru.GetOrAdd(i + 1, i => i.ToString());
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetAndUpdateCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
lru.TryUpdate(i + 1, i.ToString());
lru.GetOrAdd(i + 1, i => i.ToString());
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

RunIntegrityCheck();
}
}

[Fact]
public async Task WhenSoakConcurrentGetAndAddCacheEndsInConsistentState()
{
for (int i = 0; i < 10; i++)
{
await Threaded.Run(4, () => {
for (int i = 0; i < 100000; i++)
{
lru.AddOrUpdate(i + 1, i.ToString());
lru.GetOrAdd(i + 1, i => i.ToString());
}
});

this.testOutputHelper.WriteLine($"{lru.HotCount} {lru.WarmCount} {lru.ColdCount}");
this.testOutputHelper.WriteLine(string.Join(" ", lru.Keys));

RunIntegrityCheck();
}
}

private void Warmup()
Expand All @@ -1115,5 +1248,75 @@ private void Warmup()
lru.GetOrAdd(-8, valueFactory.Create);
lru.GetOrAdd(-9, valueFactory.Create);
}

private void RunIntegrityCheck()
{
new ConcurrentLruIntegrityChecker<int, string, LruItem<int, string>, LruPolicy<int, string>, TelemetryPolicy<int, string>>(this.lru).Validate();
}
}

public class ConcurrentLruIntegrityChecker<K, V, I, P, T>
where I : LruItem<K, V>
where P : struct, IItemPolicy<K, V, I>
where T : struct, ITelemetryPolicy<K, V>
{
private readonly ConcurrentLruCore<K, V, I, P, T> cache;

private readonly ConcurrentQueue<I> hotQueue;
private readonly ConcurrentQueue<I> warmQueue;
private readonly ConcurrentQueue<I> coldQueue;

private static FieldInfo hotQueueField = typeof(ConcurrentLruCore<K, V, I, P, T>).GetField("hotQueue", BindingFlags.NonPublic | BindingFlags.Instance);
private static FieldInfo warmQueueField = typeof(ConcurrentLruCore<K, V, I, P, T>).GetField("warmQueue", BindingFlags.NonPublic | BindingFlags.Instance);
private static FieldInfo coldQueueField = typeof(ConcurrentLruCore<K, V, I, P, T>).GetField("coldQueue", BindingFlags.NonPublic | BindingFlags.Instance);

public ConcurrentLruIntegrityChecker(ConcurrentLruCore<K, V, I, P, T> cache)
{
this.cache = cache;

// get queues via reflection
this.hotQueue = (ConcurrentQueue<I>)hotQueueField.GetValue(cache);
this.warmQueue = (ConcurrentQueue<I>)warmQueueField.GetValue(cache);
this.coldQueue = (ConcurrentQueue<I>)coldQueueField.GetValue(cache);
}

public void Validate()
{
// queue counters must be consistent with queues
this.hotQueue.Count.Should().Be(cache.HotCount, "hot queue has a corrupted count");
this.warmQueue.Count.Should().Be(cache.WarmCount, "warm queue has a corrupted count");
this.coldQueue.Count.Should().Be(cache.ColdCount, "cold queue has a corrupted count");

// cache contents must be consistent with queued items
ValidateQueue(cache, this.hotQueue, "hot");
ValidateQueue(cache, this.warmQueue, "warm");
ValidateQueue(cache, this.coldQueue, "cold");

// cache must be within capacity
cache.Count.Should().BeLessThanOrEqualTo(cache.Capacity + 1, "capacity out of valid range");
}

private void ValidateQueue(ConcurrentLruCore<K, V, I, P, T> cache, ConcurrentQueue<I> queue, string queueName)
{
foreach (var item in queue)
{
if (item.WasRemoved)
{
// It is possible for the queues to contain 2 (or more) instances of the same key/item. One that was removed,
// and one that was added after the other was removed.
// In this case, the dictionary may contain the value only if the queues contain an entry for that key marked as WasRemoved == false.
if (cache.TryGet(item.Key, out var value))
{
hotQueue.Union(warmQueue).Union(coldQueue)
.Any(i => i.Key.Equals(item.Key) && !i.WasRemoved)
.Should().BeTrue($"{queueName} removed item {item.Key} was not removed");
}
}
else
{
cache.TryGet(item.Key, out var value).Should().BeTrue($"{queueName} item {item.Key} was not present");
}
}
}
}
}
25 changes: 25 additions & 0 deletions BitFaster.Caching.UnitTests/Threaded.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,30 @@ public static async Task Run(int threadCount, Action<int> action)

await Task.WhenAll(tasks);
}

public static Task RunAsync(int threadCount, Func<Task> action)
{
return Run(threadCount, i => action());
}

public static async Task RunAsync(int threadCount, Func<int, Task> action)
{
var tasks = new Task[threadCount];
ManualResetEvent mre = new ManualResetEvent(false);

for (int i = 0; i < threadCount; i++)
{
int run = i;
tasks[i] = Task.Run(async () =>
{
mre.WaitOne();
await action(run);
});
}

mre.Set();

await Task.WhenAll(tasks);
}
}
}