Skip to content

Commit

Permalink
More.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Dec 31, 2015
1 parent f85328c commit 326be5d
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
2 changes: 1 addition & 1 deletion examples/simple/cons.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ echo "-------------"
echo "In another terminal, run 'python main.py'"
echo "Stop the consumer using Ctrl+C"
PYTHONPATH=.:$PYTHONPATH
python ../../huey/bin/huey_consumer.py main.huey --workers=4 -S 10 -k thread
python ../../huey/bin/huey_consumer.py main.huey --workers=4 -S 10 -k process
1 change: 0 additions & 1 deletion examples/simple/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from gevent import monkey; monkey.patch_all()
from config import huey
from tasks import count_beans

Expand Down
8 changes: 6 additions & 2 deletions huey/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,11 @@ def start(self):

def stop(self):
self.stop_flag.set()
self._logger.info('Shutting down')

def wait_finished(self):
self.scheduler.join()
[worker.join() for worker in self.worker_threads]

def run(self):
self.start()
Expand All @@ -332,8 +337,7 @@ def run(self):
self._logger.exception('Error in consumer.')
self.stop()
else:
self.scheduler.join()
[worker.join() for worker in self.worker_threads]
self.wait_finished()

def _set_signal_handler(self):
self._logger.info('Setting signal handler')
Expand Down
2 changes: 1 addition & 1 deletion huey/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from huey.registry import registry


test_huey = RedisHuey('testing')
test_huey = RedisHuey('testing', blocking=False, read_timeout=0.1)

# Logger used by the consumer.
logger = logging.getLogger('huey.consumer')
Expand Down
35 changes: 33 additions & 2 deletions huey/tests/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import datetime
import threading
import time

from huey import crontab
from huey.consumer import Consumer
from huey.consumer import Scheduler
from huey.consumer import Worker
from huey.tests.base import CaptureLogs
Expand All @@ -11,10 +14,13 @@
# Store some global state.
state = {}

lock = threading.Lock()

# Create some test tasks.
@test_huey.task()
def modify_state(k, v):
state[k] = v
with lock:
state[k] = v
return v

@test_huey.task()
Expand Down Expand Up @@ -42,9 +48,34 @@ def hourly_task():
state['p'] = 'y'


class TestExecution(HueyTestCase):
def create_consumer(self, worker_type='thread'):
return Consumer(
self.huey,
max_delay=0.1,
workers=2,
worker_type=worker_type)

def test_threaded_execution(self):
consumer = self.create_consumer()
r1 = modify_state('k1', 'v1')
r2 = modify_state('k2', 'v2')
r3 = modify_state('k3', 'v3')

with CaptureLogs() as capture:
consumer.start()
while r1.get() != 'v1' and r2.get() != 'v2' and r3.get() != 'v3':
time.sleep(.05)

consumer.stop()
consumer.wait_finished()

self.assertEqual(state, {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'})


class TestConsumerAPIs(HueyTestCase):
def setUp(self):
super(TestConsumer, self).setUp()
super(TestConsumerAPIs, self).setUp()
global state
state = {}

Expand Down

0 comments on commit 326be5d

Please sign in to comment.