Skip to content

Commit

Permalink
Testing a threading delay
Browse files Browse the repository at this point in the history
  • Loading branch information
Koed00 committed Jul 11, 2015
1 parent 6c9dd1a commit 74af34f
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions django_q/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from multiprocessing import Queue, Value
from threading import Timer
from builtins import int

try:
import cPickle as pickle
Expand Down Expand Up @@ -28,16 +30,26 @@ def async(func, *args, **kwargs):
r = kwargs.pop('redis', redis_client)
# optional sync mode
s = kwargs.pop('sync', False)
# optional delay
d = kwargs.pop('delay', False)
# get an id
tag = uuid()
# build the task package
task = {'id': tag[1], 'name': tag[0], 'func': func, 'hook': hook, 'args': args, 'kwargs': kwargs,
'started': timezone.now()}
# sign it
pack = signing.SignedPackage.dumps(task)
# sync option
if s:
return _sync(task['id'], pack)
# push it
_sync(pack)
logger.debug('Sync executing {}'.format(tag))
return task['id']
# delay option
if d and isinstance(d, int):
Timer(d, r.rpush, [list_key, pack]).start()
logger.debug('Delayed pushing {} by {} seconds'.format(tag, d))
return task['id']
# push it real good
r.rpush(list_key, pack)
logger.debug('Pushed {}'.format(tag))
return task['id']
Expand Down Expand Up @@ -94,7 +106,7 @@ def fetch(task_id):
return Task.get_task(task_id)


def _sync(task_id, pack):
def _sync(pack):
"""
Simulates a package travelling through the cluster.
Expand All @@ -106,4 +118,3 @@ def _sync(task_id, pack):
cluster.worker(task_queue, result_queue, Value('b', -1))
result_queue.put('STOP')
cluster.monitor(result_queue)
return task_id

0 comments on commit 74af34f

Please sign in to comment.