Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class ArangoDBAsyncImpl extends InternalArangoDB<ArangoExecutorAsync> imp
private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBAsyncImpl.class);

private final CommunicationProtocol cp;
private final HostResolver asyncHostResolver;
private final HostResolver syncHostResolver;
private final HostHandler asyncHostHandler;
private final HostHandler syncHostHandler;

Expand All @@ -78,6 +80,8 @@ public ArangoDBAsyncImpl(
final VstCommunication<Response, VstConnectionSync> cacheCom = syncCommBuilder.build(util);

cp = new VstProtocol(cacheCom);
this.syncHostResolver = syncHostResolver;
this.asyncHostResolver = asyncHostResolver;
this.asyncHostHandler = asyncHostHandler;
this.syncHostHandler = syncHostHandler;

Expand All @@ -96,6 +100,8 @@ protected ArangoExecutorAsync executor() {
@Override
public void shutdown() {
try {
asyncHostResolver.shutdown();
syncHostResolver.shutdown();
executor.disconnect();
} finally {
try {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/arangodb/internal/ArangoDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ArangoDBImpl extends InternalArangoDB<ArangoExecutorSync> implement

private static final Logger LOGGER = LoggerFactory.getLogger(ArangoDBImpl.class);
private final CommunicationProtocol cp;
private final HostResolver hostResolver;
private final HostHandler hostHandler;

public ArangoDBImpl(final VstCommunicationSync.Builder vstBuilder, final HttpCommunication.Builder httpBuilder,
Expand All @@ -70,6 +71,7 @@ util, new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs),
new HttpCommunication.Builder(httpBuilder),
util,
protocol);
this.hostResolver = hostResolver;
this.hostHandler = hostHandler;

hostResolver.init(this.executor(), getSerde());
Expand Down Expand Up @@ -108,6 +110,7 @@ protected ArangoExecutorSync executor() {
@Override
public void shutdown() {
try {
hostResolver.shutdown();
executor.disconnect();
} finally {
try {
Expand Down
97 changes: 68 additions & 29 deletions src/main/java/com/arangodb/internal/http/HttpCommunication.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
Expand All @@ -54,54 +56,91 @@ public void close() throws IOException {
}

public Response execute(final Request request, final HostHandle hostHandle) {
return execute(request, hostHandle, 0);
try {
return execute(request, hostHandle, 0).get();
} catch (InterruptedException e) {
throw new ArangoDBException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ArangoDBException) {
throw (ArangoDBException) cause;
} else {
throw new ArangoDBException(cause);
}
}
}

private Response execute(final Request request, final HostHandle hostHandle, final int attemptCount) {
private CompletableFuture<Response> execute(final Request request, final HostHandle hostHandle, final int attemptCount) {
final CompletableFuture<Response> rfuture = new CompletableFuture<>();
final AccessType accessType = RequestUtils.determineAccessType(request);
Host host = hostHandler.get(hostHandle, accessType);
try {
while (true) {
try {
final HttpConnection connection = (HttpConnection) host.connection();
final Response response = connection.execute(request);
hostHandler.success();
hostHandler.confirm();
return response;
} catch (final SocketTimeoutException e) {
final HttpConnection connection = (HttpConnection) host.connection();
connection.execute(request).whenComplete(((resp, e) -> {
if (resp != null) {
hostHandler.success();
hostHandler.confirm();
rfuture.complete(resp);
} else if (e != null) {
if (e instanceof SocketTimeoutException) {
// SocketTimeoutException exceptions are wrapped and rethrown.
// Differently from other IOException exceptions they must not be retried,
// since the requests could not be idempotent.
TimeoutException te = new TimeoutException(e.getMessage());
te.initCause(e);
throw new ArangoDBException(te);
} catch (final IOException e) {
hostHandler.fail(e);
rfuture.completeExceptionally(new ArangoDBException(te));
} else if (e instanceof IOException) {
hostHandler.fail((IOException) e);
if (hostHandle != null && hostHandle.getHost() != null) {
hostHandle.setHost(null);
}
final Host failedHost = host;
host = hostHandler.get(hostHandle, accessType);
if (host != null) {
LOGGER.warn(String.format("Could not connect to %s", failedHost.getDescription()), e);

Host nextHost;
try {
nextHost = hostHandler.get(hostHandle, accessType);
} catch (ArangoDBException ex) {
rfuture.completeExceptionally(e);
return;
}

if (nextHost != null) {
LOGGER.warn(String.format("Could not connect to %s", host.getDescription()), e);
LOGGER.warn(String.format("Could not connect to %s. Try connecting to %s",
failedHost.getDescription(), host.getDescription()));
host.getDescription(), nextHost.getDescription()));
CompletableFuture<Response> req =
execute(request, new HostHandle().setHost(nextHost.getDescription()), attemptCount);
mirrorFuture(req, rfuture);
} else {
LOGGER.error(e.getMessage(), e);
throw new ArangoDBException(e);
rfuture.completeExceptionally(new ArangoDBException(e));
}
} else if (e instanceof ArangoDBRedirectException) {
if (attemptCount < 3) {
ArangoDBRedirectException redirEx = (ArangoDBRedirectException) e;
final String location = redirEx.getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.failIfNotMatch(redirectHost, redirEx);
CompletableFuture<Response> req =
execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1);
mirrorFuture(req, rfuture);
} else {
rfuture.completeExceptionally(e);
}
} else {
rfuture.completeExceptionally(e);
}
}
} catch (final ArangoDBException e) {
if (e instanceof ArangoDBRedirectException && attemptCount < 3) {
final String location = ((ArangoDBRedirectException) e).getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.failIfNotMatch(redirectHost, e);
return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1);
} else {
throw e;
}));
return rfuture;
}

private void mirrorFuture(CompletableFuture<Response> upstream, CompletableFuture<Response> downstream) {
upstream.whenComplete((v, err) -> {
if (v != null) {
downstream.complete(v);
} else if (err != null) {
downstream.completeExceptionally(err);
}
}
});
}

public static class Builder {
Expand Down
51 changes: 25 additions & 26 deletions src/main/java/com/arangodb/internal/http/HttpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,20 @@
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


/**
* @author Mark Vollmary
* @author Michele Rastelli
*/
public class HttpConnection implements Connection {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpCommunication.class);
Expand All @@ -76,7 +76,7 @@ public class HttpConnection implements Connection {
private final InternalSerde util;
private final String baseUrl;
private final ContentType contentType;
private volatile String auth;
private String auth;
private final WebClient client;
private final Integer timeout;
private final Vertx vertx;
Expand All @@ -89,9 +89,12 @@ private HttpConnection(final HostDescription host, final Integer timeout, final
this.contentType = ContentType.of(protocol);
this.timeout = timeout;
baseUrl = buildBaseUrl(host, useSsl);
auth = new UsernamePasswordCredentials(user, password != null ? password : "").toHttpAuthorization();
vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1));
vertx.runOnContext(e -> Thread.currentThread().setName("adb-eventloop-" + THREAD_COUNT.getAndIncrement()));
vertx.runOnContext(e -> {
Thread.currentThread().setName("adb-eventloop-" + THREAD_COUNT.getAndIncrement());
auth = new UsernamePasswordCredentials(user, password != null ? password : "").toHttpAuthorization();
LOGGER.debug("Created Vert.x context");
});

int _ttl = ttl == null ? 0 : Math.toIntExact(ttl / 1000);

Expand Down Expand Up @@ -223,7 +226,13 @@ private String buildBaseUrl(HostDescription host, boolean useSsl) {
return (Boolean.TRUE.equals(useSsl) ? "https://" : "http://") + host.getHost() + ":" + host.getPort();
}

public Response execute(final Request request) throws IOException {
public CompletableFuture<Response> execute(final Request request) {
CompletableFuture<Response> rfuture = new CompletableFuture<>();
vertx.runOnContext(e -> doExecute(request, rfuture));
return rfuture;
}

public void doExecute(final Request request, final CompletableFuture<Response> rfuture) {
String path = buildUrl(request);
HttpRequest<Buffer> httpRequest = client
.request(requestTypeToHttpMethod(request.getRequestType()), path)
Expand All @@ -250,26 +259,15 @@ public Response execute(final Request request) throws IOException {
} else {
buffer = Buffer.buffer();
}
HttpResponse<Buffer> bufferResponse;
try {
// FIXME: make async API
bufferResponse = httpRequest.sendBuffer(buffer).toCompletionStage().toCompletableFuture().get();
} catch (InterruptedException e) {
throw new ArangoDBException(e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new ArangoDBException(e.getCause());
}
}
Response response = buildResponse(bufferResponse);
checkError(response);
return response;

httpRequest.sendBuffer(buffer)
.map(this::buildResponse)
.map(this::checkError)
.onSuccess(rfuture::complete)
.onFailure(rfuture::completeExceptionally);
}

public Response buildResponse(final HttpResponse<Buffer> httpResponse) throws UnsupportedOperationException {
private Response buildResponse(final HttpResponse<Buffer> httpResponse) {
final Response response = new Response();
response.setResponseCode(httpResponse.statusCode());
Buffer body = httpResponse.body();
Expand All @@ -285,14 +283,15 @@ public Response buildResponse(final HttpResponse<Buffer> httpResponse) throws Un
return response;
}

protected void checkError(final Response response) {
protected Response checkError(final Response response) {
ResponseUtils.checkError(util, response);
return response;
}

@Override
public void setJwt(String jwt) {
if (jwt != null) {
auth = new TokenCredentials(jwt).toHttpAuthorization();
vertx.runOnContext((e) -> auth = new TokenCredentials(jwt).toHttpAuthorization());
}
}

Expand Down
Loading