Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Restructuring imports for simplicity

  • Loading branch information...
commit 4d35ecb06366658e16236de1d2bbd77662a2608f 1 parent 7fdeb9f
@coleifer authored
View
3  huey/__init__.py
@@ -0,0 +1,3 @@
+from huey.bin.config import BaseConfiguration
+from huey.decorators import queue_command, periodic_command, crontab
+from huey.queue import Invoker
View
3  huey/tests/config.py
@@ -1,7 +1,6 @@
import logging
+from huey import BaseConfiguration, Invoker
from huey.backends.dummy import DummyQueue, DummyDataStore
-from huey.bin.config import BaseConfiguration
-from huey.queue import Invoker
test_queue = DummyQueue('test-queue')
View
171 huey/tests/consumer.py
@@ -4,13 +4,12 @@
import time
import unittest
+from huey import queue_command, Invoker, BaseConfiguration
from huey.backends.dummy import DummyQueue, DummyDataStore
-from huey.decorators import queue_command
from huey.exceptions import QueueException
-from huey.queue import Invoker, QueueCommand, PeriodicQueueCommand
+from huey.queue import QueueCommand, PeriodicQueueCommand
from huey.registry import registry
from huey.utils import local_to_utc
-from huey.bin.config import BaseConfiguration
from huey.bin.huey_consumer import load_config, Consumer, IterableQueue
@@ -60,7 +59,7 @@ class TestLogHandler(logging.Handler):
def __init__(self, *args, **kwargs):
self.messages = []
logging.Handler.__init__(self, *args, **kwargs)
-
+
def emit(self, record):
self.messages.append(record.getMessage())
@@ -69,86 +68,86 @@ class SkewConsumerTestCase(unittest.TestCase):
def setUp(self):
global state
state = {}
-
+
self.orig_sleep = time.sleep
time.sleep = lambda x: None
-
+
self.consumer = Consumer(test_invoker, DummyConfiguration)
self.handler = TestLogHandler()
self.consumer.logger.addHandler(self.handler)
-
+
def tearDown(self):
self.consumer.shutdown()
self.consumer.logger.removeHandler(self.handler)
time.sleep = self.orig_sleep
-
+
def test_consumer_loader(self):
config = load_config('huey.tests.config.Config')
self.assertTrue(isinstance(config.QUEUE, DummyQueue))
self.assertEqual(config.QUEUE.name, 'test-queue')
-
+
def spawn(self, func, *args, **kwargs):
t = threading.Thread(target=func, args=args, kwargs=kwargs)
t.start()
return t
-
+
def test_iterable_queue(self):
store = []
q = IterableQueue()
-
+
def do_queue(queue, result):
for message in queue:
result.append(message)
-
+
t = self.spawn(do_queue, q, store)
q.put(1)
q.put(2)
q.put(StopIteration)
-
+
t.join()
self.assertFalse(t.is_alive())
self.assertEqual(store, [1, 2])
-
+
def test_message_processing(self):
self.consumer.start_message_receiver()
self.consumer.start_worker_pool()
-
+
self.assertFalse('k' in state)
-
+
res = modify_state('k', 'v')
res.get(blocking=True)
-
+
self.assertTrue('k' in state)
self.assertEqual(res.get(), 'v')
-
+
def test_worker(self):
res = modify_state('x', 'y')
-
+
cmd = test_invoker.dequeue()
self.assertEqual(res.get(), None)
-
+
# we will be calling release() after finishing work
self.consumer._pool.acquire()
self.consumer.worker(cmd)
-
+
self.assertTrue('x' in state)
self.assertEqual(res.get(), 'y')
-
+
def test_worker_exception(self):
res = blow_up()
cmd = test_invoker.dequeue()
-
+
self.consumer._pool.acquire()
self.consumer.worker(cmd)
-
+
self.assertEqual(self.handler.messages, [
'unhandled exception in worker thread',
])
-
+
def test_retries_and_logging(self):
# this will continually fail
res = retry_command('blampf')
-
+
cmd = test_invoker.dequeue()
self.consumer._pool.acquire()
self.consumer.worker(cmd)
@@ -156,7 +155,7 @@ def test_retries_and_logging(self):
'unhandled exception in worker thread',
're-enqueueing task %s, 2 tries left' % cmd.task_id,
])
-
+
cmd = test_invoker.dequeue()
self.assertEqual(cmd.retries, 2)
self.consumer._pool.acquire()
@@ -165,7 +164,7 @@ def test_retries_and_logging(self):
'unhandled exception in worker thread',
're-enqueueing task %s, 1 tries left' % cmd.task_id,
])
-
+
cmd = test_invoker.dequeue()
self.assertEqual(cmd.retries, 1)
self.consumer._pool.acquire()
@@ -174,7 +173,7 @@ def test_retries_and_logging(self):
'unhandled exception in worker thread',
're-enqueueing task %s, 0 tries left' % cmd.task_id,
])
-
+
cmd = test_invoker.dequeue()
self.assertEqual(cmd.retries, 0)
self.consumer._pool.acquire()
@@ -183,14 +182,14 @@ def test_retries_and_logging(self):
self.assertEqual(self.handler.messages[-1:], [
'unhandled exception in worker thread',
])
-
+
self.assertEqual(test_invoker.dequeue(), None)
-
+
def test_retries_with_success(self):
# this will fail once, then succeed
res = retry_command('blampf', False)
self.assertFalse('blampf' in state)
-
+
cmd = test_invoker.dequeue()
self.consumer._pool.acquire()
self.consumer.worker(cmd)
@@ -198,98 +197,98 @@ def test_retries_with_success(self):
'unhandled exception in worker thread',
're-enqueueing task %s, 2 tries left' % cmd.task_id,
])
-
+
cmd = test_invoker.dequeue()
self.assertEqual(cmd.retries, 2)
self.consumer._pool.acquire()
self.consumer.worker(cmd)
-
+
self.assertEqual(state['blampf'], 'fixed')
-
+
self.assertEqual(test_invoker.dequeue(), None)
-
+
def test_pooling(self):
# simulate acquiring two worker threads
self.consumer._pool.acquire()
self.consumer._pool.acquire()
-
+
res = modify_state('x', 'y')
-
- # dequeue a *single* message
+
+ # dequeue a *single* message
pt = self.spawn(self.consumer.check_message)
-
+
# work on any messages generated by the processor thread
st = self.spawn(self.consumer.worker_pool)
-
+
# our result is not available since all workers are blocked
self.assertEqual(res.get(), None)
self.assertFalse(self.consumer._pool.acquire(blocking=False))
-
+
# our processor is waiting
self.assertTrue(pt.is_alive())
self.assertEqual(self.consumer._queue.qsize(), 0)
-
+
# release a worker
self.consumer._pool.release()
-
+
# we can get and block now, but will set a timeout of 3 to indicate that
# something is wrong
self.assertEqual(res.get(blocking=True, timeout=3), 'y')
-
+
# this is done
pt.join()
-
+
def test_scheduling(self):
dt = datetime.datetime(2011, 1, 1, 0, 0)
dt2 = datetime.datetime(2037, 1, 1, 0, 0)
r1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
-
- # dequeue a *single* message
+
+ # dequeue a *single* message
pt = self.spawn(self.consumer.check_message)
-
+
# work on any messages generated by the processor thread
st = self.spawn(self.consumer.worker_pool)
-
+
pt.join()
self.assertTrue('k' in state)
self.assertEqual(self.consumer.schedule._schedule, {})
-
- # dequeue a *single* message
+
+ # dequeue a *single* message
pt = self.spawn(self.consumer.check_message)
pt.join()
-
+
# it got stored in the schedule instead of executing
self.assertFalse('k2' in state)
self.assertTrue(r2.task_id in self.consumer.schedule._schedule)
-
+
# run through an iteration of the scheduler
self.consumer.check_schedule(dt)
-
+
# our command was not enqueued
self.assertEqual(len(self.consumer.invoker.queue), 0)
-
+
# try running the scheduler with the time the command should run
self.consumer.check_schedule(dt2)
-
+
# it was enqueued
self.assertEqual(len(self.consumer.invoker.queue), 1)
self.assertEqual(self.consumer.schedule._schedule, {})
-
+
# dequeue and inspect -- it won't be executed because the scheduler will
# see that it is scheduled to run in the future and plop it back into the
# schedule
command = self.consumer.invoker.dequeue()
self.assertEqual(command.task_id, r2.task_id)
self.assertEqual(command.execute_time, dt2)
-
+
def test_retry_scheduling(self):
# this will continually fail
res = retry_command_slow('blampf')
self.assertEqual(self.consumer.schedule._schedule, {})
-
+
cur_time = datetime.datetime.utcnow()
-
+
cmd = test_invoker.dequeue()
self.consumer._pool.acquire()
self.consumer.worker(cmd)
@@ -297,93 +296,93 @@ def test_retry_scheduling(self):
'unhandled exception in worker thread',
're-enqueueing task %s, 2 tries left' % cmd.task_id,
])
-
+
self.assertEqual(self.consumer.schedule._schedule, {
cmd.task_id: cmd,
})
cmd_from_sched = self.consumer.schedule._schedule[cmd.task_id]
self.assertEqual(cmd_from_sched.retries, 2)
exec_time = cmd.execute_time
-
+
self.assertEqual((exec_time - cur_time).seconds, 10)
-
+
def test_schedule_local_utc(self):
dt = datetime.datetime(2011, 1, 1, 0, 0)
dt2 = datetime.datetime(2037, 1, 1, 0, 0)
r1 = modify_state.schedule(args=('k', 'v'), eta=dt)
r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2)
-
- # dequeue a *single* message
+
+ # dequeue a *single* message
pt = self.spawn(self.consumer.check_message)
-
+
# work on any messages generated by the processor thread
st = self.spawn(self.consumer.worker_pool)
-
+
pt.join()
self.assertTrue('k' in state)
self.assertEqual(self.consumer.schedule._schedule, {})
-
- # dequeue a *single* message
+
+ # dequeue a *single* message
pt = self.spawn(self.consumer.check_message)
pt.join()
-
+
# it got stored in the schedule instead of executing
self.assertFalse('k2' in state)
self.assertTrue(r2.task_id in self.consumer.schedule._schedule)
-
+
# run through an iteration of the scheduler
self.consumer.check_schedule(dt)
-
+
# our command was not enqueued
self.assertEqual(len(self.consumer.invoker.queue), 0)
-
+
# try running the scheduler with the time the command should run
self.consumer.check_schedule(local_to_utc(dt2))
-
+
# it was enqueued
self.assertEqual(len(self.consumer.invoker.queue), 1)
self.assertEqual(self.consumer.schedule._schedule, {})
-
+
# dequeue and inspect -- it won't be executed because the scheduler will
# see that it is scheduled to run in the future and plop it back into the
# schedule
command = self.consumer.invoker.dequeue()
self.assertEqual(command.task_id, r2.task_id)
self.assertEqual(command.execute_time, local_to_utc(dt2))
-
+
def test_schedule_persistence(self):
dt = datetime.datetime(2037, 1, 1, 0, 0)
dt2 = datetime.datetime(2037, 1, 1, 0, 1)
r = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
-
+
# two messages in the queue
self.assertEqual(len(self.consumer.invoker.queue), 2)
-
+
# pull 'em down
self.consumer.check_message()
self.consumer.check_message()
-
+
self.consumer.save_schedule()
self.consumer.schedule._schedule = {}
-
+
self.consumer.load_schedule()
self.assertTrue(r.task_id in self.consumer.schedule._schedule)
self.assertTrue(r2.task_id in self.consumer.schedule._schedule)
-
+
cmd1 = self.consumer.schedule._schedule[r.task_id]
cmd2 = self.consumer.schedule._schedule[r2.task_id]
-
+
self.assertEqual(cmd1.execute_time, dt)
self.assertEqual(cmd2.execute_time, dt2)
-
+
# check w/conversion
r3 = modify_state.schedule(args=('k3', 'v3'), eta=dt)
self.consumer.check_message()
-
+
self.consumer.save_schedule()
self.consumer.schedule._schedule = {}
-
+
self.consumer.load_schedule()
cmd3 = self.consumer.schedule._schedule[r3.task_id]
self.assertEqual(cmd3.execute_time, local_to_utc(dt))
View
36 huey/tests/crontab.py
@@ -1,7 +1,7 @@
import datetime
import unittest
-from huey.decorators import crontab
+from huey import crontab
class SkewCrontabTestCase(unittest.TestCase):
@@ -9,52 +9,52 @@ def test_crontab_month(self):
# validates the following months, 1, 4, 7, 8, 9
valids = [1, 4, 7, 8, 9]
validate_m = crontab(month='1,4,*/6,8-9')
-
+
for x in xrange(1, 13):
res = validate_m(datetime.datetime(2011, x, 1))
self.assertEqual(res, x in valids)
-
+
def test_crontab_day(self):
# validates the following days
valids = [1, 4, 7, 8, 9, 13, 19, 25, 31]
validate_d = crontab(day='*/6,1,4,8-9')
-
+
for x in xrange(1, 32):
res = validate_d(datetime.datetime(2011, 1, x))
self.assertEqual(res, x in valids)
-
+
def test_crontab_hour(self):
# validates the following hours
valids = [0, 1, 4, 6, 8, 9, 12, 18]
validate_h = crontab(hour='8-9,*/6,1,4')
-
+
for x in xrange(24):
res = validate_h(datetime.datetime(2011, 1, 1, x))
self.assertEqual(res, x in valids)
-
+
edge = crontab(hour=0)
self.assertTrue(edge(datetime.datetime(2011, 1, 1, 0, 0)))
self.assertFalse(edge(datetime.datetime(2011, 1, 1, 12, 0)))
-
+
def test_crontab_minute(self):
# validates the following minutes
valids = [0, 1, 4, 6, 8, 9, 12, 18, 24, 30, 36, 42, 48, 54]
validate_m = crontab(minute='4,8-9,*/6,1')
-
+
for x in xrange(60):
res = validate_m(datetime.datetime(2011, 1, 1, 1, x))
self.assertEqual(res, x in valids)
-
+
def test_crontab_day_of_week(self):
# validates the following days of week
# jan, 1, 2011 is a saturday
valids = [2, 4, 9, 11, 16, 18, 23, 25, 30]
validate_dow = crontab(day_of_week='0,2')
-
+
for x in xrange(1, 32):
res = validate_dow(datetime.datetime(2011, 1, x))
self.assertEqual(res, x in valids)
-
+
def test_crontab_all_together(self):
# jan 1, 2011 is a saturday
# may 1, 2011 is a sunday
@@ -65,22 +65,22 @@ def test_crontab_all_together(self):
hour='*/4',
minute='1-5,10-15,50'
)
-
+
self.assertTrue(validate(datetime.datetime(2011, 5, 1, 4, 11)))
self.assertTrue(validate(datetime.datetime(2011, 5, 7, 20, 50)))
self.assertTrue(validate(datetime.datetime(2011, 1, 1, 0, 1)))
-
+
# fails validation on month
self.assertFalse(validate(datetime.datetime(2011, 6, 4, 4, 11)))
-
+
# fails validation on day
self.assertFalse(validate(datetime.datetime(2011, 1, 6, 4, 11)))
-
+
# fails validation on day_of_week
self.assertFalse(validate(datetime.datetime(2011, 1, 4, 4, 11)))
-
+
# fails validation on hour
self.assertFalse(validate(datetime.datetime(2011, 1, 1, 1, 11)))
-
+
# fails validation on minute
self.assertFalse(validate(datetime.datetime(2011, 1, 1, 4, 6)))
Please sign in to comment.
Something went wrong with that request. Please try again.