diff --git a/curator-client/src/test/java/org/apache/curator/BasicTests.java b/curator-client/src/test/java/org/apache/curator/BasicTests.java index b886f3c1de..d25ae611fa 100644 --- a/curator-client/src/test/java/org/apache/curator/BasicTests.java +++ b/curator-client/src/test/java/org/apache/curator/BasicTests.java @@ -138,7 +138,7 @@ public void testReconnect() throws Exception server.stop(); Thread.sleep(1000); - server = new TestingServer(serverPort, tempDirectory); + server.restart(); Assert.assertTrue(client.blockUntilConnectedOrTimedOut()); byte[] readData = client.getZooKeeper().getData("/test", false, null); Assert.assertEquals(readData, writtenData); diff --git a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java index c8398070ca..430f587fb3 100644 --- a/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java +++ b/curator-client/src/test/java/org/apache/curator/TestRetryLoop.java @@ -74,7 +74,7 @@ public void testRetryLoopWithFailure() throws Exception case 2: { - server = new TestingServer(serverPort, tempDirectory); + server.restart(); break; } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java index fe74a722d5..0e66f0e932 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFailedDeleteManager.java @@ -82,7 +82,7 @@ else if ( newState == ConnectionState.RECONNECTED ) timing.sleepABit(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); Assert.assertTrue(timing.awaitLatch(latch)); timing.sleepABit(); @@ -145,7 +145,7 @@ else if ( newState == ConnectionState.RECONNECTED ) timing.sleepABit(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); Assert.assertTrue(timing.awaitLatch(latch)); timing.sleepABit(); @@ -208,7 +208,7 @@ else if ( newState == ConnectionState.RECONNECTED ) timing.sleepABit(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); Assert.assertTrue(timing.awaitLatch(latch)); timing.sleepABit(); @@ -240,7 +240,6 @@ public void testBasic() throws Exception int serverPort = server.getPort(); server.stop(); // cause the next delete to fail - server = null; try { client.delete().forPath(PATH); @@ -251,11 +250,10 @@ public void testBasic() throws Exception // expected } - server = new TestingServer(serverPort, serverDir); + server.restart(); Assert.assertNotNull(client.checkExists().forPath(PATH)); server.stop(); // cause the next delete to fail - server = null; try { client.delete().guaranteed().forPath(PATH); @@ -266,7 +264,7 @@ public void testBasic() throws Exception // expected } - server = new TestingServer(serverPort, serverDir); + server.restart(); final int TRIES = 5; for ( int i = 0; i < TRIES; ++i ) diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java index 93d346b6b8..08a284c59f 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFramework.java @@ -266,7 +266,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) // expected } - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); try { client.setData().forPath("/test", "test".getBytes()); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java index 4cc63303fe..9adb70f188 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkEdges.java @@ -359,7 +359,7 @@ public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleep { try { - server = new TestingServer(serverPort, tempDirectory); + server.restart(); } catch ( Exception e ) { diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index b827e15d7a..42ae74ee15 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -151,7 +151,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) Assert.assertEquals(getLeaders(latches).size(), 0); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); Assert.assertEquals(waitForALeader(latches, timing).size(), 1); // should reconnect } finally diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java index 87306a1b4d..ec909f72f4 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderSelector.java @@ -182,7 +182,7 @@ else if ( newState == ConnectionState.LOST ) timing.sleepABit(); debugLeadershipWaitLatch.countDown(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); Assert.assertTrue(timing.awaitLatch(reconnectedLatch)); Assert.assertFalse(takeLeadershipLatch.await(3, TimeUnit.SECONDS)); diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java index 8b4ce4665c..3fe8110e90 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutexBase.java @@ -96,7 +96,7 @@ public Object call() throws Exception server.stop(); Assert.assertTrue(timing.awaitLatch(latch)); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); } } finally diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingServer.java b/curator-test/src/main/java/org/apache/curator/test/TestingServer.java index fe9dade3f8..720949abc5 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingServer.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingServer.java @@ -37,47 +37,104 @@ public class TestingServer implements Closeable /** * Create the server using a random port - * - * @throws Exception errors + * + * @throws Exception + * errors */ public TestingServer() throws Exception { - this(-1, null); + this(true); + } + + /** + * Create the server using a random port + * + * @param start + * True if the server should be started, false otherwise + * @throws Exception + * errors + */ + public TestingServer(boolean start) throws Exception + { + this(-1, null, start); } /** * Create the server using the given port - * - * @param port the port - * @throws Exception errors + * + * @param port + * the port + * @throws Exception + * errors */ public TestingServer(int port) throws Exception { - this(port, null); + this(port, true); + } + + /** + * Create the server using the given port + * + * @param port + * the port + * @param start + * True if the server should be started, false otherwise + * @throws Exception + * errors + */ + public TestingServer(int port, boolean start) throws Exception + { + this(port, null, start); } /** * Create the server using the given port - * - * @param port the port - * @param tempDirectory directory to use - * @throws Exception errors + * + * @param port + * the port + * @param tempDirectory + * directory to use + * @throws Exception + * errors */ public TestingServer(int port, File tempDirectory) throws Exception { - this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1)); + this(port, tempDirectory, true); } - public TestingServer(InstanceSpec spec) throws Exception + /** + * Create the server using the given port + * + * @param port + * the port + * @param tempDirectory + * directory to use + * @param start + * True if the server should be started, false otherwise + * @throws Exception + * errors + */ + public TestingServer(int port, File tempDirectory, boolean start) + throws Exception + { + this(new InstanceSpec(tempDirectory, port, -1, -1, true, -1), start); + } + + public TestingServer(InstanceSpec spec, boolean start) throws Exception { this.spec = spec; - testingZooKeeperServer = new TestingZooKeeperServer(new QuorumConfigBuilder(spec)); - testingZooKeeperServer.start(); + testingZooKeeperServer = new TestingZooKeeperServer( + new QuorumConfigBuilder(spec)); + + if (start) + { + testingZooKeeperServer.start(); + } } /** * Return the port being used - * + * * @return port */ public int getPort() @@ -87,7 +144,7 @@ public int getPort() /** * Returns the temp directory being used - * + * * @return directory */ public File getTempDirectory() @@ -95,6 +152,16 @@ public File getTempDirectory() return spec.getDataDirectory(); } + /** + * Start the server + * + * @throws Exception + */ + public void start() throws Exception + { + testingZooKeeperServer.start(); + } + /** * Stop the server without deleting the temp directory */ @@ -103,6 +170,19 @@ public void stop() throws IOException testingZooKeeperServer.stop(); } + /** + * Restart the server. If the server is currently running it will be stopped + * and restarted. If it's not currently running then it will be started. If + * it has been closed (had close() called on it) then an exception will be + * thrown. + * + * @throws Exception + */ + public void restart() throws Exception + { + testingZooKeeperServer.restart(); + } + /** * Close the server and any open clients and delete the temp directory */ @@ -114,7 +194,7 @@ public void close() throws IOException /** * Returns the connection string to use - * + * * @return connection string */ public String getConnectString() diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java index e706847c56..cb855e9e66 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java @@ -18,34 +18,34 @@ */ package org.apache.curator.test; +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerMain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicReference; /** * Thanks to Jeremie BORDIER (ahfeel) for this code */ public class TestingZooKeeperServer extends QuorumPeerMain implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(TestingZooKeeperServer.class); + private static final Logger logger = LoggerFactory + .getLogger(TestingZooKeeperServer.class); private final QuorumConfigBuilder configBuilder; private final int thisInstanceIndex; private volatile ZooKeeperMainFace main; - private final AtomicReference state = new AtomicReference(State.LATENT); + private final AtomicReference state = new AtomicReference( + State.LATENT); private enum State { - LATENT, - STARTED, - STOPPED, - CLOSED + LATENT, STARTED, STOPPED, CLOSED } public TestingZooKeeperServer(QuorumConfigBuilder configBuilder) @@ -53,11 +53,13 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder) this(configBuilder, 0); } - public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanceIndex) + public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, + int thisInstanceIndex) { this.configBuilder = configBuilder; this.thisInstanceIndex = thisInstanceIndex; - main = (configBuilder.size() > 1) ? new TestingQuorumPeerMain() : new TestingZooKeeperMain(); + main = (configBuilder.size() > 1) ? new TestingQuorumPeerMain() + : new TestingZooKeeperMain(); } public QuorumPeer getQuorumPeer() @@ -70,32 +72,51 @@ public Collection getInstanceSpecs() return configBuilder.getInstanceSpecs(); } - public void kill() + public void kill() { main.kill(); state.set(State.STOPPED); } - public void restart() throws Exception + /** + * Restart the server. If the server is running it will be stopped and then + * started again. If it is not running (in a LATENT or STOPPED state) then + * it will be restarted. If it is in a CLOSED state then an exception will + * be thrown. + * + * @throws Exception + */ + public void restart() throws Exception { - if ( !state.compareAndSet(State.STOPPED, State.LATENT) ) + // Can't restart from a closed state as all the temporary data is gone + if (state.get() == State.CLOSED) + { + throw new IllegalStateException("Cannot restart a closed instance"); + } + + // If the server's currently running then stop it. + if (state.get() == State.STARTED) { - throw new IllegalStateException("Instance not stopped"); + stop(); } - main = (configBuilder.size() > 1) ? new TestingQuorumPeerMain() : new TestingZooKeeperMain(); + // Set to a LATENT state so we can restart + state.set(State.LATENT); + + main = (configBuilder.size() > 1) ? new TestingQuorumPeerMain() + : new TestingZooKeeperMain(); start(); } - public void stop() throws IOException + public void stop() throws IOException { - if ( state.compareAndSet(State.STARTED, State.STOPPED) ) + if (state.compareAndSet(State.STARTED, State.STOPPED)) { main.close(); } } - public InstanceSpec getInstanceSpec() + public InstanceSpec getInstanceSpec() { return configBuilder.getInstanceSpec(thisInstanceIndex); } @@ -105,10 +126,10 @@ public void close() throws IOException { stop(); - if ( state.compareAndSet(State.STOPPED, State.CLOSED) ) + if (state.compareAndSet(State.STOPPED, State.CLOSED)) { - InstanceSpec spec = getInstanceSpec(); - if ( spec.deleteDataDirectoryOnClose() ) + InstanceSpec spec = getInstanceSpec(); + if (spec.deleteDataDirectoryOnClose()) { DirectoryUtils.deleteRecursively(spec.getDataDirectory()); } @@ -117,29 +138,30 @@ public void close() throws IOException public void start() throws Exception { - if ( !state.compareAndSet(State.LATENT, State.STARTED) ) + if (!state.compareAndSet(State.LATENT, State.STARTED)) { return; } - new Thread - ( - new Runnable() + new Thread(new Runnable() + { + public void run() { - public void run() + try + { + QuorumPeerConfig config = configBuilder + .buildConfig(thisInstanceIndex); + main.runFromConfig(config); + } catch (Exception e) { - try - { - QuorumPeerConfig config = configBuilder.buildConfig(thisInstanceIndex); - main.runFromConfig(config); - } - catch ( Exception e ) - { - logger.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), getInstanceSpec()), e); - } + logger.error( + String.format( + "From testing server (random state: %s) for instance: %s", + String.valueOf(configBuilder.isFromRandom()), + getInstanceSpec()), e); } } - ).start(); + }).start(); main.blockUntilStarted(); } diff --git a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java index 354b88add9..73de7fc52a 100644 --- a/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java +++ b/curator-x-discovery/src/test/java/org/apache/curator/x/discovery/TestServiceDiscovery.java @@ -26,7 +26,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.KillSession; -import org.apache.curator.test.TestingServer; import org.apache.curator.test.Timing; import org.apache.curator.x.discovery.details.JsonInstanceSerializer; import org.apache.curator.x.discovery.details.ServiceDiscoveryImpl; @@ -83,7 +82,7 @@ protected void internalRegisterService(ServiceInstance service) throws E KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); server.stop(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); closeables.add(server); timing.acquireSemaphore(semaphore, 2); @@ -129,7 +128,7 @@ protected void internalRegisterService(ServiceInstance service) throws E KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString()); server.stop(); - server = new TestingServer(server.getPort(), server.getTempDirectory()); + server.restart(); closeables.add(server); timing.acquireSemaphore(semaphore);