Permalink
Browse files

Added a different reaping mode

  • Loading branch information...
Randgalt committed Jun 22, 2012
1 parent 8cae5ef commit 19988f7ee6fa010752473cfc985089e40123ea6b
@@ -36,10 +36,12 @@
{
private final String path;
private final long expirationMs;
+ private final Mode mode;
- private PathHolder(String path, int delayMs)
+ private PathHolder(String path, int delayMs, Mode mode)
{
this.path = path;
+ this.mode = mode;
this.expirationMs = System.currentTimeMillis() + delayMs;
}
@@ -86,6 +88,12 @@ public int hashCode()
}
}
+ public enum Mode
+ {
+ REAP_INDEFINITELY,
+ REAP_UNTIL_DELETE
+ }
+
/**
* Uses the default reaping threshold of 5 minutes and creates an internal thread pool
*
@@ -120,14 +128,26 @@ public Reaper(CuratorFramework client, ExecutorService executor, int reapingThre
}
/**
- * Add a path to be checked by the reaper. The path will be checked periodically
+ * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically
* until the path is removed of the reaper is closed.
*
* @param path path to check
*/
public void addPath(String path)
{
- queue.add(new PathHolder(path, reapingThresholdMs));
+ addPath(path, Mode.REAP_INDEFINITELY);
+ }
+
+ /**
+ * Add a path to be checked by the reaper. The path will be checked periodically
+ * until the path is removed of the reaper is closed.
+ *
+ * @param path path to check
+ * @param mode reaping mode
+ */
+ public void addPath(String path, Mode mode)
+ {
+ queue.add(new PathHolder(path, reapingThresholdMs, mode));
}
/**
@@ -138,7 +158,7 @@ public void addPath(String path)
*/
public boolean removePath(String path)
{
- return queue.remove(new PathHolder(path, reapingThresholdMs));
+ return queue.remove(new PathHolder(path, reapingThresholdMs, null));
}
/**
@@ -159,7 +179,8 @@ public Void call() throws Exception
{
while ( !Thread.currentThread().isInterrupted() )
{
- reap(queue.take().path);
+ PathHolder holder = queue.take();
+ reap(holder.path, holder.mode);
}
}
catch ( InterruptedException e )
@@ -186,8 +207,9 @@ public void close() throws IOException
}
}
- private void reap(String path)
+ private void reap(String path, Mode mode)
{
+ boolean addBack = true;
try
{
Stat stat = client.checkExists().forPath(path);
@@ -199,6 +221,10 @@ private void reap(String path)
{
client.delete().forPath(path);
log.info("Reaping path: " + path);
+ if ( mode == Mode.REAP_UNTIL_DELETE )
+ {
+ addBack = false;
+ }
}
catch ( KeeperException.NoNodeException ignore )
{
@@ -216,7 +242,7 @@ private void reap(String path)
log.error("Trying to reap: " + path, e);
}
- if ( !Thread.currentThread().isInterrupted() )
+ if ( addBack && !Thread.currentThread().isInterrupted() )
{
addPath(path);
}
@@ -17,6 +17,38 @@
public class TestReaper extends BaseClassForTests
{
+ @Test
+ public void testReapUntilDelete() throws Exception
+ {
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ try
+ {
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/one/two/three");
+
+ Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
+
+ reaper = new Reaper(client, 100);
+ reaper.start();
+
+ reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
+ timing.sleepABit();
+
+ Assert.assertNull(client.checkExists().forPath("/one/two/three"));
+
+ client.create().forPath("/one/two/three");
+ timing.sleepABit();
+ Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
+ }
+ finally
+ {
+ Closeables.closeQuietly(reaper);
+ Closeables.closeQuietly(client);
+ }
+ }
+
@Test
public void testRemove() throws Exception
{

0 comments on commit 19988f7

Please sign in to comment.