From 6cd4f21c5465be83b4503de83684bdff85f90b1f Mon Sep 17 00:00:00 2001 From: KOLANICH Date: Thu, 12 Dec 2019 12:51:59 +0300 Subject: [PATCH] Added support for memory mappings --- pgpy/pgp.py | 3 ++- pgpy/types.py | 31 ++++++++++++++++--------------- pgpy/utils.py | 23 +++++++++++++++++++++++ 3 files changed, 41 insertions(+), 16 deletions(-) create mode 100644 pgpy/utils.py diff --git a/pgpy/pgp.py b/pgpy/pgp.py index f34a25fb..4871b647 100644 --- a/pgpy/pgp.py +++ b/pgpy/pgp.py @@ -14,6 +14,7 @@ import itertools import operator import os +from mmap import mmap import re import warnings import weakref @@ -2424,7 +2425,7 @@ def verify(self, subject, signature=None): sspairs = [] # some type checking - if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, str, bytes, bytearray)): + if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, str, bytes, bytearray, mmap)): raise TypeError("Unexpected subject value: {:s}".format(str(type(subject)))) if not isinstance(signature, (type(None), PGPSignature)): raise TypeError("Unexpected signature value: {:s}".format(str(type(signature)))) diff --git a/pgpy/types.py b/pgpy/types.py index 187d3984..36c1c006 100644 --- a/pgpy/types.py +++ b/pgpy/types.py @@ -14,12 +14,16 @@ import warnings import weakref +from mmap import mmap + + from enum import EnumMeta from enum import IntEnum from .decorators import sdproperty from .errors import PGPError +from .utils import MMap __all__ = ['Armorable', 'ParentRef', @@ -81,7 +85,7 @@ def is_ascii(text): if isinstance(text, str): return bool(re.match(r'^[ -~\r\n\t]*$', text, flags=re.ASCII)) - if isinstance(text, (bytes, bytearray)): + if isinstance(text, (bytes, bytearray, mmap)): return bool(re.match(br'^[ -~\r\n\t]*$', text, flags=re.ASCII)) raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover @@ -94,8 +98,8 @@ def is_armor(text): :raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray`` :returns: Whether the text is ASCII-armored. """ - if isinstance(text, (bytes, bytearray)): # pragma: no cover - text = text.decode('latin-1') + if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover + text = codecs.latin_1_decode(text)[0] return Armorable.__armor_regex.search(text) is not None @@ -115,8 +119,8 @@ def ascii_unarmor(text): m['body'] = bytearray(text) return m - if isinstance(text, (bytes, bytearray)): # pragma: no cover - text = text.decode('latin-1') + if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover + text = codecs.latin_1_decode(text)[0] m = Armorable.__armor_regex.search(text) @@ -176,12 +180,9 @@ def magic(self): @classmethod def from_file(cls, filename): - with open(filename, 'rb') as file: - obj = cls() - data = bytearray(os.path.getsize(filename)) - file.readinto(data) - - po = obj.parse(data) + obj = cls() + with MMap(filename) as data: + po = obj.parse(data) if po is not None: return (obj, po) @@ -191,7 +192,7 @@ def from_file(cls, filename): @classmethod def from_blob(cls, blob): obj = cls() - if (not isinstance(blob, bytes)) and (not isinstance(blob, bytearray)): + if (not isinstance(blob, bytes)) and (not isinstance(blob, (bytearray, mmap))): po = obj.parse(bytearray(blob, 'latin-1')) else: @@ -691,9 +692,9 @@ def __eq__(self, other): if isinstance(other, Fingerprint): return str(self) == str(other) - if isinstance(other, (str, bytes, bytearray)): - if isinstance(other, (bytes, bytearray)): # pragma: no cover - other = other.decode('latin-1') + if isinstance(other, (str, bytes, bytearray, mmap)): + if isinstance(other, (bytes, bytearray, mmap)): # pragma: no cover + other = codecs.latin_1_decode(other)[0] other = other.replace(' ', '') return any([str(self) == other, diff --git a/pgpy/utils.py b/pgpy/utils.py new file mode 100644 index 00000000..8d8385dd --- /dev/null +++ b/pgpy/utils.py @@ -0,0 +1,23 @@ +import mmap + +try: + from pathlib import Path +except ImportError: + from pathlib2 import Path + +class MMap: + __slots__ = ("path", "f", "m") + def __init__(self, path): + path = Path(path) + self.path = path + self.f = None + self.m = None + + def __enter__(self): + self.f = self.path.open("rb") + self.m = mmap.mmap(self.f.fileno(), 0, prot=mmap.PROT_READ) + return self.m + + def __exit__(self, *args, **kwargs): + self.m.close() + self.f.close()