Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge remote-tracking branch 'upstream/master'

  • Loading branch information...
commit 1185185696c6221765ab00c6cd729b5cd4921382 2 parents 1be1a17 + f39b681
Gary Malouf authored
Showing with 428 additions and 107 deletions.
  1. +9 −0 CHANGES.txt
  2. +18 −6 curator-client/src/main/java/com/netflix/curator/utils/EnsurePath.java
  3. +24 −10 curator-client/src/test/java/com/netflix/curator/BasicTests.java
  4. +10 −0 curator-client/src/test/java/com/netflix/curator/TestEnsurePath.java
  5. +7 −0 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
  6. +47 −12 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderSelector.java
  7. +44 −44 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/locks/LockInternals.java
  8. +2 −2 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/locks/StandardLockInternalsDriver.java
  9. +2 −2 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/atomic/TestDistributedAtomicLong.java
  10. +30 −0 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
  11. +13 −8 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderSelector.java
  12. +14 −12 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderSelectorCluster.java
  13. +14 −11 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/locks/TestInterProcessMutexBase.java
  14. +194 −0 curator-test/src/main/java/com/netflix/curator/test/Timing.java
View
9 CHANGES.txt
@@ -10,6 +10,15 @@ assumption was that users would create a new InterProcessMutex per thread. But,
arbitrary. For comparison, the JDK Lock doesn't have this requirement. I've fixed this however it
was a significant change internally. I'm counting on my tests to prove correctness.
+* EnsurePath wasn't doing its work in a RetryLoop.
+
+* Added a new class to the Test module, Timing, that is used to better coordinate timings in tests
+
+* LockInternals had a retry loop for all failures when it was only needed if the session expired
+and the lock node was lost. So, I refined the code to handle this specific case.
+
+* Issue 34: PathChildrenCache should ensure the path
+
1.1.2/1.0.3 - Feb. 8, 2012
==========================
* Added listener to Queues to listen for put completion
View
24 curator-client/src/main/java/com/netflix/curator/utils/EnsurePath.java
@@ -19,7 +19,9 @@
package com.netflix.curator.utils;
import com.netflix.curator.CuratorZookeeperClient;
+import com.netflix.curator.RetryLoop;
import org.apache.zookeeper.ZooKeeper;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -93,15 +95,25 @@ public void ensure(CuratorZookeeperClient client) throws Exception
private boolean isSet = false; // guarded by synchronization
@Override
- public synchronized void ensure(CuratorZookeeperClient client, String path) throws Exception
+ public synchronized void ensure(final CuratorZookeeperClient client, final String path) throws Exception
{
if ( !isSet )
{
- client.blockUntilConnectedOrTimedOut();
- ZKPaths.mkdirs(client.getZooKeeper(), path, true);
- helper.set(doNothingHelper);
-
- isSet = true;
+ RetryLoop.callWithRetry
+ (
+ client,
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ ZKPaths.mkdirs(client.getZooKeeper(), path, true);
+ helper.set(doNothingHelper);
+ isSet = true;
+ return null;
+ }
+ }
+ );
}
}
}
View
34 curator-client/src/test/java/com/netflix/curator/BasicTests.java
@@ -20,6 +20,7 @@
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.KillSession;
import com.netflix.curator.test.TestingServer;
+import com.netflix.curator.test.Timing;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -28,8 +29,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;
import java.io.File;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
public class BasicTests extends BaseClassForTests
{
@@ -38,6 +39,8 @@ public void testExpiredSession() throws Exception
{
// see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A4
+ final Timing timing = new Timing();
+
final CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher()
{
@@ -50,20 +53,31 @@ public void process(WatchedEvent event)
}
}
};
- final int TIMEOUT_SECONDS = 5;
- final CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), TIMEOUT_SECONDS * 1000, TIMEOUT_SECONDS * 1000, watcher, new RetryOneTime(2));
+
+ final CuratorZookeeperClient client = new CuratorZookeeperClient(server.getConnectString(), timing.session(), timing.connection(), watcher, new RetryOneTime(2));
client.start();
try
{
- client.blockUntilConnectedOrTimedOut();
- client.getZooKeeper().create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ RetryLoop.callWithRetry
+ (
+ client,
+ new Callable<Object>()
+ {
+ @Override
+ public Object call() throws Exception
+ {
+ client.getZooKeeper().create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- KillSession.kill(client.getZooKeeper(), server.getConnectString());
+ KillSession.kill(client.getZooKeeper(), server.getConnectString());
- Assert.assertTrue(latch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
- ZooKeeper zooKeeper = client.getZooKeeper();
- client.blockUntilConnectedOrTimedOut();
- Assert.assertNotNull(zooKeeper.exists("/foo", false));
+ Assert.assertTrue(timing.awaitLatch(latch));
+ ZooKeeper zooKeeper = client.getZooKeeper();
+ client.blockUntilConnectedOrTimedOut();
+ Assert.assertNotNull(zooKeeper.exists("/foo", false));
+ return null;
+ }
+ }
+ );
}
finally
{
View
10 curator-client/src/test/java/com/netflix/curator/TestEnsurePath.java
@@ -18,6 +18,7 @@
package com.netflix.curator;
+import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.utils.EnsurePath;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
@@ -44,7 +45,12 @@ public void testBasic() throws Exception
{
ZooKeeper client = mock(ZooKeeper.class, Mockito.RETURNS_MOCKS);
CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
+ RetryPolicy retryPolicy = new RetryOneTime(1);
+ RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
when(curator.getZooKeeper()).thenReturn(client);
+ when(curator.getRetryPolicy()).thenReturn(retryPolicy);
+ when(curator.newRetryLoop()).thenReturn(retryLoop);
+
Stat fakeStat = mock(Stat.class);
when(client.exists(Mockito.<String>any(), anyBoolean())).thenReturn(fakeStat);
@@ -63,8 +69,12 @@ public void testBasic() throws Exception
public void testSimultaneous() throws Exception
{
ZooKeeper client = mock(ZooKeeper.class, Mockito.RETURNS_MOCKS);
+ RetryPolicy retryPolicy = new RetryOneTime(1);
+ RetryLoop retryLoop = new RetryLoop(retryPolicy, null);
final CuratorZookeeperClient curator = mock(CuratorZookeeperClient.class);
when(curator.getZooKeeper()).thenReturn(client);
+ when(curator.getRetryPolicy()).thenReturn(retryPolicy);
+ when(curator.newRetryLoop()).thenReturn(retryLoop);
final Stat fakeStat = mock(Stat.class);
final CountDownLatch startedLatch = new CountDownLatch(2);
View
7 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/cache/PathChildrenCache.java
@@ -31,6 +31,7 @@
import com.netflix.curator.framework.listen.ListenerContainer;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;
+import com.netflix.curator.utils.EnsurePath;
import com.netflix.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -67,6 +68,7 @@
private final String path;
private final ExecutorService executorService;
private final boolean cacheData;
+ private final EnsurePath ensurePath;
private final BlockingQueue<PathChildrenCacheEvent> listenerEvents = new LinkedBlockingQueue<PathChildrenCacheEvent>();
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
@@ -134,6 +136,7 @@ public PathChildrenCache(CuratorFramework client, String path, boolean cacheData
this.path = path;
this.cacheData = cacheData;
executorService = Executors.newFixedThreadPool(1, threadFactory);
+ ensurePath = client.newNamespaceAwareEnsurePath(path);
}
/**
@@ -191,6 +194,8 @@ public void rebuild() throws Exception
{
Preconditions.checkArgument(!executorService.isShutdown());
+ ensurePath.ensure(client.getZookeeperClient());
+
List<String> children = client.getChildren().forPath(path);
for ( String child : children )
{
@@ -326,6 +331,8 @@ private void handleStateChange(ConnectionState newState)
private void refresh(final boolean forceGetDataAndStat) throws Exception
{
+ ensurePath.ensure(client.getZookeeperClient());
+
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
View
59 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/leader/LeaderSelector.java
@@ -54,6 +54,9 @@
private volatile boolean hasLeadership;
private volatile String id = "";
+ // guarded by synchronization
+ private boolean isQueued = false;
+
private static final ThreadFactory defaultThreadFactory = new ThreadFactoryBuilder().setNameFormat("LeaderSelector-%d").build();
/**
@@ -128,8 +131,7 @@ public String getId()
/**
* Attempt leadership. This attempt is done in the background - i.e. this method returns
- * immediately. Once you've been assigned leadership you can release it and call this method
- * again to re-obtain leadership
+ * immediately.
*/
public void start()
{
@@ -137,18 +139,37 @@ public void start()
Preconditions.checkArgument(!hasLeadership);
client.getConnectionStateListenable().addListener(listener);
- executorService.submit
- (
- new Callable<Object>()
- {
- @Override
- public Object call() throws Exception
+ requeue();
+ }
+
+ /**
+ * Re-queue an attempt for leadership. If this instance is already queued, nothing
+ * happens and false is returned. If the instance was not queued, it is re-qeued and true
+ * is returned
+ *
+ * @return true if re-queue is successful
+ */
+ public synchronized boolean requeue()
+ {
+ if ( !isQueued )
+ {
+ isQueued = true;
+ executorService.submit
+ (
+ new Callable<Object>()
{
- doWork();
- return null;
+ @Override
+ public Object call() throws Exception
+ {
+ doWork();
+ return null;
+ }
}
- }
- );
+ );
+
+ return true;
+ }
+ return false;
}
/**
@@ -269,10 +290,19 @@ public void run()
{
log.error("The leader threw an exception", e);
}
+ finally
+ {
+ clearIsQueued();
+ }
}
}
);
}
+ catch ( Exception e )
+ {
+ log.error("mutex.acquire() threw an exception", e);
+ throw e;
+ }
finally
{
hasLeadership = false;
@@ -286,4 +316,9 @@ public void run()
}
}
}
+
+ private synchronized void clearIsQueued()
+ {
+ isQueued = false;
+ }
}
View
88 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/locks/LockInternals.java
@@ -19,7 +19,6 @@
package com.netflix.curator.framework.recipes.locks;
import com.google.common.collect.Lists;
-import com.netflix.curator.RetryLoop;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.api.CuratorEventType;
@@ -37,7 +36,6 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +47,6 @@
private final String basePath;
private final CuratorListener listener;
private final EnsurePath ensurePath;
- private final Watcher watcher;
private final LockInternalsDriver driver;
private final String lockName;
private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
@@ -110,15 +107,6 @@ public void clean() throws Exception
ensurePath = client.newNamespaceAwareEnsurePath(basePath);
- watcher = new Watcher()
- {
- @Override
- public void process(WatchedEvent watchedEvent)
- {
- notifyFromWatcher();
- }
- };
-
listener = new CuratorListener()
{
@Override
@@ -177,53 +165,56 @@ String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Except
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
+ int retryCount = 0;
- PathAndFlag pathAndFlag = RetryLoop.callWithRetry
- (
- client.getZookeeperClient(),
- new Callable<PathAndFlag>()
+ ensurePath.ensure(client.getZookeeperClient());
+
+ String ourPath = null;
+ boolean hasTheLock = false;
+ boolean isDone = false;
+ while ( !isDone )
+ {
+ isDone = true;
+ try
{
- @Override
- public PathAndFlag call() throws Exception
+ if ( localLockNodeBytes != null )
{
- ensurePath.ensure(client.getZookeeperClient());
-
- String ourPath;
- if ( localLockNodeBytes != null )
- {
- ourPath = client.create().withProtectedEphemeralSequential().forPath(path, localLockNodeBytes);
- }
- else
+ ourPath = client.create().withProtectedEphemeralSequential().forPath(path, localLockNodeBytes);
+ }
+ else
+ {
+ ourPath = client.create().withProtectedEphemeralSequential().forPath(path);
+ }
+ hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
+ }
+ catch ( KeeperException.NoNodeException e )
+ {
+ // gets thrown by StandardLockInternalsDriver when it can't find the lock node
+ // this can happen when the session expires, etc. So, if the retry allows, just try it all again
+ if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis) )
+ {
+ isDone = false;
+ if ( ourPath != null )
{
- ourPath = client.create().withProtectedEphemeralSequential().forPath(path);
+ client.delete().inBackground().forPath(ourPath); // just in case
}
- boolean hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
- return new PathAndFlag(hasTheLock, ourPath);
+ }
+ else
+ {
+ throw e;
}
}
- );
+ }
- if ( pathAndFlag.hasTheLock )
+ if ( hasTheLock )
{
client.getCuratorListenable().addListener(listener);
- return pathAndFlag.path;
+ return ourPath;
}
return null;
}
- private static class PathAndFlag
- {
- final boolean hasTheLock;
- final String path;
-
- private PathAndFlag(boolean hasTheLock, String path)
- {
- this.hasTheLock = hasTheLock;
- this.path = path;
- }
- }
-
private void checkRevocableWatcher(String path) throws Exception
{
RevocationSpec entry = revocable.get();
@@ -268,6 +259,15 @@ private boolean internalLockLoop(long startMillis, Long millisToWait, String our
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
+ Watcher watcher = new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent event)
+ {
+ notifyFromWatcher();
+ }
+ };
+
synchronized(this)
{
Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath);
View
4 curator-recipes/src/main/java/com/netflix/curator/framework/recipes/locks/StandardLockInternalsDriver.java
@@ -52,12 +52,12 @@ public String fixForSorting(String str, String lockName)
return str;
}
- static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException.ConnectionLossException
+ static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
{
if ( ourIndex < 0 )
{
log.error("Sequential path not found: " + sequenceNodeName);
- throw new KeeperException.ConnectionLossException(); // treat it as a kind of disconnection and just try again according to the retry policy
+ throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);
}
}
}
View
4 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/atomic/TestDistributedAtomicLong.java
@@ -138,8 +138,8 @@ public Object call() throws Exception
@Test
public void testSimulation() throws Exception
{
- final int threadQty = 200;
- final int executionQty = 100;
+ final int threadQty = 20;
+ final int executionQty = 50;
final AtomicInteger optimisticTries = new AtomicInteger();
final AtomicInteger promotedLockTries = new AtomicInteger();
View
30 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/cache/TestPathChildrenCache.java
@@ -25,6 +25,8 @@
import com.netflix.curator.framework.recipes.BaseClassForTests;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.KillSession;
+import com.netflix.curator.test.Timing;
+import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
@@ -34,6 +36,34 @@
public class TestPathChildrenCache extends BaseClassForTests
{
@Test
+ public void testEnsurePath() throws Exception
+ {
+ Timing timing = new Timing();
+
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ try
+ {
+ PathChildrenCache cache = new PathChildrenCache(client, "/one/two/three", false);
+ cache.start();
+ timing.sleepABit();
+
+ try
+ {
+ client.create().forPath("/one/two/three/four");
+ }
+ catch ( KeeperException.NoNodeException e )
+ {
+ Assert.fail("Path should exist", e);
+ }
+ }
+ finally
+ {
+ Closeables.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testDeleteThenCreate() throws Exception
{
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
View
21 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderSelector.java
@@ -25,7 +25,7 @@
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.KillSession;
-import com.netflix.curator.test.TestingCluster;
+import com.netflix.curator.test.Timing;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.internal.annotations.Sets;
@@ -88,9 +88,9 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
@Test
public void testKillSession() throws Exception
{
- final int TIMEOUT_SECONDS = 5;
+ final Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), TIMEOUT_SECONDS * 1000, TIMEOUT_SECONDS * 1000, new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
@@ -140,12 +140,17 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
leaderSelector1.start();
leaderSelector2.start();
- Assert.assertTrue(semaphore.tryAcquire(1, TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- Assert.assertTrue(interruptedLatch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
- Assert.assertTrue(semaphore.tryAcquire(1, TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.awaitLatch(interruptedLatch));
+ timing.sleepABit();
+
+ leaderSelector1.requeue();
+ leaderSelector2.requeue();
+
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
Assert.assertEquals(leaderCount.get(), 1);
leaderSelector1.close();
@@ -175,7 +180,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
- latch.await();
+ latch.await(10, TimeUnit.SECONDS);
}
});
@@ -189,7 +194,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
- latch.await();
+ latch.await(10, TimeUnit.SECONDS);
}
});
View
26 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/leader/TestLeaderSelectorCluster.java
@@ -22,8 +22,10 @@
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.api.UnhandledErrorListener;
import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.TestingCluster;
+import com.netflix.curator.test.Timing;
import com.netflix.curator.utils.ZKPaths;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -31,7 +33,6 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -41,14 +42,14 @@
@Test
public void testRestart() throws Exception
{
- final int TIMEOUT_SECONDS = 5;
+ final Timing timing = new Timing();
CuratorFramework client = null;
TestingCluster cluster = new TestingCluster(3);
cluster.start();
try
{
- client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), TIMEOUT_SECONDS * 1000, TIMEOUT_SECONDS * 1000, new RetryOneTime(1));
+ client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
final AtomicInteger errors = new AtomicInteger(0);
@@ -87,14 +88,14 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
};
LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
selector.start();
- Assert.assertTrue(semaphore.tryAcquire(TIMEOUT_SECONDS * 4, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
TestingCluster.InstanceSpec connectionInstance = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
cluster.killServer(connectionInstance);
- Assert.assertTrue(reconnectedLatch.await(TIMEOUT_SECONDS * 4, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.awaitLatch(reconnectedLatch));
selector.start();
- Assert.assertTrue(semaphore.tryAcquire(TIMEOUT_SECONDS * 4, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
Assert.assertEquals(errors.get(), 0);
}
@@ -108,15 +109,16 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
@Test
public void testLostRestart() throws Exception
{
- final int TIMEOUT_SECONDS = 5;
+ final Timing timing = new Timing();
CuratorFramework client = null;
TestingCluster cluster = new TestingCluster(3);
cluster.start();
try
{
- client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), TIMEOUT_SECONDS * 1000, TIMEOUT_SECONDS * 1000, new RetryOneTime(1));
+ client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new ExponentialBackoffRetry(10, 3));
client.start();
+ client.sync("/", null);
final AtomicReference<AssertionError> error = new AtomicReference<AssertionError>(null);
final AtomicReference<String> lockNode = new AtomicReference<String>(null);
@@ -135,7 +137,7 @@ public void takeLeadership(CuratorFramework client) throws Exception
lockNode.set(names.get(0));
semaphore.release();
- Assert.assertTrue(internalLostLatch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.awaitLatch(internalLostLatch));
lostLatch.countDown();
}
catch ( AssertionError e )
@@ -157,7 +159,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
};
LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
selector.start();
- Assert.assertTrue(semaphore.tryAcquire(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.multiple(4).acquireSemaphore(semaphore));
if ( error.get() != null )
{
throw new AssertionError(error.get());
@@ -167,7 +169,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
cluster.close();
cluster = null;
- Assert.assertTrue(lostLatch.await(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.awaitLatch(lostLatch));
Assert.assertNotNull(lockNode.get());
@@ -184,7 +186,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
}
selector.start();
- Assert.assertTrue(semaphore.tryAcquire(TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore));
if ( error.get() != null )
{
throw new AssertionError(error.get());
View
25 curator-recipes/src/test/java/com/netflix/curator/framework/recipes/locks/TestInterProcessMutexBase.java
@@ -24,11 +24,13 @@
import com.netflix.curator.framework.recipes.BaseClassForTests;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.KillSession;
+import com.netflix.curator.test.Timing;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -47,49 +49,50 @@
@Test
public void testKilledSession() throws Exception
{
- final int TIMEOUT_SECONDS = 5000;
+ final Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), TIMEOUT_SECONDS * 1000, TIMEOUT_SECONDS * 1000, new RetryOneTime(1));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
final InterProcessLock mutex1 = makeLock(client);
final InterProcessLock mutex2 = makeLock(client);
- final Semaphore acquireSemaphore = new Semaphore(0);
- Executors.newSingleThreadExecutor().submit
+ final Semaphore semaphore = new Semaphore(0);
+ ExecutorCompletionService<Object> service = new ExecutorCompletionService<Object>(Executors.newFixedThreadPool(2));
+ service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
- mutex1.acquire(10, TimeUnit.SECONDS);
- acquireSemaphore.release();
+ mutex1.acquire();
+ semaphore.release();
Thread.sleep(1000000);
return null;
}
}
);
- Executors.newSingleThreadExecutor().submit
+ service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
- mutex2.acquire(10, TimeUnit.SECONDS);
- acquireSemaphore.release();
+ mutex2.acquire();
+ semaphore.release();
Thread.sleep(1000000);
return null;
}
}
);
- Assert.assertTrue(acquireSemaphore.tryAcquire(1, TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
KillSession.kill(client.getZookeeperClient().getZooKeeper(), server.getConnectString());
- Assert.assertTrue(acquireSemaphore.tryAcquire(1, TIMEOUT_SECONDS * 2, TimeUnit.SECONDS));
+ Assert.assertTrue(timing.acquireSemaphore(semaphore, 1));
}
finally
{
View
194 curator-test/src/main/java/com/netflix/curator/test/Timing.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Copyright 2011 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.netflix.curator.test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility to get various testing times
+ */
+public class Timing
+{
+ private final long value;
+ private final TimeUnit unit;
+
+ private static final int DEFAULT_SECONDS = 5;
+
+ /**
+ * Use the default base time
+ */
+ public Timing()
+ {
+ value = DEFAULT_SECONDS;
+ unit = TimeUnit.SECONDS;
+ }
+
+ /**
+ * Use a multiple of the default base time
+ *
+ * @param multiple the multiple
+ */
+ public Timing(int multiple)
+ {
+ value = DEFAULT_SECONDS * multiple;
+ unit = TimeUnit.SECONDS;
+ }
+
+ /**
+ * @param value base time
+ * @param unit base time unit
+ */
+ public Timing(long value, TimeUnit unit)
+ {
+ this.value = value;
+ this.unit = unit;
+ }
+
+ /**
+ * Return the base time in milliseconds
+ *
+ * @return time ms
+ */
+ public int milliseconds()
+ {
+ return (int)TimeUnit.MILLISECONDS.convert(value, unit);
+ }
+
+ /**
+ * Return the base time in seconds
+ *
+ * @return time secs
+ */
+ public int seconds()
+ {
+ return (int)value;
+ }
+
+ /**
+ * Wait on the given latch
+ *
+ * @param latch latch to wait on
+ * @return result of {@link CountDownLatch#await(long, TimeUnit)}
+ */
+ public boolean awaitLatch(CountDownLatch latch)
+ {
+ Timing m = waitingMultiple();
+ try
+ {
+ return latch.await(m.value, m.unit);
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ /**
+ * Wait on the given semaphore
+ *
+ * @param semaphore the semaphore
+ * @return result of {@link Semaphore#tryAcquire()}
+ */
+ public boolean acquireSemaphore(Semaphore semaphore)
+ {
+ Timing m = waitingMultiple();
+ try
+ {
+ return semaphore.tryAcquire(m.value, m.unit);
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ /**
+ * Wait on the given semaphore
+ *
+ * @param semaphore the semaphore
+ * @param n number of permits to acquire
+ * @return result of {@link Semaphore#tryAcquire(int, long, TimeUnit)}
+ */
+ public boolean acquireSemaphore(Semaphore semaphore, int n)
+ {
+ Timing m = waitingMultiple();
+ try
+ {
+ return semaphore.tryAcquire(n, m.value, m.unit);
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+
+ /**
+ * Return a new timing that is a multiple of the this timing
+ *
+ * @param n the multiple
+ * @return this timing times the multiple
+ */
+ public Timing multiple(int n)
+ {
+ return new Timing(value * n, unit);
+ }
+
+ /**
+ * Return a new timing with the standard multiple for waiting on latches, etc.
+ *
+ * @return this timing multiplied
+ */
+ public Timing waitingMultiple()
+ {
+ return multiple(4);
+ }
+
+ /**
+ * Sleep for a small amount of time
+ *
+ * @throws InterruptedException if interrupted
+ */
+ public void sleepABit() throws InterruptedException
+ {
+ unit.sleep(value / 4);
+ }
+
+ /**
+ * Return the value to use for ZK session timeout
+ * @return session timeout
+ */
+ public int session()
+ {
+ return multiple(10).milliseconds();
+ }
+
+ /**
+ * Return the value to use for ZK connection timeout
+ * @return connection timeout
+ */
+ public int connection()
+ {
+ return milliseconds();
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.