Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReactorHttpClient implements HypixelHttpClient {

private final HttpClient httpClient;
private final UUID apiKey;

Expand All @@ -33,8 +34,10 @@ public class ReactorHttpClient implements HypixelHttpClient {

// For shutting down the flux that emits request callbacks
private final Disposable requestCallbackFluxDisposable;
private final ExecutorService requestCallbackFluxExecutorService = Executors.newSingleThreadExecutor();

private final Object lock = new Object();
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition limitResetCondition = lock.newCondition();

/*
* How many requests we can send before reaching the limit
Expand Down Expand Up @@ -62,18 +65,13 @@ public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCa
callback = blockingQueue.take();
}

synchronized (lock) {
while (this.actionsLeftThisMinute <= 0) {
lock.wait();
}
this.decrementActionsOrWait();

actionsLeftThisMinute--;
}
synchronousSink.next(callback);
} catch (InterruptedException e) {
throw new AssertionError("This should not have been possible", e);
}
}).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest);
}).subscribeOn(Schedulers.fromExecutorService(this.requestCallbackFluxExecutorService)).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest);
}

public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests) {
Expand All @@ -89,15 +87,15 @@ public ReactorHttpClient(UUID apiKey) {
}

/**
* Canceling the returned future will result in canceling the request if possible
* Canceling the returned future will result in canceling the sending of the request if still possible
*/
@Override
public CompletableFuture<HypixelHttpResponse> makeRequest(String url) {
return toHypixelResponseFuture(makeRequest(url, false));
}

/**
* Canceling the returned future will result in canceling the request if possible
* Canceling the returned future will result in canceling the sending of the request if still possible
*/
@Override
public CompletableFuture<HypixelHttpResponse> makeAuthenticatedRequest(String url) {
Expand All @@ -112,17 +110,18 @@ private static CompletableFuture<HypixelHttpResponse> toHypixelResponseFuture(Mo
@Override
public void shutdown() {
this.requestCallbackFluxDisposable.dispose();
this.requestCallbackFluxExecutorService.shutdown();
}

/**
* Makes a request to the Hypixel api and returns a {@link Mono<Tuple2<String, Integer>>} containing
* Makes a request to the Hypixel api and returns a {@link Mono} containing
* the response body and status code, canceling this mono will prevent the request from being sent if possible
*
* @param path full url
* @param isAuthenticated whether to enable authentication or not
*/
public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenticated) {
return Mono.<Tuple2<String, Integer>>create(sink -> {
return Mono.create(sink -> {
RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this);

try {
Expand All @@ -131,7 +130,40 @@ public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenti
sink.error(e);
throw new AssertionError("Queue insertion interrupted. This should not have been possible", e);
}
}).subscribeOn(Schedulers.boundedElastic());
});
}

private void decrementActionsOrWait() throws InterruptedException {
this.lock.lock();
try {
while (this.actionsLeftThisMinute <= 0) {
this.limitResetCondition.await();
}
this.actionsLeftThisMinute--;
} finally {
this.lock.unlock();
}
}


private void incrementActionsLeftThisMinute() {
this.lock.lock();
try {
this.actionsLeftThisMinute++;
this.limitResetCondition.signal();
} finally {
this.lock.unlock();
}
}

private void setActionsLeftThisMinute(int value) {
this.lock.lock();
try {
this.actionsLeftThisMinute = Math.max(0, value);
this.limitResetCondition.signal();
} finally {
this.lock.unlock();
}
}

/**
Expand All @@ -144,12 +176,11 @@ public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenti
*/
private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException {
if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) {
System.out.println("Too many requests were sent, is something else using the same API Key?!!");
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));

if (this.overflowStartedNewClock.compareAndSet(false, true)) {
synchronized (lock) {
this.actionsLeftThisMinute = 0;
}
this.setActionsLeftThisMinute(0);
resetForFirstRequest(timeRemaining);
}

Expand All @@ -162,10 +193,7 @@ private ResponseHandlingResult handleResponse(HttpClientResponse response, Reque
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));
int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110);

synchronized (lock) {
this.actionsLeftThisMinute = requestsRemaining;
lock.notifyAll();
}
this.setActionsLeftThisMinute(requestsRemaining);

resetForFirstRequest(timeRemaining);
}
Expand All @@ -182,10 +210,7 @@ private void resetForFirstRequest(int timeRemaining) {
Schedulers.parallel().schedule(() -> {
this.firstRequestReturned.set(false);
this.overflowStartedNewClock.set(false);
synchronized (lock) {
this.actionsLeftThisMinute = 1;
lock.notifyAll();
}
this.setActionsLeftThisMinute(1);
}, timeRemaining + 2, TimeUnit.SECONDS);
}

Expand All @@ -194,37 +219,43 @@ private void resetForFirstRequest(int timeRemaining) {
*/
private static class RequestCallback {
private final String url;
private final MonoSink<Tuple2<String, Integer>> monoSink;
private final MonoSink<Tuple2<String, Integer>> requestResultSink;
private final ReactorHttpClient requestRateLimiter;
private final boolean isAuthenticated;
private final ReentrantLock lock = new ReentrantLock();
private boolean isCanceled = false;

private RequestCallback(String url, MonoSink<Tuple2<String, Integer>> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) {
private RequestCallback(String url, MonoSink<Tuple2<String, Integer>> requestResultSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) {
this.url = url;
this.monoSink = monoSink;
this.requestResultSink = requestResultSink;
this.requestRateLimiter = requestRateLimiter;
this.isAuthenticated = isAuthenticated;

this.monoSink.onCancel(() -> {
synchronized (this) {
this.isCanceled = true;
}
});
this.requestResultSink.onCancel(this::setCanceled);
}

private void setCanceled() {
this.lock.lock();
try {
this.isCanceled = true;
} finally {
this.lock.unlock();
}
}

public boolean isCanceled() {
return this.isCanceled;
this.lock.lock();
try {
return this.isCanceled;
} finally {
this.lock.unlock();
}
}

private void sendRequest() {
synchronized (this) {
if (isCanceled) {
synchronized (this.requestRateLimiter.lock) {
this.requestRateLimiter.actionsLeftThisMinute++;
this.requestRateLimiter.lock.notifyAll();
}
return;
}
if (this.isCanceled()) {
this.requestRateLimiter.incrementActionsLeftThisMinute();
return;
}

(this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get()
Expand All @@ -238,10 +269,10 @@ private void sendRequest() {
}
return Mono.empty();
} catch (InterruptedException e) {
monoSink.error(e);
this.requestResultSink.error(e);
throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e);
}
}).subscribe(this.monoSink::success);
}).subscribe(this.requestResultSink::success);
}
}

Expand Down