diff --git a/CHANGES.txt b/CHANGES.txt index bbe9c15776a2..232ac5f56bf9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.12 + * Make StartupConnectivityChecker only run a connectivity check if there are no nodes which are running a version prior to Cassandra 4 (CASSANDRA-18968) * Retrieve keyspaces metadata and schema version concistently in DescribeStatement (CASSANDRA-18921) * Gossip NPE due to shutdown event corrupting empty statuses (CASSANDRA-18913) * Synchronize CQLSSTableWriter#build on the Schema.instance object (CASSANDRA-18317) diff --git a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java index 8bc1e5d9a4d1..a51dbbcfdf6d 100644 --- a/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java +++ b/src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -36,12 +37,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; -import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.net.Verb.PING_REQ; @@ -77,11 +77,22 @@ public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. */ - public boolean execute(Set peers, Function getDatacenterSource) + public boolean execute(Set peers, Function getDatacenterSource, + Predicate isUpgradingFromLowerVersionThan) { if (peers == null || this.timeoutNanos < 0) return true; + // Check if there are any nodes which we know are running a version prior to 4.0. + // We use this intead of Gossiper::hasMajorVersion3Nodes because in the absence of version information for a peer + // we still prefer to run the startup connectivity check. + if (isUpgradingFromLowerVersionThan.test(CassandraVersion.CASSANDRA_4_0)) + { + logger.debug("Skipping startup connectivity check as some nodes may be running Cassandra version 3 or older " + + "which does not support connectivity checking."); + return true; + } + // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 35b814f8f37e..6f74452ba462 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -667,7 +667,8 @@ public void start() { StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(), DatabaseDescriptor.getBlockForPeersInRemoteDatacenters()); - connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter); + connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter, + Gossiper.instance::isUpgradingFromVersionLowerThan); // check to see if transports may start else return without starting. This is needed when in survey mode or // when bootstrap has not completed. diff --git a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java index 0785f277daba..c570357ba7b6 100644 --- a/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java +++ b/test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.function.BiPredicate; +import java.util.function.Predicate; import org.junit.After; import org.junit.Assert; @@ -36,6 +37,7 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.HeartBeatState; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.CassandraVersion; import org.apache.cassandra.utils.FBUtilities; public class StartupClusterConnectivityCheckerTest @@ -44,6 +46,8 @@ public class StartupClusterConnectivityCheckerTest private StartupClusterConnectivityChecker globalQuorumConnectivityChecker; private StartupClusterConnectivityChecker noopChecker; private StartupClusterConnectivityChecker zeroWaitChecker; + private StartupClusterConnectivityChecker containsCassandra3NodesChecker; + private Predicate defaultMockIsUpgradingFromVersionLowerThan; private static final long TIMEOUT_NANOS = 100; private static final int NUM_PER_DC = 6; @@ -64,6 +68,20 @@ else if (peersC.contains(endpoint)) return null; } + private static class MockIsUpgradingFromVersionLowerThan implements Predicate + { + CassandraVersion clusterVersion; + MockIsUpgradingFromVersionLowerThan(CassandraVersion clusterVersion) + { + this.clusterVersion = clusterVersion; + } + @Override + public boolean test(CassandraVersion other) + { + return clusterVersion.compareTo(other) < 0; + } + } + @BeforeClass public static void before() { @@ -77,7 +95,8 @@ public void setUp() throws UnknownHostException globalQuorumConnectivityChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, true); noopChecker = new StartupClusterConnectivityChecker(-1, false); zeroWaitChecker = new StartupClusterConnectivityChecker(0, false); - + containsCassandra3NodesChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, false); + defaultMockIsUpgradingFromVersionLowerThan = new MockIsUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0); peersA = new HashSet<>(); peersAMinusLocal = new HashSet<>(); peersA.add(FBUtilities.getBroadcastAddressAndPort()); @@ -114,7 +133,7 @@ public void execute_HappyPath() { Sink sink = new Sink(true, true, peers); MessagingService.instance().outboundSink.add(sink); - Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); + Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan)); } @Test @@ -122,7 +141,7 @@ public void execute_NotAlive() { Sink sink = new Sink(false, true, peers); MessagingService.instance().outboundSink.add(sink); - Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); + Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan)); } @Test @@ -130,7 +149,7 @@ public void execute_NoConnectionsAcks() { Sink sink = new Sink(true, false, peers); MessagingService.instance().outboundSink.add(sink); - Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); + Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan)); } @Test @@ -183,16 +202,26 @@ public void execute_ZeroWaitHasConnections() throws InterruptedException { Sink sink = new Sink(true, true, new HashSet<>()); MessagingService.instance().outboundSink.add(sink); - Assert.assertFalse(zeroWaitChecker.execute(peers, this::getDatacenter)); + Assert.assertFalse(zeroWaitChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan)); MessagingService.instance().outboundSink.clear(); } + @Test + public void execute_HasCassandra3NodesSkipsExecution() + { + Sink sink = new Sink(true, true, peers); + MessagingService.instance().outboundSink.add(sink); + Predicate isUpgradingFromVersionLowerThan = new MockIsUpgradingFromVersionLowerThan(new CassandraVersion("3.11.0")); + Assert.assertTrue(containsCassandra3NodesChecker.execute(peers, this::getDatacenter, isUpgradingFromVersionLowerThan)); + Assert.assertEquals(0, sink.seenConnectionRequests.size()); + } + private void checkAvailable(StartupClusterConnectivityChecker checker, Set available, boolean shouldPass) { Sink sink = new Sink(true, true, available); MessagingService.instance().outboundSink.add(sink); - Assert.assertEquals(shouldPass, checker.execute(peers, this::getDatacenter)); + Assert.assertEquals(shouldPass, checker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan)); MessagingService.instance().outboundSink.clear(); }