forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 40
/
messaging.py
90 lines (69 loc) · 2.72 KB
/
messaging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""
Sending and Receiving Messages
"""
from carrot.messaging import Publisher, Consumer
from celery import conf
from celery.utils import gen_unique_id
from celery.utils import mitemgetter
from celery.serialization import pickle
MSG_OPTIONS = ("mandatory", "priority",
"immediate", "routing_key")
get_msg_options = mitemgetter(*MSG_OPTIONS)
extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
class TaskPublisher(Publisher):
"""The AMQP Task Publisher class."""
exchange = conf.AMQP_EXCHANGE
exchange_type = conf.AMQP_EXCHANGE_TYPE
routing_key = conf.AMQP_PUBLISHER_ROUTING_KEY
serializer = "pickle"
encoder = pickle.dumps
def delay_task(self, task_name, task_args, task_kwargs, **kwargs):
"""Delay task for execution by the celery nodes."""
return self._delay_task(task_name=task_name, task_args=task_args,
task_kwargs=task_kwargs, **kwargs)
def delay_task_in_set(self, taskset_id, task_name, task_args, task_kwargs,
**kwargs):
"""Delay a task which part of a task set."""
return self._delay_task(task_name=task_name, part_of_set=taskset_id,
task_args=task_args, task_kwargs=task_kwargs,
**kwargs)
def retry_task(self, task_name, task_id, delivery_info, **kwargs):
kwargs["routing_key"] = delivery_info.get("routing_key")
kwargs["retries"] = kwargs.get("retries", 0) + 1
self._delay_task(task_name, task_id, **kwargs)
def _delay_task(self, task_name, task_id=None, part_of_set=None,
task_args=None, task_kwargs=None, **kwargs):
"""INTERNAL"""
task_id = task_id or gen_unique_id()
message_data = {
"task": task_name,
"id": task_id,
"args": task_args or [],
"kwargs": task_kwargs or {},
"retries": kwargs.get("retries", 0),
"eta": kwargs.get("eta"),
}
if part_of_set:
message_data["taskset"] = part_of_set
self.send(message_data, **extract_msg_options(kwargs))
return task_id
class TaskConsumer(Consumer):
"""The AMQP Task Consumer class."""
queue = conf.AMQP_CONSUMER_QUEUE
exchange = conf.AMQP_EXCHANGE
routing_key = conf.AMQP_CONSUMER_ROUTING_KEY
exchange_type = conf.AMQP_EXCHANGE_TYPE
decoder = pickle.loads
auto_ack = False
no_ack = False
class StatsPublisher(Publisher):
exchange = "celerygraph"
routing_key = "stats"
encoder = pickle.dumps
class StatsConsumer(Consumer):
queue = "celerygraph"
exchange = "celerygraph"
routing_key = "stats"
exchange_type = "direct"
decoder = pickle.loads
no_ack=True