From afca2b61485749204e6f405341a96d2b380515ae Mon Sep 17 00:00:00 2001 From: olim7t Date: Wed, 26 Apr 2017 18:17:36 -0700 Subject: [PATCH] Pass distance instead of size to pool --- .../api/core/config/CoreDriverOption.java | 3 + .../internal/core/pool/ChannelPool.java | 33 ++++-- core/src/main/resources/reference.conf | 13 +++ .../internal/core/pool/ChannelPoolTest.java | 104 +++++++++++------- 4 files changed, 104 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java index 206b3030061..10cf933dd5d 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/CoreDriverOption.java @@ -41,6 +41,9 @@ public enum CoreDriverOption implements DriverOption { RECONNECTION_CONFIG_BASE_DELAY("connection.reconnection.config.base-delay", true), RECONNECTION_CONFIG_MAX_DELAY("connection.reconnection.config.max-delay", true), + POOLING_LOCAL_CONNECTIONS("pooling.local.connections", true), + POOLING_REMOTE_CONNECTIONS("pooling.remote.connections", true), + ADDRESS_TRANSLATOR_CLASS("address-translation.translator-class", true), AUTHENTICATION_PROVIDER_CLASS("authentication.provider-class", false), diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index 0dd669c1b63..1380fc9383b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -18,6 +18,9 @@ import com.datastax.oss.driver.api.core.AsyncAutoCloseable; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.InvalidKeyspaceException; +import com.datastax.oss.driver.api.core.config.CoreDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfig; +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.channel.ChannelEvent; import com.datastax.oss.driver.internal.core.channel.ChannelFactory; @@ -66,8 +69,8 @@ public class ChannelPool implements AsyncAutoCloseable { * channels (i.e. {@link #next()} return {@code null}) and is reconnecting. */ public static CompletionStage init( - Node node, CqlIdentifier keyspaceName, int channelCount, InternalDriverContext context) { - ChannelPool pool = new ChannelPool(node, keyspaceName, channelCount, context); + Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) { + ChannelPool pool = new ChannelPool(node, keyspaceName, distance, context); return pool.connect(); } @@ -80,10 +83,10 @@ public static CompletionStage init( private volatile boolean invalidKeyspace; private ChannelPool( - Node node, CqlIdentifier keyspaceName, int channelCount, InternalDriverContext context) { + Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) { this.node = node; this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next(); - this.singleThreaded = new SingleThreaded(keyspaceName, channelCount, context); + this.singleThreaded = new SingleThreaded(keyspaceName, distance, context); } private CompletionStage connect() { @@ -114,8 +117,8 @@ public DriverChannel next() { return channels.next(); } - public void resize(int newChannelCount) { - RunOrSchedule.on(adminExecutor, () -> singleThreaded.resize(newChannelCount)); + public void resize(NodeDistance newDistance) { + RunOrSchedule.on(adminExecutor, () -> singleThreaded.resize(newDistance)); } /** @@ -149,6 +152,7 @@ public CompletionStage forceCloseAsync() { /** Holds all administration tasks, that are confined to the admin executor. */ private class SingleThreaded { + private final DriverConfig config; private final ChannelFactory channelFactory; private final EventBus eventBus; // The channels that are currently connecting @@ -165,9 +169,10 @@ private class SingleThreaded { private CqlIdentifier keyspaceName; private SingleThreaded( - CqlIdentifier keyspaceName, int wantedCount, InternalDriverContext context) { + CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) { this.keyspaceName = keyspaceName; - this.wantedCount = wantedCount; + this.config = context.config(); + this.wantedCount = computeSize(distance); this.channelFactory = context.channelFactory(); this.eventBus = context.eventBus(); this.reconnection = @@ -295,8 +300,9 @@ private void onChannelClosed(DriverChannel channel) { } } - private void resize(int newChannelCount) { + private void resize(NodeDistance newDistance) { assert adminExecutor.inEventLoop(); + int newChannelCount = computeSize(newDistance); if (newChannelCount > wantedCount) { LOG.debug("{} growing ({} => {} channels)", ChannelPool.this, wantedCount, newChannelCount); wantedCount = newChannelCount; @@ -409,5 +415,14 @@ private void forceClose() { channel.forceClose(); } } + + private int computeSize(NodeDistance distance) { + return config + .defaultProfile() + .getInt( + (distance == NodeDistance.LOCAL) + ? CoreDriverOption.POOLING_LOCAL_CONNECTIONS + : CoreDriverOption.POOLING_REMOTE_CONNECTIONS); + } } } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index ae97c2e12ae..6459ac5097b 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -88,6 +88,19 @@ datastax-java-driver { page-size = 5000 } } + + # The driver maintains a connection pool to each node, according to the distance assigned to it + # by the load balancing policy. If the distance is IGNORED, no connections are maintained. + pooling { + local { + # The number of connections in the pool. + connections = 1 + } + remote { + connections = 1 + } + } + metadata { # Topology events are external signals that inform the driver of the state of Cassandra nodes # (by default, they correspond to gossip events received on the control connection). diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java index e2562fb89ae..861fec9b8d0 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTest.java @@ -17,8 +17,12 @@ import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.InvalidKeyspaceException; +import com.datastax.oss.driver.api.core.config.CoreDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfig; +import com.datastax.oss.driver.api.core.config.DriverConfigProfile; import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy; import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule; +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.channel.ChannelEvent; import com.datastax.oss.driver.internal.core.channel.ChannelFactory; @@ -58,15 +62,17 @@ import static org.mockito.Mockito.times; public class ChannelPoolTest { - public static final InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 9042); + private static final InetSocketAddress ADDRESS = new InetSocketAddress("localhost", 9042); private static final Node NODE = new DefaultNode(ADDRESS); - private @Mock InternalDriverContext context; - private @Mock ReconnectionPolicy reconnectionPolicy; - private @Mock ReconnectionSchedule reconnectionSchedule; - private @Mock NettyOptions nettyOptions; - private @Mock EventBus eventBus; - private @Mock ChannelFactory channelFactory; + @Mock private InternalDriverContext context; + @Mock private DriverConfig config; + @Mock private DriverConfigProfile defaultProfile; + @Mock private ReconnectionPolicy reconnectionPolicy; + @Mock private ReconnectionSchedule reconnectionSchedule; + @Mock private NettyOptions nettyOptions; + @Mock private EventBus eventBus; + @Mock private ChannelFactory channelFactory; private DefaultEventLoopGroup adminEventLoopGroup; @BeforeMethod @@ -77,6 +83,8 @@ public void setup() { Mockito.when(context.nettyOptions()).thenReturn(nettyOptions); Mockito.when(nettyOptions.adminEventExecutorGroup()).thenReturn(adminEventLoopGroup); + Mockito.when(context.config()).thenReturn(config); + Mockito.when(config.defaultProfile()).thenReturn(defaultProfile); Mockito.when(context.eventBus()).thenReturn(eventBus); Mockito.when(context.channelFactory()).thenReturn(channelFactory); @@ -94,7 +102,7 @@ public void teardown() { @Test public void should_initialize_when_all_channels_succeed() throws Exception { - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -106,7 +114,8 @@ public void should_initialize_when_all_channels_succeed() throws Exception { .success(ADDRESS, channel3) .build(); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks(); @@ -120,7 +129,7 @@ public void should_initialize_when_all_channels_succeed() throws Exception { @Test public void should_initialize_when_all_channels_fail() throws Exception { - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) @@ -129,7 +138,8 @@ public void should_initialize_when_all_channels_fail() throws Exception { .failure(ADDRESS, "mock channel init failure") .build(); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks(); @@ -142,7 +152,7 @@ public void should_initialize_when_all_channels_fail() throws Exception { @Test public void should_indicate_when_keyspace_failed_on_all_channels() { - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) @@ -151,7 +161,8 @@ public void should_indicate_when_keyspace_failed_on_all_channels() { .failure(ADDRESS, new InvalidKeyspaceException("invalid keyspace")) .build(); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks(); @@ -160,7 +171,7 @@ public void should_indicate_when_keyspace_failed_on_all_channels() { @Test public void should_fire_force_down_event_when_cluster_name_does_not_match() throws Exception { - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); ClusterNameMismatchException error = new ClusterNameMismatchException(ADDRESS, "actual", "expected"); @@ -171,7 +182,7 @@ public void should_fire_force_down_event_when_cluster_name_does_not_match() thro .failure(ADDRESS, error) .build(); - ChannelPool.init(NODE, null, poolSize, context); + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks(); @@ -187,7 +198,7 @@ public void should_reconnect_when_init_incomplete() throws Exception { // Short delay so we don't have to wait in the test Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -202,7 +213,8 @@ public void should_reconnect_when_init_incomplete() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -231,7 +243,7 @@ public void should_reconnect_when_init_incomplete() throws Exception { public void should_reconnect_when_channel_dies() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -247,7 +259,8 @@ public void should_reconnect_when_channel_dies() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -279,7 +292,8 @@ public void should_reconnect_when_channel_dies() throws Exception { @Test public void should_shrink_outside_of_reconnection() throws Exception { - int poolSize = 4; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_REMOTE_CONNECTIONS)).thenReturn(4); + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -294,7 +308,8 @@ public void should_shrink_outside_of_reconnection() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.REMOTE, context); factoryHelper.waitForCalls(ADDRESS, 4); waitForPendingAdminTasks(); @@ -304,7 +319,7 @@ public void should_shrink_outside_of_reconnection() throws Exception { assertThat(pool.channels).containsOnly(channel1, channel2, channel3, channel4); inOrder.verify(eventBus, times(4)).fire(ChannelEvent.channelOpened(NODE)); - pool.resize(2); + pool.resize(NodeDistance.LOCAL); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelClosed(NODE)); @@ -318,7 +333,8 @@ public void should_shrink_outside_of_reconnection() throws Exception { public void should_shrink_during_reconnection() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 4; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_REMOTE_CONNECTIONS)).thenReturn(4); + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -339,7 +355,8 @@ public void should_shrink_during_reconnection() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.REMOTE, context); factoryHelper.waitForCalls(ADDRESS, 4); waitForPendingAdminTasks(); @@ -353,7 +370,7 @@ public void should_shrink_during_reconnection() throws Exception { Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(NODE)); - pool.resize(2); + pool.resize(NodeDistance.LOCAL); waitForPendingAdminTasks(); @@ -377,7 +394,8 @@ public void should_shrink_during_reconnection() throws Exception { public void should_grow_outside_of_reconnection() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_REMOTE_CONNECTIONS)).thenReturn(4); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -394,7 +412,8 @@ public void should_grow_outside_of_reconnection() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -404,7 +423,7 @@ public void should_grow_outside_of_reconnection() throws Exception { ChannelPool pool = poolFuture.toCompletableFuture().get(); assertThat(pool.channels).containsOnly(channel1, channel2); - pool.resize(4); + pool.resize(NodeDistance.REMOTE); waitForPendingAdminTasks(); // The resizing should have triggered a reconnection @@ -425,7 +444,8 @@ public void should_grow_outside_of_reconnection() throws Exception { public void should_grow_during_reconnection() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_REMOTE_CONNECTIONS)).thenReturn(4); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -447,7 +467,8 @@ public void should_grow_during_reconnection() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -461,7 +482,7 @@ public void should_grow_during_reconnection() throws Exception { Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(NODE)); - pool.resize(4); + pool.resize(NodeDistance.REMOTE); waitForPendingAdminTasks(); @@ -494,18 +515,18 @@ public void should_grow_during_reconnection() throws Exception { @Test public void should_switch_keyspace_on_existing_channels() throws Exception { - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); - CompletableFuture channel2Future = new CompletableFuture<>(); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) .success(ADDRESS, channel1) .success(ADDRESS, channel2) .build(); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -530,7 +551,7 @@ public void should_switch_keyspace_on_existing_channels() throws Exception { public void should_switch_keyspace_on_pending_channels() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 2; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(2); DriverChannel channel1 = newMockDriverChannel(1); CompletableFuture channel1Future = new CompletableFuture<>(); @@ -546,7 +567,8 @@ public void should_switch_keyspace_on_pending_channels() throws Exception { .pending(ADDRESS, channel2Future) .build(); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 2); waitForPendingAdminTasks(); @@ -581,7 +603,7 @@ public void should_switch_keyspace_on_pending_channels() throws Exception { public void should_close_all_channels_when_closed() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -598,7 +620,8 @@ public void should_close_all_channels_when_closed() throws Exception { .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks(); @@ -643,7 +666,7 @@ public void should_close_all_channels_when_closed() throws Exception { public void should_force_close_all_channels_when_force_closed() throws Exception { Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofNanos(1)); - int poolSize = 3; + Mockito.when(defaultProfile.getInt(CoreDriverOption.POOLING_LOCAL_CONNECTIONS)).thenReturn(3); DriverChannel channel1 = newMockDriverChannel(1); DriverChannel channel2 = newMockDriverChannel(2); @@ -660,7 +683,8 @@ public void should_force_close_all_channels_when_force_closed() throws Exception .build(); InOrder inOrder = Mockito.inOrder(eventBus); - CompletionStage poolFuture = ChannelPool.init(NODE, null, poolSize, context); + CompletionStage poolFuture = + ChannelPool.init(NODE, null, NodeDistance.LOCAL, context); factoryHelper.waitForCalls(ADDRESS, 3); waitForPendingAdminTasks();