forked from zulip/zulip
-
Notifications
You must be signed in to change notification settings - Fork 0
/
outgoing_webhook.py
432 lines (364 loc) · 15.8 KB
/
outgoing_webhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
import abc
import json
import logging
from time import perf_counter
from typing import Any, AnyStr, Dict, Optional
import requests
from django.conf import settings
from django.utils.translation import gettext as _
from requests import Response
from version import ZULIP_VERSION
from zerver.decorator import JsonableError
from zerver.lib.actions import check_add_reaction, check_send_message
from zerver.lib.message import MessageDict
from zerver.lib.outgoing_http import OutgoingSession
from zerver.lib.queue import retry_event
from zerver.lib.topic import get_topic_from_message_info
from zerver.lib.url_encoding import near_message_url
from zerver.models import (
GENERIC_INTERFACE,
SLACK_INTERFACE,
Service,
UserProfile,
email_to_domain,
get_client,
get_user_profile_by_id,
)
class OutgoingWebhookServiceInterface(metaclass=abc.ABCMeta):
    """Abstract base for the payload formats an outgoing webhook bot can use.

    An instance stores the service token, the bot's UserProfile, the service
    name, and the HTTP session that subclasses use to post to the bot server.
    Subclasses implement make_request (build and send the outgoing request)
    and process_success (interpret the decoded JSON of a successful reply).
    """

    def __init__(self, token: str, user_profile: UserProfile, service_name: str) -> None:
        self.token: str = token
        self.user_profile: UserProfile = user_profile
        self.service_name: str = service_name
        # Take the timeout from the same setting that do_rest_call quotes in
        # its "Request timed out after N seconds" notice, so the number shown
        # to users always matches what the session actually enforces.
        self.session: requests.Session = OutgoingSession(
            role="webhook",
            timeout=settings.OUTGOING_WEBHOOK_TIMEOUT_SECONDS,
            headers={"User-Agent": "ZulipOutgoingWebhook/" + ZULIP_VERSION},
        )

    @abc.abstractmethod
    def make_request(self, base_url: str, event: Dict[str, Any]) -> Optional[Response]:
        """Send the request for `event` to `base_url`.

        Returns the HTTP response, or None when no request was made.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Translate the decoded JSON body of a successful reply.

        Returns a dict describing what the bot wants to do (for example a
        "content" entry), or None when there is nothing to do.
        """
        raise NotImplementedError
class GenericOutgoingWebhookService(OutgoingWebhookServiceInterface):
    """Handler for the generic (Zulip-native JSON) outgoing webhook format."""

    def make_request(self, base_url: str, event: Dict[str, Any]) -> Optional[Response]:
        """
        POST a JSON payload describing the triggering message to base_url.

        Outgoing webhooks get a simple version of the message, since
        most of them really only need `content` and a few other
        fields. Certain bots may eventually be allowed to get more
        information, but that's not a high priority. The payload is
        finalized with client_gravatar=False, so the gravatar info is
        sent to the clients (they don't have to compute it themselves).
        """
        finalized_message = MessageDict.finalize_payload(
            event["message"],
            apply_markdown=False,
            client_gravatar=False,
            keep_rendered_content=True,
        )
        payload = dict(
            data=event["command"],
            message=finalized_message,
            bot_email=self.user_profile.email,
            bot_full_name=self.user_profile.full_name,
            token=self.token,
            trigger=event["trigger"],
        )
        return self.session.post(base_url, json=payload)

    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Pick the reply fields the bot server supplied, or None if there are none.

        A truthy "response_not_required" suppresses any reply. The deprecated
        "response_string" key yields content only; "content" may be
        accompanied by "widget_content" and "reactions"; "reactions" may also
        appear on its own.
        """
        if response_json.get("response_not_required"):
            return None

        if "response_string" in response_json:
            # We are deprecating response_string.
            return {"content": str(response_json["response_string"])}

        if "content" in response_json:
            reply: Dict[str, Any] = {"content": str(response_json["content"])}
            for optional_key in ("widget_content", "reactions"):
                if optional_key in response_json:
                    reply[optional_key] = response_json[optional_key]
            return reply

        if "reactions" in response_json:
            return {"reactions": response_json["reactions"]}

        return None
class SlackOutgoingWebhookService(OutgoingWebhookServiceInterface):
    """Handler that posts form-encoded fields named after Slack's outgoing webhooks."""

    def make_request(self, base_url: str, event: Dict[str, Any]) -> Optional[Response]:
        """POST the message as form fields to base_url.

        Private messages are not supported: for those a failure message is
        sent back via fail_with_message and None is returned without making
        a request.
        """
        message = event["message"]
        if message["type"] == "private":
            fail_with_message(event, "Slack outgoing webhooks don't support private messages.")
            return None

        form_fields = [
            ("token", self.token),
            ("team_id", message["sender_realm_str"]),
            ("team_domain", email_to_domain(message["sender_email"])),
            ("channel_id", message["stream_id"]),
            ("channel_name", message["display_recipient"]),
            ("timestamp", message["timestamp"]),
            ("user_id", message["sender_id"]),
            ("user_name", message["sender_full_name"]),
            ("text", event["command"]),
            ("trigger_word", event["trigger"]),
            ("service_id", event["user_profile_id"]),
        ]
        return self.session.post(base_url, data=form_fields)

    def process_success(self, response_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the reply's "text" value as content, or None when it is absent."""
        if "text" not in response_json:
            return None
        return {"content": response_json["text"]}
# Maps each interface identifier to the handler class that implements it.
# get_service_interface_class looks handlers up here and falls back to the
# GENERIC_INTERFACE entry for identifiers that are not listed.
AVAILABLE_OUTGOING_WEBHOOK_INTERFACES: Dict[str, Any] = {
    GENERIC_INTERFACE: GenericOutgoingWebhookService,
    SLACK_INTERFACE: SlackOutgoingWebhookService,
}
def get_service_interface_class(interface: str) -> Any:
    """Return the handler class registered for `interface`.

    Identifiers missing from AVAILABLE_OUTGOING_WEBHOOK_INTERFACES resolve to
    the generic handler class.
    """
    known = interface in AVAILABLE_OUTGOING_WEBHOOK_INTERFACES
    lookup_key = interface if known else GENERIC_INTERFACE
    return AVAILABLE_OUTGOING_WEBHOOK_INTERFACES[lookup_key]
def get_outgoing_webhook_service_handler(service: Service) -> Any:
    """Instantiate the handler class matching `service`'s interface.

    The handler is constructed from the service's token, user_profile, and
    name.
    """
    handler_class = get_service_interface_class(service.interface_name())
    return handler_class(
        token=service.token,
        user_profile=service.user_profile,
        service_name=service.name,
    )
def send_response_message(
    bot_id: int, message_info: Dict[str, Any], response_data: Dict[str, Any]
) -> None:
    """
    Send a message authored by the bot, addressed using message_info.

    bot_id is the user_id of the bot sending the response.

    message_info addresses the message and should have these fields:
        type - "stream" or "private"
        display_recipient - like we have in other message events
        topic - see get_topic_from_message_info

    response_data is what the bot wants to send back and has these fields:
        content - raw Markdown content for Zulip to render
        widget_content - optional, passed through to check_send_message

    Raises JsonableError when the message type is neither "stream" nor
    "private".

    WARNING: This function sends messages bypassing the stream access check
    for the bot - so use with caution to not call this in codepaths
    that might let someone send arbitrary messages to any stream through this.
    """
    kind = message_info["type"]
    recipient_info = message_info["display_recipient"]

    # A missing topic is tolerated; the message is then sent with no topic.
    topic: Optional[str]
    try:
        topic = get_topic_from_message_info(message_info)
    except KeyError:
        topic = None

    sender = get_user_profile_by_id(bot_id)
    sender_realm = sender.realm
    sending_client = get_client("OutgoingWebhookResponse")

    body = response_data.get("content")
    assert body
    widget = response_data.get("widget_content")

    if kind == "stream":
        addressees = [recipient_info]
    elif kind == "private":
        addressees = [entry["email"] for entry in recipient_info]
    else:
        raise JsonableError(_("Invalid message type"))

    check_send_message(
        sender=sender,
        client=sending_client,
        message_type_name=kind,
        message_to=addressees,
        topic_name=topic,
        message_content=body,
        widget_content=widget,
        realm=sender_realm,
        skip_stream_access_check=True,
    )
def add_response_reaction(
    bot_id: int, message_info: Dict[str, Any], reaction_data: Dict[str, Any]
) -> None:
    """
    Add a reaction from the bot to a message; the counterpart of
    send_response_message for reactions instead of replies.

    bot_id is the user_id of the bot adding the reaction.

    message_info identifies the message to react to and should have at
    least this field:
        id - the id of the message

    reaction_data describes the reaction and can have the following fields:
        emoji_name - the name of the emoji (required)
        emoji_code - optional, see check_add_reaction
        reaction_type - optional, can be "unicode_emoji", "realm_emoji", or "zulip_extra_emoji"

    Raises JsonableError when emoji_name is missing.
    """
    reacting_user = get_user_profile_by_id(bot_id)
    target_message_id = message_info["id"]

    name = reaction_data.get("emoji_name")
    if name is None:
        raise JsonableError(_("Emoji name is missing"))

    check_add_reaction(
        user_profile=reacting_user,
        message_id=target_message_id,
        emoji_name=name,
        emoji_code=reaction_data.get("emoji_code"),
        reaction_type=reaction_data.get("reaction_type"),
    )
def fail_with_message(event: Dict[str, Any], failure_message: str) -> None:
    """Reply to the triggering message, as the bot, with a "Failure! ..." notice."""
    send_response_message(
        bot_id=event["user_profile_id"],
        message_info=event["message"],
        response_data={"content": "Failure! " + failure_message},
    )
def get_message_url(event: Dict[str, Any]) -> str:
    """Build the near-message URL of the event's message within the bot's realm."""
    bot = get_user_profile_by_id(event["user_profile_id"])
    triggering_message = event["message"]
    return near_message_url(realm=bot.realm, message=triggering_message)
def notify_bot_owner(
    event: Dict[str, Any],
    status_code: Optional[int] = None,
    response_content: Optional[AnyStr] = None,
    failure_message: Optional[str] = None,
    exception: Optional[Exception] = None,
) -> None:
    """Send the bot's owner a private message describing a webhook problem.

    Exactly one explanation is appended to the header, chosen in this order
    of precedence: exception, failure_message, status code 407, any other
    truthy status code. Independently of that choice, a truthy
    response_content is appended as a quoted payload. The message is sent
    from the bot itself via send_response_message.
    """
    message_url = get_message_url(event)
    bot_id = event["user_profile_id"]
    bot = get_user_profile_by_id(bot_id)
    owner = bot.bot_owner
    assert owner is not None

    fragments = [
        f"[A message]({message_url}) to your bot @_**{bot.full_name}** triggered an outgoing webhook."
    ]

    if exception:
        fragments.append(
            "\nWhen trying to send a request to the webhook service, an exception "
            f"of type {type(exception).__name__} occurred:\n```\n{exception}\n```"
        )
    elif failure_message:
        fragments.append("\n" + failure_message)
    elif status_code == 407:
        fragments.append(
            "\nThe URL configured for the webhook is for a private or disallowed network."
        )
    elif status_code:
        fragments.append(f"\nThe webhook got a response with status code *{status_code}*.")

    if response_content:
        fragments.append(
            f"\nThe response contains the following payload:\n```\n{response_content!r}\n```"
        )

    send_response_message(
        bot_id=bot_id,
        message_info={
            "type": "private",
            "display_recipient": [{"email": owner.email}],
        },
        response_data={"content": "".join(fragments)},
    )
def request_retry(event: Dict[str, Any], failure_message: Optional[str] = None) -> None:
    """Hand the event to retry_event on the "outgoing_webhooks" queue.

    The callback given to retry_event reports the failure: it replies
    "Bot is unavailable" to the triggering message, notifies the bot owner
    with failure_message, and logs a warning.
    """

    def give_up(event: Dict[str, Any]) -> None:
        """
        The argument is named 'event' on purpose, so that it hides the
        'event' argument of request_retry and this callback works only
        with the event it is handed.
        """
        bot = get_user_profile_by_id(event["user_profile_id"])
        fail_with_message(event, "Bot is unavailable")
        notify_bot_owner(event, failure_message=failure_message)
        logging.warning(
            "Maximum retries exceeded for trigger:%s event:%s",
            bot.email,
            event["command"],
        )

    retry_event("outgoing_webhooks", event, give_up)
def process_success_response(
    event: Dict[str, Any], service_handler: Any, response: Response
) -> None:
    """Act on a successful (2xx) reply from the bot server.

    The response body is decoded as JSON and handed to
    service_handler.process_success; the resulting data may ask for a reply
    message ("content", optionally with "widget_content") and/or a list of
    "reactions" to add to the triggering message.

    Raises JsonableError when the body is not valid JSON, is not a JSON
    object, or carries a "reactions" list containing a non-object entry.
    """
    try:
        response_json = json.loads(response.text)
    except json.JSONDecodeError:
        raise JsonableError(_("Invalid JSON in response"))

    if not isinstance(response_json, dict):
        raise JsonableError(_("Invalid response format"))

    success_data = service_handler.process_success(response_json)
    if success_data is None:
        return

    content = success_data.get("content")
    # The reply comes from a third-party server, so a handler may pass
    # through a non-string value (e.g. a JSON number). Coerce it the same
    # way GenericOutgoingWebhookService does, rather than crashing on
    # .strip() below.
    if content is not None and not isinstance(content, str):
        content = str(content)

    reactions = success_data.get("reactions")
    # A "reactions" value that is not a list is ignored. Entries of a list
    # must be JSON objects; reject the reply before sending anything so a
    # malformed entry surfaces as a JsonableError instead of an
    # AttributeError after a partial reply.
    if isinstance(reactions, list):
        if not all(isinstance(reaction, dict) for reaction in reactions):
            raise JsonableError(_("Invalid response format"))
    else:
        reactions = []

    bot_id = event["user_profile_id"]
    message_info = event["message"]

    if content is not None and content.strip() != "":
        widget_content = success_data.get("widget_content")
        response_data = dict(content=content, widget_content=widget_content)
        send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)

    for reaction in reactions:
        add_response_reaction(bot_id=bot_id, message_info=message_info, reaction_data=reaction)
def do_rest_call(
    base_url: str,
    event: Dict[str, Any],
    service_handler: OutgoingWebhookServiceInterface,
) -> Optional[Response]:
    """Send the webhook request for `event` and act on the outcome.

    Returns the response when the request completed and any reply was
    handled without error (for non-2xx statuses the response is returned
    after the failure has been reported). Returns None when the handler made
    no request, when processing a 2xx reply raised JsonableError, or when a
    requests exception occurred. Timeouts and connection errors are passed
    to request_retry; other request exceptions are reported to the
    conversation and to the bot owner.
    """
    try:
        started_at = perf_counter()
        response = service_handler.make_request(base_url, event)
        bot = service_handler.user_profile
        logging.info(
            "Outgoing webhook request from %s@%s took %f seconds",
            bot.id,
            bot.realm.string_id,
            perf_counter() - started_at,
        )

        if response is None:
            return None

        succeeded = str(response.status_code).startswith("2")
        if not succeeded:
            logging.warning(
                "Message %(message_url)s triggered an outgoing webhook, returning status "
                'code %(status_code)s.\n Content of response (in quotes): "'
                '%(response)s"',
                dict(
                    message_url=get_message_url(event),
                    status_code=response.status_code,
                    response=response.text,
                ),
            )
            fail_with_message(event, f"Third party responded with {response.status_code}")
            notify_bot_owner(event, response.status_code, response.content)
            return response

        try:
            process_success_response(event, service_handler, response)
        except JsonableError as e:
            # The reply was received but could not be acted on: tell the
            # conversation the short reason and the owner the full story.
            short_reason = e.msg
            logging.info("Outhook trigger failed:", stack_info=True)
            fail_with_message(event, short_reason)
            owner_notice = f"The outgoing webhook server attempted to send a message in Zulip, but that request resulted in the following error:\n> {e}"
            notify_bot_owner(event, failure_message=owner_notice)
            return None

        return response

    except requests.exceptions.Timeout:
        logging.info(
            "Trigger event %s on %s timed out. Retrying",
            event["command"],
            event["service_name"],
        )
        request_retry(
            event,
            failure_message=(
                f"Request timed out after {settings.OUTGOING_WEBHOOK_TIMEOUT_SECONDS} seconds."
            ),
        )
        return None

    except requests.exceptions.ConnectionError:
        logging.info(
            "Trigger event %s on %s resulted in a connection error. Retrying",
            event["command"],
            event["service_name"],
        )
        request_retry(
            event,
            failure_message="A connection error occurred. Is my bot server down?",
        )
        return None

    except requests.exceptions.RequestException as e:
        # Any other request failure is not retried.
        conversation_notice = (
            f"An exception of type *{type(e).__name__}* occurred for message `{event['command']}`! "
            "See the Zulip server logs for more information."
        )
        logging.exception("Outhook trigger failed:", stack_info=True)
        fail_with_message(event, conversation_notice)
        notify_bot_owner(event, exception=e)
        return None