8
8
"""
9
9
import logging
10
10
from contextlib import closing
11
+ from functools import partial
11
12
12
13
import requests
13
14
from kombu import Connection , Exchange , Queue
18
19
log = logging .getLogger (__name__ )
19
20
20
21
22
+ def noop (* args , ** kwargs ):
23
+ return None
24
+
25
+
21
26
def changesets_for_pushid (pushid , push_json_url ):
22
27
"""Return a list of changeset IDs in a repository push.
23
28
@@ -45,7 +50,7 @@ def changesets_for_pushid(pushid, push_json_url):
45
50
return changesets
46
51
47
52
48
- def process_push_message (body , message ):
53
+ def process_push_message (body , message , no_send = False ):
49
54
"""Process a hg push message from Mozilla Pulse.
50
55
51
56
The message body structure is described by https://mozilla-version-control-tools.readthedocs.io/en/latest/hgmo/notifications.html#common-properties-of-notifications
@@ -55,7 +60,10 @@ def process_push_message(body, message):
55
60
Args:
56
61
body: The decoded JSON message body as a Python dict.
57
62
message: A AMQP Message object.
63
+ no_send: Do not send any ping data or drain any queues.
58
64
"""
65
+ ack = noop if no_send else message .ack
66
+
59
67
log .debug (f'received message: { message } ' )
60
68
61
69
payload = body ['payload' ]
@@ -64,7 +72,7 @@ def process_push_message(body, message):
64
72
msgtype = payload ['type' ]
65
73
if msgtype != 'changegroup.1' :
66
74
log .info (f'skipped message of type { msgtype } ' )
67
- message . ack ()
75
+ ack ()
68
76
return
69
77
70
78
pushlog_pushes = payload ['data' ]['pushlog_pushes' ]
@@ -73,7 +81,7 @@ def process_push_message(body, message):
73
81
pcount = len (pushlog_pushes )
74
82
if pcount == 0 :
75
83
log .info (f'skipped message with zero pushes' )
76
- message . ack ()
84
+ ack ()
77
85
return
78
86
elif pcount > 1 :
79
87
# Raise this as a warning to draw attention. According to
@@ -83,7 +91,7 @@ def process_push_message(body, message):
83
91
log .warning (
84
92
f'skipped invalid message with multiple pushes (expected 0 or 1, got { pcount } )'
85
93
)
86
- message . ack ()
94
+ ack ()
87
95
return
88
96
89
97
pushdata = pushlog_pushes .pop ()
@@ -96,14 +104,18 @@ def process_push_message(body, message):
96
104
log .info (f'processing changeset { changeset } ' )
97
105
ping = payload_for_changeset (changeset , repo_url )
98
106
107
+ if no_send :
108
+ log .info (f'ping data (not sent): { ping } ' )
109
+ continue
110
+
99
111
# Pings need a unique ID so they can be de-duplicated by the ingestion
100
112
# service. We can use the changeset ID for the unique key.
101
113
send_ping (changeset , ping )
102
114
103
- message . ack ()
115
+ ack ()
104
116
105
117
106
- def run_pulse_listener (username , password , timeout ):
118
+ def run_pulse_listener (username , password , timeout , no_send ):
107
119
"""Run a Pulse message queue listener.
108
120
109
121
This function does not return.
@@ -149,10 +161,17 @@ def run_pulse_listener(username, password, timeout):
149
161
queue .queue_declare ()
150
162
queue .queue_bind ()
151
163
164
+ callback = partial (process_push_message , no_send = no_send )
165
+
152
166
# Pass auto_declare=False so that Consumer does not try to declare the
153
167
# exchange. Declaring exchanges is not allowed by the Pulse server.
154
168
with connection .Consumer (
155
- queue , callbacks = [process_push_message ], auto_declare = False
169
+ queue , callbacks = [callback ], auto_declare = False
156
170
) as consumer :
171
+
172
+ if no_send :
173
+ log .info ('transmission of ping data has been disabled' )
174
+ log .info ('message acks has been disabled' )
175
+
157
176
log .info ('reading messages' )
158
177
connection .drain_events (timeout = timeout )
0 commit comments