Skip to content

Commit

Permalink
Additional refactoring and code prep before making connection checkou…
Browse files Browse the repository at this point in the history
…t async.

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java

Substantial refactoring and preparation of
ClientRequestExecutorPool to actually do an asynchronous
checkout. I believe I have essentially preserved prior
behavior. I.e., checkout of destination still blocks. But, the
pattern now looks a lot more like what is needed to do the
checkout asyncrhnously and then callback with the checkedout
resource. The exact exception strings likely changed during this
refactoring (but control flow should be the same).

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java

Skeleton of new class QueuedKeyedResourcePool that extends
KeyedResourcePool.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 2d62421 commit 51ab567
Show file tree
Hide file tree
Showing 2 changed files with 319 additions and 67 deletions.
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
Expand All @@ -40,6 +41,7 @@
import voldemort.utils.Time;
import voldemort.utils.Utils;
import voldemort.utils.pool.KeyedResourcePool;
import voldemort.utils.pool.QueuedKeyedResourcePool;
import voldemort.utils.pool.ResourcePoolConfig;

/**
Expand All @@ -55,7 +57,7 @@

public class ClientRequestExecutorPool implements SocketStoreFactory {

private final KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final ClientRequestExecutorFactory factory;
private final ClientSocketStats stats;
private final boolean jmxEnabled;
Expand Down Expand Up @@ -93,7 +95,8 @@ public ClientRequestExecutorPool(int selectors,
socketBufferSize,
socketKeepAlive,
stats);
this.pool = new KeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory, config);
this.pool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
config);
if(stats != null) {
this.stats.setPool(pool);
}
Expand Down Expand Up @@ -210,105 +213,95 @@ public ClientSocketStats getStats() {
return stats;
}

public <T> void submitAsync(SocketDestination destination,
ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
AsyncRequest<T> asyncRequest = new AsyncRequest<T>(destination,
delegate,
callback,
timeoutMs,
operationName);
pool.requestResource(destination, asyncRequest);
return;
}

/**
* Context necessary to setup async request for SocketDestination
* destination and tracking details of this request.
* Wrap up an asynchronous request and actually issue it once a
* SocketDestination is checked out.
*/
private static class AsyncRequestContext<T> {
private class AsyncRequest<T> implements
QueuedKeyedResourcePool.ResourceRequest<ClientRequestExecutor> {

private final SocketDestination destination;
public final ClientRequest<T> delegate;
public final NonblockingStoreCallback callback;
public final long timeoutMs;
public final String operationName;

// private final long startTimeNs;
private ClientRequestExecutor clientRequestExecutor;
private final long startTimeNs;

public AsyncRequestContext(ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
public AsyncRequest(SocketDestination destination,
ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
this.destination = destination;
this.delegate = delegate;
this.callback = callback;
this.timeoutMs = timeoutMs;
this.operationName = operationName;

// this.startTimeNs = System.nanoTime();
this.clientRequestExecutor = null;
this.startTimeNs = System.nanoTime();
}

}

public <T> void submitAsync(SocketDestination destination,
ClientRequest<T> delegate,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
AsyncRequestContext<T> requestContext = new AsyncRequestContext<T>(delegate,
callback,
timeoutMs,
operationName);

submitAsyncRequest(destination, requestContext);
return;
}

/**
* Actually submit the enqueued async request with the checked out
* destination.
*/
private <T> void submitAsyncRequest(SocketDestination destination,
AsyncRequestContext<T> context) {
try {
context.clientRequestExecutor = pool.checkout(destination);

public void useResource(ClientRequestExecutor clientRequestExecutor) {
if(logger.isDebugEnabled()) {
logger.debug("Async request start; type: "
+ context.operationName
+ operationName
+ " requestRef: "
+ System.identityHashCode(context.delegate)
+ System.identityHashCode(delegate)
+ " time: "
// TODO: output startTimeNs instead?
+ System.currentTimeMillis()
+ " server: "
+ context.clientRequestExecutor.getSocketChannel()
.socket()
.getRemoteSocketAddress()
+ " local socket: "
+ context.clientRequestExecutor.getSocketChannel()
.socket()
.getLocalAddress()
+ clientRequestExecutor.getSocketChannel()
.socket()
.getRemoteSocketAddress() + " local socket: "
+ clientRequestExecutor.getSocketChannel().socket().getLocalAddress()
+ ":"
+ context.clientRequestExecutor.getSocketChannel()
.socket()
.getLocalPort());
+ clientRequestExecutor.getSocketChannel().socket().getLocalPort());
}

} catch(Exception e) {
// If we can't check out a socket from the pool, we'll usually get
// either an IOException (subclass) or an UnreachableStoreException
// error. However, in the case of asynchronous calls, we want the
// error to be reported via our callback, not returned to the caller
// directly.
NonblockingStoreCallbackClientRequest<T> clientRequest = new NonblockingStoreCallbackClientRequest<T>(destination,
delegate,
clientRequestExecutor,
callback);
clientRequestExecutor.addClientRequest(clientRequest, timeoutMs);
}

public void handleTimeout() {
long durationNs = System.nanoTime() - startTimeNs;
handleException(new TimeoutException("Could not acquire resource in " + timeoutMs
+ " ms. (Took " + durationNs + " ns.)"));
}

public void handleException(Exception e) {
if(!(e instanceof UnreachableStoreException))
e = new UnreachableStoreException("Failure in " + context.operationName + ": "
e = new UnreachableStoreException("Failure in " + operationName + ": "
+ e.getMessage(), e);

try {
context.callback.requestComplete(e, 0);
callback.requestComplete(e, 0);
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
}

return;
}

NonblockingStoreCallbackClientRequest<T> clientRequest = new NonblockingStoreCallbackClientRequest<T>(destination,
context.delegate,
context.clientRequestExecutor,
context.callback);
context.clientRequestExecutor.addClientRequest(clientRequest, context.timeoutMs);
return;
public long getDeadlineNs() {
return startTimeNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
}
}

private class NonblockingStoreCallbackClientRequest<T> implements ClientRequest<T> {
Expand Down

0 comments on commit 51ab567

Please sign in to comment.