Skip to content

Commit

Permalink
Returning futures from delete and flush.
Browse files Browse the repository at this point in the history
	A client may now synchronize on deletes and flushes (e.g. the client test).
  • Loading branch information
dustin committed Apr 5, 2007
1 parent 9927c6d commit c2cbb9d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 22 deletions.
1 change: 0 additions & 1 deletion TODO
@@ -1,5 +1,4 @@
* Store operations should return something more useful than strings.
* delete and flush need to return futures.
* Write up some more overview documentation, download links, etc...

--
Expand Down
38 changes: 27 additions & 11 deletions src/java/net/spy/memcached/MemcachedClient.java
Expand Up @@ -498,33 +498,49 @@ public long decr(String key, int by, long def) {
* @param key the key to delete
* @param when when the deletion should take effect
*/
public void delete(String key, int when) {
addOp(getServerForKey(key), new DeleteOperation(key, when));
public Future<Boolean> delete(String key, int when) {
final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch);
DeleteOperation op=new DeleteOperation(key, when,
new OperationCallback() {
public void receivedStatus(String line) {
rv.set(line.equals("DELETED"));
latch.countDown();
}});
rv.setOperation(op);
addOp(getServerForKey(key), op);
return rv;
}

/**
* Shortcut to delete that will immediately delete the item from the cache.
*/
public void delete(String key) {
delete(key, 0);
public Future<Boolean> delete(String key) {
return delete(key, 0);
}

/**
* Flush all caches from all servers with a delay of application.
*/
public void flush(int delay) {
public Future<Boolean> flush(int delay) {
final CountDownLatch latch=new CountDownLatch(conn.getNumConnections());
final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch);
// XXX: We will not be able to cancel this operation
for(int i=0; i<conn.getNumConnections(); i++) {
addOp(i, new FlushOperation(delay));
addOp(i, new FlushOperation(delay, new OperationCallback(){
public void receivedStatus(String line) {
rv.set(line.equals("OK"));
latch.countDown();
}}));
}
return rv;
}

/**
* Flush all caches from all servers immediately.
*/
public void flush() {
for(int i=0; i<conn.getNumConnections(); i++) {
addOp(i, new FlushOperation());
}
public Future<Boolean> flush() {
return flush(-1);
}

/**
Expand Down Expand Up @@ -677,7 +693,7 @@ public boolean cancel(boolean ign) {

public T get() throws InterruptedException, ExecutionException {
latch.await();
if(op.isCancelled()) {
if(op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException("Cancelled"));
}
return obj;
Expand Down
5 changes: 4 additions & 1 deletion src/java/net/spy/memcached/ops/DeleteOperation.java
Expand Up @@ -14,17 +14,20 @@ public class DeleteOperation extends Operation {

private String key=null;
private int when=0;
private OperationCallback callback=null;

public DeleteOperation(String k, int w) {
public DeleteOperation(String k, int w, OperationCallback cb) {
super();
key=k;
when=w;
callback=cb;
}

@Override
public void handleLine(String line) {
getLogger().debug("Delete of %s returned %s", key, line);
assert line.equals("DELETED") || line.equals("NOT_FOUND");
callback.receivedStatus(line);
transitionState(State.COMPLETE);
}

Expand Down
9 changes: 6 additions & 3 deletions src/java/net/spy/memcached/ops/FlushOperation.java
Expand Up @@ -12,20 +12,23 @@ public class FlushOperation extends Operation {

private static final byte[] FLUSH="flush_all\r\n".getBytes();
private int delay=-1;
private OperationCallback callback=null;

public FlushOperation() {
super();
public FlushOperation(OperationCallback cb) {
this(-1, cb);
}

public FlushOperation(int d) {
public FlushOperation(int d, OperationCallback cb) {
super();
delay=d;
callback=cb;
}

@Override
public void handleLine(String line) {
assert line.equals("OK");
getLogger().debug("Flush completed successfully");
callback.receivedStatus(line);
transitionState(State.COMPLETE);
}

Expand Down
33 changes: 27 additions & 6 deletions src/test/net/spy/memcached/ClientTest.java
Expand Up @@ -6,8 +6,10 @@
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import junit.framework.TestCase;

import net.spy.test.SyncThread;

/**
Expand All @@ -25,10 +27,7 @@ protected void setUp() throws Exception {

@Override
protected void tearDown() throws Exception {
client.flush();
// XXX: This exists because flush is async. We need to wait for the
// flush to finish before shutting down.
client.getVersions();
assertTrue(client.flush().get());
client.shutdown();
client=null;
super.tearDown();
Expand Down Expand Up @@ -165,7 +164,18 @@ public void testImmediateDelete() throws Exception {
assertNull(client.get("test1"));
}

public void testFutureDelete() throws Exception {
public void testDeleteFuture() throws Exception {
assertNull(client.get("test1"));
client.set("test1", 5, "test1value");
assertEquals("test1value", client.get("test1"));
Future<Boolean> f=client.delete("test1");
assertNull(client.get("test1"));
assertTrue("Deletion didn't return true", f.get());
assertFalse("Second deletion returned true",
client.delete("test1").get());
}

public void testDelayedDelete() throws Exception {
assertNull(client.get("test1"));
client.set("test1", 5, "test1value");
assertEquals("test1value", client.get("test1"));
Expand All @@ -182,7 +192,7 @@ public void testFutureDelete() throws Exception {
assertEquals("test1value", client.get("test1"));
}

public void testFutureFlush() throws Exception {
public void testDelayedFlush() throws Exception {
assertNull(client.get("test1"));
client.set("test1", 5, "test1value");
client.set("test2", 5, "test2value");
Expand All @@ -194,6 +204,17 @@ public void testFutureFlush() throws Exception {
assertNull(client.get("test2"));
}

public void testFlush() throws Exception {
assertNull(client.get("test1"));
client.set("test1", 5, "test1value");
client.set("test2", 5, "test2value");
assertEquals("test1value", client.get("test1"));
assertEquals("test2value", client.get("test2"));
assertTrue(client.flush().get());
assertNull(client.get("test1"));
assertNull(client.get("test2"));
}

public void testGetKeys() throws Exception {
client.set("test1", 5, "test1value");
client.set("test2", 5, "test2value");
Expand Down

0 comments on commit c2cbb9d

Please sign in to comment.