Permalink
Browse files

Additional refactoring and code prep before making connection checkou…

…t async.

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java

Substantial refactoring and preparation of
ClientRequestExecutorPool to actually do an asynchronous
checkout. I believe I have essentially preserved prior
behavior. I.e., checkout of destination still blocks. But, the
pattern now looks a lot more like what is needed to do the
checkout asyncrhnously and then callback with the checkedout
resource. The exact exception strings likely changed during this
refactoring (but control flow should be the same).

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java

Skeleton of new class QueuedKeyedResourcePool that extends
KeyedResourcePool.
  • Loading branch information...
1 parent 2d62421 commit 51ab567d1d9e61c3da9c6e400d7bd636a24228af @jayjwylie jayjwylie committed Aug 24, 2012
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -40,6 +41,7 @@
import voldemort.utils.Time;
import voldemort.utils.Utils;
import voldemort.utils.pool.KeyedResourcePool;
+import voldemort.utils.pool.QueuedKeyedResourcePool;
import voldemort.utils.pool.ResourcePoolConfig;
/**
@@ -55,7 +57,7 @@
public class ClientRequestExecutorPool implements SocketStoreFactory {
- private final KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
+ private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final ClientRequestExecutorFactory factory;
private final ClientSocketStats stats;
private final boolean jmxEnabled;
@@ -93,7 +95,8 @@ public ClientRequestExecutorPool(int selectors,
socketBufferSize,
socketKeepAlive,
stats);
- this.pool = new KeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory, config);
+ this.pool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
+ config);
if(stats != null) {
this.stats.setPool(pool);
}
@@ -210,105 +213,95 @@ public ClientSocketStats getStats() {
return stats;
}
+ public <T> void submitAsync(SocketDestination destination,
+ ClientRequest<T> delegate,
+ NonblockingStoreCallback callback,
+ long timeoutMs,
+ String operationName) {
+ AsyncRequest<T> asyncRequest = new AsyncRequest<T>(destination,
+ delegate,
+ callback,
+ timeoutMs,
+ operationName);
+ pool.requestResource(destination, asyncRequest);
+ return;
+ }
+
/**
- * Context necessary to setup async request for SocketDestination
- * destination and tracking details of this request.
+ * Wrap up an asynchronous request and actually issue it once a
+ * SocketDestination is checked out.
*/
- private static class AsyncRequestContext<T> {
+ private class AsyncRequest<T> implements
+ QueuedKeyedResourcePool.ResourceRequest<ClientRequestExecutor> {
+ private final SocketDestination destination;
public final ClientRequest<T> delegate;
public final NonblockingStoreCallback callback;
public final long timeoutMs;
public final String operationName;
- // private final long startTimeNs;
- private ClientRequestExecutor clientRequestExecutor;
+ private final long startTimeNs;
- public AsyncRequestContext(ClientRequest<T> delegate,
- NonblockingStoreCallback callback,
- long timeoutMs,
- String operationName) {
+ public AsyncRequest(SocketDestination destination,
+ ClientRequest<T> delegate,
+ NonblockingStoreCallback callback,
+ long timeoutMs,
+ String operationName) {
+ this.destination = destination;
this.delegate = delegate;
this.callback = callback;
this.timeoutMs = timeoutMs;
this.operationName = operationName;
- // this.startTimeNs = System.nanoTime();
- this.clientRequestExecutor = null;
+ this.startTimeNs = System.nanoTime();
}
- }
-
- public <T> void submitAsync(SocketDestination destination,
- ClientRequest<T> delegate,
- NonblockingStoreCallback callback,
- long timeoutMs,
- String operationName) {
- AsyncRequestContext<T> requestContext = new AsyncRequestContext<T>(delegate,
- callback,
- timeoutMs,
- operationName);
-
- submitAsyncRequest(destination, requestContext);
- return;
- }
-
- /**
- * Actually submit the enqueued async request with the checked out
- * destination.
- */
- private <T> void submitAsyncRequest(SocketDestination destination,
- AsyncRequestContext<T> context) {
- try {
- context.clientRequestExecutor = pool.checkout(destination);
-
+ public void useResource(ClientRequestExecutor clientRequestExecutor) {
if(logger.isDebugEnabled()) {
logger.debug("Async request start; type: "
- + context.operationName
+ + operationName
+ " requestRef: "
- + System.identityHashCode(context.delegate)
+ + System.identityHashCode(delegate)
+ " time: "
+ // TODO: output startTimeNs instead?
+ System.currentTimeMillis()
+ " server: "
- + context.clientRequestExecutor.getSocketChannel()
- .socket()
- .getRemoteSocketAddress()
- + " local socket: "
- + context.clientRequestExecutor.getSocketChannel()
- .socket()
- .getLocalAddress()
+ + clientRequestExecutor.getSocketChannel()
+ .socket()
+ .getRemoteSocketAddress() + " local socket: "
+ + clientRequestExecutor.getSocketChannel().socket().getLocalAddress()
+ ":"
- + context.clientRequestExecutor.getSocketChannel()
- .socket()
- .getLocalPort());
+ + clientRequestExecutor.getSocketChannel().socket().getLocalPort());
}
- } catch(Exception e) {
- // If we can't check out a socket from the pool, we'll usually get
- // either an IOException (subclass) or an UnreachableStoreException
- // error. However, in the case of asynchronous calls, we want the
- // error to be reported via our callback, not returned to the caller
- // directly.
+ NonblockingStoreCallbackClientRequest<T> clientRequest = new NonblockingStoreCallbackClientRequest<T>(destination,
+ delegate,
+ clientRequestExecutor,
+ callback);
+ clientRequestExecutor.addClientRequest(clientRequest, timeoutMs);
+ }
+
+ public void handleTimeout() {
+ long durationNs = System.nanoTime() - startTimeNs;
+ handleException(new TimeoutException("Could not acquire resource in " + timeoutMs
+ + " ms. (Took " + durationNs + " ns.)"));
+ }
+
+ public void handleException(Exception e) {
if(!(e instanceof UnreachableStoreException))
- e = new UnreachableStoreException("Failure in " + context.operationName + ": "
+ e = new UnreachableStoreException("Failure in " + operationName + ": "
+ e.getMessage(), e);
-
try {
- context.callback.requestComplete(e, 0);
+ callback.requestComplete(e, 0);
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(ex, ex);
}
-
- return;
}
- NonblockingStoreCallbackClientRequest<T> clientRequest = new NonblockingStoreCallbackClientRequest<T>(destination,
- context.delegate,
- context.clientRequestExecutor,
- context.callback);
- context.clientRequestExecutor.addClientRequest(clientRequest, context.timeoutMs);
- return;
+ public long getDeadlineNs() {
+ return startTimeNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
+ }
}
private class NonblockingStoreCallbackClientRequest<T> implements ClientRequest<T> {
Oops, something went wrong.

0 comments on commit 51ab567

Please sign in to comment.