Skip to content

Commit

Permalink
Pass distance instead of size to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Apr 27, 2017
1 parent 65184cd commit afca2b6
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 49 deletions.
Expand Up @@ -41,6 +41,9 @@ public enum CoreDriverOption implements DriverOption {
RECONNECTION_CONFIG_BASE_DELAY("connection.reconnection.config.base-delay", true),
RECONNECTION_CONFIG_MAX_DELAY("connection.reconnection.config.max-delay", true),

POOLING_LOCAL_CONNECTIONS("pooling.local.connections", true),
POOLING_REMOTE_CONNECTIONS("pooling.remote.connections", true),

ADDRESS_TRANSLATOR_CLASS("address-translation.translator-class", true),

AUTHENTICATION_PROVIDER_CLASS("authentication.provider-class", false),
Expand Down
Expand Up @@ -18,6 +18,9 @@
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.InvalidKeyspaceException;
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
Expand Down Expand Up @@ -66,8 +69,8 @@ public class ChannelPool implements AsyncAutoCloseable {
* channels (i.e. {@link #next()} return {@code null}) and is reconnecting.
*/
public static CompletionStage<ChannelPool> init(
Node node, CqlIdentifier keyspaceName, int channelCount, InternalDriverContext context) {
ChannelPool pool = new ChannelPool(node, keyspaceName, channelCount, context);
Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) {
ChannelPool pool = new ChannelPool(node, keyspaceName, distance, context);
return pool.connect();
}

Expand All @@ -80,10 +83,10 @@ public static CompletionStage<ChannelPool> init(
private volatile boolean invalidKeyspace;

private ChannelPool(
Node node, CqlIdentifier keyspaceName, int channelCount, InternalDriverContext context) {
Node node, CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) {
this.node = node;
this.adminExecutor = context.nettyOptions().adminEventExecutorGroup().next();
this.singleThreaded = new SingleThreaded(keyspaceName, channelCount, context);
this.singleThreaded = new SingleThreaded(keyspaceName, distance, context);
}

private CompletionStage<ChannelPool> connect() {
Expand Down Expand Up @@ -114,8 +117,8 @@ public DriverChannel next() {
return channels.next();
}

public void resize(int newChannelCount) {
RunOrSchedule.on(adminExecutor, () -> singleThreaded.resize(newChannelCount));
public void resize(NodeDistance newDistance) {
RunOrSchedule.on(adminExecutor, () -> singleThreaded.resize(newDistance));
}

/**
Expand Down Expand Up @@ -149,6 +152,7 @@ public CompletionStage<Void> forceCloseAsync() {
/** Holds all administration tasks, that are confined to the admin executor. */
private class SingleThreaded {

private final DriverConfig config;
private final ChannelFactory channelFactory;
private final EventBus eventBus;
// The channels that are currently connecting
Expand All @@ -165,9 +169,10 @@ private class SingleThreaded {
private CqlIdentifier keyspaceName;

private SingleThreaded(
CqlIdentifier keyspaceName, int wantedCount, InternalDriverContext context) {
CqlIdentifier keyspaceName, NodeDistance distance, InternalDriverContext context) {
this.keyspaceName = keyspaceName;
this.wantedCount = wantedCount;
this.config = context.config();
this.wantedCount = computeSize(distance);
this.channelFactory = context.channelFactory();
this.eventBus = context.eventBus();
this.reconnection =
Expand Down Expand Up @@ -295,8 +300,9 @@ private void onChannelClosed(DriverChannel channel) {
}
}

private void resize(int newChannelCount) {
private void resize(NodeDistance newDistance) {
assert adminExecutor.inEventLoop();
int newChannelCount = computeSize(newDistance);
if (newChannelCount > wantedCount) {
LOG.debug("{} growing ({} => {} channels)", ChannelPool.this, wantedCount, newChannelCount);
wantedCount = newChannelCount;
Expand Down Expand Up @@ -409,5 +415,14 @@ private void forceClose() {
channel.forceClose();
}
}

private int computeSize(NodeDistance distance) {
return config
.defaultProfile()
.getInt(
(distance == NodeDistance.LOCAL)
? CoreDriverOption.POOLING_LOCAL_CONNECTIONS
: CoreDriverOption.POOLING_REMOTE_CONNECTIONS);
}
}
}
13 changes: 13 additions & 0 deletions core/src/main/resources/reference.conf
Expand Up @@ -88,6 +88,19 @@ datastax-java-driver {
page-size = 5000
}
}

# The driver maintains a connection pool to each node, according to the distance assigned to it
# by the load balancing policy. If the distance is IGNORED, no connections are maintained.
pooling {
local {
# The number of connections in the pool.
connections = 1
}
remote {
connections = 1
}
}

metadata {
# Topology events are external signals that inform the driver of the state of Cassandra nodes
# (by default, they correspond to gossip events received on the control connection).
Expand Down

0 comments on commit afca2b6

Please sign in to comment.