Skip to content

Commit

Permalink
Gossip stateMapOrdering does not have correct ordering when both Endp…
Browse files Browse the repository at this point in the history
…ointState are in the bootstrapping set

patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-18292
  • Loading branch information
dcapwell committed Mar 1, 2023
1 parent c747f70 commit 4e17922
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.2
* Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set (CASSANDRA-18292)
* Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561)
* More accurate skipping of sstables in read path (CASSANDRA-18134)
* Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258)
Expand Down
12 changes: 10 additions & 2 deletions src/java/org/apache/cassandra/gms/EndpointState.java
Expand Up @@ -184,17 +184,25 @@ void updateTimestamp()
updateTimestamp = nanoTime();
}

/**
 * Overwrites the stored update timestamp with the supplied value.
 * Test-only hook: it bypasses {@code updateTimestamp()}.
 *
 * @param value the raw timestamp to store (presumably in the same units as {@code nanoTime()} -- confirm with callers)
 */
@VisibleForTesting
public void unsafeSetUpdateTimestamp(long value)
{
    this.updateTimestamp = value;
}

/**
 * @return the current value of the {@code isAlive} flag held by this state
 */
public boolean isAlive()
{
    return this.isAlive;
}

void markAlive()
@VisibleForTesting
public void markAlive()
{
isAlive = true;
}

void markDead()
@VisibleForTesting
public void markDead()
{
isAlive = false;
}
Expand Down
15 changes: 11 additions & 4 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Expand Up @@ -1489,7 +1489,7 @@ public boolean isAdministrativelyInactiveState(InetAddressAndPort endpoint)
return isAdministrativelyInactiveState(epState);
}

private static String getGossipStatus(EndpointState epState)
public static String getGossipStatus(EndpointState epState)
{
if (epState == null)
{
Expand Down Expand Up @@ -1536,7 +1536,8 @@ private static String getGossipStatus(EndpointState epState)
* In that case above, the {@link Map#entrySet()} ordering can be random, causing h4 to apply before h2, which will
* be rejected by subscribers (only after updating gossip causing zero retries).
*/
private static Comparator<Entry<InetAddressAndPort, EndpointState>> stateOrderMap()
@VisibleForTesting
static Comparator<Entry<InetAddressAndPort, EndpointState>> stateOrderMap()
{
// There appear to be some edge cases where the state we are ordering gets added to the global state, causing
// ordering to change... to avoid that, rely on a cache
Expand All @@ -1553,10 +1554,16 @@ EndpointState get(Entry<InetAddressAndPort, EndpointState> e)
}
Cache cache = new Cache();
return ((Comparator<Entry<InetAddressAndPort, EndpointState>>) (e1, e2) -> {
String e1status = getGossipStatus(cache.get(e1));
String e2status = getGossipStatus(cache.get(e2));

if (Objects.equals(e1status, e2status) || (BOOTSTRAPPING_STATUS.contains(e1status) && BOOTSTRAPPING_STATUS.contains(e2status)))
return 0;

// check status first, make sure bootstrap status happens-after all others
if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e1))))
if (BOOTSTRAPPING_STATUS.contains(e1status))
return 1;
if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e2))))
if (BOOTSTRAPPING_STATUS.contains(e2status))
return -1;
return 0;
})
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/gms/VersionedValue.java
Expand Up @@ -102,6 +102,12 @@ private VersionedValue(String value)
this(value, VersionGenerator.getNextVersion());
}

/**
 * Builds a copy of this value that carries an explicitly chosen version.
 *
 * @param version the version to attach to the copy
 * @return a new {@code VersionedValue} with this instance's {@code value} and the given {@code version}
 */
@VisibleForTesting
public VersionedValue withVersion(int version)
{
    VersionedValue copy = new VersionedValue(this.value, version);
    return copy;
}

public static VersionedValue unsafeMakeVersionedValue(String value, int version)
{
return new VersionedValue(value, version);
Expand Down
Expand Up @@ -66,6 +66,7 @@
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
Expand Down Expand Up @@ -255,7 +256,7 @@ public void paxosRepairHistoryIsntUpdatedInForcedRepair() throws Throwable
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (k int primary key, v int)");
cluster.get(3).shutdown();
ClusterUtils.stopUnchecked(cluster.get(3));
InetAddressAndPort node3 = InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());

for (int i = 0; i < 10; i++)
Expand Down Expand Up @@ -356,7 +357,7 @@ public void paxosAutoRepair() throws Throwable
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.get(3).shutdown();
ClusterUtils.stopUnchecked(cluster.get(3));
cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
try
{
Expand Down
91 changes: 91 additions & 0 deletions test/unit/org/apache/cassandra/gms/GossiperTest.java
Expand Up @@ -22,12 +22,15 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -46,14 +49,19 @@
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.assertj.core.api.Assertions;
import org.quicktheories.core.Gen;
import org.quicktheories.impl.Constraint;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.quicktheories.QuickTheory.qt;

public class GossiperTest
{
Expand Down Expand Up @@ -419,6 +427,89 @@ public void testNotFireDuplicatedNotificationsWithUpdateContainsOldAndNewState()
}
}

/**
 * Property check of {@link Gossiper#stateOrderMap()} over generated endpoint-state maps.
 * For every ordered pair of entries (including an entry paired with itself) the comparator must:
 * <ul>
 *   <li>place an entry whose gossip status is in {@code BOOTSTRAPPING_STATUS} after one whose status is not;</li>
 *   <li>otherwise order by heartbeat generation, falling back to the endpoint address on ties.</li>
 * </ul>
 */
@Test
public void orderingComparator()
{
    qt().forAll(epStateMapGen()).checkAssert(generated -> {
        Comparator<Map.Entry<InetAddressAndPort, EndpointState>> comparator = Gossiper.stateOrderMap();
        List<Map.Entry<InetAddressAndPort, EndpointState>> entries = new ArrayList<>(generated.entrySet());
        for (Map.Entry<InetAddressAndPort, EndpointState> left : entries)
        {
            for (Map.Entry<InetAddressAndPort, EndpointState> right : entries)
            {
                boolean leftBootstrapping = VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(left.getValue()));
                boolean rightBootstrapping = VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(right.getValue()));
                Ordering actual = Ordering.compare(comparator, left, right);

                if (leftBootstrapping != rightBootstrapping)
                {
                    // exactly one side is bootstrapping: that side must sort last
                    Assertions.assertThat(actual).isEqualTo(leftBootstrapping ? Ordering.GT : Ordering.LT);
                    continue;
                }

                // same bootstrap class: generation decides first...
                Ordering expected = Ordering.compare(left.getValue().getHeartBeatState().getGeneration(),
                                                     right.getValue().getHeartBeatState().getGeneration());
                if (expected == Ordering.EQ)
                {
                    // ...and the address with port breaks ties
                    expected = Ordering.compare(left.getKey(), right.getKey());
                }
                Assertions.assertThat(actual)
                          .describedAs("Both elements bootstrap check were equal: %s == %s", leftBootstrapping, rightBootstrapping)
                          .isEqualTo(expected);
            }
        }
    });
}

/**
 * Three-way outcome of a comparison. Lets assertions compare only the sign of a
 * {@code compare} result rather than its raw {@code int} value.
 */
enum Ordering
{
    LT, EQ, GT;

    /**
     * Classifies {@code comparator.compare(a, b)} by sign.
     *
     * @return {@link #LT} for a negative result, {@link #EQ} for zero, {@link #GT} for a positive result
     */
    static <T> Ordering compare(Comparator<T> comparator, T a, T b)
    {
        switch (Integer.signum(comparator.compare(a, b)))
        {
            case -1:
                return LT;
            case 0:
                return EQ;
            default:
                return GT;
        }
    }

    /**
     * Classifies {@code a} against {@code b} using their natural ordering.
     */
    static <T extends Comparable<T>> Ordering compare(T a, T b)
    {
        Comparator<T> natural = Comparator.naturalOrder();
        return compare(natural, a, b);
    }
}

/**
 * Generator of maps from distinct endpoint addresses to generated endpoint states.
 * Each map's size is drawn from {@code Constraint.between(2, 10)}. An address that is
 * already a key is discarded and redrawn, so the map always reaches the drawn size.
 */
private static Gen<Map<InetAddressAndPort, EndpointState>> epStateMapGen()
{
    Gen<InetAddressAndPort> addressGen = CassandraGenerators.INET_ADDRESS_AND_PORT_GEN;
    Gen<EndpointState> stateGen = CassandraGenerators.endpointStates();
    Constraint sizeBounds = Constraint.between(2, 10);
    return rs -> {
        int target = Math.toIntExact(rs.next(sizeBounds));
        Map<InetAddressAndPort, EndpointState> result = Maps.newHashMapWithExpectedSize(target);
        for (int added = 0; added < target; added++)
        {
            // redraw until the address is not yet a key; the state is generated only after a fresh address is found
            InetAddressAndPort candidate = addressGen.generate(rs);
            while (result.containsKey(candidate))
                candidate = addressGen.generate(rs);
            result.put(candidate, stateGen.generate(rs));
        }
        return result;
    };
}

static class SimpleStateChangeListener implements IEndpointStateChangeSubscriber
{
static class OnChangeParams
Expand Down

0 comments on commit 4e17922

Please sign in to comment.