Skip to content

Commit

Permalink
Attempt at fixing #90
Browse files Browse the repository at this point in the history
  • Loading branch information
gresrun committed Dec 28, 2015
1 parent 6057fdc commit aac052d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 41 deletions.
23 changes: 11 additions & 12 deletions src/main/java/net/greghaines/jesque/utils/PoolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,56 @@
package net.greghaines.jesque.utils;

import net.greghaines.jesque.Config;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import redis.clients.util.Pool;

/**
* Convenience methods for doing work with pooled resources.
* Convenience methods for doing work with pooled Jedis connections.
*
* @author Greg Haines
*/
public final class PoolUtils {

/**
* Perform the given work with a resource from the given pool.
*
* Perform the given work with a Jedis connection from the given pool.
*
* @param pool the resource pool
* @param work the work to perform
* @param <T> the resource type
* @param <V> the result type
* @return the result of the given work
* @throws Exception if something went wrong
*/
public static <T, V> V doWorkInPool(final Pool<T> pool, final PoolWork<T, V> work) throws Exception {
public static <V> V doWorkInPool(final Pool<Jedis> pool, final PoolWork<Jedis, V> work) throws Exception {
if (pool == null) {
throw new IllegalArgumentException("pool must not be null");
}
if (work == null) {
throw new IllegalArgumentException("work must not be null");
}
final V result;
final T poolResource = pool.getResource();
final Jedis poolResource = pool.getResource();
try {
result = work.doWork(poolResource);
} finally {
pool.returnResource(poolResource);
poolResource.close();
}
return result;
}

/**
* Perform the given work with a resource from the given pool. Wraps any
* thrown checked exceptions in a RuntimeException.
*
* Perform the given work with a Jedis connection from the given pool.
* Wraps any thrown checked exceptions in a RuntimeException.
*
* @param pool the resource pool
* @param work the work to perform
* @param <T> the resource type
* @param <V> the result type
* @return the result of the given work
*/
public static <T, V> V doWorkInPoolNicely(final Pool<T> pool, final PoolWork<T, V> work) {
public static <V> V doWorkInPoolNicely(final Pool<Jedis> pool, final PoolWork<Jedis, V> work) {
final V result;
try {
result = doWorkInPool(pool, work);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testGetCount() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).llen(FAILED_KEY); will(returnValue(failCount));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final long count = this.failureDAO.getCount();
Assert.assertEquals(failCount, count);
Expand All @@ -86,7 +86,7 @@ public void testClear() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).del(FAILED_KEY);
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
this.failureDAO.clear();
}
Expand All @@ -111,7 +111,7 @@ public void testGetFailures() throws JsonProcessingException {
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).lrange(FAILED_KEY, offset, offset + count - 1);
will(returnValue(origJsons));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final List<JobFailure> fails = this.failureDAO.getFailures(offset, count);
Assert.assertNotNull(fails);
Expand All @@ -126,7 +126,7 @@ public void testRemove() throws JsonProcessingException {
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).lset(with(equal(FAILED_KEY)), with(equal(index)), with(any(String.class)));
oneOf(jedis).lrem(with(equal(FAILED_KEY)), with(equal(1L)), with(any(String.class)));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
this.failureDAO.remove(index);
}
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testRequeue() throws JsonProcessingException {
oneOf(jedis).lset(with(equal(FAILED_KEY)), with(equal(index)), with(any(String.class)));
oneOf(jedis).sadd(QUEUES_KEY, queue);
oneOf(jedis).rpush("resque:queue:" + queue, jobJson);
exactly(2).of(pool).returnResource(jedis);
exactly(2).of(jedis).close();
}});
final Date requeuedAt = this.failureDAO.requeue(index);
Assert.assertNotNull(requeuedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testGetRedisInfo() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).info(); will(returnValue(infoString));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final Map<String,String> redisInfo = this.keysDAO.getRedisInfo();
Assert.assertNotNull(redisInfo);
Expand All @@ -91,7 +91,7 @@ public void testGetKeyInfos() throws Exception {
oneOf(jedis).type(key); will(returnValue(KeyType.STRING.toString()));
oneOf(jedis).strlen(key); will(returnValue((long)values.get(i++).length()));
}
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final List<KeyInfo> keyInfos = this.keysDAO.getKeyInfos();
Assert.assertNotNull(keyInfos);
Expand All @@ -116,7 +116,7 @@ public void testGetKeyInfo() throws Exception {
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).type(key); will(returnValue(KeyType.STRING.toString()));
oneOf(jedis).strlen(key); will(returnValue(size));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final KeyInfo keyInfo = this.keysDAO.getKeyInfo(key);
Assert.assertNotNull(keyInfo);
Expand All @@ -137,7 +137,7 @@ public void testGetKeyInfo_ArrayValue() throws Exception {
oneOf(jedis).type(key); will(returnValue(KeyType.STRING.toString()));
oneOf(jedis).strlen(key); will(returnValue(size));
oneOf(jedis).get(key); will(returnValue(value));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final KeyInfo keyInfo = this.keysDAO.getKeyInfo(key, 0, 1);
Assert.assertNotNull(keyInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testGetQueueNames() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).smembers(QUEUES_KEY); will(returnValue(queueSet));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final List<String> queueNames = this.qInfoDAO.getQueueNames();
Assert.assertNotNull(queueNames);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testGetPendingCount() {
oneOf(jedis).llen(queueKey); will(returnValue(queueCountMap.get(e.getKey())));
}
}
exactly(2).of(pool).returnResource(jedis);
exactly(2).of(jedis).close();
}});
final long pendingCount = this.qInfoDAO.getPendingCount();
Assert.assertEquals(8, pendingCount);
Expand All @@ -122,7 +122,7 @@ public void testGetProcessedCount() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).get("resque:stat:processed"); will(returnValue(Long.toString(count)));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final long processedCount = this.qInfoDAO.getProcessedCount();
Assert.assertEquals(count, (Long)processedCount);
Expand All @@ -133,7 +133,7 @@ public void testGetProcessedCount_Null() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).get("resque:stat:processed"); will(returnValue(null));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final long processedCount = this.qInfoDAO.getProcessedCount();
Assert.assertEquals(0L, processedCount);
Expand All @@ -146,7 +146,7 @@ public void testRemoveQueue() {
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).srem(QUEUES_KEY, queue);
oneOf(jedis).del("resque:queue:" + queue);
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
this.qInfoDAO.removeQueue(queue);
}
Expand All @@ -171,7 +171,7 @@ public void testGetQueueInfos() {
oneOf(jedis).llen(queueKey); will(returnValue(queueCountMap.get(e.getKey())));
}
}
exactly(2).of(pool).returnResource(jedis);
exactly(2).of(jedis).close();
}});
final List<QueueInfo> queueInfos = this.qInfoDAO.getQueueInfos();
Assert.assertNotNull(queueInfos);
Expand All @@ -197,7 +197,7 @@ public void testGetQueueInfo_List() throws JsonProcessingException {
exactly(2).of(jedis).type(queueKey); will(returnValue(KeyType.LIST.toString()));
oneOf(jedis).llen(queueKey); will(returnValue(size));
oneOf(jedis).lrange(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final QueueInfo queueInfo = this.qInfoDAO.getQueueInfo(name, jobOffset, jobCount);
Assert.assertNotNull(queueInfo);
Expand All @@ -223,7 +223,7 @@ public void testGetQueueInfo_ZSet() throws JsonProcessingException {
exactly(2).of(jedis).type(queueKey); will(returnValue(KeyType.ZSET.toString()));
oneOf(jedis).zcard(queueKey); will(returnValue(size));
oneOf(jedis).zrange(queueKey, jobOffset, jobOffset + jobCount - 1); will(returnValue(payloads));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final QueueInfo queueInfo = this.qInfoDAO.getQueueInfo(name, jobOffset, jobCount);
Assert.assertNotNull(queueInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testGetWorkerCount() {
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(jedis));
oneOf(jedis).scard(WORKERS_KEY); will(returnValue(workerCount));
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
final long count = this.workerInfoDAO.getWorkerCount();
Assert.assertEquals(workerCount, count);
Expand All @@ -83,7 +83,7 @@ public void testRemoveWorker() {
oneOf(jedis).srem(WORKERS_KEY, workerName); will(returnValue(1L));
oneOf(jedis).del("resque:worker:foo", "resque:worker:foo:started",
"resque:stat:failed:foo", "resque:stat:processed:foo");
oneOf(pool).returnResource(jedis);
oneOf(jedis).close();
}});
this.workerInfoDAO.removeWorker(workerName);
}
Expand Down
18 changes: 8 additions & 10 deletions src/test/java/net/greghaines/jesque/utils/TestPoolUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
public class TestPoolUtils {

private Mockery mockCtx;
private Pool<String> pool;
private PoolWork<String,String> work;
private Jedis resource;
private Pool<Jedis> pool;
private PoolWork<Jedis, String> work;

@SuppressWarnings("unchecked")
@Before
Expand All @@ -34,6 +35,7 @@ public void setUp() {
this.mockCtx.setImposteriser(ClassImposteriser.INSTANCE);
this.pool = this.mockCtx.mock(Pool.class);
this.work = this.mockCtx.mock(PoolWork.class);
this.resource = this.mockCtx.mock(Jedis.class);
}

@Test(expected = IllegalArgumentException.class)
Expand All @@ -48,48 +50,44 @@ public void testDoWorkInPool_NullWork() throws Exception {

@Test
public void testDoWorkInPool() throws Exception {
final String resource = "foo";
final String result = "bar";
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(resource));
oneOf(work).doWork(resource); will(returnValue(result));
oneOf(pool).returnResource(resource);
oneOf(resource).close();
}});
Assert.assertEquals(result, PoolUtils.doWorkInPool(this.pool, this.work));
}

@Test
public void testDoWorkInPoolNicely() throws Exception {
final String resource = "foo";
final String result = "bar";
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(resource));
oneOf(work).doWork(resource); will(returnValue(result));
oneOf(pool).returnResource(resource);
oneOf(resource).close();
}});
Assert.assertEquals(result, PoolUtils.doWorkInPoolNicely(this.pool, this.work));
}

@Test(expected = RuntimeException.class)
public void testDoWorkInPoolNicely_ThrowRuntimeEx() throws Exception {
final String resource = "foo";
final RuntimeException rte = new RuntimeException("foo");
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(resource));
oneOf(work).doWork(resource); will(throwException(rte));
oneOf(pool).returnResource(resource);
oneOf(resource).close();
}});
PoolUtils.doWorkInPoolNicely(this.pool, this.work);
}

@Test(expected = RuntimeException.class)
public void testDoWorkInPoolNicely_ThrowEx() throws Exception {
final String resource = "foo";
final Exception ex = new Exception("foo");
this.mockCtx.checking(new Expectations(){{
oneOf(pool).getResource(); will(returnValue(resource));
oneOf(work).doWork(resource); will(throwException(ex));
oneOf(pool).returnResource(resource);
oneOf(resource).close();
}});
PoolUtils.doWorkInPoolNicely(this.pool, this.work);
}
Expand Down

0 comments on commit aac052d

Please sign in to comment.