Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize selected mailbox updates #31

Merged
merged 1 commit into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 85 additions & 6 deletions pymap/backend/dict/mailbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@

from bisect import bisect_left
from collections import OrderedDict
from typing import Tuple, Sequence, Dict, Optional, Iterable, AsyncIterable
from itertools import islice
from typing import Tuple, Sequence, Dict, Optional, Iterable, AsyncIterable, \
List, Set, AbstractSet

from pymap.concurrent import ReadWriteLock
from pymap.context import subsystem
Expand All @@ -16,6 +19,60 @@
__all__ = ['Message', 'MailboxData', 'MailboxSet']


class _ModSequenceMapping:

def __init__(self) -> None:
super().__init__()
self._highest = 0
self._uids: Dict[int, int] = {}
self._updates: Dict[int, Set[int]] = {}
self._expunges: Dict[int, Set[int]] = {}
self._mod_seqs_order: List[int] = []

@property
def highest(self) -> int:
return self._highest

def _set(self, uids: Iterable[int], data: Dict[int, Set[int]]) -> None:
self._highest = mod_seq = self._highest + 1
self._mod_seqs_order.append(mod_seq)
new_uid_set = data.setdefault(mod_seq, set())
new_uid_set.update(uids)
for uid in uids:
prev_mod_seq = self._uids.get(uid, None)
self._uids[uid] = mod_seq
if prev_mod_seq is not None:
uid_set = self._updates[prev_mod_seq]
uid_set.discard(uid)
if not uid_set:
del self._updates[prev_mod_seq]
self._mod_seqs_order.remove(prev_mod_seq)

def update(self, uids: Iterable[int]) -> None:
return self._set(uids, self._updates)

def expunge(self, uids: Iterable[int]) -> None:
return self._set(uids, self._expunges)

def find_updated(self, mod_seq: int) \
-> Tuple[AbstractSet[int], AbstractSet[int]]:
updates_ret: Set[int] = set()
expunges_ret: Set[int] = set()
updates = self._updates
expunges = self._expunges
mod_seqs_order = self._mod_seqs_order
mod_seqs_len = len(mod_seqs_order)
idx = bisect_left(mod_seqs_order, mod_seq, 0, mod_seqs_len)
for newer_mod_seq in islice(mod_seqs_order, idx, mod_seqs_len):
updates_set = updates.get(newer_mod_seq)
expunges_set = expunges.get(newer_mod_seq)
if updates_set is not None:
updates_ret.update(updates_set)
if expunges_set is not None:
expunges_ret.update(expunges_set)
return updates_ret, expunges_ret


class MailboxData(MailboxDataInterface[Message]):
"""Implementation of :class:`~pymap.backend.mailbox.MailboxDataInterface`
for the dict backend.
Expand All @@ -32,6 +89,7 @@ def __init__(self, name: str) -> None:
def _reset_messages(self) -> None:
self._uid_validity = MailboxSnapshot.new_uid_validity()
self._max_uid = 100
self._mod_sequences = _ModSequenceMapping()
self._messages: Dict[int, Message] = OrderedDict()

@property
Expand Down Expand Up @@ -62,12 +120,28 @@ def parse_message(self, append_msg: AppendMessage) -> Message:
return Message.parse(0, append_msg.message, append_msg.flag_set,
append_msg.when, recent=True)

async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
selected.uid_validity = self.uid_validity
mod_sequence = selected.mod_sequence
selected.mod_sequence = self._mod_sequences.highest
if mod_sequence is None:
all_messages = list(self._messages.values())
selected.add_updates(all_messages, [])
else:
updated, expunged = self._mod_sequences.find_updated(mod_sequence)
updated_messages = [self._messages[uid] for uid in updated
if uid in self._messages]
selected.add_updates(updated_messages, expunged)
return selected

async def add(self, message: Message, recent: bool = False) -> Message:
async with self.messages_lock.write_lock():
self._max_uid += 1
msg_copy = message.copy(self._max_uid)
self._max_uid = new_uid = self._max_uid + 1
msg_copy = message.copy(new_uid)
msg_copy.recent = recent
self._messages[msg_copy.uid] = msg_copy
self._messages[new_uid] = msg_copy
self._mod_sequences.update([new_uid])
return msg_copy

async def get(self, uid: int, cached_msg: CachedMessage = None,
Expand All @@ -90,15 +164,20 @@ async def delete(self, uids: Iterable[int]) -> None:
del self._messages[uid]
except KeyError:
pass
self._mod_sequences.expunge(uids)

async def claim_recent(self, selected: SelectedMailbox) -> None:
uids: List[int] = []
async for msg in self.messages():
if msg.recent:
msg.recent = False
selected.session_flags.add_recent(msg.uid)
msg_uid = msg.uid
selected.session_flags.add_recent(msg_uid)
uids.append(msg_uid)
self._mod_sequences.update(uids)

async def save_flags(self, messages: Iterable[Message]) -> None:
pass
self._mod_sequences.update(msg.uid for msg in messages)

async def cleanup(self) -> None:
pass
Expand Down
18 changes: 14 additions & 4 deletions pymap/backend/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ def parse_message(self, append_msg: AppendMessage) -> MessageT:
"""
...

@abstractmethod
async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
"""Populates and returns the selected mailbox object with the state
needed to discover updates.

Args:
selected: the selected mailbox object.

"""
...

@abstractmethod
async def add(self, message: MessageT, recent: bool = False) -> MessageT:
"""Adds a new message to the end of the mailbox, returning a copy of
Expand Down Expand Up @@ -203,10 +215,8 @@ async def find(self, seq_set: SequenceSet, selected: SelectedMailbox,
requirement: The data required from each message.

"""
mbx_uids = frozenset({uid async for uid in self.uids()})
for seq, uid in selected.iter_set(seq_set, mbx_uids):
cached_msg = selected.get_message(uid)
msg = await self.get(uid, cached_msg, requirement)
for seq, cached_msg in selected.messages.get_all(seq_set):
msg = await self.get(cached_msg.uid, cached_msg, requirement)
if msg is not None:
yield (seq, msg)

Expand Down
7 changes: 7 additions & 0 deletions pymap/backend/maildir/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ def parse_message(self, append_msg: AppendMessage) -> Message:
append_msg.when, recent=True,
maildir_flags=self.maildir_flags)

async def update_selected(self, selected: SelectedMailbox) \
-> SelectedMailbox:
selected.uid_validity = self.uid_validity
all_messages = [msg async for msg in self.messages()]
selected.set_messages(all_messages)
return selected

async def add(self, message: Message, recent: bool = False) -> 'Message':
async with self.messages_lock.write_lock():
maildir_msg = message.maildir_msg
Expand Down
25 changes: 8 additions & 17 deletions pymap/backend/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,6 @@ def mailbox_set(self) \
-> MailboxSetInterface[MailboxDataInterface[MessageT]]:
...

@classmethod
async def _load_selected(cls, selected: SelectedMailbox,
mbx: MailboxDataInterface[MessageT]) \
-> SelectedMailbox:
selected.uid_validity = mbx.uid_validity
selected.next_uid = mbx.next_uid
selected.set_messages([msg async for msg in mbx.messages()])
return selected

async def _load_updates(self, selected: Optional[SelectedMailbox],
mbx: Optional[MailboxDataInterface[MessageT]]) \
-> Optional[SelectedMailbox]:
Expand All @@ -61,7 +52,7 @@ async def _load_updates(self, selected: Optional[SelectedMailbox],
except MailboxNotFound:
selected.set_deleted()
return selected
return await self._load_selected(selected, mbx)
return await mbx.update_selected(selected)
return selected

@classmethod
Expand Down Expand Up @@ -155,7 +146,7 @@ async def select_mailbox(self, name: str, readonly: bool = False) \
if not selected.readonly:
await mbx.claim_recent(selected)
snapshot = await mbx.snapshot()
return snapshot, await self._load_selected(selected, mbx)
return snapshot, await mbx.update_selected(selected)

async def check_mailbox(self, selected: SelectedMailbox, *,
wait_on: Event = None,
Expand All @@ -166,7 +157,7 @@ async def check_mailbox(self, selected: SelectedMailbox, *,
if wait_on is not None:
either_event = wait_on.or_event(mbx.selected_set.updated)
await either_event.wait()
return await self._load_selected(selected, mbx)
return await mbx.update_selected(selected)

async def fetch_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
Expand All @@ -183,7 +174,7 @@ async def fetch_messages(self, selected: SelectedMailbox,
msg.update_flags(seen_set, FlagOp.ADD)
await mbx.save_flags(msg for _, msg in ret)
mbx.selected_set.updated.set()
return ret, await self._load_selected(selected, mbx)
return ret, await mbx.update_selected(selected)

async def search_mailbox(self, selected: SelectedMailbox,
keys: FrozenSet[SearchKey]) \
Expand All @@ -197,7 +188,7 @@ async def search_mailbox(self, selected: SelectedMailbox,
async for seq, msg in mbx.find(search.sequence_set, selected, req):
if search.matches(seq, msg):
ret.append((seq, msg))
return ret, await self._load_selected(selected, mbx)
return ret, await mbx.update_selected(selected)

async def expunge_mailbox(self, selected: SelectedMailbox,
uid_set: SequenceSet = None) -> SelectedMailbox:
Expand All @@ -212,7 +203,7 @@ async def expunge_mailbox(self, selected: SelectedMailbox,
expunge_uids.append(msg.uid)
await mbx.delete(expunge_uids)
mbx.selected_set.updated.set()
return await self._load_selected(selected, mbx)
return await mbx.update_selected(selected)

async def copy_messages(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
Expand All @@ -234,7 +225,7 @@ async def copy_messages(self, selected: SelectedMailbox,
uids.append((source_uid, msg.uid))
dest.selected_set.updated.set()
return (CopyUid(dest.uid_validity, uids),
await self._load_selected(selected, mbx))
await mbx.update_selected(selected))

async def update_flags(self, selected: SelectedMailbox,
sequence_set: SequenceSet,
Expand All @@ -253,4 +244,4 @@ async def update_flags(self, selected: SelectedMailbox,
messages.append((msg_seq, msg))
await mbx.save_flags(msg for _, msg in messages)
mbx.selected_set.updated.set()
return messages, await self._load_selected(selected, mbx)
return messages, await mbx.update_selected(selected)
6 changes: 3 additions & 3 deletions pymap/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def subsystem(self) -> str:
...

@abstractmethod
async def execute(self, future: Awaitable[RetT]) -> RetT:
def execute(self, future: Awaitable[RetT]) -> Awaitable[RetT]:
"""Executes the future and returns its result in the subsystem. For
:mod:`asyncio`, this simply means ``return await future``. For
:mod:`threading`, it uses
Expand Down Expand Up @@ -98,8 +98,8 @@ class _AsyncioSubsystem(Subsystem):
def subsystem(self) -> str:
return 'asyncio'

async def execute(self, future: Awaitable[RetT]) -> RetT:
return await future
def execute(self, future: Awaitable[RetT]) -> Awaitable[RetT]:
return future

def new_rwlock(self) -> '_AsyncioReadWriteLock':
return _AsyncioReadWriteLock()
Expand Down
24 changes: 15 additions & 9 deletions pymap/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

__all__ = ['FlagOp', 'PermanentFlags', 'SessionFlags']

_recent_set = frozenset({Recent})


class FlagOp(enum.Enum):
"""Types of operations when updating flags."""
Expand Down Expand Up @@ -56,7 +58,7 @@ class PermanentFlags:

def __init__(self, defined: Iterable[Flag]) -> None:
super().__init__()
self._defined = frozenset(defined) - {Recent}
self._defined = frozenset(defined) - _recent_set

@property
def defined(self) -> FrozenSet[Flag]:
Expand Down Expand Up @@ -99,11 +101,9 @@ class SessionFlags:

__slots__ = ['_defined', '_flags', '_recent']

_recent_set = frozenset({Recent})

def __init__(self, defined: Iterable[Flag]):
super().__init__()
self._defined = frozenset(defined) - self._recent_set
self._defined = frozenset(defined) - _recent_set
self._flags: Dict[int, FrozenSet[Flag]] = {}
self._recent: Set[int] = set()

Expand Down Expand Up @@ -142,19 +142,20 @@ def get(self, uid: int) -> FrozenSet[Flag]:
uid: The message UID value.

"""
recent = self._recent_set if uid in self._recent else frozenset()
recent = _recent_set if uid in self._recent else frozenset()
flags = self._flags.get(uid)
return recent if flags is None else (flags | recent)

def remove(self, uid: int) -> None:
def remove(self, uids: Iterable[int]) -> None:
"""Remove any session flags for the given message.

Args:
uid: The message UID value.
uids: The message UID values.

"""
self._recent.discard(uid)
self._flags.pop(uid, None)
for uid in uids:
self._recent.discard(uid)
self._flags.pop(uid, None)

def update(self, uid: int, flag_set: Iterable[Flag],
op: FlagOp = FlagOp.REPLACE) -> FrozenSet[Flag]:
Expand Down Expand Up @@ -183,6 +184,11 @@ def add_recent(self, uid: int) -> None:
"""
self._recent.add(uid)

@property
def recent(self) -> int:
"""The number of messages with the ``\\Recent`` flag."""
return len(self._recent)

@property
def recent_uids(self) -> AbstractSet[int]:
"""The message UIDs with the ``\\Recent`` flag."""
Expand Down
8 changes: 6 additions & 2 deletions pymap/interfaces/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
from ..parsing.response.fetch import EnvelopeStructure, BodyStructure
from ..parsing.specials import Flag

__all__ = ['CachedMessage', 'MessageInterface', 'MessageT']
__all__ = ['CachedMessage', 'MessageInterface', 'MessageT', 'FlagsKey']

#: Type variable with an upper bound of :class:`MessageInterface`.
MessageT = TypeVar('MessageT', bound='MessageInterface')

#: Type alias for the value used as a key in set comparisons detecting flag
#: updates.
FlagsKey = Tuple[int, FrozenSet[Flag]]


class CachedMessage(Protocol):
"""Cached message metadata used to track state changes. Used to produce
Expand Down Expand Up @@ -51,7 +55,7 @@ def get_flags(self, session_flags: SessionFlags = None) -> FrozenSet[Flag]:

@property
@abstractmethod
def flags_key(self) -> Tuple[int, FrozenSet[Flag]]:
def flags_key(self) -> FlagsKey:
"""Hashable value that represents the current flags of this
message, used for detecting mailbox updates.

Expand Down
7 changes: 3 additions & 4 deletions pymap/interfaces/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,9 @@ async def append_messages(self, name: str,
@abstractmethod
async def select_mailbox(self, name: str, readonly: bool = False) \
-> Tuple[MailboxInterface, SelectedMailbox]:
"""Selects a :class:`~pymap.interfaces.mailbox.MailboxInterface` object
corresponding to an existing mailbox owned by the user. The returned
session is then used as the ``selected`` argument to other methods to
fetch mailbox updates.
"""Selects an existing mailbox owned by the user. The returned session
is then used as the ``selected`` argument to other methods to fetch
mailbox updates.

See Also:
`RFC 3501 6.3.1.
Expand Down
Loading