Skip to content

Commit

Permalink
Use randomization when connecting to remote members in ClientSession.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 3, 2015
1 parent 838526c commit 79cf45b
Showing 1 changed file with 16 additions and 9 deletions.
Expand Up @@ -34,7 +34,9 @@
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.Iterator; import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
Expand All @@ -57,11 +59,12 @@ private static enum State {
EXPIRED EXPIRED
} }


private final Random random = new Random();
private final Client client; private final Client client;
private Members members; private Members members;
private final long keepAliveInterval; private final long keepAliveInterval;
private final Context context; private final Context context;
private Iterator<Member> connectMembers; private List<Member> connectMembers;
private Connection connection; private Connection connection;
private volatile State state = State.CLOSED; private volatile State state = State.CLOSED;
private volatile long id; private volatile long id;
Expand All @@ -79,7 +82,7 @@ private static enum State {
this.members = members; this.members = members;
this.keepAliveInterval = keepAliveInterval; this.keepAliveInterval = keepAliveInterval;
this.context = new SingleThreadContext("copycat-client-" + UUID.randomUUID().toString(), serializer.clone()); this.context = new SingleThreadContext("copycat-client-" + UUID.randomUUID().toString(), serializer.clone());
this.connectMembers = members.members().iterator(); this.connectMembers = new ArrayList<>(members.members());
} }


@Override @Override
Expand All @@ -101,7 +104,7 @@ public Context context() {
*/ */
private void setMembers(Members members) { private void setMembers(Members members) {
this.members = members; this.members = members;
this.connectMembers = this.members.members().iterator(); this.connectMembers = new ArrayList<>(this.members.members());
} }


/** /**
Expand Down Expand Up @@ -139,9 +142,10 @@ private <T> CompletableFuture<T> submit(CommandRequest request, CompletableFutur
if (response.status() == Response.Status.OK) { if (response.status() == Response.Status.OK) {
responseSequence = Math.max(responseSequence, request.commandSequence()); responseSequence = Math.max(responseSequence, request.commandSequence());
future.complete((T) response.result()); future.complete((T) response.result());
resetMembers();
request.close(); request.close();
} else { } else {
submit(request, future); resetConnection().submit(request, future);
} }
response.close(); response.close();
} else { } else {
Expand Down Expand Up @@ -186,9 +190,10 @@ private <T> CompletableFuture<T> submit(QueryRequest request, CompletableFuture<
if (error == null) { if (error == null) {
if (response.status() == Response.Status.OK) { if (response.status() == Response.Status.OK) {
future.complete((T) response.result()); future.complete((T) response.result());
resetMembers();
request.close(); request.close();
} else { } else {
submit(request, future); resetConnection().submit(request, future);
} }
response.close(); response.close();
} else { } else {
Expand Down Expand Up @@ -251,14 +256,14 @@ private <T extends Request<T>, U extends Response<U>> CompletableFuture<U> reque
} }


// If we've run out of members to which to connect then expire the session. // If we've run out of members to which to connect then expire the session.
if (!connectMembers.hasNext()) { if (connectMembers.isEmpty()) {
LOGGER.warn("Failed to connect to cluster"); LOGGER.warn("Failed to connect to cluster");
resetConnection().onExpire(); resetConnection().onExpire();
future.completeExceptionally(new IllegalStateException("session not open")); future.completeExceptionally(new IllegalStateException("session not open"));
return future; return future;
} }


Member member = connectMembers.next(); Member member = connectMembers.remove(random.nextInt(connectMembers.size()));


final InetSocketAddress address; final InetSocketAddress address;
try { try {
Expand Down Expand Up @@ -305,7 +310,9 @@ private ClientSession resetConnection() {
* Resets the members to which to connect. * Resets the members to which to connect.
*/ */
private ClientSession resetMembers() { private ClientSession resetMembers() {
connectMembers = members.members().iterator(); if (connectMembers.isEmpty() || connectMembers.size() < members.members().size() - 1) {
connectMembers = new ArrayList<>(members.members());
}
return this; return this;
} }


Expand Down

0 comments on commit 79cf45b

Please sign in to comment.