Skip to content

Commit

Permalink
Adding a getCas() for Store functions
Browse files Browse the repository at this point in the history
Change-Id: Ieced868b0b9870838801fd45eee010f61e215fd6
  • Loading branch information
ragsns committed Aug 13, 2012
1 parent bda95ec commit 6c15907
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 19 deletions.
12 changes: 9 additions & 3 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Expand Up @@ -66,6 +66,7 @@
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;
import net.spy.memcached.ops.TimedOutOperationStatus;
import net.spy.memcached.transcoders.TranscodeService;
Expand Down Expand Up @@ -283,10 +284,13 @@ private <T> OperationFuture<Boolean> asyncStore(StoreType storeType,
final OperationFuture<Boolean> rv =
new OperationFuture<Boolean>(key, latch, operationTimeout);
Operation op = opFact.store(storeType, key, co.getFlags(), exp,
co.getData(), new OperationCallback() {
co.getData(), new StoreOperation.Callback() {
public void receivedStatus(OperationStatus val) {
rv.set(val.isSuccess(), val);
}
public void gotData(String key, long cas) {
rv.setCas(cas);
}

public void complete() {
latch.countDown();
Expand Down Expand Up @@ -485,7 +489,7 @@ public <T> Future<CASResponse> asyncCAS(String key, long casId, int exp,
final OperationFuture<CASResponse> rv =
new OperationFuture<CASResponse>(key, latch, operationTimeout);
Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,
co.getData(), new OperationCallback() {
co.getData(), new StoreOperation.Callback() {
public void receivedStatus(OperationStatus val) {
if (val instanceof CASOperationStatus) {
rv.set(((CASOperationStatus) val).getCASResponse(), val);
Expand All @@ -497,7 +501,9 @@ public void receivedStatus(OperationStatus val) {
throw new RuntimeException("Unhandled state: " + val);
}
}

public void gotData(String key, long cas) {
rv.setCas(cas);
}
public void complete() {
latch.countDown();
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/net/spy/memcached/OperationFactory.java
Expand Up @@ -182,7 +182,7 @@ MutatorOperation mutate(Mutator m, String key, long by, long def, int exp,
* @return the new store operation
*/
StoreOperation store(StoreType storeType, String key, int flags, int exp,
byte[] data, OperationCallback cb);
byte[] data, StoreOperation.Callback cb);

/**
* Resets a keys expiration time.
Expand Down Expand Up @@ -219,7 +219,7 @@ ConcatenationOperation cat(ConcatenationType catType, long casId, String key,
* @return the new store operation
*/
CASOperation cas(StoreType t, String key, long casId, int flags, int exp,
byte[] data, OperationCallback cb);
byte[] data, StoreOperation.Callback cb);

/**
* Create a new version operation.
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/net/spy/memcached/internal/OperationFuture.java
Expand Up @@ -57,6 +57,7 @@ public class OperationFuture<T> extends SpyObject implements Future<T> {
private final long timeout;
private Operation op;
private final String key;
private Long cas;

/**
* Create an OperationFuture for a given async operation.
Expand Down Expand Up @@ -89,6 +90,7 @@ public OperationFuture(String k, CountDownLatch l, AtomicReference<T> oref,
status = null;
timeout = opTimeout;
key = k;
cas = null;
}

/**
Expand Down Expand Up @@ -185,6 +187,32 @@ public String getKey() {
return key;
}

/**
* Set the key for this operation.
*
* @param inCas the CAS value
*/
public void setCas(long inCas) {
this.cas = inCas;
}

/**
* Get the CAS for this operation.
* If this is for an ASCII protocol configured client,
* the exception Un
*
* @return the CAS for this operation or null
* if unsuccessful.
*
*/
public long getCas() throws UnsupportedOperationException {
if (isDone() && getStatus().isSuccess() &&
(cas == null)) {
throw new UnsupportedOperationException("ASCII Protocol"
+ " does not return a CAS value");
}
return cas;
}
/**
* Get the current status of this operation.
*
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/net/spy/memcached/ops/BaseOperationFactory.java
Expand Up @@ -63,7 +63,7 @@ public Collection<Operation> clone(KeyedOperation op) {
CASOperation cop = (CASOperation) op;
rv.add(cas(cop.getStoreType(), first(op.getKeys()), cop.getCasValue(),
cop.getFlags(), cop.getExpiration(), cop.getData(),
cop.getCallback()));
(StoreOperation.Callback) cop.getCallback()));
} else if(op instanceof DeleteOperation) {
rv.add(delete(first(op.getKeys()), op.getCallback()));
} else if (op instanceof MutatorOperation) {
Expand All @@ -73,7 +73,8 @@ public Collection<Operation> clone(KeyedOperation op) {
} else if (op instanceof StoreOperation) {
StoreOperation so = (StoreOperation) op;
rv.add(store(so.getStoreType(), first(op.getKeys()), so.getFlags(),
so.getExpiration(), so.getData(), op.getCallback()));
so.getExpiration(), so.getData(),
(StoreOperation.Callback) op.getCallback()));
} else if (op instanceof ConcatenationOperation) {
ConcatenationOperation c = (ConcatenationOperation) op;
rv.add(cat(c.getStoreType(), c.getCasValue(), first(op.getKeys()),
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/net/spy/memcached/ops/StoreOperation.java
Expand Up @@ -52,4 +52,16 @@ public interface StoreOperation extends KeyedOperation {
* </p>
*/
byte[] getData();
/**
* Operation callback to get the CAS value
*/
interface Callback extends OperationCallback {
/**
* Callback for each result from a Store.
*
* @param key the key that was retrieved
* @param cas the CAS value for this record
*/
void gotData(String key, long cas);
}
}
Expand Up @@ -109,7 +109,7 @@ public StatsOperation stats(String arg, StatsOperation.Callback cb) {
}

public StoreOperation store(StoreType storeType, String key, int flags,
int exp, byte[] data, OperationCallback cb) {
int exp, byte[] data, StoreOperation.Callback cb) {
return new StoreOperationImpl(storeType, key, flags, exp, data, cb);
}

Expand All @@ -128,7 +128,7 @@ public NoopOperation noop(OperationCallback cb) {
}

public CASOperation cas(StoreType type, String key, long casId, int flags,
int exp, byte[] data, OperationCallback cb) {
int exp, byte[] data, StoreOperation.Callback cb) {
return new CASOperationImpl(key, casId, flags, exp, data, cb);
}

Expand Down
Expand Up @@ -110,7 +110,7 @@ public StatsOperation stats(String arg,
}

public StoreOperation store(StoreType storeType, String key, int flags,
int exp, byte[] data, OperationCallback cb) {
int exp, byte[] data, StoreOperation.Callback cb) {
return new StoreOperationImpl(storeType, key, flags, exp, data, 0, cb);
}

Expand All @@ -128,7 +128,7 @@ public NoopOperation noop(OperationCallback cb) {
}

public CASOperation cas(StoreType type, String key, long casId, int flags,
int exp, byte[] data, OperationCallback cb) {
int exp, byte[] data, StoreOperation.Callback cb) {
return new StoreOperationImpl(type, key, flags, exp, data, casId, cb);
}

Expand Down
Expand Up @@ -24,7 +24,6 @@
package net.spy.memcached.protocol.binary;

import net.spy.memcached.ops.CASOperation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.StoreType;

Expand Down Expand Up @@ -69,7 +68,7 @@ private static byte cmdMap(StoreType t) {
}

public StoreOperationImpl(StoreType t, String k, int f, int e, byte[] d,
long c, OperationCallback cb) {
long c, StoreOperation.Callback cb) {
super(cmdMap(t), generateOpaque(), k, cb);
flags = f;
exp = e;
Expand Down Expand Up @@ -103,6 +102,12 @@ public StoreType getStoreType() {
return storeType;
}

@Override
protected void decodePayload(byte[] pl) {
super.decodePayload(pl);
((StoreOperation.Callback) getCallback()).gotData(key, responseCas);
}

@Override
public String toString() {
return super.toString() + " Cas: " + cas + " Exp: " + exp + " Flags: "
Expand Down
5 changes: 5 additions & 0 deletions src/test/java/net/spy/memcached/AsciiClientTest.java
Expand Up @@ -29,6 +29,7 @@
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.protocol.ascii.ExtensibleOperationImpl;
import org.junit.Test;

/**
* This test assumes a server is running on the host specified in the
Expand Down Expand Up @@ -60,6 +61,10 @@ public void initialize() {
});
}

@Override
@Test(expected=UnsupportedOperationException.class)
public void testSetReturnsCAS() {
}
@Override
protected String getExpectedVersionSource() {
return String.valueOf(new InetSocketAddress(TestConfig.IPV4_ADDR,
Expand Down
28 changes: 22 additions & 6 deletions src/test/java/net/spy/memcached/OperationFactoryTestBase.java
Expand Up @@ -54,6 +54,7 @@ public abstract class OperationFactoryTestBase extends MockObjectTestCase {
public static final String TEST_KEY = "someKey";
protected OperationFactory ofact = null;
protected OperationCallback genericCallback;
protected StoreOperation.Callback storeCallback;
private byte[] testData;

@Override
Expand All @@ -69,7 +70,18 @@ public void receivedStatus(OperationStatus status) {
fail("Unexpected status: " + status);
}
};
storeCallback = new StoreOperation.Callback() {
public void complete() {
fail("Unexpected invocation");
}

public void gotData(String key, long cas) {
}

public void receivedStatus(OperationStatus status) {
fail("Unexpected status: " + status);
}
};
testData = new byte[64];
new Random().nextBytes(testData);
}
Expand All @@ -89,15 +101,15 @@ public void testDeleteOperationCloning() {

public void testCASOperationCloning() {
CASOperation op = ofact.cas(StoreType.set, "someKey", 727582, 8174, 7175,
testData, genericCallback);
testData, storeCallback);

CASOperation op2 = cloneOne(CASOperation.class, op);
assertKey(op2);
assertEquals(727582, op2.getCasValue());
assertEquals(8174, op2.getFlags());
assertEquals(7175, op2.getExpiration());
assertBytes(op2.getData());
assertCallback(op2);
assertStoreCallback(op2);
}

public void testMutatorOperationIncrCloning() {
Expand Down Expand Up @@ -136,28 +148,28 @@ public void testStoreOperationAddCloning() {
int exp = 823862;
int flags = 7735;
StoreOperation op = ofact.store(StoreType.add, TEST_KEY, flags, exp,
testData, genericCallback);
testData, storeCallback);

StoreOperation op2 = cloneOne(StoreOperation.class, op);
assertKey(op2);
assertEquals(exp, op2.getExpiration());
assertEquals(flags, op2.getFlags());
assertSame(StoreType.add, op2.getStoreType());
assertCallback(op2);
assertStoreCallback(op2);
}

public void testStoreOperationSetCloning() {
int exp = 823862;
int flags = 7735;
StoreOperation op = ofact.store(StoreType.set, TEST_KEY, flags, exp,
testData, genericCallback);
testData, storeCallback);

StoreOperation op2 = cloneOne(StoreOperation.class, op);
assertKey(op2);
assertEquals(exp, op2.getExpiration());
assertEquals(flags, op2.getFlags());
assertSame(StoreType.set, op2.getStoreType());
assertCallback(op2);
assertStoreCallback(op2);
}

public void testConcatenationOperationAppendCloning() {
Expand Down Expand Up @@ -258,6 +270,10 @@ protected void assertCallback(Operation op) {
assertSame(genericCallback, op.getCallback());
}

protected void assertStoreCallback(Operation op) {
assertSame(storeCallback, op.getCallback());
}

private void assertBytes(byte[] bytes) {
assertTrue(Arrays.equals(testData, bytes));
}
Expand Down
8 changes: 8 additions & 0 deletions src/test/java/net/spy/memcached/ProtocolBaseCase.java
Expand Up @@ -883,6 +883,14 @@ public void testPrependNoSuchKey() throws Exception {
assertNull(client.get(key));
}

public void testSetReturnsCAS() throws Exception {

OperationFuture<Boolean> setOp = client.set("testSetReturnsCAS",
0, "testSetReturnsCAS");
setOp.get();
assertTrue(setOp.getCas() > 0);
}

private static class TestTranscoder implements Transcoder<String> {
private static final int FLAGS = 238885206;

Expand Down

0 comments on commit 6c15907

Please sign in to comment.