Skip to content

Commit

Permalink
[RateLimiting] Dequeue items when queuing with NewestFirst (#63377)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Jan 19, 2022
1 parent 12f434d commit 67848f2
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,27 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
return new ValueTask<RateLimitLease>(lease);
}

// Don't queue if queue limit reached
if (_queueCount + permitCount > _options.QueueLimit)
// Avoid integer overflow by using subtraction instead of addition
Debug.Assert(_options.QueueLimit >= _queueCount);
if (_options.QueueLimit - _queueCount < permitCount)
{
return new ValueTask<RateLimitLease>(QueueLimitLease);
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
{
// remove oldest items from queue until there is space for the newest request
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
oldestRequest.Tcs.TrySetResult(FailedLease);
}
while (_options.QueueLimit - _queueCount < permitCount);
}
else
{
// Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
return new ValueTask<RateLimitLease>(QueueLimitLease);
}
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,27 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
return new ValueTask<RateLimitLease>(lease);
}

// Don't queue if queue limit reached
if (_queueCount + tokenCount > _options.QueueLimit)
// Avoid integer overflow by using subtraction instead of addition
Debug.Assert(_options.QueueLimit >= _queueCount);
if (_options.QueueLimit - _queueCount < tokenCount)
{
return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && tokenCount <= _options.QueueLimit)
{
// remove oldest items from queue until there is space for the newest acquisition request
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
oldestRequest.Tcs.TrySetResult(FailedLease);
}
while (_options.QueueLimit - _queueCount < tokenCount);
}
else
{
// Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
return new ValueTask<RateLimitLease>(CreateFailedTokenLease(tokenCount));
}
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,23 @@ public abstract class BaseRateLimiterTests
public abstract Task CanAcquireResourceAsync_QueuesAndGrabsNewest();

[Fact]
public abstract Task FailsWhenQueuingMoreThanLimit();
public abstract Task FailsWhenQueuingMoreThanLimit_OldestFirst();

[Fact]
public abstract Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst();

[Fact]
public abstract Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst();

[Fact]
public abstract Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst();

[Fact]
public abstract Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable();

[Fact]
public abstract Task LargeAcquiresAndQueuesDoNotIntegerOverflow();

[Fact]
public abstract void ThrowsWhenAcquiringMoreThanLimit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest()
}

[Fact]
public override async Task FailsWhenQueuingMoreThanLimit()
public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
using var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);

Expand All @@ -105,11 +105,96 @@ public override async Task FailsWhenQueuingMoreThanLimit()
}

[Fact]
public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(1);
var lease1 = await wait;
Assert.False(lease1.IsAcquired);
Assert.False(wait2.IsCompleted);

lease.Dispose();

lease = await wait2;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(1);
Assert.False(wait2.IsCompleted);

var wait3 = limiter.WaitAsync(2);
var lease1 = await wait;
var lease2 = await wait2;
Assert.False(lease1.IsAcquired);
Assert.False(lease2.IsAcquired);
Assert.False(wait3.IsCompleted);

lease.Dispose();

lease = await wait3;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);

// Fill queue
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var lease1 = await limiter.WaitAsync(2);
Assert.False(lease1.IsAcquired);

lease.Dispose();
var lease2 = await wait;
Assert.True(lease2.IsAcquired);
}

[Fact]
public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue));
var lease = limiter.Acquire(int.MaxValue);
Assert.True(lease.IsAcquired);

// Fill queue
var wait = limiter.WaitAsync(3);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(int.MaxValue);
Assert.False(wait2.IsCompleted);

var lease1 = await wait;
Assert.False(lease1.IsAcquired);

lease.Dispose();
var lease2 = await wait2;
Assert.True(lease2.IsAcquired);
}

[Fact]
public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1));
var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);

var failedLease = await limiter.WaitAsync(1);
Assert.False(failedLease.IsAcquired);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public override async Task CanAcquireResourceAsync_QueuesAndGrabsNewest()
}

[Fact]
public override async Task FailsWhenQueuingMoreThanLimit()
public override async Task FailsWhenQueuingMoreThanLimit_OldestFirst()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
TimeSpan.Zero, 1, autoReplenishment: false));
using var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);
Expand All @@ -125,9 +125,77 @@ public override async Task FailsWhenQueuingMoreThanLimit()
}

[Fact]
public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
public override async Task DropsOldestWhenQueuingMoreThanLimit_NewestFirst()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(1);
var lease1 = await wait;
Assert.False(lease1.IsAcquired);
Assert.False(wait2.IsCompleted);

limiter.TryReplenish();

lease = await wait2;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task DropsMultipleOldestWhenQueuingMoreThanLimit_NewestFirst()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(1);
Assert.False(wait2.IsCompleted);

var wait3 = limiter.WaitAsync(2);
var lease1 = await wait;
var lease2 = await wait2;
Assert.False(lease1.IsAcquired);
Assert.False(lease2.IsAcquired);
Assert.False(wait3.IsCompleted);

limiter.TryReplenish();
limiter.TryReplenish();

lease = await wait3;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task DropsRequestedLeaseIfPermitCountGreaterThanQueueLimitAndNoAvailability_NewestFirst()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(2, QueueProcessingOrder.NewestFirst, 1,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(2);
Assert.True(lease.IsAcquired);

// Fill queue
var wait = limiter.WaitAsync(1);
Assert.False(wait.IsCompleted);

var lease1 = await limiter.WaitAsync(2);
Assert.False(lease1.IsAcquired);

limiter.TryReplenish();

lease = await wait;
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAvailable()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 1,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(1);
var wait = limiter.WaitAsync(1);
Expand All @@ -147,6 +215,29 @@ public override async Task QueueAvailableAfterQueueLimitHitAndResources_BecomeAv
Assert.True(lease.IsAcquired);
}

[Fact]
public override async Task LargeAcquiresAndQueuesDoNotIntegerOverflow()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(int.MaxValue, QueueProcessingOrder.NewestFirst, int.MaxValue,
TimeSpan.Zero, int.MaxValue, autoReplenishment: false));
var lease = limiter.Acquire(int.MaxValue);
Assert.True(lease.IsAcquired);

// Fill queue
var wait = limiter.WaitAsync(3);
Assert.False(wait.IsCompleted);

var wait2 = limiter.WaitAsync(int.MaxValue);
Assert.False(wait2.IsCompleted);

var lease1 = await wait;
Assert.False(lease1.IsAcquired);

limiter.TryReplenish();
var lease2 = await wait2;
Assert.True(lease2.IsAcquired);
}

[Fact]
public override void ThrowsWhenAcquiringMoreThanLimit()
{
Expand Down

0 comments on commit 67848f2

Please sign in to comment.