diff --git a/src/java/org/apache/cassandra/db/filter/ANNOptions.java b/src/java/org/apache/cassandra/db/filter/ANNOptions.java index f811372bfcfb..f3ad29d064c8 100644 --- a/src/java/org/apache/cassandra/db/filter/ANNOptions.java +++ b/src/java/org/apache/cassandra/db/filter/ANNOptions.java @@ -179,7 +179,11 @@ public void serialize(ANNOptions options, DataOutputPlus out, int version) throw { // ANN options are only supported in DS 11 and above, so don't serialize anything if the messaging version is lower if (version < MessagingService.VERSION_DS_11) + { + if (options != NONE) + throw new IllegalStateException("Unable to serialize ANN options with messaging version: " + version); return; + } int flags = flags(options); out.writeInt(flags); diff --git a/src/java/org/apache/cassandra/net/EndpointMessagingVersions.java b/src/java/org/apache/cassandra/net/EndpointMessagingVersions.java index 15da86203400..5ddba37a8907 100644 --- a/src/java/org/apache/cassandra/net/EndpointMessagingVersions.java +++ b/src/java/org/apache/cassandra/net/EndpointMessagingVersions.java @@ -37,6 +37,15 @@ public class EndpointMessagingVersions // protocol versions of the other nodes in the cluster private final ConcurrentMap versions = new NonBlockingHashMap<>(); + public EndpointMessagingVersions() + { + } + + private EndpointMessagingVersions(EndpointMessagingVersions versions) + { + this.versions.putAll(versions.versions); + } + /** * @return the last version associated with address, or @param version if this is the first such version */ @@ -91,4 +100,9 @@ public boolean knows(InetAddressAndPort endpoint) { return versions.containsKey(endpoint); } + + public EndpointMessagingVersions copy() + { + return new EndpointMessagingVersions(this); + } } diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandler.java b/src/java/org/apache/cassandra/net/InboundMessageHandler.java index bee9ecf27871..652ab90b2acf 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandler.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandler.java @@ -75,7 +75,7 @@ public class InboundMessageHandler extends AbstractMessageHandler private final ConnectionType type; private final InetAddressAndPort self; private final InetAddressAndPort peer; - private final int version; + final int version; private final InboundMessageCallbacks callbacks; private final Consumer> consumer; diff --git a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java index 5582cfda7581..2b176a113054 100644 --- a/src/java/org/apache/cassandra/net/InboundMessageHandlers.java +++ b/src/java/org/apache/cassandra/net/InboundMessageHandlers.java @@ -437,6 +437,13 @@ private long sumCounters(ToLongFunction mapping) + mapping.applyAsLong(legacyCounters); } + @VisibleForTesting + public void assertHandlersMessagingVersion(int expectedVersion) + { + for (InboundMessageHandler handler : handlers) + assert handler.version == expectedVersion : "Expected all handlers to be at version " + expectedVersion + " but found " + handler.version; + } + interface HandlerProvider { InboundMessageHandler provide(FrameDecoder decoder, diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java index 21bb06f9c6cf..2c2dfa7d83f4 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundConnection.java @@ -1117,6 +1117,8 @@ void onCompletedHandshake(Result result) assert !state.isClosed(); MessagingSuccess success = result.success(); + messagingVersion = success.messagingVersion; + settings.endpointToVersion.set(settings.to, messagingVersion); debug.onConnect(success.messagingVersion, settings); state.disconnected().maintenance.cancel(false); diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java index 0f1cb04b7feb..a6fed5047a37 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java @@ -498,7 +498,10 @@ public OutboundConnectionSettings withDefaults(ConnectionCategory category) applicationSendQueueReserveGlobalCapacityInBytes(), tcpNoDelay(), flushLowWaterMark, flushHighWaterMark, tcpConnectTimeoutInMS(), tcpUserTimeoutInMS(category), acceptVersions(category), - from(), socketFactory(), callbacks(), debug(), endpointToVersion()); + from(), socketFactory(), callbacks(), debug(), + // If a set of versions is passed, make sure we do a copy of it, as the version might be later updated + // depending on the handshake result (i.e. nodes might handshake a different version) + endpointToVersion().copy()); } private static boolean isInLocalDC(IEndpointSnitch snitch, InetAddressAndPort localHost, InetAddressAndPort remoteHost) diff --git a/test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java b/test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java index 8b414f06bca0..e3639462113d 100644 --- a/test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java +++ b/test/unit/org/apache/cassandra/db/filter/ANNOptionsTest.java @@ -241,13 +241,26 @@ private void testTransport(String query, ANNOptions expectedOptions) // ...with a version that doesn't support ANN options out = new DataOutputBuffer(); - ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10); - Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10)) - .isEqualTo(out.buffer().remaining()); - in = new DataInputBuffer(out.buffer(), true); - command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10); - actualOptions = command.rowFilter().annOptions(); - Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE); + if (expectedOptions != ANNOptions.NONE) { + try + { + ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10); + } + catch (IllegalStateException e) + { + // expected + Assertions.assertThat(e) + .hasMessageContaining("Unable to serialize ANN options with messaging version: " + MessagingService.VERSION_DS_10); + } + } else { + ReadCommand.serializer.serialize(command, out, MessagingService.VERSION_DS_10); + Assertions.assertThat(ReadCommand.serializer.serializedSize(command, MessagingService.VERSION_DS_10)) + .isEqualTo(out.buffer().remaining()); + in = new DataInputBuffer(out.buffer(), true); + command = ReadCommand.serializer.deserialize(in, MessagingService.VERSION_DS_10); + actualOptions = command.rowFilter().annOptions(); + Assertions.assertThat(actualOptions).isEqualTo(ANNOptions.NONE); + } } catch (IOException e) { diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java index e57e1e84f176..f165dec3442e 100644 --- a/test/unit/org/apache/cassandra/net/ConnectionTest.java +++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java @@ -74,6 +74,9 @@ import static org.apache.cassandra.net.MessagingService.VERSION_30; import static org.apache.cassandra.net.MessagingService.VERSION_3014; import static org.apache.cassandra.net.MessagingService.VERSION_40; +import static org.apache.cassandra.net.MessagingService.VERSION_DS_10; +import static org.apache.cassandra.net.MessagingService.VERSION_DS_11; +import static org.apache.cassandra.net.MessagingService.minimum_version; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.MessagingService.current_version; import static org.apache.cassandra.net.ConnectionUtils.*; @@ -121,17 +124,24 @@ public void resetVerbs() throws Throwable timeouts.clear(); } + private static volatile long originalRpcTimeout = 0; + @BeforeClass public static void startup() { DatabaseDescriptor.daemonInitialization(); CommitLog.instance.start(); + // At the time of this commit, the default is 20 seconds and leads to significant delays + // in this test class, especially in testMessagePurging and testCloseIfEndpointDown. + originalRpcTimeout = DatabaseDescriptor.getRpcTimeout(TimeUnit.MILLISECONDS); + DatabaseDescriptor.setRpcTimeout(5000L); } @AfterClass public static void cleanup() throws InterruptedException { factory.shutdownNow(); + DatabaseDescriptor.setRpcTimeout(originalRpcTimeout); } interface SendTest @@ -192,30 +202,54 @@ Settings override(Settings settings) // 30 is used for CNDB compatibility static final AcceptVersions legacy = new AcceptVersions(VERSION_3014, VERSION_3014); + static final AcceptVersions ds10 = new AcceptVersions(minimum_version, VERSION_DS_10); + static final AcceptVersions ds11 = new AcceptVersions(minimum_version, VERSION_DS_11); + static final AcceptVersions current = new AcceptVersions(current_version, current_version); - static final List> MODIFIERS = ImmutableList.of( + static final List> MESSAGGING_VERSIONS = ImmutableList.of( settings -> settings.outbound(outbound -> outbound.withAcceptVersions(legacy)) .inbound(inbound -> inbound.withAcceptMessaging(legacy)), + // Mismatched versions (in both directions) to ensure both peers will still agree on the same version. + settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds11)) + .inbound(inbound -> inbound.withAcceptMessaging(ds10)), + settings -> settings.outbound(outbound -> outbound.withAcceptVersions(ds10)) + .inbound(inbound -> inbound.withAcceptMessaging(ds11)), + // This setting ensures that we cover the current case for the power set where no versions are overridden. + settings -> settings.outbound(outbound -> outbound.withAcceptVersions(current)) + .inbound(inbound -> inbound.withAcceptMessaging(current)) + ); + + + static final List> MODIFIERS = ImmutableList.of( settings -> settings.outbound(outbound -> outbound.withEncryption(encryptionOptions)) .inbound(inbound -> inbound.withEncryption(encryptionOptions)), settings -> settings.outbound(outbound -> outbound.withFraming(LZ4)) ); + // Messaging versions are a kind of modifier, but they can only be applied once per setting, so they are broken + // out into a separate list. static final List SETTINGS = applyPowerSet( - ImmutableList.of(Settings.SMALL, Settings.LARGE), + ImmutableList.of(ConnectionTest.Settings.SMALL, ConnectionTest.Settings.LARGE), + MESSAGGING_VERSIONS, MODIFIERS ); - private static List applyPowerSet(List settings, List> modifiers) + private static List applyPowerSet(List settings, + List> messagingVersions, + List> modifiers) { - List result = new ArrayList<>(); - for (Set> set : Sets.powerSet(new HashSet<>(modifiers))) + List result = new ArrayList<>(); + for (Function messagingVersion : messagingVersions) { - for (T s : settings) + for (Set> set : Sets.powerSet(new HashSet<>(modifiers))) { - for (Function f : set) - s = f.apply(s); - result.add(s); + for (ConnectionTest.Settings s : settings) + { + for (Function f : set) + s = f.apply(s); + s = messagingVersion.apply(s); + result.add(s); + } } } return result; @@ -306,6 +340,10 @@ public void testSendSmall() throws Throwable .expired ( 0, 0) .error ( 0, 0) .check(); + + // Ensure version is the same + inbound.assertHandlersMessagingVersion(outbound.messagingVersion()); + Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion()); }); } @@ -360,6 +398,10 @@ public long serializedSize(Object noPayload, int version) .expired ( 0, 0) .error ( 0, 0) .check(); + + // Ensure version is the same + inbound.assertHandlersMessagingVersion(outbound.messagingVersion()); + Assert.assertEquals(outbound.settings().endpointToVersion.get(endpoint), outbound.messagingVersion()); }); }