Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limit with wait and retry of the remaining piece of the time window #930

Closed
gabriele-ricci-kyklos opened this issue Mar 17, 2022 · 9 comments
Labels

Comments

@gabriele-ricci-kyklos
Copy link

Summary: What are you wanting to achieve?
I have a rate-limited API that I need to call, limits are of 300 calls for 5 minutes.
I wanted to implement a rate limit with a retry and I did successfully with this code:

int maxCalls = 300;
int intervalInSeconds = 300;

var rateLimitPolicy =
    Policy
        .RateLimitAsync(maxCalls, TimeSpan.FromSeconds(intervalInSeconds), maxCalls);

var retryPolicy =
    Policy
        .Handle<RateLimitRejectedException>()
        .WaitAndRetryAsync(retryCount: 3, retryNumber => TimeSpan.FromSeconds(intervalInSeconds))
        .WrapAsync(rateLimitPolicy);

retryPolicy.ExecuteAsync( //my API call here

Here I'm using the Polly.Contrib.WaitAndRetry package to help with the WaitAndRetry policy.
With this code I'm able to do what I want, but once the limit is reached the code will wait for the entire time window (5 mins) before trying again. I'd like to wait only for the remaining part of the time window, this info is inside the exception RateLimitRejectedException but I have no idea how to set it as the wait time in the retry policy.

Also, I read in docs of a factory retryAfterFactory but I don't understand how to use it, since the policy becomes typed on TimeSpan and when calling Execute wants a TimeSpan back:

// Allow up to 20 executions per second with a burst of 10 executions,
// with a delegate to return the retry-after value to use if the rate
// limit is exceeded.
Policy.RateLimit(20, TimeSpan.FromSeconds(1), 10, (retryAfter, context) =>
{
    return retryAfter.Add(TimeSpan.FromSeconds(2));
});

//calling Execute accepts Func<TimeSpan> action but I don't know what to provide

How can I achieve this behavior?

@martincostello
Copy link
Member

The retryAfterFactory delegate is used to create the value of the RetryAfter property on the RateLimitRejectedException that is thrown when the rate limit is exceeded. It's not related to the delegate used to invoke the policy itself (or at least, it shouldn't be). Do you have an example of the specific code you're using to call Execute on your rate limit policy? The code example you've provided doesn't show it being used, just created.

As to your original question my (untried and untested) suggestion is to:

  1. Use the Context parameter on the retryAfterFactory delegate to inspect the exception that occurs when you're rate-limited to determine when the retry should next be attempted, and then store that value in the Context before returning it to the caller.
  2. Use the overload of WaitAndRetryAsync() that accepts a Context
    public static AsyncRetryPolicy WaitAndRetryAsync(this PolicyBuilder policyBuilder, int retryCount,
    Func<int, Context, TimeSpan> sleepDurationProvider, Action<Exception, TimeSpan, Context> onRetry)
    and read the value you placed in it in your retryAfterFactory to return that as the wait period.

It might be that there's some context missing somewhere that means that doesn't quite work, but it might give you something to go on to get it to do what you want. It's possible that this wasn't a use case catered for in the original design, and it isn't currently possible with the functionality available via the public API of Polly.

@gabriele-ricci-kyklos
Copy link
Author

@martincostello this is the complete working example I used to simulate 40 API calls in parallel, with a limit of 3 calls every 5 seconds:

int maxCalls = 3;
int intervalInSeconds = 5;

var rateLimitPolicy =
    Policy
        .RateLimitAsync(maxCalls, TimeSpan.FromSeconds(intervalInSeconds), maxCalls);

var retryPolicy =
    Policy
        .Handle<RateLimitRejectedException>()
        .WaitAndRetryAsync
        (
            retryCount: 3,
            retryNumber => TimeSpan.FromSeconds(intervalInSeconds)
        )
        .WrapAsync(rateLimitPolicy);

ConcurrentBag<Task> tasks = new ();

Parallel
    .ForEach
    (
        Enumerable.Range(1, 40),
        n =>
        {
            Task t =
            retryPolicy
                .ExecuteAsync(() =>
                {
                    Console.WriteLine($"{DateTime.Now:HH:mm:ss:ff} - Call #{n}");
                    return Task.CompletedTask;
                });

            tasks.Add(t);
        }
    );

await Task.WhenAll(tasks);

Now if I try to use the RateLimit overload with retryAfterFactory, I get that the policy gets typed on TResult:

public static AsyncRateLimitPolicy<TResult> RateLimitAsync<TResult>(int numberOfExecutions, TimeSpan perTimeSpan, Func<TimeSpan, Context, TResult> retryAfterFactory)

If I try to implement retryAfterFactory the policy gets typed, and later when I call Execute it expects a TResult.
How can I implement this correctly?

@martincostello
Copy link
Member

My mistake, the delegate has a different purpose with rate limiting.

The delegate is used to return the result to return if the execution is rate limited. The TimeSpan for the Retry-After value is provided along with the Context. The value returned by the delegate is returned to the caller of the policy instead of an exception being thrown:

if (retryAfterFactory != null)
{
return retryAfterFactory(retryAfter, context);
}
throw new RateLimitRejectedException(retryAfter);

if (retryAfterFactory != null)
{
return retryAfterFactory(retryAfter, context);
}
throw new RateLimitRejectedException(retryAfter);

On that basis you should be able to get what you're trying to achieve like this:

[Fact]
public async Task Rate_Limit_Wait_Until_Retry_After()
{
    int maxCalls = 3;
    int intervalInSeconds = 5;

    var rateLimitPolicy =
        Policy
            .RateLimitAsync(maxCalls, TimeSpan.FromSeconds(intervalInSeconds), maxCalls);

    var retryPolicy =
        Policy
            .Handle<RateLimitRejectedException>()
            .WaitAndRetryAsync(3, (retryNumber, context) => (TimeSpan)context["Retry-After"], // This will have a value of 42 seconds
            (_, _, _) => { })
            .WrapAsync(rateLimitPolicy);

    await retryPolicy.ExecuteAsync(context =>
    {
        try
        {
            // Your API call goes here - throwing the exception is just for demonstration purposes
            throw new RateLimitRejectedException(TimeSpan.FromSeconds(42));
        }
        catch (RateLimitRejectedException ex)
        {
            context.Add("Retry-After", ex.RetryAfter);
            throw;
        }
    },
    new Context());
}

@gabriele-ricci-kyklos
Copy link
Author

@martincostello thank you for your help.
I'm missing where to add the value to the context, since in my case I do not throw any exception as it's done by the rate-limit policy itself, I only handle it with another policy (wait & retry).

@martincostello
Copy link
Member

Yes, you need to add it to your code as-per the example.

@gabriele-ricci-kyklos
Copy link
Author

@martincostello how can I throw the RateLimitRejectedException though? It should be thrown by the policy, not by the Action supplied to the ExecuteAsync

@martincostello
Copy link
Member

Just like in the code sample:

image

@martincostello
Copy link
Member

Sorry, I just realised there was something I forgot to do in the original code sample, I'll see if I can extend it.

Ultimately though, you need to layer the policies to achieve what you want - you can't do it out of the box by just wrapping one policy in another.

@gabriele-ricci-kyklos
Copy link
Author

gabriele-ricci-kyklos commented Mar 18, 2022

@martincostello you're absolutely right, since the retry needs to work on the single call while the rate-limit must be defined globally (for all calls).
EDIT: this is the latest version, simulating calls coming from multiple threads:

class Program
{
    static async Task Main(string[] args)
    {
        ClientWithPolicy client = new(); //Dependency injection

        Task thread1 =
            Task.Run(async () =>
            {
                while (true)
                {
                    Random r = new();
                    int random = r.Next();
                    if (random % 2 == 0)
                    {
                        await client.DoCallWithPolicyAsync(1);
                    }

                    if (random % 5 == 0)
                    {
                        Console.WriteLine($"Thread #1 - Delaying 6 seconds");
                        await Task.Delay(6000);
                    }
                }
            });

        Task thread2 =
            Task.Run(async () =>
            {
                while (true)
                {
                    Random r = new();
                    int random = r.Next();
                    if (random % 2 != 0)
                    {
                        await client.DoCallWithPolicyAsync(2);
                    }

                    if (random % 5 == 0)
                    {
                        Console.WriteLine($"Thread #2 - Delaying 8 seconds");
                        await Task.Delay(8000);
                    }
                }
            });

        await Task.WhenAll(thread1, thread2);
    }
}

class ClientWithPolicy
{
    private int _count = 0;
    private readonly AsyncPolicy _policy = Policy.RateLimitAsync(5, TimeSpan.FromSeconds(5), 5);

    public async Task DoCallWithPolicyAsync(int threadNumber)
    {
        int c = Interlocked.Increment(ref _count);

        var retryPolicy =
            Policy
                .Handle<RateLimitRejectedException>()
                .WaitAndRetryAsync
                (
                    retryCount: 3,
                    retryNumber => TimeSpan.FromMilliseconds(100),
                    (ex, ts, n, ctx) => Console.WriteLine($"Thread #{threadNumber} - Call #{c} - retry attempt #{n}")
                );

        await retryPolicy
            .ExecuteAsync(async () =>
            {
                try
                {
                    await _policy.ExecuteAsync(() => DoCallAsync(threadNumber, c));
                }
                catch (RateLimitRejectedException ex)
                {
                    await Task.Delay(ex.RetryAfter);
                    throw;
                }
            });
    }

    private static Task DoCallAsync(int threadNumber, int callNumber)
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss:ff} Thread #{threadNumber} - Doing the call #{callNumber}");
        return Task.CompletedTask;
    }
}

Producing an output:

14:41:32:32 Thread #1 - Doing the call #1
14:41:32:32 Thread #2 - Doing the call #2
Thread #2 - Delaying 8 seconds
14:41:32:35 Thread #1 - Doing the call #3
Thread #1 - Delaying 6 seconds
14:41:38:35 Thread #1 - Doing the call #4
Thread #1 - Delaying 6 seconds
Thread #2 - Delaying 8 seconds
14:41:44:36 Thread #1 - Doing the call #5
14:41:44:36 Thread #1 - Doing the call #6
Thread #1 - Delaying 6 seconds
14:41:48:35 Thread #2 - Doing the call #7
14:41:48:35 Thread #2 - Doing the call #8
14:41:48:35 Thread #2 - Doing the call #9
14:41:48:35 Thread #2 - Doing the call #10
14:41:48:35 Thread #2 - Doing the call #11
14:41:48:35 Thread #2 - Doing the call #12
14:41:48:35 Thread #2 - Doing the call #13
Thread #2 - Call #14 - retry attempt #1
14:41:49:52 Thread #2 - Doing the call #14
Thread #2 - Delaying 8 seconds
Thread #1 - Call #15 - retry attempt #1
14:41:50:69 Thread #1 - Doing the call #15
Thread #1 - Call #16 - retry attempt #1
14:41:51:85 Thread #1 - Doing the call #16
Thread #1 - Call #17 - retry attempt #1
14:41:53:02 Thread #1 - Doing the call #17
Thread #1 - Call #18 - retry attempt #1
14:41:54:19 Thread #1 - Doing the call #18
Thread #1 - Call #19 - retry attempt #1
14:41:55:35 Thread #1 - Doing the call #19
Thread #1 - Call #20 - retry attempt #1
14:41:56:51 Thread #1 - Doing the call #20
14:41:57:53 Thread #2 - Doing the call #22
Thread #1 - Call #21 - retry attempt #1
Thread #2 - Call #23 - retry attempt #1
Thread #1 - Call #21 - retry attempt #2
14:41:58:70 Thread #1 - Doing the call #21
Thread #1 - Delaying 6 seconds
Thread #2 - Call #23 - retry attempt #2
14:41:59:86 Thread #2 - Doing the call #23
Thread #2 - Delaying 8 seconds
Thread #1 - Delaying 6 seconds
14:42:07:86 Thread #2 - Doing the call #24
Thread #2 - Delaying 8 seconds
14:42:10:72 Thread #1 - Doing the call #25
14:42:10:72 Thread #1 - Doing the call #26
Thread #1 - Delaying 6 seconds
[...]

@App-vNext App-vNext locked and limited conversation to collaborators Apr 16, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

2 participants