diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftException.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftException.java index 1a16b345e9..e44eea1b2f 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftException.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftException.java @@ -134,5 +134,9 @@ public static class Unavailable extends RaftException { public Unavailable(String message, Object... args) { super(RaftError.Type.UNAVAILABLE, message, args); } + + public Unavailable(Throwable cause) { + super(RaftError.Type.UNAVAILABLE, cause); + } } } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftClient.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftClient.java index bea2ba342e..6f4a80f45e 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftClient.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftClient.java @@ -16,6 +16,7 @@ package io.atomix.protocols.raft.impl; import io.atomix.protocols.raft.RaftClient; +import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftMetadataClient; import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.protocol.RaftClientProtocol; @@ -33,6 +34,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -127,13 +129,23 @@ public String toString() { private class SessionBuilder extends RaftProxy.Builder { @Override public RaftProxy build() { + // Create a client builder that uses the session manager to open a session. RaftProxyClient.Builder clientBuilder = new RaftProxyClient.Builder() { @Override public RaftProxyClient build() { - return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout).join(); + try { + return sessionManager.openSession(name, serviceType, readConsistency, communicationStrategy, timeout).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof RaftException.Unavailable) { + throw (RaftException.Unavailable) e.getCause(); + } else { + throw new RaftException.Unavailable(e); + } + } } }; + // Populate the proxy client builder. clientBuilder.withName(name) .withServiceType(serviceType) .withReadConsistency(readConsistency) @@ -147,7 +159,7 @@ public RaftProxyClient build() { // If the recovery strategy is set to RECOVER, wrap the builder in a recovering proxy client. if (recoveryStrategy == RecoveryStrategy.RECOVER) { - client = new RecoveringRaftProxyClient(clientBuilder); + client = new RecoveringRaftProxyClient(clientBuilder, new ThreadPoolContext(threadPoolExecutor)); } else { client = clientBuilder.build(); } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyManager.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyManager.java index 179c032921..3c833a6536 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyManager.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RaftProxyManager.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.ConnectException; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -162,10 +163,10 @@ public CompletableFuture openSession( future.complete(client); } else { - future.completeExceptionally(response.error().createException()); + future.completeExceptionally(new RaftException.Unavailable(response.error().message())); } } else { - future.completeExceptionally(error); + future.completeExceptionally(new RaftException.Unavailable(error.getMessage())); } }, proxyContext); return future; diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RecoveringRaftProxyClient.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RecoveringRaftProxyClient.java index 312d10bd49..c4bb3eca5d 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RecoveringRaftProxyClient.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/proxy/impl/RecoveringRaftProxyClient.java @@ -17,14 +17,18 @@ import com.google.common.collect.Sets; import io.atomix.protocols.raft.RaftEvent; +import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftOperation; import io.atomix.protocols.raft.ServiceType; import io.atomix.protocols.raft.proxy.RaftProxy; import io.atomix.protocols.raft.proxy.RaftProxyClient; import io.atomix.protocols.raft.session.SessionId; +import io.atomix.utils.concurrent.Scheduled; +import io.atomix.utils.concurrent.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -38,15 +42,18 @@ public class RecoveringRaftProxyClient implements RaftProxyClient { private final Logger log = LoggerFactory.getLogger(RecoveringRaftProxyClient.class); private final RaftProxyClient.Builder proxyClientBuilder; + private final Scheduler scheduler; private RaftProxyClient client; private volatile RaftProxy.State state = State.CLOSED; private final Set> stateChangeListeners = Sets.newCopyOnWriteArraySet(); private final Set> eventListeners = Sets.newCopyOnWriteArraySet(); + private Scheduled recoverTask; private boolean recover = true; - public RecoveringRaftProxyClient(RaftProxyClient.Builder proxyClientBuilder) { + public RecoveringRaftProxyClient(RaftProxyClient.Builder proxyClientBuilder, Scheduler scheduler) { this.proxyClientBuilder = checkNotNull(proxyClientBuilder); - openSession(); + this.scheduler = checkNotNull(scheduler); + this.client = openClient().join(); } @Override @@ -82,7 +89,7 @@ private synchronized void onStateChange(State state) { // If the session was closed then reopen it. if (state == State.CLOSED) { - openSession(); + recover(); } } } @@ -98,15 +105,43 @@ public void removeStateChangeListener(Consumer listener) { } /** - * Opens the session. + * Recovers the underlying proxy client. */ - private synchronized void openSession() { - log.debug("Opening session"); + private synchronized void recover() { + recoverTask = null; + this.client = openClient().join(); + } + + /** + * Opens the underlying proxy client. + */ + private synchronized CompletableFuture openClient() { + if (recoverTask == null) { + CompletableFuture future = new CompletableFuture<>(); + openClient(future); + return future; + } + return CompletableFuture.completedFuture(null); + } + + /** + * Opens the underlying proxy client. + */ + private synchronized void openClient(CompletableFuture future) { if (recover) { - client = proxyClientBuilder.build(); - onStateChange(State.CONNECTED); - client.addStateChangeListener(this::onStateChange); - eventListeners.forEach(client::addEventListener); + log.debug("Opening session"); + RaftProxyClient client; + try { + client = proxyClientBuilder.build(); + onStateChange(State.CONNECTED); + client.addStateChangeListener(this::onStateChange); + eventListeners.forEach(client::addEventListener); + future.complete(client); + } catch (RaftException.Unavailable e) { + recoverTask = scheduler.schedule(Duration.ofSeconds(1), this::recover); + } + } else { + future.completeExceptionally(new RaftException.Unavailable("Proxy client is closed")); } } @@ -135,6 +170,9 @@ public boolean isOpen() { @Override public synchronized CompletableFuture close() { recover = false; + if (recoverTask != null) { + recoverTask.cancel(); + } return client.close(); } diff --git a/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java b/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java index 6bb0cc16df..4c0eac5015 100644 --- a/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java +++ b/protocols/raft/src/test/java/io/atomix/protocols/raft/RaftTest.java @@ -188,6 +188,30 @@ public void testCrashRecover() throws Throwable { await(30000); } + /** + * Tests opening a client before servers. + */ + public void testOpenClientBeforeServer() throws Throwable { + for (int i = 0; i < 3; i++) { + members.add(nextMember(RaftMember.Type.ACTIVE)); + } + + RaftClient client = createClient(); + RaftProxy session = createSession(client); + submit(session, 0, 1000); + + for (int i = 0; i < 3; i++) { + RaftServer server = createServer(members.get(i)); + servers.add(server); + } + + for (RaftServer server : servers) { + server.bootstrap(members.stream().map(RaftMember::memberId).collect(Collectors.toList())).thenRun(this::resume); + } + + await(30000, 4); + } + /** * Tests leaving a sever from a cluster. */