Skip to content

Commit

Permalink
Limit retries in client connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Mar 19, 2018
1 parent 60171ed commit dee8d43
Showing 1 changed file with 40 additions and 24 deletions.
Expand Up @@ -67,6 +67,7 @@ public class RaftProxyConnection {
private final MemberSelector selector;
private final ThreadContext context;
private NodeId currentNode;
private int selectionId;

public RaftProxyConnection(RaftClientProtocol protocol, MemberSelector selector, ThreadContext context, LoggerContext loggerContext) {
this.protocol = checkNotNull(protocol, "protocol cannot be null");
Expand Down Expand Up @@ -119,9 +120,9 @@ public Collection<NodeId> members() {
public CompletableFuture<OpenSessionResponse> openSession(OpenSessionRequest request) {
CompletableFuture<OpenSessionResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::openSession, next(), future);
sendRequest(request, protocol::openSession, future);
} else {
context.execute(() -> sendRequest(request, protocol::openSession, next(), future));
context.execute(() -> sendRequest(request, protocol::openSession, future));
}
return future;
}
Expand All @@ -135,9 +136,9 @@ public CompletableFuture<OpenSessionResponse> openSession(OpenSessionRequest req
public CompletableFuture<CloseSessionResponse> closeSession(CloseSessionRequest request) {
CompletableFuture<CloseSessionResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::closeSession, next(), future);
sendRequest(request, protocol::closeSession, future);
} else {
context.execute(() -> sendRequest(request, protocol::closeSession, next(), future));
context.execute(() -> sendRequest(request, protocol::closeSession, future));
}
return future;
}
Expand All @@ -151,9 +152,9 @@ public CompletableFuture<CloseSessionResponse> closeSession(CloseSessionRequest
public CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request) {
CompletableFuture<KeepAliveResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::keepAlive, next(), future);
sendRequest(request, protocol::keepAlive, future);
} else {
context.execute(() -> sendRequest(request, protocol::keepAlive, next(), future));
context.execute(() -> sendRequest(request, protocol::keepAlive, future));
}
return future;
}
Expand All @@ -167,9 +168,9 @@ public CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request)
public CompletableFuture<QueryResponse> query(QueryRequest request) {
CompletableFuture<QueryResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::query, next(), future);
sendRequest(request, protocol::query, future);
} else {
context.execute(() -> sendRequest(request, protocol::query, next(), future));
context.execute(() -> sendRequest(request, protocol::query, future));
}
return future;
}
Expand All @@ -183,9 +184,9 @@ public CompletableFuture<QueryResponse> query(QueryRequest request) {
public CompletableFuture<CommandResponse> command(CommandRequest request) {
CompletableFuture<CommandResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::command, next(), future);
sendRequest(request, protocol::command, future);
} else {
context.execute(() -> sendRequest(request, protocol::command, next(), future));
context.execute(() -> sendRequest(request, protocol::command, future));
}
return future;
}
Expand All @@ -199,22 +200,31 @@ public CompletableFuture<CommandResponse> command(CommandRequest request) {
public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
CompletableFuture<MetadataResponse> future = new CompletableFuture<>();
if (context.isCurrentContext()) {
sendRequest(request, protocol::metadata, next(), future);
sendRequest(request, protocol::metadata, future);
} else {
context.execute(() -> sendRequest(request, protocol::metadata, next(), future));
context.execute(() -> sendRequest(request, protocol::metadata, future));
}
return future;
}

/**
* Sends the given request attempt to the cluster.
*/
protected <T extends RaftRequest, U extends RaftResponse> void sendRequest(T request, BiFunction<NodeId, T, CompletableFuture<U>> sender, NodeId member, CompletableFuture<U> future) {
if (member != null) {
log.trace("Sending {} to {}", request, member);
sender.apply(member, request).whenCompleteAsync((r, e) -> {
protected <T extends RaftRequest, U extends RaftResponse> void sendRequest(T request, BiFunction<NodeId, T, CompletableFuture<U>> sender, CompletableFuture<U> future) {
sendRequest(request, sender, 0, future);
}

/**
* Sends the given request attempt to the cluster.
*/
protected <T extends RaftRequest, U extends RaftResponse> void sendRequest(T request, BiFunction<NodeId, T, CompletableFuture<U>> sender, int count, CompletableFuture<U> future) {
NodeId node = next();
if (node != null) {
log.trace("Sending {} to {}", request, node);
int selectionId = this.selectionId;
sender.apply(node, request).whenCompleteAsync((r, e) -> {
if (e != null || r != null) {
handleResponse(request, sender, member, r, e, future);
handleResponse(request, sender, count, selectionId, node, r, e, future);
} else {
future.complete(null);
}
Expand All @@ -228,37 +238,41 @@ protected <T extends RaftRequest, U extends RaftResponse> void sendRequest(T req
* Resends a request due to a request failure, resetting the connection if necessary.
*/
@SuppressWarnings("unchecked")
protected <T extends RaftRequest> void retryRequest(Throwable cause, T request, BiFunction sender, NodeId member, CompletableFuture future) {
protected <T extends RaftRequest> void retryRequest(Throwable cause, T request, BiFunction sender, int count, int selectionId, CompletableFuture future) {
// If the connection has not changed, reset it and connect to the next server.
if (this.currentNode == member) {
if (this.selectionId == selectionId) {
log.trace("Resetting connection. Reason: {}", cause.getMessage());
this.currentNode = null;
}

// Attempt to send the request again.
sendRequest(request, sender, next(), future);
sendRequest(request, sender, count, future);
}

/**
* Handles a response from the cluster.
*/
@SuppressWarnings("unchecked")
protected <T extends RaftRequest> void handleResponse(T request, BiFunction sender, NodeId member, RaftResponse response, Throwable error, CompletableFuture future) {
protected <T extends RaftRequest> void handleResponse(T request, BiFunction sender, int count, int selectionId, NodeId node, RaftResponse response, Throwable error, CompletableFuture future) {
if (error == null) {
log.trace("Received {} from {}", response, member);
log.trace("Received {} from {}", response, node);
if (COMPLETE_PREDICATE.test(response)) {
future.complete(response);
selector.reset();
} else {
retryRequest(response.error().createException(), request, sender, member, future);
retryRequest(response.error().createException(), request, sender, count + 1, selectionId, future);
}
} else {
if (error instanceof CompletionException) {
error = error.getCause();
}
log.debug("{} failed! Reason: {}", request, error);
if (error instanceof ConnectException || error instanceof TimeoutException || error instanceof ClosedChannelException) {
retryRequest(error, request, sender, member, future);
if (count < selector.members().size() + 1) {
retryRequest(error, request, sender, count + 1, selectionId, future);
} else {
future.completeExceptionally(error);
}
} else {
future.completeExceptionally(error);
}
Expand All @@ -278,6 +292,7 @@ protected NodeId next() {
if (selector.leader() != null) {
selector.reset(null, selector.members());
this.currentNode = selector.next();
this.selectionId++;
return currentNode;
} else {
log.debug("Failed to connect to the cluster");
Expand All @@ -286,6 +301,7 @@ protected NodeId next() {
}
} else {
this.currentNode = selector.next();
this.selectionId++;
return currentNode;
}
}
Expand Down

0 comments on commit dee8d43

Please sign in to comment.