Skip to content

Commit

Permalink
Consolidate the callback interfaces so there's basically just one.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Jul 26, 2006
1 parent 45dcacf commit fe81635
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 50 deletions.
24 changes: 11 additions & 13 deletions src/java/net/spy/memcached/MemcachedClient.java
Expand Up @@ -25,6 +25,7 @@
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.MutatorOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.StatsOperation;
import net.spy.memcached.ops.StoreOperation;
import net.spy.memcached.ops.VersionOperation;
Expand Down Expand Up @@ -120,8 +121,8 @@ private Future<String> asyncStore(StoreOperation.StoreType storeType,
final SynchronizationObject<String> sync
=new SynchronizationObject<String>(null);
Operation op=new StoreOperation(storeType, key, co.getFlags(), exp,
co.getData(), new StoreOperation.Callback() {
public void storeResult(String val) {
co.getData(), new OperationCallback() {
public void receivedStatus(String val) {
sync.set(val);
}});
OperationFuture<String> rv=new OperationFuture<String>(sync, op);
Expand Down Expand Up @@ -183,7 +184,7 @@ public Future<Object> asyncGet(final String key) {

Operation op=new GetOperation(key, new GetOperation.Callback() {
private Object val=null;
public void getComplete() {
public void receivedStatus(String line) {
sync.set(val);
}
public void gotData(String k, int flags, byte[] data) {
Expand Down Expand Up @@ -235,7 +236,7 @@ public Future<Map<String, Object>> asyncGetBulk(Collection<String> keys) {
final SynchronizationObject<AtomicInteger> sync
=new SynchronizationObject<AtomicInteger>(requests);
GetOperation.Callback cb=new GetOperation.Callback() {
public void getComplete() {
public void receivedStatus(String line) {
requests.decrementAndGet();
sync.set(requests);
}
Expand Down Expand Up @@ -302,8 +303,8 @@ public Map<SocketAddress, String> getVersions() {
final SocketAddress sa=conn.getAddressOf(i);
ai.incrementAndGet();
addOp(i, new VersionOperation(
new VersionOperation.Callback() {
public void versionResult(String s) {
new OperationCallback() {
public void receivedStatus(String s) {
rv.put(sa, s);
ai.decrementAndGet();
sync.set(ai);
Expand Down Expand Up @@ -331,7 +332,7 @@ public Map<SocketAddress, Map<String, String>> getStats() {
public void gotStat(String name, String val) {
rv.get(sa).put(name, val);
}
public void statsComplete() {
public void receivedStatus(String line) {
todo.decrementAndGet();
sync.set(todo);
}}));
Expand All @@ -344,12 +345,9 @@ private long mutate(MutatorOperation.Mutator m, String key, int by) {
final SynchronizationObject<Long> sync=
new SynchronizationObject<Long>(null);
addOp(getServerForKey(key), new MutatorOperation(m, key, by,
new MutatorOperation.Callback() {
public void mutatorResult(Long val) {
if(val == null) {
val=new Long(-1);
}
sync.set(val);
new OperationCallback() {
public void receivedStatus(String val) {
sync.set(new Long(val==null?"-1":val));
}}));
try {
sync.waitUntilNotNull(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Expand Down
3 changes: 3 additions & 0 deletions src/java/net/spy/memcached/ops/DeleteOperation.java
Expand Up @@ -5,6 +5,9 @@

import java.nio.ByteBuffer;

/**
* Operation to delete an item from the cache.
*/
public class DeleteOperation extends Operation {

private static final int OVERHEAD=32;
Expand Down
15 changes: 12 additions & 3 deletions src/java/net/spy/memcached/ops/GetOperation.java
Expand Up @@ -35,7 +35,7 @@ public void handleLine(String line) {
if(line.equals("END")) {
getLogger().debug("Get complete!");
if(cb != null) {
cb.getComplete();
cb.receivedStatus(line);
}
transitionState(State.COMPLETE);
data=null;
Expand Down Expand Up @@ -107,8 +107,17 @@ public void initialize() {
keys=null;
}

public interface Callback {
/**
* Operation callback for the get request.
*/
public interface Callback extends OperationCallback {
/**
* Callback for each result from a get.
*
* @param key the key that was retrieved
* @param flags the flags for this value
* @param data the data stored under this key
*/
void gotData(String key, int flags, byte[] data);
void getComplete();
}
}
28 changes: 18 additions & 10 deletions src/java/net/spy/memcached/ops/MutatorOperation.java
Expand Up @@ -12,14 +12,26 @@ public class MutatorOperation extends Operation {

public static final int OVERHEAD=32;

public enum Mutator { incr, decr }
/**
* Type of mutation to perform.
*/
public enum Mutator {
/**
* Increment a value on the memcached server.
*/
incr,
/**
* Decrement a value on the memcached server.
*/
decr
}

private Mutator mutator=null;
private String key=null;
private int amount=0;
private Callback cb=null;
private OperationCallback cb=null;

public MutatorOperation(Mutator m, String k, int amt, Callback c) {
public MutatorOperation(Mutator m, String k, int amt, OperationCallback c) {
super();
mutator=m;
key=k;
Expand All @@ -30,12 +42,12 @@ public MutatorOperation(Mutator m, String k, int amt, Callback c) {
@Override
public void handleLine(String line) {
getLogger().debug("Result: %s", line);
Long found=null;
String found=null;
if(!line.equals("NOT_FOUND")) {
found=new Long(line);
found=line;
}
if(cb != null) {
cb.mutatorResult(found);
cb.receivedStatus(found);
}
transitionState(State.COMPLETE);
}
Expand All @@ -49,8 +61,4 @@ public void initialize() {
setBuffer(b);
}

public interface Callback {
void mutatorResult(Long val);
}

}
32 changes: 30 additions & 2 deletions src/java/net/spy/memcached/ops/Operation.java
Expand Up @@ -11,8 +11,36 @@
* Operations on a memcached connection.
*/
public abstract class Operation extends SpyObject {
public enum State { WRITING, READING, COMPLETE }
public enum ReadType { LINE, DATA }
/**
* State of this operation.
*/
public enum State {
/**
* State indicating this operation is writing data to the server.
*/
WRITING,
/**
* State indicating this operation is reading data from the server.
*/
READING,
/**
* State indicating this operation is complete.
*/
COMPLETE
}
/**
* Data read types.
*/
public enum ReadType {
/**
* Read type indicating an operation currently wants to read lines.
*/
LINE,
/**
* Read type indicating an operation currently wants to read raw data.
*/
DATA
}

private State state=State.WRITING;
private ReadType readType=ReadType.LINE;
Expand Down
17 changes: 17 additions & 0 deletions src/java/net/spy/memcached/ops/OperationCallback.java
@@ -0,0 +1,17 @@
// Copyright (c) 2006 Dustin Sallings <dustin@spy.net>
// arch-tag: 47B3D166-3964-49D4-9B4C-A46B749B12A5

package net.spy.memcached.ops;

/**
* Callback that's invoked with the response of an operation.
*/
public interface OperationCallback {

/**
* Method invoked with the status when the operation is complete.
*
* @param line the line containing the final status of the operation
*/
void receivedStatus(String line);
}
17 changes: 14 additions & 3 deletions src/java/net/spy/memcached/ops/StatsOperation.java
Expand Up @@ -5,6 +5,9 @@

import java.nio.ByteBuffer;

/**
* Operation to retrieve statistics from a memcached server.
*/
public class StatsOperation extends Operation {

private static final byte[] MSG="stats\r\n".getBytes();
Expand All @@ -20,7 +23,7 @@ public StatsOperation(Callback c) {
public void handleLine(String line) {
if(line.equals("END")) {
if(cb != null) {
cb.statsComplete();
cb.receivedStatus(line);
}
transitionState(State.COMPLETE);
} else {
Expand All @@ -37,8 +40,16 @@ public void initialize() {
setBuffer(ByteBuffer.wrap(MSG));
}

public interface Callback {
void statsComplete();
/**
* Callback for stats operation.
*/
public interface Callback extends OperationCallback {
/**
* Invoked once for every stat returned from the server.
*
* @param name the name of the stat
* @param val the stat value.
*/
void gotStat(String name, String val);
}

Expand Down
38 changes: 26 additions & 12 deletions src/java/net/spy/memcached/ops/StoreOperation.java
Expand Up @@ -5,9 +5,30 @@

import java.nio.ByteBuffer;

/**
* Operation to store data in a memcached server.
*/
public class StoreOperation extends Operation {

public enum StoreType { set, add, replace }
/**
* The type of storage operation to perform.
*/
public enum StoreType {
/**
* Unconditionally store a value in the cache.
*/
set,
/**
* Store a value in the cache iff there is not already something stored
* for the given key.
*/
add,
/**
* Store a value in the cache iff there is already something stored for
* the given key.
*/
replace
}

// Overhead storage stuff to make sure the buffer pushes out far enough.
private static final int OVERHEAD = 32;
Expand All @@ -17,10 +38,10 @@ public enum StoreType { set, add, replace }
private int flags=0;
private int exp=0;
private byte[] data=null;
private Callback cb=null;
private OperationCallback cb=null;

public StoreOperation(StoreType t, String key, int flags, int exp,
byte[] data, Callback callback) {
byte[] data, OperationCallback callback) {
super();
this.type=t;
this.key=key;
Expand All @@ -31,9 +52,9 @@ public StoreOperation(StoreType t, String key, int flags, int exp,
}

@Override
public void handleLine(String firstLine) {
public void handleLine(String line) {
if(cb != null) {
cb.storeResult(firstLine);
cb.receivedStatus(line);
}
transitionState(State.COMPLETE);
}
Expand All @@ -52,11 +73,4 @@ public void initialize() {
setBuffer(bb);
}

public interface Callback {

/**
* Report the result of an asynchronous callback.
*/
public void storeResult(String val);
}
}
10 changes: 3 additions & 7 deletions src/java/net/spy/memcached/ops/VersionOperation.java
Expand Up @@ -12,9 +12,9 @@ public class VersionOperation extends Operation {

private static final byte[] REQUEST="version\r\n".getBytes();

private Callback cb=null;
private OperationCallback cb=null;

public VersionOperation(Callback c) {
public VersionOperation(OperationCallback c) {
super();
cb=c;
}
Expand All @@ -23,7 +23,7 @@ public VersionOperation(Callback c) {
public void handleLine(String line) {
if(cb != null) {
assert line.startsWith("VERSION ");
cb.versionResult(line.substring("VERSION ".length()));
cb.receivedStatus(line.substring("VERSION ".length()));
}
transitionState(State.COMPLETE);
}
Expand All @@ -33,8 +33,4 @@ public void initialize() {
setBuffer(ByteBuffer.wrap(REQUEST));
}

public interface Callback {
public void versionResult(String s);
}

}

0 comments on commit fe81635

Please sign in to comment.