Skip to content

Commit

Permalink
JAVA-1860: Allow reconnection at startup if no contact point is avail…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
olim7t committed Jun 20, 2018
1 parent ff7066f commit 3db7a83
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 29 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-alpha4 (in progress)

- [new feature] JAVA-1860: Allow reconnection at startup if no contact point is available
- [improvement] JAVA-1866: Make all public policies implement AutoCloseable
- [new feature] JAVA-1762: Build alternate core artifact with Netty shaded
- [new feature] JAVA-1761: Add OSGi descriptors
Expand Down
Expand Up @@ -44,6 +44,8 @@ public enum DefaultDriverOption implements DriverOption {
CONNECTION_POOL_LOCAL_SIZE("advanced.connection.pool.local.size"),
CONNECTION_POOL_REMOTE_SIZE("advanced.connection.pool.remote.size"),

RECONNECT_ON_INIT("advanced.reconnect-on-init"),

RECONNECTION_POLICY_CLASS("advanced.reconnection-policy.class"),
RECONNECTION_BASE_DELAY("advanced.reconnection-policy.base-delay"),
RECONNECTION_MAX_DELAY("advanced.reconnection-policy.max-delay"),
Expand Down
Expand Up @@ -243,7 +243,11 @@ public SelfT withClassLoader(ClassLoader classLoader) {
* @return a completion stage that completes with the session when it is fully initialized.
*/
public CompletionStage<SessionT> buildAsync() {
return buildDefaultSessionAsync().thenApply(this::wrap);
CompletionStage<CqlSession> buildStage = buildDefaultSessionAsync();
CompletionStage<SessionT> wrapStage = buildStage.thenApply(this::wrap);
// thenApply does not propagate cancellation (!)
CompletableFutures.propagateCancellation(wrapStage, buildStage);
return wrapStage;
}

/**
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
Expand Down Expand Up @@ -98,10 +99,18 @@ public CompletionStage<Void> init(boolean listenToClusterEvents, boolean reconne
return singleThreaded.initFuture;
}

public CompletionStage<Void> initFuture() {
return singleThreaded.initFuture;
}

public boolean isInit() {
return singleThreaded.initFuture.isDone();
}

public CompletionStage<Void> firstConnectionAttemptFuture() {
return singleThreaded.firstConnectionAttemptFuture;
}

/**
* The channel currently used by this control connection. This is modified concurrently in the
* event of a reconnection, so it may occasionally return a closed channel (clients should be
Expand Down Expand Up @@ -199,6 +208,7 @@ private void processSchemaChange(Event event) {
private class SingleThreaded {
private final InternalDriverContext context;
private final CompletableFuture<Void> initFuture = new CompletableFuture<>();
private final CompletableFuture<Void> firstConnectionAttemptFuture = new CompletableFuture<>();
private boolean initWasCalled;
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
Expand All @@ -212,6 +222,13 @@ private SingleThreaded(InternalDriverContext context) {
this.context = context;
this.reconnection =
new Reconnection(logPrefix, adminExecutor, context.reconnectionPolicy(), this::reconnect);
// In "reconnect-on-init" mode, handle cancellation of the initFuture by user code
CompletableFutures.whenCancelled(
this.initFuture,
() -> {
LOG.debug("[{}] Init future was cancelled, stopping reconnection", logPrefix);
reconnection.stop();
});

context
.eventBus()
Expand Down Expand Up @@ -241,12 +258,17 @@ private void init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
connect(
nodes,
null,
() -> initFuture.complete(null),
() -> {
initFuture.complete(null);
firstConnectionAttemptFuture.complete(null);
},
error -> {
if (reconnectOnFailure && !closeWasCalled) {
reconnection.start();
} else {
initFuture.completeExceptionally(error);
}
initFuture.completeExceptionally(error);
firstConnectionAttemptFuture.completeExceptionally(error);
});
} catch (Throwable t) {
initFuture.completeExceptionally(t);
Expand Down Expand Up @@ -288,7 +310,7 @@ private void connect(
DistanceEvent lastDistanceEvent = lastDistanceEvents.get(node);
NodeStateEvent lastStateEvent = lastStateEvents.get(node);
if (error != null) {
if (closeWasCalled) {
if (closeWasCalled || initFuture.isCancelled()) {
onSuccess.run(); // abort, we don't really care about the result
} else {
LOG.debug(
Expand All @@ -302,7 +324,7 @@ private void connect(
context.eventBus().fire(ChannelEvent.controlConnectionFailed(node));
connect(nodes, newErrors, onSuccess, onFailure);
}
} else if (closeWasCalled) {
} else if (closeWasCalled || initFuture.isCancelled()) {
LOG.debug(
"[{}] New channel opened ({}) but the control connection was closed, closing it",
logPrefix,
Expand Down Expand Up @@ -360,6 +382,10 @@ private void connect(
}

private void onSuccessfulReconnect() {
// If reconnectOnFailure was true and we've never connected before, complete the future now,
// otherwise it's already complete and this is a no-op.
initFuture.complete(null);

// Always perform a full refresh (we don't know how long we were disconnected)
context
.metadataManager()
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class DefaultTopologyMonitor implements TopologyMonitor {
private final ControlConnection controlConnection;
private final AddressTranslator addressTranslator;
private final Duration timeout;
private final boolean reconnectOnInit;
private final CompletableFuture<Void> closeFuture;

@VisibleForTesting volatile boolean isSchemaV2;
Expand All @@ -72,6 +73,7 @@ public DefaultTopologyMonitor(InternalDriverContext context) {
this.addressTranslator = context.addressTranslator();
DriverConfigProfile config = context.config().getDefaultProfile();
this.timeout = config.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT);
this.reconnectOnInit = config.getBoolean(DefaultDriverOption.RECONNECT_ON_INIT);
this.closeFuture = new CompletableFuture<>();
// Set this to true initially, after the first refreshNodes is called this will either stay true
// or be set to false;
Expand All @@ -83,7 +85,12 @@ public CompletionStage<Void> init() {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
return controlConnection.init(true, false);
return controlConnection.init(true, reconnectOnInit);
}

@Override
public CompletionStage<Void> initFuture() {
return controlConnection.initFuture();
}

@Override
Expand Down
Expand Up @@ -403,10 +403,16 @@ private void startSchemaRequest(CompletableFuture<Metadata> future) {

// The control connection may or may not have been initialized already by TopologyMonitor.
private CompletionStage<Void> maybeInitControlConnection() {
return firstSchemaRefreshFuture.isDone()
// Not the first schema refresh, so we know init was attempted already
? firstSchemaRefreshFuture
: controlConnection.init(false, true);
if (firstSchemaRefreshFuture.isDone()) {
// Not the first schema refresh, so we know init was attempted already
return firstSchemaRefreshFuture;
} else {
controlConnection.init(false, true);
// The control connection might fail to connect and reattempt, but for the metadata refresh
// that led us here we only care about the first attempt (metadata is not vital, so if we
// can't get it right now it's OK to move on)
return controlConnection.firstConnectionAttemptFuture();
}
}

private Void parseAndApplySchemaRows(SchemaRows schemaRows) {
Expand Down
Expand Up @@ -40,9 +40,32 @@
*/
public interface TopologyMonitor extends AsyncAutoCloseable {

/** Triggers the initialization of the monitor. */
/**
* Triggers the initialization of the monitor.
*
* <p>The completion of the future returned by this method marks the point when the driver
* considers itself "connected" to the cluster, and proceeds with the rest of the initialization:
* refreshing the list of nodes and the metadata, opening connection pools, etc.
*
* <p>If {@code advanced.reconnect-on-init = true} in the configuration, this method is
* responsible for handling reconnection. That is, if the initial attempt to "connect" to the
* cluster fails, it must schedule reattempts, and only complete the returned future when
* connection eventually succeeds. If the user cancels the returned future, then the reconnection
* attempts should stop.
*
* <p>If this method is called multiple times, it should trigger initialization only once, and
* return the same future on subsequent invocations.
*/
CompletionStage<Void> init();

/**
* The future returned by {@link #init()}.
*
* <p>Note that this method may be called before {@link #init()}; at that stage, the future should
* already exist, but be incomplete.
*/
CompletionStage<Void> initFuture();

/**
* Invoked when the driver needs to refresh the information about an existing node. This is called
* when the node was back and comes back up.
Expand Down
Expand Up @@ -240,6 +240,8 @@ private SingleThreaded(InternalDriverContext context, Set<InetSocketAddress> con
.eventBus()
.register(
NodeStateEvent.class, RunOrSchedule.on(adminExecutor, this::onNodeStateChanged));
CompletableFutures.propagateCancellation(
this.initFuture, context.topologyMonitor().initFuture());
}

private void init(CqlIdentifier keyspace) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.datastax.oss.driver.api.core.DriverExecutionException;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -138,4 +139,18 @@ public static <T> CompletableFuture<T> wrap(Supplier<T> supplier) {
return failedFuture(t);
}
}

public static void whenCancelled(CompletionStage<?> stage, Runnable action) {
stage.exceptionally(
(error) -> {
if (error instanceof CancellationException) {
action.run();
}
return null;
});
}

public static void propagateCancellation(CompletionStage<?> source, CompletionStage<?> target) {
whenCancelled(source, () -> target.toCompletableFuture().cancel(true));
}
}
15 changes: 15 additions & 0 deletions core/src/main/resources/reference.conf
Expand Up @@ -268,6 +268,21 @@ datastax-java-driver {
warn-on-init-error = true
}

# Whether to schedule reconnection attempts if all contact points are unreachable on the first
# initialization attempt.
#
# If this is true, the driver will retry according to the reconnection policy. The
# `SessionBuilder.build()` call -- or the future returned by `SessionBuilder.buildAsync()` --
# won't complete until a contact point has been reached.
#
# If this is false and no contact points are available, the driver will fail with an
# AllNodesFailedException.
#
# Required: yes
# Modifiable at runtime: no
# Overridable in a profile: no
advanced.reconnect-on-init = false

# The policy that controls how often the driver tries to re-establish connections to down nodes.
#
# Required: yes
Expand Down
Expand Up @@ -18,38 +18,79 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.testinfra.session.TestConfigLoader;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.server.RejectScope;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(ParallelizableTests.class)
public class ConnectIT {
@ClassRule public static CcmRule ccm = CcmRule.getInstance();

@ClassRule public static SessionRule<CqlSession> sessionRule = SessionRule.builder(ccm).build();
@ClassRule
public static SimulacronRule simulacronRule =
new SimulacronRule(ClusterSpec.builder().withNodes(1));

@Test
public void should_connect_to_existing_keyspace() {
CqlIdentifier keyspace = sessionRule.keyspace();
try (Session session = SessionUtils.newSession(ccm, keyspace)) {
assertThat(session.getKeyspace()).isEqualTo(keyspace);
}
@Before
public void setup() {
simulacronRule.cluster().acceptConnections();
}

@Test(expected = AllNodesFailedException.class)
public void should_fail_fast_if_contact_points_unreachable_and_reconnection_disabled() {
// Given
simulacronRule.cluster().rejectConnections(0, RejectScope.STOP);

// When
SessionUtils.newSession(simulacronRule);

// Then the exception is thrown
}

@Test
public void should_connect_with_no_keyspace() {
try (Session session = SessionUtils.newSession(ccm)) {
assertThat(session.getKeyspace()).isNull();
}
public void should_wait_for_contact_points_if_reconnection_enabled() throws Exception {
// Given
simulacronRule.cluster().rejectConnections(0, RejectScope.STOP);

// When
CompletableFuture<? extends Session> sessionFuture =
newSessionAsync(
simulacronRule,
"advanced.reconnect-on-init = true",
// Use a short delay so we don't have to wait too long:
"advanced.reconnection-policy.class = ConstantReconnectionPolicy",
"advanced.reconnection-policy.base-delay = 500 milliseconds")
.toCompletableFuture();
// wait a bit to ensure we have a couple of reconnections, otherwise we might race and allow
// reconnections before the initial attempt
TimeUnit.SECONDS.sleep(2);

// Then
assertThat(sessionFuture).isNotCompleted();

// When
simulacronRule.cluster().acceptConnections();

// Then this doesn't throw
Session session = sessionFuture.get(2, TimeUnit.SECONDS);

session.close();
}

@Test(expected = InvalidKeyspaceException.class)
public void should_fail_to_connect_to_non_existent_keyspace() {
CqlIdentifier keyspace = CqlIdentifier.fromInternal("does not exist");
SessionUtils.newSession(ccm, keyspace);
@SuppressWarnings("unchecked")
private CompletionStage<? extends Session> newSessionAsync(
SimulacronRule serverRule, String... options) {
return SessionUtils.baseBuilder()
.addContactPoints(serverRule.getContactPoints())
.withConfigLoader(new TestConfigLoader(options))
.buildAsync();
}
}

0 comments on commit 3db7a83

Please sign in to comment.