Permalink
Browse files

Use a queue of new entries to write without checking write state.*

This isn't quite right.  There's a race condition between checking the write
queue and going into the select where an item can be added but won't be
checked.
  • Loading branch information...
1 parent 6d2b5ab commit 363a95acf17c2452986581f76fb44af953e12a28 @dustin committed Aug 10, 2006
Showing with 101 additions and 17 deletions.
  1. +1 −0 src/java/net/spy/memcached/MemcachedClient.java
  2. +100 −17 src/java/net/spy/memcached/MemcachedConnection.java
@@ -81,6 +81,7 @@
* Allow mockery.
*/
protected MemcachedClient() {
+ super();
}
/**
@@ -10,12 +10,16 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import net.spy.SpyObject;
import net.spy.memcached.ops.Operation;
@@ -35,10 +39,12 @@
private Selector selector=null;
private QueueAttachment[] connections=null;
private int emptySelects=0;
+ private ConcurrentLinkedQueue<QueueAttachment> addedQueue=null;
private SortedMap<Long, QueueAttachment> reconnectQueue=null;
public MemcachedConnection(InetSocketAddress[] a) throws IOException {
reconnectQueue=new TreeMap<Long, QueueAttachment>();
+ addedQueue=new ConcurrentLinkedQueue<QueueAttachment>();
selector=Selector.open();
connections=new QueueAttachment[a.length];
int cons=0;
@@ -60,15 +66,54 @@ public MemcachedConnection(InetSocketAddress[] a) throws IOException {
}
}
+ private boolean selectorsMakeSense() {
+ for(QueueAttachment qa : connections) {
+ if(qa.sk.isValid()) {
+ int sops=qa.sk.interestOps();
+ if(qa.channel.isConnected()) {
+ Operation op=qa.ops.peek();
+ if(op == null) {
+ assert sops == 0 : "Invalid ops: " + qa;
+ } else {
+ switch(op.getState()) {
+ case READING:
+ assert (sops & SelectionKey.OP_READ) != 0
+ : "Invalid ops: " + qa;
+ break;
+ case WRITING:
+ assert (sops & SelectionKey.OP_WRITE) != 0
+ : "Invalid ops: " + qa;
+ break;
+ case COMPLETE:
+ assert false : "Completed item in queue";
+ }
+ }
+ } else {
+ assert sops == SelectionKey.OP_CONNECT
+ : "Not connected, and not watching for connect.";
+ }
+ }
+ }
+ getLogger().debug("Checked the selectors.");
+ return true;
+ }
+
@SuppressWarnings("unchecked")
public void handleIO() throws IOException {
+
+ // Deal with all of the stuff that's been added, but may not be marked
+ // writable.
+ handleInputQueue();
+ getLogger().debug("Done dealing with queue.");
+
long delay=0;
if(!reconnectQueue.isEmpty()) {
long now=System.currentTimeMillis();
long then=reconnectQueue.firstKey();
delay=Math.max(then-now, 1);
}
getLogger().debug("Selecting with delay of %sms", delay);
+ assert selectorsMakeSense() : "Selectors don't make sense.";
int selected=selector.select(delay);
if(selected > 0) {
Set<SelectionKey> selectedKeys=selector.selectedKeys();
@@ -88,7 +133,7 @@ public void handleIO() throws IOException {
// It's very easy in NIO to write a bug such that your selector
// spins madly. This will catch that and let it break.
getLogger().info("No selectors ready, interrupted: "
- + Thread.interrupted());
+ + Thread.interrupted());
if(++emptySelects > EXCESSIVE_EMPTY) {
for(SelectionKey sk : selector.keys()) {
getLogger().info("%s has %s, interested in %s",
@@ -109,21 +154,48 @@ public void handleIO() throws IOException {
}
}
+ private void handleInputQueue() throws IOException {
+ if(!addedQueue.isEmpty()) {
+ getLogger().debug("Handling queue");
+ // If there's stuff in the added queue. Try to process it.
+ Collection<QueueAttachment> toAdd=new HashSet<QueueAttachment>();
+ try {
+ QueueAttachment qa=null;
+ while((qa=addedQueue.remove()) != null) {
+ if(qa.channel.isConnected()) {
+ Operation op=qa.ops.peek();
+ if(op != null
+ && op.getState() == Operation.State.WRITING) {
+ getLogger().debug(
+ "Handling queued write on %s", qa);
+ handleOperation(op, qa.sk, qa);
+ }
+ } else {
+ toAdd.add(qa);
+ }
+ }
+ } catch(NoSuchElementException e) {
+ // out of stuff.
+ }
+ addedQueue.addAll(toAdd);
+ }
+ }
+
// Handle IO for a specific selector.
private void handleIO(SelectionKey sk) throws IOException {
QueueAttachment qa=(QueueAttachment)sk.attachment();
if(sk.isConnectable()) {
getLogger().info("Connection state changed for %s", sk);
try {
if(qa.channel.finishConnect()) {
+ assert qa.channel.isConnected() : "Not onnected.";
synchronized(qa) {
qa.reconnectAttempt=0;
- if(hasPendingOperations(qa)) {
- sk.interestOps(SelectionKey.OP_WRITE);
- } else {
- sk.interestOps(0);
- }
}
+ sk.interestOps(0);
+ addedQueue.offer(qa);
+ } else {
+ assert !qa.channel.isConnected() : "connected";
}
} catch(IOException e) {
getLogger().warn("Problem handling connect", e);
@@ -169,22 +241,25 @@ private void handleOperation(Operation currentOp, SelectionKey sk,
}
break;
case WRITING:
- if(sk.isReadable()) {
+ boolean needsReconnect=false;
+ if(sk.isValid() && sk.isReadable()) {
+ getLogger().info("Readable in write mode.");
ByteBuffer b=ByteBuffer.allocate(1);
int read=qa.channel.read(b);
- assert read == -1
+ assert read <= 0
: "expected to read -1 bytes, read " + read;
+ needsReconnect=true;
+ }
+ if(needsReconnect) {
queueReconnect(qa);
- } else if(sk.isWritable()) {
+ } else {
ByteBuffer b=currentOp.getBuffer();
- int written=qa.channel.write(b);
+ int wrote=qa.channel.write(b);
getLogger().debug("Wrote %d bytes for %s",
- written, currentOp);
+ wrote, currentOp);
if(b.remaining() == 0) {
currentOp.writeComplete();
}
- } else {
- assert false : "Not readable or writable.";
}
break;
case COMPLETE:
@@ -210,11 +285,15 @@ private void handleOperation(Operation currentOp, SelectionKey sk,
qa.ops.remove();
// If there are more operations in the queue, tell
// it we want to write
+ if(sk.isValid()) {
+ sk.interestOps(0);
+ }
synchronized(qa) {
// After removing the cancelled operations, if there's
// another operation waiting to go, wait for write
if(hasPendingOperations(qa) && sk.isValid()) {
sk.interestOps(SelectionKey.OP_WRITE);
+ addedQueue.offer(qa);
}
}
break;
@@ -274,6 +353,7 @@ private void setupResend(QueueAttachment qa) {
if(op.getState() == Operation.State.WRITING) {
getLogger().warn("Resetting write state of op: %s", op);
op.getBuffer().reset();
+ addedQueue.offer(qa);
} else {
getLogger().warn(
"Discarding partially completed operation: %s", op);
@@ -312,12 +392,11 @@ public void addOperation(int which, Operation o) {
o.initialize();
synchronized(qa) {
qa.ops.add(o);
- // If this is the only operation in the queue, tell the selector
- // we want to write
- if(qa.reconnectAttempt == 0 && qa.ops.size() == 1) {
+ if(qa.ops.size() == 1 && qa.sk.isValid()) {
qa.sk.interestOps(SelectionKey.OP_WRITE);
}
}
+ addedQueue.offer(qa);
selector.wakeup();
getLogger().debug("Added %s to %d", o, which);
}
@@ -363,8 +442,12 @@ public QueueAttachment(SocketAddress sa, SocketChannel c) {
}
public String toString() {
+ int sops=0;
+ if(sk.isValid()) {
+ sops=sk.interestOps();
+ }
return "{QA sa=" + socketAddress + ", #ops=" + ops.size()
- + ", topop=" + ops.peek() + "}";
+ + ", topop=" + ops.peek() + ", interested=" + sops + "}";
}
}
}

0 comments on commit 363a95a

Please sign in to comment.