Task queue metrics helper methods #108

Open: wants to merge 6 commits into base: master
52 changes: 42 additions & 10 deletions README.rst
@@ -22,7 +22,7 @@ Features

TaskTiger forks a subprocess for each task. This comes with several benefits:
Memory leaks caused by tasks are avoided since the subprocess is terminated
when the task is finished. A hard time limit can be set for each task, after
which the task is killed if it hasn't completed. To ensure performance, any
necessary Python modules can be preloaded in the parent process.
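
For example, a hard time limit can be applied when registering a task. A
minimal sketch (the 5-second limit and the task body are illustrative):

.. code:: python

    from tasktiger import TaskTiger

    tiger = TaskTiger()

    # If the task runs longer than 5 seconds, its subprocess is killed.
    @tiger.task(hard_timeout=5)
    def long_running_task():
        ...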

@@ -552,15 +552,16 @@ Each queue can have tasks in the following states:
- ``scheduled``: Tasks that are scheduled for later execution.
- ``error``: Tasks that failed with an error.

To get a count of the number of tasks for a given queue and state, use
``Task.task_count_from_queue``. To get a list of all tasks for a given queue
and state, use ``Task.tasks_from_queue``. The latter gives you back a tuple
containing the total number of tasks in the queue (useful if the tasks are
truncated) and a list of tasks in the queue, latest first. Using the ``skip``
and ``limit`` keyword arguments, you can fetch arbitrary slices of the queue.
If you know the task ID, you can fetch a given task using ``Task.from_id``.
Both methods let you load tracebacks from failed task executions using the
``load_executions`` keyword argument, which accepts an integer indicating how
many executions should be loaded.
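
For example, fetching the latest failed tasks together with their most recent
traceback could look like this. A minimal sketch (the queue name and slice
bounds are illustrative):

.. code:: python

    from tasktiger import TaskTiger, Task

    tiger = TaskTiger()

    # Fetch the 10 latest tasks in the "default" queue's "error" state,
    # loading one execution (with traceback) per task.
    total, tasks = Task.tasks_from_queue(
        tiger, 'default', 'error', skip=0, limit=10, load_executions=1
    )

    print('Showing {0} of {1} failed tasks'.format(len(tasks), total))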

Tasks can also be constructed and queued using the regular constructor, which
takes the TaskTiger instance, the function name and the options described in
@@ -661,6 +662,37 @@ executed.
for task in tiger.current_tasks:
print(task.n_executions())

Example 4: Printing the number of queued tasks for the default queue.

.. code:: python

from tasktiger import TaskTiger, Task

QUEUE_NAME = 'default'
TASK_STATE = 'queued'

tiger = TaskTiger()

count = Task.task_count_from_queue(tiger, QUEUE_NAME, TASK_STATE)

print('{state} tasks in {queue}: {count}'.format(
state=TASK_STATE.title(),
queue=QUEUE_NAME,
count=count,
))

Example 5: Printing all queues with task metrics.

.. code:: python

from tasktiger import TaskTiger, Task

tiger = TaskTiger()

metrics = Task.queue_metrics(tiger)

print(metrics)


Rollbar error handling
----------------------
12 changes: 10 additions & 2 deletions tasktiger/_internal.py
@@ -36,6 +36,7 @@
'current_tasks': None,
}


# from rq
def import_attribute(name):
"""Return an attribute from a dotted path name (e.g. "path.to.func")."""
@@ -47,12 +48,14 @@ def import_attribute(name):
except (ValueError, ImportError, AttributeError) as e:
raise TaskImportError(e)


def gen_id():
"""
Generates and returns a random hex-encoded 256-bit unique ID.
"""
return binascii.b2a_hex(os.urandom(32)).decode('utf8')


def gen_unique_id(serialized_name, args, kwargs):
"""
Generates and returns a hex-encoded 256-bit ID for the given task name and
@@ -64,6 +67,7 @@ def gen_unique_id(serialized_name, args, kwargs):
'kwargs': kwargs,
}, sort_keys=True).encode('utf8')).hexdigest()


def serialize_func_name(func):
"""
Returns the dotted serialized path to the passed function.
@@ -78,18 +82,20 @@ def serialize_func_name(func):
func_name = func.__name__
return ':'.join([func.__module__, func_name])


def dotted_parts(s):
"""
For a string "a.b.c", yields "a", "a.b", "a.b.c".
"""
idx = -1
while s:
idx = s.find('.', idx + 1)
if idx == -1:
yield s
break
yield s[:idx]


def reversed_dotted_parts(s):
"""
For a string "a.b.c", yields "a.b.c", "a.b", "a".
@@ -103,12 +109,14 @@ def reversed_dotted_parts(s):
break
yield s[:idx]


def serialize_retry_method(retry_method):
if callable(retry_method):
return (serialize_func_name(retry_method), ())
else:
return (serialize_func_name(retry_method[0]), retry_method[1])


def get_timestamp(when):
# convert timedelta to datetime
if isinstance(when, datetime.timedelta):
@@ -117,4 +125,4 @@ def get_timestamp(when):
if when:
# Convert to unixtime: utctimetuple drops microseconds so we add
# them manually.
return calendar.timegm(when.utctimetuple()) + when.microsecond / 1.e6
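
A quick sketch of how ``get_timestamp`` behaves, following the code above (the
input values are illustrative):

.. code:: python

    import datetime
    from tasktiger._internal import get_timestamp

    # A timedelta is interpreted relative to the current UTC time...
    print(get_timestamp(datetime.timedelta(minutes=5)))

    # ...while a datetime is converted to a Unix timestamp, with microseconds
    # added back in manually.
    print(get_timestamp(datetime.datetime(2018, 1, 1, 0, 0, 0, 500000)))
    # 1514764800.5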
5 changes: 5 additions & 0 deletions tasktiger/exceptions.py
@@ -1,21 +1,25 @@
import sys


class TaskImportError(ImportError):
"""
Raised when a task could not be imported.
"""


class JobTimeoutException(BaseException):
"""
Raised when a job takes longer to complete than the allowed maximum timeout
value.
"""


class StopRetry(Exception):
"""
Raised by a retry function to indicate that the task shouldn't be retried.
"""


class RetryException(BaseException):
"""
Alternative to retry_on for retrying a task. If raised within a task, the
@@ -32,6 +36,7 @@ def __init__(self, method=None, original_traceback=False, log_error=True):
self.exc_info = sys.exc_info() if original_traceback else None
self.log_error = log_error


class TaskNotFound(Exception):
"""
The task was not found or does not exist in the given queue/state.
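
Based on the constructor shown above, a task can trigger its own retry by
raising ``RetryException`` with a retry method. A minimal sketch (``tiger``,
``do_work``, and ``TransientError`` are illustrative placeholders):

.. code:: python

    from tasktiger.exceptions import RetryException
    from tasktiger.retry import fixed

    @tiger.task()
    def unreliable_task():
        try:
            do_work()
        except TransientError:
            # Retry in 30 seconds, at most 3 times, keeping the original
            # traceback for the error log.
            raise RetryException(method=fixed(30, 3),
                                 original_traceback=True)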
5 changes: 3 additions & 2 deletions tasktiger/flask_script.py
@@ -3,6 +3,7 @@
import argparse
from flask_script import Command


class TaskTigerCommand(Command):
capture_all_args = True
help = 'Run a TaskTiger worker'
@@ -14,10 +15,10 @@ def __init__(self, tiger):
def create_parser(self, *args, **kwargs):
# Override the default parser so we can pass all arguments to the
# TaskTiger parser.
func_stack = kwargs.pop('func_stack', ())
parent = kwargs.pop('parent', None)
parser = argparse.ArgumentParser(*args, add_help=False, **kwargs)
parser.set_defaults(func_stack=func_stack + (self,))
self.parser = parser
self.parent = parent
return parser
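
Hooking the command into a Flask-Script manager could look as follows. A
minimal sketch (``app`` and ``tiger`` are assumed to be an existing Flask app
and TaskTiger instance):

.. code:: python

    from flask_script import Manager
    from tasktiger.flask_script import TaskTigerCommand

    manager = Manager(app)

    # Run a TaskTiger worker with: python manage.py tasktiger
    manager.add_command('tasktiger', TaskTigerCommand(tiger))

    if __name__ == '__main__':
        manager.run()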
8 changes: 4 additions & 4 deletions tasktiger/redis_lock.py
@@ -1,16 +1,17 @@
import time
from redis import WatchError
from redis.lock import Lock as RedisLock, LockError

# TODO: Switch to Redlock (http://redis.io/topics/distlock) once the following
# bugs are fixed:
# * https://github.com/andymccurdy/redis-py/issues/554
# * https://github.com/andymccurdy/redis-py/issues/629
# * https://github.com/andymccurdy/redis-py/issues/601


# For now, we're using the old-style lock pattern (based on py-redis 2.8.0)
# The class below additionally catches ValueError for better compatibility with
# new-style locks (for when we upgrade), and adds a renew() method.

class Lock(object):
"""
A shared, distributed Lock. Using Redis for locking allows the Lock
@@ -113,11 +114,10 @@ def renew(self, timeout=None):
self.redis.getset(self.name, timeout_at)
self.acquired_until = timeout_at


# For now unused:
# New-style Lock with renew() method (andymccurdy/redis-py#629)
# XXX: when upgrading to the new-style class, take old-style locks into account
class NewStyleLock(RedisLock):
def renew(self, new_timeout):
"""
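
A usage sketch for this lock, assuming the old-style py-redis constructor and
``acquire(blocking=...)`` signature (the lock name and timeout are
illustrative):

.. code:: python

    from redis import Redis
    from tasktiger.redis_lock import Lock

    redis = Redis()
    lock = Lock(redis, 'example-lock', timeout=10)

    if lock.acquire(blocking=False):
        try:
            # Extend the lock while long-running work is in progress.
            lock.renew(timeout=10)
        finally:
            lock.release()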
10 changes: 5 additions & 5 deletions tasktiger/redis_scripts.py
@@ -7,10 +7,10 @@
redis.call('zadd', {key}, {score}, {member})
end
"""
ZADD_NOUPDATE = ZADD_NOUPDATE_TEMPLATE.format(
key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition='not'
)
ZADD_UPDATE_EXISTING = ZADD_NOUPDATE_TEMPLATE.format(
key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition=''
)
ZADD_UPDATE_TEMPLATE = """
@@ -372,7 +372,7 @@ def zpoppush(self, source, destination, count, score, new_score,
(their score will not be updated).
"""
if score is None:
score = '+inf'  # Include all elements.
if withscores:
if on_success:
raise NotImplementedError()
@@ -438,7 +438,7 @@ def delete_if_not_in_zsets(self, key, member, set_list, client=None):
``set_list``. Returns the number of removed elements (0 or 1).
"""
return self._delete_if_not_in_zsets(
keys=[key] + set_list,
args=[member],
client=client)

@@ -491,7 +491,7 @@ def execute_pipeline(self, pipeline, client=None):
stack = pipeline.command_stack
script_args = [int(self.can_replicate_commands), len(stack)]
for args, options in stack:
script_args += [len(args) - 1] + list(args)

# Run the pipeline
if self.can_replicate_commands: # Redis 3.2 or higher
10 changes: 8 additions & 2 deletions tasktiger/retry.py
@@ -1,26 +1,32 @@
# The retry logic is documented in the README.
from .exceptions import StopRetry


def _fixed(retry, delay, max_retries):
if retry > max_retries:
raise StopRetry()
return delay


def fixed(delay, max_retries):
return (_fixed, (delay, max_retries))


def _linear(retry, delay, increment, max_retries):
if retry > max_retries:
raise StopRetry()
return delay + increment * (retry - 1)


def linear(delay, increment, max_retries):
return (_linear, (delay, increment, max_retries))


def _exponential(retry, delay, factor, max_retries):
if retry > max_retries:
raise StopRetry()
return delay * factor ** (retry - 1)


def exponential(delay, factor, max_retries):
return (_exponential, (delay, factor, max_retries))
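
Each helper returns a ``(function, args)`` tuple suitable for TaskTiger's
``retry_method`` task option. A minimal sketch (the 60-second base delay,
factor of 2, and 5 maximum retries are illustrative):

.. code:: python

    from tasktiger import TaskTiger
    from tasktiger.retry import exponential

    tiger = TaskTiger()

    # Retry after 60s, then 120s, 240s, ..., for at most 5 retries.
    @tiger.task(retry_method=exponential(60, 2, 5))
    def flaky_task():
        ...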
2 changes: 2 additions & 0 deletions tasktiger/rollbar.py
@@ -5,6 +5,7 @@
import rollbar
from rollbar.logger import RollbarHandler


class StructlogRollbarHandler(RollbarHandler):
def __init__(self, prefix, *args, **kwargs):
"""
@@ -17,6 +18,7 @@ def __init__(self, prefix, *args, **kwargs):
def format_title(self, data):
# Keys used to construct the title and for grouping purposes.
KEYS = ['event', 'func', 'exception_name', 'queue']

def format_field(field, value):
if field == 'queue':
return '%s=%s' % (field, value.split('.')[0])
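
Wiring the handler into logging could look like this. A minimal sketch (the
API key and environment are placeholders, ``rollbar.init`` follows the
standard pyrollbar API, and the handler is attached to the root logger here
purely for illustration):

.. code:: python

    import logging

    import rollbar
    from tasktiger.rollbar import StructlogRollbarHandler

    rollbar.init('ROLLBAR_API_KEY', 'production',
                 allow_logging_basic_config=False)

    handler = StructlogRollbarHandler('tasktiger')
    handler.setLevel(logging.ERROR)

    logging.getLogger().addHandler(handler)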
4 changes: 3 additions & 1 deletion tasktiger/schedule.py
@@ -2,6 +2,7 @@

__all__ = ['periodic']


def _periodic(dt, period, start_date, end_date):
if end_date and dt >= end_date:
return None
@@ -11,7 +12,7 @@ def _periodic(dt, period, start_date, end_date):

# Determine the next time the task should be run
delta = dt - start_date
seconds = delta.seconds + delta.days * 86400
runs = seconds // period
next_run = runs + 1
next_date = start_date + datetime.timedelta(seconds=next_run * period)
@@ -22,6 +23,7 @@

return next_date


def periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None,
end_date=None):
"""
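
Based on the signature above, registering a periodic task could look like
this. A minimal sketch (the 5-minute period and the start date are
illustrative):

.. code:: python

    import datetime

    from tasktiger import TaskTiger
    from tasktiger.schedule import periodic

    tiger = TaskTiger()

    # Run every 5 minutes, aligned to midnight of 2018-01-01 (UTC).
    @tiger.task(schedule=periodic(minutes=5,
                                  start_date=datetime.datetime(2018, 1, 1)))
    def heartbeat():
        ...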
1 change: 1 addition & 0 deletions tasktiger/stats.py
@@ -3,6 +3,7 @@

from ._internal import g_fork_lock


class StatsThread(threading.Thread):
def __init__(self, tiger):
super(StatsThread, self).__init__()