Permalink
Browse files

Added unlock operation

Based on patch submitted by Brent Ryan.

Change-Id: I9a30f21108fcbbd43add4abd2f16af05a3fbe68b
Reviewed-on: http://review.couchbase.org/12907
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
Tested-by: Raghavan N. Srinivas <raghavan.srinivas@gmail.com>
  • Loading branch information...
1 parent 1a3c1cf commit a606674af0dee53fe2c05cb2cbbad2439b82ba90 @ragsns ragsns committed with ragss Feb 1, 2012
@@ -41,6 +41,7 @@
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.ops.GetlOperation;
import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.Transcoder;
@@ -280,6 +281,93 @@ public void complete() {
}
/**
+ * Unlock the given key asynchronously from the cache.
+ *
+ * @param key the key to unlock
+ * @param casId the CAS identifier
+ * @param tc the transcoder to serialize and unserialize value
+ * @return whether or not the operation was performed
+ * @throws IllegalStateException in the rare circumstance where queue is too
+ * full to accept any more requests
+ */
+ public <T> OperationFuture<Boolean> asyncUnlock(final String key,
+ long casId, final Transcoder<T> tc) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key,
+ latch, operationTimeout);
+ Operation op = opFact.unlock(key, casId, new OperationCallback() {
+
+ @Override
+ public void receivedStatus(OperationStatus s) {
+ rv.set(s.isSuccess(), s);
+ }
+
+ @Override
+ public void complete() {
+ latch.countDown();
+ }
+ });
+ rv.setOperation(op);
+ mconn.enqueueOperation(key, op);
+ return rv;
+ }
+
+ /**
+ * Unlock the given key asynchronously from the cache with the default
+ * transcoder.
+ *
+ * @param key the key to unlock
+ * @param casId the CAS identifier
+ * @return whether or not the operation was performed
+ * @throws IllegalStateException in the rare circumstance where queue is too
+ * full to accept any more requests
+ */
+ public OperationFuture<Boolean> asyncUnlock(final String key,
+ long casId) {
+ return asyncUnlock(key, casId, transcoder);
+ }
+
+ /**
+ * Unlock the given key synchronously from the cache.
+ *
+ * @param key the key to unlock
+ * @param casId the CAS identifier
+ * @param tc the transcoder to serialize and unserialize value
+ * @return whether or not the operation was performed
+ * @throws IllegalStateException in the rare circumstance where queue is too
+ * full to accept any more requests
+ */
+ public <T> Boolean unlock(final String key,
+ long casId, final Transcoder<T> tc) {
+ try {
+ return asyncUnlock(key, casId, tc).get(operationTimeout,
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted waiting for value", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Exception waiting for value", e);
+ } catch (TimeoutException e) {
+ throw new OperationTimeoutException("Timeout waiting for value", e);
+ }
+
+ }
+ /**
+ * Unlock the given key synchronously from the cache with the default
+ * transcoder.
+ *
+ * @param key the key to unlock
+ * @param casId the CAS identifier
+ * @return whether or not the operation was performed
+ * @throws IllegalStateException in the rare circumstance where queue is too
+ * full to accept any more requests
+ */
+
+ public Boolean unlock(final String key,
+ long casId) {
+ return unlock(key, casId, transcoder);
+ }
+
+ /**
* Gets the number of vBuckets that are contained in the cluster. This
* function is for internal use only and should rarely be since there
* are few use cases in which it is necessary.
@@ -26,8 +26,10 @@
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClientIF;
+import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.transcoders.Transcoder;
+
/**
* This interface is provided as a helper for testing clients of the
* CouchbaseClient.
@@ -42,6 +44,18 @@
<T> CASValue<T> getAndLock(String key, int exp, Transcoder<T> tc);
CASValue<Object> getAndLock(String key, int exp);
+ <T> OperationFuture<Boolean> asyncUnlock(final String key,
+ long casId, final Transcoder<T> tc);
+
+ OperationFuture<Boolean> asyncUnlock(final String key,
+ long casId);
+
+ <T> Boolean unlock(final String key,
+ long casId, final Transcoder<T> tc);
+
+ Boolean unlock(final String key,
+ long casId);
+
int getNumVBuckets();
}
@@ -26,6 +26,7 @@
import java.net.URI;
import java.util.Arrays;
import net.spy.memcached.BinaryClientTest;
+import net.spy.memcached.CASValue;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.TestConfig;
@@ -103,6 +104,17 @@ public void testSimpleGetl() throws Exception {
: "Key was locked for too long";
}
+ public void testSimpleUnlock() throws Exception {
+ assertNull(client.get("getunltest"));
+ client.set("getunltest", 0, "value");
+ CASValue<Object> casv =
+ ((CouchbaseClient)client).getAndLock("getunltest", 6);
+ assert !client.set("getunltest", 1, "newvalue").get().booleanValue()
+ : "Key wasn't locked for the right amount of time";
+ ((CouchbaseClient)client).unlock("getunltest", casv.getCas());
+ assert client.set("getunltest", 1, "newvalue").get().booleanValue()
+ : "Key was locked for too long";
+ }
public void testGetStatsSlabs() throws Exception {
// Empty
}

0 comments on commit a606674

Please sign in to comment.