-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.py
79 lines (65 loc) · 2.44 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import with_statement
from __future__ import absolute_import
import time
import logging
from kombu import Connection
from kombu.mixins import ConsumerMixin
from .queues import hg_queues
from .utils import handlePushes, PushJS
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
logger = logging.getLogger()
class Worker(ConsumerMixin):
def __init__(self, connection, settings):
self.connection = connection
self.retries = 0
self.max_retries = settings.MAX_HG_RETRIES
self.sentry = None
if hasattr(settings, 'RAVEN_CONFIG'):
from raven import Client
self.sentry = Client(**settings.RAVEN_CONFIG)
def get_consumers(self, Consumer, channel):
return [Consumer(queues=hg_queues,
callbacks=[self.process_pushes,
self.process_repo])]
def process_pushes(self, body, message):
if body.get('type') != 'hg-push':
return
logger.info('got hg-push message %r', body)
try:
handlePushes(body['repository_id'],
[PushJS(p['id'], p) for p in body['pushes']],
close_connection=True)
self.retries = 0
except KeyboardInterrupt:
raise
except Exception:
self.retries += 1
logger.error('handlePushes failed: %d' % self.retries,
exc_info=True)
if self.sentry:
self.sentry.captureException()
if self.retries > self.max_retries:
# this problem might be real, let's just die
# and have a human figure it out
raise
time.sleep(self.retries)
message.requeue()
return
message.ack()
def process_repo(self, body, message):
if body.get('type') != 'new-hg-repo':
return
logger.info('got message %r', body)
message.ack()
def run(args):
from django.conf import settings
import django
django.setup()
with Connection(settings.TRANSPORT) as conn:
try:
Worker(conn, settings).run()
except KeyboardInterrupt:
print('bye bye')