# --- Web page capture residue below (not part of the module source) ---
# Skip to content
# Branch: master
# Find file Copy path
# Find file Copy path
# Fetching contributors…
# Cannot retrieve contributors at this time
# 490 lines (403 sloc) 17 KB
import datetime
import logging
import os
import signal
import sys
import threading
import time
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process
# gevent is optional. Its names are used by GreenletEnvironment only; when the
# package is not installed they are bound to None so that this module can still
# be imported for the thread and process worker types.
try:
    import gevent
    from gevent import Greenlet
    from gevent.event import Event as GreenEvent
except ImportError:
    gevent = Greenlet = GreenEvent = None
from huey.constants import WORKER_GREENLET
from huey.constants import WORKER_PROCESS
from huey.constants import WORKER_THREAD
from huey.constants import WORKER_TYPES
from huey.exceptions import ConfigurationError
class BaseProcess(object):
    """
    Base class for the processes whose `loop()` is called repeatedly while
    the consumer is running.

    Stores the Huey instance and a logger named after `process_name`.
    Subclasses must override `loop()` and may override `initialize()`.
    """
    process_name = 'BaseProcess'

    def __init__(self, huey):
        self.huey = huey
        self._logger = self.create_logger()

    def create_logger(self):
        """Return the logger named `huey.consumer.<process_name>`."""
        return logging.getLogger('huey.consumer.%s' % self.process_name)

    def initialize(self):
        """Hook for one-time setup. Does nothing by default."""
        pass

    def sleep_for_interval(self, start_ts, nseconds):
        """
        Sleep for a given interval with respect to the start timestamp.

        So, if the start timestamp is 1337 and nseconds is 10, the method will
        actually sleep for nseconds - (current_timestamp - start_timestamp). So
        if the current timestamp is 1340, we'll only sleep for 7 seconds (the
        goal being to sleep until 1347, or 1337 + 10).

        Returns immediately (without logging) if the target time has already
        passed.
        """
        sleep_time = nseconds - (time.time() - start_ts)
        if sleep_time <= 0:
            return

        self._logger.debug('Sleeping for %s', sleep_time)
        # Recompute time to sleep to improve accuracy in case the process was
        # pre-empted by the kernel while logging.
        sleep_time = nseconds - (time.time() - start_ts)
        if sleep_time > 0:
            time.sleep(sleep_time)

    def loop(self, now=None):
        """
        Process-specific implementation. Called repeatedly for as long as the
        consumer is running. The `now` parameter is currently only used in the
        unit-tests (to avoid monkey-patching datetime / time). Return value is
        ignored, but an unhandled exception will lead to the process exiting.
        """
        raise NotImplementedError
class Worker(BaseProcess):
    """
    Worker implementation.

    Will pull tasks from the queue, executing them or adding them to the
    schedule if they are set to run in the future.

    While the queue is empty the worker sleeps between polls; the delay starts
    at `default_delay`, is multiplied by `backoff` after every sleep and is
    capped at `max_delay`. It resets to `default_delay` once a task arrives.
    """
    process_name = 'Worker'

    def __init__(self, huey, default_delay, max_delay, backoff):
        self.delay = self.default_delay = default_delay
        self.max_delay = max_delay
        self.backoff = backoff
        super(Worker, self).__init__(huey)

    def initialize(self):
        """
        Call every startup hook registered on the Huey instance. A failing
        hook is logged and does not prevent the remaining hooks from running.
        """
        for name, startup_hook in self.huey._startup.items():
            self._logger.debug('calling startup hook "%s"', name)
            try:
                startup_hook()
            except Exception:
                self._logger.exception('startup hook "%s" failed', name)

    def loop(self, now=None):
        """
        Dequeue at most one task and execute it.

        Errors raised while dequeueing or executing are logged rather than
        propagated, so they do not terminate the worker.
        """
        task = None
        dequeue_failed = False
        try:
            task = self.huey.dequeue()
        except Exception:
            dequeue_failed = True
            self._logger.exception('Error reading from queue')

        if task is not None:
            self.delay = self.default_delay
            try:
                self.huey.execute(task, now)
            except Exception:
                # NOTE(review): the log argument was lost in this copy of the
                # file; `task.id` matches upstream huey -- confirm.
                self._logger.exception('Unhandled error during execution '
                                       'of task %s.', task.id)
        elif dequeue_failed or not self.huey.storage.blocking:
            # NOTE(review): the original condition is truncated after
            # `elif not`; `self.huey.storage.blocking` is taken from upstream
            # huey -- confirm. Sleeping after a failed dequeue as well keeps a
            # broken storage backend from turning this loop into a busy-wait.
            self.sleep()

    def sleep(self):
        """Sleep for the current delay, then grow the delay by `backoff`."""
        if self.delay > self.max_delay:
            self.delay = self.max_delay

        time.sleep(self.delay)
        self.delay *= self.backoff
class Scheduler(BaseProcess):
    """
    Scheduler handles enqueueing tasks when they are scheduled to execute. Note
    that the scheduler does not actually execute any tasks, but simply enqueues
    them so that they can be picked up by the worker processes.

    If periodic tasks are enabled, the scheduler will wake up every 60 seconds
    to enqueue any periodic tasks that should be run.
    """
    process_name = 'Scheduler'

    def __init__(self, huey, interval, periodic):
        super(Scheduler, self).__init__(huey)
        # The scheduler never waits longer than 60 seconds between iterations.
        self.interval = min(interval, 60)
        self.periodic = periodic
        # Timestamps at which the next schedule check / periodic check is due.
        self._next_loop = time.time()
        self._next_periodic = time.time()

    def loop(self, now=None):
        """
        Run one scheduler iteration: enqueue the tasks returned by
        `huey.read_schedule(now)`, enqueue periodic tasks when the periodic
        check is due, then sleep until the next iteration is due.
        """
        current = self._next_loop
        self._next_loop += self.interval
        if self._next_loop < time.time():
            # We have fallen behind by at least one whole interval; skip this
            # iteration (the deadline has still been advanced above).
            self._logger.debug('scheduler skipping iteration to avoid race.')
            return

        try:
            task_list = self.huey.read_schedule(now)
        except Exception:
            self._logger.exception('Error reading schedule.')
        else:
            for task in task_list:
                self._logger.debug('Enqueueing %s', task)
                # NOTE(review): the enqueue call was lost in this copy of the
                # file; restored as `huey.enqueue(task)` -- confirm.
                self.huey.enqueue(task)

        if self.periodic:
            current_p = self._next_periodic
            if current_p <= time.time():
                self._next_periodic += 60
                self.enqueue_periodic_tasks(now, current)

        self.sleep_for_interval(current, self.interval)

    def enqueue_periodic_tasks(self, now, start):
        """
        Enqueue every task returned by `huey.read_periodic(now)`.

        `start` is accepted for interface compatibility and is not used here.
        """
        self._logger.debug('Checking periodic tasks')
        for task in self.huey.read_periodic(now):
            self._logger.info('Enqueueing periodic task %s.', task)
            self.huey.enqueue(task)
class Environment(object):
    """
    Provide a common interface to the supported concurrent environments.

    Every method raises NotImplementedError here; concrete environments
    override all three.
    """
    def get_stop_flag(self):
        """Return a new event object used to signal shutdown."""
        raise NotImplementedError

    def create_process(self, runnable, name):
        """Return an unstarted process-like object that will run `runnable`."""
        raise NotImplementedError

    def is_alive(self, proc):
        """Return whether `proc` (as built by `create_process`) is alive."""
        raise NotImplementedError
class ThreadEnvironment(Environment):
    """Environment backed by `threading.Thread` and `threading.Event`."""

    def get_stop_flag(self):
        return threading.Event()

    def create_process(self, runnable, name):
        """Return an unstarted daemon thread that runs `runnable`."""
        t = threading.Thread(target=runnable, name=name)
        t.daemon = True
        return t

    def is_alive(self, proc):
        # `Thread.isAlive()` was removed in Python 3.9; `is_alive()` is
        # available on both Python 2 and 3.
        return proc.is_alive()
class GreenletEnvironment(Environment):
    """Environment backed by gevent's `Greenlet` and gevent `Event`."""

    def get_stop_flag(self):
        return GreenEvent()

    def create_process(self, runnable, name):
        """
        Return an unstarted Greenlet that runs `runnable`. `name` is not used.
        """
        def run_wrapper():
            # NOTE(review): the body of this wrapper was lost in this copy of
            # the file. Upstream huey yields to the gevent hub before and
            # after calling the runnable -- confirm.
            gevent.sleep()
            runnable()
            gevent.sleep()
        return Greenlet(run=run_wrapper)

    def is_alive(self, proc):
        return not proc.dead
class ProcessEnvironment(Environment):
    """Environment backed by `multiprocessing.Process` and its `Event`."""

    def get_stop_flag(self):
        return ProcessEvent()

    def create_process(self, runnable, name):
        """Return an unstarted daemon process that runs `runnable`."""
        p = Process(target=runnable, name=name)
        p.daemon = True
        return p

    def is_alive(self, proc):
        return proc.is_alive()
# Maps each accepted `worker_type` value to the Environment class used for it.
WORKER_TO_ENVIRONMENT = {
    WORKER_THREAD: ThreadEnvironment,
    WORKER_GREENLET: GreenletEnvironment,
    'gevent': GreenletEnvironment,  # Preserved for backwards-compat.
    WORKER_PROCESS: ProcessEnvironment,
}
class Consumer(object):
    """
    Consumer sets up and coordinates the execution of the workers and scheduler
    and registers signal handlers.

    NOTE(review): this class was rebuilt from a copy of the file that had lost
    its indentation and several statements (`try:` lines, `self._logger.info(`
    prefixes, `.start()` / `.join()` calls, `break`, ...). The restored
    statements follow from the surrounding code and docstrings; confirm them
    against the canonical source, in particular `start()`, `stop()`, `run()`
    and `_create_process()`.
    """
    # Simplify providing custom implementations. See _create_worker and
    # _create_scheduler if you need more sophisticated overrides.
    worker_class = Worker
    scheduler_class = Scheduler

    def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
                 backoff=1.15, max_delay=10.0, scheduler_interval=1,
                 worker_type=WORKER_THREAD, check_worker_health=True,
                 health_check_interval=10, flush_locks=False):
        """
        Create the scheduler and worker processes without starting them.

        Raises ConfigurationError if the scheduler interval (after being
        clamped to 1..60) does not divide 60 evenly, and ValueError if
        `worker_type` is not a known worker type.
        """
        self._logger = logging.getLogger('huey.consumer')
        if huey.immediate:
            self._logger.warning('Consumer initialized with Huey instance '
                                 'that has "immediate" mode enabled. This '
                                 'must be disabled before the consumer can '
                                 'be run.')
        self.huey = huey
        self.workers = workers  # Number of workers.
        self.periodic = periodic  # Enable periodic task scheduler?
        self.default_delay = initial_delay  # Default queue polling interval.
        self.backoff = backoff  # Exponential backoff factor when queue empty.
        self.max_delay = max_delay  # Maximum interval between polling events.

        # Ensure that the scheduler runs at an interval between 1 and 60s.
        self.scheduler_interval = max(min(scheduler_interval, 60), 1)
        if 60 % self.scheduler_interval != 0:
            raise ConfigurationError('Scheduler interval must be a factor '
                                     'of 60, e.g. 1, 2, 3, 4, 5, 6, 10, 12...')

        self.worker_type = worker_type  # What process model are we using?

        # Configure health-check and consumer main-loop attributes.
        self._stop_flag_timeout = 0.1
        self._health_check = check_worker_health
        self._health_check_interval = float(health_check_interval)

        # Create the execution environment helper.
        self.environment = self.get_environment(self.worker_type)

        # Create the event used to signal the process should terminate. We'll
        # also store a boolean flag to indicate whether we should restart after
        # the processes are cleaned up.
        self._received_signal = False
        self._restart = False
        self._graceful = True
        self.stop_flag = self.environment.get_stop_flag()

        # In the event the consumer was killed while running a task that held
        # a lock, this ensures that all locks are flushed before starting.
        if flush_locks:
            self.flush_locks()

        # Create the scheduler process (but don't start it yet).
        scheduler = self._create_scheduler()
        self.scheduler = self._create_process(scheduler, 'Scheduler')

        # Create the worker process(es) (also not started yet).
        self.worker_threads = []
        for i in range(workers):
            worker = self._create_worker()
            process = self._create_process(worker, 'Worker-%d' % (i + 1))

            # The worker threads are stored as [(worker impl, worker_t), ...].
            # The worker impl is not currently referenced in any consumer code,
            # but it is referenced in the test-suite.
            self.worker_threads.append((worker, process))

    def flush_locks(self):
        """Flush the Huey instance's locks and log any that were found."""
        self._logger.debug('Flushing locks before starting up.')
        flushed = self.huey.flush_locks()
        if flushed:
            self._logger.warning('Found stale locks: %s', ', '.join(flushed))

    def get_environment(self, worker_type):
        """
        Return a new Environment instance for `worker_type`.

        Raises ValueError if `worker_type` is not in WORKER_TO_ENVIRONMENT.
        """
        if worker_type not in WORKER_TO_ENVIRONMENT:
            raise ValueError('worker_type must be one of %s.' %
                             ', '.join(WORKER_TYPES))
        return WORKER_TO_ENVIRONMENT[worker_type]()

    def _create_worker(self):
        """Instantiate `worker_class` with this consumer's polling settings."""
        return self.worker_class(
            huey=self.huey,
            default_delay=self.default_delay,
            max_delay=self.max_delay,
            backoff=self.backoff)

    def _create_scheduler(self):
        """Instantiate `scheduler_class` with this consumer's settings."""
        return self.scheduler_class(
            huey=self.huey,
            interval=self.scheduler_interval,
            periodic=self.periodic)

    def _create_process(self, process, name):
        """
        Repeatedly call the `loop()` method of the given process. Unhandled
        exceptions in the `loop()` method will cause the process to terminate.

        Returns the (unstarted) object built by the environment's
        `create_process()`.
        """
        def _run():
            process.initialize()
            try:
                while not self.stop_flag.is_set():
                    process.loop()
            except KeyboardInterrupt:
                pass
            except Exception:
                self._logger.exception('Process %s died!', name)
        return self.environment.create_process(_run, name)

    def start(self):
        """
        Start all consumer processes and register signal handlers.

        Raises ConfigurationError if the Huey instance is in immediate mode.
        """
        if self.huey.immediate:
            raise ConfigurationError(
                'Consumer cannot be run with Huey instances where immediate '
                'is enabled. Please check your configuration and ensure that '
                '"huey.immediate = False".')

        # Log startup message.
        self._logger.info('Huey consumer started with %s %s, PID %s at %s',
                          self.workers, self.worker_type, os.getpid(),
                          self.huey._get_timestamp())
        self._logger.info('Scheduler runs every %s second(s).',
                          self.scheduler_interval)
        self._logger.info('Periodic tasks are %s.',
                          'enabled' if self.periodic else 'disabled')

        # NOTE(review): the call site of `_set_signal_handlers()` was lost in
        # this copy of the file; it must run before the handlers are saved
        # below so that they are the ones restored afterwards -- confirm.
        self._set_signal_handlers()

        msg = ['The following commands are available:']
        for command in self.huey._registry._registry:
            msg.append('+ %s' % command)
        self._logger.info('\n'.join(msg))

        # We'll temporarily ignore SIGINT and SIGHUP (so that it is inherited
        # by the child-processes). Once the child processes are created, we
        # restore the handler.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        if hasattr(signal, 'SIGHUP'):
            original_sighup_handler = signal.signal(signal.SIGHUP,
                                                    signal.SIG_IGN)

        self.scheduler.start()
        for _, worker_process in self.worker_threads:
            worker_process.start()

        signal.signal(signal.SIGINT, original_sigint_handler)
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, original_sighup_handler)

    def stop(self, graceful=False):
        """
        Set the stop-flag.

        If `graceful=True`, this method blocks until the workers finish
        executing any tasks they might be currently working on.
        """
        self.stop_flag.set()
        if graceful:
            self._logger.info('Shutting down gracefully...')
            try:
                for _, worker_process in self.worker_threads:
                    worker_process.join()
            except KeyboardInterrupt:
                self._logger.info('Received request to shut down now.')
            else:
                self._logger.info('All workers have stopped.')
        else:
            self._logger.info('Shutting down')

    def run(self):
        """
        Run the consumer.

        Starts the processes, then waits on the stop-flag in short intervals,
        reacting to signals and periodically checking worker health, until the
        stop-flag is set. Afterwards either re-executes the current program
        (restart requested) or returns.
        """
        # NOTE(review): restored -- nothing else in this class starts the
        # processes before the main loop; confirm.
        self.start()
        timeout = self._stop_flag_timeout
        health_check_ts = time.time()

        while True:
            try:
                self.stop_flag.wait(timeout=timeout)
            except KeyboardInterrupt:
                self._logger.info('Received SIGINT')
                self.stop(graceful=True)
            except Exception:
                self._logger.exception('Error in consumer.')
                self.stop(graceful=False)
            else:
                # The signal handlers only record the request; act on it here,
                # outside of the handler.
                if self._received_signal:
                    self.stop(graceful=self._graceful)

            if self.stop_flag.is_set():
                break

            if self._health_check:
                now = time.time()
                if now >= health_check_ts + self._health_check_interval:
                    health_check_ts = now
                    self.check_worker_health()

        if self._restart:
            self._logger.info('Consumer will restart.')
            # Replace the current process with a fresh interpreter running the
            # same command line.
            python = sys.executable
            os.execl(python, python, *sys.argv)
        else:
            self._logger.info('Consumer exiting.')

    def check_worker_health(self):
        """
        Check the health of the worker processes. Workers that have died will
        be replaced with new workers.

        The scheduler is checked and replaced in the same way. Returns True
        when no worker had to be restarted.
        """
        self._logger.debug('Checking worker health.')
        workers = []
        restart_occurred = False
        for i, (worker, worker_t) in enumerate(self.worker_threads):
            if not self.environment.is_alive(worker_t):
                self._logger.warning('Worker %d died, restarting.', i + 1)
                worker = self._create_worker()
                worker_t = self._create_process(worker, 'Worker-%d' % (i + 1))
                worker_t.start()
                restart_occurred = True
            workers.append((worker, worker_t))

        if restart_occurred:
            self.worker_threads = workers
        else:
            self._logger.debug('Workers are up and running.')

        if not self.environment.is_alive(self.scheduler):
            self._logger.warning('Scheduler died, restarting.')
            scheduler = self._create_scheduler()
            self.scheduler = self._create_process(scheduler, 'Scheduler')
            self.scheduler.start()
            # NOTE(review): a scheduler restart does not change the return
            # value; only worker restarts do.
        else:
            self._logger.debug('Scheduler is up and running.')

        return not restart_occurred

    def _set_signal_handlers(self):
        """
        Install the consumer's handlers: SIGTERM requests a stop, SIGINT raises
        KeyboardInterrupt (Python's default handler) and, where available,
        SIGHUP requests a restart.
        """
        signal.signal(signal.SIGTERM, self._handle_stop_signal)
        signal.signal(signal.SIGINT, signal.default_int_handler)
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, self._handle_restart_signal)

    def _handle_stop_signal(self, sig_num, frame):
        """SIGTERM handler: request a non-graceful stop without restart."""
        self._logger.info('Received SIGTERM')
        self._received_signal = True
        self._restart = False
        self._graceful = False

    def _handle_restart_signal(self, sig_num, frame):
        """SIGHUP handler: request a stop followed by a restart."""
        self._logger.info('Received SIGHUP, will restart')
        self._received_signal = True
        self._restart = True
# (web page capture residue) You can’t perform that action at this time.