JAVA-2000: Fix ConcurrentModificationException during channel shutdown
Motivation:

When a channel is shut down abruptly we iterate the inFlight map in
abortAllInFlight() to fail all pending queries. But in some cases,
failing a query will also indirectly mutate the map, causing a
ConcurrentModificationException.

We ran into this particular scenario:
- the channel initializes and the write of the initial STARTUP query is
  scheduled (but the write future is not complete yet)
- during the actual write task, an IOException is thrown ("connection
  reset by peer"). exceptionCaught() is called and invokes
  abortAllInFlight().
- abortAllInFlight() calls onFailure() on the
  ProtocolInitHandler.InitRequest corresponding to the STARTUP request.
  That calls ConnectInitHandler.setConnectFailure(), which closes the
  channel.
- closing the channel fails the write future of the STARTUP query, which
  invokes the write listener synchronously. The listener calls release()
  which removes the callback from inFlight.
- the initial iteration resumes and finds out that the map was modified.
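
The sequence above can be reduced to a small, self-contained sketch. It
is not the driver's code: ReentrantMutationDemo, Callback, register()
and abortAllInFlightUnsafe() are hypothetical names, and a plain HashMap
stands in for inFlight. Two entries are registered so that the iterator
still has an element to fetch after the callback has mutated the map.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the handler; names are illustrative, not the driver's API.
class ReentrantMutationDemo {

  interface Callback {
    void onFailure(Throwable cause);
  }

  private final Map<Integer, Callback> inFlight = new HashMap<>();

  void register(int streamId, Callback callback) {
    inFlight.put(streamId, callback);
  }

  void release(int streamId) {
    inFlight.remove(streamId);
  }

  // Same shape as the pre-fix method: callbacks run while the live map is being iterated.
  void abortAllInFlightUnsafe(Throwable cause) {
    for (Callback callback : inFlight.values()) {
      callback.onFailure(cause);
    }
    inFlight.clear();
  }

  // Returns true if aborting triggers a ConcurrentModificationException.
  static boolean reproduces() {
    ReentrantMutationDemo handler = new ReentrantMutationDemo();
    // Failing stream 1 indirectly releases it, like the write listener in the scenario.
    handler.register(1, cause -> handler.release(1));
    handler.register(2, cause -> {});
    try {
      handler.abortAllInFlightUnsafe(new RuntimeException("connection reset by peer"));
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("ConcurrentModificationException reproduced: " + reproduces());
  }
}
```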

Similar issues could happen if one of the aborted requests is a
SetKeyspaceRequest that calls abortAllInFlight() recursively.

Modifications:

- don't iterate inFlight directly; iterate over a copy to avoid
  concurrent modifications.
- clear it immediately so that recursive invocations of
  abortAllInFlight() have no effect.
- ensure release() is lenient if the callback is not in inFlight. For
  clarity, also change it to never return the callback; the caller has
  to retrieve it itself (as already done in channelRead).
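
A minimal sketch of these three changes taken together, under stated
assumptions: SnapshotAbortDemo, Callback, register() and the
abortPasses counter are hypothetical, and a JDK ArrayList is used for
the copy where the actual change uses Guava's ImmutableSet.copyOf. One
callback releases its own stream id and another recurses into
abortAllInFlight(); both are harmless because the map is already empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the handler; names are illustrative, not the driver's API.
class SnapshotAbortDemo {

  interface Callback {
    void onFailure(Throwable cause);
  }

  private final Map<Integer, Callback> inFlight = new HashMap<>();
  // Counts how many times the abort loop actually ran (for illustration only).
  int abortPasses = 0;

  void register(int streamId, Callback callback) {
    inFlight.put(streamId, callback);
  }

  // Lenient and void: releasing an id that is no longer tracked is a no-op.
  void release(int streamId) {
    inFlight.remove(streamId);
  }

  void abortAllInFlight(Throwable cause) {
    if (inFlight.isEmpty()) {
      return; // recursive invocations end up here
    }
    abortPasses++;
    // Copy first, then clear, so callbacks can no longer affect the iteration.
    List<Callback> toAbort = new ArrayList<>(inFlight.values());
    inFlight.clear();
    for (Callback callback : toAbort) {
      callback.onFailure(cause);
    }
  }

  // Returns {callbacks failed, abort passes, entries left in the map}.
  static int[] run() {
    SnapshotAbortDemo handler = new SnapshotAbortDemo();
    int[] failed = {0};
    handler.register(1, cause -> {
      failed[0]++;
      handler.release(1); // like the write listener
    });
    handler.register(2, cause -> {
      failed[0]++;
      handler.abortAllInFlight(cause); // like a recursive abort
    });
    handler.abortAllInFlight(new RuntimeException("connection reset by peer"));
    return new int[] {failed[0], handler.abortPasses, handler.inFlight.size()};
  }

  public static void main(String[] args) {
    int[] result = run();
    System.out.println(
        "failed=" + result[0] + " passes=" + result[1] + " remaining=" + result[2]);
  }
}
```

Each callback is failed exactly once, the abort loop runs a single
time, and the map ends up empty.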

Result:

The initial call to abortAllInFlight() clears inFlight and fails the
STARTUP query. When the write listener then invokes release(), it is a
no-op because the callback is no longer in the map.

Co-authored-by: Greg Bestland <Greg.Bestland@datastax.com>
olim7t and GregBestland committed Nov 2, 2018
1 parent 28e920f commit 8cb77c4
Showing 2 changed files with 36 additions and 24 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
@@ -4,6 +4,7 @@
 
 ### 4.0.0-beta3 (in progress)
 
+- [bug] JAVA-2000: Fix ConcurrentModificationException during channel shutdown
 - [improvement] JAVA-2002: Reimplement TypeCodec.accepts to improve performance
 - [improvement] JAVA-2011: Re-add ResultSet.getAvailableWithoutFetching() and isFullyFetched()
 - [improvement] JAVA-2007: Make driver threads extend FastThreadLocalThread
@@ -25,9 +25,9 @@
 import com.datastax.oss.driver.internal.core.channel.DriverChannel.SetKeyspaceEvent;
 import com.datastax.oss.driver.internal.core.protocol.FrameDecodingException;
 import com.datastax.oss.driver.internal.core.util.Loggers;
-import com.datastax.oss.driver.shaded.guava.common.base.MoreObjects;
 import com.datastax.oss.driver.shaded.guava.common.collect.BiMap;
 import com.datastax.oss.driver.shaded.guava.common.collect.HashBiMap;
+import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
 import com.datastax.oss.protocol.internal.Frame;
 import com.datastax.oss.protocol.internal.Message;
 import com.datastax.oss.protocol.internal.request.Query;
@@ -40,6 +40,7 @@
 import io.netty.util.concurrent.Promise;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import net.jcip.annotations.NotThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -273,13 +274,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable exception) thro
       LOG.debug("[{}] Error while decoding response on stream id {}", logPrefix, streamId);
       if (streamId >= 0) {
         // We know which request matches the failing response, fail that one only
-        ResponseCallback responseCallback = release(streamId, ctx);
-        try {
-          responseCallback.onFailure(exception.getCause());
-        } catch (Throwable t) {
-          Loggers.warnWithException(
-              LOG, "[{}] Unexpected error while invoking failure handler", logPrefix, t);
+        ResponseCallback responseCallback = inFlight.get(streamId);
+        if (responseCallback != null) {
+          try {
+            responseCallback.onFailure(exception.getCause());
+          } catch (Throwable t) {
+            Loggers.warnWithException(
+                LOG, "[{}] Unexpected error while invoking failure handler", logPrefix, t);
+          }
         }
+        release(streamId, ctx);
       } else {
         Loggers.warnWithException(
             LOG,
@@ -325,19 +329,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     super.channelInactive(ctx);
   }
 
-  private ResponseCallback release(int streamId, ChannelHandlerContext ctx) {
+  private void release(int streamId, ChannelHandlerContext ctx) {
     LOG.trace("[{}] Releasing stream id {}", logPrefix, streamId);
-    ResponseCallback responseCallback =
-        MoreObjects.firstNonNull(inFlight.remove(streamId), orphaned.remove(streamId));
-    orphanedSize = orphaned.size();
-    streamIds.release(streamId);
-    // If we're in the middle of an orderly close and this was the last request, actually close
-    // the channel now
-    if (closingGracefully && inFlight.isEmpty()) {
-      LOG.debug("[{}] Done handling the last pending query, closing channel", logPrefix);
-      ctx.channel().close();
+    if (inFlight.remove(streamId) != null) {
+      // If we're in the middle of an orderly close and this was the last request, actually close
+      // the channel now
+      if (closingGracefully && inFlight.isEmpty()) {
+        LOG.debug("[{}] Done handling the last pending query, closing channel", logPrefix);
+        ctx.channel().close();
+      }
+    } else if (orphaned.remove(streamId) != null) {
+      orphanedSize = orphaned.size();
     }
-    return responseCallback;
+    // Note: it's possible that the callback is in neither map, if we get here after a call to
+    // abortAllInFlight that already cleared the map (see JAVA-2000)
+    streamIds.release(streamId);
   }
 
   private void abortAllInFlight(DriverException cause) {
@@ -349,14 +355,19 @@ private void abortAllInFlight(DriverException cause) {
    * loop)
    */
   private void abortAllInFlight(DriverException cause, ResponseCallback ignore) {
-    for (ResponseCallback responseCallback : inFlight.values()) {
-      if (responseCallback != ignore) {
-        responseCallback.onFailure(cause);
+    if (!inFlight.isEmpty()) {
+      // Clear the map now and iterate on a copy, in case one of the onFailure calls below recurses
+      // back into this method
+      Set<ResponseCallback> toAbort = ImmutableSet.copyOf(inFlight.values());
+      inFlight.clear();
+      for (ResponseCallback responseCallback : toAbort) {
+        if (responseCallback != ignore) {
+          responseCallback.onFailure(cause);
+        }
       }
+      // It's not necessary to release the stream ids, since we always call this method right before
+      // closing the channel
     }
-    inFlight.clear();
-    // It's not necessary to release the stream ids, since we always call this method right before
-    // closing the channel
   }
 
   int getAvailableIds() {