/
actions.py
342 lines (257 loc) · 11.9 KB
/
actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
from abc import ABC
from hashlib import sha256
from typing import Callable
from typing import Iterable
from typing import Tuple
from typing import Union
from uuid import uuid4
from libcloud.storage.types import ObjectDoesNotExistError
from opwen_email_server.constants import events
from opwen_email_server.constants import sync
from opwen_email_server.services.auth import AzureAuth
from opwen_email_server.services.sendgrid import SendSendgridEmail
from opwen_email_server.services.storage import AzureObjectsStorage
from opwen_email_server.services.storage import AzureObjectStorage
from opwen_email_server.services.storage import AzureTextStorage
from opwen_email_server.utils.email_parser import format_attachments
from opwen_email_server.utils.email_parser import format_inline_images
from opwen_email_server.utils.email_parser import get_domain
from opwen_email_server.utils.email_parser import get_domains
from opwen_email_server.utils.email_parser import parse_mime_email
from opwen_email_server.utils.log import LogMixin
from opwen_email_server.utils.serialization import from_base64
from opwen_email_server.utils.serialization import from_jsonl_bytes
from opwen_email_server.utils.serialization import to_base64
from opwen_email_server.utils.serialization import to_jsonl_bytes
from opwen_email_server.utils.serialization import to_msgpack_bytes
from opwen_email_server.utils.string import is_lowercase
Response = Union[dict, Tuple[str, int]]
class _Action(ABC, LogMixin):
def __call__(self, *args, **kwargs) -> Response:
try:
return self._action(*args, **kwargs)
except Exception as ex:
self.log_exception(ex, 'error in action %s',
self.__class__.__name__)
raise ex
def _action(self, *args, **kwargs) -> Response:
raise NotImplementedError # pragma: no cover
class Ping(_Action):
# noinspection PyMethodMayBeStatic
def _action(self): # type: ignore
return 'OK', 200
class SendOutboundEmails(_Action):
def __init__(self,
email_storage: AzureObjectStorage,
send_email: SendSendgridEmail):
self._email_storage = email_storage
self._send_email = send_email
def _action(self, resource_id): # type: ignore
email = self._email_storage.fetch_object(resource_id)
success = self._send_email(email)
if not success:
return 'error', 500
self.log_event(events.EMAIL_DELIVERED_FROM_CLIENT, {'domain': get_domain(email.get('from', ''))}) # noqa: E501
return 'OK', 200
class StoreInboundEmails(_Action):
def __init__(self,
raw_email_storage: AzureTextStorage,
email_storage: AzureObjectStorage,
pending_factory: Callable[[str], AzureTextStorage],
email_parser: Callable[[str], dict] = None):
self._raw_email_storage = raw_email_storage
self._email_storage = email_storage
self._pending_factory = pending_factory
self._email_parser = email_parser or self._parse_mime_email
def _action(self, resource_id): # type: ignore
try:
mime_email = self._raw_email_storage.fetch_text(resource_id)
except ObjectDoesNotExistError:
self.log_warning('Inbound email %s does not exist', resource_id)
return 'skipped', 202
email = self._email_parser(mime_email)
self._store_inbound_email(email)
self._raw_email_storage.delete(resource_id)
self.log_event(events.EMAIL_STORED_FOR_CLIENT, {'domain': get_domain(email.get('from') or '')}) # noqa: E501
return 'OK', 200
def _store_inbound_email(self, email: dict):
email_id = self._to_id(email)
email['_uid'] = email_id
self._email_storage.store_object(email_id, email)
for domain in get_domains(email):
pending_storage = self._pending_factory(domain)
pending_storage.store_text(email_id, 'pending')
def _parse_mime_email(self, mime_email: str) -> dict:
email = parse_mime_email(mime_email)
email = format_attachments(email)
email = format_inline_images(email, self.log_warning)
return email
@classmethod
def _to_id(cls, email: dict) -> str:
return sha256(to_msgpack_bytes(email)).hexdigest()
class StoreWrittenClientEmails(_Action):
def __init__(self,
client_storage: AzureObjectsStorage,
email_storage: AzureObjectStorage,
next_task: Callable[[str], None]):
self._client_storage = client_storage
self._email_storage = email_storage
self._next_task = next_task
def _action(self, resource_id): # type: ignore
emails = self._client_storage.fetch_objects(
resource_id, (sync.EMAILS_FILE, from_jsonl_bytes))
domain = ''
num_stored = 0
for email in emails:
email_id = email['_uid']
email = self._decode_attachments(email)
self._email_storage.store_object(email_id, email)
self._next_task(email_id)
num_stored += 1
domain = get_domain(email.get('from', ''))
self._client_storage.delete(resource_id)
self.log_event(events.EMAIL_STORED_FROM_CLIENT, {'domain': domain, 'num_emails': num_stored}) # noqa: E501
return 'OK', 200
@classmethod
def _decode_attachments(cls, email: dict) -> dict:
if not email.get('attachments'):
return email
for attachment in email['attachments']:
attachment['content'] = from_base64(attachment['content'])
return email
class ReceiveInboundEmail(_Action):
def __init__(self,
auth: AzureAuth,
raw_email_storage: AzureTextStorage,
next_task: Callable[[str], None]):
self._auth = auth
self._raw_email_storage = raw_email_storage
self._next_task = next_task
def _action(self, client_id, email): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
return 'client is not registered', 403
email_id = self._new_email_id(email)
self._raw_email_storage.store_text(email_id, email)
self._next_task(email_id)
self.log_event(events.EMAIL_RECEIVED_FOR_CLIENT, {'domain': domain}) # noqa: E501
return 'received', 200
@classmethod
def _new_email_id(cls, email: str) -> str:
return sha256(email.encode('utf-8')).hexdigest()
class DownloadClientEmails(_Action):
def __init__(self,
auth: AzureAuth,
client_storage: AzureObjectsStorage,
email_storage: AzureObjectStorage,
pending_factory: Callable[[str], AzureTextStorage]):
self._auth = auth
self._client_storage = client_storage
self._email_storage = email_storage
self._pending_factory = pending_factory
def _action(self, client_id, compression): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
return 'client is not registered', 403
if compression not in self._client_storage.compression_formats():
self.log_event(events.UNKNOWN_COMPRESSION_FORMAT, {'client_id': client_id}) # noqa: E501
return 'unknown compression format "{}"'.format(compression), 400
delivered = set()
def mark_delivered(email: dict) -> dict:
delivered.add(email['_uid'])
return email
pending_storage = self._pending_factory(domain)
pending = self._fetch_pending_emails(pending_storage)
pending = (mark_delivered(email) for email in pending)
pending = (self._encode_attachments(email) for email in pending)
resource_id = self._client_storage.store_objects(
(sync.EMAILS_FILE, pending, to_jsonl_bytes),
compression)
self._mark_emails_as_delivered(pending_storage, delivered)
self.log_event(events.EMAILS_DELIVERED_TO_CLIENT, {'domain': domain, 'num_emails': len(delivered)}) # noqa: E501
return {
'resource_id': resource_id,
}
def _fetch_pending_emails(self, pending_storage: AzureTextStorage):
for email_id in pending_storage.iter():
yield self._email_storage.fetch_object(email_id)
@classmethod
def _encode_attachments(cls, email: dict) -> dict:
if not email.get('attachments'):
return email
for attachment in email['attachments']:
content_bytes = attachment['content']
attachment['content'] = to_base64(content_bytes)
return email
@classmethod
def _mark_emails_as_delivered(cls, pending_storage: AzureTextStorage,
email_ids: Iterable[str]):
for email_id in email_ids:
pending_storage.delete(email_id)
class UploadClientEmails(_Action):
def __init__(self,
auth: AzureAuth,
next_task: Callable[[str], None]):
self._auth = auth
self._next_task = next_task
def _action(self, client_id, upload_info): # type: ignore
domain = self._auth.domain_for(client_id)
if not domain:
self.log_event(events.UNREGISTERED_CLIENT, {'client_id': client_id}) # noqa: E501
return 'client is not registered', 403
resource_id = upload_info['resource_id']
self._next_task(resource_id)
self.log_event(events.EMAILS_RECEIVED_FROM_CLIENT, {'domain': domain}) # noqa: E501
return 'uploaded', 200
class RegisterClient(_Action):
def __init__(self,
auth: AzureAuth,
client_storage: AzureObjectsStorage,
setup_mailbox: Callable[[str, str], None],
setup_mx_records: Callable[[str], None],
client_id_source: Callable[[], str] = None):
self._auth = auth
self._client_storage = client_storage
self._setup_mailbox = setup_mailbox
self._setup_mx_records = setup_mx_records
self._client_id_source = client_id_source or self._new_client_id
def _action(self, client, **auth_args): # type: ignore
domain = client['domain']
if not is_lowercase(domain):
return 'domain must be lowercase', 400
if self._auth.client_id_for(domain) is not None:
return 'client already exists', 409
client_id = self._client_id_source()
access_info = self._client_storage.access_info()
self._setup_mailbox(client_id, domain)
self._setup_mx_records(domain)
self._client_storage.ensure_exists()
self._auth.insert(client_id, domain)
self.log_event(events.NEW_CLIENT_REGISTERED, {'domain': domain}) # noqa: E501
return {
'client_id': client_id,
'storage_account': access_info.account,
'storage_key': access_info.key,
'resource_container': access_info.container,
}
@classmethod
def _new_client_id(cls) -> str:
return str(uuid4())
class CalculatePendingEmailsMetric(_Action):
def __init__(self,
auth: AzureAuth,
pending_factory: Callable[[str], AzureTextStorage]):
self._auth = auth
self._pending_factory = pending_factory
def _action(self, client_domain, **auth_args): # type: ignore
client_id = self._auth.client_id_for(client_domain)
if not client_id:
self.log_event(events.UNKNOWN_CLIENT_DOMAIN, {'client_domain': client_domain}) # noqa: E501
return 'unknown client domain', 404
pending_storage = self._pending_factory(client_domain)
pending_emails = sum(1 for _ in pending_storage.iter())
return {
'pending_emails': pending_emails,
}