Skip to content

Commit

Permalink
Always report available stream ids
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Dec 3, 2017
1 parent 43e855a commit 873bf86
Show file tree
Hide file tree
Showing 14 changed files with 41 additions and 147 deletions.

This file was deleted.

Expand Up @@ -85,9 +85,6 @@ public CompletionStage<DriverChannel> connect(
final SocketAddress address, DriverChannelOptions options) {
CompletableFuture<DriverChannel> resultFuture = new CompletableFuture<>();

AvailableIdsHolder availableIdsHolder =
options.reportAvailableIds ? new AvailableIdsHolder() : null;

ProtocolVersion currentVersion;
boolean isNegotiating;
List<ProtocolVersion> attemptedVersions = new CopyOnWriteArrayList<>();
Expand All @@ -99,21 +96,13 @@ public CompletionStage<DriverChannel> connect(
isNegotiating = true;
}

connect(
address,
options,
availableIdsHolder,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture);
connect(address, options, currentVersion, isNegotiating, attemptedVersions, resultFuture);
return resultFuture;
}

private void connect(
SocketAddress address,
DriverChannelOptions options,
AvailableIdsHolder availableIdsHolder,
final ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
Expand All @@ -126,8 +115,7 @@ private void connect(
.group(nettyOptions.ioEventLoopGroup())
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(
initializer(address, currentVersion, options, availableIdsHolder, resultFuture));
.handler(initializer(address, currentVersion, options, resultFuture));

nettyOptions.afterBootstrapInitialized(bootstrap);

Expand All @@ -138,8 +126,7 @@ private void connect(
if (connectFuture.isSuccess()) {
Channel channel = connectFuture.channel();
DriverChannel driverChannel =
new DriverChannel(
channel, context.writeCoalescer(), availableIdsHolder, currentVersion);
new DriverChannel(channel, context.writeCoalescer(), currentVersion);
// If this is the first successful connection, remember the protocol version and
// cluster name for future connections.
if (isNegotiating) {
Expand All @@ -160,14 +147,7 @@ private void connect(
"Failed to connect with protocol {}, retrying with {}",
currentVersion,
downgraded.get());
connect(
address,
options,
availableIdsHolder,
downgraded.get(),
true,
attemptedVersions,
resultFuture);
connect(address, options, downgraded.get(), true, attemptedVersions, resultFuture);
} else {
resultFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(address, attemptedVersions));
Expand All @@ -186,7 +166,6 @@ ChannelInitializer<Channel> initializer(
SocketAddress address,
final ProtocolVersion protocolVersion,
final DriverChannelOptions options,
AvailableIdsHolder availableIdsHolder,
CompletableFuture<DriverChannel> resultFuture) {
return new ChannelInitializer<Channel>() {
@Override
Expand All @@ -211,7 +190,6 @@ protected void initChannel(Channel channel) throws Exception {
new StreamIdGenerator(maxRequestsPerConnection),
maxOrphanRequests,
setKeyspaceTimeoutMillis,
availableIdsHolder,
channel.newPromise(),
options.eventCallback,
options.ownerLogPrefix);
Expand Down
Expand Up @@ -43,22 +43,16 @@ public class DriverChannel {
static final Object FORCEFUL_CLOSE_MESSAGE = new String("FORCEFUL_CLOSE_MESSAGE");

private final Channel channel;
private final ChannelFuture closeStartedFuture;
private final InFlightHandler inFlightHandler;
private final WriteCoalescer writeCoalescer;
private final AvailableIdsHolder availableIdsHolder;
private final ProtocolVersion protocolVersion;
private final AtomicBoolean closing = new AtomicBoolean();
private final AtomicBoolean forceClosing = new AtomicBoolean();

DriverChannel(
Channel channel,
WriteCoalescer writeCoalescer,
AvailableIdsHolder availableIdsHolder,
ProtocolVersion protocolVersion) {
DriverChannel(Channel channel, WriteCoalescer writeCoalescer, ProtocolVersion protocolVersion) {
this.channel = channel;
this.closeStartedFuture = channel.pipeline().get(InFlightHandler.class).closeStartedFuture;
this.inFlightHandler = channel.pipeline().get(InFlightHandler.class);
this.writeCoalescer = writeCoalescer;
this.availableIdsHolder = availableIdsHolder;
this.protocolVersion = protocolVersion;
}

Expand Down Expand Up @@ -127,11 +121,11 @@ public String getClusterName() {

/**
* @return the number of available stream ids on the channel. This is used to weigh channels in
* the pool. Note that for performance reasons this is only maintained if the channel is part
* of a pool that has a size bigger than 1, otherwise it will always return -1.
* pools that have a size bigger than 1, in the load balancing policy, and for monitoring
* purposes.
*/
public int availableIds() {
return (availableIdsHolder == null) ? -1 : availableIdsHolder.value;
public int getAvailableIds() {
return inFlightHandler.getAvailableIds();
}

public EventLoop eventLoop() {
Expand Down Expand Up @@ -186,7 +180,7 @@ public Future<Void> forceClose() {
* #forceClose()} is called first, this future will never complete.
*/
public ChannelFuture closeStartedFuture() {
return this.closeStartedFuture;
return this.inFlightHandler.closeStartedFuture;
}

/**
Expand Down
Expand Up @@ -32,9 +32,6 @@ public static Builder builder() {

public final CqlIdentifier keyspace;

/** Whether {@link DriverChannel#availableIds()} should be maintained */
public final boolean reportAvailableIds;

/**
* What kind of protocol events to listen for.
*
Expand All @@ -48,20 +45,17 @@ public static Builder builder() {

private DriverChannelOptions(
CqlIdentifier keyspace,
boolean reportAvailableIds,
List<String> eventTypes,
EventCallback eventCallback,
String ownerLogPrefix) {
this.keyspace = keyspace;
this.reportAvailableIds = reportAvailableIds;
this.eventTypes = eventTypes;
this.eventCallback = eventCallback;
this.ownerLogPrefix = ownerLogPrefix;
}

public static class Builder {
private CqlIdentifier keyspace = null;
private boolean reportAvailableIds = false;
private List<String> eventTypes = Collections.emptyList();
private EventCallback eventCallback = null;
private String ownerLogPrefix = null;
Expand All @@ -71,11 +65,6 @@ public Builder withKeyspace(CqlIdentifier keyspace) {
return this;
}

public Builder reportAvailableIds(boolean reportAvailableIds) {
this.reportAvailableIds = reportAvailableIds;
return this;
}

public Builder withEvents(List<String> eventTypes, EventCallback eventCallback) {
Preconditions.checkArgument(eventTypes != null && !eventTypes.isEmpty());
Preconditions.checkNotNull(eventCallback);
Expand All @@ -90,8 +79,7 @@ public Builder withOwnerLogPrefix(String ownerLogPrefix) {
}

public DriverChannelOptions build() {
return new DriverChannelOptions(
keyspace, reportAvailableIds, eventTypes, eventCallback, ownerLogPrefix);
return new DriverChannelOptions(keyspace, eventTypes, eventCallback, ownerLogPrefix);
}
}
}
Expand Up @@ -51,7 +51,6 @@ public class InFlightHandler extends ChannelDuplexHandler {
private final String ownerLogPrefix;
private final BiMap<Integer, ResponseCallback> inFlight;
private final long setKeyspaceTimeoutMillis;
private final AvailableIdsHolder availableIdsHolder;
private final EventCallback eventCallback;
private final int maxOrphanStreamIds;
private boolean closingGracefully;
Expand All @@ -64,7 +63,6 @@ public class InFlightHandler extends ChannelDuplexHandler {
StreamIdGenerator streamIds,
int maxOrphanStreamIds,
long setKeyspaceTimeoutMillis,
AvailableIdsHolder availableIdsHolder,
ChannelPromise closeStartedFuture,
EventCallback eventCallback,
String ownerLogPrefix) {
Expand All @@ -74,10 +72,8 @@ public class InFlightHandler extends ChannelDuplexHandler {
this.closeStartedFuture = closeStartedFuture;
this.ownerLogPrefix = ownerLogPrefix;
this.logPrefix = ownerLogPrefix + "|connecting...";
reportAvailableIds();
this.inFlight = HashBiMap.create(streamIds.getMaxAvailableIds());
this.setKeyspaceTimeoutMillis = setKeyspaceTimeoutMillis;
this.availableIdsHolder = availableIdsHolder;
this.eventCallback = eventCallback;
}

Expand Down Expand Up @@ -128,8 +124,6 @@ private void write(ChannelHandlerContext ctx, RequestMessage message, ChannelPro
return;
}

reportAvailableIds();

LOG.debug("[{}] Writing {} on stream id {}", logPrefix, message.responseCallback, streamId);
Frame frame =
Frame.forRequest(
Expand Down Expand Up @@ -311,7 +305,6 @@ private ResponseCallback release(int streamId, ChannelHandlerContext ctx) {
LOG.debug("[{}] Releasing stream id {}", logPrefix, streamId);
ResponseCallback responseCallback = inFlight.remove(streamId);
streamIds.release(streamId);
reportAvailableIds();
// If we're in the middle of an orderly close and this was the last request, actually close
// the channel now
if (closingGracefully && inFlight.isEmpty()) {
Expand Down Expand Up @@ -340,10 +333,8 @@ private void abortAllInFlight(DriverException cause, ResponseCallback ignore) {
// closing the channel
}

private void reportAvailableIds() {
if (availableIdsHolder != null) {
availableIdsHolder.value = streamIds.getAvailableIds();
}
int getAvailableIds() {
return streamIds.getAvailableIds();
}

private class SetKeyspaceRequest extends ChannelHandlerRequest {
Expand Down
Expand Up @@ -17,13 +17,19 @@

import java.util.BitSet;

/** Manages the set of stream ids used to distinguish multiplexed requests on a channel. */
/**
* Manages the set of identifiers used to distinguish multiplexed requests on a channel.
*
* <p>This class is not thread safe: calls to {@link #acquire()} and {@link #release(int)} must be
* properly synchronized (in practice this is done by only calling them from the I/O thread).
* However, {@link #getAvailableIds()} has volatile semantics.
*/
class StreamIdGenerator {

private final int maxAvailableIds;
// unset = available, set = borrowed (note that this is the opposite of the 3.x implementation)
private final BitSet ids;
private int availableIds;
private volatile int availableIds;

StreamIdGenerator(int maxAvailableIds) {
this.maxAvailableIds = maxAvailableIds;
Expand Down
Expand Up @@ -248,7 +248,6 @@ private CompletionStage<Boolean> addMissingChannels() {
DriverChannelOptions options =
DriverChannelOptions.builder()
.withKeyspace(keyspaceName)
.reportAvailableIds(wantedCount > 1)
.withOwnerLogPrefix(sessionLogPrefix)
.build();
for (int i = 0; i < missing; i++) {
Expand Down
Expand Up @@ -85,7 +85,7 @@ DriverChannel next() {
DriverChannel best = null;
int bestScore = 0;
for (DriverChannel channel : snapshot) {
int score = channel.availableIds();
int score = channel.getAvailableIds();
if (score > bestScore) {
bestScore = score;
best = channel;
Expand Down
Expand Up @@ -49,65 +49,33 @@ public void setup() throws InterruptedException {
}

@Test
public void should_report_available_ids_if_requested() {
public void should_report_available_ids() {
// Given
ChannelFactory factory = newChannelFactory();

// When
CompletionStage<DriverChannel> channelFuture =
factory.connect(
SERVER_ADDRESS, DriverChannelOptions.builder().reportAvailableIds(true).build());
factory.connect(SERVER_ADDRESS, DriverChannelOptions.builder().build());
completeSimpleChannelInit();

// Then
assertThat(channelFuture)
.isSuccess(
channel -> {
assertThat(channel.availableIds()).isEqualTo(128);
assertThat(channel.getAvailableIds()).isEqualTo(128);

// Write a request, should decrease the count
Future<java.lang.Void> writeFuture =
channel.write(new Query("test"), false, Frame.NO_PAYLOAD, responseCallback);
assertThat(writeFuture)
.isSuccess(
v -> {
assertThat(channel.availableIds()).isEqualTo(127);
assertThat(channel.getAvailableIds()).isEqualTo(127);

// Complete the request, should increase again
writeInboundFrame(readOutboundFrame(), Void.INSTANCE);
Mockito.verify(responseCallback, timeout(500)).onResponse(any(Frame.class));
assertThat(channel.availableIds()).isEqualTo(128);
});
});
}

@Test
public void should_not_report_available_ids_if_not_requested() {
// Given
ChannelFactory factory = newChannelFactory();

// When
CompletionStage<DriverChannel> channelFuture =
factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT);
completeSimpleChannelInit();

// Then
assertThat(channelFuture)
.isSuccess(
channel -> {
assertThat(channel.availableIds()).isEqualTo(-1);

// Write a request, complete it, count should never be updated
Future<java.lang.Void> writeFuture =
channel.write(new Query("test"), false, Frame.NO_PAYLOAD, responseCallback);
assertThat(writeFuture)
.isSuccess(
v -> {
assertThat(channel.availableIds()).isEqualTo(-1);

writeInboundFrame(readOutboundFrame(), Void.INSTANCE);
Mockito.verify(responseCallback, timeout(500)).onResponse(any(Frame.class));
assertThat(channel.availableIds()).isEqualTo(-1);
assertThat(channel.getAvailableIds()).isEqualTo(128);
});
});
}
Expand Down
Expand Up @@ -226,7 +226,6 @@ ChannelInitializer<Channel> initializer(
SocketAddress address,
ProtocolVersion protocolVersion,
DriverChannelOptions options,
AvailableIdsHolder availableIdsHolder,
CompletableFuture<DriverChannel> resultFuture) {
return new ChannelInitializer<Channel>() {
@Override
Expand All @@ -247,7 +246,6 @@ protected void initChannel(Channel channel) throws Exception {
new StreamIdGenerator(maxRequestsPerConnection),
Integer.MAX_VALUE,
setKeyspaceTimeoutMillis,
availableIdsHolder,
channel.newPromise(),
null,
"test");
Expand Down

0 comments on commit 873bf86

Please sign in to comment.