Skip to content

Commit

Permalink
JAVA-1772: Revisit multi-response callbacks
Browse files Browse the repository at this point in the history
Motivation:

In the previous version, multi-response ResponseCallback implementations
had to explicitly indicate when to release the stream id. It's simpler
to do it directly in InFlightHandler, provided that we have a simple way
to test when a Frame is the last that the server will send (and we
should ensure that this is always the case for future multi-response
requests).

Modifications:

Replace ResponseCallback.holdStreamId() by isLastResponse(Frame).
Remove DriverChannel.release(int).
Keep track of orphaned ResponseCallbacks in InFlightHandler, and release
them (and the stream id) when their last response is received.

Result:

ResponseCallbacks now only need to indicate how to identify the last
frame, InFlightHandler handles the rest.
Cancelled multi-response callbacks will be properly released.
  • Loading branch information
olim7t committed Mar 1, 2018
1 parent 3c046e2 commit 8257234
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 80 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-alpha4 (in progress)

- [improvement] JAVA-1772: Revisit multi-response callbacks
- [new feature] JAVA-1537: Add remaining socket options
- [bug] JAVA-1756: Propagate custom payload when preparing a statement

Expand Down
Expand Up @@ -87,16 +87,6 @@ public void cancel(ResponseCallback responseCallback) {
writeCoalescer.writeAndFlush(channel, responseCallback).addListener(UncaughtExceptions::log);
}

/**
* Releases a stream id if the client was holding onto it, and has now determined that it can be
* safely reused.
*
* @see ResponseCallback#holdStreamId()
*/
public void release(int streamId) {
channel.pipeline().fireUserEventTriggered(new ReleaseEvent(streamId));
}

/**
* Switches the underlying Cassandra connection to a new keyspace (as if a {@code USE ...}
* statement was issued).
Expand Down Expand Up @@ -244,14 +234,6 @@ static class RequestMessage {
}
}

static class ReleaseEvent {
final int streamId;

ReleaseEvent(int streamId) {
this.streamId = streamId;
}
}

static class SetKeyspaceEvent {
final CqlIdentifier keyspaceName;
final Promise<Void> promise;
Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.internal.core.channel.DriverChannel.ReleaseEvent;
import com.datastax.oss.driver.internal.core.channel.DriverChannel.RequestMessage;
import com.datastax.oss.driver.internal.core.channel.DriverChannel.SetKeyspaceEvent;
import com.datastax.oss.driver.internal.core.protocol.FrameDecodingException;
Expand All @@ -30,6 +29,7 @@
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
import com.google.common.base.MoreObjects;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import io.netty.channel.ChannelDuplexHandler;
Expand All @@ -38,6 +38,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Promise;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -50,13 +52,14 @@ public class InFlightHandler extends ChannelDuplexHandler {
final ChannelPromise closeStartedFuture;
private final String ownerLogPrefix;
private final BiMap<Integer, ResponseCallback> inFlight;
private final Map<Integer, ResponseCallback> orphaned;
private volatile int orphanedSize; // thread-safe view for metrics
private final long setKeyspaceTimeoutMillis;
private final EventCallback eventCallback;
private final int maxOrphanStreamIds;
private boolean closingGracefully;
private SetKeyspaceRequest setKeyspaceRequest;
private String logPrefix;
private volatile int orphanStreamIds; // volatile only for metrics

InFlightHandler(
ProtocolVersion protocolVersion,
Expand All @@ -73,6 +76,7 @@ public class InFlightHandler extends ChannelDuplexHandler {
this.ownerLogPrefix = ownerLogPrefix;
this.logPrefix = ownerLogPrefix + "|connecting...";
this.inFlight = HashBiMap.create(streamIds.getMaxAvailableIds());
this.orphaned = new HashMap<>(maxOrphanStreamIds);
this.setKeyspaceTimeoutMillis = setKeyspaceTimeoutMillis;
this.eventCallback = eventCallback;
}
Expand Down Expand Up @@ -138,22 +142,19 @@ private void write(ChannelHandlerContext ctx, RequestMessage message, ChannelPro
writeFuture.addListener(
future -> {
if (future.isSuccess()) {
if (message.responseCallback.holdStreamId()) {
message.responseCallback.onStreamIdAssigned(streamId);
}
message.responseCallback.onStreamIdAssigned(streamId);
} else {
release(streamId, ctx);
}
});
}

@SuppressWarnings("NonAtomicVolatileUpdate")
private void cancel(
ChannelHandlerContext ctx, ResponseCallback responseCallback, ChannelPromise promise) {
Integer streamId = inFlight.inverse().remove(responseCallback);
if (streamId == null) {
LOG.debug(
"[{}] Received cancellation request for unknown callback {}, skipping",
"[{}] Received cancellation for unknown or already cancelled callback {}, skipping",
logPrefix,
responseCallback);
} else {
Expand All @@ -164,15 +165,17 @@ private void cancel(
ctx.channel().close();
} else {
// We can't release the stream id, because a response might still come back from the server.
// Keep track of how many of those ids are held, because we want to replace the channel if
// it becomes too high.
orphanStreamIds += 1; // safe because the method is confined to the I/O thread
if (orphanStreamIds > maxOrphanStreamIds) {
// Keep track of those "orphaned" ids, to release them later if we get a response and the
// callback says it's the last one.
orphaned.put(streamId, responseCallback);
if (orphaned.size() > maxOrphanStreamIds) {
LOG.debug(
"[{}] Orphan stream ids exceeded the configured threshold ({}), closing gracefully",
logPrefix,
maxOrphanStreamIds);
startGracefulShutdown(ctx);
} else {
orphanedSize = orphaned.size();
}
}
}
Expand Down Expand Up @@ -215,25 +218,44 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
} else {
ResponseCallback responseCallback = inFlight.get(streamId);
if (responseCallback == null) {
LOG.debug("[{}] Got response on orphan stream id {}, releasing", logPrefix, streamId);
release(streamId, ctx);
orphanStreamIds -= 1; // safe because the method is confined to the I/O thread
} else {
LOG.debug(
"[{}] Got response on stream id {}, completing {}",
logPrefix,
streamId,
responseCallback);
if (!responseCallback.holdStreamId()) {
boolean wasInFlight = true;
ResponseCallback callback = inFlight.get(streamId);
if (callback == null) {
wasInFlight = false;
callback = orphaned.get(streamId);
if (callback == null) {
LOG.warn("[{}] Got response on unknown stream id {}, skipping", streamId);
return;
}
}
try {
if (callback.isLastResponse(responseFrame)) {
LOG.debug(
"[{}] Got last response on {} stream id {}, completing and releasing",
wasInFlight ? "in-flight" : "orphaned",
streamId);
release(streamId, ctx);
} else {
LOG.debug(
"[{}] Got non-last response on {} stream id {}, still holding",
wasInFlight ? "in-flight" : "orphaned",
streamId);
}
try {
responseCallback.onResponse(responseFrame);
} catch (Throwable t) {
if (wasInFlight) {
callback.onResponse(responseFrame);
}
} catch (Throwable t) {
if (wasInFlight) {
callback.onFailure(
new IllegalArgumentException("Unexpected error while invoking response handler", t));
} else {
// Assume the callback is already completed, so it's better to log
Loggers.warnWithException(
LOG, "[{}] Unexpected error while invoking response handler", logPrefix, t);
LOG,
"[{}] Unexpected error while invoking response handler on stream id {}",
logPrefix,
t,
streamId);
}
}
}
Expand Down Expand Up @@ -273,11 +295,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable exception) thro

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ReleaseEvent) {
int streamId = ((ReleaseEvent) event).streamId;
LOG.debug("[{}] Releasing stream id {}", logPrefix, streamId);
release(streamId, ctx);
} else if (event instanceof SetKeyspaceEvent) {
if (event instanceof SetKeyspaceEvent) {
SetKeyspaceEvent setKeyspaceEvent = (SetKeyspaceEvent) event;
if (this.setKeyspaceRequest != null) {
setKeyspaceEvent.promise.setFailure(
Expand Down Expand Up @@ -305,7 +323,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

private ResponseCallback release(int streamId, ChannelHandlerContext ctx) {
LOG.debug("[{}] Releasing stream id {}", logPrefix, streamId);
ResponseCallback responseCallback = inFlight.remove(streamId);
ResponseCallback responseCallback =
MoreObjects.firstNonNull(inFlight.remove(streamId), orphaned.remove(streamId));
orphanedSize = orphaned.size();
streamIds.release(streamId);
// If we're in the middle of an orderly close and this was the last request, actually close
// the channel now
Expand Down Expand Up @@ -344,7 +364,7 @@ int getInFlight() {
}

int getOrphanIds() {
return orphanStreamIds;
return orphanedSize;
}

private class SetKeyspaceRequest extends ChannelHandlerRequest {
Expand Down
Expand Up @@ -47,31 +47,33 @@ public interface ResponseCallback {
void onFailure(Throwable error);

/**
* Whether to hold the stream id beyond the first response.
* Reports the stream id used for the request on the current connection.
*
* <p>By default, this is false, and the channel will release the stream id (and make it available
* for other requests) as soon as {@link #onResponse(Frame)} or {@link #onFailure(Throwable)} gets
* invoked.
* <p>This is called every time the request is written successfully to a connection (and therefore
* might multiple times in case of retries). It is guaranteed to be invoked before any response to
* the request on that connection is processed.
*
* <p>If this is true, the channel will keep the stream id assigned to this request, and {@code
* onResponse} might be invoked multiple times. {@link #onStreamIdAssigned(int)} will be called to
* notify the caller of the stream id, and it is the caller's responsibility to determine when the
* request is over, and then call {@link DriverChannel#release(int)} to release the stream id.
* <p>The default implementation does nothing. This only needs to be overridden for specialized
* requests that hold the stream id across multiple responses.
*
* <p>This is intended to allow streaming requests, that would send multiple chunks of data in
* response to a single request (this feature does not exist yet in Cassandra but might be
* implemented in the future).
* @see #isLastResponse(Frame)
*/
default boolean holdStreamId() {
return false;
default void onStreamIdAssigned(int streamId) {
// nothing to do
}

/**
* Reports the stream id to the caller if {@link #holdStreamId()} is true.
* Whether the given frame is the last response to this request.
*
* <p>This is invoked for each response received by this callback; if it returns {@code true}, the
* driver assumes that the server is no longer using this stream id, and that it can be safely
* reused to send another request.
*
* <p>By default, this will never get called.
* <p>The default implementation always returns {@code true}: regular CQL requests only have one
* response, and we can reuse the stream id as soon as we've received it. This only needs to be
* overridden for specialized requests that hold the stream id across multiple responses.
*/
default void onStreamIdAssigned(int streamId) {
// nothing to do by default
default boolean isLastResponse(Frame responseFrame) {
return true;
}
}
Expand Up @@ -47,6 +47,8 @@ public void setup() throws InterruptedException {

Mockito.when(defaultConfigProfile.getInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS))
.thenReturn(128);

Mockito.when(responseCallback.isLastResponse(any(Frame.class))).thenReturn(true);
}

@Test
Expand Down

0 comments on commit 8257234

Please sign in to comment.