Skip to content

Commit

Permalink
Minor cleanup in preparation of making socket checkouts along non-blo…
Browse files Browse the repository at this point in the history
…cking code paths asynchronous.

src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java
- fix missing ns to ms conversion

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java
src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java
- remove unnecessary selectionKey from checkTimeout interface

src/java/voldemort/utils/pool/KeyedResourcePool.java
- removed extraneous maxCreateAttempts
- attemptGrow returns true if pool grew
- refactored checkoutOrCreateResource
  - renamed to attemptCheckout
  - attemptCheckout is non blocking
  - attemptCheckout does not check timeouts
- refactored checkout
  - removed unreachable code (only one attempt could ever be made so attempts and maxCreateAttempts are extraneous.
  - made it clearer that exceptions control flow through try block
* even though code looks a lot different, I believe exact
  functionality is preserved except for one thing: the
  possibility of a single additional non-blocking get on the
  pool (resources) when attemptGrow returns true.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 8e403b2 commit 69f9896
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 38 deletions.
Expand Up @@ -143,7 +143,7 @@ public void run() {
+ operationName
+ ": time out exceeded");
try {
callback.requestComplete(ex, diff);
callback.requestComplete(ex, diff / Time.NS_PER_MS);
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public boolean isValid() {
return !s.isClosed() && s.isBound() && s.isConnected();
}

public synchronized boolean checkTimeout(SelectionKey selectionKey) {
public synchronized boolean checkTimeout() {
if(expiration <= 0)
return true;

Expand Down Expand Up @@ -162,7 +162,7 @@ public void close() {

@Override
protected void read(SelectionKey selectionKey) throws IOException {
if(!checkTimeout(selectionKey))
if(!checkTimeout())
return;

int count = 0;
Expand Down Expand Up @@ -211,7 +211,7 @@ protected void read(SelectionKey selectionKey) throws IOException {

@Override
protected void write(SelectionKey selectionKey) throws IOException {
if(!checkTimeout(selectionKey))
if(!checkTimeout())
return;

if(outputStream.getBuffer().hasRemaining()) {
Expand Down
Expand Up @@ -388,7 +388,7 @@ protected void processEvents() {
// its way to being canceled.
if(clientRequestExecutor != null) {
try {
clientRequestExecutor.checkTimeout(selectionKey);
clientRequestExecutor.checkTimeout();
} catch(Exception e) {
if(logger.isEnabledFor(Level.ERROR))
logger.error(e.getMessage(), e);
Expand Down
65 changes: 32 additions & 33 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -34,14 +34,12 @@ public class KeyedResourcePool<K, V> {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final long timeoutNs;
private final int poolMaxSize;
private final int maxCreateAttempts;
private final boolean isFair;

public KeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig config) {
this.objectFactory = Utils.notNull(objectFactory);
this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS);
this.poolMaxSize = config.getMaxPoolSize();
this.maxCreateAttempts = config.getMaximumInvalidResourceCreationLimit();
this.resourcesMap = new ConcurrentHashMap<K, Pool<V>>();
this.isFair = config.isFair();
}
Expand Down Expand Up @@ -77,10 +75,12 @@ public static <K, V> KeyedResourcePool<K, V> create(ResourceFactory<K, V> factor
* and we have created fewer than the max size resources, then create a new
* one. If no resources are available and we are already at the max size
* then block for up to the maximum time specified. When we hit the maximum
* time, if we still have not retrieved a resource throw a TimeOutException.
* time, if we still have not retrieved a valid resource throw an exception.
*
* This method is guaranteed to either return a valid resource in the pool
* timeout + object creation time or throw an exception. If an exception is
* thrown, resource is guaranteed to be destroyed.
*
* This method is guaranteed to either fail or return a valid resource in
* the pool timeout + object creation time.
*
* @param key The key to checkout the resource for
* @return The resource
Expand All @@ -91,61 +91,58 @@ public V checkout(K key) throws Exception {
long startNs = System.nanoTime();
Pool<V> resources = getResourcePoolForKey(key);

// repeatedly attempt to checkout/create a resource until we get a valid
// one or we hit the timeout or max attempts
V resource = null;
try {
int attempts = 0;
for(; attempts < this.maxCreateAttempts; attempts++) {
resource = null;
checkNotClosed();
checkNotClosed();
resource = attemptCheckout(key, resources);

if(resource == null) {
long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs);
if(timeRemainingNs < 0)
throw new TimeoutException("Could not acquire resource in "
+ (this.timeoutNs / Time.NS_PER_MS) + " ms.");
resource = checkoutOrCreateResource(key, resources, timeRemainingNs);
if(objectFactory.validate(key, resource))
return resource;
else
destroyResource(key, resources, resource);

resource = resources.blockingGet(timeoutNs);
if(resource == null) {
throw new TimeoutException("Timed out wait for resource after "
+ (timeoutNs / Time.NS_PER_MS) + " ms.");
}
}
throw new ExcessiveInvalidResourcesException(attempts);

if(!objectFactory.validate(key, resource))
throw new ExcessiveInvalidResourcesException(1);
} catch(Exception e) {
destroyResource(key, resources, resource);
throw e;
}
return resource;
}

/*
* Get a free resource if one exists. If not create one if there is space.
* If no space, block and see if a resource is returned in the given
* timeout. If no resource is returned in that time, throw a
* TimeoutException.
* If you create one, try again to get a free resource. This method does not
* block. It either returns null or a resource.
*/
private V checkoutOrCreateResource(K key, Pool<V> pool, long timeoutNs) throws Exception {
// see if there is anything in the pool
private V attemptCheckout(K key, Pool<V> pool) throws Exception {
V resource = pool.nonBlockingGet();
if(resource != null)
return resource;

// okay the queue is empty, maybe we have room to expand a bit?
if(pool.size.get() < this.poolMaxSize)
attemptGrow(key, pool);

// now block for next available resource
resource = pool.blockingGet(timeoutNs);
if(resource == null)
throw new TimeoutException("Timed out wait for resource after "
+ (timeoutNs / Time.NS_PER_MS) + " ms.");
// Attempt to expand the queue and retry.
if(pool.size.get() < this.poolMaxSize) {
if(attemptGrow(key, pool)) {
resource = pool.nonBlockingGet();
}
}

return resource;
}

/*
* Attempt to create a new object and add it to the pool--this only happens
* if there is room for the new object.
* if there is room for the new object. This method does not block.
*/
private void attemptGrow(K key, Pool<V> pool) throws Exception {
private boolean attemptGrow(K key, Pool<V> pool) throws Exception {
// attempt to increment, and if the incremented value is less
// than the pool size then create a new resource
if(pool.size.incrementAndGet() <= this.poolMaxSize) {
Expand All @@ -158,7 +155,9 @@ private void attemptGrow(K key, Pool<V> pool) throws Exception {
}
} else {
pool.size.decrementAndGet();
return false;
}
return true;
}

/*
Expand Down

0 comments on commit 69f9896

Please sign in to comment.