Skip to content

Commit

Permalink
JAVA-2077: Allow reconnection policy to distinguish first connection …
Browse files Browse the repository at this point in the history
…attempt

This allows the policy to generate a different schedule for the initial
connection attempt, that is, when the driver failed to reach any contact
point at startup and reconnect-on-init is set.
  • Loading branch information
olim7t committed Jan 24, 2019
1 parent 4f889cc commit 6b0e299
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 45 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-rc1 (in progress)

- [improvement] JAVA-2077: Allow reconnection policy to detect first connection attempt
- [improvement] JAVA-2067: Publish javadocs JAR for the shaded module
- [improvement] JAVA-2103: Expose partitioner name in TokenMap API
- [documentation] JAVA-2075: Document preference for LZ4 over Snappy
Expand Down
13 changes: 13 additions & 0 deletions core/revapi.json
Expand Up @@ -698,6 +698,19 @@
"newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1-SNAPSHOT",
"elementKind": "method",
"justification": "JAVA-2103: Expose partitioner name in TokenMap API"
},
{
"code": "java.method.numberOfParametersChanged",
"old": "method com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule com.datastax.oss.driver.api.core.connection.ReconnectionPolicy::newControlConnectionSchedule()",
"new": "method com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule com.datastax.oss.driver.api.core.connection.ReconnectionPolicy::newControlConnectionSchedule(boolean)",
"package": "com.datastax.oss.driver.api.core.connection",
"classQualifiedName": "com.datastax.oss.driver.api.core.connection.ReconnectionPolicy",
"classSimpleName": "ReconnectionPolicy",
"methodName": "newControlConnectionSchedule",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-beta2",
"newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1-SNAPSHOT",
"elementKind": "method",
"justification": "JAVA-2077: Allow reconnection policy to detect first connection attempt"
}
]
}
Expand Down
Expand Up @@ -36,8 +36,10 @@
* for a node does not have its configured number of connections (see {@code
* advanced.connection.pool.*.size} in the configuration), a reconnection starts for that
* pool.
* <li>{@linkplain #newControlConnectionSchedule() for the control connection}: when the control
* node goes down, a reconnection starts to find another node to replace it.
* <li>{@linkplain #newControlConnectionSchedule(boolean) for the control connection}: when the
* control node goes down, a reconnection starts to find another node to replace it. This is
* also used if the configuration option {@code advanced.reconnect-on-init} is set and the
* driver has to retry the initial connection.
* </ul>
*
* This interface defines separate methods for those two cases, but implementations are free to
Expand All @@ -49,9 +51,21 @@ public interface ReconnectionPolicy extends AutoCloseable {
@NonNull
ReconnectionSchedule newNodeSchedule(@NonNull Node node);

/** Creates a new schedule for the control connection. */
/**
* Creates a new schedule for the control connection.
*
* @param isInitialConnection whether this schedule is generated for the driver's initial attempt
* to connect to the cluster.
* <ul>
* <li>{@code true} means that the configuration option {@code advanced.reconnect-on-init}
* is set, the driver failed to reach any contact point, and it is now scheduling
* reattempts.
* <li>{@code false} means that the driver was already initialized, lost connection to the
* control node, and is now scheduling attempts to connect to another node.
* </ul>
*/
@NonNull
ReconnectionSchedule newControlConnectionSchedule();
ReconnectionSchedule newControlConnectionSchedule(boolean isInitialConnection);

/** Called when the cluster that this policy is associated with closes. */
@Override
Expand Down
Expand Up @@ -74,7 +74,8 @@ public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {

@NonNull
@Override
public ReconnectionSchedule newControlConnectionSchedule() {
public ReconnectionSchedule newControlConnectionSchedule(
@SuppressWarnings("ignored") boolean isInitialConnection) {
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
return schedule;
}
Expand Down
Expand Up @@ -124,7 +124,8 @@ public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {

@NonNull
@Override
public ReconnectionSchedule newControlConnectionSchedule() {
public ReconnectionSchedule newControlConnectionSchedule(
@SuppressWarnings("ignored") boolean isInitialConnection) {
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
return new ExponentialSchedule();
}
Expand Down
Expand Up @@ -26,15 +26,17 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions;
import com.datastax.oss.driver.internal.core.channel.EventCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
Expand All @@ -57,15 +59,19 @@
import org.slf4j.LoggerFactory;

/**
* Maintains a dedicated connection to a Cassandra node for administrative queries: schema
* refreshes, and cluster topology queries and events.
* Maintains a dedicated connection to a Cassandra node for administrative queries.
*
* <p>If the control node goes down, a reconnection is triggered. The control node is chosen
* randomly among the contact points at startup, or according to the load balancing policy for later
* reconnections.
*
* <p>If a custom {@link TopologyMonitor} is used, the control connection is used only for schema
* refreshes; if schema metadata is also disabled, the control connection never initializes.
* <p>The control connection is used by:
*
* <ul>
* <li>{@link DefaultTopologyMonitor} to determine cluster connectivity and retrieve node
* metadata;
* <li>{@link MetadataManager} to run schema metadata queries.
* </ul>
*/
@ThreadSafe
public class ControlConnection implements EventCallback, AsyncAutoCloseable {
Expand All @@ -88,16 +94,31 @@ public ControlConnection(InternalDriverContext context) {
}

/**
* Initializes the control connection. If it is already initialized, this is a no-op and all
* parameters are ignored.
*
* @param listenToClusterEvents whether to register for TOPOLOGY_CHANGE and STATUS_CHANGE events.
* If the control connection has already initialized with another value, this is ignored.
* SCHEMA_CHANGE events are always registered.
* @param reconnectOnFailure whether to schedule a reconnection if the initial attempt fails (this
* does not affect the returned future, which always represents the outcome of the initial
* attempt only).
* @param useInitialReconnectionSchedule if no node can be reached, the type of reconnection
* schedule to use. In other words, the value that will be passed to {@link
* ReconnectionPolicy#newControlConnectionSchedule(boolean)}.
*/
public CompletionStage<Void> init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
public CompletionStage<Void> init(
boolean listenToClusterEvents,
boolean reconnectOnFailure,
boolean useInitialReconnectionSchedule) {
Preconditions.checkArgument(
reconnectOnFailure || !useInitialReconnectionSchedule,
"Can't set useInitialReconnectionSchedule if reconnectOnFailure is false");
RunOrSchedule.on(
adminExecutor, () -> singleThreaded.init(listenToClusterEvents, reconnectOnFailure));
adminExecutor,
() ->
singleThreaded.init(
listenToClusterEvents, reconnectOnFailure, useInitialReconnectionSchedule));
return singleThreaded.initFuture;
}

Expand Down Expand Up @@ -217,6 +238,7 @@ private class SingleThreaded {
private boolean initWasCalled;
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
private final ReconnectionPolicy reconnectionPolicy;
private final Reconnection reconnection;
private DriverChannelOptions channelOptions;
// The last events received for each node
Expand All @@ -225,12 +247,12 @@ private class SingleThreaded {

private SingleThreaded(InternalDriverContext context) {
this.context = context;
ReconnectionPolicy reconnectionPolicy = context.getReconnectionPolicy();
this.reconnectionPolicy = context.getReconnectionPolicy();
this.reconnection =
new Reconnection(
logPrefix,
adminExecutor,
reconnectionPolicy::newControlConnectionSchedule,
() -> reconnectionPolicy.newControlConnectionSchedule(false),
this::reconnect);
// In "reconnect-on-init" mode, handle cancellation of the initFuture by user code
CompletableFutures.whenCancelled(
Expand All @@ -248,7 +270,10 @@ private SingleThreaded(InternalDriverContext context) {
.register(NodeStateEvent.class, RunOrSchedule.on(adminExecutor, this::onStateEvent));
}

private void init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
private void init(
boolean listenToClusterEvents,
boolean reconnectOnFailure,
boolean useInitialReconnectionSchedule) {
assert adminExecutor.inEventLoop();
if (initWasCalled) {
return;
Expand All @@ -274,7 +299,9 @@ private void init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
},
error -> {
if (reconnectOnFailure && !closeWasCalled) {
reconnection.start();
reconnection.start(
reconnectionPolicy.newControlConnectionSchedule(
useInitialReconnectionSchedule));
} else {
// Special case for the initial connection: reword to a more user-friendly error
// message
Expand Down
Expand Up @@ -89,7 +89,7 @@ public CompletionStage<Void> init() {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
return controlConnection.init(true, reconnectOnInit);
return controlConnection.init(true, reconnectOnInit, true);
}

@Override
Expand Down
Expand Up @@ -411,7 +411,7 @@ private CompletionStage<Void> maybeInitControlConnection() {
// Not the first schema refresh, so we know init was attempted already
return firstSchemaRefreshFuture;
} else {
controlConnection.init(false, true);
controlConnection.init(false, true, false);
// The control connection might fail to connect and reattempt, but for the metadata refresh
// that led us here we only care about the first attempt (metadata is not vital, so if we
// can't get it right now it's OK to move on)
Expand Down
Expand Up @@ -98,6 +98,10 @@ public boolean isRunning() {

/** This is a no-op if the reconnection is already running. */
public void start() {
start(null);
}

public void start(ReconnectionSchedule customSchedule) {
assert executor.inEventLoop();
switch (state) {
case SCHEDULED:
Expand All @@ -109,7 +113,7 @@ public void start() {
state = State.ATTEMPT_IN_PROGRESS;
break;
case STOPPED:
reconnectionSchedule = scheduleSupplier.get();
reconnectionSchedule = (customSchedule == null) ? scheduleSupplier.get() : customSchedule;
onStart.run();
scheduleNextAttempt();
break;
Expand Down
Expand Up @@ -52,7 +52,7 @@ public void setup() {
@Test
public void should_generate_exponential_delay_with_jitter() throws Exception {
ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(driverContext);
ReconnectionPolicy.ReconnectionSchedule schedule = policy.newControlConnectionSchedule();
ReconnectionPolicy.ReconnectionSchedule schedule = policy.newControlConnectionSchedule(false);
// generate a number of delays and make sure they are all within the base/max values range
for (int i = 0; i < 128; ++i) {
// compute the min and max delays based on attempt count (i)
Expand Down
Expand Up @@ -44,7 +44,7 @@ public void should_register_for_all_events_if_topology_requested() {
.thenReturn(CompletableFuture.completedFuture(channel1));

// When
controlConnection.init(true, false);
controlConnection.init(true, false, false);
waitForPendingAdminTasks();
DriverChannelOptions channelOptions = optionsCaptor.getValue();

Expand All @@ -67,7 +67,7 @@ public void should_register_for_schema_events_only_if_topology_not_requested() {
.thenReturn(CompletableFuture.completedFuture(channel1));

// When
controlConnection.init(false, false);
controlConnection.init(false, false, false);
waitForPendingAdminTasks();
DriverChannelOptions channelOptions = optionsCaptor.getValue();

Expand All @@ -85,7 +85,7 @@ public void should_process_status_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
controlConnection.init(true, false);
controlConnection.init(true, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
StatusChangeEvent event =
Expand All @@ -107,7 +107,7 @@ public void should_process_topology_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
controlConnection.init(true, false);
controlConnection.init(true, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
TopologyChangeEvent event =
Expand All @@ -129,7 +129,7 @@ public void should_process_schema_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
controlConnection.init(false, false);
controlConnection.init(false, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
SchemaChangeEvent event =
Expand Down

0 comments on commit 6b0e299

Please sign in to comment.