Skip to content

Commit

Permalink
Refactor recovering proxy client to retry connecting to the cluster.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 1, 2017
1 parent aa4970d commit da317ce
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 14 deletions.
Expand Up @@ -134,5 +134,9 @@ public static class Unavailable extends RaftException {
public Unavailable(String message, Object... args) {
super(RaftError.Type.UNAVAILABLE, message, args);
}

public Unavailable(Throwable cause) {
super(RaftError.Type.UNAVAILABLE, cause);
}
}
}
Expand Up @@ -16,6 +16,7 @@
package io.atomix.protocols.raft.impl;

import io.atomix.protocols.raft.RaftClient;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftMetadataClient;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
Expand All @@ -33,6 +34,7 @@

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -127,13 +129,23 @@ public String toString() {
private class SessionBuilder extends RaftProxy.Builder {
@Override
public RaftProxy build() {
// Create a client builder that uses the session manager to open a session.
RaftProxyClient.Builder clientBuilder = new RaftProxyClient.Builder() {
@Override
public RaftProxyClient build() {
return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout).join();
try {
return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout).join();
} catch (CompletionException e) {
if (e.getCause() instanceof RaftException.Unavailable) {
throw (RaftException.Unavailable) e.getCause();
} else {
throw new RaftException.Unavailable(e);
}
}
}
};

// Populate the proxy client builder.
clientBuilder.withName(name)
.withServiceType(serviceType)
.withReadConsistency(readConsistency)
Expand All @@ -147,7 +159,7 @@ public RaftProxyClient build() {

// If the recovery strategy is set to RECOVER, wrap the builder in a recovering proxy client.
if (recoveryStrategy == RecoveryStrategy.RECOVER) {
client = new RecoveringRaftProxyClient(clientBuilder);
client = new RecoveringRaftProxyClient(clientBuilder, new ThreadPoolContext(threadPoolExecutor));
} else {
client = clientBuilder.build();
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -162,10 +163,10 @@ public CompletableFuture<RaftProxyClient> openSession(

future.complete(client);
} else {
future.completeExceptionally(response.error().createException());
future.completeExceptionally(new RaftException.Unavailable(response.error().message()));
}
} else {
future.completeExceptionally(error);
future.completeExceptionally(new RaftException.Unavailable(error.getMessage()));
}
}, proxyContext);
return future;
Expand Down
Expand Up @@ -17,14 +17,18 @@

import com.google.common.collect.Sets;
import io.atomix.protocols.raft.RaftEvent;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftOperation;
import io.atomix.protocols.raft.ServiceType;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.proxy.RaftProxyClient;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.utils.concurrent.Scheduled;
import io.atomix.utils.concurrent.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand All @@ -38,15 +42,18 @@
public class RecoveringRaftProxyClient implements RaftProxyClient {
private final Logger log = LoggerFactory.getLogger(RecoveringRaftProxyClient.class);
private final RaftProxyClient.Builder proxyClientBuilder;
private final Scheduler scheduler;
private RaftProxyClient client;
private volatile RaftProxy.State state = State.CLOSED;
private final Set<Consumer<State>> stateChangeListeners = Sets.newCopyOnWriteArraySet();
private final Set<Consumer<RaftEvent>> eventListeners = Sets.newCopyOnWriteArraySet();
private Scheduled recoverTask;
private boolean recover = true;

public RecoveringRaftProxyClient(RaftProxyClient.Builder proxyClientBuilder) {
public RecoveringRaftProxyClient(RaftProxyClient.Builder proxyClientBuilder, Scheduler scheduler) {
this.proxyClientBuilder = checkNotNull(proxyClientBuilder);
openSession();
this.scheduler = checkNotNull(scheduler);
this.client = openClient().join();
}

@Override
Expand Down Expand Up @@ -82,7 +89,7 @@ private synchronized void onStateChange(State state) {

// If the session was closed then reopen it.
if (state == State.CLOSED) {
openSession();
recover();
}
}
}
Expand All @@ -98,15 +105,43 @@ public void removeStateChangeListener(Consumer<State> listener) {
}

/**
* Opens the session.
* Recovers the underlying proxy client.
*/
private synchronized void openSession() {
log.debug("Opening session");
private synchronized void recover() {
recoverTask = null;
this.client = openClient().join();
}

/**
* Opens the underlying proxy client.
*/
private synchronized CompletableFuture<RaftProxyClient> openClient() {
if (recoverTask == null) {
CompletableFuture<RaftProxyClient> future = new CompletableFuture<>();
openClient(future);
return future;
}
return CompletableFuture.completedFuture(null);
}

/**
* Opens the underlying proxy client.
*/
private synchronized void openClient(CompletableFuture<RaftProxyClient> future) {
if (recover) {
client = proxyClientBuilder.build();
onStateChange(State.CONNECTED);
client.addStateChangeListener(this::onStateChange);
eventListeners.forEach(client::addEventListener);
log.debug("Opening session");
RaftProxyClient client;
try {
client = proxyClientBuilder.build();
onStateChange(State.CONNECTED);
client.addStateChangeListener(this::onStateChange);
eventListeners.forEach(client::addEventListener);
future.complete(client);
} catch (RaftException.Unavailable e) {
recoverTask = scheduler.schedule(Duration.ofSeconds(1), this::recover);
}
} else {
future.completeExceptionally(new RaftException.Unavailable("Proxy client is closed"));
}
}

Expand Down Expand Up @@ -135,6 +170,9 @@ public boolean isOpen() {
@Override
public synchronized CompletableFuture<Void> close() {
recover = false;
if (recoverTask != null) {
recoverTask.cancel();
}
return client.close();
}

Expand Down
Expand Up @@ -188,6 +188,30 @@ public void testCrashRecover() throws Throwable {
await(30000);
}

/**
* Tests opening a client before servers.
*/
public void testOpenClientBeforeServer() throws Throwable {
for (int i = 0; i < 3; i++) {
members.add(nextMember(RaftMember.Type.ACTIVE));
}

RaftClient client = createClient();
RaftProxy session = createSession(client);
submit(session, 0, 1000);

for (int i = 0; i < 3; i++) {
RaftServer server = createServer(members.get(i));
servers.add(server);
}

for (RaftServer server : servers) {
server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume);
}

await(30000, 4);
}

/**
* Tests leaving a sever from a cluster.
*/
Expand Down

0 comments on commit da317ce

Please sign in to comment.