From ea4fc81863349cfdb0b0d886fb467444ac7bbce6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Spie=C3=9F?= Date: Tue, 14 Jan 2020 10:25:11 +0100 Subject: [PATCH] Improve ratelimit bucket handling (#1103) --- .../java/net/dv8tion/jda/api/JDABuilder.java | 2 +- .../api/exceptions/RateLimitedException.java | 2 +- .../net/dv8tion/jda/internal/JDAImpl.java | 1 + .../jda/internal/requests/RateLimiter.java | 2 + .../jda/internal/requests/Requester.java | 213 +++--- .../dv8tion/jda/internal/requests/Route.java | 375 +++++------ .../requests/ratelimit/BotRateLimiter.java | 615 +++++++++--------- .../requests/ratelimit/ClientRateLimiter.java | 8 +- .../internal/requests/ratelimit/IBucket.java | 1 - 9 files changed, 579 insertions(+), 640 deletions(-) diff --git a/src/main/java/net/dv8tion/jda/api/JDABuilder.java b/src/main/java/net/dv8tion/jda/api/JDABuilder.java index d08d535989..735fbd6ffb 100644 --- a/src/main/java/net/dv8tion/jda/api/JDABuilder.java +++ b/src/main/java/net/dv8tion/jda/api/JDABuilder.java @@ -156,7 +156,7 @@ public JDABuilder setRawEventsEnabled(boolean enable) /** * Whether the rate-limit should be relative to the current time plus latency. - *
By default we use the {@code X-RateLimit-Rest-After} header to determine when + *
By default we use the {@code X-RateLimit-Reset-After} header to determine when * a rate-limit is no longer imminent. This has the disadvantage that it might wait longer than needed due * to the latency which is ignored by the reset-after relative delay. * diff --git a/src/main/java/net/dv8tion/jda/api/exceptions/RateLimitedException.java b/src/main/java/net/dv8tion/jda/api/exceptions/RateLimitedException.java index b9937e7677..c4ebaefd07 100644 --- a/src/main/java/net/dv8tion/jda/api/exceptions/RateLimitedException.java +++ b/src/main/java/net/dv8tion/jda/api/exceptions/RateLimitedException.java @@ -28,7 +28,7 @@ public class RateLimitedException extends Exception public RateLimitedException(Route.CompiledRoute route, long retryAfter) { - this(route.getRatelimitRoute(), retryAfter); + this(route.getBaseRoute().getRoute() + ":" + route.getMajorParameters(), retryAfter); } public RateLimitedException(String route, long retryAfter) diff --git a/src/main/java/net/dv8tion/jda/internal/JDAImpl.java b/src/main/java/net/dv8tion/jda/internal/JDAImpl.java index 7b254806d5..32b9e8fe1d 100644 --- a/src/main/java/net/dv8tion/jda/internal/JDAImpl.java +++ b/src/main/java/net/dv8tion/jda/internal/JDAImpl.java @@ -228,6 +228,7 @@ public int login(String gatewayUrl, ShardInfo shardInfo, Compression compression { this.shardInfo = shardInfo; threadConfig.init(this::getIdentifierString); + requester.getRateLimiter().init(); this.gatewayUrl = gatewayUrl == null ? getGateway() : gatewayUrl; Checks.notNull(this.gatewayUrl, "Gateway URL"); diff --git a/src/main/java/net/dv8tion/jda/internal/requests/RateLimiter.java b/src/main/java/net/dv8tion/jda/internal/requests/RateLimiter.java index c9a2fc94c9..58f1a7e886 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/RateLimiter.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/RateLimiter.java @@ -96,6 +96,8 @@ public List getQueuedRouteBuckets() } } + public void init() {} + protected void shutdown() { isShutdown = true; diff --git a/src/main/java/net/dv8tion/jda/internal/requests/Requester.java b/src/main/java/net/dv8tion/jda/internal/requests/Requester.java index e0e4d1c617..63e93265b3 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/Requester.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/Requester.java @@ -37,10 +37,10 @@ import javax.net.ssl.SSLPeerUnverifiedException; import java.net.SocketException; import java.net.SocketTimeoutException; -import java.util.*; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.Map.Entry; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; +import java.util.Set; import java.util.concurrent.ConcurrentMap; public class Requester @@ -121,82 +121,7 @@ private static boolean isRetry(Throwable e) || e instanceof SSLPeerUnverifiedException; // SSL Certificate was wrong } - private void attemptRequest(CompletableFuture task, okhttp3.Request request, - List responses, Set rays, - Request apiRequest, String url, - boolean handleOnRatelimit, boolean timeout, int attempt) - { - Route.CompiledRoute route = apiRequest.getRoute(); - okhttp3.Response lastResponse = responses.isEmpty() ? null : responses.get(responses.size() - 1); - // If the request has been canceled via the Future, don't execute. - if (apiRequest.isCanceled()) - { - apiRequest.onFailure(new CancellationException("RestAction has been cancelled")); - task.complete(null); - return; - } - - if (attempt >= 4) - { - //Epic failure from other end. Attempted 4 times. - Response response = new Response(Objects.requireNonNull(lastResponse), -1, rays); - apiRequest.handleResponse(response); - task.complete(null); - return; - } - Call call = httpClient.newCall(request); - call.enqueue(FunctionalCallback.onFailure((c, e) -> { - if (isRetry(e)) - { - if (retryOnTimeout && !timeout) - { - // Retry once on timeout - attemptRequest(task, request, responses, rays, apiRequest, url, true, true, attempt + 1); - } - else - { - // Second attempt failed or we don't want to retry - LOG.error("Requester timed out while executing a request", e); - apiRequest.handleResponse(new Response(null, e, rays)); - task.complete(null); - } - return; - } - // Unexpected error, failed request - LOG.error("There was an exception while executing a REST request", e); //This originally only printed on DEBUG in 2.x - apiRequest.handleResponse(new Response(null, e, rays)); - task.complete(null); - }) - .onSuccess((c, response) -> { - responses.add(response); - String cfRay = response.header("CF-RAY"); - if (cfRay != null) - rays.add(cfRay); - - if (response.code() >= 500) - { - LOG.debug("Requesting {} -> {} returned status {}... retrying (attempt {})", - route.getMethod(), url, response.code(), attempt); - attemptRequest(task, request, responses, rays, apiRequest, url, true, timeout, attempt + 1); - return; - } - - Long retryAfter = rateLimiter.handleResponse(route, response); - if (!rays.isEmpty()) - LOG.debug("Received response with following cf-rays: {}", rays); - - LOG.trace("Finished Request {} {} with code {}", route.getMethod(), response.request().url(), response.code()); - - if (retryAfter == null) - apiRequest.handleResponse(new Response(response, -1, rays)); - else if (handleOnRatelimit) - apiRequest.handleResponse(new Response(response, retryAfter, rays)); - - task.complete(retryAfter); - }).build()); - } - - public CompletableFuture execute(Request apiRequest) + public Long execute(Request apiRequest) { return execute(apiRequest, false); } @@ -206,44 +131,142 @@ public CompletableFuture execute(Request apiRequest) * * @param apiRequest * The API request that needs to be sent - * @param handleOnRatelimit + * @param handleOnRateLimit * Whether to forward rate-limits, false if rate limit handling should take over * * @return Non-null if the request was ratelimited. Returns a Long containing retry_after milliseconds until * the request can be made again. This could either be for the Per-Route ratelimit or the Global ratelimit. *
Check if globalCooldown is {@code null} to determine if it was Per-Route or Global. */ - public CompletableFuture execute(Request apiRequest, boolean handleOnRatelimit) + public Long execute(Request apiRequest, boolean handleOnRateLimit) + { + return execute(apiRequest, false, handleOnRateLimit); + } + + public Long execute(Request apiRequest, boolean retried, boolean handleOnRatelimit) { Route.CompiledRoute route = apiRequest.getRoute(); Long retryAfter = rateLimiter.getRateLimit(route); - if (retryAfter != null) + if (retryAfter != null && retryAfter > 0) { if (handleOnRatelimit) apiRequest.handleResponse(new Response(retryAfter, Collections.emptySet())); - return CompletableFuture.completedFuture(retryAfter); + return retryAfter; } - // Build the request okhttp3.Request.Builder builder = new okhttp3.Request.Builder(); + String url = DISCORD_API_PREFIX + route.getCompiledRoute(); builder.url(url); - applyBody(apiRequest, builder); - applyHeaders(apiRequest, builder, url.startsWith(DISCORD_API_PREFIX)); + + String method = apiRequest.getRoute().getMethod().toString(); + RequestBody body = apiRequest.getBody(); + + if (body == null && HttpMethod.requiresRequestBody(method)) + body = EMPTY_BODY; + + builder.method(method, body) + .header("X-RateLimit-Precision", "millisecond") + .header("user-agent", USER_AGENT) + .header("accept-encoding", "gzip"); + + //adding token to all requests to the discord api or cdn pages + //we can check for startsWith(DISCORD_API_PREFIX) because the cdn endpoints don't need any kind of authorization + if (url.startsWith(DISCORD_API_PREFIX)) + builder.header("authorization", api.getToken()); + + // Apply custom headers like X-Audit-Log-Reason + // If customHeaders is null this does nothing + if (apiRequest.getHeaders() != null) + { + for (Entry header : apiRequest.getHeaders().entrySet()) + builder.addHeader(header.getKey(), header.getValue()); + } + okhttp3.Request request = builder.build(); - // Setup response handling Set rays = new LinkedHashSet<>(); - List responses = new ArrayList<>(4); - CompletableFuture task = new CompletableFuture<>(); - task.whenComplete((i1, i2) -> { + okhttp3.Response[] responses = new okhttp3.Response[4]; + // we have an array of all responses to later close them all at once + //the response below this comment is used as the first successful response from the server + okhttp3.Response lastResponse = null; + try + { + LOG.trace("Executing request {} {}", apiRequest.getRoute().getMethod(), url); + int attempt = 0; + do + { + //If the request has been canceled via the Future, don't execute. + //if (apiRequest.isCanceled()) + // return null; + Call call = httpClient.newCall(request); + lastResponse = call.execute(); + responses[attempt] = lastResponse; + String cfRay = lastResponse.header("CF-RAY"); + if (cfRay != null) + rays.add(cfRay); + + if (lastResponse.code() < 500) + break; // break loop, got a successful response! + + attempt++; + LOG.debug("Requesting {} -> {} returned status {}... retrying (attempt {})", + apiRequest.getRoute().getMethod(), + url, lastResponse.code(), attempt); + try + { + Thread.sleep(50 * attempt); + } + catch (InterruptedException ignored) {} + } + while (attempt < 3 && lastResponse.code() >= 500); + + LOG.trace("Finished Request {} {} with code {}", route.getMethod(), lastResponse.request().url(), lastResponse.code()); + + if (lastResponse.code() >= 500) + { + //Epic failure from other end. Attempted 4 times. + Response response = new Response(lastResponse, -1, rays); + apiRequest.handleResponse(response); + return null; + } + + retryAfter = rateLimiter.handleResponse(route, lastResponse); + if (!rays.isEmpty()) + LOG.debug("Received response with following cf-rays: {}", rays); + + if (retryAfter == null) + apiRequest.handleResponse(new Response(lastResponse, -1, rays)); + else if (handleOnRatelimit) + apiRequest.handleResponse(new Response(lastResponse, retryAfter, rays)); + + return retryAfter; + } + catch (SocketTimeoutException e) + { + if (retryOnTimeout && !retried) + return execute(apiRequest, true, handleOnRatelimit); + LOG.error("Requester timed out while executing a request", e); + apiRequest.handleResponse(new Response(lastResponse, e, rays)); + return null; + } + catch (Exception e) + { + if (retryOnTimeout && !retried && isRetry(e)) + return execute(apiRequest, true, handleOnRatelimit); + LOG.error("There was an exception while executing a REST request", e); //This originally only printed on DEBUG in 2.x + apiRequest.handleResponse(new Response(lastResponse, e, rays)); + return null; + } + finally + { for (okhttp3.Response r : responses) + { + if (r == null) + break; r.close(); - }); - LOG.trace("Executing request {} {}", apiRequest.getRoute().getMethod(), url); - // Initialize state-machine - attemptRequest(task, request, responses, rays, apiRequest, url, handleOnRatelimit, false, 0); - return task; + } + } } private void applyBody(Request apiRequest, okhttp3.Request.Builder builder) diff --git a/src/main/java/net/dv8tion/jda/internal/requests/Route.java b/src/main/java/net/dv8tion/jda/internal/requests/Route.java index ad201fddfc..6d7368fdf5 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/Route.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/Route.java @@ -21,10 +21,6 @@ import javax.annotation.CheckReturnValue; import javax.annotation.Nonnull; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static net.dv8tion.jda.internal.requests.Method.*; @@ -33,10 +29,10 @@ public class Route { public static class Misc { - public static final Route TRACK = new Route(POST, true, "track"); - public static final Route GET_VOICE_REGIONS = new Route(GET, true, "voice/regions"); - public static final Route GATEWAY = new Route(GET, true, "gateway"); - public static final Route GATEWAY_BOT = new Route(GET, true, "gateway/bot"); + public static final Route TRACK = new Route(POST, "track"); + public static final Route GET_VOICE_REGIONS = new Route(GET, "voice/regions"); + public static final Route GATEWAY = new Route(GET, "gateway"); + public static final Route GATEWAY_BOT = new Route(GET, "gateway/bot"); } public static class Applications @@ -63,12 +59,12 @@ public static class Applications public static class Self { - public static final Route GET_SELF = new Route(GET, true, "users/@me"); - public static final Route MODIFY_SELF = new Route(PATCH, "users/@me"); - public static final Route GET_GUILDS = new Route(GET, "users/@me/guilds"); - public static final Route LEAVE_GUILD = new Route(DELETE, "users/@me/guilds/{guild_id}"); - public static final Route GET_PRIVATE_CHANNELS = new Route(GET, "users/@me/channels"); - public static final Route CREATE_PRIVATE_CHANNEL = new Route(POST, "users/@me/channels"); + public static final Route GET_SELF = new Route(GET, "users/@me"); + public static final Route MODIFY_SELF = new Route(PATCH, "users/@me"); + public static final Route GET_GUILDS = new Route(GET, "users/@me/guilds"); + public static final Route LEAVE_GUILD = new Route(DELETE, "users/@me/guilds/{guild_id}"); + public static final Route GET_PRIVATE_CHANNELS = new Route(GET, "users/@me/channels"); + public static final Route CREATE_PRIVATE_CHANNEL = new Route(POST, "users/@me/channels"); // Client only public static final Route USER_SETTINGS = new Route(GET, "users/@me/settings"); @@ -95,39 +91,39 @@ public static class Relationships public static class Guilds { - public static final Route GET_GUILD = new Route(GET, "guilds/{guild_id}", "guild_id"); - public static final Route MODIFY_GUILD = new Route(PATCH, "guilds/{guild_id}", "guild_id"); - public static final Route GET_VANITY_URL = new Route(GET, "guilds/{guild_id}/vanity-url", "guild_id"); - public static final Route CREATE_CHANNEL = new Route(POST, "guilds/{guild_id}/channels", "guild_id"); - public static final Route GET_CHANNELS = new Route(GET, "guilds/{guild_id}/channels", "guild_id"); - public static final Route MODIFY_CHANNELS = new Route(PATCH, "guilds/{guild_id}/channels", "guild_id"); - public static final Route MODIFY_ROLES = new Route(PATCH, "guilds/{guild_id}/roles", "guild_id"); - public static final Route GET_BANS = new Route(GET, "guilds/{guild_id}/bans", "guild_id"); - public static final Route GET_BAN = new Route(GET, "guilds/{guild_id}/bans/{user_id}", "guild_id"); - public static final Route UNBAN = new Route(DELETE, "guilds/{guild_id}/bans/{user_id}", "guild_id"); - public static final Route BAN = new Route(PUT, "guilds/{guild_id}/bans/{user_id}", "guild_id"); - public static final Route KICK_MEMBER = new Route(DELETE, "guilds/{guild_id}/members/{user_id}", "guild_id"); - public static final Route MODIFY_MEMBER = new Route(PATCH, "guilds/{guild_id}/members/{user_id}", "guild_id"); - public static final Route ADD_MEMBER = new Route(PUT, "guilds/{guild_id}/members/{user_id}", "guild_id"); - public static final Route GET_MEMBER = new Route(GET, "guilds/{guild_id}/members/{user_id}", "guild_id"); - public static final Route MODIFY_SELF_NICK = new Route(PATCH, "guilds/{guild_id}/members/@me/nick", "guild_id"); - public static final Route PRUNABLE_COUNT = new Route(GET, "guilds/{guild_id}/prune", "guild_id"); - public static final Route PRUNE_MEMBERS = new Route(POST, "guilds/{guild_id}/prune", "guild_id"); - public static final Route GET_WEBHOOKS = new Route(GET, "guilds/{guild_id}/webhooks", "guild_id"); - public static final Route GET_GUILD_EMBED = new Route(GET, "guilds/{guild_id}/embed", "guild_id"); - public static final Route MODIFY_GUILD_EMBED = new Route(PATCH, "guilds/{guild_id}/embed", "guild_id"); - public static final Route GET_GUILD_EMOTES = new Route(GET, "guilds/{guild_id}/emojis", "guild_id"); - public static final Route GET_AUDIT_LOGS = new Route(GET, true, "guilds/{guild_id}/audit-logs", "guild_id"); - public static final Route GET_VOICE_REGIONS = new Route(GET, true, "guilds/{guild_id}/regions", "guild_id"); - - public static final Route GET_INTEGRATIONS = new Route(GET, "guilds/{guild_id}/integrations", "guild_id"); - public static final Route CREATE_INTEGRATION = new Route(POST, "guilds/{guild_id}/integrations", "guild_id"); - public static final Route DELETE_INTEGRATION = new Route(DELETE, "guilds/{guild_id}/integrations/{integration_id}", "guild_id"); - public static final Route MODIFY_INTEGRATION = new Route(PATCH, "guilds/{guild_id}/integrations/{integration_id}", "guild_id"); - public static final Route SYNC_INTEGRATION = new Route(POST, "guilds/{guild_id}/integrations/{integration_id}/sync", "guild_id"); - - public static final Route ADD_MEMBER_ROLE = new Route(PUT, "guilds/{guild_id}/members/{user_id}/roles/{role_id}", "guild_id"); - public static final Route REMOVE_MEMBER_ROLE = new Route(DELETE, "guilds/{guild_id}/members/{user_id}/roles/{role_id}", "guild_id"); + public static final Route GET_GUILD = new Route(GET, "guilds/{guild_id}"); + public static final Route MODIFY_GUILD = new Route(PATCH, "guilds/{guild_id}"); + public static final Route GET_VANITY_URL = new Route(GET, "guilds/{guild_id}/vanity-url"); + public static final Route CREATE_CHANNEL = new Route(POST, "guilds/{guild_id}/channels"); + public static final Route GET_CHANNELS = new Route(GET, "guilds/{guild_id}/channels"); + public static final Route MODIFY_CHANNELS = new Route(PATCH, "guilds/{guild_id}/channels"); + public static final Route MODIFY_ROLES = new Route(PATCH, "guilds/{guild_id}/roles"); + public static final Route GET_BANS = new Route(GET, "guilds/{guild_id}/bans"); + public static final Route GET_BAN = new Route(GET, "guilds/{guild_id}/bans/{user_id}"); + public static final Route UNBAN = new Route(DELETE, "guilds/{guild_id}/bans/{user_id}"); + public static final Route BAN = new Route(PUT, "guilds/{guild_id}/bans/{user_id}"); + public static final Route KICK_MEMBER = new Route(DELETE, "guilds/{guild_id}/members/{user_id}"); + public static final Route MODIFY_MEMBER = new Route(PATCH, "guilds/{guild_id}/members/{user_id}"); + public static final Route ADD_MEMBER = new Route(PUT, "guilds/{guild_id}/members/{user_id}"); + public static final Route GET_MEMBER = new Route(GET, "guilds/{guild_id}/members/{user_id}"); + public static final Route MODIFY_SELF_NICK = new Route(PATCH, "guilds/{guild_id}/members/@me/nick"); + public static final Route PRUNABLE_COUNT = new Route(GET, "guilds/{guild_id}/prune"); + public static final Route PRUNE_MEMBERS = new Route(POST, "guilds/{guild_id}/prune"); + public static final Route GET_WEBHOOKS = new Route(GET, "guilds/{guild_id}/webhooks"); + public static final Route GET_GUILD_EMBED = new Route(GET, "guilds/{guild_id}/embed"); + public static final Route MODIFY_GUILD_EMBED = new Route(PATCH, "guilds/{guild_id}/embed"); + public static final Route GET_GUILD_EMOTES = new Route(GET, "guilds/{guild_id}/emojis"); + public static final Route GET_AUDIT_LOGS = new Route(GET, "guilds/{guild_id}/audit-logs"); + public static final Route GET_VOICE_REGIONS = new Route(GET, "guilds/{guild_id}/regions"); + + public static final Route GET_INTEGRATIONS = new Route(GET, "guilds/{guild_id}/integrations"); + public static final Route CREATE_INTEGRATION = new Route(POST, "guilds/{guild_id}/integrations"); + public static final Route DELETE_INTEGRATION = new Route(DELETE, "guilds/{guild_id}/integrations/{integration_id}"); + public static final Route MODIFY_INTEGRATION = new Route(PATCH, "guilds/{guild_id}/integrations/{integration_id}"); + public static final Route SYNC_INTEGRATION = new Route(POST, "guilds/{guild_id}/integrations/{integration_id}/sync"); + + public static final Route ADD_MEMBER_ROLE = new Route(PUT, "guilds/{guild_id}/members/{user_id}/roles/{role_id}"); + public static final Route REMOVE_MEMBER_ROLE = new Route(DELETE, "guilds/{guild_id}/members/{user_id}/roles/{role_id}"); //Client Only @@ -141,51 +137,51 @@ public static class Guilds public static class Emotes { // These are all client endpoints and thus don't need defined major parameters - public static final Route MODIFY_EMOTE = new Route(PATCH, true, "guilds/{guild_id}/emojis/{emote_id}", "guild_id"); - public static final Route DELETE_EMOTE = new Route(DELETE, true, "guilds/{guild_id}/emojis/{emote_id}", "guild_id"); - public static final Route CREATE_EMOTE = new Route(POST, true, "guilds/{guild_id}/emojis", "guild_id"); + public static final Route MODIFY_EMOTE = new Route(PATCH, "guilds/{guild_id}/emojis/{emote_id}"); + public static final Route DELETE_EMOTE = new Route(DELETE, "guilds/{guild_id}/emojis/{emote_id}"); + public static final Route CREATE_EMOTE = new Route(POST, "guilds/{guild_id}/emojis"); - public static final Route GET_EMOTES = new Route(GET, true, "guilds/{guild_id}/emojis", "guild_id"); - public static final Route GET_EMOTE = new Route(GET, true, "guilds/{guild_id}/emojis/{emoji_id}", "guild_id"); + public static final Route GET_EMOTES = new Route(GET, "guilds/{guild_id}/emojis"); + public static final Route GET_EMOTE = new Route(GET, "guilds/{guild_id}/emojis/{emoji_id}"); } public static class Webhooks { - public static final Route GET_WEBHOOK = new Route(GET, true, "webhooks/{webhook_id}"); - public static final Route GET_TOKEN_WEBHOOK = new Route(GET, true, "webhooks/{webhook_id}/{token}"); - public static final Route DELETE_WEBHOOK = new Route(DELETE, true, "webhooks/{webhook_id}"); - public static final Route DELETE_TOKEN_WEBHOOK = new Route(DELETE, true, "webhooks/{webhook_id}/{token}"); - public static final Route MODIFY_WEBHOOK = new Route(PATCH, true, "webhooks/{webhook_id}"); - public static final Route MODIFY_TOKEN_WEBHOOK = new Route(PATCH, true, "webhooks/{webhook_id}/{token}"); + public static final Route GET_WEBHOOK = new Route(GET, "webhooks/{webhook_id}"); + public static final Route GET_TOKEN_WEBHOOK = new Route(GET, "webhooks/{webhook_id}/{token}"); + public static final Route DELETE_WEBHOOK = new Route(DELETE, "webhooks/{webhook_id}"); + public static final Route DELETE_TOKEN_WEBHOOK = new Route(DELETE, "webhooks/{webhook_id}/{token}"); + public static final Route MODIFY_WEBHOOK = new Route(PATCH, "webhooks/{webhook_id}"); + public static final Route MODIFY_TOKEN_WEBHOOK = new Route(PATCH, "webhooks/{webhook_id}/{token}"); // Separate - public static final Route EXECUTE_WEBHOOK = new Route(POST, "webhooks/{webhook_id}/{token}", "webhook_id"); - public static final Route EXECUTE_WEBHOOK_SLACK = new Route(POST, "webhooks/{webhook_id}/{token}/slack", "webhook_id"); - public static final Route EXECUTE_WEBHOOK_GITHUB = new Route(POST, "webhooks/{webhook_id}/{token}/github", "webhook_id"); + public static final Route EXECUTE_WEBHOOK = new Route(POST, "webhooks/{webhook_id}/{token}"); + public static final Route EXECUTE_WEBHOOK_SLACK = new Route(POST, "webhooks/{webhook_id}/{token}/slack"); + public static final Route EXECUTE_WEBHOOK_GITHUB = new Route(POST, "webhooks/{webhook_id}/{token}/github"); } public static class Roles { - public static final Route GET_ROLES = new Route(GET, "guilds/{guild_id}/roles", "guild_id"); - public static final Route CREATE_ROLE = new Route(POST, "guilds/{guild_id}/roles", "guild_id"); - public static final Route GET_ROLE = new Route(GET, "guilds/{guild_id}/roles/{role_id}", "guild_id"); - public static final Route MODIFY_ROLE = new Route(PATCH, "guilds/{guild_id}/roles/{role_id}", "guild_id"); - public static final Route DELETE_ROLE = new Route(DELETE, "guilds/{guild_id}/roles/{role_id}", "guild_id"); + public static final Route GET_ROLES = new Route(GET, "guilds/{guild_id}/roles"); + public static final Route CREATE_ROLE = new Route(POST, "guilds/{guild_id}/roles"); + public static final Route GET_ROLE = new Route(GET, "guilds/{guild_id}/roles/{role_id}"); + public static final Route MODIFY_ROLE = new Route(PATCH, "guilds/{guild_id}/roles/{role_id}"); + public static final Route DELETE_ROLE = new Route(DELETE, "guilds/{guild_id}/roles/{role_id}"); } public static class Channels { - public static final Route DELETE_CHANNEL = new Route(DELETE, true, "channels/{channel_id}", "channel_id"); - public static final Route MODIFY_CHANNEL = new Route(PATCH, true, "channels/{channel_id}", "channel_id"); - public static final Route GET_WEBHOOKS = new Route(GET, true, "channels/{channel_id}/webhooks", "channel_id"); - public static final Route CREATE_WEBHOOK = new Route(POST, true, "channels/{channel_id}/webhooks", "channel_id"); - public static final Route CREATE_PERM_OVERRIDE = new Route(PUT, true, "channels/{channel_id}/permissions/{permoverride_id}", "channel_id"); - public static final Route MODIFY_PERM_OVERRIDE = new Route(PUT, true, "channels/{channel_id}/permissions/{permoverride_id}", "channel_id"); - public static final Route DELETE_PERM_OVERRIDE = new Route(DELETE, true, "channels/{channel_id}/permissions/{permoverride_id}", "channel_id"); - - public static final Route SEND_TYPING = new Route(POST, "channels/{channel_id}/typing", "channel_id"); - public static final Route GET_PERMISSIONS = new Route(GET, "channels/{channel_id}/permissions", "channel_id"); - public static final Route GET_PERM_OVERRIDE = new Route(GET, "channels/{channel_id}/permissions/{permoverride_id}", "channel_id"); + public static final Route DELETE_CHANNEL = new Route(DELETE, "channels/{channel_id}"); + public static final Route MODIFY_CHANNEL = new Route(PATCH, "channels/{channel_id}"); + public static final Route GET_WEBHOOKS = new Route(GET, "channels/{channel_id}/webhooks"); + public static final Route CREATE_WEBHOOK = new Route(POST, "channels/{channel_id}/webhooks"); + public static final Route CREATE_PERM_OVERRIDE = new Route(PUT, "channels/{channel_id}/permissions/{permoverride_id}"); + public static final Route MODIFY_PERM_OVERRIDE = new Route(PUT, "channels/{channel_id}/permissions/{permoverride_id}"); + public static final Route DELETE_PERM_OVERRIDE = new Route(DELETE, "channels/{channel_id}/permissions/{permoverride_id}"); + + public static final Route SEND_TYPING = new Route(POST, "channels/{channel_id}/typing"); + public static final Route GET_PERMISSIONS = new Route(GET, "channels/{channel_id}/permissions"); + public static final Route GET_PERM_OVERRIDE = new Route(GET, "channels/{channel_id}/permissions/{permoverride_id}"); // Client Only public static final Route GET_RECIPIENTS = new Route(GET, "channels/{channel_id}/recipients"); @@ -198,23 +194,23 @@ public static class Channels public static class Messages { - public static final Route SEND_MESSAGE = new Route(POST, "channels/{channel_id}/messages", "channel_id"); - public static final Route EDIT_MESSAGE = new Route(PATCH, "channels/{channel_id}/messages/{message_id}", "channel_id"); - public static final Route GET_PINNED_MESSAGES = new Route(GET, "channels/{channel_id}/pins", "channel_id"); - public static final Route ADD_PINNED_MESSAGE = new Route(PUT, "channels/{channel_id}/pins/{message_id}", "channel_id"); - public static final Route REMOVE_PINNED_MESSAGE = new Route(DELETE, "channels/{channel_id}/pins/{message_id}", "channel_id"); + public static final Route EDIT_MESSAGE = new Route(PATCH, "channels/{channel_id}/messages/{message_id}"); // requires special handling, same bucket but different endpoints + public static final Route SEND_MESSAGE = new Route(POST, "channels/{channel_id}/messages"); + public static final Route GET_PINNED_MESSAGES = new Route(GET, "channels/{channel_id}/pins"); + public static final Route ADD_PINNED_MESSAGE = new Route(PUT, "channels/{channel_id}/pins/{message_id}"); + public static final Route REMOVE_PINNED_MESSAGE = new Route(DELETE, "channels/{channel_id}/pins/{message_id}"); - public static final Route ADD_REACTION = new ReactionRoute(PUT); - public static final Route REMOVE_REACTION = new ReactionRoute(DELETE); - public static final Route REMOVE_ALL_REACTIONS = new Route(DELETE, "channels/{channel_id}/messages/{message_id}/reactions", "channel_id"); - public static final Route GET_REACTION_USERS = new Route(GET, "channels/{channel_id}/messages/{message_id}/reactions/{reaction_code}", "channel_id"); + public static final Route ADD_REACTION = new Route(PUT, "channels/{channel_id}/messages/{message_id}/reactions/{reaction_code}/{user_id}"); + public static final Route REMOVE_REACTION = new Route(DELETE, "channels/{channel_id}/messages/{message_id}/reactions/{reaction_code}/{user_id}"); + public static final Route REMOVE_ALL_REACTIONS = new Route(DELETE, "channels/{channel_id}/messages/{message_id}/reactions"); + public static final Route GET_REACTION_USERS = new Route(GET, "channels/{channel_id}/messages/{message_id}/reactions/{reaction_code}"); - public static final Route DELETE_MESSAGE = new DeleteMessageRoute(); - public static final Route GET_MESSAGE_HISTORY = new Route(GET, true, "channels/{channel_id}/messages", "channel_id"); + public static final Route DELETE_MESSAGE = new Route(DELETE, "channels/{channel_id}/messages/{message_id}"); + public static final Route GET_MESSAGE_HISTORY = new Route(GET, "channels/{channel_id}/messages"); //Bot only - public static final Route GET_MESSAGE = new Route(GET, true, "channels/{channel_id}/messages/{message_id}", "channel_id"); - public static final Route DELETE_MESSAGES = new Route(POST, "channels/{channel_id}/messages/bulk-delete", "channel_id"); + public static final Route GET_MESSAGE = new Route(GET, "channels/{channel_id}/messages/{message_id}"); + public static final Route DELETE_MESSAGES = new Route(POST, "channels/{channel_id}/messages/bulk-delete"); //Client only public static final Route ACK_MESSAGE = new Route(POST, "channels/{channel_id}/messages/{message_id}/ack"); @@ -222,79 +218,65 @@ public static class Messages public static class Invites { - public static final Route GET_INVITE = new Route(GET, true, "invites/{code}"); - public static final Route GET_GUILD_INVITES = new Route(GET, true, "guilds/{guild_id}/invites", "guild_id"); - public static final Route GET_CHANNEL_INVITES = new Route(GET, true, "channels/{channel_id}/invites", "channel_id"); - public static final Route CREATE_INVITE = new Route(POST, "channels/{channel_id}/invites", "channel_id"); - public static final Route DELETE_INVITE = new Route(DELETE, "invites/{code}"); + public static final Route GET_INVITE = new Route(GET, "invites/{code}"); + public static final Route GET_GUILD_INVITES = new Route(GET, "guilds/{guild_id}/invites"); + public static final Route GET_CHANNEL_INVITES = new Route(GET, "channels/{channel_id}/invites"); + public static final Route CREATE_INVITE = new Route(POST, "channels/{channel_id}/invites"); + public static final Route DELETE_INVITE = new Route(DELETE, "invites/{code}"); } - public static class Custom + @Nonnull + public static Route custom(@Nonnull Method method, @Nonnull String route) { - public static final Route DELETE_ROUTE = new Route(DELETE, "{}"); - public static final Route GET_ROUTE = new Route(GET, "{}"); - public static final Route POST_ROUTE = new Route(POST, "{}"); - public static final Route PUT_ROUTE = new Route(PUT, "{}"); - public static final Route PATCH_ROUTE = new Route(PATCH, "{}"); + Checks.notNull(method, "Method"); + Checks.notEmpty(route, "Route"); + Checks.noWhitespace(route, "Route"); + return new Route(method, route); } - private final String route; - private final String ratelimitRoute; - private final String compilableRoute; - private final int paramCount; - private final Method method; - private final List majorParamIndexes = new ArrayList<>(); - private final boolean missingHeaders; + @Nonnull + public static Route delete(@Nonnull String route) + { + return custom(DELETE, route); + } + + @Nonnull + public static Route post(@Nonnull String route) + { + return custom(POST, route); + } + + @Nonnull + public static Route put(@Nonnull String route) + { + return custom(PUT, route); + } + + @Nonnull + public static Route patch(@Nonnull String route) + { + return custom(PATCH, route); + } - private Route(Method method, String route, String... majorParameters) + @Nonnull + public static Route get(@Nonnull String route) { - this(method, false, route, majorParameters); + return custom(GET, route); } - private Route(Method method, boolean missingHeaders, String route, String... majorParameters) + private static final String majorParameters = "guild_id:channel_id:webhook_id"; + private final String route; + private final Method method; + private final int paramCount; + + private Route(Method method, String route) { this.method = method; - this.missingHeaders = missingHeaders; this.route = route; this.paramCount = Helpers.countMatches(route, '{'); //All parameters start with { if (paramCount != Helpers.countMatches(route, '}')) throw new IllegalArgumentException("An argument does not have both {}'s for route: " + method + " " + route); - - //Create a String.format compilable route for parameter compiling. - compilableRoute = route.replaceAll("\\{.*?\\}","%s"); - - //If this route has major parameters that are unique markers for the ratelimit route, then we need to - // create a ratelimit compilable route. This goes through and replaces the parameters specified by majorParameters - // and records the parameter index so that when we compile it later we can select the proper parameters - // from the ones provided to make sure we inject in the proper indexes. - if (majorParameters.length != 0) - { - int paramIndex = 0; - String replaceRoute = route; - Pattern keyP = Pattern.compile("\\{(.*?)\\}"); - Matcher keyM = keyP.matcher(route); - //Search the route for all parameters - while(keyM.find()) - { - String param = keyM.group(1); - //Attempt to match the found parameter with any of our majorParameters - for (String majorParam : majorParameters) - { - //If the parameter is a major parameter, replace it with a string token and record its - // parameter index for later ratelimitRoute compiling. - if (param.equals(majorParam)) - { - replaceRoute = replaceRoute.replace(keyM.group(0), "%s"); - majorParamIndexes.add(paramIndex); - } - } - paramIndex++; - } - ratelimitRoute = replaceRoute; - } - else - ratelimitRoute = route; } public Method getMethod() @@ -307,21 +289,6 @@ public String getRoute() return route; } - public boolean isMissingHeaders() - { - return missingHeaders; - } - - public String getRatelimitRoute() - { - return ratelimitRoute; - } - - public String getCompilableRoute() - { - return compilableRoute; - } - public int getParamCount() { return paramCount; @@ -336,23 +303,22 @@ public CompiledRoute compile(String... params) } //Compile the route for interfacing with discord. - String compiledRoute = String.format(compilableRoute, (Object[]) params); - String compiledRatelimitRoute = getRatelimitRoute(); - //If this route has major parameters which help to uniquely distinguish it from others of this route type then - // compile it using the major parameter indexes we discovered in the constructor. - if (!majorParamIndexes.isEmpty()) + StringBuilder majorParameter = new StringBuilder(majorParameters); + StringBuilder compiledRoute = new StringBuilder(route); + for (int i = 0; i < paramCount; i++) { - - String[] majorParams = new String[majorParamIndexes.size()]; - for (int i = 0; i < majorParams.length; i++) - { - majorParams[i] = params[majorParamIndexes.get(i)]; - } - compiledRatelimitRoute = String.format(compiledRatelimitRoute, (Object[]) majorParams); + int paramStart = compiledRoute.indexOf("{"); + int paramEnd = compiledRoute.indexOf("}"); + String paramName = compiledRoute.substring(paramStart+1, paramEnd); + int majorParamIndex = majorParameter.indexOf(paramName); + if (majorParamIndex > -1) + majorParameter.replace(majorParamIndex, majorParamIndex + paramName.length(), params[i]); + + compiledRoute.replace(paramStart, paramEnd + 1, params[i]); } - return new CompiledRoute(this, compiledRatelimitRoute, compiledRoute); + return new CompiledRoute(this, compiledRoute.toString(), majorParameter.toString()); } @Override @@ -374,27 +340,27 @@ public boolean equals(Object o) @Override public String toString() { - return "Route(" + method + ": " + route + ")"; + return method + "/" + route; } public class CompiledRoute { private final Route baseRoute; - private final String ratelimitRoute; + private final String major; private final String compiledRoute; private final boolean hasQueryParams; - private CompiledRoute(Route baseRoute, String ratelimitRoute, String compiledRoute, boolean hasQueryParams) + private CompiledRoute(Route baseRoute, String compiledRoute, String major, boolean hasQueryParams) { this.baseRoute = baseRoute; - this.ratelimitRoute = ratelimitRoute; this.compiledRoute = compiledRoute; + this.major = major; this.hasQueryParams = hasQueryParams; } - private CompiledRoute(Route baseRoute, String ratelimitRoute, String compiledRoute) + private CompiledRoute(Route baseRoute, String compiledRoute, String major) { - this(baseRoute, ratelimitRoute, compiledRoute, false); + this(baseRoute, compiledRoute, major, false); } @Nonnull @@ -409,12 +375,12 @@ public CompiledRoute withQueryParams(String... params) for (int i = 0; i < params.length; i++) newRoute.append(!hasQueryParams && i == 0 ? '?' : '&').append(params[i]).append('=').append(params[++i]); - return new CompiledRoute(baseRoute, ratelimitRoute, newRoute.toString(), true); + return new CompiledRoute(baseRoute, newRoute.toString(), major, true); } - public String getRatelimitRoute() + public String getMajorParameters() { - return ratelimitRoute; + return major; } public String getCompiledRoute() @@ -455,45 +421,4 @@ public String toString() return "CompiledRoute(" + method + ": " + compiledRoute + ")"; } } - - //edit message uses a different rate-limit bucket as delete message and thus we need a special handling - - /* - From the docs: - - There is currently a single exception to the above rule regarding different HTTP methods sharing the same rate limit, - and that is for the deletion of messages. - Deleting messages falls under a separate, higher rate limit so that bots are able - to more quickly delete content from channels (which is useful for moderation bots). - - As of 1st of September 2018 - */ - private static class DeleteMessageRoute extends Route - { - private DeleteMessageRoute() - { - super(DELETE, true, "channels/{channel_id}/messages/{message_id}", "channel_id"); - } - - @Override - public String getRatelimitRoute() - { - return "channels/%s/messages/{message_id}/delete"; //the additional "/delete" forces a new bucket - } - } - - // This endpoint shares the rate-limit bucket with REMOVE_ALL_REACTIONS - private static class ReactionRoute extends Route - { - private ReactionRoute(Method method) - { - super(method, "channels/{channel_id}/messages/{message_id}/reactions/{reaction_code}/{user_id}", "channel_id"); - } - - @Override - public String getRatelimitRoute() - { - return "channels/%s/messages/{message_id}/reactions"; - } - } } diff --git a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/BotRateLimiter.java b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/BotRateLimiter.java index c1d65793be..d2b2f5c9a6 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/BotRateLimiter.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/BotRateLimiter.java @@ -16,414 +16,413 @@ package net.dv8tion.jda.internal.requests.ratelimit; -import net.dv8tion.jda.api.events.ExceptionEvent; import net.dv8tion.jda.api.requests.Request; import net.dv8tion.jda.api.utils.MiscUtil; -import net.dv8tion.jda.api.utils.data.DataObject; -import net.dv8tion.jda.internal.JDAImpl; import net.dv8tion.jda.internal.requests.RateLimiter; import net.dv8tion.jda.internal.requests.Requester; import net.dv8tion.jda.internal.requests.Route; -import net.dv8tion.jda.internal.utils.IOUtil; -import net.dv8tion.jda.internal.utils.UnlockHook; import okhttp3.Headers; +import org.jetbrains.annotations.Contract; -import java.io.IOException; -import java.io.InputStream; -import java.io.UncheckedIOException; import java.util.Iterator; +import java.util.Map; import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; +/* + +** How does it work? ** + +A bucket is determined via the Path+Method+Major in the following way: + + 1. Get Hash from Path+Method (we call this route) + 2. Get bucket from Hash+Major (we call this bucketid) + +If no hash is known we default to the constant "unlimited" hash. The hash is loaded from HTTP responses using the "X-RateLimit-Bucket" response header. +This hash is per Method+Path and can be stored indefinitely once received. +Some endpoints don't return a hash, this means that the endpoint is **unlimited** and will be in queue with only the major parameter. + +To explain this further, lets look at the example of message history. The endpoint to fetch message history is "GET/channels/{channel.id}/messages". +This endpoint does not have any rate limit (unlimited) and will thus use the hash "unlimited+GET/channels/{channel.id}/messages". +The bucket id for this will be "unlimited+GET/channels/{channel.id}/messages:guild_id:{channel.id}:webhook_id" where "{channel.id}" would be replaced with the respective id. +This means you can fetch history concurrently for multiple channels but it will be in sequence for the same channel. + +If the endpoint is not unlimited we will receive a hash on the first response. +Once this happens every unlimited bucket will start moving its queue to the correct bucket. +This is done during the queue work iteration so many requests to one endpoint would be moved correctly. + +For example, the first message sending: + + public void onReady(ReadyEvent event) { + TextChannel channel = event.getJDA().getTextChannelById("123"); + for (int i = 1; i <= 100; i++) { + channel.sendMessage("Message: " + i).queue(); + } + } + +This will send 100 messages on startup. At this point we don't yet know the hash for this route so we put them all in "unlimited+POST/channels/{channel.id}/messages:guild_id:123:webhook_id". +The bucket iterates the requests in sync and gets the first response. This response provides the hash for this route and we create a bucket for it. +Once the response is handled we continue with the next request in the unlimited bucket and notice the new bucket. We then move all related requests to this bucket. + + */ public class BotRateLimiter extends RateLimiter { private static final String RESET_AFTER_HEADER = "X-RateLimit-Reset-After"; private static final String RESET_HEADER = "X-RateLimit-Reset"; private static final String LIMIT_HEADER = "X-RateLimit-Limit"; private static final String REMAINING_HEADER = "X-RateLimit-Remaining"; + private static final String GLOBAL_HEADER = "X-RateLimit-Global"; + private static final String HASH_HEADER = "X-RateLimit-Bucket"; + private static final String RETRY_AFTER_HEADER = "Retry-After"; + private static final String UNLIMITED_BUCKET = "unlimited"; // we generate an unlimited bucket for every major parameter configuration + + private final ReentrantLock bucketLock = new ReentrantLock(); + // Route -> Hash + private final Map hash = new ConcurrentHashMap<>(); + // Hash + Major Parameter -> Bucket + private final Map bucket = new ConcurrentHashMap<>(); + // Bucket -> Rate-Limit Worker + private final Map> rateLimitQueue = new ConcurrentHashMap<>(); + private Future cleanupWorker; public BotRateLimiter(Requester requester) { super(requester); } + @Override + public void init() + { + cleanupWorker = getScheduler().scheduleAtFixedRate(this::cleanup, 30, 30, TimeUnit.SECONDS); + } + + private ScheduledExecutorService getScheduler() + { + return requester.getJDA().getRateLimitPool(); + } + + private void cleanup() + { + // This will remove buckets that are no longer needed every 30 seconds to avoid memory leakage + // We will keep the hashes in memory since they are very limited (by the amount of possible routes) + MiscUtil.locked(bucketLock, () -> { + int size = bucket.size(); + Iterator> entries = bucket.entrySet().iterator(); + + while (entries.hasNext()) + { + Map.Entry entry = entries.next(); + String key = entry.getKey(); + Bucket bucket = entry.getValue(); + if (bucket.isUnlimited() && bucket.requests.isEmpty()) + entries.remove(); // remove unlimited if requests are empty + // If the requests of the bucket are drained and the reset is expired the bucket has no valuable information + else if (bucket.requests.isEmpty() && bucket.reset <= getNow()) + entries.remove(); + } + // Log how many buckets were removed + size -= bucket.size(); + if (size > 0) + log.debug("Removed {} expired buckets", size); + }); + } + + private String getRouteHash(Route route) + { + return hash.getOrDefault(route, UNLIMITED_BUCKET + "+" + route); + } + + @Override + protected void shutdown() + { + super.shutdown(); + if (cleanupWorker != null) + cleanupWorker.cancel(false); + } + @Override public Long getRateLimit(Route.CompiledRoute route) { - Bucket bucket = getBucket(route); - synchronized (bucket) - { - return bucket.getRateLimit(); - } + Bucket bucket = getBucket(route, false); + return bucket == null ? 0L : bucket.getRateLimit(); } @Override + @SuppressWarnings("rawtypes") protected void queueRequest(Request request) { - Bucket bucket = getBucket(request.getRoute()); - synchronized (bucket) - { - bucket.addToQueue(request); - } + // Create bucket and enqueue request + MiscUtil.locked(bucketLock, () -> { + Bucket bucket = getBucket(request.getRoute(), true); + bucket.enqueue(request); + runBucket(bucket); + }); } @Override protected Long handleResponse(Route.CompiledRoute route, okhttp3.Response response) { - Bucket bucket = getBucket(route); - synchronized (bucket) + bucketLock.lock(); + try { - Headers headers = response.headers(); - int code = response.code(); + long rateLimit = updateBucket(route, response).getRateLimit(); + if (response.code() == 429) + return rateLimit; + else + return null; + } + finally + { + bucketLock.unlock(); + } + } - if (code == 429) + private Bucket updateBucket(Route.CompiledRoute route, okhttp3.Response response) + { + return MiscUtil.locked(bucketLock, () -> { + try { - String global = headers.get("X-RateLimit-Global"); - String retry = headers.get("Retry-After"); - if (retry == null || retry.isEmpty()) + Bucket bucket = getBucket(route, true); + Headers headers = response.headers(); + + boolean wasUnlimited = bucket.isUnlimited(); + boolean global = headers.get(GLOBAL_HEADER) != null; + String hash = headers.get(HASH_HEADER); + long now = getNow(); + + // Create a new bucket for the hash if needed + Route baseRoute = route.getBaseRoute(); + if (hash != null) { - try (InputStream in = IOUtil.getBody(response)) + if (!this.hash.containsKey(baseRoute)) { - DataObject limitObj = DataObject.fromJson(in); - retry = limitObj.get("retry_after").toString(); - } - catch (IOException e) - { - throw new UncheckedIOException(e); + this.hash.put(baseRoute, hash); + log.debug("Caching bucket hash {} -> {}", baseRoute, hash); } + + bucket = getBucket(route, true); } - long retryAfter = Long.parseLong(retry); - if (Boolean.parseBoolean(global)) //global ratelimit + + // Handle global rate limit if necessary + if (global) { - //If it is global, lock down the threads. - log.warn("Encountered global rate-limit! Retry-After: {}", retryAfter); - requester.getJDA().getSessionController().setGlobalRatelimit(getNow() + retryAfter); + String retryAfterHeader = headers.get(RETRY_AFTER_HEADER); + long retryAfter = parseLong(retryAfterHeader); + requester.getJDA().getSessionController().setGlobalRatelimit(now + retryAfter); + log.error("Encountered global rate limit! Retry-After: {} ms", retryAfter); } - else + // Handle hard rate limit, pretty much just log that it happened + else if (response.code() == 429) { - log.warn("Encountered 429 on route /{}", bucket.getRoute()); - updateBucket(bucket, headers, retryAfter); + // Update the bucket to the new information + String retryAfterHeader = headers.get(RETRY_AFTER_HEADER); + long retryAfter = parseLong(retryAfterHeader); + bucket.remaining = 0; + bucket.reset = getNow() + retryAfter; + // don't log warning if we are switching bucket, this means it was an issue with an un-hashed route that is now resolved + if (hash == null || !wasUnlimited) + log.warn("Encountered 429 on route {} with bucket {} Retry-After: {} ms", baseRoute, bucket.bucketId, retryAfter); + else + log.debug("Encountered 429 on route {} with bucket {} Retry-After: {} ms", baseRoute, bucket.bucketId, retryAfter); + return bucket; } - return retryAfter; + // If hash is null this means we didn't get enough information to update a bucket + if (hash == null) + return bucket; + + // Update the bucket parameters with new information + String limitHeader = headers.get(LIMIT_HEADER); + String remainingHeader = headers.get(REMAINING_HEADER); + String resetAfterHeader = headers.get(RESET_AFTER_HEADER); + String resetHeader = headers.get(RESET_HEADER); + + bucket.limit = (int) Math.max(1L, parseLong(limitHeader)); + bucket.remaining = (int) parseLong(remainingHeader); + if (requester.getJDA().isRelativeRateLimit()) + bucket.reset = now + parseDouble(resetAfterHeader); + else + bucket.reset = parseDouble(resetHeader); + log.trace("Updated bucket {} to ({}/{}, {})", bucket.bucketId, bucket.remaining, bucket.limit, bucket.reset - now); + return bucket; } - else + catch (Exception e) { - updateBucket(bucket, headers, -1); - return null; + Bucket bucket = getBucket(route, true); + log.error("Encountered Exception while updating a bucket. Route: {} Bucket: {} Code: {} Headers:\n{}", + route.getBaseRoute(), bucket, response.code(), response.headers(), e); + return bucket; } - } - + }); } - private Bucket getBucket(Route.CompiledRoute route) + @Contract("_,true->!null") + private Bucket getBucket(Route.CompiledRoute route, boolean create) { - String rateLimitRoute = route.getRatelimitRoute(); - Bucket bucket = (Bucket) buckets.get(rateLimitRoute); - if (bucket == null) + return MiscUtil.locked(bucketLock, () -> { - synchronized (buckets) - { - bucket = (Bucket) buckets.get(rateLimitRoute); - if (bucket == null) - { - Route baseRoute = route.getBaseRoute(); - bucket = new Bucket(rateLimitRoute, baseRoute.isMissingHeaders()); - buckets.put(rateLimitRoute, bucket); - } - } - } - return bucket; + // Retrieve the hash via the route + String hash = getRouteHash(route.getBaseRoute()); + // Get or create a bucket for the hash + major parameters + String bucketId = hash + ":" + route.getMajorParameters(); + Bucket bucket = this.bucket.get(bucketId); + if (bucket == null && create) + this.bucket.put(bucketId, bucket = new Bucket(bucketId)); + + return bucket; + }); } - public long getNow() + private void runBucket(Bucket bucket) { - return System.currentTimeMillis(); + if (isShutdown) + return; + // Schedule a new bucket worker if no worker is running + MiscUtil.locked(bucketLock, () -> + rateLimitQueue.computeIfAbsent(bucket, + (k) -> getScheduler().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS))); } - private void updateBucket(Bucket bucket, Headers headers, long retryAfter) + private long parseLong(String input) { - int headerCount = 0; - if (retryAfter > 0) - { - bucket.resetTime = getNow() + retryAfter; - bucket.routeUsageRemaining = 0; - } - - // relative = reset-after - if (requester.getJDA().isRelativeRateLimit()) - headerCount += parseDouble(headers.get(RESET_AFTER_HEADER), bucket, (time, b) -> b.resetTime = getNow() + (long) (time * 1000)); //Seconds to milliseconds - else // absolute = reset - headerCount += parseDouble(headers.get(RESET_HEADER), bucket, (time, b) -> b.resetTime = (long) (time * 1000)); //Seconds to milliseconds - - headerCount += parseInt(headers.get(LIMIT_HEADER), bucket, (limit, b) -> b.routeUsageLimit = limit); - - //Currently, we check the remaining amount even for hardcoded ratelimits just to further respect Discord - // however, if there should ever be a case where Discord informs that the remaining is less than what - // it actually is and we add a custom ratelimit to handle that, we will need to instead move this to the - // above else statement and add a bucket.routeUsageRemaining-- decrement to the above if body. - //An example of this statement needing to be moved would be if the custom ratelimit reset time interval is - // equal to or greater than 1000ms, and the remaining count provided by discord is less than the ACTUAL - // amount that their systems allow in such a way that isn't a bug. - //The custom ratelimit system is primarily for ratelimits that can't be properly represented by Discord's - // header system due to their headers only supporting accuracy to the second. The custom ratelimit system - // allows for hardcoded ratelimits that allow accuracy to the millisecond which is important for some - // ratelimits like Reactions which is 1/0.25s, but discord reports the ratelimit as 1/1s with headers. - headerCount += parseInt(headers.get(REMAINING_HEADER), bucket, (remaining, b) -> b.routeUsageRemaining = remaining); - if (!bucket.missingHeaders && headerCount < 3) - { - log.debug("Encountered issue with headers when updating a bucket\n" + - "Route: {}\nHeaders: {}", bucket.getRoute(), headers); - } + return input == null ? 0L : Long.parseLong(input); } - private int parseInt(String input, Bucket bucket, IntObjectConsumer consumer) + private long parseDouble(String input) { - try - { - int parsed = Integer.parseInt(input); - consumer.accept(parsed, bucket); - return 1; - } - catch (NumberFormatException ignored) {} - catch (Exception e) - { - log.error("Uncaught exception parsing header value: {}", input, e); - } - return 0; + //The header value is using a double to represent milliseconds and seconds: + // 5.250 this is 5 seconds and 250 milliseconds (5250 milliseconds) + return input == null ? 0L : (long) (Double.parseDouble(input) * 1000); } - private int parseDouble(String input, Bucket bucket, DoubleObjectConsumer consumer) + + public long getNow() { - try - { - double parsed = Double.parseDouble(input); - consumer.accept(parsed, bucket); - return 1; - } - catch (NumberFormatException | NullPointerException ignored) {} - catch (Exception e) - { - log.error("Uncaught exception parsing header value: {}", input, e); - } - return 0; + return System.currentTimeMillis(); } + @SuppressWarnings("rawtypes") private class Bucket implements IBucket, Runnable { - final String route; - final boolean missingHeaders; - final ConcurrentLinkedQueue requests = new ConcurrentLinkedQueue<>(); - final ReentrantLock requestLock = new ReentrantLock(); - volatile boolean processing = false; - volatile long resetTime = 0; - volatile int routeUsageRemaining = 1; //These are default values to only allow 1 request until we have properly - volatile int routeUsageLimit = 1; // ratelimit information. - - public Bucket(String route, boolean missingHeaders) + private final String bucketId; + private final Queue requests = new ConcurrentLinkedQueue<>(); + + private long reset = 0; + private int remaining = 1; + private int limit = 1; + + public Bucket(String bucketId) { - this.route = route; - this.missingHeaders = missingHeaders; + this.bucketId = bucketId; } - void addToQueue(Request request) + public void enqueue(Request request) { requests.add(request); - submitForProcessing(); } - void submitForProcessing() + public long getRateLimit() { - synchronized (submittedBuckets) + long now = getNow(); + long global = requester.getJDA().getSessionController().getGlobalRatelimit(); + // Global rate limit is more important to handle + if (global > now) + return global - now; + // Check if the bucket reset time has expired + if (reset <= now) { - if (!submittedBuckets.contains(this)) - { - Long delay = getRateLimit(); - if (delay == null) - delay = 0L; - - if (delay > 0) - { - log.debug("Backing off {} milliseconds on route /{}", delay, getRoute()); - requester.getJDA().getRateLimitPool().schedule(this, delay, TimeUnit.MILLISECONDS); - } - else - { - requester.getJDA().getRateLimitPool().execute(this); - } - submittedBuckets.add(this); - } + // Update the remaining uses to the limit (we don't know better) + remaining = limit; + return 0L; } - } - Long getRateLimit() - { - long gCooldown = requester.getJDA().getSessionController().getGlobalRatelimit(); - if (gCooldown > 0) //Are we on global cooldown? - { - long now = getNow(); - if (now > gCooldown) //Verify that we should still be on cooldown. - { - //If we are done cooling down, reset the globalCooldown and continue. - requester.getJDA().getSessionController().setGlobalRatelimit(Long.MIN_VALUE); - } - else - { - //If we should still be on cooldown, return when we can go again. - return gCooldown - now; - } - } - if (this.routeUsageRemaining <= 0) - { - if (getNow() > this.resetTime) - { - this.routeUsageRemaining = this.routeUsageLimit; - this.resetTime = 0; - } - } - if (this.routeUsageRemaining > 0) - return null; - else - return this.resetTime - getNow(); + // If there are remaining requests we don't need to do anything, otherwise return backoff in milliseconds + return remaining < 1 ? reset - now : 0L; } - @Override - public boolean equals(Object o) + public long getReset() { - if (!(o instanceof Bucket)) - return false; - - Bucket oBucket = (Bucket) o; - return route.equals(oBucket.route); + return reset; } - @Override - public int hashCode() + public int getRemaining() { - return route.hashCode(); + return remaining; } - private void handleResponse(Iterator it, Long retryAfter) + public int getLimit() { - if (retryAfter == null) - { - // We were not rate-limited! Then just continue with the rest of the requests - it.remove(); - processIterator(it); - } - else - { - // Rate-limited D: Guess we have to backoff for now - finishProcess(); - } + return limit; } - private void processIterator(Iterator it) + private boolean isUnlimited() { - Request request = null; - try - { - do - { - if (!it.hasNext()) - { - // End loop, no more requests left - finishProcess(); - return; - } - request = it.next(); - } while (isSkipped(it, request)); - - CompletableFuture handle = requester.execute(request); - final Request request0 = request; - // Hook the callback for the request - handle.whenComplete((retryAfter, error) -> - { - requester.setContext(); - if (error != null) - { - // There was an error, handle it and continue with the next request - log.error("Requester system encountered internal error", error); - it.remove(); - request0.onFailure(error); - processIterator(it); - } - else - { - // Handle the response and backoff if necessary - handleResponse(it, retryAfter); - } - }); - } - catch (Throwable t) - { - log.error("Requester system encountered an internal error", t); - it.remove(); - if (request != null) - request.onFailure(t); - // don't forget to end the loop and start over - finishProcess(); - } + return bucketId.startsWith("unlimited"); } - private void finishProcess() + private void backoff() { - // We are done with processing - MiscUtil.locked(requestLock, () -> - { - processing = false; - }); - // Re-submit if new requests were added or rate-limit was hit - synchronized (submittedBuckets) - { - submittedBuckets.remove(this); + // Schedule backoff if requests are not done + MiscUtil.locked(bucketLock, () -> { + rateLimitQueue.remove(this); if (!requests.isEmpty()) - { - try - { - this.submitForProcessing(); - } - catch (RejectedExecutionException e) - { - log.debug("Caught RejectedExecutionException when re-queuing a ratelimited request. The requester is probably shutdown, thus, this can be ignored."); - } - } - } + runBucket(this); + }); } @Override public void run() { - requester.setContext(); - requestLock.lock(); - try (UnlockHook hook = new UnlockHook(requestLock)) - { - // Ensure the processing is synchronized - if (processing) - return; - processing = true; - // Start processing loop - Iterator it = requests.iterator(); - processIterator(it); - } - catch (Throwable err) + log.trace("Bucket {} is running {} requests", bucketId, requests.size()); + Iterator iterator = requests.iterator(); + while (iterator.hasNext()) { - log.error("Requester system encountered an internal error from beyond the synchronized execution blocks. NOT GOOD!", err); - if (err instanceof Error) + Long rateLimit = getRateLimit(); + if (rateLimit > 0L) { - JDAImpl api = requester.getJDA(); - api.handleEvent(new ExceptionEvent(api, err, true)); + // We need to backoff since we ran out of remaining uses or hit the global rate limit + log.debug("Backing off {} ms for bucket {}", rateLimit, bucketId); + break; + } + + Request request = iterator.next(); + if (isUnlimited()) + { + boolean shouldSkip = MiscUtil.locked(bucketLock, () -> { + // Attempt moving request to correct bucket if it has been created + Bucket bucket = getBucket(request.getRoute(), true); + if (bucket != this) + { + bucket.enqueue(request); + iterator.remove(); + runBucket(bucket); + return true; + } + return false; + }); + if (shouldSkip) continue; + } + + if (isSkipped(iterator, request)) + continue; + + try + { + rateLimit = requester.execute(request); + if (rateLimit != null) + break; // this means we hit a hard rate limit (429) so the request needs to be retried + + // The request went through so we can remove it + iterator.remove(); + } + catch (Exception ex) + { + log.error("Encountered exception trying to execute request", ex); + break; } } - } - @Override - public String getRoute() - { - return route; + backoff(); } @Override @@ -431,15 +430,11 @@ public Queue getRequests() { return requests; } - } - - private interface IntObjectConsumer - { - void accept(int n, T t); - } - private interface DoubleObjectConsumer - { - void accept(double n, T t); + @Override + public String toString() + { + return bucketId; + } } } diff --git a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/ClientRateLimiter.java b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/ClientRateLimiter.java index 29830d1c2d..ca87fc2ef9 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/ClientRateLimiter.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/ClientRateLimiter.java @@ -205,7 +205,7 @@ public void run() if (isSkipped(it, request)) continue; // Blocking code because I'm lazy and client accounts are not priority - Long retryAfter = requester.execute(request).get(); + Long retryAfter = requester.execute(request); if (retryAfter != null) break; else @@ -248,12 +248,6 @@ public void run() } } - @Override - public String getRoute() - { - return route; - } - @Override public Queue getRequests() { diff --git a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/IBucket.java b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/IBucket.java index fc5b919846..fb81554070 100644 --- a/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/IBucket.java +++ b/src/main/java/net/dv8tion/jda/internal/requests/ratelimit/IBucket.java @@ -22,6 +22,5 @@ public interface IBucket { - String getRoute(); Queue getRequests(); }