diff --git a/changelog/README.md b/changelog/README.md
index 1b07c2eb26e..6763419c4f7 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -4,6 +4,7 @@
### 4.0.0-rc1 (in progress)
+- [improvement] JAVA-2077: Allow reconnection policy to detect first connection attempt
- [improvement] JAVA-2067: Publish javadocs JAR for the shaded module
- [improvement] JAVA-2103: Expose partitioner name in TokenMap API
- [documentation] JAVA-2075: Document preference for LZ4 over Snappy
diff --git a/core/revapi.json b/core/revapi.json
index bc53052e86f..52006e65669 100644
--- a/core/revapi.json
+++ b/core/revapi.json
@@ -698,6 +698,19 @@
"newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1-SNAPSHOT",
"elementKind": "method",
"justification": "JAVA-2103: Expose partitioner name in TokenMap API"
+ },
+ {
+ "code": "java.method.numberOfParametersChanged",
+ "old": "method com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule com.datastax.oss.driver.api.core.connection.ReconnectionPolicy::newControlConnectionSchedule()",
+ "new": "method com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule com.datastax.oss.driver.api.core.connection.ReconnectionPolicy::newControlConnectionSchedule(boolean)",
+ "package": "com.datastax.oss.driver.api.core.connection",
+ "classQualifiedName": "com.datastax.oss.driver.api.core.connection.ReconnectionPolicy",
+ "classSimpleName": "ReconnectionPolicy",
+ "methodName": "newControlConnectionSchedule",
+ "oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-beta2",
+ "newArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1-SNAPSHOT",
+ "elementKind": "method",
+ "justification": "JAVA-2077: Allow reconnection policy to detect first connection attempt"
}
]
}
diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/connection/ReconnectionPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/connection/ReconnectionPolicy.java
index fedb0ed1ebc..083e83950c6 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/connection/ReconnectionPolicy.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/connection/ReconnectionPolicy.java
@@ -36,8 +36,10 @@
* for a node does not have its configured number of connections (see {@code
* advanced.connection.pool.*.size} in the configuration), a reconnection starts for that
* pool.
- *
{@linkplain #newControlConnectionSchedule() for the control connection}: when the control
- * node goes down, a reconnection starts to find another node to replace it.
+ * {@linkplain #newControlConnectionSchedule(boolean) for the control connection}: when the
+ * control node goes down, a reconnection starts to find another node to replace it. This is
+ * also used if the configuration option {@code advanced.reconnect-on-init} is set and the
+ * driver has to retry the initial connection.
*
*
* This interface defines separate methods for those two cases, but implementations are free to
@@ -49,9 +51,21 @@ public interface ReconnectionPolicy extends AutoCloseable {
@NonNull
ReconnectionSchedule newNodeSchedule(@NonNull Node node);
- /** Creates a new schedule for the control connection. */
+ /**
+ * Creates a new schedule for the control connection.
+ *
+ * @param isInitialConnection whether this schedule is generated for the driver's initial attempt
+ * to connect to the cluster.
+ *
+ * - {@code true} means that the configuration option {@code advanced.reconnect-on-init}
+ * is set, the driver failed to reach any contact point, and it is now scheduling
+ * reattempts.
+ *
- {@code false} means that the driver was already initialized, lost connection to the
+ * control node, and is now scheduling attempts to connect to another node.
+ *
+ */
@NonNull
- ReconnectionSchedule newControlConnectionSchedule();
+ ReconnectionSchedule newControlConnectionSchedule(boolean isInitialConnection);
/** Called when the cluster that this policy is associated with closes. */
@Override
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ConstantReconnectionPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ConstantReconnectionPolicy.java
index 6d89fd14c64..ccead526906 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ConstantReconnectionPolicy.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ConstantReconnectionPolicy.java
@@ -74,7 +74,8 @@ public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {
@NonNull
@Override
- public ReconnectionSchedule newControlConnectionSchedule() {
+ public ReconnectionSchedule newControlConnectionSchedule(
+ @SuppressWarnings("ignored") boolean isInitialConnection) {
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
return schedule;
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicy.java
index d8cf728e847..1d90f6f803f 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicy.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicy.java
@@ -124,7 +124,8 @@ public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {
@NonNull
@Override
- public ReconnectionSchedule newControlConnectionSchedule() {
+ public ReconnectionSchedule newControlConnectionSchedule(
+ @SuppressWarnings("ignored") boolean isInitialConnection) {
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
return new ExponentialSchedule();
}
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
index b5ad2235c64..1b717f4cb34 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java
@@ -26,15 +26,17 @@
import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions;
import com.datastax.oss.driver.internal.core.channel.EventCallback;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
+import com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor;
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
+import com.datastax.oss.driver.internal.core.metadata.MetadataManager;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
-import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.Reconnection;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
+import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.ProtocolConstants;
@@ -57,15 +59,19 @@
import org.slf4j.LoggerFactory;
/**
- * Maintains a dedicated connection to a Cassandra node for administrative queries: schema
- * refreshes, and cluster topology queries and events.
+ * Maintains a dedicated connection to a Cassandra node for administrative queries.
*
* If the control node goes down, a reconnection is triggered. The control node is chosen
* randomly among the contact points at startup, or according to the load balancing policy for later
* reconnections.
*
- *
If a custom {@link TopologyMonitor} is used, the control connection is used only for schema
- * refreshes; if schema metadata is also disabled, the control connection never initializes.
+ *
The control connection is used by:
+ *
+ *
+ * - {@link DefaultTopologyMonitor} to determine cluster connectivity and retrieve node
+ * metadata;
+ *
- {@link MetadataManager} to run schema metadata queries.
+ *
*/
@ThreadSafe
public class ControlConnection implements EventCallback, AsyncAutoCloseable {
@@ -88,16 +94,31 @@ public ControlConnection(InternalDriverContext context) {
}
/**
+ * Initializes the control connection. If it is already initialized, this is a no-op and all
+ * parameters are ignored.
+ *
* @param listenToClusterEvents whether to register for TOPOLOGY_CHANGE and STATUS_CHANGE events.
* If the control connection has already initialized with another value, this is ignored.
* SCHEMA_CHANGE events are always registered.
* @param reconnectOnFailure whether to schedule a reconnection if the initial attempt fails (this
* does not affect the returned future, which always represent the outcome of the initial
* attempt only).
+ * @param useInitialReconnectionSchedule if no node can be reached, the type of reconnection
+ * schedule to use. In other words, the value that will be passed to {@link
+ * ReconnectionPolicy#newControlConnectionSchedule(boolean)}.
*/
- public CompletionStage init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
+ public CompletionStage init(
+ boolean listenToClusterEvents,
+ boolean reconnectOnFailure,
+ boolean useInitialReconnectionSchedule) {
+ Preconditions.checkArgument(
+ reconnectOnFailure || !useInitialReconnectionSchedule,
+ "Can't set useInitialReconnectionSchedule if reconnectOnFailure is false");
RunOrSchedule.on(
- adminExecutor, () -> singleThreaded.init(listenToClusterEvents, reconnectOnFailure));
+ adminExecutor,
+ () ->
+ singleThreaded.init(
+ listenToClusterEvents, reconnectOnFailure, useInitialReconnectionSchedule));
return singleThreaded.initFuture;
}
@@ -217,6 +238,7 @@ private class SingleThreaded {
private boolean initWasCalled;
private final CompletableFuture closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
+ private final ReconnectionPolicy reconnectionPolicy;
private final Reconnection reconnection;
private DriverChannelOptions channelOptions;
// The last events received for each node
@@ -225,12 +247,12 @@ private class SingleThreaded {
private SingleThreaded(InternalDriverContext context) {
this.context = context;
- ReconnectionPolicy reconnectionPolicy = context.getReconnectionPolicy();
+ this.reconnectionPolicy = context.getReconnectionPolicy();
this.reconnection =
new Reconnection(
logPrefix,
adminExecutor,
- reconnectionPolicy::newControlConnectionSchedule,
+ () -> reconnectionPolicy.newControlConnectionSchedule(false),
this::reconnect);
// In "reconnect-on-init" mode, handle cancellation of the initFuture by user code
CompletableFutures.whenCancelled(
@@ -248,7 +270,10 @@ private SingleThreaded(InternalDriverContext context) {
.register(NodeStateEvent.class, RunOrSchedule.on(adminExecutor, this::onStateEvent));
}
- private void init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
+ private void init(
+ boolean listenToClusterEvents,
+ boolean reconnectOnFailure,
+ boolean useInitialReconnectionSchedule) {
assert adminExecutor.inEventLoop();
if (initWasCalled) {
return;
@@ -274,7 +299,9 @@ private void init(boolean listenToClusterEvents, boolean reconnectOnFailure) {
},
error -> {
if (reconnectOnFailure && !closeWasCalled) {
- reconnection.start();
+ reconnection.start(
+ reconnectionPolicy.newControlConnectionSchedule(
+ useInitialReconnectionSchedule));
} else {
// Special case for the initial connection: reword to a more user-friendly error
// message
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
index 4801e1e12db..84eee637fbf 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java
@@ -89,7 +89,7 @@ public CompletionStage init() {
if (closeFuture.isDone()) {
return CompletableFutures.failedFuture(new IllegalStateException("closed"));
}
- return controlConnection.init(true, reconnectOnInit);
+ return controlConnection.init(true, reconnectOnInit, true);
}
@Override
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
index ce51374e9f1..d7b711bc9e5 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java
@@ -411,7 +411,7 @@ private CompletionStage maybeInitControlConnection() {
// Not the first schema refresh, so we know init was attempted already
return firstSchemaRefreshFuture;
} else {
- controlConnection.init(false, true);
+ controlConnection.init(false, true, false);
// The control connection might fail to connect and reattempt, but for the metadata refresh
// that led us here we only care about the first attempt (metadata is not vital, so if we
// can't get it right now it's OK to move on)
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java
index eaf2a0eba75..d40265bd09a 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java
@@ -98,6 +98,10 @@ public boolean isRunning() {
/** This is a no-op if the reconnection is already running. */
public void start() {
+ start(null);
+ }
+
+ public void start(ReconnectionSchedule customSchedule) {
assert executor.inEventLoop();
switch (state) {
case SCHEDULED:
@@ -109,7 +113,7 @@ public void start() {
state = State.ATTEMPT_IN_PROGRESS;
break;
case STOPPED:
- reconnectionSchedule = scheduleSupplier.get();
+ reconnectionSchedule = (customSchedule == null) ? scheduleSupplier.get() : customSchedule;
onStart.run();
scheduleNextAttempt();
break;
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicyTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicyTest.java
index c59cc273a94..85b3bb83ef3 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicyTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicyTest.java
@@ -52,7 +52,7 @@ public void setup() {
@Test
public void should_generate_exponential_delay_with_jitter() throws Exception {
ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(driverContext);
- ReconnectionPolicy.ReconnectionSchedule schedule = policy.newControlConnectionSchedule();
+ ReconnectionPolicy.ReconnectionSchedule schedule = policy.newControlConnectionSchedule(false);
// generate a number of delays and make sure they are all within the base/max values range
for (int i = 0; i < 128; ++i) {
// compute the min and max delays based on attempt count (i)
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java
index e77427c6e3f..5bb888c5cfe 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java
@@ -44,7 +44,7 @@ public void should_register_for_all_events_if_topology_requested() {
.thenReturn(CompletableFuture.completedFuture(channel1));
// When
- controlConnection.init(true, false);
+ controlConnection.init(true, false, false);
waitForPendingAdminTasks();
DriverChannelOptions channelOptions = optionsCaptor.getValue();
@@ -67,7 +67,7 @@ public void should_register_for_schema_events_only_if_topology_not_requested() {
.thenReturn(CompletableFuture.completedFuture(channel1));
// When
- controlConnection.init(false, false);
+ controlConnection.init(false, false, false);
waitForPendingAdminTasks();
DriverChannelOptions channelOptions = optionsCaptor.getValue();
@@ -85,7 +85,7 @@ public void should_process_status_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
- controlConnection.init(true, false);
+ controlConnection.init(true, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
StatusChangeEvent event =
@@ -107,7 +107,7 @@ public void should_process_topology_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
- controlConnection.init(true, false);
+ controlConnection.init(true, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
TopologyChangeEvent event =
@@ -129,7 +129,7 @@ public void should_process_schema_change_events() {
ArgumentCaptor.forClass(DriverChannelOptions.class);
Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture()))
.thenReturn(CompletableFuture.completedFuture(channel1));
- controlConnection.init(false, false);
+ controlConnection.init(false, false, false);
waitForPendingAdminTasks();
EventCallback callback = optionsCaptor.getValue().eventCallback;
SchemaChangeEvent event =
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
index 3abf455d14d..b91a13dc482 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java
@@ -56,7 +56,7 @@ public void should_init_with_first_contact_point_if_reachable() {
MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build();
// When
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -76,9 +76,9 @@ public void should_always_return_same_init_future() {
MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build();
// When
- CompletionStage initFuture1 = controlConnection.init(false, false);
+ CompletionStage initFuture1 = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
- CompletionStage initFuture2 = controlConnection.init(false, false);
+ CompletionStage initFuture2 = controlConnection.init(false, false, false);
// Then
assertThatStage(initFuture1).isEqualTo(initFuture2);
@@ -97,7 +97,7 @@ public void should_init_with_second_contact_point_if_first_one_fails() {
.build();
// When
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
factoryHelper.waitForCall(node2);
waitForPendingAdminTasks();
@@ -123,7 +123,7 @@ public void should_fail_to_init_if_all_contact_points_fail() {
.build();
// When
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
factoryHelper.waitForCall(node2);
waitForPendingAdminTasks();
@@ -151,7 +151,7 @@ public void should_reconnect_if_channel_goes_down() throws Exception {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -190,7 +190,7 @@ public void should_reconnect_if_node_becomes_ignored() {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -238,7 +238,7 @@ private void should_reconnect_if_event(NodeStateEvent event) {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -282,7 +282,7 @@ public void should_reconnect_if_node_became_ignored_during_reconnection_attempt(
.success(node1, channel3)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -339,7 +339,7 @@ private void should_reconnect_if_event_during_reconnection_attempt(NodeStateEven
.success(node1, channel3)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
@@ -383,7 +383,7 @@ public void should_force_reconnection_if_pending() {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
@@ -421,7 +421,7 @@ public void should_force_reconnection_even_if_connected() {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
@@ -459,7 +459,7 @@ public void should_not_force_reconnection_if_closed() {
DriverChannel channel1 = newMockDriverChannel(1);
MockChannelFactoryHelper factoryHelper =
MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
@@ -483,7 +483,7 @@ public void should_close_channel_when_closing() {
MockChannelFactoryHelper factoryHelper =
MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
@@ -514,7 +514,7 @@ public void should_close_channel_if_closed_during_reconnection() {
.pending(node2, channel2Future)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
@@ -561,7 +561,7 @@ public void should_handle_channel_failure_if_closed_during_reconnection() {
.success(node2, channel2)
.build();
- CompletionStage initFuture = controlConnection.init(false, false);
+ CompletionStage initFuture = controlConnection.init(false, false, false);
factoryHelper.waitForCall(node1);
waitForPendingAdminTasks();
assertThatStage(initFuture).isSuccess();
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
index cf2496f32dc..5c224594543 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java
@@ -19,6 +19,9 @@
import static org.mockito.ArgumentMatchers.any;
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfig;
+import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.addresstranslation.PassThroughAddressTranslator;
@@ -57,6 +60,8 @@ abstract class ControlConnectionTestBase {
protected static final InetSocketAddress ADDRESS2 = new InetSocketAddress("127.0.0.2", 9042);
@Mock protected InternalDriverContext context;
+ @Mock protected DriverConfig config;
+ @Mock protected DriverExecutionProfile defaultProfile;
@Mock protected ReconnectionPolicy reconnectionPolicy;
@Mock protected ReconnectionPolicy.ReconnectionSchedule reconnectionSchedule;
@Mock protected NettyOptions nettyOptions;
@@ -95,8 +100,14 @@ public void setup() {
return channelFuture;
});
+ Mockito.when(context.getConfig()).thenReturn(config);
+ Mockito.when(config.getDefaultProfile()).thenReturn(defaultProfile);
+ Mockito.when(defaultProfile.getBoolean(DefaultDriverOption.RECONNECT_ON_INIT))
+ .thenReturn(false);
+
Mockito.when(context.getReconnectionPolicy()).thenReturn(reconnectionPolicy);
- Mockito.when(reconnectionPolicy.newControlConnectionSchedule())
+ // Child classes only cover "runtime" reconnections when the driver is already initialized
+ Mockito.when(reconnectionPolicy.newControlConnectionSchedule(false))
.thenReturn(reconnectionSchedule);
// By default, set a large reconnection delay. Tests that care about reconnection will override
// it.
diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
index ffcf23f8360..32ffd70cb30 100644
--- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
+++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java
@@ -102,7 +102,7 @@ public void should_initialize_control_connection() {
topologyMonitor.init();
// Then
- Mockito.verify(controlConnection).init(true, false);
+ Mockito.verify(controlConnection).init(true, false, false);
}
@Test
diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/api/core/ConnectIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/ConnectIT.java
index aa88e55b487..169516500a7 100644
--- a/integration-tests/src/test/java/com/datastax/oss/driver/api/core/ConnectIT.java
+++ b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/ConnectIT.java
@@ -22,6 +22,7 @@
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
@@ -29,6 +30,7 @@
import com.datastax.oss.driver.internal.core.connection.ConstantReconnectionPolicy;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.server.RejectScope;
+import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -79,7 +81,7 @@ public void should_wait_for_contact_points_if_reconnection_enabled() throws Exce
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.RECONNECT_ON_INIT, true)
.withClass(
- DefaultDriverOption.RECONNECTION_POLICY_CLASS, ConstantReconnectionPolicy.class)
+ DefaultDriverOption.RECONNECTION_POLICY_CLASS, InitOnlyReconnectionPolicy.class)
// Use a short delay so we don't have to wait too long:
.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(500))
.build();
@@ -133,4 +135,28 @@ private CompletionStage extends Session> newSessionAsync(
.withConfigLoader(loader)
.buildAsync();
}
+
+ /**
+ * Test policy that fails if a "runtime" control connection schedule is requested.
+ *
+ * This is just to check that {@link #newControlConnectionSchedule(boolean)} is called with the
+ * correct boolean parameter.
+ */
+ public static class InitOnlyReconnectionPolicy extends ConstantReconnectionPolicy {
+
+ public InitOnlyReconnectionPolicy(DriverContext context) {
+ super(context);
+ }
+
+ @NonNull
+ @Override
+ public ReconnectionSchedule newControlConnectionSchedule(boolean isInitialConnection) {
+ if (isInitialConnection) {
+ return super.newControlConnectionSchedule(true);
+ } else {
+ throw new UnsupportedOperationException(
+ "should not be called with isInitialConnection==false");
+ }
+ }
+ }
}