From c2cbb9d071d170a2d41eb922a866d0a43e8dc054 Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Thu, 5 Apr 2007 01:13:59 -0700 Subject: [PATCH] Returning futures from delete and flush. A client may now synchronize on deletes and flushes (e.g. the client test). --- TODO | 1 - .../net/spy/memcached/MemcachedClient.java | 38 +++++++++++++------ .../spy/memcached/ops/DeleteOperation.java | 5 ++- .../net/spy/memcached/ops/FlushOperation.java | 9 +++-- src/test/net/spy/memcached/ClientTest.java | 33 +++++++++++++--- 5 files changed, 64 insertions(+), 22 deletions(-) diff --git a/TODO b/TODO index 39ed27b90..d057213ee 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,4 @@ * Store operations should return something more useful than strings. -* delete and flush need to return futures. * Write up some more overview documentation, download links, etc... -- diff --git a/src/java/net/spy/memcached/MemcachedClient.java b/src/java/net/spy/memcached/MemcachedClient.java index 468bf5204..2a9a467db 100644 --- a/src/java/net/spy/memcached/MemcachedClient.java +++ b/src/java/net/spy/memcached/MemcachedClient.java @@ -498,33 +498,49 @@ public long decr(String key, int by, long def) { * @param key the key to delete * @param when when the deletion should take effect */ - public void delete(String key, int when) { - addOp(getServerForKey(key), new DeleteOperation(key, when)); + public Future delete(String key, int when) { + final CountDownLatch latch=new CountDownLatch(1); + final OperationFuture rv=new OperationFuture(latch); + DeleteOperation op=new DeleteOperation(key, when, + new OperationCallback() { + public void receivedStatus(String line) { + rv.set(line.equals("DELETED")); + latch.countDown(); + }}); + rv.setOperation(op); + addOp(getServerForKey(key), op); + return rv; } /** * Shortcut to delete that will immediately delete the item from the cache. */ - public void delete(String key) { - delete(key, 0); + public Future delete(String key) { + return delete(key, 0); } /** * Flush all caches from all servers with a delay of application. */ - public void flush(int delay) { + public Future flush(int delay) { + final CountDownLatch latch=new CountDownLatch(conn.getNumConnections()); + final OperationFuture rv=new OperationFuture(latch); + // XXX: We will not be able to cancel this operation for(int i=0; i flush() { + return flush(-1); } /** @@ -677,7 +693,7 @@ public boolean cancel(boolean ign) { public T get() throws InterruptedException, ExecutionException { latch.await(); - if(op.isCancelled()) { + if(op != null && op.isCancelled()) { throw new ExecutionException(new RuntimeException("Cancelled")); } return obj; diff --git a/src/java/net/spy/memcached/ops/DeleteOperation.java b/src/java/net/spy/memcached/ops/DeleteOperation.java index cfd142b73..90181dfb4 100644 --- a/src/java/net/spy/memcached/ops/DeleteOperation.java +++ b/src/java/net/spy/memcached/ops/DeleteOperation.java @@ -14,17 +14,20 @@ public class DeleteOperation extends Operation { private String key=null; private int when=0; + private OperationCallback callback=null; - public DeleteOperation(String k, int w) { + public DeleteOperation(String k, int w, OperationCallback cb) { super(); key=k; when=w; + callback=cb; } @Override public void handleLine(String line) { getLogger().debug("Delete of %s returned %s", key, line); assert line.equals("DELETED") || line.equals("NOT_FOUND"); + callback.receivedStatus(line); transitionState(State.COMPLETE); } diff --git a/src/java/net/spy/memcached/ops/FlushOperation.java b/src/java/net/spy/memcached/ops/FlushOperation.java index a8a4e8719..510d0d596 100644 --- a/src/java/net/spy/memcached/ops/FlushOperation.java +++ b/src/java/net/spy/memcached/ops/FlushOperation.java @@ -12,20 +12,23 @@ public class FlushOperation extends Operation { private static final byte[] FLUSH="flush_all\r\n".getBytes(); private int delay=-1; + private OperationCallback callback=null; - public FlushOperation() { - super(); + public FlushOperation(OperationCallback cb) { + this(-1, cb); } - public FlushOperation(int d) { + public FlushOperation(int d, OperationCallback cb) { super(); delay=d; + callback=cb; } @Override public void handleLine(String line) { assert line.equals("OK"); getLogger().debug("Flush completed successfully"); + callback.receivedStatus(line); transitionState(State.COMPLETE); } diff --git a/src/test/net/spy/memcached/ClientTest.java b/src/test/net/spy/memcached/ClientTest.java index 3118ddb55..e3bb0b090 100644 --- a/src/test/net/spy/memcached/ClientTest.java +++ b/src/test/net/spy/memcached/ClientTest.java @@ -6,8 +6,10 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.Future; import junit.framework.TestCase; + import net.spy.test.SyncThread; /** @@ -25,10 +27,7 @@ protected void setUp() throws Exception { @Override protected void tearDown() throws Exception { - client.flush(); - // XXX: This exists because flush is async. We need to wait for the - // flush to finish before shutting down. - client.getVersions(); + assertTrue(client.flush().get()); client.shutdown(); client=null; super.tearDown(); @@ -165,7 +164,18 @@ public void testImmediateDelete() throws Exception { assertNull(client.get("test1")); } - public void testFutureDelete() throws Exception { + public void testDeleteFuture() throws Exception { + assertNull(client.get("test1")); + client.set("test1", 5, "test1value"); + assertEquals("test1value", client.get("test1")); + Future f=client.delete("test1"); + assertNull(client.get("test1")); + assertTrue("Deletion didn't return true", f.get()); + assertFalse("Second deletion returned true", + client.delete("test1").get()); + } + + public void testDelayedDelete() throws Exception { assertNull(client.get("test1")); client.set("test1", 5, "test1value"); assertEquals("test1value", client.get("test1")); @@ -182,7 +192,7 @@ public void testFutureDelete() throws Exception { assertEquals("test1value", client.get("test1")); } - public void testFutureFlush() throws Exception { + public void testDelayedFlush() throws Exception { assertNull(client.get("test1")); client.set("test1", 5, "test1value"); client.set("test2", 5, "test2value"); @@ -194,6 +204,17 @@ public void testFutureFlush() throws Exception { assertNull(client.get("test2")); } + public void testFlush() throws Exception { + assertNull(client.get("test1")); + client.set("test1", 5, "test1value"); + client.set("test2", 5, "test2value"); + assertEquals("test1value", client.get("test1")); + assertEquals("test2value", client.get("test2")); + assertTrue(client.flush().get()); + assertNull(client.get("test1")); + assertNull(client.get("test2")); + } + public void testGetKeys() throws Exception { client.set("test1", 5, "test1value"); client.set("test2", 5, "test2value");