Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

More tests for resource pool.

  • Loading branch information...
commit 2c3c94421e83e7d60a872ed795603ef7d4c39683 1 parent 4301934
@jkreps jkreps authored
View
4 src/java/voldemort/store/socket/SocketPool.java
@@ -52,8 +52,8 @@ public SocketPool(int maxConnectionsPerNode,
int soTimeoutMs,
int socketBufferSize) {
ResourcePoolConfig config = new ResourcePoolConfig().setIsFair(true)
- .setPoolSize(maxConnectionsPerNode)
- .setMaximumInvalidResourceCreationLimit(maxConnectionsPerNode)
+ .setMaxPoolSize(maxConnectionsPerNode)
+ .setMaxInvalidAttempts(maxConnectionsPerNode)
.setTimeout(connectionTimeoutMs,
TimeUnit.MILLISECONDS);
this.socketFactory = new SocketResourceFactory(soTimeoutMs, socketBufferSize);
View
55 src/java/voldemort/utils/pool/KeyedResourcePool.java
@@ -31,14 +31,14 @@
private final ConcurrentMap<K, Pool<V>> resourcesMap;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final long timeoutNs;
- private final int poolSize;
+ private final int poolMaxSize;
private final int maxCreateAttempts;
private final boolean isFair;
public KeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig config) {
this.objectFactory = Utils.notNull(objectFactory);
this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS);
- this.poolSize = config.getPoolSize();
+ this.poolMaxSize = config.getMaxPoolSize();
this.maxCreateAttempts = config.getMaximumInvalidResourceCreationLimit();
this.resourcesMap = new ConcurrentHashMap<K, Pool<V>>();
this.isFair = config.isFair();
@@ -94,7 +94,7 @@ public V checkout(K key) throws Exception {
V resource = null;
try {
int attempts = 0;
- do {
+ for(; attempts < this.maxCreateAttempts; attempts++) {
checkNotClosed();
long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs);
if(timeRemainingNs < 0)
@@ -103,13 +103,14 @@ public V checkout(K key) throws Exception {
resource = checkoutOrCreateResource(key, resources, timeRemainingNs);
if(objectFactory.validate(key, resource))
return resource;
- } while(++attempts < this.maxCreateAttempts);
+ else
+ destroyResource(key, resources, resource);
+ }
+ throw new ExcessiveInvalidResourcesException(attempts);
} catch(Exception e) {
destroyResource(key, resources, resource);
throw e;
}
-
- return resource;
}
/*
@@ -124,12 +125,11 @@ private V checkoutOrCreateResource(K key, Pool<V> pool, long timeoutNs) throws E
if(resource != null)
return resource;
- // okay the queue is empty, maybe we have room to create a new resource
- resource = attemptCreate(key, pool);
- if(resource != null)
- return resource;
+ // okay the queue is empty, maybe we have room to expand a bit?
+ if(pool.size.get() < this.poolMaxSize)
+ attemptGrow(key, pool);
- // pool has reached max size, block for next available resource
+ // now block for next available resource
resource = pool.blockingGet(timeoutNs);
if(resource == null)
throw new TimeoutException("Timed out wait for resource after "
@@ -139,28 +139,23 @@ private V checkoutOrCreateResource(K key, Pool<V> pool, long timeoutNs) throws E
}
/*
- * Attempt to create a new object in the pool if there is room. Return the
- * object if created, else null.
+ * Attempt to create a new object and add it to the pool--this only happens
+ * if there is room for the new object.
*/
- private V attemptCreate(K key, Pool<V> pool) throws Exception {
- V resource = null;
- // do a sanity check on the size
- if(pool.size.get() < this.poolSize) {
- // now attempt to increment, and if the incremented value is less
- // than the pool size then create a new resource
- if(pool.size.incrementAndGet() <= this.poolSize) {
- try {
- resource = objectFactory.create(key);
- } catch(Exception e) {
- pool.size.decrementAndGet();
- throw e;
- }
- } else {
+ private void attemptGrow(K key, Pool<V> pool) throws Exception {
+ // attempt to increment, and if the incremented value is less
+ // than the pool size then create a new resource
+ if(pool.size.incrementAndGet() <= this.poolMaxSize) {
+ try {
+ V resource = objectFactory.create(key);
+ pool.nonBlockingPut(resource);
+ } catch(Exception e) {
pool.size.decrementAndGet();
+ throw e;
}
+ } else {
+ pool.size.decrementAndGet();
}
-
- return resource;
}
/*
@@ -169,7 +164,7 @@ private V attemptCreate(K key, Pool<V> pool) throws Exception {
private Pool<V> getResourcePoolForKey(K key) {
Pool<V> pool = resourcesMap.get(key);
if(pool == null) {
- pool = new Pool<V>(this.poolSize, this.isFair);
+ pool = new Pool<V>(this.poolMaxSize, this.isFair);
resourcesMap.putIfAbsent(key, pool);
pool = resourcesMap.get(key);
}
View
14 src/java/voldemort/utils/pool/ResourcePoolConfig.java
@@ -11,7 +11,7 @@
public class ResourcePoolConfig {
/* Note: if you change the defaults you must update the javadoc as well. */
- private int poolSize = 100;
+ private int poolMaxSize = 20;
private long timeoutNs = Long.MAX_VALUE;
private int maxInvalidResourceCreations = Integer.MAX_VALUE;
private boolean isFair = true;
@@ -23,21 +23,21 @@ public ResourcePoolConfig() {
/**
* Get the size of the pool
*/
- public int getPoolSize() {
- return poolSize;
+ public int getMaxPoolSize() {
+ return poolMaxSize;
}
/**
* The size of the pool to maintain for each key.
*
- * The default pool size is 100
+ * The default pool size is 20
*
* @param poolSize The desired per-key pool size
*/
- public ResourcePoolConfig setPoolSize(int poolSize) {
+ public ResourcePoolConfig setMaxPoolSize(int poolSize) {
if(poolSize <= 0)
throw new IllegalArgumentException("Pool size must be a positive number.");
- this.poolSize = poolSize;
+ this.poolMaxSize = poolSize;
return this;
}
@@ -74,7 +74,7 @@ public ResourcePoolConfig setTimeout(long timeout, TimeUnit unit) {
*
* @param limit The desired limit
*/
- public ResourcePoolConfig setMaximumInvalidResourceCreationLimit(int limit) {
+ public ResourcePoolConfig setMaxInvalidAttempts(int limit) {
if(limit <= 0)
throw new IllegalArgumentException("Limit must be a positive number.");
this.maxInvalidResourceCreations = limit;
View
103 test/integration/voldemort/performance/ResourcePoolPerfTest.java
@@ -0,0 +1,103 @@
+package voldemort.performance;
+
+import java.text.NumberFormat;
+
+import org.apache.commons.pool.KeyedObjectPool;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+
+import voldemort.utils.pool.KeyedResourcePool;
+import voldemort.utils.pool.ResourceFactory;
+import voldemort.utils.pool.ResourcePoolConfig;
+
+public class ResourcePoolPerfTest {
+
+ public static void main(String[] args) throws Exception {
+
+ final int numKeys = 10;
+ final int numThreads = 10;
+ final int numRequests = 10000000;
+ NumberFormat format = NumberFormat.getInstance();
+ format.setMaximumFractionDigits(2);
+
+ for(int poolSize: new int[] { 1, 5, 10 }) {
+ System.out.println("Perf test for voldemort pool with numThreads = " + numThreads
+ + ", poolSize = " + poolSize + ":");
+ final KeyedResourcePool<Integer, String> pool = KeyedResourcePool.create(new StringResourceFactory(),
+ new ResourcePoolConfig().setMaxPoolSize(poolSize)
+ .setIsFair(true));
+ PerformanceTest test = new PerformanceTest() {
+
+ @Override
+ public void doOperation(int id) throws Exception {
+ Integer key = id % numKeys;
+ String s = pool.checkout(key);
+ pool.checkin(key, s);
+ }
+ };
+ test.run(numRequests, numThreads);
+ test.printStats();
+ System.out.println();
+ }
+
+ System.out.println("--------------------------------------");
+ System.out.println();
+
+ for(int poolSize: new int[] { 1, 5, 10 }) {
+ System.out.println("Perf test for commons pool with numThreads = " + numThreads
+ + ", poolSize = " + poolSize + ":");
+ GenericKeyedObjectPool.Config config = new GenericKeyedObjectPool.Config();
+ config.maxActive = poolSize;
+ config.testOnBorrow = true;
+ config.whenExhaustedAction = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK;
+ config.maxWait = 10000;
+ StringPoolableObjectFactory objFactory = new StringPoolableObjectFactory();
+ final KeyedObjectPool pool = new GenericKeyedObjectPool(objFactory, config);
+ PerformanceTest test = new PerformanceTest() {
+
+ @Override
+ public void doOperation(int id) throws Exception {
+ Integer key = id % numKeys;
+ String s = (String) pool.borrowObject(key);
+ pool.returnObject(key, s);
+ }
+ };
+ test.run(numRequests, numThreads);
+ test.printStats();
+ System.out.println();
+ }
+
+ }
+
+ private static class StringResourceFactory implements ResourceFactory<Integer, String> {
+
+ public String create(Integer key) {
+ return new String(key + "-val");
+ }
+
+ public void destroy(Integer key, String obj) {}
+
+ public boolean validate(Integer key, String value) {
+ return true;
+ }
+ }
+
+ private static class StringPoolableObjectFactory implements KeyedPoolableObjectFactory {
+
+ public void activateObject(Object k, Object v) {}
+
+ public void passivateObject(Object k, Object v) {}
+
+ public void destroyObject(Object k, Object v) {}
+
+ public Object makeObject(Object k) {
+ return new String(k + "-val");
+ }
+
+ public boolean validateObject(Object k, Object v) {
+ return true;
+ }
+
+ }
+
+}
View
8 test/integration/voldemort/socketpool/AbstractSocketPoolTest.java
@@ -50,9 +50,9 @@ public void run() {
// Size
assertEquals("resources In Hand(" + resourceInHand.get(key).get()
+ ") should be less than equal to pool size("
- + config.getPoolSize() + ")",
+ + config.getMaxPoolSize() + ")",
true,
- resourceInHand.get(key).get() <= config.getPoolSize());
+ resourceInHand.get(key).get() <= config.getMaxPoolSize());
// do something
doSomethingWithResource(key, resource);
@@ -65,8 +65,8 @@ public void run() {
} catch(TimeoutException e) {
// only if alloted resources are same as pool size
assertEquals("resources In Hand(" + resourceInHand.get(key).get()
- + ") should be same as pool size(" + config.getPoolSize()
- + ")", config.getPoolSize(), resourceInHand.get(key).get());
+ + ") should be same as pool size(" + config.getMaxPoolSize()
+ + ")", config.getMaxPoolSize(), resourceInHand.get(key).get());
++testStats.timeoutRequests;
System.out.println("saw timeout !!");
return;
View
6 test/integration/voldemort/socketpool/SimpleSocketPoolTest.java
@@ -12,7 +12,7 @@
public void testPoolLimitNoTimeout() throws Exception {
final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(1000,
TimeUnit.MILLISECONDS)
- .setPoolSize(20);
+ .setMaxPoolSize(20);
ResourceFactory<String, String> factory = ResourcePoolTestUtils.getBasicPoolFactory();
final AbstractSocketPoolTest<String, String> test = new AbstractSocketPoolTest<String, String>() {
@@ -36,7 +36,7 @@ protected String getRequestKey() throws Exception {
public void testPoolLimitSomeTimeout() throws Exception {
final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(50,
TimeUnit.MILLISECONDS)
- .setPoolSize(20);
+ .setMaxPoolSize(20);
ResourceFactory<String, String> factory = ResourcePoolTestUtils.getBasicPoolFactory();
final AbstractSocketPoolTest<String, String> test = new AbstractSocketPoolTest<String, String>() {
@@ -60,7 +60,7 @@ protected String getRequestKey() throws Exception {
public void testNoTimeout() throws Exception {
final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(100,
TimeUnit.MILLISECONDS)
- .setPoolSize(20);
+ .setMaxPoolSize(20);
ResourceFactory<String, String> factory = ResourcePoolTestUtils.getBasicPoolFactory();
final AbstractSocketPoolTest<String, String> test = new AbstractSocketPoolTest<String, String>() {
View
114 test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java
@@ -1,7 +1,5 @@
package voldemort.utils.pool;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,6 +11,7 @@
private static int POOL_SIZE = 5;
private static long TIMEOUT_MS = 100;
+ private static int MAX_ATTEMPTS = 10;
private TestResourceFactory factory;
private KeyedResourcePool<String, TestResource> pool;
@@ -20,9 +19,10 @@
@Override
public void setUp() {
factory = new TestResourceFactory();
- ResourcePoolConfig config = new ResourcePoolConfig();
- config.setPoolSize(POOL_SIZE);
- config.setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ ResourcePoolConfig config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE)
+ .setTimeout(TIMEOUT_MS,
+ TimeUnit.MILLISECONDS)
+ .setMaxInvalidAttempts(MAX_ATTEMPTS);
this.pool = new KeyedResourcePool<String, TestResource>(factory, config);
}
@@ -46,6 +46,42 @@ public void testFullPoolBlocks() throws Exception {
}
}
+ public void testExceptions() throws Exception {
+ // we should start with an empty pool
+ assertEquals(0, this.pool.getTotalResourceCount());
+
+ Exception toThrow = new Exception("An exception!");
+
+ // test exception on destroy
+ TestResource checkedOut = this.pool.checkout("a");
+ assertEquals(1, this.pool.getTotalResourceCount());
+ assertEquals(0, this.pool.getCheckedInResourceCount());
+ this.factory.setDestroyException(toThrow);
+ try {
+ this.pool.checkin("a", checkedOut);
+ // checking out again should force destroy
+ this.pool.checkout("a");
+ assertTrue(checkedOut.isDestroyed());
+ } catch(Exception caught) {
+ fail("No exception expected.");
+ }
+ assertEquals(1, this.pool.getTotalResourceCount());
+ assertEquals(0, this.pool.getCheckedInResourceCount());
+
+ this.factory.setCreateException(toThrow);
+ try {
+ this.pool.checkout("b");
+ fail("Excpected exception!");
+ } catch(Exception caught) {
+ assertEquals("The exception thrown by the factory should propage to the caller.",
+ toThrow,
+ caught);
+ }
+ // failed checkout shouldn't effect count
+ assertEquals(1, this.pool.getTotalResourceCount());
+ assertEquals(0, this.pool.getCheckedInResourceCount());
+ }
+
public void testInvalidIsDestroyed() throws Exception {
TestResource r1 = this.pool.checkout("a");
r1.invalidate();
@@ -55,45 +91,14 @@ public void testInvalidIsDestroyed() throws Exception {
assertTrue("Invalid objects should be destroyed.", r1.isDestroyed());
}
- public void testMultithreaded() throws Exception {
- int numThreads = POOL_SIZE * 2;
- final String[] keys = new String[numThreads * 2];
- for(int i = 0; i < keys.length; i++)
- keys[i] = Integer.toString(i);
-
- final AtomicInteger totalExecutions = new AtomicInteger(0);
- final AtomicInteger destroyed = new AtomicInteger(0);
- final AtomicBoolean isStopped = new AtomicBoolean(false);
- ExecutorService executor = Executors.newFixedThreadPool(numThreads);
- for(int i = 0; i < numThreads; i++) {
- executor.execute(new Runnable() {
-
- public void run() {
- while(!isStopped.get()) {
- int curr = totalExecutions.getAndIncrement();
- String key = keys[curr % keys.length];
- try {
- TestResource r = pool.checkout(key);
- assertTrue(r.isValid());
- if(curr % 10021 == 0) {
- r.invalidate();
- destroyed.getAndIncrement();
- }
- pool.checkin(key, r);
- } catch(Exception e) {
- fail("Unexpected exception: " + e);
- }
- }
- }
- });
- }
- Thread.sleep(1000);
- isStopped.set(true);
- Thread.sleep(200);
- executor.shutdownNow();
- assertTrue(executor.awaitTermination(100, TimeUnit.MILLISECONDS));
- pool.close();
- assertEquals(factory.getCreated(), factory.getDestroyed());
+ public void testMaxInvalidCreations() throws Exception {
+ this.factory.setCreatedValid(false);
+ try {
+ this.pool.checkout("a");
+ fail("Exceeded max failed attempts without exception.");
+ } catch(ExcessiveInvalidResourcesException e) {
+ // this is expected
+ }
}
private static class TestResource {
@@ -143,13 +148,22 @@ public String toString() {
private final AtomicInteger created = new AtomicInteger(0);
private final AtomicInteger destroyed = new AtomicInteger(0);
+ private Exception createException;
+ private Exception destroyException;
+ private boolean isCreatedValid = true;
public TestResource create(String key) throws Exception {
+ if(createException != null)
+ throw createException;
TestResource r = new TestResource(Integer.toString(created.getAndIncrement()));
+ if(!isCreatedValid)
+ r.invalidate();
return r;
}
public void destroy(String key, TestResource obj) throws Exception {
+ if(destroyException != null)
+ throw destroyException;
destroyed.incrementAndGet();
obj.destroy();
}
@@ -166,6 +180,18 @@ public int getDestroyed() {
return this.destroyed.get();
}
+ public void setDestroyException(Exception e) {
+ this.destroyException = e;
+ }
+
+ public void setCreateException(Exception e) {
+ this.createException = e;
+ }
+
+ public void setCreatedValid(boolean isValid) {
+ this.isCreatedValid = isValid;
+ }
+
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.