/
aioimaplib.py
912 lines (749 loc) · 35.4 KB
/
aioimaplib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
# -*- coding: utf-8 -*-
# aioimaplib : an IMAPrev4 lib using python asyncio
# Copyright (C) 2016 Bruno Thomas
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import asyncio
import logging
import ssl
from copy import copy
from datetime import datetime, timezone, timedelta
import time
from enum import Enum
import re
import functools
import random
from collections import namedtuple
# to avoid imap servers to kill the connection after 30mn idling
# cf https://www.imapwiki.org/ClientImplementation/Synchronization
TWENTY_NINE_MINUTES = 1740
STOP_WAIT_SERVER_PUSH = 'stop_wait_server_push'
log = logging.getLogger(__name__)
IMAP4_PORT = 143
IMAP4_SSL_PORT = 993
STARTED, CONNECTED, NONAUTH, AUTH, SELECTED, LOGOUT = 'STARTED', 'CONNECTED', 'NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'
CRLF = b'\r\n'
ID_MAX_PAIRS_COUNT = 30
ID_MAX_FIELD_LEN = 30
ID_MAX_VALUE_LEN = 1024
AllowedVersions = ('IMAP4REV1', 'IMAP4')
Exec = Enum('Exec', 'is_sync is_async')
Cmd = namedtuple('Cmd', 'name valid_states exec')
Commands = {
'APPEND': Cmd('APPEND', (AUTH, SELECTED), Exec.is_sync),
'AUTHENTICATE': Cmd('AUTHENTICATE', (NONAUTH,), Exec.is_sync),
'CAPABILITY': Cmd('CAPABILITY', (NONAUTH, AUTH, SELECTED), Exec.is_async),
'CHECK': Cmd('CHECK', (SELECTED,), Exec.is_async),
'CLOSE': Cmd('CLOSE', (SELECTED,), Exec.is_sync),
'COMPRESS': Cmd('COMPRESS', (AUTH,), Exec.is_sync),
'COPY': Cmd('COPY', (SELECTED,), Exec.is_async),
'CREATE': Cmd('CREATE', (AUTH, SELECTED), Exec.is_async),
'DELETE': Cmd('DELETE', (AUTH, SELECTED), Exec.is_async),
'DELETEACL': Cmd('DELETEACL', (AUTH, SELECTED), Exec.is_async),
'ENABLE': Cmd('ENABLE', (AUTH,), Exec.is_sync),
'EXAMINE': Cmd('EXAMINE', (AUTH, SELECTED), Exec.is_sync),
'EXPUNGE': Cmd('EXPUNGE', (SELECTED,), Exec.is_async),
'FETCH': Cmd('FETCH', (SELECTED,), Exec.is_async),
'GETACL': Cmd('GETACL', (AUTH, SELECTED), Exec.is_async),
'GETQUOTA': Cmd('GETQUOTA', (AUTH, SELECTED), Exec.is_async),
'GETQUOTAROOT': Cmd('GETQUOTAROOT', (AUTH, SELECTED), Exec.is_async),
'ID': Cmd('ID', (NONAUTH, AUTH, LOGOUT, SELECTED), Exec.is_async),
'IDLE': Cmd('IDLE', (SELECTED,), Exec.is_sync),
'LIST': Cmd('LIST', (AUTH, SELECTED), Exec.is_async),
'LOGIN': Cmd('LOGIN', (NONAUTH,), Exec.is_sync),
'LOGOUT': Cmd('LOGOUT', (NONAUTH, AUTH, LOGOUT, SELECTED), Exec.is_sync),
'LSUB': Cmd('LSUB', (AUTH, SELECTED), Exec.is_async),
'MYRIGHTS': Cmd('MYRIGHTS', (AUTH, SELECTED), Exec.is_async),
'MOVE': Cmd('MOVE', (SELECTED,), Exec.is_sync),
'NAMESPACE': Cmd('NAMESPACE', (AUTH, SELECTED), Exec.is_async),
'NOOP': Cmd('NOOP', (NONAUTH, AUTH, SELECTED), Exec.is_async),
'RENAME': Cmd('RENAME', (AUTH, SELECTED), Exec.is_async),
'SEARCH': Cmd('SEARCH', (SELECTED,), Exec.is_async),
'SELECT': Cmd('SELECT', (AUTH, SELECTED), Exec.is_sync),
'SETACL': Cmd('SETACL', (AUTH, SELECTED), Exec.is_sync),
'SETQUOTA': Cmd('SETQUOTA', (AUTH, SELECTED), Exec.is_sync),
'SORT': Cmd('SORT', (SELECTED,), Exec.is_async),
'STARTTLS': Cmd('STARTTLS', (NONAUTH,), Exec.is_sync),
'STATUS': Cmd('STATUS', (AUTH, SELECTED), Exec.is_async),
'STORE': Cmd('STORE', (SELECTED,), Exec.is_async),
'SUBSCRIBE': Cmd('SUBSCRIBE', (AUTH, SELECTED), Exec.is_sync),
'THREAD': Cmd('THREAD', (SELECTED,), Exec.is_async),
'UID': Cmd('UID', (SELECTED,), Exec.is_async),
'UNSUBSCRIBE': Cmd('UNSUBSCRIBE', (AUTH, SELECTED), Exec.is_sync),
# for testing
'DELAY': Cmd('DELAY', (AUTH, SELECTED), Exec.is_sync),
}
Response = namedtuple('Response', 'result lines')
def quoted(arg):
""" Given a string, return a quoted string as per RFC 3501, section 9.
Implementation copied from https://github.com/mjs/imapclient
(imapclient/imapclient.py), 3-clause BSD license
"""
if isinstance(arg, str):
arg = arg.replace('\\', '\\\\')
arg = arg.replace('"', '\\"')
q = '"'
else:
arg = arg.replace(b'\\', b'\\\\')
arg = arg.replace(b'"', b'\\"')
q = b'"'
return q + arg + q
def arguments_rfs2971(**kwargs):
if kwargs:
if len(kwargs) > ID_MAX_PAIRS_COUNT:
raise ValueError('Must not send more than 30 field-value pairs')
args = ['(']
for field, value in kwargs.items():
field = quoted(str(field))
value = quoted(str(value)) if value is not None else 'NIL'
if len(field) > ID_MAX_FIELD_LEN:
raise ValueError('Field: {} must not be longer than 30'.format(field))
if len(value) > ID_MAX_VALUE_LEN:
raise ValueError('Field: {} value: {} must not be longer than 1024'.format(field, value))
args.extend((field, value))
args.append(')')
else:
args = ['NIL']
return args
class Command(object):
def __init__(self, name, tag, *args, prefix=None, untagged_resp_name=None, loop=asyncio.get_event_loop(), timeout=None):
self.name = name
self.tag = tag
self.args = args
self.prefix = prefix + ' ' if prefix else None
self.untagged_resp_name = untagged_resp_name or name
self.response = None
self._exception = None
self._event = asyncio.Event(loop=loop)
self._loop = loop
self._timeout = timeout
self._timer = asyncio.Handle(lambda: None, None, loop) # fake timer
self._set_timer()
self._literal_data = None
self._expected_size = 0
def __repr__(self):
return '{tag} {prefix}{name}{space}{args}'.format(
tag=self.tag, prefix=self.prefix or '', name=self.name,
space=' ' if self.args else '', args=' '.join(self.args))
# for tests
def __eq__(self, other):
return other is not None and other.tag == self.tag and other.name == self.name and other.args == self.args
def close(self, line, result):
self.append_to_resp(line, result=result)
self._timer.cancel()
self._event.set()
def begin_literal_data(self, expected_size, literal_data=b''):
self._expected_size = expected_size
self._literal_data = b''
return self.append_literal_data(literal_data)
def wait_literal_data(self):
return self._expected_size != 0 and len(self._literal_data) != self._expected_size
def wait_data(self):
return self.wait_literal_data()
def append_literal_data(self, data):
nb_bytes_to_add = self._expected_size - len(self._literal_data)
self._literal_data += data[0:nb_bytes_to_add]
if not self.wait_literal_data():
self.append_to_resp(self._literal_data)
self._end_literal_data()
self._reset_timer()
return data[nb_bytes_to_add:]
def append_to_resp(self, line, result='Pending'):
if self.response is None:
self.response = Response(result, [line])
else:
old = self.response
self.response = Response(result, old.lines + [line])
self._reset_timer()
@asyncio.coroutine
def wait(self):
yield from self._event.wait()
if self._exception is not None:
raise self._exception
def flush(self):
pass
def _end_literal_data(self):
self._expected_size = 0
self._literal_data = None
def _set_timer(self):
if self._timeout is not None:
self._timer = self._loop.call_later(self._timeout, self._timeout_callback)
def _timeout_callback(self):
self._exception = CommandTimeout(self)
self.close(str(self._exception), 'KO')
def _reset_timer(self):
self._timer.cancel()
self._set_timer()
class FetchCommand(Command):
FETCH_MESSAGE_DATA_RE = re.compile(r'[0-9]+ FETCH \(')
def __init__(self, tag, *args, prefix=None, untagged_resp_name=None,
loop=asyncio.get_event_loop(), timeout=None):
super().__init__('FETCH', tag, *args, prefix=prefix, untagged_resp_name=untagged_resp_name,
loop=loop, timeout=timeout)
def wait_data(self):
if self.response is None:
return False
last_fetch_index = 0
for index, line in enumerate(self.response.lines):
if isinstance(line, str) and self.FETCH_MESSAGE_DATA_RE.match(line):
last_fetch_index = index
return not matched_parenthesis(''.join(filter(lambda l: isinstance(l, str),
self.response.lines[last_fetch_index:])))
def matched_parenthesis(string):
return string.count('(') == string.count(')')
class IdleCommand(Command):
def __init__(self, tag, queue, *args, prefix=None, untagged_resp_name=None,
loop=asyncio.get_event_loop(), timeout=None):
super().__init__('IDLE', tag, *args, prefix=prefix, untagged_resp_name=untagged_resp_name,
loop=loop, timeout=timeout)
self.queue = queue
self.buffer = list()
def append_to_resp(self, line, result='Pending'):
if result != 'Pending':
super().append_to_resp(line, result)
else:
self.buffer.append(line)
def flush(self):
if self.buffer:
self.queue.put_nowait(copy(self.buffer))
self.buffer.clear()
class AioImapException(Exception):
def __init__(self, reason):
super().__init__(reason)
class Error(AioImapException):
def __init__(self, reason):
super().__init__(reason)
class Abort(Error):
def __init__(self, reason):
super().__init__(reason)
class CommandTimeout(AioImapException):
def __init__(self, command):
self.command = command
class IncompleteRead(AioImapException):
def __init__(self, cmd, data=b''):
self.cmd = cmd
self.data = data
def change_state(coro):
@functools.wraps(coro)
@asyncio.coroutine
def wrapper(self, *args, **kargs):
with (yield from self.state_condition):
res = yield from coro(self, *args, **kargs)
log.debug('state -> %s' % self.state)
self.state_condition.notify_all()
return res
return wrapper
# cf https://tools.ietf.org/html/rfc3501#section-9
# untagged responses types
literal_data_re = re.compile(rb'.*\{(?P<size>\d+)\}$')
message_data_re = re.compile(r'[0-9]+ ((FETCH)|(EXPUNGE))')
tagged_status_response_re = re.compile(r'[A-Z0-9]+ ((OK)|(NO)|(BAD))')
class IMAP4ClientProtocol(asyncio.Protocol):
def __init__(self, loop, conn_lost_cb=None):
self.loop = loop
self.transport = None
self.state = STARTED
self.state_condition = asyncio.Condition()
self.capabilities = set()
self.pending_async_commands = dict()
self.pending_sync_command = None
self.idle_queue = asyncio.Queue()
self.imap_version = None
self.literal_data = None
self.incomplete_line = b''
self.current_command = None
self.conn_lost_cb = conn_lost_cb
self.tagnum = 0
self.tagpre = int2ap(random.randint(4096, 65535))
def connection_made(self, transport):
self.transport = transport
self.state = CONNECTED
def data_received(self, d):
log.debug('Received : %s' % d)
try:
self._handle_responses(self.incomplete_line + d, self._handle_line, self.current_command)
self.incomplete_line = b''
self.current_command = None
except IncompleteRead as incomplete_read:
self.current_command = incomplete_read.cmd
self.incomplete_line = incomplete_read.data
def connection_lost(self, exc):
log.debug('connection lost: %s', exc)
if self.conn_lost_cb is not None:
self.conn_lost_cb(exc)
def _handle_responses(self, data, line_handler, current_cmd=None):
if not data:
if self.pending_sync_command is not None:
self.pending_sync_command.flush()
if current_cmd is not None and current_cmd.wait_data():
raise IncompleteRead(current_cmd)
return
if current_cmd is not None and current_cmd.wait_literal_data():
data = current_cmd.append_literal_data(data)
if current_cmd.wait_literal_data():
raise IncompleteRead(current_cmd)
line, separator, tail = data.partition(CRLF)
if not separator:
raise IncompleteRead(current_cmd, data)
cmd = line_handler(line.decode(), current_cmd)
begin_literal = literal_data_re.match(line)
if begin_literal:
size = int(begin_literal.group('size'))
if cmd is None:
cmd = Command('NIL', 'unused')
cmd.begin_literal_data(size)
self._handle_responses(tail, line_handler, current_cmd=cmd)
elif cmd is not None and cmd.wait_data():
self._handle_responses(tail, line_handler, current_cmd=cmd)
else:
self._handle_responses(tail, line_handler)
def _handle_line(self, line, current_cmd):
if not line:
return
if self.state == CONNECTED:
asyncio.ensure_future(self.welcome(line))
elif tagged_status_response_re.match(line):
self._response_done(line)
elif current_cmd is not None:
current_cmd.append_to_resp(line)
return current_cmd
elif line.startswith('*'):
return self._untagged_response(line)
elif line.startswith('+'):
self._continuation(line)
else:
log.info('unknown data received %s' % line)
def send(self, line):
data = ('%s\r\n' % line).encode()
log.debug('Sending : %s' % data)
self.transport.write(data)
@asyncio.coroutine
def execute(self, command):
if self.state not in Commands.get(command.name).valid_states:
raise Abort('command %s illegal in state %s' % (command.name, self.state))
if self.pending_sync_command is not None:
yield from self.pending_sync_command.wait()
if Commands.get(command.name).exec == Exec.is_sync:
if self.pending_async_commands:
yield from self.wait_async_pending_commands()
self.pending_sync_command = command
else:
if self.pending_async_commands.get(command.untagged_resp_name) is not None:
yield from self.pending_async_commands[command.untagged_resp_name].wait()
self.pending_async_commands[command.untagged_resp_name] = command
self.send(str(command))
try:
yield from command.wait()
except CommandTimeout:
if Commands.get(command.name).exec == Exec.is_sync:
self.pending_sync_command = None
else:
self.pending_async_commands.pop(command.untagged_resp_name, None)
raise
return command.response
@change_state
@asyncio.coroutine
def welcome(self, command):
if 'PREAUTH' in command:
self.state = AUTH
elif 'OK' in command:
self.state = NONAUTH
else:
raise Error(command)
yield from self.capability()
@change_state
@asyncio.coroutine
def login(self, user, password):
response = yield from self.execute(
Command('LOGIN', self.new_tag(), user, '%s' % quoted(password), loop=self.loop))
if 'OK' == response.result:
self.state = AUTH
for line in response.lines:
if 'CAPABILITY' in line:
self.capabilities = self.capabilities.union(set(line.replace('CAPABILITY', '').strip().split()))
return response
@change_state
@asyncio.coroutine
def logout(self):
response = (yield from self.execute(Command('LOGOUT', self.new_tag(), loop=self.loop)))
if 'OK' == response.result:
self.state = LOGOUT
return response
@change_state
@asyncio.coroutine
def select(self, mailbox='INBOX'):
response = yield from self.execute(
Command('SELECT', self.new_tag(), mailbox, loop=self.loop))
if 'OK' == response.result:
self.state = SELECTED
return response
@change_state
@asyncio.coroutine
def close(self):
response = yield from self.execute(Command('CLOSE', self.new_tag(), loop=self.loop))
if response.result == 'OK':
self.state = AUTH
return response
@asyncio.coroutine
def idle(self):
if 'IDLE' not in self.capabilities:
raise Abort('server has not IDLE capability')
return (yield from self.execute(IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop)))
def has_pending_idle_command(self):
return self.pending_sync_command is not None and self.pending_sync_command.name == 'IDLE'
def idle_done(self):
self.send('DONE')
@asyncio.coroutine
def search(self, *criteria, charset='utf-8', by_uid=False):
args = ('CHARSET', charset) + criteria if charset is not None else criteria
prefix = 'UID' if by_uid else ''
return (yield from self.execute(
Command('SEARCH', self.new_tag(), *args, prefix=prefix, loop=self.loop)))
@asyncio.coroutine
def fetch(self, message_set, message_parts, by_uid=False, timeout=None):
return (yield from self.execute(
FetchCommand(self.new_tag(), message_set, message_parts,
prefix='UID' if by_uid else '', loop=self.loop, timeout=timeout)))
@asyncio.coroutine
def store(self, *args, by_uid=False):
return (yield from self.execute(
Command('STORE', self.new_tag(), *args,
prefix='UID' if by_uid else '', untagged_resp_name='FETCH', loop=self.loop)))
@asyncio.coroutine
def expunge(self, *args, by_uid=False):
return (yield from self.execute(
Command('EXPUNGE', self.new_tag(), *args,
prefix='UID' if by_uid else '', loop=self.loop)))
@asyncio.coroutine
def uid(self, command, *criteria, timeout=None):
if self.state not in Commands.get('UID').valid_states:
raise Abort('command UID illegal in state %s' % self.state)
if command.upper() == 'FETCH':
return (yield from self.fetch(criteria[0], criteria[1], by_uid=True, timeout=timeout))
if command.upper() == 'STORE':
return (yield from self.store(*criteria, by_uid=True))
if command.upper() == 'COPY':
return (yield from self.copy(*criteria, by_uid=True))
if command.upper() == 'MOVE':
return (yield from self.move(*criteria, by_uid=True))
if command.upper() == 'EXPUNGE':
if 'UIDPLUS' not in self.capabilities:
raise Abort('EXPUNGE with uids is only valid with UIDPLUS capability. UIDPLUS not in (%s)' % self.capabilities)
return (yield from self.expunge(*criteria, by_uid=True))
raise Abort('command UID only possible with COPY, FETCH, EXPUNGE (w/UIDPLUS) or STORE (was %s)' % command.upper())
@asyncio.coroutine
def copy(self, *args, by_uid=False):
return (yield from self.execute(
Command('COPY', self.new_tag(), *args, prefix='UID' if by_uid else '', loop=self.loop)))
@asyncio.coroutine
def move(self, uid_set, mailbox, by_uid=False):
if 'MOVE' not in self.capabilities:
raise Abort('server has not MOVE capability')
return (yield from self.execute(
Command('MOVE', self.new_tag(), uid_set, mailbox, prefix='UID' if by_uid else '', loop=self.loop)))
@asyncio.coroutine
def capability(self):
response = yield from self.execute(Command('CAPABILITY', self.new_tag(), loop=self.loop))
capability_list = response.lines[0].split()
self.capabilities = set(capability_list)
version = capability_list[0].upper()
if version not in AllowedVersions:
raise Error('server not IMAP4 compliant')
else:
self.imap_version = version
@asyncio.coroutine
def append(self, message_bytes, mailbox='INBOX', flags=None, date=None, timeout=None):
args = [mailbox]
if flags is not None:
if (flags[0], flags[-1]) != ('(', ')'):
args.append('(%s)' % flags)
else:
args.append(flags)
if date is not None:
args.append(time2internaldate(date))
args.append('{%s}' % len(message_bytes))
self.literal_data = message_bytes
return (yield from self.execute(Command('APPEND', self.new_tag(), *args, loop=self.loop, timeout=timeout)))
@asyncio.coroutine
def id(self, **kwargs):
args = arguments_rfs2971(**kwargs)
return (yield from self.execute(Command('ID', self.new_tag(), *args, loop=self.loop)))
simple_commands = {'NOOP', 'CHECK', 'STATUS', 'CREATE', 'DELETE', 'RENAME',
'SUBSCRIBE', 'UNSUBSCRIBE', 'LSUB', 'LIST', 'EXAMINE', 'ENABLE'}
@asyncio.coroutine
def namespace(self):
if 'NAMESPACE' not in self.capabilities:
raise Abort('server has not NAMESPACE capability')
return (yield from self.execute(Command('NAMESPACE', self.new_tag(), loop=self.loop)))
@asyncio.coroutine
def simple_command(self, name, *args):
if name not in self.simple_commands:
raise NotImplementedError('simple command only available for %s' % self.simple_commands)
return (yield from self.execute(Command(name, self.new_tag(), *args, loop=self.loop)))
@asyncio.coroutine
def wait_async_pending_commands(self):
yield from asyncio.wait([asyncio.ensure_future(cmd.wait()) for cmd in self.pending_async_commands.values()])
@asyncio.coroutine
def wait(self, state_regexp):
state_re = re.compile(state_regexp)
with (yield from self.state_condition):
yield from self.state_condition.wait_for(lambda: state_re.match(self.state))
def _untagged_response(self, line):
line = line.replace('* ', '')
if self.pending_sync_command is not None:
self.pending_sync_command.append_to_resp(line)
command = self.pending_sync_command
else:
match = message_data_re.match(line)
if match:
cmd_name, text = match.group(1), match.string
else:
cmd_name, _, text = line.partition(' ')
command = self.pending_async_commands.get(cmd_name.upper())
if command is not None:
command.append_to_resp(text)
else:
# noop is async and servers can send untagged responses
command = self.pending_async_commands.get('NOOP')
if command is not None:
command.append_to_resp(line)
else:
log.info('ignored untagged response : %s' % line)
return command
def _response_done(self, line):
log.debug('tagged status %s' % line)
tag, _, response = line.partition(' ')
if self.pending_sync_command is not None:
if self.pending_sync_command.tag != tag:
raise Abort('unexpected tagged response with pending sync command (%s) response: %s' %
(self.pending_sync_command, response))
command = self.pending_sync_command
self.pending_sync_command = None
else:
cmds = self._find_pending_async_cmd_by_tag(tag)
if len(cmds) == 0:
raise Abort('unexpected tagged (%s) response: %s' % (tag, response))
elif len(cmds) > 1:
raise Error('inconsistent state : two commands have the same tag (%s)' % cmds)
command = cmds.pop()
self.pending_async_commands.pop(command.untagged_resp_name)
response_result, _, response_text = response.partition(' ')
command.close(response_text, result=response_result)
def _continuation(self, line):
if self.pending_sync_command is not None and self.pending_sync_command.name == 'APPEND':
if self.literal_data is None:
Abort('asked for literal data but have no literal data to send')
self.transport.write(self.literal_data)
self.transport.write(CRLF)
self.literal_data = None
elif self.pending_sync_command is not None:
log.debug('continuation line appended to pending sync command %s : %s' % (self.pending_sync_command, line))
self.pending_sync_command.append_to_resp(line)
self.pending_sync_command.flush()
else:
log.info('server says %s (ignored)' % line)
def new_tag(self):
tag = self.tagpre + str(self.tagnum)
self.tagnum += 1
return tag
def _find_pending_async_cmd_by_tag(self, tag):
return [c for c in self.pending_async_commands.values() if c is not None and c.tag == tag]
class IMAP4(object):
TIMEOUT_SECONDS = 10
def __init__(self, host='127.0.0.1', port=IMAP4_PORT, loop=asyncio.get_event_loop(), timeout=TIMEOUT_SECONDS, conn_lost_cb=None, ssl_context=None):
self.timeout = timeout
self.port = port
self.host = host
self.protocol = None
self._idle_waiter = None
self.create_client(host, port, loop, conn_lost_cb, ssl_context)
def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
self.protocol = IMAP4ClientProtocol(loop, conn_lost_cb)
loop.create_task(loop.create_connection(lambda: self.protocol, host, port, ssl=ssl_context))
def get_state(self):
return self.protocol.state
@asyncio.coroutine
def wait_hello_from_server(self):
yield from asyncio.wait_for(self.protocol.wait('AUTH|NONAUTH'), self.timeout)
@asyncio.coroutine
def login(self, user, password):
return (yield from asyncio.wait_for(self.protocol.login(user, password), self.timeout))
@asyncio.coroutine
def logout(self):
return (yield from asyncio.wait_for(self.protocol.logout(), self.timeout))
@asyncio.coroutine
def select(self, mailbox='INBOX'):
return (yield from asyncio.wait_for(self.protocol.select(mailbox), self.timeout))
@asyncio.coroutine
def search(self, *criteria, charset='utf-8'):
return (yield from asyncio.wait_for(self.protocol.search(*criteria, charset=charset), self.timeout))
@asyncio.coroutine
def uid_search(self, *criteria, charset='utf-8'):
return (
yield from asyncio.wait_for(self.protocol.search(*criteria, by_uid=True, charset=charset), self.timeout))
@asyncio.coroutine
def uid(self, command, *criteria):
return (yield from self.protocol.uid(command, *criteria, timeout=self.timeout))
@asyncio.coroutine
def store(self, *criteria):
return (yield from asyncio.wait_for(self.protocol.store(*criteria), self.timeout))
@asyncio.coroutine
def copy(self, *criteria):
return (yield from asyncio.wait_for(self.protocol.copy(*criteria), self.timeout))
@asyncio.coroutine
def expunge(self):
return (yield from asyncio.wait_for(self.protocol.expunge(), self.timeout))
@asyncio.coroutine
def fetch(self, message_set, message_parts):
return (yield from self.protocol.fetch(message_set, message_parts, timeout=self.timeout))
@asyncio.coroutine
def idle(self):
return (yield from self.protocol.idle())
def idle_done(self):
if self._idle_waiter is not None:
self._idle_waiter.cancel()
self.protocol.idle_done()
@asyncio.coroutine
def stop_wait_server_push(self):
if self.protocol.has_pending_idle_command():
yield from self.protocol.idle_queue.put(STOP_WAIT_SERVER_PUSH)
return True
return False
@asyncio.coroutine
def wait_server_push(self, timeout=TWENTY_NINE_MINUTES):
return (yield from asyncio.wait_for(self.protocol.idle_queue.get(), timeout=timeout))
@asyncio.coroutine
def idle_start(self, timeout=TWENTY_NINE_MINUTES):
if self._idle_waiter is not None:
self._idle_waiter.cancel()
idle = asyncio.ensure_future(self.idle())
self._idle_waiter = self.protocol.loop.call_later(timeout, lambda: asyncio.ensure_future(self.stop_wait_server_push()))
yield from self.wait_server_push(self.timeout) # idling continuation
return idle
def has_pending_idle(self):
return self.protocol.has_pending_idle_command()
@asyncio.coroutine
def id(self, **kwargs):
return (yield from asyncio.wait_for(self.protocol.id(**kwargs), self.timeout))
@asyncio.coroutine
def namespace(self):
return (yield from asyncio.wait_for(self.protocol.namespace(), self.timeout))
@asyncio.coroutine
def noop(self):
return (yield from asyncio.wait_for(self.protocol.simple_command('NOOP'), self.timeout))
@asyncio.coroutine
def check(self):
return (yield from asyncio.wait_for(self.protocol.simple_command('CHECK'), self.timeout))
@asyncio.coroutine
def examine(self, mailbox='INBOX'):
return (yield from asyncio.wait_for(self.protocol.simple_command('EXAMINE', mailbox), self.timeout))
@asyncio.coroutine
def status(self, mailbox, names):
return (yield from asyncio.wait_for(self.protocol.simple_command('STATUS', mailbox, names), self.timeout))
@asyncio.coroutine
def subscribe(self, mailbox):
return (yield from asyncio.wait_for(self.protocol.simple_command('SUBSCRIBE', mailbox), self.timeout))
@asyncio.coroutine
def unsubscribe(self, mailbox):
return (yield from asyncio.wait_for(self.protocol.simple_command('UNSUBSCRIBE', mailbox), self.timeout))
@asyncio.coroutine
def lsub(self, reference_name, mailbox_name):
return (yield from asyncio.wait_for(self.protocol.simple_command('LSUB', reference_name, mailbox_name), self.timeout))
@asyncio.coroutine
def create(self, mailbox_name):
return (yield from asyncio.wait_for(self.protocol.simple_command('CREATE', mailbox_name), self.timeout))
@asyncio.coroutine
def delete(self, mailbox_name):
return (yield from asyncio.wait_for(self.protocol.simple_command('DELETE', mailbox_name), self.timeout))
@asyncio.coroutine
def rename(self, old_mailbox_name, new_mailbox_name):
return (yield from asyncio.wait_for(self.protocol.simple_command('RENAME', old_mailbox_name, new_mailbox_name), self.timeout))
@asyncio.coroutine
def list(self, reference_name, mailbox_pattern):
return (yield from asyncio.wait_for(self.protocol.simple_command('LIST', reference_name, mailbox_pattern), self.timeout))
@asyncio.coroutine
def append(self, message_bytes, mailbox='INBOX', flags=None, date=None):
return (yield from self.protocol.append(message_bytes, mailbox, flags, date, timeout=self.timeout))
@asyncio.coroutine
def close(self):
return (yield from asyncio.wait_for(self.protocol.close(), self.timeout))
@asyncio.coroutine
def move(self, uid_set, mailbox):
return (yield from asyncio.wait_for(self.protocol.move(uid_set, mailbox), self.timeout))
@asyncio.coroutine
def enable(self, capability):
if 'ENABLE' not in self.protocol.capabilities:
raise Abort('server has not ENABLE capability')
return (yield from asyncio.wait_for(self.protocol.simple_command('ENABLE', capability), self.timeout))
def has_capability(self, capability):
return capability in self.protocol.capabilities
def extract_exists(response):
for line in response.lines:
if 'EXISTS' in line:
return int(line.replace(' EXISTS', ''))
class IMAP4_SSL(IMAP4):
def __init__(self, host='127.0.0.1', port=IMAP4_SSL_PORT, loop=asyncio.get_event_loop(),
timeout=IMAP4.TIMEOUT_SECONDS, ssl_context=None):
super().__init__(host, port, loop, timeout, None, ssl_context)
def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
if ssl_context is None:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
super().create_client(host, port, loop, conn_lost_cb, ssl_context)
# functions from imaplib
def int2ap(num):
"""Convert integer to A-P string representation."""
val = ''
ap = 'ABCDEFGHIJKLMNOP'
num = int(abs(num))
while num:
num, mod = divmod(num, 16)
val += ap[mod:mod + 1]
return val
Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
Mon2num = {s.encode():n+1 for n, s in enumerate(Months[1:])}
def time2internaldate(date_time):
"""Convert date_time to IMAP4 INTERNALDATE representation.
Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'. The
date_time argument can be a number (int or float) representing
seconds since epoch (as returned by time.time()), a 9-tuple
representing local time, an instance of time.struct_time (as
returned by time.localtime()), an aware datetime instance or a
double-quoted string. In the last case, it is assumed to already
be in the correct format.
"""
if isinstance(date_time, (int, float)):
dt = datetime.fromtimestamp(date_time, timezone.utc).astimezone()
elif isinstance(date_time, tuple):
try:
gmtoff = date_time.tm_gmtoff
except AttributeError:
if time.daylight:
dst = date_time[8]
if dst == -1:
dst = time.localtime(time.mktime(date_time))[8]
gmtoff = -(time.timezone, time.altzone)[dst]
else:
gmtoff = -time.timezone
delta = timedelta(seconds=gmtoff)
dt = datetime(*date_time[:6], tzinfo=timezone(delta))
elif isinstance(date_time, datetime):
if date_time.tzinfo is None:
raise ValueError("date_time must be aware")
dt = date_time
elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'):
return date_time # Assume in correct format
else:
raise ValueError("date_time not of a known type")
fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(Months[dt.month])
return dt.strftime(fmt)