diff --git a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java index 458fadf0..c258abb3 100644 --- a/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java +++ b/hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java @@ -14,12 +14,13 @@ import java.time.Duration; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; public class ReactorHttpClient implements HypixelHttpClient { + private final HttpClient httpClient; private final UUID apiKey; @@ -33,8 +34,10 @@ public class ReactorHttpClient implements HypixelHttpClient { // For shutting down the flux that emits request callbacks private final Disposable requestCallbackFluxDisposable; + private final ExecutorService requestCallbackFluxExecutorService = Executors.newSingleThreadExecutor(); - private final Object lock = new Object(); + private final ReentrantLock lock = new ReentrantLock(true); + private final Condition limitResetCondition = lock.newCondition(); /* * How many requests we can send before reaching the limit @@ -62,18 +65,13 @@ public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCa callback = blockingQueue.take(); } - synchronized (lock) { - while (this.actionsLeftThisMinute <= 0) { - lock.wait(); - } + this.decrementActionsOrWait(); - actionsLeftThisMinute--; - } synchronousSink.next(callback); } catch (InterruptedException e) { throw new AssertionError("This should not have been possible", e); } - }).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest); + }).subscribeOn(Schedulers.fromExecutorService(this.requestCallbackFluxExecutorService)).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest); } public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests) { @@ -89,7 +87,7 @@ public ReactorHttpClient(UUID apiKey) { } /** - * Canceling the returned future will result in canceling the request if possible + * Canceling the returned future will result in canceling the sending of the request if still possible */ @Override public CompletableFuture makeRequest(String url) { @@ -97,7 +95,7 @@ public CompletableFuture makeRequest(String url) { } /** - * Canceling the returned future will result in canceling the request if possible + * Canceling the returned future will result in canceling the sending of the request if still possible */ @Override public CompletableFuture makeAuthenticatedRequest(String url) { @@ -112,17 +110,18 @@ private static CompletableFuture toHypixelResponseFuture(Mo @Override public void shutdown() { this.requestCallbackFluxDisposable.dispose(); + this.requestCallbackFluxExecutorService.shutdown(); } /** - * Makes a request to the Hypixel api and returns a {@link Mono>} containing + * Makes a request to the Hypixel api and returns a {@link Mono} containing * the response body and status code, canceling this mono will prevent the request from being sent if possible * * @param path full url * @param isAuthenticated whether to enable authentication or not */ public Mono> makeRequest(String path, boolean isAuthenticated) { - return Mono.>create(sink -> { + return Mono.create(sink -> { RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this); try { @@ -131,7 +130,40 @@ public Mono> makeRequest(String path, boolean isAuthenti sink.error(e); throw new AssertionError("Queue insertion interrupted. This should not have been possible", e); } - }).subscribeOn(Schedulers.boundedElastic()); + }); + } + + private void decrementActionsOrWait() throws InterruptedException { + this.lock.lock(); + try { + while (this.actionsLeftThisMinute <= 0) { + this.limitResetCondition.await(); + } + this.actionsLeftThisMinute--; + } finally { + this.lock.unlock(); + } + } + + + private void incrementActionsLeftThisMinute() { + this.lock.lock(); + try { + this.actionsLeftThisMinute++; + this.limitResetCondition.signal(); + } finally { + this.lock.unlock(); + } + } + + private void setActionsLeftThisMinute(int value) { + this.lock.lock(); + try { + this.actionsLeftThisMinute = Math.max(0, value); + this.limitResetCondition.signal(); + } finally { + this.lock.unlock(); + } } /** @@ -144,12 +176,11 @@ public Mono> makeRequest(String path, boolean isAuthenti */ private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException { if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) { + System.out.println("Too many requests were sent, is something else using the same API Key?!!"); int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); if (this.overflowStartedNewClock.compareAndSet(false, true)) { - synchronized (lock) { - this.actionsLeftThisMinute = 0; - } + this.setActionsLeftThisMinute(0); resetForFirstRequest(timeRemaining); } @@ -162,10 +193,7 @@ private ResponseHandlingResult handleResponse(HttpClientResponse response, Reque int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110); - synchronized (lock) { - this.actionsLeftThisMinute = requestsRemaining; - lock.notifyAll(); - } + this.setActionsLeftThisMinute(requestsRemaining); resetForFirstRequest(timeRemaining); } @@ -182,10 +210,7 @@ private void resetForFirstRequest(int timeRemaining) { Schedulers.parallel().schedule(() -> { this.firstRequestReturned.set(false); this.overflowStartedNewClock.set(false); - synchronized (lock) { - this.actionsLeftThisMinute = 1; - lock.notifyAll(); - } + this.setActionsLeftThisMinute(1); }, timeRemaining + 2, TimeUnit.SECONDS); } @@ -194,37 +219,43 @@ private void resetForFirstRequest(int timeRemaining) { */ private static class RequestCallback { private final String url; - private final MonoSink> monoSink; + private final MonoSink> requestResultSink; private final ReactorHttpClient requestRateLimiter; private final boolean isAuthenticated; + private final ReentrantLock lock = new ReentrantLock(); private boolean isCanceled = false; - private RequestCallback(String url, MonoSink> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) { + private RequestCallback(String url, MonoSink> requestResultSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) { this.url = url; - this.monoSink = monoSink; + this.requestResultSink = requestResultSink; this.requestRateLimiter = requestRateLimiter; this.isAuthenticated = isAuthenticated; - this.monoSink.onCancel(() -> { - synchronized (this) { - this.isCanceled = true; - } - }); + this.requestResultSink.onCancel(this::setCanceled); + } + + private void setCanceled() { + this.lock.lock(); + try { + this.isCanceled = true; + } finally { + this.lock.unlock(); + } } public boolean isCanceled() { - return this.isCanceled; + this.lock.lock(); + try { + return this.isCanceled; + } finally { + this.lock.unlock(); + } } private void sendRequest() { - synchronized (this) { - if (isCanceled) { - synchronized (this.requestRateLimiter.lock) { - this.requestRateLimiter.actionsLeftThisMinute++; - this.requestRateLimiter.lock.notifyAll(); - } - return; - } + if (this.isCanceled()) { + this.requestRateLimiter.incrementActionsLeftThisMinute(); + return; } (this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get() @@ -238,10 +269,10 @@ private void sendRequest() { } return Mono.empty(); } catch (InterruptedException e) { - monoSink.error(e); + this.requestResultSink.error(e); throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e); } - }).subscribe(this.monoSink::success); + }).subscribe(this.requestResultSink::success); } }