Skip to content

Commit

Permalink
format for 120 columns
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Mar 6, 2018
1 parent 462e5fc commit 74719c5
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 58 deletions.
4 changes: 2 additions & 2 deletions kuyruk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def inner(f):
queue_ = 'kuyruk' if callable(queue) else queue

return Task(f, self, queue_, **kwargs)

return inner

if callable(queue):
Expand Down Expand Up @@ -111,8 +112,7 @@ def send_tasks_to_queue(self, subtasks):
for subtask in subtasks:
queue = subtask.task._queue_for_host(subtask.host)
if queue not in declared_queues:
ch.queue_declare(queue=queue,
durable=True, auto_delete=False)
ch.queue_declare(queue=queue, durable=True, auto_delete=False)
declared_queues.add(queue)

description = subtask.task._get_description(subtask.args,
Expand Down
9 changes: 3 additions & 6 deletions kuyruk/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ def main():
parser = argparse.ArgumentParser(conflict_handler='resolve')

# Add common options
parser.add_argument(
'-v', '--version', action='version', version=__version__)
parser.add_argument(
'-a', '--app', required=True, help='path to the Kuyruk object')
parser.add_argument('-v', '--version', action='version', version=__version__)
parser.add_argument('-a', '--app', required=True, help='path to the Kuyruk object')

subparsers = parser.add_subparsers(
dest='subparser_name', help='sub-command name')
subparsers = parser.add_subparsers(dest='subparser_name', help='sub-command name')

# Parser for the "worker" sub-command
parser_worker = subparsers.add_parser('worker', help='run a worker')
Expand Down
3 changes: 1 addition & 2 deletions kuyruk/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ def from_pymodule(self, name):
raise TypeError
module = importer.import_module(name)
for key, value in module.__dict__.items():
if (key.isupper() and
not isinstance(value, types.ModuleType)):
if (key.isupper() and not isinstance(value, types.ModuleType)):
self._setattr(key, value)
logger.info("Config is loaded from module: %s", name)

Expand Down
25 changes: 9 additions & 16 deletions kuyruk/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ class Task:
back to queue.
"""
def __init__(self, f, kuyruk, queue,
retry=0, max_run_time=None,
reject_delay=0):

def __init__(self, f, kuyruk, queue, retry=0, max_run_time=None, reject_delay=0):
self.f = f
self.kuyruk = kuyruk
self.queue = queue
Expand All @@ -58,8 +57,7 @@ def __call__(self, *args, **kwargs):
def subtask(self, args=(), kwargs={}, host=None):
return SubTask(self, args, kwargs, host)

def send_to_queue(self, args=(), kwargs={},
host=None, wait_result=None, message_ttl=None):
def send_to_queue(self, args=(), kwargs={}, host=None, wait_result=None, message_ttl=None):
"""
Sends a message to the queue.
A worker will run the task's function when it receives the message.
Expand Down Expand Up @@ -91,8 +89,7 @@ def send_to_queue(self, args=(), kwargs={},
logger.debug("Task.send_to_queue args=%r, kwargs=%r", args, kwargs)
queue = self._queue_for_host(host)
description = self._get_description(args, kwargs)
self._send_signal(signals.task_presend, args=args, kwargs=kwargs,
description=description)
self._send_signal(signals.task_presend, args=args, kwargs=kwargs, description=description)

body = json.dumps(description)
msg = amqp.Message(body=body)
Expand All @@ -107,13 +104,11 @@ def send_to_queue(self, args=(), kwargs={},
with self.kuyruk.channel() as ch:
if wait_result:
result = Result(ch.connection)
ch.basic_consume(queue='amq.rabbitmq.reply-to', no_ack=True,
callback=result.process_message)
ch.basic_consume(queue='amq.rabbitmq.reply-to', no_ack=True, callback=result.process_message)

ch.queue_declare(queue=queue, durable=True, auto_delete=False)
ch.basic_publish(msg, exchange="", routing_key=queue)
self._send_signal(signals.task_postsend, args=args, kwargs=kwargs,
description=description)
self._send_signal(signals.task_postsend, args=args, kwargs=kwargs, description=description)

if wait_result:
return result.wait(wait_result)
Expand Down Expand Up @@ -209,11 +204,9 @@ def time_limit(seconds):
return

if platform.system() == 'Windows':
raise NotImplementedError(
"There is no way to implement a general purpose time-limit on "
"Windows. Read this issue for more details: "
"https://github.com/cenkalti/kuyruk/issues/54"
)
raise NotImplementedError("There is no way to implement a general purpose time-limit on "
"Windows. Read this issue for more details: "
"https://github.com/cenkalti/kuyruk/issues/54")

def signal_handler(signum, frame):
raise Timeout
Expand Down
62 changes: 30 additions & 32 deletions kuyruk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ def _consumer_tag(self, queue):
def _declare_queues(self, ch):
for queue in self.queues:
logger.debug("queue_declare: %s", queue)
ch.queue_declare(
queue=queue, durable=True, auto_delete=False)
ch.queue_declare(queue=queue, durable=True, auto_delete=False)

def _pause_or_resume(self, channel):
try:
Expand All @@ -162,23 +161,17 @@ def _pause_or_resume(self, channel):
should_pause = load > self._max_load

if should_pause and self.consuming:
logger.warning(
'Load is above the treshold (%.2f/%s), '
'pausing consumer', load, self._max_load)
logger.warning('Load is above the treshold (%.2f/%s), ' 'pausing consumer', load, self._max_load)
self._cancel_queues(channel)
elif not should_pause and not self.consuming:
logger.warning(
'Load is below the treshold (%.2f/%s), '
'resuming consumer', load, self._max_load)
logger.warning('Load is below the treshold (%.2f/%s), ' 'resuming consumer', load, self._max_load)
self._consume_queues(channel)

def _consume_queues(self, ch):
self.consuming = True
for queue in self.queues:
logger.debug("basic_consume: %s", queue)
ch.basic_consume(queue=queue,
consumer_tag=self._consumer_tag(queue),
callback=self._process_message)
ch.basic_consume(queue=queue, consumer_tag=self._consumer_tag(queue), callback=self._process_message)

def _cancel_queues(self, ch):
self.consuming = False
Expand All @@ -202,14 +195,12 @@ def _process_message(self, message):

def _process_description(self, message, description):
try:
task = self._import_task(description['module'],
description['function'])
task = self._import_task(description['module'], description['function'])
args, kwargs = description['args'], description['kwargs']
except Exception:
logger.error('Cannot import task')
exc_info = sys.exc_info()
signals.worker_failure.send(self.kuyruk, description=description,
exc_info=exc_info, worker=self)
signals.worker_failure.send(self.kuyruk, description=description, exc_info=exc_info, worker=self)
message.channel.basic_reject(message.delivery_tag, requeue=False)
else:
self._process_task(message, description, task, args, kwargs)
Expand All @@ -226,12 +217,10 @@ def _process_task(self, message, description, task, args, kwargs):
queue = message.delivery_info['routing_key']
reply_to = message.properties.get('reply_to')
try:
result = self._run_task(message.channel.connection,
task, args, kwargs)
result = self._run_task(message.channel.connection, task, args, kwargs)
except Reject:
logger.warning('Task is rejected')
self._rejects.push(task.reject_delay, message.delivery_tag,
requeue=True)
self._rejects.push(task.reject_delay, message.delivery_tag, requeue=True)
except Discard:
logger.warning('Task is discarded')
message.channel.basic_reject(message.delivery_tag, requeue=False)
Expand All @@ -242,19 +231,29 @@ def _process_task(self, message, description, task, args, kwargs):
logger.error('Error while sending heartbeat')
exc_info = e.exc_info
logger.error(''.join(traceback.format_exception(*exc_info)))
signals.worker_failure.send(self.kuyruk, description=description,
task=task, args=args, kwargs=kwargs,
exc_info=exc_info, worker=self,
queue=queue)
signals.worker_failure.send(
self.kuyruk,
description=description,
task=task,
args=args,
kwargs=kwargs,
exc_info=exc_info,
worker=self,
queue=queue)
raise
except Exception:
logger.error('Task raised an exception')
exc_info = sys.exc_info()
logger.error(''.join(traceback.format_exception(*exc_info)))
signals.worker_failure.send(self.kuyruk, description=description,
task=task, args=args, kwargs=kwargs,
exc_info=exc_info, worker=self,
queue=queue)
signals.worker_failure.send(
self.kuyruk,
description=description,
task=task,
args=args,
kwargs=kwargs,
exc_info=exc_info,
worker=self,
queue=queue)
self._rejects.push(0, message.delivery_tag, requeue=False)
if reply_to:
self._send_reply(reply_to, message.channel, None, exc_info)
Expand All @@ -267,8 +266,7 @@ def _process_task(self, message, description, task, args, kwargs):
logger.debug("Task is processed")

def _run_task(self, connection, task, args, kwargs):
hb = Heartbeat(connection, self._on_heartbeat_error,
rejects=self._rejects)
hb = Heartbeat(connection, self._on_heartbeat_error, rejects=self._rejects)
hb.start()

self.current_task = task
Expand Down Expand Up @@ -314,8 +312,7 @@ def _send_reply(self, reply_to, channel, result, exc_info):
except Exception as e:
logger.error('Cannot serialize result as JSON: %s', e)
exc_info = sys.exc_info()
reply = {'result': None,
'exception': self._exc_info_dict(exc_info)}
reply = {'result': None, 'exception': self._exc_info_dict(exc_info)}
body = json.dumps(reply)

msg = amqp.Message(body=body)
Expand All @@ -327,7 +324,8 @@ def _exc_info_dict(exc_info):
return {
'type': '%s.%s' % (type_.__module__, str(type_.__name__)),
'value': str(val),
'traceback': ''.join(traceback.format_tb(tb))}
'traceback': ''.join(traceback.format_tb(tb)),
}

def _watch_load(self):
"""Pause consuming messages if lood goes above the allowed limit."""
Expand Down

0 comments on commit 74719c5

Please sign in to comment.