Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Implement gateway ratelimit #1537

Merged
merged 31 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bcb8b53
Implement gateway ratelimit
SubZero0 May 17, 2020
95e5d27
Remove unused code
SubZero0 May 17, 2020
a6b1633
Share WebSocketRequestQueue between clients
SubZero0 May 18, 2020
491df1c
Add global limit and a way to change gateway limits
SubZero0 May 18, 2020
e5299e8
Refactoring variable to fit lib standards
SubZero0 May 21, 2020
5c5f272
Update xml docs
SubZero0 May 22, 2020
767ff40
Update xml docs
SubZero0 May 22, 2020
e5bcc0d
Move warning to remarks
SubZero0 May 22, 2020
9c46081
Remove specific RequestQueue for WebSocket and other changes
SubZero0 May 25, 2020
bf747b9
Add summary to IdentifySemaphoreName
SubZero0 May 25, 2020
055a1f9
Fix spacing
SubZero0 May 26, 2020
73efc65
Add max_concurrency and other fixes
SubZero0 Jun 5, 2020
380a4b5
Add missing RequestQueue parameter and wrong nullable
SubZero0 Jun 5, 2020
4b8a7d0
Add RequeueQueue paramater to Webhook
SubZero0 Jun 5, 2020
5de1634
Better xml documentation
SubZero0 Jun 5, 2020
6550faf
Remove GatewayLimits class and other changes
SubZero0 Jun 5, 2020
68ec2a6
Remove unnecessary using and wording
SubZero0 Jun 5, 2020
ae29957
Remove more unnecessary usings
SubZero0 Jun 5, 2020
cb5051e
Change named Semaphores to SemaphoreSlim
SubZero0 Jun 6, 2020
2005751
Remove unused using
SubZero0 Jun 6, 2020
df4ac9c
Update branch
SubZero0 Aug 10, 2020
f812bdb
Merge branch 'dev' into gateway-ratelimit
SubZero0 Aug 10, 2020
e5c86e7
Fix merge conflicts and update to new ratelimit
SubZero0 Aug 10, 2020
39052b2
Merge branch 'dev' into gateway-ratelimit
SubZero0 Nov 17, 2020
5b65b99
Fixing merge, ignore limit for heartbeat, and dispose
SubZero0 Nov 18, 2020
04a00a7
Missed one place and better xml docs.
SubZero0 Nov 18, 2020
baaa434
Wait identify before opening the connection
SubZero0 Nov 18, 2020
42cf7ac
Only request identify ticket when needed
SubZero0 Nov 18, 2020
e61a2c7
Move identify control to sharded client
SubZero0 Nov 19, 2020
ae96b3d
Better description for IdentifyMaxConcurrency
SubZero0 Nov 19, 2020
0cca17f
Add lock to InvalidSession
SubZero0 Nov 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Discord.Net.Core/RequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class RequestOptions
internal string BucketId { get; set; }
internal bool IsClientBucket { get; set; }
internal bool IsReactionBucket { get; set; }
internal bool IsGatewayBucket { get; set; }

internal static RequestOptions CreateOrClone(RequestOptions options)
{
Expand Down
23 changes: 23 additions & 0 deletions src/Discord.Net.Rest/Entities/Gateway/GatewayLimit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace Discord.Rest
{
/// <summary>
/// Represents the limits for a gateway request.
/// </summary>
public struct GatewayLimit
{
/// <summary>
/// Gets or sets the maximum amount of this type of request in a time window, that is set by <see cref="Seconds"/>.
/// </summary>
public int Count { get; set; }
SubZero0 marked this conversation as resolved.
Show resolved Hide resolved
/// <summary>
/// Gets or sets the amount of seconds until the rate limiter resets the remaining requests <see cref="Count"/>.
/// </summary>
public int Seconds { get; set; }

internal GatewayLimit(int count, int seconds)
{
Count = count;
Seconds = seconds;
}
}
}
32 changes: 32 additions & 0 deletions src/Discord.Net.Rest/Entities/Gateway/GatewayLimits.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Discord.Rest
{
/// <summary>
/// Contains the rate limits for the gateway.
/// </summary>
public class GatewayLimits
{
/// <summary>
/// Gets or sets the global limits for the gateway rate limiter.
/// </summary>
/// <remarks>
/// This property includes all the other limits, like Identify.
/// </remarks>
public GatewayLimit Global { get; set; }
/// <summary>
/// Gets or sets the limits of Identify requests.
/// </summary>
public GatewayLimit Identify { get; set; }

/// <summary>
/// Initializes a new <see cref="GatewayLimits"/> with the default values.
/// </summary>
public GatewayLimits()
SubZero0 marked this conversation as resolved.
Show resolved Hide resolved
{
Global = new GatewayLimit(120, 60);
Identify = new GatewayLimit(1, 5);
}

internal static GatewayLimits GetOrCreate(GatewayLimits limits)
=> limits ?? new GatewayLimits();
}
}
62 changes: 62 additions & 0 deletions src/Discord.Net.Rest/Net/Queue/GatewayBucket.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using Discord.Rest;
using System.Collections.Immutable;

namespace Discord.Net.Queue
{
public enum GatewayBucketType
SubZero0 marked this conversation as resolved.
Show resolved Hide resolved
{
Unbucketed = 0,
Identify = 1
}
internal struct GatewayBucket
{
private static ImmutableDictionary<GatewayBucketType, GatewayBucket> DefsByType;
private static ImmutableDictionary<string, GatewayBucket> DefsById;

static GatewayBucket()
{
SetLimits(GatewayLimits.GetOrCreate(null));
}

public static GatewayBucket Get(GatewayBucketType type) => DefsByType[type];
public static GatewayBucket Get(string id) => DefsById[id];

public static void SetLimits(GatewayLimits limits)
{
limits = GatewayLimits.GetOrCreate(limits);
Preconditions.GreaterThan(limits.Global.Count, 0, nameof(limits.Global.Count), "Global count must be greater than zero.");
Preconditions.GreaterThan(limits.Global.Seconds, 0, nameof(limits.Global.Seconds), "Global seconds must be greater than zero.");
Preconditions.GreaterThan(limits.Identify.Count, 0, nameof(limits.Identify.Count), "Identify count must be greater than zero.");
Preconditions.GreaterThan(limits.Identify.Seconds, 0, nameof(limits.Identify.Seconds), "Identify seconds must be greater than zero.");

var buckets = new[]
{
new GatewayBucket(GatewayBucketType.Unbucketed, "<gateway-unbucketed>", limits.Global.Count, limits.Global.Seconds),
new GatewayBucket(GatewayBucketType.Identify, "<gateway-identify>", limits.Identify.Count, limits.Identify.Seconds)
};

var builder = ImmutableDictionary.CreateBuilder<GatewayBucketType, GatewayBucket>();
foreach (var bucket in buckets)
builder.Add(bucket.Type, bucket);
DefsByType = builder.ToImmutable();

var builder2 = ImmutableDictionary.CreateBuilder<string, GatewayBucket>();
foreach (var bucket in buckets)
builder2.Add(bucket.Id, bucket);
DefsById = builder2.ToImmutable();
}

public GatewayBucketType Type { get; }
public string Id { get; }
public int WindowCount { get; set; }
public int WindowSeconds { get; set; }

public GatewayBucket(GatewayBucketType type, string id, int count, int seconds)
{
Type = type;
Id = id;
WindowCount = count;
WindowSeconds = seconds;
}
}
}
30 changes: 26 additions & 4 deletions src/Discord.Net.Rest/Net/Queue/RequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,18 @@ public async Task<Stream> SendAsync(RestRequest request)
}
public async Task SendAsync(WebSocketRequest request)
{
//TODO: Re-impl websocket buckets
request.CancelToken = _requestCancelToken;
await request.SendAsync().ConfigureAwait(false);
CancellationTokenSource createdTokenSource = null;
if (request.Options.CancelToken.CanBeCanceled)
{
createdTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_requestCancelToken, request.Options.CancelToken);
request.Options.CancelToken = createdTokenSource.Token;
}
else
request.Options.CancelToken = _requestCancelToken;

var bucket = GetOrCreateBucket(request.Options.BucketId, request);
await bucket.SendAsync(request).ConfigureAwait(false);
createdTokenSource?.Dispose();
}

internal async Task EnterGlobalAsync(int id, RestRequest request)
Expand All @@ -109,8 +118,21 @@ internal void PauseGlobal(RateLimitInfo info)
{
_waitUntil = DateTimeOffset.UtcNow.AddMilliseconds(info.RetryAfter.Value + (info.Lag?.TotalMilliseconds ?? 0.0));
}
internal async Task EnterGlobalAsync(int id, WebSocketRequest request)
{
var requestBucket = GatewayBucket.Get(request.Options.BucketId);
if (requestBucket.Type == GatewayBucketType.Unbucketed)
return;

var globalBucketType = GatewayBucket.Get(GatewayBucketType.Unbucketed);
var options = RequestOptions.CreateOrClone(request.Options);
options.BucketId = globalBucketType.Id;
var globalRequest = new WebSocketRequest(null, null, false, options);
var globalBucket = GetOrCreateBucket(globalBucketType.Id, globalRequest);
await globalBucket.TriggerAsync(id, globalRequest);
}

private RequestBucket GetOrCreateBucket(string id, RestRequest request)
private RequestBucket GetOrCreateBucket(string id, IRequest request)
{
return _buckets.GetOrAdd(id, x => new RequestBucket(this, request, x));
}
Expand Down
89 changes: 84 additions & 5 deletions src/Discord.Net.Rest/Net/Queue/RequestQueueBucket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal class RequestBucket
public int WindowCount { get; private set; }
public DateTimeOffset LastAttemptAt { get; private set; }

public RequestBucket(RequestQueue queue, RestRequest request, string id)
public RequestBucket(RequestQueue queue, IRequest request, string id)
{
_queue = queue;
Id = id;
Expand All @@ -31,13 +31,15 @@ public RequestBucket(RequestQueue queue, RestRequest request, string id)

if (request.Options.IsClientBucket)
WindowCount = ClientBucket.Get(request.Options.BucketId).WindowCount;
else if (request.Options.IsGatewayBucket)
WindowCount = GatewayBucket.Get(request.Options.BucketId).WindowCount;
else
WindowCount = 1; //Only allow one request until we get a header back
_semaphore = WindowCount;
_resetTick = null;
LastAttemptAt = DateTimeOffset.UtcNow;
}

static int nextId = 0;
public async Task<Stream> SendAsync(RestRequest request)
{
Expand Down Expand Up @@ -149,8 +151,68 @@ public async Task<Stream> SendAsync(RestRequest request)
}
}
}
public async Task SendAsync(WebSocketRequest request)
{
int id = Interlocked.Increment(ref nextId);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Start");
#endif
LastAttemptAt = DateTimeOffset.UtcNow;
while (true)
{
await _queue.EnterGlobalAsync(id, request).ConfigureAwait(false);
await EnterAsync(id, request).ConfigureAwait(false);

#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Sending...");
#endif
try
{
await request.SendAsync().ConfigureAwait(false);
return;
}
catch (TimeoutException)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Timeout");
#endif
if ((request.Options.RetryMode & RetryMode.RetryTimeouts) == 0)
throw;

await Task.Delay(500).ConfigureAwait(false);
continue; //Retry
}
/*catch (Exception)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Error");
#endif
if ((request.Options.RetryMode & RetryMode.RetryErrors) == 0)
throw;

await Task.Delay(500);
continue; //Retry
}*/
finally
{
UpdateRateLimit(id, request, default(RateLimitInfo), false);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Stop");
#endif
}
}
}

private async Task EnterAsync(int id, RestRequest request)
internal async Task TriggerAsync(int id, IRequest request)
{
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Trigger Bucket");
#endif
await EnterAsync(id, request).ConfigureAwait(false);
UpdateRateLimit(id, request, default(RateLimitInfo), false);
}

private async Task EnterAsync(int id, IRequest request)
{
int windowCount;
DateTimeOffset? resetAt;
Expand Down Expand Up @@ -213,7 +275,7 @@ private async Task EnterAsync(int id, RestRequest request)
}
}

private void UpdateRateLimit(int id, RestRequest request, RateLimitInfo info, bool is429)
private void UpdateRateLimit(int id, IRequest request, RateLimitInfo info, bool is429)
{
if (WindowCount == 0)
return;
Expand Down Expand Up @@ -273,6 +335,23 @@ private void UpdateRateLimit(int id, RestRequest request, RateLimitInfo info, bo
Debug.WriteLine($"[{id}] Client Bucket ({ClientBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)");
#endif
}
else if (request.Options.IsGatewayBucket && request.Options.BucketId != null)
{
resetTick = DateTimeOffset.UtcNow.AddSeconds(GatewayBucket.Get(request.Options.BucketId).WindowSeconds);
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Gateway Bucket ({GatewayBucket.Get(request.Options.BucketId).WindowSeconds * 1000} ms)");
#endif
if (!hasQueuedReset)
{
_resetTick = resetTick;
LastAttemptAt = resetTick.Value;
#if DEBUG_LIMITS
Debug.WriteLine($"[{id}] Reset in {(int)Math.Ceiling((resetTick - DateTimeOffset.UtcNow).Value.TotalMilliseconds)} ms");
#endif
var _ = QueueReset(id, (int)Math.Ceiling((_resetTick.Value - DateTimeOffset.UtcNow).TotalMilliseconds));
}
return;
}

if (resetTick == null)
{
Expand Down Expand Up @@ -320,7 +399,7 @@ private async Task QueueReset(int id, int millis)
}
}

private void ThrowRetryLimit(RestRequest request)
private void ThrowRetryLimit(IRequest request)
{
if ((request.Options.RetryMode & RetryMode.RetryRatelimit) == 0)
throw new RateLimitedException(request);
Expand Down
4 changes: 1 addition & 3 deletions src/Discord.Net.Rest/Net/Queue/Requests/WebSocketRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ namespace Discord.Net.Queue
public class WebSocketRequest : IRequest
{
public IWebSocketClient Client { get; }
public string BucketId { get; }
public byte[] Data { get; }
public bool IsText { get; }
public DateTimeOffset? TimeoutAt { get; }
public TaskCompletionSource<Stream> Promise { get; }
public RequestOptions Options { get; }
public CancellationToken CancelToken { get; internal set; }

public WebSocketRequest(IWebSocketClient client, string bucketId, byte[] data, bool isText, RequestOptions options)
public WebSocketRequest(IWebSocketClient client, byte[] data, bool isText, RequestOptions options)
{
Preconditions.NotNull(options, nameof(options));

Client = client;
BucketId = bucketId;
Data = data;
IsText = isText;
Options = options;
Expand Down
2 changes: 1 addition & 1 deletion src/Discord.Net.WebSocket/BaseSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public abstract partial class BaseSocketClient : BaseDiscordClient, IDiscordClie
internal BaseSocketClient(DiscordSocketConfig config, DiscordRestApiClient client)
: base(config, client) => BaseConfig = config;
private static DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config)
=> new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent,
=> new DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue,
rateLimitPrecision: config.RateLimitPrecision,
useSystemClock: config.UseSystemClock);

Expand Down
12 changes: 11 additions & 1 deletion src/Discord.Net.WebSocket/DiscordShardedClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Linq;
using System.Threading.Tasks;
using System.Threading;
using Discord.Net.Queue;

namespace Discord.WebSocket
{
Expand Down Expand Up @@ -66,6 +67,7 @@ private DiscordShardedClient(int[] ids, DiscordSocketConfig config, API.DiscordS
config.DisplayInitialLog = false;
_baseConfig = config;
_connectionGroupLock = new SemaphoreSlim(1, 1);
GatewayBucket.SetLimits(GatewayLimits.GetOrCreate(config.GatewayLimits));

if (config.TotalShards == null)
_automaticShards = true;
Expand All @@ -83,9 +85,17 @@ private DiscordShardedClient(int[] ids, DiscordSocketConfig config, API.DiscordS
RegisterEvents(_shards[i], i == 0);
}
}

ApiClient.WebSocketRequestQueue.RateLimitTriggered += async (id, info) =>
{
if (info == null)
await _restLogger.VerboseAsync($"Preemptive Rate limit triggered: {id ?? "null"}").ConfigureAwait(false);
else
await _restLogger.WarningAsync($"Rate limit triggered: {id ?? "null"}").ConfigureAwait(false);
};
}
private static API.DiscordSocketApiClient CreateApiClient(DiscordSocketConfig config)
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent,
=> new API.DiscordSocketApiClient(config.RestClientProvider, config.WebSocketProvider, DiscordRestConfig.UserAgent, config.WebsocketRequestQueue,
rateLimitPrecision: config.RateLimitPrecision);

internal override async Task OnLoginAsync(TokenType tokenType, string token)
Expand Down
Loading