Skip to content
Merged
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/db/filter/ANNOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ public void serialize(ANNOptions options, DataOutputPlus out, int version) throw
{
// ANN options are only supported in DS 11 and above, so don't serialize anything if the messaging version is lower
if (version < MessagingService.VERSION_DS_11)
{
if (options != NONE)
throw new IllegalStateException("Unable to serialize ANN options with messaging version: " + version);
return;
}

int flags = flags(options);
out.writeInt(flags);
Expand Down
14 changes: 14 additions & 0 deletions src/java/org/apache/cassandra/net/EndpointMessagingVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public class EndpointMessagingVersions
// protocol versions of the other nodes in the cluster
private final ConcurrentMap<InetAddressAndPort, Integer> versions = new NonBlockingHashMap<>();

/**
 * Creates an instance whose endpoint-to-version map starts out empty.
 */
public EndpointMessagingVersions()
{
}

/**
 * Copy constructor: populates this instance's own map with every endpoint-to-version entry currently held by
 * {@code versions}. The two instances do not share a map, so later updates to one are not seen by the other.
 *
 * @param versions the instance whose entries are copied
 */
private EndpointMessagingVersions(EndpointMessagingVersions versions)
{
// The parameter shadows the field of the same name, hence the explicit "this." to address the new instance's map.
this.versions.putAll(versions.versions);
}

/**
* @return the last version associated with address, or @param version if this is the first such version
*/
Expand Down Expand Up @@ -91,4 +100,9 @@ public boolean knows(InetAddressAndPort endpoint)
{
return versions.containsKey(endpoint);
}

/**
 * @return a new {@link EndpointMessagingVersions} populated from this instance's current entries
 *         (delegates to the private copy constructor)
 */
public EndpointMessagingVersions copy()
{
return new EndpointMessagingVersions(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class InboundMessageHandler extends AbstractMessageHandler
private final ConnectionType type;
private final InetAddressAndPort self;
private final InetAddressAndPort peer;
private final int version;
final int version;

private final InboundMessageCallbacks callbacks;
private final Consumer<Message<?>> consumer;
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/net/InboundMessageHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,13 @@ private long sumCounters(ToLongFunction<InboundCounters> mapping)
+ mapping.applyAsLong(legacyCounters);
}

/**
 * Test-only check that every handler in {@code handlers} reports the given messaging version.
 * <p>
 * The check relies on the Java {@code assert} statement, so it is only evaluated when assertions are enabled.
 *
 * @param expectedVersion the messaging version each handler is expected to have
 */
@VisibleForTesting
public void assertHandlersMessagingVersion(int expectedVersion)
{
    for (InboundMessageHandler current : handlers)
    {
        assert expectedVersion == current.version
            : "Expected all handlers to be at version " + expectedVersion + " but found " + current.version;
    }
}

interface HandlerProvider
{
InboundMessageHandler provide(FrameDecoder decoder,
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/net/OutboundConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,8 @@ void onCompletedHandshake(Result<MessagingSuccess> result)
assert !state.isClosed();

MessagingSuccess success = result.success();
messagingVersion = success.messagingVersion;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one, as it ultimately affects the established.messagingVersion as used here.

Just one additional note, should we also invoke settings.endpointToVersion.set() with the new version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the whole code path to make sure this is the right fix, as the way messaging versions are handled is far from obvious. It is indeed the right one as it ultimately affect the established.messagingVersion as used here.

Thanks for reading it closely. I set it here with the intention of using it for the Established constructor, so we're on the same page there.

Just one additional note, should we also invoke settings.endpointToVersion.set() with the new version?

I tested with this yesterday and I will test a bit more today. My primary reason for pushing this without that additional line comes from the InboundConnectionInitiator here:

// record the "true" endpoint, i.e. the one the peer is identified with, as opposed to the socket it connected over
instance().versions.set(from, maxMessagingVersion);

It seems like these might be "okay" to diverge, but that's only true if the endpointToVersion views the peer as having a greater version than it actually does (because the only reason the connection version differs in that case is that the local host has a lower version). However, maybe the difference is not always that the connection version is lower than the remote peer's.

I can confirm that these numbers vary without settings.endpointToVersion.set().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look at this a bit more today to come up with a decision.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbtourist - I added assert messagingVersion <= settings.endpointToVersion.get(settings.to) to that code block and the ConnectionTest tests passed. I think that likely means my understanding is correct. I don't know all of the details here, and I definitely don't know the past implementations that we need to be compatible with. Let me know if you think I should add settings.endpointToVersion.set(...), but at this time, I don't think it is strictly necessary for this patch.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My primary reason for pushing this without that additional line is from the InboundConnectionInitator here:

I've seen that, but the versions object you point at is not the same as the one used in settings, which seems specific to the connection. My proof to that is in the case RETRY block, we do set the version; wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize they were different versions objects. I saw the retry block, but wasn't quite sure how things integrated. I am happy to add the assignment in, if you think that is right. I just haven't had a chance to fully vet it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The versions inside settings.endpointToVersion don't seem to be used anywhere seriously, but setting the right version there seems correct to me nonetheless. Though honestly this code is not the best, so please take your time to vet my suggestion.

Copy link
Member Author

@michaeljmarshall michaeljmarshall Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

settings.endpointToVersion is set by calling endpointToVersion() in a constructor, which is:

    public EndpointMessagingVersions endpointToVersion()
    {
        if (endpointToVersion == null)
            return instance().versions;
        return endpointToVersion;
    }

so unless I'm mistaken, there is only one EndpointMessagingVersions object. I don't see another way to configure it. As such, I hesitate to change this value. It seems like its own distinct change. I will leave in the assertion so that it'll trigger test failures if the value is LT the agreed upon version for the connection.

Copy link
Member Author

@michaeljmarshall michaeljmarshall Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After adding the assertion and running the upgrade test, I hit the following:

java.lang.AssertionError: Found 101 > 100
	at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
	at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1227)
	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
	at org.apache.cassandra.net.AsyncPromise.lambda$appendListener$2(AsyncPromise.java:379)
	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)
	at org.apache.cassandra.net.AsyncPromise.notifyListeners(AsyncPromise.java:353)
	at org.apache.cassandra.net.AsyncPromise.trySet(AsyncPromise.java:185)
	at org.apache.cassandra.net.AsyncPromise.trySuccess(AsyncPromise.java:144)

After analyzing it and adding additional logs to CC (relevant logs pasted below), we are getting into trouble because IP addresses are getting reused and the versions map doesn't get cleaned up. writer-0 uses ds 10 and writer-1 uses ds 11, but they both end up with the same IP address because their runtimes do not overlap. The key log line comes here: Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000) where we can see that the DS 11 coordinator sends a message requesting version 100 (ds 10) instead of 101 (ds 11).

This confirms that the versions map is shared. It also indicates we should either (1) update the map or (2) purge the map after a peer leaves (given the times in the logs, this doesn't appear automatic, but perhaps there is some timeout that must be hit before we purge the old versions).

[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,130 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 101, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-0] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:01:54,155 InboundConnectionInitiator.java:506 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-bd0b46c6 messaging connection established, version = 100, framing = CRC, encryption = unencrypted
[coordinatorDS11] DEBUG [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,164 EndpointMessagingVersions.java:67 - Assuming current protocol version for /192.168.240.5:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,165 EndpointMessagingVersions.java:45 - Setting version 100 for /192.168.240.5:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:01:54,182 OutboundConnection.java:1157 - /192.168.240.4:7000(/192.168.240.4:52266)->/192.168.240.5:7000-SMALL_MESSAGES-f7ff5fd6 successfully connected, version = 100, framing = CRC, encryption = unencrypted

...

[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 OutboundConnection.java:1741 - called messagingVersion() /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] messaging version is 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,106 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,107 OutboundConnection.java:1222 - attempting to connect /192.168.240.4:7000->/192.168.240.5:7000-SMALL_MESSAGES-[no-channel] connecting to /192.168.240.5:7000(/192.168.240.5:7000), version = 100, accept AcceptVersions(min: 10, max: 101, dse: 4), framing = CRC, encryption = unencrypted
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,152 InboundConnectionInitiator.java:252 - Received handshake initiation message from peer /192.168.240.4:49722, message = Initiate(request: 100, min: 10, max: 101, type: SMALL_MESSAGES, framing: true, from: /192.168.240.4:7000)
[writer-1] TRACE [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,155 InboundConnectionInitiator.java:263 - Connection version 101 (min 10) from /192.168.240.4:7000
[coordinatorDS11] INFO  [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:72 - Found version 100 for /192.168.240.5:7000, using min of that and current version 100
[writer-1] INFO  [Messaging-EventLoop-5-1] 2025-03-07 06:02:37,159 EndpointMessagingVersions.java:45 - Setting version 101 for /192.168.240.4:7000
[coordinatorDS11] ERROR [Messaging-EventLoop-5-2] 2025-03-07 06:02:37,160 AsyncPromise.java:368 - Failed to invoke listener org.apache.cassandra.net.OutboundConnection$1Initiate$$Lambda/0x00007f8bd44083d0@164bc11b to (success: org.apache.cassandra.net.OutboundConnectionInitiator$Result$MessagingSuccess@53f7c87)
[coordinatorDS11] java.lang.AssertionError: Found 101 > 100
[coordinatorDS11] 	at org.apache.cassandra.net.OutboundConnection$1Initiate.onCompletedHandshake(OutboundConnection.java:1125)
[coordinatorDS11] 	at org.apache.cassandra.net.OutboundConnection$1Initiate.lambda$attempt$1(OutboundConnection.java:1234)
[coordinatorDS11] 	at org.apache.cassandra.net.AsyncPromise.invokeListener(AsyncPromise.java:364)

Copy link
Member Author

@michaeljmarshall michaeljmarshall Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the gossiper calls MessagingService.instance().versions.reset(endpoint);. So perhaps this isn't code that is likely to be encountered. Either way, I am good to set the value now.

settings.endpointToVersion.set(settings.to, messagingVersion);
debug.onConnect(success.messagingVersion, settings);
state.disconnected().maintenance.cancel(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,10 @@ public OutboundConnectionSettings withDefaults(ConnectionCategory category)
applicationSendQueueReserveGlobalCapacityInBytes(),
tcpNoDelay(), flushLowWaterMark, flushHighWaterMark,
tcpConnectTimeoutInMS(), tcpUserTimeoutInMS(category), acceptVersions(category),
from(), socketFactory(), callbacks(), debug(), endpointToVersion());
from(), socketFactory(), callbacks(), debug(),
// If a set of versions is passed, make sure we do a copy of it, as the version might be later updated
// depending on the handshake result (i.e. nodes might handshake a different version)
endpointToVersion().copy());
}

private static boolean isInLocalDC(IEndpointSnitch snitch, InetAddressAndPort localHost, InetAddressAndPort remoteHost)
Expand Down
27 changes: 20 additions & 7 deletions test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,26 @@ private void testTransport(String query, ANNOptions expectedOptions)

// ...with a version that doesn't support ANN options
out = new DataOutputBuffer();
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10))
.isEqualTo(out.buffer().remaining());
in = new DataInputBuffer(out.buffer(), true);
command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10);
actualOptions = command.rowFilter().annOptions();
Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE);
if (expectedOptions != ANNOptions.NONE) {
try
{
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
}
catch (IllegalStateException e)
{
// expected
Assertions.assertThat(e)
.hasMessageContaining("Unable to serialize ANN options with messaging version: " + MessagingService.VERSION_DS_10);
}
} else {
ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10);
Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10))
.isEqualTo(out.buffer().remaining());
in = new DataInputBuffer(out.buffer(), true);
command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10);
actualOptions = command.rowFilter().annOptions();
Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE);
}
}
catch (IOException e)
{
Expand Down
60 changes: 51 additions & 9 deletions test/unit/org/apache/cassandra/net/ConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
import static org.apache.cassandra.net.MessagingService.VERSION_30;
import static org.apache.cassandra.net.MessagingService.VERSION_3014;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.net.MessagingService.VERSION_DS_10;
import static org.apache.cassandra.net.MessagingService.VERSION_DS_11;
import static org.apache.cassandra.net.MessagingService.minimum_version;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.ConnectionUtils.*;
Expand Down Expand Up @@ -121,17 +124,24 @@ public void resetVerbs() throws Throwable
timeouts.clear();
}

private static volatile long originalRpcTimeout = 0;

/**
 * One-time class setup: calls {@code DatabaseDescriptor.daemonInitialization()}, starts the commit log, and then
 * overrides the RPC timeout for this test class. The previous timeout is saved in {@code originalRpcTimeout}
 * before it is overridden so that it can be restored afterwards.
 */
@BeforeClass
public static void startup()
{
DatabaseDescriptor.daemonInitialization();
CommitLog.instance.start();
// At the time of this commit, the default is 20 seconds and leads to significant delays
// in this test class, especially in testMessagePurging and testCloseIfEndpointDown.
// The original value is read in milliseconds; the 5000 below is presumably milliseconds as well (TODO confirm).
originalRpcTimeout = DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS);
DatabaseDescriptor.setRpcTimeout(5000L);
}

/**
 * One-time class teardown: shuts down {@code factory} and puts the RPC timeout back to the value saved in
 * {@code originalRpcTimeout}.
 *
 * @throws InterruptedException declared for compatibility with the existing signature
 */
@AfterClass
public static void cleanup() throws InterruptedException
{
    try
    {
        factory.shutdownNow();
    }
    finally
    {
        // The RPC timeout is process-wide state; restore it even if shutting down the factory throws,
        // so that a failure here cannot leak the shortened timeout into whatever runs next in this JVM.
        DatabaseDescriptor.setRpcTimeout(originalRpcTimeout);
    }
}

interface SendTest
Expand Down Expand Up @@ -192,30 +202,54 @@ Settings override(Settings settings)

// 30 is used for CNDB compatibility
static final AcceptVersions legacy = new AcceptVersions(VERSION_3014, VERSION_3014);
static final AcceptVersions ds10 = new AcceptVersions(minimum_version, VERSION_DS_10);
static final AcceptVersions ds11 = new AcceptVersions(minimum_version, VERSION_DS_11);
static final AcceptVersions current = new AcceptVersions(current_version, current_version);

static final List<Function<Settings, Settings>> MODIFIERS = ImmutableList.of(
static final List<Function<Settings, Settings>> MESSAGGING_VERSIONS = ImmutableList.of(
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(legacy))
.inbound(inbound -> inbound.withAcceptMessaging(legacy)),
// Mismatched versions (in both directions) to ensure both peers will still agree on the same version.
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds11))
.inbound(inbound -> inbound.withAcceptMessaging(ds10)),
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds10))
.inbound(inbound -> inbound.withAcceptMessaging(ds11)),
// This setting ensures that we cover the current case for the power set where no versions are overridden.
settings -> settings.outbound(outbound -> outbound.withAcceptVersions(current))
.inbound(inbound -> inbound.withAcceptMessaging(current))
);


static final List<Function<Settings, Settings>> MODIFIERS = ImmutableList.of(
settings -> settings.outbound(outbound -> outbound.withEncryption(encryptionOptions))
.inbound(inbound -> inbound.withEncryption(encryptionOptions)),
settings -> settings.outbound(outbound -> outbound.withFraming(LZ4))
);

// Messaging versions are a kind of modifier, but they can only be applied once per setting, so they are broken
// out into a separate list.
static final List<Settings> SETTINGS = applyPowerSet(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extra complexity of these settings appears to be an issue for some tests. In looking closer, I can see that the rpc timeout is affecting some, like testMessagePurging. I think it might also affect testCloseIfEndpointDown. For example, calling DatabaseDescriptor.setRpcTimeout(5000L) speeds up these tests a bit. The test failures appear to be timeouts, so maybe there is something to look into here. I haven't seen legitimate failures yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, the delay in testMessagePurging comes from this block:

void onFailure(Throwable cause)
{
if (cause instanceof ConnectException)
noSpamLogger.info("{} failed to connect", id(), cause);
else
noSpamLogger.error("{} failed to connect", id(), cause);
JVMStabilityInspector.inspectThrowable(cause);
if (hasPending())
{
Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
retryRateMillis = min(1000, retryRateMillis * 2);
}
else
{
// this Initiate will be discarded
state = Disconnected.dormant(state.disconnected().maintenance);
}
}

ImmutableList.of(Settings.SMALL, Settings.LARGE),
ImmutableList.of(ConnectionTest.Settings.SMALL, ConnectionTest.Settings.LARGE),
MESSAGGING_VERSIONS,
MODIFIERS
);

private static <T> List<T> applyPowerSet(List<T> settings, List<Function<T, T>> modifiers)
private static List<Settings> applyPowerSet(List<ConnectionTest.Settings> settings,
List<Function<ConnectionTest.Settings, ConnectionTest.Settings>> messagingVersions,
List<Function<ConnectionTest.Settings, ConnectionTest.Settings>> modifiers)
{
List<T> result = new ArrayList<>();
for (Set<Function<T, T>> set : Sets.powerSet(new HashSet<>(modifiers)))
List<Settings> result = new ArrayList<>();
for (Function<ConnectionTest.Settings, ConnectionTest.Settings> messagingVersion : messagingVersions)
{
for (T s : settings)
for (Set<Function<Settings, ConnectionTest.Settings>> set : Sets.powerSet(new HashSet<>(modifiers)))
{
for (Function<T, T> f : set)
s = f.apply(s);
result.add(s);
for (ConnectionTest.Settings s : settings)
{
for (Function<Settings, ConnectionTest.Settings> f : set)
s = f.apply(s);
s = messagingVersion.apply(s);
result.add(s);
}
}
}
return result;
Expand Down Expand Up @@ -306,6 +340,10 @@ public void testSendSmall() throws Throwable
.expired ( 0, 0)
.error ( 0, 0)
.check();

// Ensure version is the same
inbound.assertHandlersMessagingVersion(outbound.messagingVersion());
Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion());
});
}

Expand Down Expand Up @@ -360,6 +398,10 @@ public long serializedSize(Object noPayload, int version)
.expired ( 0, 0)
.error ( 0, 0)
.check();

// Ensure version is the same
inbound.assertHandlersMessagingVersion(outbound.messagingVersion());
Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion());
});
}

Expand Down