Skip to content

Commit

Permalink
Fix IDLE waits by backend
Browse files Browse the repository at this point in the history
  • Loading branch information
icgood committed Aug 2, 2020
1 parent d5f032e commit c83845c
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 81 deletions.
3 changes: 2 additions & 1 deletion pymap/backend/dict/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ def demo_password(self) -> str:

@classmethod
def parse_args(cls, args: Namespace) -> Mapping[str, Any]:
return {'demo_data': args.demo_data,
return {**super().parse_args(args),
'demo_data': args.demo_data,
'demo_user': args.demo_user,
'demo_password': args.demo_password}

Expand Down
19 changes: 15 additions & 4 deletions pymap/backend/dict/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from weakref import finalize, WeakKeyDictionary, WeakValueDictionary

from pymap.bytes import HashStream
from pymap.concurrent import ReadWriteLock
from pymap.concurrent import Event, ReadWriteLock
from pymap.context import subsystem
from pymap.flags import FlagOp
from pymap.interfaces.message import CachedMessage
Expand Down Expand Up @@ -194,6 +194,7 @@ def __init__(self, content_cache: _ContentCache,
self._content_cache = content_cache
self._thread_cache = thread_cache
self._readonly = False
self._updated = subsystem.get().new_event()
self._messages_lock = subsystem.get().new_rwlock()
self._selected_set = SelectedSet()
self._uid_validity = MailboxSnapshot.new_uid_validity()
Expand Down Expand Up @@ -221,8 +222,11 @@ def messages_lock(self) -> ReadWriteLock:
def selected_set(self) -> SelectedSet:
return self._selected_set

async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
async def update_selected(self, selected: SelectedMailbox, *,
wait_on: Event = None) -> SelectedMailbox:
if wait_on is not None:
either_event = wait_on.or_event(self._updated)
await either_event.wait()
mod_sequence = selected.mod_sequence
selected.mod_sequence = self._mod_sequences.highest
if mod_sequence is None:
Expand All @@ -248,6 +252,7 @@ async def append(self, append_msg: AppendMessage, *,
recent=recent, content=content)
self._messages[new_uid] = message
self._mod_sequences.update([new_uid])
self._updated.set()
return message

async def copy(self, uid: int, destination: MailboxData, *,
Expand All @@ -262,6 +267,7 @@ async def copy(self, uid: int, destination: MailboxData, *,
new_msg = Message.copy(message, uid=dest_uid, recent=recent)
destination._messages[dest_uid] = new_msg
destination._mod_sequences.update([dest_uid])
destination._updated.set()
return dest_uid

async def move(self, uid: int, destination: MailboxData, *,
Expand All @@ -272,11 +278,13 @@ async def move(self, uid: int, destination: MailboxData, *,
except KeyError:
return None
self._mod_sequences.expunge([uid])
self._updated.set()
async with destination.messages_lock.write_lock():
destination._max_uid = dest_uid = destination._max_uid + 1
new_msg = Message.copy(message, uid=dest_uid, recent=recent)
destination._messages[dest_uid] = new_msg
destination._mod_sequences.update([dest_uid])
destination._updated.set()
return dest_uid

async def get(self, uid: int, cached_msg: CachedMessage) -> Message:
Expand All @@ -292,9 +300,10 @@ async def get(self, uid: int, cached_msg: CachedMessage) -> Message:

async def update(self, uid: int, cached_msg: CachedMessage,
flag_set: FrozenSet[Flag], mode: FlagOp) -> Message:
self._mod_sequences.update([uid])
msg = await self.get(uid, cached_msg)
msg.permanent_flags = mode.apply(msg.permanent_flags, flag_set)
self._mod_sequences.update([uid])
self._updated.set()
return msg

async def delete(self, uids: Iterable[int]) -> None:
Expand All @@ -305,6 +314,7 @@ async def delete(self, uids: Iterable[int]) -> None:
except KeyError:
pass
self._mod_sequences.expunge(uids)
self._updated.set()

async def claim_recent(self, selected: SelectedMailbox) -> None:
uids: List[int] = []
Expand All @@ -315,6 +325,7 @@ async def claim_recent(self, selected: SelectedMailbox) -> None:
selected.session_flags.add_recent(msg_uid)
uids.append(msg_uid)
self._mod_sequences.update(uids)
self._updated.set()

async def cleanup(self) -> None:
pass
Expand Down
7 changes: 5 additions & 2 deletions pymap/backend/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Iterable, AsyncIterable
from typing_extensions import Protocol

from pymap.concurrent import Event
from pymap.flags import FlagOp
from pymap.interfaces.message import MessageT_co, CachedMessage
from pymap.listtree import ListTree
Expand Down Expand Up @@ -70,13 +71,15 @@ def selected_set(self) -> SelectedSet:
...

@abstractmethod
async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
async def update_selected(self, selected: SelectedMailbox, *,
wait_on: Event = None) -> SelectedMailbox:
"""Populates and returns the selected mailbox object with the state
needed to discover updates.
Args:
selected: the selected mailbox object.
wait_on: If given, block until this event signals or mailbox
activity occurs.
"""
...
Expand Down
3 changes: 2 additions & 1 deletion pymap/backend/maildir/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ def layout(self) -> str:
def parse_args(cls, args: Namespace) -> Mapping[str, Any]:
executor = ThreadPoolExecutor(args.concurrency)
subsystem = Subsystem.for_executor(executor)
return {'users_file': args.users_file,
return {**super().parse_args(args),
'users_file': args.users_file,
'base_dir': args.base_dir,
'layout': args.layout,
'subsystem': subsystem}
Expand Down
8 changes: 5 additions & 3 deletions pymap/backend/maildir/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
AsyncIterable
from typing_extensions import Final, Literal

from pymap.concurrent import ReadWriteLock
from pymap.concurrent import Event, ReadWriteLock
from pymap.context import subsystem
from pymap.exceptions import MailboxHasChildren, NotSupportedError
from pymap.flags import FlagOp
Expand Down Expand Up @@ -247,8 +247,10 @@ async def _get_maildir_msg(self, uid: int) \
maildir_msg = maildir.get_message_metadata(key)
return record, maildir_msg

async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
async def update_selected(self, selected: SelectedMailbox, *,
wait_on: Event = None) -> SelectedMailbox:
if wait_on is not None:
await wait_on.wait(timeout=1.0)
all_messages = [msg async for msg in self.messages()]
selected.set_messages(all_messages)
return selected
Expand Down
3 changes: 2 additions & 1 deletion pymap/backend/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def _global_keys(self) -> GlobalKeys:

@classmethod
def parse_args(cls, args: Namespace) -> Mapping[str, Any]:
return {'address': args.address,
return {**super().parse_args(args),
'address': args.address,
'select': args.select,
'separator': args.separator.encode('utf-8'),
'prefix': args.prefix.encode('utf-8'),
Expand Down
14 changes: 12 additions & 2 deletions pymap/backend/redis/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from aioredis import Redis, ReplyError, MultiExecError # type: ignore

from pymap.bytes import HashStream
from pymap.concurrent import Event
from pymap.exceptions import MailboxNotFound, MailboxConflict, TemporaryFailure
from pymap.flags import FlagOp
from pymap.interfaces.message import CachedMessage
Expand Down Expand Up @@ -81,9 +82,11 @@ def _get_msg(self, uid: int, msg_raw: bytes) -> Message:
email_id=msg_email_id, thread_id=msg_thread_id,
redis=self._redis, ns_keys=self._ns_keys)

async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
async def update_selected(self, selected: SelectedMailbox, *,
wait_on: Event = None) -> SelectedMailbox:
last_mod_seq = selected.mod_sequence
if wait_on is not None:
await self._wait_updates(selected, last_mod_seq)
if last_mod_seq is None:
await self._load_initial(selected)
else:
Expand Down Expand Up @@ -255,6 +258,13 @@ async def _load_updates(self, selected: SelectedMailbox,
selected.mod_sequence = self._get_mod_seq(last_changes)
selected.add_updates(messages, expunged)

async def _wait_updates(self, selected: SelectedMailbox,
last_mod_seq: bytes) -> None:
keys = self._keys
redis = self._redis
await redis.xread([keys.changes], latest_ids=[last_mod_seq],
timeout=1000, count=1)


class MailboxSet(MailboxSetInterface[MailboxData]):
"""Implementation of :class:`~pymap.backend.mailbox.MailboxSetInterface`
Expand Down
12 changes: 1 addition & 11 deletions pymap/backend/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ async def append_messages(self, name: str,
if dest_selected:
dest_selected.session_flags.add_recent(msg.uid)
uids.append(msg.uid)
mbx.selected_set.updated.set()
return (AppendUid(mbx.uid_validity, uids),
await self._load_updates(selected, mbx))

Expand All @@ -205,10 +204,7 @@ async def check_mailbox(self, selected: SelectedMailbox, *,
mbx = await self._get_selected(selected)
if housekeeping:
await shield(mbx.cleanup())
if wait_on is not None:
either_event = wait_on.or_event(mbx.selected_set.updated)
await either_event.wait()
return await mbx.update_selected(selected)
return await mbx.update_selected(selected, wait_on=wait_on)

async def fetch_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet, set_seen: bool) \
Expand All @@ -223,8 +219,6 @@ async def fetch_messages(self, selected: SelectedMailbox,
msg = await mbx.get(cached_msg.uid, cached_msg)
if msg is not None:
ret.append((seq, msg))
if set_seen:
mbx.selected_set.updated.set()
return ret, await mbx.update_selected(selected)

async def search_mailbox(self, selected: SelectedMailbox,
Expand All @@ -251,7 +245,6 @@ async def expunge_mailbox(self, selected: SelectedMailbox,
uid_set = SequenceSet.all(uid=True)
expunge_uids = await mbx.find_deleted(uid_set, selected)
await mbx.delete(expunge_uids)
mbx.selected_set.updated.set()
return await mbx.update_selected(selected)

async def copy_messages(self, selected: SelectedMailbox,
Expand All @@ -271,7 +264,6 @@ async def copy_messages(self, selected: SelectedMailbox,
if dest_selected:
dest_selected.session_flags.add_recent(dest_uid)
uids.append((source_uid, dest_uid))
dest.selected_set.updated.set()
if not uids:
copy_uid: Optional[CopyUid] = None
else:
Expand All @@ -295,7 +287,6 @@ async def move_messages(self, selected: SelectedMailbox,
if dest_selected:
dest_selected.session_flags.add_recent(dest_uid)
uids.append((source_uid, dest_uid))
dest.selected_set.updated.set()
if not uids:
copy_uid: Optional[CopyUid] = None
else:
Expand All @@ -318,5 +309,4 @@ async def update_flags(self, selected: SelectedMailbox,
if not msg.expunged:
selected.session_flags.update(uid, flag_set, mode)
messages.append((seq, msg))
mbx.selected_set.updated.set()
return messages, await mbx.update_selected(selected)
28 changes: 19 additions & 9 deletions pymap/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
import os.path
import time
from abc import abstractmethod, ABCMeta
from asyncio import Event as _asyncio_Event, Lock as _asyncio_Lock
from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError
from asyncio import Event as _asyncio_Event, Lock as _asyncio_Lock, \
TimeoutError
from concurrent.futures import Executor, ThreadPoolExecutor
from contextlib import asynccontextmanager
from contextvars import copy_context, Context
from threading import local, Event as _threading_Event, Lock as _threading_Lock
Expand Down Expand Up @@ -230,8 +231,13 @@ def clear(self) -> None:
...

@abstractmethod
async def wait(self) -> None:
"""Wait until another thread signals the event."""
async def wait(self, *, timeout: float = None) -> None:
"""Wait until another thread signals the event.
Args:
timeout: Maximum time to wait, in seconds.
"""
...


Expand Down Expand Up @@ -320,7 +326,7 @@ class FileLock(ReadWriteLock): # pragma: no cover
The delay arguments are a sequence of floats used as the duration of
successive :func:`~asyncio.sleep` calls. If this sequence is exhausted
before a lock is established, :class:`TimeoutError` is thrown.
before a lock is established, :class:`~asyncio.TimeoutError` is thrown.
Args:
path: The path of the lock file.
Expand Down Expand Up @@ -439,8 +445,12 @@ def set(self) -> None:
def clear(self) -> None:
self._event.clear()

async def wait(self) -> None:
await self._event.wait()
async def wait(self, *, timeout: float = None) -> None:
task = asyncio.create_task(self._event.wait())
try:
await asyncio.wait_for(task, timeout)
except TimeoutError:
pass


class _ThreadingEvent(Event): # pragma: no cover
Expand Down Expand Up @@ -472,5 +482,5 @@ def set(self) -> None:
def clear(self) -> None:
self._event.clear()

async def wait(self) -> None:
self._event.wait()
async def wait(self, *, timeout: float = None) -> None:
self._event.wait(timeout=timeout)
4 changes: 0 additions & 4 deletions pymap/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ class IMAPConfig(metaclass=ABCMeta):
bad_command_limit: The number of consecutive commands received from
the client with parsing errors before the client is disconnected.
disable_idle: Disable the ``IDLE`` capability.
max_idle_wait: If given, the ``IDLE`` command will transparently cancel
and re-issue its request for updates every *N* seconds.
extra: Additional keywords used for special circumstances.
Attributes:
Expand All @@ -129,7 +127,6 @@ def __init__(self, args: Namespace, *,
bad_command_limit: Optional[int] = 5,
disable_search_keys: Iterable[bytes] = None,
disable_idle: bool = False,
max_idle_wait: float = None,
**extra: Any) -> None:
super().__init__()
self.args = args
Expand All @@ -139,7 +136,6 @@ def __init__(self, args: Namespace, *,
self.subsystem: Final = subsystem
self.bad_command_limit: Final = bad_command_limit
self.disable_search_keys: Final = disable_search_keys or []
self.max_idle_wait: Final = max_idle_wait
self.hash_context: Final = hash_context or \
get_hash(passlib_config=args.passlib_cfg)
self._ssl_context = ssl_context or self._load_certs(extra)
Expand Down
13 changes: 3 additions & 10 deletions pymap/imap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
from argparse import ArgumentParser
from asyncio import shield, StreamReader, StreamWriter, AbstractServer, \
CancelledError
CancelledError, TimeoutError
from base64 import b64encode, b64decode
from contextlib import closing, AsyncExitStack
from typing import TypeVar, Union, Optional, Iterable, List, Awaitable
Expand Down Expand Up @@ -301,16 +301,9 @@ async def write_updates(self, untagged: Iterable[Response]) -> None:

async def handle_updates(self, state: ConnectionState, done: Event,
cmd: IdleCommand) -> None:
timeout = self.config.max_idle_wait
while not done.is_set():
receive_task = asyncio.create_task(
self._exec(state.receive_updates(cmd, done)))
try:
untagged = await asyncio.wait_for(receive_task, timeout)
except TimeoutError:
pass
else:
await shield(self.write_updates(untagged))
untagged = await self._exec(state.receive_updates(cmd, done))
await shield(self.write_updates(untagged))

async def idle(self, state: ConnectionState, cmd: IdleCommand) \
-> CommandResponse:
Expand Down
Loading

0 comments on commit c83845c

Please sign in to comment.