Skip to content

Commit

Permalink
Add channel pool
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Apr 10, 2017
1 parent 904b5fe commit 88a91ff
Show file tree
Hide file tree
Showing 34 changed files with 1,973 additions and 86 deletions.
Expand Up @@ -24,7 +24,11 @@ public class AuthenticationException extends RuntimeException {
private final SocketAddress address;

public AuthenticationException(SocketAddress address, String message) {
super(String.format("Authentication error on host %s: %s", address, message));
this(address, message, null);
}

public AuthenticationException(SocketAddress address, String message, Throwable cause) {
super(String.format("Authentication error on host %s: %s", address, message), cause);
this.address = address;
}
}
Expand Up @@ -23,7 +23,11 @@ public enum CoreDriverOption implements DriverOption {
CONNECTION_MAX_FRAME_LENGTH("connection.max-frame-length", true),
CONNECTION_MAX_REQUESTS("connection.max-requests-per-connection", true),
CONNECTION_HEARTBEAT_INTERVAL("connection.heartbeat.interval", true),
CONNECTION_HEARTBEAT_TIMEOUT("connection.heartbeat.interval", true),
CONNECTION_HEARTBEAT_TIMEOUT("connection.heartbeat.timeout", true),

RECONNECTION_POLICY_CLASS("connection.reconnection-policy.provider-class", true),
RECONNECTION_CONFIG_BASE_DELAY("connection.reconnection-policy.config.base-delay", true),
RECONNECTION_CONFIG_MAX_DELAY("connection.reconnection-policy.config.max-delay", true),

AUTHENTICATION_PROVIDER_CLASS("authentication.provider-class", false),
AUTHENTICATION_CONFIG_USERNAME("authentication.config.username", false),
Expand Down
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2017-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core.connection;

import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
* A reconnection policy that waits exponentially longer between each reconnection attempt (but
* keeps a constant delay once a maximum delay is reached).
*/
public class ExponentialReconnectionPolicy implements ReconnectionPolicy {

private final long baseDelayMs;
private final long maxDelayMs;
private final long maxAttempts;

/** Builds a new instance. */
public ExponentialReconnectionPolicy(DriverContext context) {
DriverConfigProfile config = context.config().defaultProfile();
this.baseDelayMs =
config.getDuration(CoreDriverOption.RECONNECTION_CONFIG_BASE_DELAY, TimeUnit.MILLISECONDS);
this.maxDelayMs =
config.getDuration(CoreDriverOption.RECONNECTION_CONFIG_MAX_DELAY, TimeUnit.MILLISECONDS);

Preconditions.checkArgument(
baseDelayMs > 0,
"%s must be strictly positive (got %s)",
CoreDriverOption.RECONNECTION_CONFIG_BASE_DELAY.getPath(),
baseDelayMs);
Preconditions.checkArgument(
maxDelayMs >= 0,
"%s must be positive (got %s)",
CoreDriverOption.RECONNECTION_CONFIG_MAX_DELAY.getPath(),
maxDelayMs);
Preconditions.checkArgument(
maxDelayMs >= baseDelayMs,
"%s must be bigger than %s (got %s, %s)",
CoreDriverOption.RECONNECTION_CONFIG_MAX_DELAY.getPath(),
CoreDriverOption.RECONNECTION_CONFIG_BASE_DELAY.getPath(),
maxDelayMs,
baseDelayMs);

// Maximum number of attempts after which we overflow
int ceil = (baseDelayMs & (baseDelayMs - 1)) == 0 ? 0 : 1;
this.maxAttempts = 64 - Long.numberOfLeadingZeros(Long.MAX_VALUE / baseDelayMs) - ceil;
}

/**
* The base delay in milliseconds for this policy (e.g. the delay before the first reconnection
* attempt).
*
* @return the base delay in milliseconds for this policy.
*/
public long getBaseDelayMs() {
return baseDelayMs;
}

/**
* The maximum delay in milliseconds between reconnection attempts for this policy.
*
* @return the maximum delay in milliseconds between reconnection attempts for this policy.
*/
public long getMaxDelayMs() {
return maxDelayMs;
}

/**
* A new schedule that used an exponentially growing delay between reconnection attempts.
*
* <p>For this schedule, reconnection attempt {@code i} will be tried {@code Math.min(2^(i-1) *
* getBaseDelayMs(), getMaxDelayMs())} milliseconds after the previous one.
*
* @return the newly created schedule.
*/
@Override
public ReconnectionSchedule newSchedule() {
return new ExponentialSchedule();
}

private class ExponentialSchedule implements ReconnectionSchedule {

private int attempts;

@Override
public Duration nextDelay() {
long delay =
(attempts > maxAttempts)
? maxDelayMs
: Math.min(baseDelayMs * (1L << attempts++), maxDelayMs);
return Duration.ofMillis(delay);
}
}
}
@@ -0,0 +1,27 @@
/*
* Copyright (C) 2017-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core.connection;

public class HeartbeatException extends ConnectionException {

public HeartbeatException(String message) {
super(message);
}

public HeartbeatException(String message, Throwable cause) {
super(message, cause);
}
}
Expand Up @@ -17,6 +17,7 @@

import com.datastax.oss.driver.api.core.auth.AuthProvider;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.ssl.SslEngineFactory;
import java.util.Optional;

Expand All @@ -26,6 +27,9 @@ public interface DriverContext {
/** The driver's configuration. */
DriverConfig config();

/** The configured reconnection policy. */
ReconnectionPolicy reconnectionPolicy();

/** The authentication provider, if authentication was configured. */
Optional<AuthProvider> authProvider();

Expand Down
@@ -0,0 +1,25 @@
/*
* Copyright (C) 2017-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.channel;

/**
* Hack to expose the number of available ids on a channel. We want to access it on the {@link
* DriverChannel} instance, but it's updated from {@link InFlightHandler}, and we want volatile for
* efficiency.
*/
class AvailableIdsHolder {
volatile int value;
}
Expand Up @@ -65,10 +65,13 @@ public ChannelFactory(InternalDriverContext context) {
} // else it will be negotiated with the first opened connection
}

/** @param reportAvailableIds whether {@link DriverChannel#availableIds()} should be maintained */
public CompletionStage<DriverChannel> connect(
final SocketAddress address, CqlIdentifier keyspace) {
final SocketAddress address, CqlIdentifier keyspace, boolean reportAvailableIds) {
CompletableFuture<DriverChannel> resultFuture = new CompletableFuture<>();

AvailableIdsHolder availableIdsHolder = reportAvailableIds ? new AvailableIdsHolder() : null;

ProtocolVersion currentVersion;
boolean isNegotiating;
List<ProtocolVersion> attemptedVersions = new CopyOnWriteArrayList<>();
Expand All @@ -80,13 +83,21 @@ public CompletionStage<DriverChannel> connect(
isNegotiating = true;
}

connect(address, keyspace, currentVersion, isNegotiating, attemptedVersions, resultFuture);
connect(
address,
keyspace,
availableIdsHolder,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture);
return resultFuture;
}

private void connect(
SocketAddress address,
CqlIdentifier keyspace,
AvailableIdsHolder availableIdsHolder,
final ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
Expand All @@ -99,7 +110,7 @@ private void connect(
.group(nettyOptions.ioEventLoopGroup())
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(initializer(address, currentVersion, keyspace));
.handler(initializer(address, currentVersion, keyspace, availableIdsHolder));

nettyOptions.afterBootstrapInitialized(bootstrap);

Expand All @@ -109,7 +120,8 @@ private void connect(
cf -> {
if (connectFuture.isSuccess()) {
Channel channel = connectFuture.channel();
DriverChannel driverChannel = new DriverChannel(channel, context.writeCoalescer());
DriverChannel driverChannel =
new DriverChannel(channel, context.writeCoalescer(), availableIdsHolder);
// If this is the first successful connection, remember the protocol version and
// cluster name for future connections.
if (isNegotiating) {
Expand All @@ -135,7 +147,14 @@ private void connect(
"Failed to connect with protocol {}, retrying with {}",
currentVersion,
downgraded.get());
connect(address, keyspace, downgraded.get(), true, attemptedVersions, resultFuture);
connect(
address,
keyspace,
availableIdsHolder,
downgraded.get(),
true,
attemptedVersions,
resultFuture);
} else {
resultFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(address, attemptedVersions));
Expand All @@ -149,7 +168,10 @@ private void connect(

@VisibleForTesting
ChannelInitializer<Channel> initializer(
SocketAddress address, final ProtocolVersion protocolVersion, final CqlIdentifier keyspace) {
SocketAddress address,
final ProtocolVersion protocolVersion,
final CqlIdentifier keyspace,
AvailableIdsHolder availableIdsHolder) {
return new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
Expand All @@ -168,7 +190,8 @@ protected void initChannel(Channel channel) throws Exception {
new InFlightHandler(
protocolVersion,
new StreamIdGenerator(maxRequestsPerConnection),
setKeyspaceTimeoutMillis);
setKeyspaceTimeoutMillis,
availableIdsHolder);
ProtocolInitHandler initHandler =
new ProtocolInitHandler(context, protocolVersion, clusterName, keyspace);

Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.protocol.internal.Message;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
Expand All @@ -31,16 +32,24 @@
*/
public class DriverChannel {
static final AttributeKey<String> CLUSTER_NAME_KEY = AttributeKey.newInstance("cluster_name");
static final Object GRACEFUL_CLOSE_MESSAGE = new Object();
static final Object FORCEFUL_CLOSE_MESSAGE = new Object();

@SuppressWarnings("RedundantStringConstructorCall")
static final Object GRACEFUL_CLOSE_MESSAGE = new String("GRACEFUL_CLOSE_MESSAGE");

@SuppressWarnings("RedundantStringConstructorCall")
static final Object FORCEFUL_CLOSE_MESSAGE = new String("FORCEFUL_CLOSE_MESSAGE");

private final Channel channel;
private final WriteCoalescer writeCoalescer;
private final AvailableIdsHolder availableIdsHolder;
private final AtomicBoolean closing = new AtomicBoolean();
private final AtomicBoolean forceClosing = new AtomicBoolean();

DriverChannel(Channel channel, WriteCoalescer writeCoalescer) {
DriverChannel(
Channel channel, WriteCoalescer writeCoalescer, AvailableIdsHolder availableIdsHolder) {
this.channel = channel;
this.writeCoalescer = writeCoalescer;
this.availableIdsHolder = availableIdsHolder;
}

/**
Expand Down Expand Up @@ -93,6 +102,15 @@ public String getClusterName() {
return channel.attr(CLUSTER_NAME_KEY).get();
}

/**
* @return the number of available stream ids on the channel. This is used to weigh channels in
* the pool. Note that for performance reasons this is only maintained if the channel is part
* of a pool that has a size bigger than 1, otherwise it will always return -1.
*/
public int availableIds() {
return (availableIdsHolder == null) ? -1 : availableIdsHolder.value;
}

/**
* Initiates a graceful shutdown: no new requests will be accepted, but all pending requests will
* be allowed to complete before the underlying channel is closed.
Expand All @@ -111,8 +129,15 @@ public Future<Void> close() {
* will be closed.
*/
public Future<Void> forceClose() {
closing.set(true);
writeCoalescer.writeAndFlush(channel, FORCEFUL_CLOSE_MESSAGE);
this.close();
if (forceClosing.compareAndSet(false, true)) {
writeCoalescer.writeAndFlush(channel, FORCEFUL_CLOSE_MESSAGE);
}
return channel.closeFuture();
}

/** Does not close the channel, but returns a future that will complete when it does. */
public ChannelFuture closeFuture() {
return channel.closeFuture();
}

Expand Down

0 comments on commit 88a91ff

Please sign in to comment.