Skip to content

Commit

Permalink
NCBC-1929: Use Partioner for LoadTests RateLimiter
Browse files Browse the repository at this point in the history
Motivation
----------
The current SemaphoreSlim approach is adding a lot of CPU overhead.

Modifications
-------------
Use a Partioner to split the list of items to be processed into subsets
based on the rate limit.

Increased SmallDocuments test size from 5M to 10M documents to keep
test runtime large enough to give good data.

Results
-------
Tests using RateLimiter should be less skewed by the limiter itself.

Change-Id: Ic2daea1541055ceab0fe18f9de4e8c4d2fa2a7f6
Reviewed-on: http://review.couchbase.org/107909
Tested-by: Build Bot <build@couchbase.com>
Reviewed-by: Mike Goldsmith <goldsmith.mike@gmail.com>
  • Loading branch information
brantburnett authored and MikeGoldsmith committed Apr 18, 2019
1 parent 6872042 commit a11a8ba
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 26 deletions.
Expand Up @@ -30,7 +30,7 @@ public async Task SmallDocuments()
{
// Arrange

const int totalOperations = 5_000_000;
const int totalOperations = 10_000_000;
var maxSimultaneous = Environment.ProcessorCount;

var converter = new DefaultConverter();
Expand Down
Expand Up @@ -23,7 +23,7 @@ public async Task SmallDocuments()
{
// Arrange

const int totalOperations = 5_000_000;
const int totalOperations = 10_000_000;
var maxSimultaneous = Environment.ProcessorCount;

var docGenerator = new JsonDocumentGenerator(32, 1024);
Expand Down
37 changes: 13 additions & 24 deletions tests/Couchbase.LoadTests/Helpers/RateLimiter.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -15,33 +16,21 @@ public static Task ExecuteRateLimited<T>(this IEnumerable<T> source, Func<T, Tas

public static async Task ExecuteRateLimited<T>(this IEnumerable<T> source, Func<T, Task> action, int rateLimit, CancellationToken cancellationToken)
{
using (var semaphore = new SemaphoreSlim(rateLimit))
{
var tasks = new ConcurrentDictionary<int, Task>();
var i = 0;

foreach (var item in source)
var tasks = Partitioner.Create(source)
.GetPartitions(rateLimit)
.Select(partition =>
{
await semaphore.WaitAsync(cancellationToken);

var task = Task.Run(() => action.Invoke(item), cancellationToken);
tasks.TryAdd(i, task);

// ReSharper disable once AccessToDisposedClosure
#pragma warning disable 4014
task.ContinueWith((t2, state) =>
return Task.Run(async () =>
{
var j = (int) state;
semaphore.Release();
tasks.TryRemove(j, out _);
}, i, cancellationToken);
#pragma warning restore 4014

i++;
}
while (partition.MoveNext())
{
await action.Invoke(partition.Current);
}
}, cancellationToken);
})
.ToList();

await Task.WhenAll(tasks.Values);
}
await Task.WhenAll(tasks);
}
}
}

0 comments on commit a11a8ba

Please sign in to comment.