Skip to content

Commit

Permalink
Additional hardening of tests to reduce the number of intermittent
Browse files Browse the repository at this point in the history
BindException errors due to a TOCTOU issue with getLocalCluster.

test/integration/voldemort/performance/RoutedStoreParallelismTest.java
- switched to startVoldemortCluster

test/unit/voldemort/server/gossip/GossiperTest.java
- hand-coded test-specific startParallelVoldemortCluster. Not pretty. Not pretty at all. But, should retry in the face of such exceptions.

Switched TODO to comment about possible susceptability to BindExceptions:
- test/unit/voldemort/client/rebalance/RebalanceTest.java
- test/unit/voldemort/scheduled/StreamingSlopPusherTest.java
  • Loading branch information
jayjwylie committed Oct 16, 2012
1 parent de66f98 commit 7c793f6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 67 deletions.
Expand Up @@ -29,7 +29,6 @@
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.VoldemortException;
import voldemort.client.ClientConfig;
import voldemort.cluster.Cluster;
Expand All @@ -40,7 +39,6 @@
import voldemort.cluster.failuredetector.FailureDetectorUtils;
import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
import voldemort.store.SleepyStore;
import voldemort.store.Store;
Expand Down Expand Up @@ -141,15 +139,6 @@ public static void main(String[] args) throws Throwable {
ClientConfig clientConfig = new ClientConfig().setMaxConnectionsPerNode(maxConnectionsPerNode)
.setMaxThreads(maxThreads);

Map<Integer, VoldemortServer> serverMap = new HashMap<Integer, VoldemortServer>();

int[][] partitionMap = new int[numNodes][1];

for(int i = 0; i < numNodes; i++) {
partitionMap[i][0] = i;
}

Cluster cluster = ServerTestUtils.getLocalCluster(numNodes, partitionMap);
String storeDefinitionFile = "test/common/voldemort/config/single-store.xml";
StoreDefinition storeDefinition = new StoreDefinitionsMapper().readStoreList(new File(storeDefinitionFile))
.get(0);
Expand All @@ -161,33 +150,31 @@ public static void main(String[] args) throws Throwable {
clientConfig.getSocketBufferSize(),
clientConfig.getSocketKeepAlive());

// TODO: add a variant of ServerTestUtils.startVoldemortCluster that
// accepts StoreDefinitions in the interface.
for(int i = 0; i < cluster.getNumberOfNodes(); i++) {
VoldemortConfig config = ServerTestUtils.createServerConfig(true,
i,
TestUtils.createTempDir()
.getAbsolutePath(),
null,
storeDefinitionFile,
new Properties());

VoldemortServer server = ServerTestUtils.startVoldemortServer(socketStoreFactory,
config,
cluster);
serverMap.put(i, server);
VoldemortServer[] servers = new VoldemortServer[numNodes];
int[][] partitionMap = new int[numNodes][1];
for(int i = 0; i < numNodes; i++) {
partitionMap[i][0] = i;
}
Cluster cluster = ServerTestUtils.startVoldemortCluster(numNodes,
servers,
partitionMap,
socketStoreFactory,
true,
null,
storeDefinitionFile,
new Properties());

Map<Integer, VoldemortServer> serverMap = new HashMap<Integer, VoldemortServer>();
for(int i = 0; i < cluster.getNumberOfNodes(); i++) {
serverMap.put(i, servers[i]);
Store<ByteArray, byte[], byte[]> store = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test-sleepy");

if(i < numSlowNodes)
store = new SleepyStore<ByteArray, byte[], byte[]>(delay, store);

StoreRepository storeRepository = server.getStoreRepository();
StoreRepository storeRepository = servers[i].getStoreRepository();
storeRepository.addLocalStore(store);
}

Map<Integer, Store<ByteArray, byte[], byte[]>> stores = new HashMap<Integer, Store<ByteArray, byte[], byte[]>>();

for(Node node: cluster.getNodes()) {
Store<ByteArray, byte[], byte[]> socketStore = ServerTestUtils.getSocketStore(socketStoreFactory,
"test-sleepy",
Expand Down
3 changes: 2 additions & 1 deletion test/unit/voldemort/client/rebalance/RebalanceTest.java
Expand Up @@ -81,7 +81,8 @@ protected Cluster getCurrentCluster(int nodeId) {
}
}

// TODO: refactor to take advantage of ServerTestUtils.startVoldemortCluster
// This method may be susceptible to BindException issues due to TOCTOU
// problem with getLocalCluster.
@Override
protected Cluster startServers(Cluster cluster,
String storeXmlFile,
Expand Down
3 changes: 2 additions & 1 deletion test/unit/voldemort/scheduled/StreamingSlopPusherTest.java
Expand Up @@ -78,7 +78,8 @@ public void setUp() throws Exception {
}
}

// TODO: Refactor to take advantage of ServerTestUtils.startVoldemortCluster
// This method may be susceptible to BindException issues due to TOCTOU
// problem with getLocalCluster.
private void startServers(int... nodeIds) {
for(int nodeId: nodeIds) {
if(nodeId < NUM_SERVERS) {
Expand Down
95 changes: 60 additions & 35 deletions test/unit/voldemort/server/gossip/GossiperTest.java
Expand Up @@ -45,7 +45,6 @@ public class GossiperTest {

private List<VoldemortServer> servers = new ArrayList<VoldemortServer>();
private Cluster cluster;
private Properties props = new Properties();
private static final int socketBufferSize = 4096;
private static final int adminSocketBufferSize = 8192;
private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
Expand All @@ -54,6 +53,8 @@ public class GossiperTest {
socketBufferSize);
private static String storesXmlfile = "test/common/voldemort/config/stores.xml";
private final boolean useNio;
private CountDownLatch countDownLatch;
final private Properties props = new Properties();

public GossiperTest(boolean useNio) {
this.useNio = useNio;
Expand All @@ -64,22 +65,11 @@ public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] { { false }, { true } });
}

@Before
public void setUp() {
props.put("enable.gossip", "true");
props.put("gossip.interval.ms", "250");
props.put("socket.buffer.size", String.valueOf(socketBufferSize));
props.put("admin.streams.buffer.size", String.valueOf(adminSocketBufferSize));

// Start all in parallel to avoid exceptions during gossip

private void attemptParallelClusterStart(ExecutorService executorService) {
// Start all servers in parallel to avoid exceptions during gossip.
cluster = ServerTestUtils.getLocalCluster(3, new int[][] { { 0, 1, 2, 3 }, { 4, 5, 6, 7 },
{ 8, 9, 10, 11 } });
ExecutorService executorService = Executors.newFixedThreadPool(3);
final CountDownLatch countDownLatch = new CountDownLatch(3);

// TODO: Add a variant of ServerTestUtils.startVoldemortCluster that
// starts servers in parallel?
for(int i = 0; i < 3; i++) {
final int j = i;
executorService.submit(new Runnable() {
Expand All @@ -95,16 +85,41 @@ public void run() {
storesXmlfile,
props),
cluster));
countDownLatch.countDown();
} catch(IOException e) {
} catch(IOException ioe) {
logger.error("Caught IOException during parallel server start: "
+ e.getMessage());
e.printStackTrace();
throw new RuntimeException();
+ ioe.getMessage());
RuntimeException re = new RuntimeException();
re.initCause(ioe);
throw re;
} finally {
// Ensure setup progresses in face of errors
countDownLatch.countDown();
}
}
});
}
}

@Before
public void setUp() {
props.put("enable.gossip", "true");
props.put("gossip.interval.ms", "250");
props.put("socket.buffer.size", String.valueOf(socketBufferSize));
props.put("admin.streams.buffer.size", String.valueOf(adminSocketBufferSize));

ExecutorService executorService = Executors.newFixedThreadPool(3);
countDownLatch = new CountDownLatch(3);

boolean clusterStarted = false;
while(!clusterStarted) {
try {
attemptParallelClusterStart(executorService);
clusterStarted = true;
} catch(RuntimeException re) {
logger.info("Some server thread threw a RuntimeException. Will print out stacktrace and then try again. Assumption is that the RuntimeException is due to BindException that in turn is due to TOCTOU issue with getLocalCluster");
re.printStackTrace();
}
}

try {
countDownLatch.await();
Expand All @@ -122,13 +137,9 @@ private AdminClient getAdminClient(Cluster newCluster) {
return new AdminClient(newCluster, new AdminClientConfig());
}

// Protect against this test running forever until the root cause of running
// forever is found.
@Test(timeout = 1800)
public void testGossiper() throws Exception {
// First create a new cluster:
// Allocate ports for all nodes in the new cluster, to match existing
// cluster
private Cluster attemptStartAdditionalServer() throws IOException {
// Set up a new cluster that is one bigger than the original cluster

int originalSize = cluster.getNumberOfNodes();
int numOriginalPorts = originalSize * 3;
int ports[] = new int[numOriginalPorts + 3];
Expand Down Expand Up @@ -162,13 +173,27 @@ public void testGossiper() throws Exception {
storesXmlfile,
props),
newCluster);
// This step is only reached if startVoldemortServer does *not* throw a
// BindException due to TOCTOU problem with getLocalCluster
servers.add(newServer);
return newCluster;
}

// Wait a while until the new server starts
try {
Thread.sleep(500);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
// Protect against this test running forever until the root cause of running
// forever is found.
@Test(timeout = 1800)
public void testGossiper() throws Exception {
Cluster newCluster = null;

boolean startedAdditionalServer = false;
while(!startedAdditionalServer) {
try {
newCluster = attemptStartAdditionalServer();
startedAdditionalServer = true;
} catch(IOException ioe) {
logger.warn("Caught an IOException when attempting to start additional server. Will print stacktrace and then attempt to start additional server again.");
ioe.printStackTrace();
}
}

// Get the new cluster.xml
Expand All @@ -178,8 +203,7 @@ public void testGossiper() throws Exception {
MetadataStore.CLUSTER_KEY);

// Increment the version, let what would be the "donor node" know about
// it
// to seed the Gossip.
// it to seed the Gossip.
Version version = versionedClusterXML.getVersion();
((VectorClock) version).incrementVersion(3, ((VectorClock) version).getTimestamp() + 1);
((VectorClock) version).incrementVersion(0, ((VectorClock) version).getTimestamp() + 1);
Expand All @@ -194,6 +218,7 @@ public void testGossiper() throws Exception {
}

// Wait up to five seconds for Gossip to spread
final Cluster newFinalCluster = newCluster;
try {
TestUtils.assertWithBackoff(5000, new Attempt() {

Expand All @@ -206,11 +231,11 @@ public void checkCondition() {
assertEquals("server " + nodeId + " has heard "
+ " the gossip about number of nodes",
clusterAtServer.getNumberOfNodes(),
newCluster.getNumberOfNodes());
newFinalCluster.getNumberOfNodes());
assertEquals("server " + nodeId + " has heard "
+ " the gossip about partitions",
clusterAtServer.getNodeById(nodeId).getPartitionIds(),
newCluster.getNodeById(nodeId).getPartitionIds());
newFinalCluster.getNodeById(nodeId).getPartitionIds());
serversSeen++;
}
assertEquals("saw all servers", serversSeen, servers.size());
Expand Down

0 comments on commit 7c793f6

Please sign in to comment.