From 035eefc316d84093be9090ceff15d4ac8a90fcdc Mon Sep 17 00:00:00 2001 From: Zhihong Zhang Date: Wed, 24 Jun 2015 13:55:59 -0400 Subject: [PATCH] CURATOR-225: Added new ErrorMode value KEEP. --- .../recipes/queue/DistributedQueue.java | 26 +++--- .../framework/recipes/queue/ErrorMode.java | 7 +- .../recipes/queue/TestDistributedQueue.java | 81 +++++++++++++++++++ 3 files changed, 103 insertions(+), 11 deletions(-) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java index a183adf3b8..11fe845ab0 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java @@ -650,7 +650,8 @@ public void run() private enum ProcessMessageBytesCode { NORMAL, - REQUEUE + REQUEUE, + KEEP } private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception @@ -686,6 +687,9 @@ private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] byte { resultCode = ProcessMessageBytesCode.REQUEUE; break; + } else if ( errorMode.get() == ErrorMode.KEEP ) { + resultCode = ProcessMessageBytesCode.KEEP; + break; } } } @@ -743,28 +747,30 @@ protected boolean processWithLockSafety(String itemNode, ProcessType type) throw lockCreated = true; String itemPath = ZKPaths.makePath(queuePath, itemNode); - boolean requeue = false; + ProcessMessageBytesCode code = ProcessMessageBytesCode.NORMAL; byte[] bytes = null; if ( type == ProcessType.NORMAL ) { bytes = client.getData().forPath(itemPath); - requeue = (processMessageBytes(itemNode, bytes) == ProcessMessageBytesCode.REQUEUE); + code = processMessageBytes(itemNode, bytes); } - if ( requeue ) + if ( code == ProcessMessageBytesCode.REQUEUE ) { client.inTransaction() - .delete().forPath(itemPath) - .and() - .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(itemPath, bytes) - .and() - .commit(); + .delete().forPath(itemPath) + .and() + .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeItemPath(), bytes) + .and() + .commit(); + } else if (code == ProcessMessageBytesCode.KEEP) { + // Need to update the node with the same data so watcher will pick up the node again + client.setData().forPath(itemPath, bytes); } else { client.delete().forPath(itemPath); } - return true; } catch ( KeeperException.NodeExistsException ignore ) diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ErrorMode.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ErrorMode.java index 2cd558efd6..76fd6b2663 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ErrorMode.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/ErrorMode.java @@ -32,5 +32,10 @@ public enum ErrorMode /** * If the consumer throws an exception, delete the message */ - DELETE + DELETE, + + /** + * If the consumer throws an exception, keep the message in queue. + */ + KEEP } diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedQueue.java index 80509bc71e..90c33e629d 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedQueue.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedQueue.java @@ -38,6 +38,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -329,6 +330,86 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) } } + /** + * Test ErrorMode.KEEP + * + * @throws Exception + */ + @Test + public void testKeepMode() throws Exception + { + Timing timing = new Timing(); + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + client.start(); + + final TestQueueItem item1 = new TestQueueItem("1"); + final TestQueueItem item2 = new TestQueueItem("2"); + final CountDownLatch startLatch = new CountDownLatch(1); + final CountDownLatch consumerLatch = new CountDownLatch(1); + + try + { + QueueConsumer consumer = new QueueConsumer() + { + @Override + public void consumeMessage(TestQueueItem message) throws Exception + { + startLatch.await(); + + // Proceed with test when 2nd item is processed + if (message.equals(item2)) { + consumerLatch.countDown(); + } + + throw new Exception(); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + DistributedQueue queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH) + .lockPath("/locks").putInBackground(false).buildQueue(); + try + { + queue.setErrorMode(ErrorMode.KEEP); + queue.start(); + + queue.put(item1); + queue.put(item2); + + // Snapshot of the queue before exception + List list1 = queue.getChildren(); + Collections.sort(list1); + startLatch.countDown(); + + // Wait till 2n time is consumed + consumerLatch.await(); + + // Wait one more second to let queue processing complete + Thread.sleep(1000); + + // Snapshot of the queue after exception + List list2 = queue.getChildren(); + Collections.sort(list2); + + // Check same items are in the queue + Assert.assertEquals(list1, list2); + + } + finally + { + queue.close(); + } + } + finally + { + client.close(); + } + + } + @Test public void testNoDuplicateProcessing() throws Exception {