Skip to content

Commit

Permalink
Merged ipv6 work from default.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Feb 26, 2008
2 parents faf8dd4 + e4fcf07 commit 3158eac
Show file tree
Hide file tree
Showing 23 changed files with 761 additions and 145 deletions.
15 changes: 15 additions & 0 deletions src/main/java/net/spy/memcached/CASMutation.java
@@ -0,0 +1,15 @@
package net.spy.memcached;

/**
* Defines a mutation mechanism for a high-level CAS client interface.
*/
public interface CASMutation<T> {

/**
* Get the new value to replace the current value.
*
* @param current the current value in the cache
* @return the replacement value
*/
T getNewValue(T current);
}
91 changes: 91 additions & 0 deletions src/main/java/net/spy/memcached/CASMutator.java
@@ -0,0 +1,91 @@
package net.spy.memcached;

import net.spy.SpyObject;

/**
* Object that provides mutation via CAS over a given memcache client.
*
* <p>Example usage (reinventing incr):</p>
*
* <pre>
* // Get or create a client.
* MemcachedClient client=[...];
*
* // Get a mutator instance that uses that client.
* CASMutator&lt;Long&gt; mutator=new CASMutator&lt;Long&gt;(client);
*
* // Get a mutation that knows what to do when a value is found.
* CASMutation&lt;Long&gt; mutation=new CASMutation&lt;Long&gt;() {
* public Long getNewValue(Long current) {
* return current + 1;
* }
* };
*
* // Do a mutation.
* long currentValue=mutator.cas(someKey, 0L, 0, mutation);
* </pre>
*/
public class CASMutator<T> extends SpyObject {

private final MemcachedClient client;

/**
* Construct a CASMutator that uses the given client.
*
* @param c the client
*/
public CASMutator(MemcachedClient c) {
super();
client=c;
}

/**
* CAS a new value in for a key.
*
* @param key the key to be CASed
* @param initial the value to use when the object is not cached
* @param initialExp the expiration time to use when initializing
* @param m the mutation to perform on an object if a value exists for the
* key
* @return the new value that was set
*/
public T cas(final String key, final T initial, long initialExp,
final CASMutation<T> m) throws Exception {
T rv=initial;

boolean done=false;
while(!done) {
CASValue casval=client.gets(key);
T current=null;
// If there were a CAS value, check to see if it's compatible.
if(casval != null) {
@SuppressWarnings("unchecked")
T tmp = (T)casval.getValue();
current=tmp;
}
// If we have anything mutate and CAS, else add.
if(current != null) {
rv=m.getNewValue(current);
// There are three possibilities here:
// 1) It worked and we're done.
// 2) It collided and we need to reload and try again.
// 3) It disappeared between our fetch and our cas.
// We're ignoring #3 because it's *extremely* unlikely and the
// behavior will be fine in this code -- we'll do another gets
// and follow it up with either an add or another cas depending
// on whether it exists the next time.
if(client.cas(key, casval.getCas(), rv) == CASResponse.OK) {
done=true;
}
} else {
// No value found, try an add.
if(client.add(key, 0, initial).get()) {
done=true;
rv=initial;
}
}
}

return rv;
}
}
23 changes: 23 additions & 0 deletions src/main/java/net/spy/memcached/CASResponse.java
@@ -0,0 +1,23 @@
package net.spy.memcached;

/**
* Response codes for a CAS operation.
*/
public enum CASResponse {
/**
* Status indicating that the CAS was successful and the new value is
* stored in the cache.
*/
OK,
/**
* Status indicating the value was not found in the cache (an add
* operation may be issued to store the value).
*/
NOT_FOUND,
/**
* Status indicating the value was found in the cache, but exists with a
* different CAS value than expected. In this case, the value must be
* refetched and the CAS operation tried again.
*/
EXISTS
}
36 changes: 36 additions & 0 deletions src/main/java/net/spy/memcached/CASValue.java
@@ -0,0 +1,36 @@
package net.spy.memcached;

/**
* A value with a CAS identifier.
*/
public class CASValue {
private final long cas;
private final Object value;

/**
* Construct a new CASValue with the given identifer and value.
*
* @param c the CAS identifier
* @param v the value
*/
public CASValue(long c, Object v) {
super();
cas=c;
value=v;
}

/**
* Get the CAS identifier.
*/
public long getCas() {
return cas;
}

/**
* Get the object value.
*/
public Object getValue() {
return value;
}

}
102 changes: 102 additions & 0 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Expand Up @@ -23,8 +23,11 @@
import java.util.concurrent.atomic.AtomicReference;

import net.spy.SpyThread;
import net.spy.memcached.ops.CASOperationStatus;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
Expand Down Expand Up @@ -216,6 +219,57 @@ public void complete() {
return rv;
}

/**
* Asynchronous CAS operation.
*
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @return a future that will indicate the status of the CAS
*/
public Future<CASResponse> asyncCAS(String key, long casId, Object value) {
CachedData co=transcoder.encode(value);
final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<CASResponse> rv=new OperationFuture<CASResponse>(
latch);
Operation op=opFact.cas(key, casId, co.getFlags(),
co.getData(), new OperationCallback() {
public void receivedStatus(OperationStatus val) {
if(val instanceof CASOperationStatus) {
rv.set(((CASOperationStatus)val).getCASResponse());
} else if(val instanceof CancelledOperationStatus) {
// Cancelled, ignore and let it float up
} else {
throw new RuntimeException(
"Unhandled state: " + val);
}
}
public void complete() {
latch.countDown();
}});
rv.setOperation(op);
addOp(key, op);
return rv;
}

/**
* Perform a synchronous CAS operation.
*
* @param key the key
* @param casId the CAS identifier (from a gets operation)
* @param value the new value
* @return a CASResponse
*/
public CASResponse cas(String key, long casId, Object value) {
try {
return asyncCAS(key, casId, value).get();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
}
}

/**
* Add an object to the cache iff it does not exist already.
*
Expand Down Expand Up @@ -333,6 +387,54 @@ public void complete() {
return rv;
}

/**
* Gets (with CAS support) the given key asynchronously.
*
* @param key the key to fetch
* @return a future that will hold the return value of the fetch
*/
public Future<CASValue> asyncGets(final String key) {

final CountDownLatch latch=new CountDownLatch(1);
final OperationFuture<CASValue> rv=
new OperationFuture<CASValue>(latch);

Operation op=opFact.gets(key,
new GetsOperation.Callback() {
private CASValue val=null;
public void receivedStatus(OperationStatus status) {
rv.set(val);
}
public void gotData(String k, int flags, long cas, byte[] data) {
assert key.equals(k) : "Wrong key returned";
assert cas > 0 : "CAS was less than zero: " + cas;
val=new CASValue(cas,
transcoder.decode(new CachedData(flags, data)));
}
public void complete() {
latch.countDown();
}});
rv.setOperation(op);
addOp(key, op);
return rv;
}

/**
* Gets (with CAS support) with a single key.
*
* @param key the key to get
* @return the result from the cache and CAS id (null if there is none)
*/
public CASValue gets(String key) {
try {
return asyncGets(key).get();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for value", e);
} catch (ExecutionException e) {
throw new RuntimeException("Exception waiting for value", e);
}
}

/**
* Get with a single key.
*
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/net/spy/memcached/OperationFactory.java
Expand Up @@ -2,9 +2,11 @@

import java.util.Collection;

import net.spy.memcached.ops.CASOperation;
import net.spy.memcached.ops.DeleteOperation;
import net.spy.memcached.ops.FlushOperation;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.GetsOperation;
import net.spy.memcached.ops.MutatatorOperation;
import net.spy.memcached.ops.Mutator;
import net.spy.memcached.ops.NoopOperation;
Expand Down Expand Up @@ -56,6 +58,16 @@ DeleteOperation delete(String key, int when,
*/
GetOperation get(String key, GetOperation.Callback callback);

/**
* Create a gets operation.
*
* @param key the key to get
* @param callback the callback that will contain the results
* @return a new GetsOperation
*/
GetsOperation gets(String key, GetsOperation.Callback callback);


/**
* Create a get operation.
*
Expand Down Expand Up @@ -102,6 +114,20 @@ MutatatorOperation mutate(Mutator m, String key, int by,
StoreOperation store(StoreType storeType, String key, int flags, int exp,
byte[] data, OperationCallback cb);

/**
* Create a CAS operation.
*
* @param key the key to store
* @param casId the CAS identifier value (from a gets operation)
* @param flags the storage flags
* @param exp the expiration time
* @param data the data
* @param cb the status callback
* @return the new store operation
*/
CASOperation cas(String key, long casId, int flags, byte[] data,
OperationCallback cb);

/**
* Create a new version operation.
*/
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/net/spy/memcached/ops/CASOperation.java
@@ -0,0 +1,8 @@
package net.spy.memcached.ops;

/**
* Operation that represents compare-and-swap.
*/
public interface CASOperation extends Operation {
// nothing
}
24 changes: 24 additions & 0 deletions src/main/java/net/spy/memcached/ops/CASOperationStatus.java
@@ -0,0 +1,24 @@
package net.spy.memcached.ops;

import net.spy.memcached.CASResponse;

/**
* OperationStatus subclass for indicating CAS status.
*/
public class CASOperationStatus extends OperationStatus {

private final CASResponse casResponse;

public CASOperationStatus(boolean success, String msg, CASResponse cres) {
super(success, msg);
casResponse=cres;
}

/**
* Get the CAS response indicated here.
*/
public CASResponse getCASResponse() {
return casResponse;
}

}
12 changes: 12 additions & 0 deletions src/main/java/net/spy/memcached/ops/CancelledOperationStatus.java
@@ -0,0 +1,12 @@
package net.spy.memcached.ops;

/**
* Operation status indicating an operation was cancelled.
*/
public class CancelledOperationStatus extends OperationStatus {

public CancelledOperationStatus() {
super(false, "cancelled");
}

}

0 comments on commit 3158eac

Please sign in to comment.