Skip to content

Commit

Permalink
Big internal API refactoring / backend simplification.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Jan 16, 2016
1 parent b3172b3 commit 7678367
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 297 deletions.
156 changes: 79 additions & 77 deletions huey/api.py
Expand Up @@ -31,27 +31,22 @@ class Huey(object):
have as many as you like -- the only caveat is that one consumer process
must be executed for each Huey instance.
:param queue: a queue instance, e.g. ``RedisQueue()``
:param result_store: a place to store results, e.g. ``RedisResultStore()``
:param schedule: a place to store pending tasks, e.g. ``RedisSchedule()``
:param events: channel to send events on, e.g. ``RedisEventEmitter()``
:param name: a name for the task queue.
:param bool result_store: whether to store task results.
:param bool events: whether to enable consumer-sent events.
:param store_none: Flag to indicate whether tasks that return ``None``
should store their results in the result store.
:param always_eager: Useful for testing, this will execute all tasks
immediately, without enqueueing them.
:param store_errors: Flag to indicate whether error tracebacks and
metadata should be stored.
Example usage::
from huey.api import Huey, crontab
from huey.storage import RedisQueue, RedisDataStore, RedisSchedule
from huey import RedisHuey
queue = RedisQueue('my-app')
result_store = RedisDataStore('my-app')
schedule = RedisSchedule('my-app')
huey = Huey(queue, result_store, schedule)
# This is equivalent to the previous 4 lines:
# huey = RedisHuey('my-app', {'host': 'localhost', 'port': 6379})
# Create a huey instance and disable consumer-sent events.
huey = RedisHuey('my-app', events=False)
@huey.task()
def slow_function(some_arg):
Expand All @@ -63,19 +58,21 @@ def backup():
# do a backup every day at 3am
return
"""
def __init__(self, queue, result_store=None, schedule=None, events=None,
store_none=False, always_eager=False, store_errors=True):
self.queue = queue
def __init__(self, name='huey', result_store=True, events=True,
store_none=False, always_eager=False, store_errors=True,
blocking=False, **storage_kwargs):
self.name = name
self.result_store = result_store
self.schedule = schedule
self.events = events
if self.queue is not None:
self.blocking = self.queue.blocking
else:
self.blocking = False
self.store_none = store_none
self.always_eager = always_eager
self.store_errors = store_errors
self.blocking = blocking
self.storage = self.get_storage(**storage_kwargs)

def get_storage(self, **kwargs):
raise NotImplementedError('Storage API not implemented in the base '
'Huey class. Use `RedisHuey` instead.')

def task(self, retries=0, retry_delay=0, retries_as_argument=False,
include_task=False, name=None):
Expand Down Expand Up @@ -168,58 +165,63 @@ def inner(*args, **kwargs):
return decorator

@_wrapped_operation(QueueWriteException)
def _write(self, msg):
self.queue.write(msg)
def _enqueue(self, msg):
self.storage.enqueue(msg)

@_wrapped_operation(QueueReadException)
def _read(self):
return self.queue.read()
def _dequeue(self):
return self.storage.dequeue()

@_wrapped_operation(QueueRemoveException)
def _remove(self, msg):
return self.queue.remove(msg)
def _unqueue(self, msg):
return self.queue.unqueue(msg)

@_wrapped_operation(DataStoreGetException)
def _get(self, key, peek=False):
def _get_data(self, key, peek=False):
if peek:
return self.result_store.peek(key)
return self.storage.peek_data(key)
else:
return self.result_store.get(key)
return self.storage.pop_data(key)

@_wrapped_operation(DataStorePutException)
def _put_data(self, key, value):
return self.storage.put_data(key, value)

@_wrapped_operation(DataStorePutException)
def _put(self, key, value):
return self.result_store.put(key, value)
def _put_error(self, metadata):
self.storage.put_error(metadata)

@_wrapped_operation(DataStoreGetException)
def _get_errors(self, limit=None, offset=0):
return self.storage.get_errors(limit=limit, offset=offset)

@_wrapped_operation(ScheduleAddException)
def _add_schedule(self, data, ts):
if self.schedule is None:
raise AttributeError('Schedule not specified.')
self.schedule.add(data, ts)
def _add_to_schedule(self, data, ts):
self.storage.add_to_schedule(data, ts)

@_wrapped_operation(ScheduleReadException)
def _read_schedule(self, ts):
if self.schedule is None:
raise AttributeError('Schedule not specified.')
return self.schedule.read(ts)
return self.storage.read_schedule(ts)

def emit(self, message):
"""Events should always fail silently."""
try:
self.events.emit(message)
self.storage.emit(message)
except:
# Events always fail silently since they are treated as a non-
# critical component.
pass

def enqueue(self, task):
if self.always_eager:
return task.execute()

self._write(registry.get_message_for_task(task))
self._enqueue(registry.get_message_for_task(task))

if self.result_store:
return AsyncData(self, task)

def dequeue(self):
message = self._read()
message = self._dequeue()
if message:
return registry.get_task_for_message(message)

Expand Down Expand Up @@ -257,31 +259,26 @@ def execute(self, task):
except Exception as exc:
if self.result_store and self.store_errors:
metadata = self._get_task_metadata(task, exc, True)
self.result_store.put_error(pickle.dumps(metadata))
self._put_error(pickle.dumps(metadata))
raise

if result is None and not self.store_none:
return

if self.result_store and not isinstance(task, PeriodicQueueTask):
self._put(task.task_id, pickle.dumps(result))
self._put_data(task.task_id, pickle.dumps(result))

return result

def revoke(self, task, revoke_until=None, revoke_once=False):
if not self.result_store:
raise QueueException('A DataStore is required to revoke task')

serialized = pickle.dumps((revoke_until, revoke_once))
self._put(task.revoke_id, serialized)
self._put_data(task.revoke_id, serialized)

def restore(self, task):
self._get(task.revoke_id) # simply get and delete if there
self._get_data(task.revoke_id) # simply get and delete if there

def is_revoked(self, task, dt=None, peek=True):
if not self.result_store:
return False
res = self._get(task.revoke_id, peek=True)
res = self._get_data(task.revoke_id, peek=True)
if res is EmptyData:
return False
revoke_until, revoke_once = pickle.loads(res)
Expand All @@ -293,26 +290,10 @@ def is_revoked(self, task, dt=None, peek=True):
return True
return revoke_until is None or revoke_until > dt

def pending(self, limit=None):
return [registry.get_task_for_message(m)
for m in self.queue.items(limit)]

def scheduled(self, limit=None):
return [registry.get_task_for_message(m)
for m in self.schedule.items(limit)]

def all_results(self):
return self.result_store.items()

def errors(self, limit=None, offset=0):
return [
pickle.loads(error)
for error in self.result_store.get_errors(limit, offset)]

def add_schedule(self, task):
msg = registry.get_message_for_task(task)
ex_time = task.execute_time or datetime.datetime.fromtimestamp(0)
self._add_schedule(msg, ex_time)
self._add_to_schedule(msg, ex_time)

def read_schedule(self, ts):
return [registry.get_task_for_message(m)
Expand All @@ -327,15 +308,36 @@ def ready_to_run(self, cmd, dt=None):
dt = dt or datetime.datetime.utcnow()
return cmd.execute_time is None or cmd.execute_time <= dt

def pending(self, limit=None):
return [registry.get_task_for_message(m)
for m in self.storage.enqueued_items(limit)]

def pending_count(self):
return self.storage.queue_size()

def scheduled(self, limit=None):
return [registry.get_task_for_message(m)
for m in self.storage.scheduled_items(limit)]

def scheduled_count(self):
return self.storage.schedule_size()

def all_results(self):
return self.storage.result_items()

def result_count(self):
return self.storage.result_store_size()

def errors(self, limit=None, offset=0):
return [
pickle.loads(error)
for error in self.storage.get_errors(limit, offset)]

def __len__(self):
return len(self.queue)
return self.pending_count()

def flush(self):
self.queue.flush()
if self.result_store:
self.result_store.flush()
if self.schedule:
self.schedule.flush()
self.storage.flush_all()


class AsyncData(object):
Expand All @@ -348,7 +350,7 @@ def __init__(self, huey, task):
def _get(self):
task_id = self.task.task_id
if self._result is EmptyData:
res = self.huey._get(task_id)
res = self.huey._get_data(task_id)

if res is not EmptyData:
self._result = pickle.loads(res)
Expand Down

0 comments on commit 7678367

Please sign in to comment.