Skip to content

Commit

Permalink
BaseMailBox.box -> client, BaseMailBox.xoauth2 consistency, folder/id…
Browse files Browse the repository at this point in the history
…le inst in __init__
  • Loading branch information
ikvk committed Mar 14, 2022
1 parent d509806 commit 3c83377
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 52 deletions.
13 changes: 5 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ BaseMailBox.idle - `idle manager <#idle-workflow>`_

BaseMailBox.numbers - search mailbox for matching message numbers in current folder, returns [str]

BaseMailBox.box - imaplib.IMAP4/IMAP4_SSL client instance.
BaseMailBox.client - imaplib.IMAP4/IMAP4_SSL client instance.

Email attributes
^^^^^^^^^^^^^^^^
Expand Down Expand Up @@ -128,20 +128,17 @@ MailMessage and MailAttachment public attributes are cached by functools.lru_cac
Search criteria
^^^^^^^^^^^^^^^

This chapter about "criteria" and "charset" arguments of MailBox methods: fetch, uids, numbers

You can use 3 approaches to build search criteria:
You can use 3 arg types for "criteria" argument of MailBox methods: fetch, uids, numbers:

.. code-block:: python
from imap_tools import AND, OR, NOT
from imap_tools import AND
mailbox.fetch(AND(subject='weather')) # query, the str-like object
mailbox.fetch('TEXT "hello"') # str
mailbox.fetch(b'TEXT "\xd1\x8f"') # bytes, *charset arg is ignored
The "charset" is argument used for encode criteria to this encoding.
You can pass the criteria as bytes in the desired encoding - in this case, the encoding will be ignored.
Use "charset" argument for encode criteria to the desired encoding. If "criteria" is bytes - encoding will be ignored.

.. code-block:: python
Expand Down Expand Up @@ -257,7 +254,7 @@ use 'limit' argument for fetch in this case.
Actions with folders
^^^^^^^^^^^^^^^^^^^^

BaseMailBox.login has initial_folder arg, that is "INBOX" by default, use None for not set folder on login.
BaseMailBox.login/xoauth2 has initial_folder arg, that is "INBOX" by default, use None for not set folder on login.

.. code-block:: python
Expand Down
6 changes: 6 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
0.53.0
======
* [Breaking] BaseMailBox.box client instance renamed to BaseMailBox.client
* Fixed BaseMailBox.xoauth2 consistency with BaseMailBox.login
* BaseMailBox.folder/idle managers now instantiates in __init__

0.52.0
======
* [Breaking] STARTTLS logic moved to MailBoxTls
Expand Down
2 changes: 1 addition & 1 deletion imap_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
from .utils import EmailAddress
from .errors import *

__version__ = '0.52.0'
__version__ = '0.53.0'
18 changes: 9 additions & 9 deletions imap_tools/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, mailbox):

def set(self, folder: AnyStr, readonly: bool = False) -> tuple:
"""Select current folder"""
result = self.mailbox.box.select(encode_folder(folder), readonly)
result = self.mailbox.client.select(encode_folder(folder), readonly)
check_command_status(result, MailboxFolderSelectError)
self._current_folder = folder
return result
Expand All @@ -54,7 +54,7 @@ def create(self, folder: AnyStr) -> tuple:
Create folder on the server.
Use email box delimiter to separate folders. Example for "|" delimiter: "folder|sub folder"
"""
result = self.mailbox.box._simple_command('CREATE', encode_folder(folder))
result = self.mailbox.client._simple_command('CREATE', encode_folder(folder))
check_command_status(result, MailboxFolderCreateError)
return result

Expand All @@ -69,14 +69,14 @@ def get(self) -> Optional[str]:

def rename(self, old_name: AnyStr, new_name: AnyStr) -> tuple:
"""Rename folder from old_name to new_name"""
result = self.mailbox.box._simple_command(
result = self.mailbox.client._simple_command(
'RENAME', encode_folder(old_name), encode_folder(new_name))
check_command_status(result, MailboxFolderRenameError)
return result

def delete(self, folder: AnyStr) -> tuple:
"""Delete folder"""
result = self.mailbox.box._simple_command('DELETE', encode_folder(folder))
result = self.mailbox.client._simple_command('DELETE', encode_folder(folder))
check_command_status(result, MailboxFolderDeleteError)
return result

Expand All @@ -96,10 +96,10 @@ def status(self, folder: Optional[AnyStr] = None, options: Optional[Iterable[str
for opt in options:
if opt not in MailBoxFolderStatusOptions.all:
raise MailboxFolderStatusValueError(str(opt))
status_result = self.mailbox.box._simple_command(
status_result = self.mailbox.client._simple_command(
command, encode_folder(folder), '({})'.format(' '.join(options)))
check_command_status(status_result, MailboxFolderStatusError)
result = self.mailbox.box._untagged_response(status_result[0], status_result[1], command)
result = self.mailbox.client._untagged_response(status_result[0], status_result[1], command)
check_command_status(result, MailboxFolderStatusError)
status_data = [i for i in result[1] if type(i) is bytes][0] # may contain tuples with encoded names
values = status_data.decode().split('(')[1].split(')')[0].split(' ')
Expand All @@ -117,9 +117,9 @@ def list(self, folder: AnyStr = '', search_args: str = '*', subscribed_only: boo
"""
folder_item_re = re.compile(r'\((?P<flags>[\S ]*)\) (?P<delim>[\S]+) (?P<name>.+)')
command = 'LSUB' if subscribed_only else 'LIST'
typ, data = self.mailbox.box._simple_command(
typ, data = self.mailbox.client._simple_command(
command, encode_folder(folder), encode_folder(search_args))
typ, data = self.mailbox.box._untagged_response(typ, data, command)
typ, data = self.mailbox.client._untagged_response(typ, data, command)
result = []
for folder_item in data:
if not folder_item:
Expand Down Expand Up @@ -150,7 +150,7 @@ def list(self, folder: AnyStr = '', search_args: str = '*', subscribed_only: boo

def subscribe(self, folder: AnyStr, value: bool) -> tuple:
"""subscribe/unsubscribe to folder"""
method = self.mailbox.box.subscribe if value else self.mailbox.box.unsubscribe
method = self.mailbox.client.subscribe if value else self.mailbox.client.unsubscribe
result = method(encode_folder(folder))
check_command_status(result, MailboxFolderSubscribeError)
return result
10 changes: 5 additions & 5 deletions imap_tools/idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ def __init__(self, mailbox):

def start(self):
"""Switch on mailbox IDLE mode"""
self._idle_tag = self.mailbox.box._command('IDLE') # b'KLIG3'
result = self.mailbox.box._get_response()
self._idle_tag = self.mailbox.client._command('IDLE') # b'KLIG3'
result = self.mailbox.client._get_response()
check_command_status((result, 'IDLE start'), MailboxTaggedResponseError, expected=None)
return result

def stop(self):
"""Switch off mailbox IDLE mode"""
self.mailbox.box.send(b"DONE\r\n")
self.mailbox.client.send(b"DONE\r\n")
return self.mailbox.consume_until_tagged_response(self._idle_tag)

def poll(self, timeout: Optional[float]) -> List[bytes]:
Expand All @@ -79,7 +79,7 @@ def poll(self, timeout: Optional[float]) -> List[bytes]:
'rfc2177 are advised to terminate the IDLE '
'and re-issue it at least every 29 minutes to avoid being logged off.'
)
sock = self.mailbox.box.sock
sock = self.mailbox.client.sock
old_timeout = sock.gettimeout()
# make socket non-blocking so the timeout can be implemented for this call
sock.settimeout(None)
Expand All @@ -90,7 +90,7 @@ def poll(self, timeout: Optional[float]) -> List[bytes]:
if events:
while True:
try:
line = self.mailbox.box._get_line()
line = self.mailbox.client._get_line()
except (socket.timeout, socket.error):
break
except imaplib.IMAP4.abort: # noqa
Expand Down
56 changes: 27 additions & 29 deletions imap_tools/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class BaseMailBox:
idle_manager_class = IdleManager

def __init__(self):
self.folder = None # folder manager
self.idle = None # idle manager
self.client = self._get_mailbox_client()
self.folder = self.folder_manager_class(self)
self.idle = self.idle_manager_class(self)
self.login_result = None
self.box = self._get_mailbox_client()

def __enter__(self):
return self
Expand All @@ -48,10 +48,10 @@ def _get_mailbox_client(self) -> imaplib.IMAP4:

def consume_until_tagged_response(self, tag: bytes):
"""Waiting for tagged response"""
tagged_commands = self.box.tagged_commands
tagged_commands = self.client.tagged_commands
response_set = []
while True:
response: bytes = self.box._get_response() # noqa, example: b'IJDH3 OK IDLE Terminated'
response: bytes = self.client._get_response() # noqa, example: b'IJDH3 OK IDLE Terminated'
if tagged_commands[tag]:
break
response_set.append(response)
Expand All @@ -60,18 +60,26 @@ def consume_until_tagged_response(self, tag: bytes):
return result, response_set

def login(self, username: str, password: str, initial_folder: Optional[str] = 'INBOX') -> 'BaseMailBox':
login_result = self.box._simple_command('LOGIN', username, self.box._quote(password)) # noqa
login_result = self.client._simple_command('LOGIN', username, self.client._quote(password)) # noqa
check_command_status(login_result, MailboxLoginError)
self.box.state = 'AUTH' # logic from self.box.login
self.folder = self.folder_manager_class(self)
self.idle = self.idle_manager_class(self)
self.client.state = 'AUTH' # logic from self.client.login
if initial_folder is not None:
self.folder.set(initial_folder)
self.login_result = login_result
return self # return self in favor of context manager

def xoauth2(self, username: str, access_token: str, initial_folder: Optional[str] = 'INBOX') -> 'BaseMailBox':
"""Authenticate to account using OAuth 2.0 mechanism"""
auth_string = 'user={}\1auth=Bearer {}\1\1'.format(username, access_token)
result = self.client.authenticate('XOAUTH2', lambda x: auth_string) # noqa
check_command_status(result, MailboxLoginError)
if initial_folder is not None:
self.folder.set(initial_folder)
self.login_result = result
return self

def logout(self) -> tuple:
result = self.box.logout()
result = self.client.logout()
check_command_status(result, MailboxLogoutError, expected='BYE')
return result

Expand All @@ -83,7 +91,7 @@ def numbers(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII') -> List
:return email message numbers
"""
encoded_criteria = criteria if type(criteria) is bytes else str(criteria).encode(charset)
search_result = self.box.search(charset, encoded_criteria)
search_result = self.client.search(charset, encoded_criteria)
check_command_status(search_result, MailboxNumbersError)
return search_result[1][0].decode().split() if search_result[1][0] else []

Expand All @@ -98,7 +106,7 @@ def uids(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', miss_no_ui
nums = self.numbers(criteria, charset)
if not nums:
return []
fetch_result = self.box.fetch(','.join(nums), "(UID)")
fetch_result = self.client.fetch(','.join(nums), "(UID)")
check_command_status(fetch_result, MailboxUidsError)
result = []
for fetch_item in fetch_result[1]:
Expand All @@ -112,14 +120,14 @@ def uids(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', miss_no_ui

def _fetch_by_one(self, message_nums: Sequence[str], message_parts: str, reverse: bool) -> Iterable[list]: # noqa
for message_num in message_nums:
fetch_result = self.box.fetch(message_num, message_parts)
fetch_result = self.client.fetch(message_num, message_parts)
check_command_status(fetch_result, MailboxFetchError)
yield fetch_result[1]

def _fetch_in_bulk(self, message_nums: Sequence[str], message_parts: str, reverse: bool) -> Iterable[list]:
if not message_nums:
return
fetch_result = self.box.fetch(','.join(message_nums), message_parts)
fetch_result = self.client.fetch(','.join(message_nums), message_parts)
check_command_status(fetch_result, MailboxFetchError)
for built_fetch_item in chunks((reversed if reverse else iter)(fetch_result[1]), 2):
yield built_fetch_item
Expand Down Expand Up @@ -153,7 +161,7 @@ def fetch(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', limit: Op
yield mail_message

def expunge(self) -> tuple:
result = self.box.expunge()
result = self.client.expunge()
check_command_status(result, MailboxExpungeError)
return result

Expand All @@ -166,7 +174,7 @@ def delete(self, uid_list: Union[str, Iterable[str]]) -> Optional[Tuple[tuple, t
uid_str = clean_uids(uid_list)
if not uid_str:
return None
store_result = self.box.uid('STORE', uid_str, '+FLAGS', r'(\Deleted)')
store_result = self.client.uid('STORE', uid_str, '+FLAGS', r'(\Deleted)')
check_command_status(store_result, MailboxDeleteError)
expunge_result = self.expunge()
return store_result, expunge_result
Expand All @@ -180,7 +188,7 @@ def copy(self, uid_list: Union[str, Iterable[str]], destination_folder: AnyStr)
uid_str = clean_uids(uid_list)
if not uid_str:
return None
copy_result = self.box.uid('COPY', uid_str, encode_folder(destination_folder)) # noqa
copy_result = self.client.uid('COPY', uid_str, encode_folder(destination_folder)) # noqa
check_command_status(copy_result, MailboxCopyError)
return copy_result

Expand Down Expand Up @@ -208,7 +216,7 @@ def flag(self, uid_list: Union[str, Iterable[str]], flag_set: Union[str, Iterabl
uid_str = clean_uids(uid_list)
if not uid_str:
return None
store_result = self.box.uid(
store_result = self.client.uid(
'STORE', uid_str, ('+' if value else '-') + 'FLAGS',
'({})'.format(' '.join(clean_flags(flag_set))))
check_command_status(store_result, MailboxFlagError)
Expand All @@ -232,7 +240,7 @@ def append(self, message: Union[MailMessage, bytes],
else:
timezone = datetime.datetime.now().astimezone().tzinfo # system timezone
cleaned_flags = clean_flags(flag_set or [])
typ, dat = self.box.append(
typ, dat = self.client.append(
encode_folder(folder), # noqa
'({})'.format(' '.join(cleaned_flags)) if cleaned_flags else None,
dt or datetime.datetime.now(timezone), # noqa
Expand All @@ -242,16 +250,6 @@ def append(self, message: Union[MailMessage, bytes],
check_command_status(append_result, MailboxAppendError)
return append_result

def xoauth2(self, username: str, access_token: str, initial_folder: str = 'INBOX') -> 'BaseMailBox':
"""Authenticate to account using OAuth 2.0 mechanism"""
auth_string = 'user={}\1auth=Bearer {}\1\1'.format(username, access_token)
result = self.box.authenticate('XOAUTH2', lambda x: auth_string) # noqa
check_command_status(result, MailboxLoginError)
self.folder = self.folder_manager_class(self)
self.folder.set(initial_folder)
self.login_result = result
return self


class MailBoxUnencrypted(BaseMailBox):
"""Working with the email box through IMAP4"""
Expand Down

0 comments on commit 3c83377

Please sign in to comment.