diff --git a/src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java b/src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java index 1ce3405a50..0aa81e49d1 100644 --- a/src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java +++ b/src/java/voldemort/store/nonblockingstore/ThreadPoolBasedNonblockingStoreImpl.java @@ -143,7 +143,7 @@ public void run() { + operationName + ": time out exceeded"); try { - callback.requestComplete(ex, diff); + callback.requestComplete(ex, diff / Time.NS_PER_MS); } catch(Exception e) { if(logger.isEnabledFor(Level.WARN)) logger.warn(e, e); diff --git a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java index 1298b9f162..d05e734894 100644 --- a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java +++ b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutor.java @@ -71,7 +71,7 @@ public boolean isValid() { return !s.isClosed() && s.isBound() && s.isConnected(); } - public synchronized boolean checkTimeout(SelectionKey selectionKey) { + public synchronized boolean checkTimeout() { if(expiration <= 0) return true; @@ -162,7 +162,7 @@ public void close() { @Override protected void read(SelectionKey selectionKey) throws IOException { - if(!checkTimeout(selectionKey)) + if(!checkTimeout()) return; int count = 0; @@ -211,7 +211,7 @@ protected void read(SelectionKey selectionKey) throws IOException { @Override protected void write(SelectionKey selectionKey) throws IOException { - if(!checkTimeout(selectionKey)) + if(!checkTimeout()) return; if(outputStream.getBuffer().hasRemaining()) { diff --git a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java index 0309cce369..ce95df84c2 100644 --- a/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java +++ b/src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorFactory.java @@ -388,7 +388,7 @@ protected void processEvents() { // its way to being canceled. if(clientRequestExecutor != null) { try { - clientRequestExecutor.checkTimeout(selectionKey); + clientRequestExecutor.checkTimeout(); } catch(Exception e) { if(logger.isEnabledFor(Level.ERROR)) logger.error(e.getMessage(), e); diff --git a/src/java/voldemort/utils/pool/KeyedResourcePool.java b/src/java/voldemort/utils/pool/KeyedResourcePool.java index 29d7be2c01..2d81ff5234 100644 --- a/src/java/voldemort/utils/pool/KeyedResourcePool.java +++ b/src/java/voldemort/utils/pool/KeyedResourcePool.java @@ -34,14 +34,12 @@ public class KeyedResourcePool { private final AtomicBoolean isOpen = new AtomicBoolean(true); private final long timeoutNs; private final int poolMaxSize; - private final int maxCreateAttempts; private final boolean isFair; public KeyedResourcePool(ResourceFactory objectFactory, ResourcePoolConfig config) { this.objectFactory = Utils.notNull(objectFactory); this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS); this.poolMaxSize = config.getMaxPoolSize(); - this.maxCreateAttempts = config.getMaximumInvalidResourceCreationLimit(); this.resourcesMap = new ConcurrentHashMap>(); this.isFair = config.isFair(); } @@ -77,10 +75,12 @@ public static KeyedResourcePool create(ResourceFactory factor * and we have created fewer than the max size resources, then create a new * one. If no resources are available and we are already at the max size * then block for up to the maximum time specified. When we hit the maximum - * time, if we still have not retrieved a resource throw a TimeOutException. + * time, if we still have not retrieved a valid resource throw an exception. + * + * This method is guaranteed to either return a valid resource in the pool + * timeout + object creation time or throw an exception. If an exception is + * thrown, resource is guaranteed to be destroyed. * - * This method is guaranteed to either fail or return a valid resource in - * the pool timeout + object creation time. * * @param key The key to checkout the resource for * @return The resource @@ -91,61 +91,58 @@ public V checkout(K key) throws Exception { long startNs = System.nanoTime(); Pool resources = getResourcePoolForKey(key); - // repeatedly attempt to checkout/create a resource until we get a valid - // one or we hit the timeout or max attempts V resource = null; try { - int attempts = 0; - for(; attempts < this.maxCreateAttempts; attempts++) { - resource = null; - checkNotClosed(); + checkNotClosed(); + resource = attemptCheckout(key, resources); + + if(resource == null) { long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs); if(timeRemainingNs < 0) throw new TimeoutException("Could not acquire resource in " + (this.timeoutNs / Time.NS_PER_MS) + " ms."); - resource = checkoutOrCreateResource(key, resources, timeRemainingNs); - if(objectFactory.validate(key, resource)) - return resource; - else - destroyResource(key, resources, resource); + + resource = resources.blockingGet(timeoutNs); + if(resource == null) { + throw new TimeoutException("Timed out wait for resource after " + + (timeoutNs / Time.NS_PER_MS) + " ms."); + } } - throw new ExcessiveInvalidResourcesException(attempts); + + if(!objectFactory.validate(key, resource)) + throw new ExcessiveInvalidResourcesException(1); } catch(Exception e) { destroyResource(key, resources, resource); throw e; } + return resource; } /* * Get a free resource if one exists. If not create one if there is space. - * If no space, block and see if a resource is returned in the given - * timeout. If no resource is returned in that time, throw a - * TimeoutException. + * If you create one, try again to get a free resource. This method does not + * block. It either returns null or a resource. */ - private V checkoutOrCreateResource(K key, Pool pool, long timeoutNs) throws Exception { - // see if there is anything in the pool + private V attemptCheckout(K key, Pool pool) throws Exception { V resource = pool.nonBlockingGet(); if(resource != null) return resource; - // okay the queue is empty, maybe we have room to expand a bit? - if(pool.size.get() < this.poolMaxSize) - attemptGrow(key, pool); - - // now block for next available resource - resource = pool.blockingGet(timeoutNs); - if(resource == null) - throw new TimeoutException("Timed out wait for resource after " - + (timeoutNs / Time.NS_PER_MS) + " ms."); + // Attempt to expand the queue and retry. + if(pool.size.get() < this.poolMaxSize) { + if(attemptGrow(key, pool)) { + resource = pool.nonBlockingGet(); + } + } return resource; } /* * Attempt to create a new object and add it to the pool--this only happens - * if there is room for the new object. + * if there is room for the new object. This method does not block. */ - private void attemptGrow(K key, Pool pool) throws Exception { + private boolean attemptGrow(K key, Pool pool) throws Exception { // attempt to increment, and if the incremented value is less // than the pool size then create a new resource if(pool.size.incrementAndGet() <= this.poolMaxSize) { @@ -158,7 +155,9 @@ private void attemptGrow(K key, Pool pool) throws Exception { } } else { pool.size.decrementAndGet(); + return false; } + return true; } /*