Skip to content

Commit

Permalink
Catch NSE race condition and return after failing connections
Browse files Browse the repository at this point in the history
Patch by brandonwilliams; reviewed by adelapena and aleksey for
CASSANDRA-17618
  • Loading branch information
driftx committed Sep 6, 2022
1 parent 6748b8b commit 4670091
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1-alpha2
* Fix race and return after failing connections (CASSANDRA-17618)
* Speculative execution threshold unit mismatch (CASSANDRA-17877)
* Fix BulkLoader to load entireSSTableThrottle and entireSSTableInterDcThrottle (CASSANDRA-17677)
* Fix a race condition where a keyspace can be oopened while it is being removed (CASSANDRA-17658)
Expand Down
Expand Up @@ -280,6 +280,7 @@ void initiate(ChannelHandlerContext ctx, ByteBuf in) throws IOException
logger.warn("peer {} attempted to establish an unencrypted connection (broadcast address {})",
ctx.channel().remoteAddress(), initiate.from);
failHandshake(ctx);
return;
}

if (initiate.acceptVersions != null)
Expand All @@ -305,11 +306,13 @@ void initiate(ChannelHandlerContext ctx, ByteBuf in) throws IOException
{
logger.info("peer {} only supports messaging versions higher ({}) than this node supports ({})", ctx.channel().remoteAddress(), initiate.acceptVersions.min, current_version);
failHandshake(ctx);
return;
}
else if (initiate.acceptVersions.max < accept.min)
{
logger.info("peer {} only supports messaging versions lower ({}) than this node supports ({})", ctx.channel().remoteAddress(), initiate.acceptVersions.max, minimum_version);
failHandshake(ctx);
return;
}
else
{
Expand All @@ -332,6 +335,7 @@ else if (initiate.acceptVersions.max < accept.min)
{
logger.warn("Received stream using protocol version {} (my version {}). Terminating connection", version, settings.acceptStreaming.max);
failHandshake(ctx);
return;
}
setupStreamingPipeline(initiate.from, ctx);
}
Expand Down Expand Up @@ -525,7 +529,14 @@ void setupMessagingPipeline(InetAddressAndPort from, int useMessagingVersion, in

pipeline.addLast("deserialize", handler);

pipeline.remove(this);
try
{
pipeline.remove(this);
}
catch (NoSuchElementException ex)
{
// possible race with the handshake timeout firing and removing this handler already
}
}
}

Expand Down
Expand Up @@ -32,7 +32,6 @@

import static com.google.common.collect.Iterables.getOnlyElement;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -87,8 +86,7 @@ public void testConnectionsAreRejectedWithInvalidConfig() throws Throwable

cluster.get(1).runOnInstance(() ->
{
InboundMessageHandlers inbound = getOnlyElement(MessagingService.instance().messageHandlers.values());
assertEquals(0, inbound.count());
assertTrue(MessagingService.instance().messageHandlers.isEmpty());

OutboundConnections outbound = getOnlyElement(MessagingService.instance().channelManagers.values());
assertFalse(outbound.small.isConnected() || outbound.large.isConnected() || outbound.urgent.isConnected());
Expand Down

0 comments on commit 4670091

Please sign in to comment.