Skip to content

Commit

Permalink
More cleanups.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Jan 4, 2016
1 parent b7555db commit 97899e9
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 39 deletions.
118 changes: 83 additions & 35 deletions huey/bin/huey_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,85 @@ def setup_logger(loglevel, logfile, worker_type):
def get_option_parser():
parser = optparse.OptionParser(
'Usage: %prog [options] path.to.huey_instance')
parser.add_option('-l', '--logfile', dest='logfile',
help='write logs to FILE', metavar='FILE')
parser.add_option('-v', '--verbose', dest='verbose',
help='verbose logging', action='store_true')
parser.add_option('-q', '--quiet', dest='verbose',
help='log exceptions only', action='store_false')
parser.add_option('-w', '--workers', dest='workers', type='int',
help='worker threads (default=1)', default=1)
parser.add_option('-t', '--threads', dest='workers', type='int',
help='same as "workers"', default=1)
parser.add_option('-k', '--worker-type', dest='worker_type',
help='worker type (thread, greenlet, process)',
default='thread')
parser.add_option('-n', '--no-periodic', action='store_false',
default=True, dest='periodic',
help='do NOT execute periodic tasks')
parser.add_option('-d', '--delay', dest='initial_delay', type='float',
help='initial delay in seconds (default=0.1)',
default=0.1)
parser.add_option('-m', '--max-delay', dest='max_delay', type='float',
help='maximum time to wait between polling the queue '
'(default=10)',
default=10)
parser.add_option('-b', '--backoff', dest='backoff', type='float',
help='amount to backoff delay when no results present '
'(default=1.15)',
default=1.15)
parser.add_option('-S', '--scheduler-interval', dest='scheduler_interval',
type='int', help='Granularity of scheduler.',
default=1)
parser.add_option('-u', '--utc', dest='utc', action='store_true',
help='use UTC time for all tasks (default=True)',
default=True)
parser.add_option('--localtime', dest='utc', action='store_false',
help='use local time for all tasks')

log_opts = parser.add_option_group(
'Logging',
'The following options pertain to the logging system.')
log_opts.add_option('-l', '--logfile',
dest='logfile',
help='write logs to FILE',
metavar='FILE')
log_opts.add_option('-v', '--verbose',
action='store_true',
dest='verbose',
help='log debugging statements')
log_opts.add_option('-q', '--quiet',
action='store_false',
dest='verbose',
help='only log exceptions')

worker_opts = parser.add_option_group(
'Workers',
('By default huey uses a single worker thread. To specify a different '
'number of workers, or a different execution model (such as multiple '
'processes or greenlets), use the options below.'))
worker_opts.add_option('-w', '--workers',
dest='workers',
type='int',
help='number of worker threads/processes (default=1)',
default=1)
worker_opts.add_option('-k', '--worker-type',
dest='worker_type',
help='worker execution model (thread, greenlet, process).',
default='thread',
choices=['greenlet', 'thread', 'process', 'gevent'])
worker_opts.add_option('-r', '--read-timeout',
dest='read_timeout',
type='float',
help=('read timeout used for queues that block rather than poll '
'(default=1)'),
default=1.0)
worker_opts.add_option('-d', '--delay',
dest='initial_delay',
type='float',
help='initial delay between polling intervals in seconds (default=0.1)',
default=0.1)
worker_opts.add_option('-m', '--max-delay',
dest='max_delay',
type='float',
help='maximum time to wait between polling the queue (default=10)',
default=10)
worker_opts.add_option('-b', '--backoff',
dest='backoff',
type='float',
help='amount to backoff delay when no results present (default=1.15)',
default=1.15)

scheduler_opts = parser.add_option_group(
'Scheduler',
('By default Huey will run the scheduler once every second to check '
'for tasks scheduled in the future, or tasks set to run at specific '
'intervals (periodic tasks). Use the options below to configure the '
'scheduler or to disable periodic task scheduling.'))
scheduler_opts.add_option('-s', '--scheduler-interval',
dest='scheduler_interval',
type='int',
help='Granularity of scheduler in seconds.',
default=1)
scheduler_opts.add_option('-n', '--no-periodic',
action='store_false',
default=True,
dest='periodic',
help='do NOT schedule periodic tasks')
scheduler_opts.add_option('-u', '--utc',
dest='utc',
action='store_true',
help='use UTC time for all tasks (default=True)',
default=True)
scheduler_opts.add_option('--localtime',
dest='utc',
action='store_false',
help='use local time for all tasks')
return parser


Expand Down Expand Up @@ -106,6 +150,10 @@ def consumer_main():
err('Example: huey_consumer.py app.queue.huey_instance')
sys.exit(1)

if options.workers < 1:
err('You must have at least one worker.')
sys.exit(1)

huey_instance = load_huey(args[0])

consumer = Consumer(
Expand Down
2 changes: 1 addition & 1 deletion huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
self.backoff = backoff
self.max_delay = max_delay
self.utc = utc
self.scheduler_interval = scheduler_interval
self.scheduler_interval = max(min(scheduler_interval, 60), 1)
self.worker_type = worker_type
if worker_type not in worker_to_environment:
raise ValueError('worker_type must be one of %s.' %
Expand Down
1 change: 1 addition & 0 deletions huey/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
from huey.tests.test_registry import *
from huey.tests.test_storage import *
from huey.tests.test_utils import *
from huey.tests.test_wrapper import *
17 changes: 14 additions & 3 deletions huey/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ def test_threaded_execution(self):

with CaptureLogs() as capture:
consumer.start()
while r1.get() != 'v1' and r2.get() != 'v2' and r3.get() != 'v3':
time.sleep(.05)

r1.get(blocking=True)
r2.get(blocking=True)
r3.get(blocking=True)

consumer.stop()
for worker in consumer.worker_threads:
worker.join()
#consumer.wait_finished()

self.assertEqual(state, {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'})

Expand All @@ -84,6 +85,16 @@ def setUp(self):
def get_periodic_tasks(self):
return [hourly_task.task_class()]

def test_scheduler_interval(self):
consumer = self.get_consumer(scheduler_interval=0.1)
self.assertEqual(consumer.scheduler_interval, 1)

consumer = self.get_consumer(scheduler_interval=120)
self.assertEqual(consumer.scheduler_interval, 60)

consumer = self.get_consumer(scheduler_interval=10)
self.assertEqual(consumer.scheduler_interval, 10)

def test_message_processing(self):
worker = self.consumer._create_worker()
self.assertEqual(state, {})
Expand Down
62 changes: 62 additions & 0 deletions huey/tests/test_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from huey.tests.base import BaseTestCase
from huey.wrapper import RedisHueyExt


state = {}

def pre_task():
state['pre'] = True

def post_task():
state['post'] = True

huey = RedisHueyExt()

@huey.task(pre_task, post_task)
def set_value(k, v):
state[k] = v
return v

@huey.task(pre_task)
def set_value_pre(k, v):
state[k] = v
return v

@huey.task(None, post_task)
def set_value_post(k, v):
state[k] = v
return v


class TestTaskWrapper(BaseTestCase):
def setUp(self):
super(TestTaskWrapper, self).setUp()
global state
state = {}

def test_task_wrapper(self):
self.assertEqual(state, {})

set_value('foo', 'bar')
huey.execute(huey.dequeue())

self.assertEqual(state, {
'pre': True,
'post': True,
'foo': 'bar'})

def test_pretask_only(self):
set_value_pre('foo', 'bar')
huey.execute(huey.dequeue())

self.assertEqual(state, {
'pre': True,
'foo': 'bar'})

def test_posttask_only(self):
set_value_post('foo', 'bar')
huey.execute(huey.dequeue())

self.assertEqual(state, {
'post': True,
'foo': 'bar'})
35 changes: 35 additions & 0 deletions huey/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from functools import wraps

from huey import RedisHuey


def _task_wrapper(task_fn, pre_task=None, post_task=None):
@wraps(task_fn)
def inner(*args, **kwargs):
if pre_task is not None:
pre_task()
result = task_fn(*args, **kwargs)
if post_task is not None:
post_task()
return result
return inner


class RedisHueyExt(RedisHuey):
def task(self, pre_task=None, post_task=None, *args, **kwargs):
def decorator(fn):
return (super(RedisHueyExt, self)
.task(*args, **kwargs)(_task_wrapper(
fn,
pre_task=pre_task,
post_task=post_task)))
return decorator

def periodic_task(self, pre_task=None, post_task=None, *args, **kwargs):
def decorator(fn):
return (super(RedisHueyExt, self)
.periodic_task(*args, **kwargs)(_task_wrapper(
fn,
pre_task=pre_task,
post_task=post_task)))
return decorator

0 comments on commit 97899e9

Please sign in to comment.