Skip to content

Commit

Permalink
JAVA-2055: Fix intermittent test failures (#1147)
Browse files Browse the repository at this point in the history
* NodeTargetingIT: Resolve Node to query based on address instead of position
* NodeTargetingIT: Wait for node to be marked down before proceeding
* MetricsIT: Check up to 5 seconds for CQL_REQUESTS metric to be fully incremented
* SpeculativeExecutionIT: Increase base specex delay
* Multiple: Stabilize tests by introducing QueryCounter
* SpeculativeExecutionIT: Set lbp when building session with profile
* NodeTargetingIT: Handle race case where node is down but connection is not removed yet
* NodeStateIT: Skip initial up events if fired
* NodeStateIT: Wait up to 500ms for event
  • Loading branch information
tolbertam committed Dec 13, 2018
1 parent 6794c38 commit f09258d
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 207 deletions.
Expand Up @@ -28,15 +28,17 @@
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.api.testinfra.utils.ConditionChecker;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.common.codec.ConsistencyLevel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.datastax.oss.simulacron.server.BoundNode;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -58,15 +60,15 @@ public void clear() {
simulacron.cluster().clearLogs();
simulacron.cluster().clearPrimes(true);
simulacron.cluster().node(4).stop();
ConditionChecker.checkThat(() -> getNode(4).getState() == NodeState.DOWN)
.before(5, TimeUnit.SECONDS);
}

@Test
public void should_use_node_on_statement() {
Collection<Node> nodeCol = sessionRule.session().getMetadata().getNodes().values();
List<Node> nodes = new ArrayList<>(nodeCol);
for (int i = 0; i < 10; i++) {
int nodeIndex = i % 3 + 1;
Node node = nodes.get(nodeIndex);
Node node = getNode(nodeIndex);

// given a statement with node explicitly set.
Statement statement = SimpleStatement.newInstance("select * system.local").setNode(node);
Expand All @@ -82,30 +84,25 @@ public void should_use_node_on_statement() {
@Test
public void should_fail_if_node_fails_query() {
String query = "mock";
Collection<Node> nodeCol = sessionRule.session().getMetadata().getNodes().values();
List<Node> nodes = new ArrayList<>(nodeCol);
simulacron.cluster().node(3).prime(when(query).then(unavailable(ConsistencyLevel.ALL, 1, 0)));

// given a statement with a node configured to fail the given query.
Node node1 = nodes.get(3);
Statement statement = SimpleStatement.newInstance(query).setNode(node1);
Node node3 = getNode(3);
Statement statement = SimpleStatement.newInstance(query).setNode(node3);
// when statement is executed an error should be raised.
try {
sessionRule.session().execute(statement);
fail("Should have thrown AllNodesFailedException");
} catch (AllNodesFailedException e) {
assertThat(e.getErrors().size()).isEqualTo(1);
assertThat(e.getErrors().get(node1)).isInstanceOf(UnavailableException.class);
assertThat(e.getErrors().get(node3)).isInstanceOf(UnavailableException.class);
}
}

@Test
public void should_fail_if_node_is_not_connected() {
// given a statement with a node explicitly set for which we have no active pool.

Collection<Node> nodeCol = sessionRule.session().getMetadata().getNodes().values();
List<Node> nodes = new ArrayList<>(nodeCol);
Node node4 = nodes.get(4);
Node node4 = getNode(4);

Statement statement = SimpleStatement.newInstance("select * system.local").setNode(node4);
try {
Expand All @@ -114,6 +111,21 @@ public void should_fail_if_node_is_not_connected() {
fail("Query should have failed");
} catch (NoNodeAvailableException e) {
assertThat(e.getErrors()).isEmpty();
} catch (AllNodesFailedException e) {
// It's also possible that the query is tried. This can happen if the node was marked
// down, but not all connections have been closed yet. In this case, just verify that
// the expected host failed.
assertThat(e.getErrors().size()).isEqualTo(1);
assertThat(e.getErrors()).containsOnlyKeys(node4);
}
}

/**
 * Resolves the driver-side {@link Node} that corresponds to the simulacron node with the given
 * id. The lookup goes through the simulacron node's socket address, which is used as the key
 * into the session metadata's node map.
 *
 * <p>Fails the calling test (via assertions) if either the simulacron node or the matching
 * driver node cannot be found.
 *
 * @param id the id of the node in the simulacron cluster.
 * @return the driver metadata node registered under that simulacron node's address.
 */
private Node getNode(int id) {
  BoundNode simulacronNode = simulacron.cluster().node(id);
  assertThat(simulacronNode).isNotNull();
  // Key the metadata lookup on the address the simulacron node reports.
  InetSocketAddress nodeAddress = (InetSocketAddress) simulacronNode.getAddress();
  Node driverNode = sessionRule.session().getMetadata().getNodes().get(nodeAddress);
  assertThat(driverNode).isNotNull();
  return driverNode;
}
}
Expand Up @@ -21,7 +21,7 @@
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.testinfra.ccm.CcmBridge;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.utils.ConditionChecker;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.context.EventBus;
Expand All @@ -31,55 +31,49 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

@Category(ParallelizableTests.class)
public class NodeMetadataIT {

private static CcmRule ccmRule = CcmRule.getInstance();

private static SessionRule<CqlSession> sessionRule = SessionRule.builder(ccmRule).build();

@ClassRule public static TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);
@ClassRule public static CcmRule ccmRule = CcmRule.getInstance();

@Test
public void should_expose_node_metadata() {
CqlSession session = sessionRule.session();
Node node = getUniqueNode(session);
try (CqlSession session = SessionUtils.newSession(ccmRule)) {
Node node = getUniqueNode(session);
// Run a few basic checks given what we know about our test environment:
assertThat(node.getConnectAddress()).isNotNull();
node.getBroadcastAddress()
.ifPresent(
broadcastAddress ->
assertThat(broadcastAddress.getAddress())
.isEqualTo(node.getConnectAddress().getAddress()));
assertThat(node.getListenAddress().get().getAddress())
.isEqualTo(node.getConnectAddress().getAddress());
assertThat(node.getDatacenter()).isEqualTo("dc1");
assertThat(node.getRack()).isEqualTo("r1");
if (!CcmBridge.DSE_ENABLEMENT) {
// CcmBridge does not report accurate C* versions for DSE, only approximated values
assertThat(node.getCassandraVersion()).isEqualTo(ccmRule.getCassandraVersion());
}
assertThat(node.getState()).isSameAs(NodeState.UP);
assertThat(node.getDistance()).isSameAs(NodeDistance.LOCAL);
assertThat(node.getHostId()).isNotNull();
assertThat(node.getSchemaVersion()).isNotNull();
long upTime1 = node.getUpSinceMillis();
assertThat(upTime1).isGreaterThan(-1);

// Run a few basic checks given what we know about our test environment:
assertThat(node.getConnectAddress()).isNotNull();
node.getBroadcastAddress()
.ifPresent(
broadcastAddress ->
assertThat(broadcastAddress.getAddress())
.isEqualTo(node.getConnectAddress().getAddress()));
assertThat(node.getListenAddress().get().getAddress())
.isEqualTo(node.getConnectAddress().getAddress());
assertThat(node.getDatacenter()).isEqualTo("dc1");
assertThat(node.getRack()).isEqualTo("r1");
if (!CcmBridge.DSE_ENABLEMENT) {
// CcmBridge does not report accurate C* versions for DSE, only approximated values
assertThat(node.getCassandraVersion()).isEqualTo(ccmRule.getCassandraVersion());
}
assertThat(node.getState()).isSameAs(NodeState.UP);
assertThat(node.getDistance()).isSameAs(NodeDistance.LOCAL);
assertThat(node.getHostId()).isNotNull();
assertThat(node.getSchemaVersion()).isNotNull();
long upTime1 = node.getUpSinceMillis();
assertThat(upTime1).isGreaterThan(-1);
// Note: open connections and reconnection status are covered in NodeStateIT

// Note: open connections and reconnection status are covered in NodeStateIT

// Force the node down and back up to check that upSinceMillis gets updated
EventBus eventBus = ((InternalDriverContext) session.getContext()).getEventBus();
eventBus.fire(TopologyEvent.forceDown(node.getConnectAddress()));
ConditionChecker.checkThat(() -> node.getState() == NodeState.FORCED_DOWN).becomesTrue();
assertThat(node.getUpSinceMillis()).isEqualTo(-1);
eventBus.fire(TopologyEvent.forceUp(node.getConnectAddress()));
ConditionChecker.checkThat(() -> node.getState() == NodeState.UP).becomesTrue();
assertThat(node.getUpSinceMillis()).isGreaterThan(upTime1);
// Force the node down and back up to check that upSinceMillis gets updated
EventBus eventBus = ((InternalDriverContext) session.getContext()).getEventBus();
eventBus.fire(TopologyEvent.forceDown(node.getConnectAddress()));
ConditionChecker.checkThat(() -> node.getState() == NodeState.FORCED_DOWN).becomesTrue();
assertThat(node.getUpSinceMillis()).isEqualTo(-1);
eventBus.fire(TopologyEvent.forceUp(node.getConnectAddress()));
ConditionChecker.checkThat(() -> node.getState() == NodeState.UP).becomesTrue();
assertThat(node.getUpSinceMillis()).isGreaterThan(upTime1);
}
}

private static Node getUniqueNode(CqlSession session) {
Expand Down
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -112,8 +113,24 @@ public class NodeStateIT {
public void setup() {
inOrder = Mockito.inOrder(nodeStateListener);

AtomicBoolean nonInitialEvent = new AtomicBoolean(false);
driverContext = (InternalDriverContext) sessionRule.session().getContext();
driverContext.getEventBus().register(NodeStateEvent.class, stateEvents::add);
driverContext
.getEventBus()
.register(
NodeStateEvent.class,
(e) -> {
// Skip transitions from unknown to up if we haven't received any other events yet:
// these may just be the initial events, which have typically fired by now but
// may not have, depending on timing.
if (!nonInitialEvent.get()
&& e.oldState == NodeState.UNKNOWN
&& e.newState == NodeState.UP) {
return;
}
nonInitialEvent.set(true);
stateEvents.add(e);
});

defaultLoadBalancingPolicy =
(ConfigurableIgnoresPolicy)
Expand Down Expand Up @@ -330,7 +347,7 @@ public void should_force_immediate_reconnection_when_up_topology_event()
(DefaultNode)
session.getMetadata().getNodes().get(localSimulacronNode.inetSocketAddress());
// UP fired a first time as part of the init process
Mockito.verify(localNodeStateListener).onUp(localMetadataNode);
Mockito.verify(localNodeStateListener, timeout(500)).onUp(localMetadataNode);

localSimulacronNode.stop();

Expand Down
Expand Up @@ -25,9 +25,11 @@
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.utils.ConditionChecker;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -50,16 +52,24 @@ public void should_expose_metrics() {
session.execute("SELECT release_version FROM system.local");
}

assertThat(session.getMetrics())
.hasValueSatisfying(
metrics ->
assertThat(metrics.<Timer>getSessionMetric(DefaultSessionMetric.CQL_REQUESTS))
.hasValueSatisfying(
cqlRequests -> {
// No need to be very sophisticated, metrics are already covered
// individually in unit tests.
assertThat(cqlRequests.getCount()).isEqualTo(10);
}));
// Should have 10 requests, check within 5 seconds as metric increments after
// caller is notified.
ConditionChecker.checkThat(
() -> {
assertThat(session.getMetrics())
.hasValueSatisfying(
metrics ->
assertThat(
metrics.<Timer>getSessionMetric(
DefaultSessionMetric.CQL_REQUESTS))
.hasValueSatisfying(
cqlRequests -> {
// No need to be very sophisticated, metrics are already
// covered individually in unit tests.
assertThat(cqlRequests.getCount()).isEqualTo(10);
}));
})
.before(5, TimeUnit.SECONDS);
}
}

Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.datastax.oss.driver.api.testinfra.loadbalancing.SortingLoadBalancingPolicy;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.QueryCounter;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoaderBuilder;
Expand Down Expand Up @@ -84,6 +85,12 @@ public class PerProfileRetryPolicyIT {
private static String QUERY_STRING = "select * from foo";
private static final SimpleStatement QUERY = SimpleStatement.newInstance(QUERY_STRING);

// Query counter built over the simulacron cluster; its filter only accepts log entries whose
// query text equals QUERY_STRING, so unrelated queries are not counted.
// NOTE(review): the deprecation suppression is presumably for the getQuery() call in the
// filter -- confirm against the simulacron API.
@SuppressWarnings("deprecation")
private final QueryCounter counter =
QueryCounter.builder(simulacron.cluster())
.withFilter((l) -> l.getQuery().equals(QUERY_STRING))
.build();

@Before
public void clear() {
simulacron.cluster().clearLogs();
Expand Down Expand Up @@ -139,21 +146,7 @@ public void should_use_policy_from_config_when_not_configured_in_request_profile
assertThat(errors).hasSize(1);
assertThat(errors.get(0).getValue()).isInstanceOf(UnavailableException.class);

assertQueryCount(0, 1);
assertQueryCount(1, 1);
}

private void assertQueryCount(int node, int expected) {
assertThat(
simulacron
.cluster()
.node(node)
.getLogs()
.getQueryLogs()
.stream()
.filter(l -> l.getQuery().equals(QUERY_STRING)))
.as("Expected query count to be %d for node %d", expected, node)
.hasSize(expected);
counter.assertNodeCounts(1, 1);
}

// A policy that simply rethrows always.
Expand Down

0 comments on commit f09258d

Please sign in to comment.