From 0126723dfca3ca4e1298fe642b07488055221c55 Mon Sep 17 00:00:00 2001 From: Greg Haines Date: Fri, 7 Feb 2014 15:56:08 -0500 Subject: [PATCH] Remove WorkerExitOnEmpty as it can be replicated in local project, or better yet, implemented with a WorkerListener. --- .../jesque/worker/WorkerExitOnEmpty.java | 92 ------------------- 1 file changed, 92 deletions(-) delete mode 100755 src/main/java/net/greghaines/jesque/worker/WorkerExitOnEmpty.java diff --git a/src/main/java/net/greghaines/jesque/worker/WorkerExitOnEmpty.java b/src/main/java/net/greghaines/jesque/worker/WorkerExitOnEmpty.java deleted file mode 100755 index 34f85538..00000000 --- a/src/main/java/net/greghaines/jesque/worker/WorkerExitOnEmpty.java +++ /dev/null @@ -1,92 +0,0 @@ -package net.greghaines.jesque.worker; - -import static net.greghaines.jesque.utils.ResqueConstants.QUEUE; -import static net.greghaines.jesque.worker.JobExecutor.State.RUNNING; -import static net.greghaines.jesque.worker.WorkerEvent.WORKER_POLL; - -import java.util.Collection; -import java.util.concurrent.TimeUnit; - -import net.greghaines.jesque.Config; -import net.greghaines.jesque.Job; -import net.greghaines.jesque.json.ObjectMapperFactory; -import net.greghaines.jesque.utils.JesqueUtils; - -/** - * An implementation of Worker that exits if all queues are empty - * maxLoopOnEmptyQueues times. - *

- * This is useful if you have more queues that you want active workers for at - * one time, and you don't want workers working multiple queues. - *

- * Essentially once a worker starts a queue you want that queue worked to - * exhaustion, then you want that worker to die, and to start up another worker - * on a new queue. - * - * @author Timothy Hruska - */ -public class WorkerExitOnEmpty extends WorkerImpl { - - public static final int DEFAULT_MAX_LOOPS_ON_EMPTY_QUEUES = 3; - - private final int maxLoopsOnEmptyQueues; - - public WorkerExitOnEmpty(final Config config, final Collection queues, - final JobFactory jobFactory) { - this(config, queues, jobFactory, DEFAULT_MAX_LOOPS_ON_EMPTY_QUEUES); - } - - public WorkerExitOnEmpty(final Config config, final Collection queues, - final JobFactory jobFactory, final int maxLoopsOnEmptyQueues) { - super(config, queues, jobFactory); - this.maxLoopsOnEmptyQueues = maxLoopsOnEmptyQueues; - } - - /** - * Polls the queues for jobs and executes them.
- * Exits if all queues are empty maxLoopOnEmptyQueues times - * - * @see net.greghaines.jesque.worker.WorkerImpl#poll() - */ - @Override - protected void poll() { - int missCount = 0; - String curQueue = null; - int allQueuesEmptyCount = 0; - while (RUNNING.equals(this.state.get())) { - try { - if (isThreadNameChangingEnabled()) { - renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames)); - } - curQueue = this.queueNames.poll(EMPTY_QUEUE_SLEEP_TIME, TimeUnit.MILLISECONDS); - if (curQueue != null) { - this.queueNames.add(curQueue); // Rotate the queues - checkPaused(); - // Might have been waiting in poll()/checkPaused() for a - // while - if (RUNNING.equals(this.state.get())) { - this.listenerDelegate.fireEvent(WORKER_POLL, this, curQueue, null, null, null, null); - final String payload = this.jedis.lpop(key(QUEUE, curQueue)); - if (payload != null) { - final Job job = ObjectMapperFactory.get().readValue(payload, Job.class); - process(job, curQueue); - missCount = 0; - allQueuesEmptyCount = 0; - } else if ((++missCount >= this.queueNames.size()) && RUNNING.equals(this.state.get())) { - // Keeps worker from busy-spinning on empty queues - missCount = 0; - Thread.sleep(EMPTY_QUEUE_SLEEP_TIME); - allQueuesEmptyCount++; - } - if (allQueuesEmptyCount >= this.maxLoopsOnEmptyQueues) { - // Set state to SHUTDOWN to break the loop - end(false); - } - } - } - } catch (Exception e) { - recoverFromException(curQueue, e); - } - } - } -}