Skip to content

Commit

Permalink
Handle operations that are writing and reading at the same time.
Browse files Browse the repository at this point in the history
Especially in bulk cases, the server can be transmitting data from an
operation and receiving results from it simultaneously.  It's no
longer sufficient to consider an operation to be either reading or
writing; an operation is likely to need to read at the very moment
it begins writing.
  • Loading branch information
dustin committed Oct 27, 2009
1 parent 0888741 commit 32762f8
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 6 deletions.
Expand Up @@ -77,7 +77,6 @@ public final OperationState getState() {
}

public final ByteBuffer getBuffer() {
assert cmd != null : "No output buffer.";
return cmd;
}

Expand Down
26 changes: 21 additions & 5 deletions src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
Expand Up @@ -83,14 +83,22 @@ public final void setupResend() {
// First, reset the current write op.
Operation op=getCurrentWriteOp();
if(op != null) {
op.getBuffer().reset();
ByteBuffer buf=op.getBuffer();
if(buf != null) {
buf.reset();
} else {
getLogger().info("No buffer for current write op, removing");
removeCurrentWriteOp();
}
}
// Now cancel all the pending read operations. Might be better to
// to requeue them.
while(hasReadOp()) {
op=removeCurrentReadOp();
getLogger().warn("Discarding partially completed op: %s", op);
op.cancel();
if (op != getCurrentWriteOp()) {
getLogger().warn("Discarding partially completed op: %s", op);
op.cancel();
}
}

getWbuf().clear();
Expand Down Expand Up @@ -123,7 +131,16 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
Operation o=getCurrentWriteOp();
while(o != null && toWrite < getWbuf().capacity()) {
assert o.getState() == OperationState.WRITING;
// This isn't the most optimal way to do this, but it hints
// at a larger design problem that may need to be taken care
// of in the bowels of the client.
// In practice, readQ should be small, however.
if(!readQ.contains(o)) {
readQ.add(o);
}

ByteBuffer obuf=o.getBuffer();
assert obuf != null : "Didn't get a write buffer from " + o;
int bytesToCopy=Math.min(getWbuf().remaining(),
obuf.remaining());
byte b[]=new byte[bytesToCopy];
Expand Down Expand Up @@ -161,8 +178,7 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
public final void transitionWriteItem() {
Operation op=removeCurrentWriteOp();
assert op != null : "There is no write item to transition";
getLogger().debug("Transitioning %s to read", op);
readQ.add(op);
getLogger().debug("Finished writing %s", op);
}

/* (non-Javadoc)
Expand Down
Expand Up @@ -126,6 +126,7 @@ protected void finishedPayload(byte[] pl) throws IOException {
transitionState(OperationState.COMPLETE);
} else {
OperationCallback cb = callbacks.remove(responseOpaque);
assert cb != null : "No callback for " + responseOpaque;
assert errorCode != 0 : "Got no error on a quiet mutation.";
OperationStatus status=getStatusForErrorCode(errorCode, pl);
assert status != null : "Got no status for a quiet mutation error";
Expand Down
@@ -0,0 +1,69 @@
package net.spy.memcached.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Random;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
import net.spy.memcached.compat.SpyObject;
import net.spy.memcached.util.CacheLoader;

/**
 * Exercises a multiget that is large enough for response data to arrive
 * while the request is still being written — i.e. before the transition
 * to read.
 *
 * Note that this lives under manual tests: it reliably reproduces the
 * problem, but it is not well-behaved enough to serve as an automated
 * unit test.
 */
public class ExcessivelyLargeGetTest extends SpyObject implements Runnable {

  /** Number of keys requested in the single bulk fetch. */
  private static final int N = 25000;

  private final MemcachedClient client;
  private final Collection<String> keys;
  private final byte[] value = new byte[4096];

  public ExcessivelyLargeGetTest() throws Exception {
    // Binary protocol with a generous op timeout so the oversized
    // multiget itself — not an early timeout — is what gets exercised.
    client = new MemcachedClient(new ConnectionFactoryBuilder()
        .setProtocol(Protocol.BINARY)
        .setOpTimeout(15000)
        .build(),
        AddrUtil.getAddresses("127.0.0.1:11211"));
    keys = new ArrayList<String>(N);
    new Random().nextBytes(value);
  }

  public void run() {
    // Seed the cache with N copies of the same random payload under
    // distinct keys, remembering every key for the bulk fetch below.
    CacheLoader loader = new CacheLoader(client);
    for (int i = 0; i < N; i++) {
      String key = "multi." + i;
      keys.add(key);
      loader.push(key, value);
    }

    // Fetch everything back in one bulk get; verify each hit matches
    // the payload and count the keys that came back absent.
    Map<String, Object> fetched = client.getBulk(keys);
    int missing = 0;
    for (String key : keys) {
      if (!fetched.containsKey(key)) {
        missing++;
      } else {
        assert Arrays.equals(value, (byte[]) fetched.get(key))
            : "Incorrect result at " + key;
      }
    }
    System.out.println("Fetched " + fetched.size() + "/" + keys.size()
        + " (" + missing + " were null)");
  }

  public static void main(String[] args) throws Exception {
    new ExcessivelyLargeGetTest().run();
  }
}

0 comments on commit 32762f8

Please sign in to comment.