Skip to content

Commit

Permalink
[aiolib][fix] generalization of literal treatment
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Jun 10, 2017
1 parent b1a4a9b commit fc61ea1
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 145 deletions.
141 changes: 70 additions & 71 deletions aioimaplib/aioimaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,29 @@ def __repr__(self):
tag=self.tag, prefix=self.prefix or '', name=self.name,
space=' ' if self.args else '', args=' '.join(self.args))

# for tests
def __eq__(self, other):
return other is not None and other.tag == self.tag and other.name == self.name and other.args == self.args

def close(self, line, result):
self.append_to_resp(line, result=result)
self._timer.cancel()
self._event.set()

def begin_literal_data(self, data, expected_size):
self._literal_data = data
def begin_literal_data(self, expected_size, literal_data=b''):
self._expected_size = expected_size
self._reset_timer()

def end_literal_data(self):
self.append_to_resp(self._literal_data.rstrip(b')'))
self._expected_size = 0
self._literal_data = None
self._reset_timer()
self._literal_data = b''
return self.append_literal_data(literal_data)

def has_literal_data(self):
return self._expected_size != 0 and len(self._literal_data) != self._expected_size

def append_literal_data(self, data):
nb_bytes_to_add = self._expected_size - len(self._literal_data)
self._literal_data += data[0:nb_bytes_to_add]
if not self.has_literal_data():
self.append_to_resp(self._literal_data)
self._end_literal_data()
self._reset_timer()
return data[nb_bytes_to_add:]

Expand All @@ -147,6 +148,10 @@ def wait(self):
if self._exception is not None:
raise self._exception

def _end_literal_data(self):
self._expected_size = 0
self._literal_data = None

def _set_timer(self):
if self._timeout is not None:
self._timer = self._loop.call_later(self._timeout, self._timeout_callback)
Expand Down Expand Up @@ -175,6 +180,12 @@ def __init__(self, command):
self.command = command


class IncompleteLiteral(asyncio.IncompleteReadError):
def __init__(self, partial, expected, cmd):
super().__init__(partial, expected)
self.cmd = cmd


def change_state(coro):
@functools.wraps(coro)
@asyncio.coroutine
Expand All @@ -190,8 +201,8 @@ def wrapper(self, *args, **kargs):

# cf https://tools.ietf.org/html/rfc3501#section-9
# untagged responses types
fetch_message_with_literal_data_re = re.compile(rb'\* [0-9]+ FETCH [\w \\\[\]\(\)\.\-\:\+\"\;\$]+ \{(?P<size>\d+)\}\r\n')
message_data_without_literal_re = re.compile(r'[0-9]+ ((FETCH)|(EXPUNGE))([\w \(\)]+)?')
literal_data_re = re.compile(rb'.*\{(?P<size>\d+)\}$')
message_data_re = re.compile(r'[0-9]+ ((FETCH)|(EXPUNGE))')
tagged_status_response_re = re.compile(r'[A-Z0-9]+ ((OK)|(NO)|(BAD))')


Expand All @@ -208,6 +219,7 @@ def __init__(self, loop):
self.imap_version = None
self.literal_data = None
self.incomplete_line = b''
self.current_command = None

self.tagnum = 0
self.tagpre = int2ap(random.randint(4096, 65535))
Expand All @@ -218,49 +230,56 @@ def connection_made(self, transport):

def data_received(self, d):
log.debug('Received : %s' % d)
if self._incomplete_fetch_literal():
data = self._append_fetch_data(d)
else:
data = d
try:
self._handle_responses(data, self._handle_line, self._untagged_fetch_with_literal, self.incomplete_line)
self._handle_responses(d, self._handle_line, self.incomplete_line, self.current_command)
self.incomplete_line = b''
except asyncio.IncompleteReadError as incomplete_error:
log.debug('Incomplete line, storing partial : %s' % incomplete_error.partial)
self.incomplete_line = incomplete_error.partial

def _handle_responses(self, d, line_handler, fetch_handler, incomplete_line=b''):
self.current_command = None
except IncompleteLiteral as incomplete_literal:
log.debug('Incomplete literal, storing partial : %s' % incomplete_literal.partial)
self.incomplete_line = incomplete_literal.partial
self.current_command = incomplete_literal.cmd
except asyncio.IncompleteReadError as incomplete_read:
log.debug('Incomplete line, storing partial : %s' % incomplete_read.partial)
self.incomplete_line = incomplete_read.partial

def _handle_responses(self, d, line_handler, incomplete_line=b'', current_cmd=None):
if not d:
return
data = incomplete_line + d
match_fetch_message = fetch_message_with_literal_data_re.match(data)
if match_fetch_message:
head, crlf, tail = data.partition(CRLF)
msg_size = match_fetch_message.group('size')
# we want to cut -----------------------
# ...here |
# so 4+1 v
# b'* 3 FETCH (UID 3 RFC822 {4}\r\nmail)\r\n...
end_message_index_with_parenthesis = int(msg_size) + 1

fetch_handler(head + crlf + tail[0:end_message_index_with_parenthesis], end_message_index_with_parenthesis)
after_fetch = tail[end_message_index_with_parenthesis:]
self._handle_responses(after_fetch, line_handler, fetch_handler)
if current_cmd is not None and current_cmd.has_literal_data():
data = current_cmd.append_literal_data(data)

line, separator, tail = data.partition(CRLF)
if not separator:
raise asyncio.IncompleteReadError(data, b'line should end with CRLF')

cmd = line_handler(line.decode(), current_cmd)

has_literal = literal_data_re.match(line)
if has_literal:
size = int(has_literal.group('size'))
if cmd is None:
cmd = Command('NIL', 'unused')
cmd.begin_literal_data(size)

if size > len(tail):
raise IncompleteLiteral(tail, b'incomplete literal', cmd)

after_literal = cmd.append_literal_data(tail)
self._handle_responses(after_literal, line_handler, current_cmd=cmd)
else:
line, separator, tail = data.partition(CRLF)
if not separator:
raise asyncio.IncompleteReadError(data, b'line should end with CRLF')
else:
line_handler(line.decode())
self._handle_responses(tail, line_handler, fetch_handler)
self._handle_responses(tail, line_handler)

def _handle_line(self, line):
def _handle_line(self, line, current_cmd):
if not line:
return
if current_cmd is not None:
current_cmd.append_to_resp(line)
return current_cmd
elif self.state == CONNECTED:
asyncio.async(self.welcome(line))
elif line.startswith('*'):
self._untagged_response(line.replace('* ', ''))
return self._untagged_response(line.replace('* ', ''))
elif line.startswith('+'):
self._continuation(line.replace('+ ', ''))
elif tagged_status_response_re.match(line):
Expand Down Expand Up @@ -458,49 +477,29 @@ def wait(self, state_regexp):
with (yield from self.state_condition):
yield from self.state_condition.wait_for(lambda: state_re.match(self.state))

def _untagged_fetch_with_literal(self, raw_line, msg_size):
pending_fetch = self.pending_async_commands.get('FETCH')
if pending_fetch is None:
raise Abort('unexpected fetch message (%r) response:' % raw_line)
msg_header, _, msg = raw_line.partition(CRLF)
pending_fetch.append_to_resp(msg_header)
if len(msg) < msg_size:
# email message is not complete we should wait the future chunks
pending_fetch.begin_literal_data(msg, msg_size)
else:
pending_fetch.append_to_resp(msg.rstrip(b')'))

def _incomplete_fetch_literal(self):
return 'FETCH' in self.pending_async_commands and \
self.pending_async_commands.get('FETCH').has_literal_data()

def _append_fetch_data(self, data):
pending_fetch = self.pending_async_commands.get('FETCH')
rest = pending_fetch.append_literal_data(data)
if not pending_fetch.has_literal_data():
pending_fetch.end_literal_data()
return rest

def has_pending_idle_command(self):
return self.pending_sync_command is not None and self.pending_sync_command.name == 'IDLE'

def _untagged_response(self, line):
if self.pending_sync_command is not None:
if self.has_pending_idle_command():
asyncio.async(self.idle_queue.put(line))
return
else:
self.pending_sync_command.append_to_resp(line)
command = self.pending_sync_command
else:
match = message_data_without_literal_re.match(line)
match = message_data_re.match(line)
if match:
command, text = match.group(1), match.string
cmd_name, text = match.group(1), match.string
else:
command, _, text = line.partition(' ')
pending_async_command = self.pending_async_commands.get(command.upper())
if pending_async_command is not None:
pending_async_command.append_to_resp(text)
cmd_name, _, text = line.partition(' ')
command = self.pending_async_commands.get(cmd_name.upper())
if command is not None:
command.append_to_resp(text)
else:
log.info('ignored untagged response : %s' % line)
return command

def _response_done(self, line):
tag, _, response = line.partition(' ')
Expand Down
2 changes: 1 addition & 1 deletion aioimaplib/tests/test_acceptance_aioimaplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ def test_file_with_attachement(self):
result, data = yield from imap_client.fetch('1', '(RFC822)')

self.assertEqual('OK', result)
self.assertEqual([b'* 1 FETCH (UID 1 RFC822 {418898}', mail.as_bytes(), 'FETCH completed.'], data)
self.assertEqual(['1 FETCH (UID 1 RFC822 {418898}', mail.as_bytes(), ')', 'FETCH completed.'], data)

0 comments on commit fc61ea1

Please sign in to comment.