Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial commit of asynchronous checkouts.

src/java/log4j.properties
- print out more concise timestamp (don't print the date)
- print thread at end of message

src/java/voldemort/utils/pool/KeyedResourcePool.java
- reverted all protected data members back to private
- refactored attemptGrow to do the size check to determine if it
  is worth trying to attempt to grow. This made the method more
  useful to subclasses and cleaned up the local member that
  called it.
- added an internalClose method that returns whether or not the
  caller is "the one thread" responsible for closing everything
  down

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- first complete implementation of this class
- Four TODOs left in the code for the sake of interim code review

test/integration/voldemort/nonblocking/E2ENonblockingCheckoutTest.java
- minor tweaks to the test
  • Loading branch information...
commit 3841081d1b789795e538783ae85d5b429fc42877 1 parent 51ab567
@jayjwylie jayjwylie authored
View
5 src/java/log4j.properties
@@ -5,7 +5,8 @@ log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d %c] %p %m %n
+# log4j.appender.stdout.layout.ConversionPattern=[%d %c] %p %m %n
+log4j.appender.stdout.layout.ConversionPattern=[%d{ABSOLUTE} %c] %p %m [%t]%n
# Turn on all our debugging info
log4j.logger=INFO
@@ -16,4 +17,4 @@ log4j.logger.voldemort.server.niosocket=INFO
log4j.logger.voldemort.utils=INFO
log4j.logger.voldemort.client.rebalance=INFO
log4j.logger.voldemort.server=INFO
-log4j.logger.krati=WARN
+log4j.logger.krati=WARN
View
38 src/java/voldemort/utils/pool/KeyedResourcePool.java
@@ -29,10 +29,10 @@
private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName());
- protected final ResourceFactory<K, V> objectFactory;
+ private final ResourceFactory<K, V> objectFactory;
private final ConcurrentMap<K, Pool<V>> resourcePoolMap;
- protected final AtomicBoolean isOpen = new AtomicBoolean(true);
- protected final long timeoutNs;
+ private final AtomicBoolean isOpen = new AtomicBoolean(true);
+ private final long timeoutNs;
private final int poolMaxSize;
private final boolean isFair;
@@ -127,10 +127,8 @@ public V checkout(K key) throws Exception {
protected V attemptCheckoutGrowCheckout(K key, Pool<V> pool) throws Exception {
V resource = attemptCheckout(pool);
if(resource == null) {
- if(pool.size.get() < this.poolMaxSize) {
- if(attemptGrow(key, pool)) {
- resource = attemptCheckout(pool);
- }
+ if(attemptGrow(key, pool)) {
+ resource = attemptCheckout(pool);
}
}
@@ -154,6 +152,10 @@ protected V attemptCheckout(Pool<V> pool) throws Exception {
* checkouts may occur.)
*/
protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
+ if(pool.size.get() >= this.poolMaxSize) {
+ // "fail fast" if not worth trying to grow the pool.
+ return false;
+ }
// attempt to increment, and if the incremented value is less
// than the pool size then create a new resource
if(pool.size.incrementAndGet() <= this.poolMaxSize) {
@@ -231,14 +233,10 @@ public void checkin(K key, V resource) throws Exception {
}
}
- /**
- * Close the pool. This will destroy all checked in resource immediately.
- * Once closed all attempts to checkout a new resource will fail. All
- * resources checked in after close is called will be immediately destroyed.
- */
- public void close() {
+ protected boolean internalClose() {
+ boolean wasOpen = isOpen.compareAndSet(true, false);
// change state to false and allow one thread.
- if(isOpen.compareAndSet(true, false)) {
+ if(wasOpen) {
for(Entry<K, Pool<V>> entry: resourcePoolMap.entrySet()) {
Pool<V> pool = entry.getValue();
// destroy each resource in the queue
@@ -247,6 +245,16 @@ public void close() {
resourcePoolMap.remove(entry.getKey());
}
}
+ return wasOpen;
+ }
+
+ /**
+ * Close the pool. This will destroy all checked in resource immediately.
+ * Once closed all attempts to checkout a new resource will fail. All
+ * resources checked in after close is called will be immediately destroyed.
+ */
+ public void close() {
+ internalClose();
}
/**
@@ -259,7 +267,7 @@ public void close() {
public void close(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
List<V> list = resourcePool.close();
- // destroy each resource currently in the queue
+ // destroy each resource currently in the pool
for(V value: list)
destroyResource(key, resourcePool, value);
}
View
269 src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
@@ -8,6 +8,8 @@
import org.apache.log4j.Logger;
+import voldemort.store.UnreachableStoreException;
+
/**
* Extends simple implementation of a per-key resource pool with a non-blocking
* interface to enqueue requests for a resource when one becomes available. <br>
@@ -20,7 +22,8 @@
public interface ResourceRequest<V> {
- // Invoked with checkedout resource; resource guaranteed to be not-null
+ // Invoked with checked out resource; resource guaranteed to be
+ // not-null.
void useResource(V resource);
// Invoked sometime after deadline. Will never invoke useResource.
@@ -29,7 +32,7 @@
// Invoked upon resource pool exception. Will never invoke useResource.
void handleException(Exception e);
- // return deadline (in nanoseconds), after which timeoutCheckout()
+ // Returns deadline (in nanoseconds), after which handleTimeout()
// should be invoked.
long getDeadlineNs();
}
@@ -72,51 +75,105 @@ public QueuedKeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePool
}
/**
+ * This method is the asynchronous (nonblocking) version of
+ * KeyedResourcePool.checkout. This method necessarily has a different
+ * function declaration (i.e., arguments passed and return type).
+ *
+ * This method either checks out a resource and uses that resource or
+ * enqueues a request to checkout the resource. I.e., there is a
+ * non-blocking fast-path that is tried optimistically.
*
* @param key The key to checkout the resource for
* @return The resource
+ *
*/
public void requestResource(K key, ResourceRequest<V> resourceRequest) {
- try {
- V resource = super.checkout(key);
- resourceRequest.useResource(resource);
- } catch(Exception e) {
- resourceRequest.handleException(e);
+ checkNotClosed();
+
+ // Non-blocking checkout attempt iff requestQueue is empty. If
+ // requestQueue is not empty and we attempted non-blocking checkout,
+ // then FIFO at risk.
+ Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
+ if(requestQueue.isEmpty()) {
+ Pool<V> resourcePool = getResourcePoolForKey(key);
+ V resource = null;
+
+ try {
+ resource = attemptCheckoutGrowCheckout(key, resourcePool);
+ } catch(Exception e) {
+ super.destroyResource(key, resourcePool, resource);
+ resourceRequest.handleException(e);
+ }
+ if(resource != null) {
+ // TODO: Is another try/catch block needed anywhere to ensure
+ // resource is destroyed if/when anything bad happens in
+ // useResource method?
+ resourceRequest.useResource(resource);
+ return;
+ }
}
- /*-
- checkNotClosed();
+ requestQueue.add(resourceRequest);
+ }
- long startNs = System.nanoTime();
- Pool<V> resources = getResourcePoolForKey(key);
+ /**
+ * Pops resource requests off the queue until queue is empty or an unexpired
+ * resource request is found. Invokes .handleTimeout on all expired resource
+ * requests popped off the queue.
+ *
+ * @return null or a valid ResourceRequest
+ */
+ private ResourceRequest<V> getNextUnexpiredResourceRequest(Queue<ResourceRequest<V>> requestQueue) {
+ ResourceRequest<V> resourceRequest = requestQueue.poll();
+ while(resourceRequest != null) {
+ if(resourceRequest.getDeadlineNs() < System.nanoTime()) {
+ resourceRequest.handleTimeout();
+ resourceRequest = requestQueue.poll();
+ } else {
+ break;
+ }
+ }
+ return resourceRequest;
+ }
+ /**
+ * Attempts to checkout a resource so that one queued request can be
+ * serviced.
+ *
+ * @param key The key for which to process the requestQueue
+ * @return true iff an item was processed from the Queue.
+ */
+ private boolean processQueue(K key) throws Exception {
+ Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
+ if(requestQueue.isEmpty()) {
+ return false;
+ }
+
+ // Attempt to get a resource.
+ Pool<V> resourcePool = getResourcePoolForKey(key);
V resource = null;
- try {
- checkNotClosed();
- resource = attemptCheckoutGrowCheckout(key, resources);
-
- if(resource == null) {
- long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs);
- if(timeRemainingNs < 0)
- throw new TimeoutException("Could not acquire resource in "
- + (this.timeoutNs / Time.NS_PER_MS) + " ms.");
-
- resource = resources.blockingGet(timeoutNs);
- if(resource == null) {
- throw new TimeoutException("Timed out wait for resource after "
- + (timeoutNs / Time.NS_PER_MS) + " ms.");
- }
- }
- if(!objectFactory.validate(key, resource))
- throw new ExcessiveInvalidResourcesException(1);
+ try {
+ // Always attempt to grow to deal with destroyed resources.
+ attemptGrow(key, resourcePool);
+ resource = attemptCheckout(resourcePool);
} catch(Exception e) {
- destroyResource(key, resources, resource);
- System.err.println(e.toString());
- throw e;
+ super.destroyResource(key, resourcePool, resource);
+ }
+ if(resource == null) {
+ return false;
+ }
+
+ // With resource in hand, process the resource requests
+ ResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
+ if(resourceRequest == null) {
+ // Did not use the resource!
+ super.checkin(key, resource);
+ return false;
}
- return resource;
- */
+
+ resourceRequest.useResource(resource);
+ return true;
}
/**
@@ -127,38 +184,94 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
*/
@Override
public void checkin(K key, V resource) throws Exception {
+ // TODO: Unclear if invoking checkin and then invoking processQueue is
+ // "fair" or is "FIFO". In particular, is super.checkout invoked
+ // directly? If so, how should such a blocking checkout interact with
+ // non-blocking checkouts?
super.checkin(key, resource);
- /*-
- Pool<V> pool = getResourcePoolForExistingKey(key);
- if(isOpen.get() && objectFactory.validate(key, resource)) {
- boolean success = pool.nonBlockingPut(resource);
- if(!success) {
- destroyResource(key, pool, resource);
- throw new IllegalStateException("Checkin failed is the pool already full?");
- }
- } else {
- destroyResource(key, pool, resource);
- }
- */
+ while(processQueue(key)) {}
}
/*
- * A safe wrapper to destroy the given request that catches any user
- * exceptions
+ * A safe wrapper to destroy the given resource request.
*/
- protected void destroyRequest(K key,
- Queue<ResourceRequest<V>> requestQueue,
- ResourceRequest<V> resourceRequest) {
+ protected void destroyRequest(ResourceRequest<V> resourceRequest) {
if(resourceRequest != null) {
try {
- // objectFactory.destroy(key, resource);
- resourceRequest = null;
- } catch(Exception e) {
- logger.error("Exception while destorying invalid request:", e);
- } finally {
- // resourcePool.size.decrementAndGet();
+ Exception e = new UnreachableStoreException("Resource request destroyed before resource checked out.");
+ resourceRequest.handleException(e);
+ } catch(Exception ex) {
+ logger.error("Exception while destroying resource request:", ex);
+ }
+ }
+ }
+
+ /**
+ * Destroys all resource requests in requestQueue.
+ *
+ * @param requestQueue The queue for which all resource requests are to be
+ * destroyed.
+ */
+ private void destroyRequestQueue(Queue<ResourceRequest<V>> requestQueue) {
+ ResourceRequest<V> resourceRequest = requestQueue.poll();
+ while(resourceRequest != null) {
+ destroyRequest(resourceRequest);
+ resourceRequest = requestQueue.poll();
+ }
+ }
+
+ @Override
+ protected boolean internalClose() {
+ // wasOpen ensures only one thread destroys everything.
+ boolean wasOpen = super.internalClose();
+ if(wasOpen) {
+ for(Entry<K, Queue<ResourceRequest<V>>> entry: requestQueueMap.entrySet()) {
+ Queue<ResourceRequest<V>> requestQueue = entry.getValue();
+ destroyRequestQueue(requestQueue);
+ requestQueueMap.remove(entry.getKey());
}
}
+ return wasOpen;
+ }
+
+ /**
+ * Close the queue and the pool.
+ */
+ @Override
+ public void close() {
+ internalClose();
+ }
+
+ /**
+ * "Close" a specific resource pool and request queue by destroying all the
+ * resources in the pool and all the requests in the queue. This method does
+ * not affect whether any pool or queue is "open" in the sense of permitting
+ * new resources to be added or requests to be enqueued.
+ *
+ * @param key The key for the pool to close.
+ */
+ @Override
+ public void close(K key) {
+ // TODO: The close method in the super class is not documented at all.
+ // super.close(key) is called by ClientRequestExecutorPool.close which
+ // is called by SocketStoreclientFactory. Given the super class does not
+ // set any closed bit, unclear what the semantics of this.close(key)
+ // ought to be.
+ //
+ // Also, super.close(key) does nothing to protect against multiple
+ // threads accessing the method at the same time. And, super.close(key)
+ // does not remove the affected pool from super.resourcePoolMap. The
+ // semantics of super.close(key) are truly unclear.
+
+ // Destroy enqueued resource requests (if any exist) first.
+ Queue<ResourceRequest<V>> requestQueue = requestQueueMap.get(key);
+ if(requestQueue != null) {
+ destroyRequestQueue(requestQueue);
+ // TODO: requestQueueMap.remove(entry.getKey()); ?
+ }
+
+ // Destroy resources in the pool second.
+ super.close(key);
}
/*
@@ -214,46 +327,4 @@ public int getQueuedResourceRequestCount() {
return count;
}
- /**
- * Close the pool. This will destroy all checked in resource immediately.
- * Once closed all attempts to checkout a new resource will fail. All
- * resources checked in after close is called will be immediately destroyed.
- */
- @Override
- public void close() {
- super.close();
- /*-
- // change state to false and allow one thread.
- if(isOpen.compareAndSet(true, false)) {
- for(Entry<K, Pool<V>> entry: resourcesMap.entrySet()) {
- Pool<V> pool = entry.getValue();
- // destroy each resource in the queue
- for(V value = pool.nonBlockingGet(); value != null; value = pool.nonBlockingGet())
- destroyResource(entry.getKey(), entry.getValue(), value);
- resourcesMap.remove(entry.getKey());
- }
- }
- */
- }
-
- /**
- * "Close" a specific resource pool and request queue by destroying all the
- * resources in the pool and all the requests in the queue. This method does
- * not affect whether any pool or queue is "open" in the sense of permitting
- * new resources to be added or requests to be enqueued.
- *
- * @param key The key for the pool to close.
- */
- @Override
- public void close(K key) {
- // Destroy enqueued requests first.
- Queue<ResourceRequest<V>> requestQueue = getRequestQueueForExistingKey(key);
- ResourceRequest<V> resourceRequest = requestQueue.poll();
- while(resourceRequest != null) {
- destroyRequest(key, requestQueue, resourceRequest);
- resourceRequest = requestQueue.poll();
- }
- // Destroy resources in the pool second.
- super.close(key);
- }
}
View
37 test/integration/voldemort/nonblocking/E2ENonblockingCheckoutTest.java
@@ -66,7 +66,6 @@
private static final int NUM_PUTS = 25;
// Exempt some puts from performance requirements until warmed up
private static final int NUM_EXEMPT_PUTS = 2;
-
private static final long MAX_PUT_TIME_MS = 50;
private static final long SLOW_PUT_MS = 250;
/*
@@ -80,9 +79,9 @@
// Ensure that threads will contend at all servers
private static final int CONNECTIONS_PER_NODE = 1;
- private static final int CONNECTION_TIMEOUT_MS = 10 * 1000;
- private static final int SOCKET_TIMEOUT_MS = 100 * 1000;
- private static final int ROUTING_TIMEOUT_MS = 100 * 1000;
+ private static final int CONNECTION_TIMEOUT_MS = 500; // 10 * 1000;
+ private static final int SOCKET_TIMEOUT_MS = 2 * 1000; // 100 * 1000;
+ private static final int ROUTING_TIMEOUT_MS = 10 * 1000; // 100 * 1000;
private final SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(CONNECTIONS_PER_NODE,
CONNECTION_TIMEOUT_MS,
@@ -122,17 +121,29 @@ public E2ENonblockingCheckoutTest() {
@Before
public void setUp() throws Exception {
+ // PatternLayout patternLayout = new
+ // PatternLayout("%d{ABSOLUTE} %-5p [%t/%c]: %m%n");
+
Logger logger;
/*-
// To analyze whether checkout/checkin paths are blocking add
// log4j.trace statements to KeyedResourcePool checkout/checkin methods.
+ */
+ logger = Logger.getLogger("voldemort.store.socket.clientrequest.ClientRequestExecutorPool");
+ logger.setLevel(Level.TRACE);
+
logger = Logger.getLogger("voldemort.utils.pool.KeyedResourcePool");
logger.setLevel(Level.TRACE);
+
+ logger = Logger.getLogger("voldemort.utils.pool.QueuedKeyedResourcePool");
+ logger.setLevel(Level.TRACE);
+
logger = Logger.getLogger("voldemort.store.socket.SocketStore");
logger.setLevel(Level.DEBUG);
- */
+
logger = Logger.getLogger("voldemort.store.routed.action.PerformParallelPutRequests");
logger.setLevel(Level.DEBUG);
+
logger = Logger.getLogger("voldemort.store.routed.action.PerformSerialPutRequests");
logger.setLevel(Level.DEBUG);
@@ -229,8 +240,9 @@ public void run() {
sleepUntilNextPeriod();
String I = getString(i);
System.out.println("");
- System.out.println("Starting PUT of " + I + " (Thread: "
- + Thread.currentThread().getName() + ")");
+ String context = new String("PUT of " + I + " (Put #: " + i + ", Thread: "
+ + Thread.currentThread().getName() + ")");
+ System.out.println("START " + context);
long startTimeMs = System.currentTimeMillis();
boolean putDone = false;
while(!putDone) {
@@ -238,14 +250,13 @@ public void run() {
storeClient.put(I, I);
putDone = true;
} catch(ObsoleteVersionException e) {
- System.out.println("Retrying PUT of " + I + " (Thread: "
- + Thread.currentThread().getName() + ")");
+ System.out.println("RETRY " + context);
// What to do here?
}
}
long endTimeMs = System.currentTimeMillis();
- System.out.println("OPERATION DONE Time (ms): " + (endTimeMs - startTimeMs)
- + " (Thread: " + Thread.currentThread().getName() + ")");
+ System.out.println(" DONE " + context + " --- Time (ms): "
+ + (endTimeMs - startTimeMs));
if(i >= NUM_EXEMPT_PUTS) {
/*-
assertFalse("Operation completes without blocking on slow server:"
@@ -254,7 +265,9 @@ public void run() {
*/
assertFalse("False", false); // noop until fix
if((endTimeMs - startTimeMs) > this.putTimeLimitMs) {
- System.err.println("Operation blocked! Therefore, operation is not nonblocking... (Operation time: "
+ System.err.println("Operation blocked! Therefore, operation is not nonblocking... "
+ + context
+ + " (Operation time: "
+ (endTimeMs - startTimeMs) + " ms)");
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.