Skip to content

Commit

Permalink
Make StartupConnectivityChecker only run a connectivity check if ther…
Browse files Browse the repository at this point in the history
…e are no nodes which are running a version prior to Cassandra 4

patch by Isaac Reath; reviwed by Paulo Motta, Stefan Miklosovic for CASSANDRA-18968
  • Loading branch information
isaacreath authored and smiklosovic committed Nov 10, 2023
1 parent ac8a868 commit 26a7d57
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.0.12
* Make StartupConnectivityChecker only run a connectivity check if there are no nodes which are running a version prior to Cassandra 4 (CASSANDRA-18968)
* Retrieve keyspaces metadata and schema version concistently in DescribeStatement (CASSANDRA-18921)
* Gossip NPE due to shutdown event corrupting empty statuses (CASSANDRA-18913)
* Synchronize CQLSSTableWriter#build on the Schema.instance object (CASSANDRA-18317)
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -36,12 +37,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.net.Verb.PING_REQ;
Expand Down Expand Up @@ -77,11 +77,22 @@ public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean
* @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
* else false.
*/
public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource,
Predicate<CassandraVersion> isUpgradingFromLowerVersionThan)
{
if (peers == null || this.timeoutNanos < 0)
return true;

// Check if there are any nodes which we know are running a version prior to 4.0.
// We use this intead of Gossiper::hasMajorVersion3Nodes because in the absence of version information for a peer
// we still prefer to run the startup connectivity check.
if (isUpgradingFromLowerVersionThan.test(CassandraVersion.CASSANDRA_4_0))
{
logger.debug("Skipping startup connectivity check as some nodes may be running Cassandra version 3 or older " +
"which does not support connectivity checking.");
return true;
}

// make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
peers = new HashSet<>(peers);
InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/service/CassandraDaemon.java
Expand Up @@ -667,7 +667,8 @@ public void start()
{
StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
DatabaseDescriptor.getBlockForPeersInRemoteDatacenters());
connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter);
connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter,
Gossiper.instance::isUpgradingFromVersionLowerThan);

// check to see if transports may start else return without starting. This is needed when in survey mode or
// when bootstrap has not completed.
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

import org.junit.After;
import org.junit.Assert;
Expand All @@ -36,6 +37,7 @@
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.HeartBeatState;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;

public class StartupClusterConnectivityCheckerTest
Expand All @@ -44,6 +46,8 @@ public class StartupClusterConnectivityCheckerTest
private StartupClusterConnectivityChecker globalQuorumConnectivityChecker;
private StartupClusterConnectivityChecker noopChecker;
private StartupClusterConnectivityChecker zeroWaitChecker;
private StartupClusterConnectivityChecker containsCassandra3NodesChecker;
private Predicate<CassandraVersion> defaultMockIsUpgradingFromVersionLowerThan;

private static final long TIMEOUT_NANOS = 100;
private static final int NUM_PER_DC = 6;
Expand All @@ -64,6 +68,20 @@ else if (peersC.contains(endpoint))
return null;
}

private static class MockIsUpgradingFromVersionLowerThan implements Predicate<CassandraVersion>
{
CassandraVersion clusterVersion;
MockIsUpgradingFromVersionLowerThan(CassandraVersion clusterVersion)
{
this.clusterVersion = clusterVersion;
}
@Override
public boolean test(CassandraVersion other)
{
return clusterVersion.compareTo(other) < 0;
}
}

@BeforeClass
public static void before()
{
Expand All @@ -77,7 +95,8 @@ public void setUp() throws UnknownHostException
globalQuorumConnectivityChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, true);
noopChecker = new StartupClusterConnectivityChecker(-1, false);
zeroWaitChecker = new StartupClusterConnectivityChecker(0, false);

containsCassandra3NodesChecker = new StartupClusterConnectivityChecker(TIMEOUT_NANOS, false);
defaultMockIsUpgradingFromVersionLowerThan = new MockIsUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0);
peersA = new HashSet<>();
peersAMinusLocal = new HashSet<>();
peersA.add(FBUtilities.getBroadcastAddressAndPort());
Expand Down Expand Up @@ -114,23 +133,23 @@ public void execute_HappyPath()
{
Sink sink = new Sink(true, true, peers);
MessagingService.instance().outboundSink.add(sink);
Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan));
}

@Test
public void execute_NotAlive()
{
Sink sink = new Sink(false, true, peers);
MessagingService.instance().outboundSink.add(sink);
Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan));
}

@Test
public void execute_NoConnectionsAcks()
{
Sink sink = new Sink(true, false, peers);
MessagingService.instance().outboundSink.add(sink);
Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter));
Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan));
}

@Test
Expand Down Expand Up @@ -183,16 +202,26 @@ public void execute_ZeroWaitHasConnections() throws InterruptedException
{
Sink sink = new Sink(true, true, new HashSet<>());
MessagingService.instance().outboundSink.add(sink);
Assert.assertFalse(zeroWaitChecker.execute(peers, this::getDatacenter));
Assert.assertFalse(zeroWaitChecker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan));
MessagingService.instance().outboundSink.clear();
}

@Test
public void execute_HasCassandra3NodesSkipsExecution()
{
Sink sink = new Sink(true, true, peers);
MessagingService.instance().outboundSink.add(sink);
Predicate<CassandraVersion> isUpgradingFromVersionLowerThan = new MockIsUpgradingFromVersionLowerThan(new CassandraVersion("3.11.0"));
Assert.assertTrue(containsCassandra3NodesChecker.execute(peers, this::getDatacenter, isUpgradingFromVersionLowerThan));
Assert.assertEquals(0, sink.seenConnectionRequests.size());
}

private void checkAvailable(StartupClusterConnectivityChecker checker, Set<InetAddressAndPort> available,
boolean shouldPass)
{
Sink sink = new Sink(true, true, available);
MessagingService.instance().outboundSink.add(sink);
Assert.assertEquals(shouldPass, checker.execute(peers, this::getDatacenter));
Assert.assertEquals(shouldPass, checker.execute(peers, this::getDatacenter, defaultMockIsUpgradingFromVersionLowerThan));
MessagingService.instance().outboundSink.clear();
}

Expand Down

0 comments on commit 26a7d57

Please sign in to comment.