Skip to content

Commit

Permalink
fix flaky GossipInfoTableTest
Browse files Browse the repository at this point in the history
patch by Stefan Miklosovic; reviewed by Andrés de la Peña for CASSANDRA-17584
  • Loading branch information
smiklosovic committed Jun 7, 2022
1 parent 9b4784b commit 457e16c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 33 deletions.
18 changes: 16 additions & 2 deletions src/java/org/apache/cassandra/db/virtual/GossipInfoTable.java
Expand Up @@ -20,7 +20,9 @@

import java.util.EnumSet;
import java.util.Map;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.marshal.InetAddressType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
Expand Down Expand Up @@ -59,14 +61,23 @@ final class GossipInfoTable extends AbstractVirtualTable
STATES_FOR_VALUES = applicationStates.toArray(new ApplicationState[0]);
}

private final Supplier<Map<InetAddressAndPort, EndpointState>> endpointStateMapSupplier;

/**
* Construct a new {@link GossipInfoTable} for the given {@code keyspace}.
*
* @param keyspace the name of the keyspace
*/
GossipInfoTable(String keyspace)
{
this(keyspace, () -> Gossiper.instance.endpointStateMap);
}

@VisibleForTesting
GossipInfoTable(String keyspace, Supplier<Map<InetAddressAndPort, EndpointState>> endpointStateMapSupplier)
{
super(buildTableMetadata(keyspace));
this.endpointStateMapSupplier = endpointStateMapSupplier;
}

/**
Expand All @@ -76,10 +87,13 @@ final class GossipInfoTable extends AbstractVirtualTable
public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());
for (Map.Entry<InetAddressAndPort, EndpointState> entry : Gossiper.instance.endpointStateMap.entrySet())
for (Map.Entry<InetAddressAndPort, EndpointState> entry : endpointStateMapSupplier.get().entrySet())
{
InetAddressAndPort endpoint = entry.getKey();
EndpointState localState = entry.getValue();
// we are making a copy of endpoint state as a value of an entry of the returned map
// might be updated on the fly by LoadBroadcaster, and we want to be sure that
// the returned data are capturing a particular point in time
EndpointState localState = new EndpointState(entry.getValue());

SimpleDataSet dataSet = result.row(endpoint.getAddress(), endpoint.getPort())
.column(HOSTNAME, endpoint.getHostName())
Expand Down
64 changes: 33 additions & 31 deletions test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
Expand Up @@ -18,11 +18,12 @@

package org.apache.cassandra.db.virtual;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import com.google.common.collect.ImmutableList;
import org.junit.Before;
import org.awaitility.Awaitility;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -33,64 +34,58 @@
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;

import static com.google.common.collect.ImmutableList.of;
import static org.assertj.core.api.Assertions.assertThat;

public class GossipInfoTableTest extends CQLTester
{
private static final String KS_NAME = "vts";

@SuppressWarnings("FieldCanBeLocal")
private GossipInfoTable table;

@BeforeClass
public static void setUpClass()
{
CQLTester.setUpClass();
}

@Before
public void config()
{
table = new GossipInfoTable(KS_NAME);
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME, ImmutableList.of(table)));
}

@Test
public void testSelectAllWhenGossipInfoIsEmpty() throws Throwable
{
assertEmpty(execute("SELECT * FROM vts.gossip_info"));
// we have not triggered gossiper yet
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace("vts_1",
of(new GossipInfoTable("vts_1", HashMap::new))));
assertEmpty(execute("SELECT * FROM vts_1.gossip_info"));
}

@SuppressWarnings("deprecation")
@Test
public void testSelectAllWithStateTransitions() throws Throwable
{
try
{
requireNetwork(); // triggers gossiper

UntypedResultSet resultSet = execute("SELECT * FROM vts.gossip_info");
ConcurrentMap<InetAddressAndPort, EndpointState> states = Gossiper.instance.endpointStateMap;
Awaitility.await().until(() -> !states.isEmpty());
Map.Entry<InetAddressAndPort, EndpointState> entry = states.entrySet().stream().findFirst()
.orElseThrow(AssertionError::new);
InetAddressAndPort endpoint = entry.getKey();
EndpointState localState = new EndpointState(entry.getValue());

assertThat(resultSet.size()).isEqualTo(1);
assertThat(Gossiper.instance.endpointStateMap.size()).isEqualTo(1);
Supplier<Map<InetAddressAndPort, EndpointState>> endpointStateMapSupplier = () -> new HashMap<InetAddressAndPort, EndpointState>() {{put(endpoint, localState);}};

Optional<Map.Entry<InetAddressAndPort, EndpointState>> entry = Gossiper.instance.endpointStateMap.entrySet()
.stream()
.findFirst();
assertThat(entry).isNotEmpty();
VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace("vts_2",
of(new GossipInfoTable("vts_2", endpointStateMapSupplier))));

UntypedResultSet resultSet = execute("SELECT * FROM vts_2.gossip_info");

assertThat(resultSet.size()).isEqualTo(1);
UntypedResultSet.Row row = resultSet.one();
assertThat(row.getColumns().size()).isEqualTo(64);

InetAddressAndPort endpoint = entry.get().getKey();
EndpointState localState = entry.get().getValue();

assertThat(endpoint).isNotNull();
assertThat(localState).isNotNull();
assertThat(row.getInetAddress("address")).isEqualTo(endpoint.getAddress());
assertThat(row.getInt("port")).isEqualTo(endpoint.getPort());
assertThat(row.getString("hostname")).isEqualTo(endpoint.getHostName());
assertThat(row.getInt("generation")).isEqualTo(localState.getHeartBeatState().getGeneration());
assertThat(row.getInt("heartbeat")).isNotNull();

assertValue(row, "status", localState, ApplicationState.STATUS);
assertValue(row, "load", localState, ApplicationState.LOAD);
Expand Down Expand Up @@ -140,6 +135,7 @@ public void testSelectAllWithStateTransitions() throws Throwable
assertVersion(row, "native_address_and_port_version", localState, ApplicationState.NATIVE_ADDRESS_AND_PORT);
assertVersion(row, "status_with_port_version", localState, ApplicationState.STATUS_WITH_PORT);
assertVersion(row, "sstable_versions_version", localState, ApplicationState.SSTABLE_VERSIONS);
assertVersion(row, "disk_usage_version", localState, ApplicationState.DISK_USAGE);
assertVersion(row, "x_11_padding", localState, ApplicationState.X_11_PADDING);
assertVersion(row, "x1", localState, ApplicationState.X1);
assertVersion(row, "x2", localState, ApplicationState.X2);
Expand All @@ -165,8 +161,10 @@ private void assertValue(UntypedResultSet.Row row, String column, EndpointState
{
assertThat(localState.getApplicationState(key)).as("'%s' is expected to be not-null", key)
.isNotNull();
assertThat(row.getString(column)).as("'%s' is expected to match column '%s'", key, column)
.isEqualTo(localState.getApplicationState(key).value);
String tableString = row.getString(column);
String stateString = localState.getApplicationState(key).value;
assertThat(tableString).as("'%s' is expected to match column '%s', table string: %s, state string: %s",
key, column, tableString, stateString).isEqualTo(stateString);
}
else
{
Expand All @@ -181,8 +179,12 @@ private void assertVersion(UntypedResultSet.Row row, String column, EndpointStat
{
assertThat(localState.getApplicationState(key)).as("'%s' is expected to be not-null", key)
.isNotNull();
assertThat(row.getInt(column)).as("'%s' is expected to match column '%s'", key, column)
.isEqualTo(localState.getApplicationState(key).version);

int tableVersion = row.getInt(column);
int stateVersion = localState.getApplicationState(key).version;

assertThat(tableVersion).as("'%s' is expected to match column '%s', table int: %s, state int: %s",
key, column, tableVersion, stateVersion).isEqualTo(stateVersion);
}
else
{
Expand Down

0 comments on commit 457e16c

Please sign in to comment.