Skip to content

Commit

Permalink
SPY-156: Expose more async mutate methods.
Browse files Browse the repository at this point in the history
Motivation
----------
More flexible incr/decr methods are exposed as sync variants than
with their async counterparts. This patch is an effort to bring them
on par (almost) completely.

Modifications
-------------
More async variants have been exposed, which were already supported
through the private asyncMutate method. Note that since the ascii
protocol doesn't support defaults and expiration for incr/decr, those
return an UnsupportedOperationException, because it is nontrivial to
fix. Please refer to the sync variants if you need that.

Result
------
When the binary protocol is used, much more asyncIncr/asyncDecr methods
are exposed, explicitly helping with default values and expiration.

Change-Id: I4403b14d6146afb325afa9c239da28b55e89e935
Reviewed-on: http://review.couchbase.org/34257
Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
daschl authored and Michael Nitschinger committed Mar 26, 2014
1 parent e6892a0 commit 6ada2e6
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 10 deletions.
147 changes: 141 additions & 6 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.ops.TimedOutOperationStatus;
import net.spy.memcached.protocol.ascii.AsciiOperationFactory;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.transcoders.TranscodeService;
import net.spy.memcached.transcoders.Transcoder;
Expand Down Expand Up @@ -1978,6 +1979,12 @@ private long mutateWithDefault(Mutator t, String key, long by, long def,

private OperationFuture<Long> asyncMutate(Mutator m, String key, long by,
long def, int exp) {
if (!(opFact instanceof BinaryOperationFactory) && (def != 0 || exp != -1)) {
throw new UnsupportedOperationException("Default value or expiration "
+ "time are not supported on the async mutate methods. Use either the "
+ "binary protocol or the sync variant.");
}

final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Long> rv =
new OperationFuture<Long>(key, latch, operationTimeout, executorService);
Expand Down Expand Up @@ -2030,9 +2037,9 @@ public OperationFuture<Long> asyncIncr(String key, int by) {
/**
* Asynchronous decrement.
*
* @param key key to increment
* @param by the amount to increment the value by
* @return a future with the decremented value, or -1 if the increment failed.
* @param key key to decrement
* @param by the amount to decrement the value by
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -2044,9 +2051,9 @@ public OperationFuture<Long> asyncDecr(String key, long by) {
/**
* Asynchronous decrement.
*
* @param key key to increment
* @param by the amount to increment the value by
* @return a future with the decremented value, or -1 if the increment failed.
* @param key key to decrement
* @param by the amount to decrement the value by
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
Expand All @@ -2055,6 +2062,134 @@ public OperationFuture<Long> asyncDecr(String key, int by) {
return asyncMutate(Mutator.decr, key, by, 0, -1);
}

/**
* Asychronous increment.
*
* @param key key to increment
* @param by the amount to increment the value by
* @param def the default value (if the counter does not exist)
* @param exp the expiration of this object
* @return a future with the incremented value, or -1 if the increment failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncIncr(String key, long by, long def,
int exp) {
return asyncMutate(Mutator.incr, key, by, def, exp);
}

/**
* Asychronous increment.
*
* @param key key to increment
* @param by the amount to increment the value by
* @param def the default value (if the counter does not exist)
* @param exp the expiration of this object
* @return a future with the incremented value, or -1 if the increment failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncIncr(String key, int by, long def,
int exp) {
return asyncMutate(Mutator.incr, key, by, def, exp);
}

/**
* Asynchronous decrement.
*
* @param key key to decrement
* @param by the amount to decrement the value by
* @param def the default value (if the counter does not exist)
* @param exp the expiration of this object
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncDecr(String key, long by, long def,
int exp) {
return asyncMutate(Mutator.decr, key, by, def, exp);
}

/**
* Asynchronous decrement.
*
* @param key key to decrement
* @param by the amount to decrement the value by
* @param def the default value (if the counter does not exist)
* @param exp the expiration of this object
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncDecr(String key, int by, long def,
int exp) {
return asyncMutate(Mutator.decr, key, by, def, exp);
}

/**
* Asychronous increment.
*
* @param key key to increment
* @param by the amount to increment the value by
* @param def the default value (if the counter does not exist)
* @return a future with the incremented value, or -1 if the increment failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncIncr(String key, long by, long def) {
return asyncMutate(Mutator.incr, key, by, def, 0);
}

/**
* Asychronous increment.
*
* @param key key to increment
* @param by the amount to increment the value by
* @param def the default value (if the counter does not exist)
* @return a future with the incremented value, or -1 if the increment failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncIncr(String key, int by, long def) {
return asyncMutate(Mutator.incr, key, by, def, 0);
}

/**
* Asynchronous decrement.
*
* @param key key to decrement
* @param by the amount to decrement the value by
* @param def the default value (if the counter does not exist)
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncDecr(String key, long by, long def) {
return asyncMutate(Mutator.decr, key, by, def, 0);
}

/**
* Asynchronous decrement.
*
* @param key key to decrement
* @param by the amount to decrement the value by
* @param def the default value (if the counter does not exist)
* @return a future with the decremented value, or -1 if the decrement failed.
* @throws IllegalStateException in the rare circumstance where queue is too
* full to accept any more requests
*/
@Override
public OperationFuture<Long> asyncDecr(String key, int by, long def) {
return asyncMutate(Mutator.decr, key, by, def, 0);
}

/**
* Increment the given counter, returning the new value.
*
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/net/spy/memcached/MemcachedClientIF.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ <T> Future<Boolean> touch(final String key, final int exp,

long decr(String key, int by);

Future<Long> asyncIncr(String key, long by);

Future<Long> asyncIncr(String key, int by);

Future<Long> asyncDecr(String key, long by);

Future<Long> asyncDecr(String key, int by);

long incr(String key, long by, long def, int exp);

long incr(String key, int by, long def, int exp);
Expand All @@ -180,13 +188,13 @@ <T> Future<Boolean> touch(final String key, final int exp,

long decr(String key, int by, long def, int exp);

Future<Long> asyncIncr(String key, long by);
Future<Long> asyncIncr(String key, long by, long def, int exp);

Future<Long> asyncIncr(String key, int by);
Future<Long> asyncIncr(String key, int by, long def, int exp);

Future<Long> asyncDecr(String key, long by);
Future<Long> asyncDecr(String key, long by, long def, int exp);

Future<Long> asyncDecr(String key, int by);
Future<Long> asyncDecr(String key, int by, long def, int exp);

long incr(String key, long by, long def);

Expand All @@ -196,6 +204,14 @@ <T> Future<Boolean> touch(final String key, final int exp,

long decr(String key, int by, long def);

Future<Long> asyncIncr(String key, long by, long def);

Future<Long> asyncIncr(String key, int by, long def);

Future<Long> asyncDecr(String key, long by, long def);

Future<Long> asyncDecr(String key, int by, long def);

Future<Boolean> delete(String key);

Future<Boolean> delete(String key, long cas);
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/net/spy/memcached/AsciiClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,24 @@ public void testAddGetSetStatusCodes() throws Exception {
assertEquals(StatusCode.ERR_NOT_STORED, add.getStatus().getStatusCode());
}

public void testAsyncIncrementWithDefault() throws Exception {
String k = "async-incr-with-default";
try {
client.asyncIncr(k, 1, 5);
assertTrue(false);
} catch (UnsupportedOperationException e) {
assertTrue(true);
}
}

public void testAsyncDecrementWithDefault() throws Exception {
String k = "async-decr-with-default";
try {
client.asyncDecr(k, 1, 5);
assertTrue(false);
} catch (UnsupportedOperationException e) {
assertTrue(true);
}
}

}
22 changes: 22 additions & 0 deletions src/test/java/net/spy/memcached/BinaryClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,26 @@ public void testAddGetSetStatusCodes() throws Exception {
assertEquals(StatusCode.ERR_EXISTS, add.getStatus().getStatusCode());
}

public void testAsyncIncrementWithDefault() throws Exception {
String k = "async-incr-with-default";
OperationFuture<Long> f = client.asyncIncr(k, 1, 5);
assertEquals(StatusCode.SUCCESS, f.getStatus().getStatusCode());
assertEquals(5, (long) f.get());

f = client.asyncIncr(k, 1, 5);
assertEquals(StatusCode.SUCCESS, f.getStatus().getStatusCode());
assertEquals(6, (long) f.get());
}

public void testAsyncDecrementWithDefault() throws Exception {
String k = "async-decr-with-default";
OperationFuture<Long> f = client.asyncDecr(k, 4, 10);
assertEquals(StatusCode.SUCCESS, f.getStatus().getStatusCode());
assertEquals(10, (long) f.get());

f = client.asyncDecr(k, 4, 10);
assertEquals(StatusCode.SUCCESS, f.getStatus().getStatusCode());
assertEquals(6, (long) f.get());
}

}

0 comments on commit 6ada2e6

Please sign in to comment.