Skip to content

Commit

Permalink
JCBC-20: ViewConnection blocks when no ops pending
Browse files Browse the repository at this point in the history
When getting the next op in the queue we were using the poll()
function which immediately returns null if the queue is empty. This
immediate return causes the IO thread to spin and consumes a lot of
cpu. What we really want is to block when getting an item from the
queue until one is available. In order to do this we should be using
the take() function.

Change-Id: I9a5bcb4c3852976b55b3162fad513156e94aaaf6
Reviewed-on: http://review.couchbase.org/14959
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Michael Wiederhold <mike@couchbase.com>
  • Loading branch information
Mike Wiederhold committed Jun 29, 2012
1 parent 0af5374 commit e39cd76
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/couchbase/client/ViewConnection.java
Expand Up @@ -186,6 +186,7 @@ public boolean shutdown() throws IOException {
for (ViewNode n : couchNodes) {
if (n != null) {
n.shutdown();
interrupt();
if (n.hasWriteOps()) {
getLogger().warn("Shutting down with ops waiting to be written");
}
Expand Down
51 changes: 28 additions & 23 deletions src/main/java/com/couchbase/client/ViewNode.java
Expand Up @@ -95,31 +95,36 @@ public void run() {

public void doWrites() {
HttpOperation op;
while ((op = writeQ.poll()) != null) {
if (!op.isTimedOut() && !op.isCancelled()) {
AsyncConnectionRequest connRequest = connMgr.requestConnection();
try {
connRequest.waitFor();
} catch (InterruptedException e) {
getLogger().warn(
"Interrupted while trying to get a connection."
+ " Cancelling op");
op.cancel();
return;
}

NHttpClientConnection conn = connRequest.getConnection();
if (conn == null) {
getLogger().error("Failed to obtain connection. Cancelling op");
op.cancel();
} else {
HttpContext context = conn.getContext();
RequestHandle handle = new RequestHandle(connMgr, conn);
context.setAttribute("request-handle", handle);
context.setAttribute("operation", op);
conn.requestOutput();
try {
while ((op = writeQ.take()) != null) {
if (!op.isTimedOut() && !op.isCancelled()) {
AsyncConnectionRequest connRequest = connMgr.requestConnection();
try {
connRequest.waitFor();
} catch (InterruptedException e) {
getLogger().warn(
"Interrupted while trying to get a connection."
+ " Cancelling op");
op.cancel();
return;
}

NHttpClientConnection conn = connRequest.getConnection();
if (conn == null) {
getLogger().error("Failed to obtain connection. Cancelling op");
op.cancel();
} else {
HttpContext context = conn.getContext();
RequestHandle handle = new RequestHandle(connMgr, conn);
context.setAttribute("request-handle", handle);
context.setAttribute("operation", op);
conn.requestOutput();
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
getLogger().info("View connection interupted while waiting for op");
}
}

Expand Down

0 comments on commit e39cd76

Please sign in to comment.