From accce427491b4d46132a691f4341917d59feff9b Mon Sep 17 00:00:00 2001 From: GrizzlT <13691001+GrizzlT@users.noreply.github.com> Date: Wed, 25 Aug 2021 16:26:51 +0200 Subject: [PATCH 1/3] Refactoring of ReactorHttpClient + critical bugfix - Refactored `ReactorHttpClient` to use `ReentrantLock` - Fixed a racing condition bug that can lead to unexpected behaviour and following "death" of the reactor transport --- .../api/reactor/ReactorHttpClient.java | 112 +++++++++++------- 1 file changed, 72 insertions(+), 40 deletions(-) diff --git a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java index 458fadf0..e0d33e17 100644 --- a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java +++ b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java @@ -18,8 +18,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; public class ReactorHttpClient implements HypixelHttpClient { + private final HttpClient httpClient; private final UUID apiKey; @@ -34,7 +37,8 @@ public class ReactorHttpClient implements HypixelHttpClient { // For shutting down the flux that emits request callbacks private final Disposable requestCallbackFluxDisposable; - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(true); + private final Condition limitResetCondition = lock.newCondition(); /* * How many requests we can send before reaching the limit @@ -62,13 +66,8 @@ public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCa callback = blockingQueue.take(); } - synchronized (lock) { - while (this.actionsLeftThisMinute <= 0) { - lock.wait(); - } + this.decrementActionsOrWait(); - actionsLeftThisMinute--; - } synchronousSink.next(callback); } catch (InterruptedException e) { throw new AssertionError("This should not have been possible", e); @@ -89,7 +88,7 @@ public ReactorHttpClient(UUID apiKey) { } /** - * Canceling the returned future will result in canceling the request if possible + * Canceling the returned future will result in canceling the sending of the request if still possible */ @Override public CompletableFuture makeRequest(String url) { @@ -97,7 +96,7 @@ public CompletableFuture makeRequest(String url) { } /** - * Canceling the returned future will result in canceling the request if possible + * Canceling the returned future will result in canceling the sending of the request if still possible */ @Override public CompletableFuture makeAuthenticatedRequest(String url) { @@ -115,7 +114,7 @@ public void shutdown() { } /** - * Makes a request to the Hypixel api and returns a {@link Mono>} containing + * Makes a request to the Hypixel api and returns a {@link Mono} containing * the response body and status code, canceling this mono will prevent the request from being sent if possible * * @param path full url @@ -134,6 +133,39 @@ public Mono> makeRequest(String path, boolean isAuthenti }).subscribeOn(Schedulers.boundedElastic()); } + private void decrementActionsOrWait() throws InterruptedException { + this.lock.lock(); + try { + while (this.actionsLeftThisMinute <= 0) { + this.limitResetCondition.await(); + } + this.actionsLeftThisMinute = Math.max(0, this.actionsLeftThisMinute--); + } finally { + this.lock.unlock(); + } + } + + + private void incrementActionsLeftThisMinute() { + this.lock.lock(); + try { + this.actionsLeftThisMinute++; + this.limitResetCondition.signal(); + } finally { + this.lock.unlock(); + } + } + + private void setActionsLeftThisMinute(int value) { + this.lock.lock(); + try { + this.actionsLeftThisMinute = Math.max(0, value); + this.limitResetCondition.signal(); + } finally { + this.lock.unlock(); + } + } + /** * Reads response status and retries error 429 (too many requests) * The first request after every limit reset will be used to schedule the next limit reset @@ -144,12 +176,11 @@ public Mono> makeRequest(String path, boolean isAuthenti */ private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException { if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) { + System.out.println("Too many requests were sent, is something else using the same API Key?!!"); int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); if (this.overflowStartedNewClock.compareAndSet(false, true)) { - synchronized (lock) { - this.actionsLeftThisMinute = 0; - } + this.setActionsLeftThisMinute(0); resetForFirstRequest(timeRemaining); } @@ -162,10 +193,8 @@ private ResponseHandlingResult handleResponse(HttpClientResponse response, Reque int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110); - synchronized (lock) { - this.actionsLeftThisMinute = requestsRemaining; - lock.notifyAll(); - } + + this.setActionsLeftThisMinute(requestsRemaining); resetForFirstRequest(timeRemaining); } @@ -182,10 +211,7 @@ private void resetForFirstRequest(int timeRemaining) { Schedulers.parallel().schedule(() -> { this.firstRequestReturned.set(false); this.overflowStartedNewClock.set(false); - synchronized (lock) { - this.actionsLeftThisMinute = 1; - lock.notifyAll(); - } + this.setActionsLeftThisMinute(1); }, timeRemaining + 2, TimeUnit.SECONDS); } @@ -194,37 +220,43 @@ private void resetForFirstRequest(int timeRemaining) { */ private static class RequestCallback { private final String url; - private final MonoSink> monoSink; + private final MonoSink> requestResultSink; private final ReactorHttpClient requestRateLimiter; private final boolean isAuthenticated; + private final ReentrantLock lock = new ReentrantLock(); private boolean isCanceled = false; - private RequestCallback(String url, MonoSink> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) { + private RequestCallback(String url, MonoSink> requestResultSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) { this.url = url; - this.monoSink = monoSink; + this.requestResultSink = requestResultSink; this.requestRateLimiter = requestRateLimiter; this.isAuthenticated = isAuthenticated; - this.monoSink.onCancel(() -> { - synchronized (this) { - this.isCanceled = true; - } - }); + this.requestResultSink.onCancel(this::setCanceled); + } + + private void setCanceled() { + this.lock.lock(); + try { + this.isCanceled = true; + } finally { + this.lock.unlock(); + } } public boolean isCanceled() { - return this.isCanceled; + this.lock.lock(); + try { + return this.isCanceled; + } finally { + this.lock.unlock(); + } } private void sendRequest() { - synchronized (this) { - if (isCanceled) { - synchronized (this.requestRateLimiter.lock) { - this.requestRateLimiter.actionsLeftThisMinute++; - this.requestRateLimiter.lock.notifyAll(); - } - return; - } + if (this.isCanceled()) { + this.requestRateLimiter.incrementActionsLeftThisMinute(); + return; } (this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get() @@ -238,10 +270,10 @@ private void sendRequest() { } return Mono.empty(); } catch (InterruptedException e) { - monoSink.error(e); + this.requestResultSink.error(e); throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e); } - }).subscribe(this.monoSink::success); + }).subscribe(this.requestResultSink::success); } } From 126e4154712d7978c2813354201737f04a98a770 Mon Sep 17 00:00:00 2001 From: GrizzlT <13691001+GrizzlT@users.noreply.github.com> Date: Wed, 25 Aug 2021 17:27:04 +0200 Subject: [PATCH 2/3] Fixed newly created bug preventing the main loop from waiting when necessary --- .../main/java/net/hypixel/api/reactor/ReactorHttpClient.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java index e0d33e17..ebf3a51f 100644 --- a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java +++ b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java @@ -139,7 +139,7 @@ private void decrementActionsOrWait() throws InterruptedException { while (this.actionsLeftThisMinute <= 0) { this.limitResetCondition.await(); } - this.actionsLeftThisMinute = Math.max(0, this.actionsLeftThisMinute--); + this.actionsLeftThisMinute--; } finally { this.lock.unlock(); } @@ -193,7 +193,6 @@ private ResponseHandlingResult handleResponse(HttpClientResponse response, Reque int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110); - this.setActionsLeftThisMinute(requestsRemaining); resetForFirstRequest(timeRemaining); From 32f718d6666a3e3af4cc3e7645e822f82f26a400 Mon Sep 17 00:00:00 2001 From: GrizzlT <13691001+GrizzlT@users.noreply.github.com> Date: Thu, 26 Aug 2021 17:48:25 +0200 Subject: [PATCH 3/3] Main loop now runs on thread that won't terminate automatically when idle --- .../net/hypixel/api/reactor/ReactorHttpClient.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java index ebf3a51f..c258abb3 100644 --- a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java +++ b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java @@ -14,9 +14,7 @@ import java.time.Duration; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -36,6 +34,7 @@ public class ReactorHttpClient implements HypixelHttpClient { // For shutting down the flux that emits request callbacks private final Disposable requestCallbackFluxDisposable; + private final ExecutorService requestCallbackFluxExecutorService = Executors.newSingleThreadExecutor(); private final ReentrantLock lock = new ReentrantLock(true); private final Condition limitResetCondition = lock.newCondition(); @@ -72,7 +71,7 @@ public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCa } catch (InterruptedException e) { throw new AssertionError("This should not have been possible", e); } - }).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest); + }).subscribeOn(Schedulers.fromExecutorService(this.requestCallbackFluxExecutorService)).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest); } public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests) { @@ -111,6 +110,7 @@ private static CompletableFuture toHypixelResponseFuture(Mo @Override public void shutdown() { this.requestCallbackFluxDisposable.dispose(); + this.requestCallbackFluxExecutorService.shutdown(); } /** @@ -121,7 +121,7 @@ public void shutdown() { * @param isAuthenticated whether to enable authentication or not */ public Mono> makeRequest(String path, boolean isAuthenticated) { - return Mono.>create(sink -> { + return Mono.create(sink -> { RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this); try { @@ -130,7 +130,7 @@ public Mono> makeRequest(String path, boolean isAuthenti sink.error(e); throw new AssertionError("Queue insertion interrupted. This should not have been possible", e); } - }).subscribeOn(Schedulers.boundedElastic()); + }); } private void decrementActionsOrWait() throws InterruptedException {