Skip to content

Commit

Permalink
Add TranslateKey to PartitionedRateLimiter (#69407)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy committed Jul 21, 2022
1 parent 2fc9108 commit d6fef3b
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public abstract partial class PartitionedRateLimiter<TResource> : System.IAsyncD
public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
protected virtual System.Threading.Tasks.ValueTask DisposeAsyncCore() { throw null; }
public abstract int GetAvailablePermits(TResource resource);
public System.Threading.RateLimiting.PartitionedRateLimiter<TOuter> TranslateKey<TOuter>(System.Func<TOuter, TResource> keyAdapter) { throw null; }
public System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAndAcquireAsync(TResource resource, int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected abstract System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAndAcquireAsyncCore(TResource resource, int permitCount, System.Threading.CancellationToken cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(NetCoreAppCurrent);$(NetCoreAppMinimum);netstandard2.0;$(NetFrameworkMinimum)</TargetFrameworks>
<IsPackable>true</IsPackable>
Expand Down Expand Up @@ -37,6 +37,7 @@ System.Threading.RateLimiting.RateLimitLease</PackageDescription>
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiter.cs" />
<Compile Include="System\Threading\RateLimiting\TokenBucketRateLimiterOptions.cs" />
<Compile Include="$(CommonPath)System\Collections\Generic\Deque.cs" Link="Common\System\Collections\Generic\Deque.cs" />
<Compile Include="System\Threading\RateLimiting\TranslatingLimiter.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,23 @@ public async ValueTask DisposeAsync()
// Suppress finalization.
GC.SuppressFinalize(this);
}

/// <summary>
/// Translates PartitionedRateLimiter&lt;TOuter&gt; into the current <see cref="PartitionedRateLimiter{TResource}"/>
/// using the <paramref name="keyAdapter"/> to translate <typeparamref name="TOuter"/> to <typeparamref name="TResource"/>.
/// </summary>
/// <typeparam name="TOuter">The type to translate into <typeparamref name="TResource"/>.</typeparam>
/// <param name="keyAdapter">The function to be called every time a <typeparamref name="TOuter"/> is passed to
/// PartitionedRateLimiter&lt;TOuter&gt;.Acquire(TOuter, int) or PartitionedRateLimiter&lt;TOuter&gt;.WaitAsync(TOuter, int, CancellationToken).</param>
/// <remarks><see cref="PartitionedRateLimiter{TResource}.Dispose()"/> or <see cref="PartitionedRateLimiter{TResource}.DisposeAsync()"/> does not dispose the wrapped <see cref="PartitionedRateLimiter{TResource}"/>.</remarks>
/// <returns>A new PartitionedRateLimiter&lt;TOuter&gt; that translates <typeparamref name="TOuter"/>
/// to <typeparamref name="TResource"/> and calls the inner <see cref="PartitionedRateLimiter{TResource}"/>.</returns>
public PartitionedRateLimiter<TOuter> TranslateKey<TOuter>(Func<TOuter, TResource> keyAdapter)
{
// REVIEW: Do we want to have an option to dispose the inner limiter?
// Should the default be to dispose the inner limiter and have an option to not dispose it?
// See Stream wrappers like SslStream for prior-art
return new TranslatingLimiter<TResource, TOuter>(this, keyAdapter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public bool GetResult()

private void Tick()
{
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
Action? continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
continuation?.Invoke();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
internal sealed class TranslatingLimiter<TInner, TResource> : PartitionedRateLimiter<TResource>
{
private readonly PartitionedRateLimiter<TInner> _innerRateLimiter;
private readonly Func<TResource, TInner> _keyAdapter;

private bool _disposed;

public TranslatingLimiter(PartitionedRateLimiter<TInner> inner, Func<TResource, TInner> keyAdapter)
{
_innerRateLimiter = inner;
_keyAdapter = keyAdapter;
}

public override int GetAvailablePermits(TResource resource)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.GetAvailablePermits(key);
}

protected override RateLimitLease AcquireCore(TResource resource, int permitCount)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.Acquire(key, permitCount);
}

protected override ValueTask<RateLimitLease> WaitAndAcquireAsyncCore(TResource resource, int permitCount, CancellationToken cancellationToken)
{
ThrowIfDispose();
TInner key = _keyAdapter(resource);
return _innerRateLimiter.WaitAndAcquireAsync(key, permitCount, cancellationToken);
}

protected override void Dispose(bool disposing)
{
_disposed = true;
base.Dispose(disposing);
}

protected override ValueTask DisposeAsyncCore()
{
_disposed = true;
return base.DisposeAsyncCore();
}

private void ThrowIfDispose()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(PartitionedRateLimiter));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -595,5 +595,197 @@ public async Task ThrowingTryReplenishDoesNotPreventIdleLimiterBeingCleanedUp()
// Wait for Timer to run again which will see the throwing TryReplenish and an idle limiter it needs to clean-up
await disposeTcs.Task;
}

// Translate

[Fact]
public void Translate_AcquirePassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});

var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});

var lease = translateLimiter.Acquire(1);
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);

var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);

lease.Dispose();

lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);

Assert.Equal(1, translateCallCount);
}

[Fact]
public async Task Translate_WaitAsyncPassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});

var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});

var lease = await translateLimiter.WaitAndAcquireAsync(1);
Assert.True(lease.IsAcquired);
Assert.Equal(1, translateCallCount);

var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);

lease.Dispose();

lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);

Assert.Equal(1, translateCallCount);
}

[Fact]
public void Translate_GetAvailablePermitsPassesThroughToInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});

var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});

Assert.Equal(1, translateLimiter.GetAvailablePermits(1));
Assert.Equal(1, translateCallCount);

var lease = translateLimiter.Acquire(1);
Assert.True(lease.IsAcquired);
Assert.Equal(2, translateCallCount);
Assert.Equal(0, translateLimiter.GetAvailablePermits(1));
Assert.Equal(3, translateCallCount);

var lease2 = limiter.Acquire("1");
Assert.False(lease2.IsAcquired);

lease.Dispose();

Assert.Equal(1, translateLimiter.GetAvailablePermits(1));
Assert.Equal(4, translateCallCount);

lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);

Assert.Equal(0, translateLimiter.GetAvailablePermits(1));
Assert.Equal(5, translateCallCount);
}

[Fact]
public void Translate_DisposeDoesNotDisposeInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});

var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});

translateLimiter.Dispose();

var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);

Assert.Throws<ObjectDisposedException>(() => translateLimiter.Acquire(1));
}

[Fact]
public async Task Translate_DisposeAsyncDoesNotDisposeInnerLimiter()
{
using var limiter = PartitionedRateLimiter.Create<string, int>(resource =>
{
if (resource == "1")
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
else
{
return RateLimitPartition.GetConcurrencyLimiter(1,
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
});

var translateCallCount = 0;
var translateLimiter = limiter.TranslateKey<int>(i =>
{
translateCallCount++;
return i.ToString();
});

await translateLimiter.DisposeAsync();

var lease = limiter.Acquire("1");
Assert.True(lease.IsAcquired);

Assert.Throws<ObjectDisposedException>(() => translateLimiter.Acquire(1));
}
}
}

0 comments on commit d6fef3b

Please sign in to comment.