Skip to content

Commit

Permalink
Added support for memory mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
KOLANICH committed Sep 21, 2020
1 parent 1e90d70 commit df6b6e6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pgpy/pgp.py
Expand Up @@ -14,6 +14,7 @@
import itertools
import operator
import os
from mmap import mmap
import re
import warnings
import weakref
Expand Down Expand Up @@ -2353,7 +2354,7 @@ def verify(self, subject, signature=None):
sspairs = []

# some type checking
if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray)):
if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, six.string_types, bytes, bytearray, mmap)):
raise TypeError("Unexpected subject value: {:s}".format(str(type(subject))))
if not isinstance(signature, (type(None), PGPSignature)):
raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
Expand Down
31 changes: 16 additions & 15 deletions pgpy/types.py
Expand Up @@ -14,6 +14,9 @@
import warnings
import weakref

from mmap import mmap


from enum import EnumMeta
from enum import IntEnum

Expand All @@ -22,6 +25,7 @@
from .decorators import sdproperty

from .errors import PGPError
from .utils import MMap

__all__ = ['Armorable',
'ParentRef',
Expand Down Expand Up @@ -86,7 +90,7 @@ def is_ascii(text):
if isinstance(text, six.string_types):
return bool(re.match(r'^[ -~\r\n]*$', text, flags=re.ASCII))

if isinstance(text, (bytes, bytearray)):
if isinstance(text, (bytes, bytearray, mmap)):
return bool(re.match(br'^[ -~\r\n]*$', text, flags=re.ASCII))

raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover
Expand All @@ -99,8 +103,8 @@ def is_armor(text):
:raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray``
:returns: Whether the text is ASCII-armored.
"""
if isinstance(text, (bytes, bytearray)): # pragma: no cover
text = text.decode('latin-1')
if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover
text = codecs.latin_1_decode(text)[0]

return Armorable.__armor_regex.search(text) is not None

Expand All @@ -120,8 +124,8 @@ def ascii_unarmor(text):
m['body'] = bytearray(text)
return m

if isinstance(text, (bytes, bytearray)): # pragma: no cover
text = text.decode('latin-1')
if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover
text = codecs.latin_1_decode(text)[0]

m = Armorable.__armor_regex.search(text)

Expand Down Expand Up @@ -181,12 +185,9 @@ def magic(self):

@classmethod
def from_file(cls, filename):
with open(filename, 'rb') as file:
obj = cls()
data = bytearray(os.path.getsize(filename))
file.readinto(data)

po = obj.parse(data)
obj = cls()
with MMap(filename) as data:
po = obj.parse(data)

if po is not None:
return (obj, po)
Expand All @@ -196,7 +197,7 @@ def from_file(cls, filename):
@classmethod
def from_blob(cls, blob):
obj = cls()
if (not isinstance(blob, six.binary_type)) and (not isinstance(blob, bytearray)):
if (not isinstance(blob, six.binary_type)) and (not isinstance(blob, (bytearray, mmap))):
po = obj.parse(bytearray(blob, 'latin-1'))

else:
Expand Down Expand Up @@ -701,9 +702,9 @@ def __eq__(self, other):
if isinstance(other, Fingerprint):
return str(self) == str(other)

if isinstance(other, (six.text_type, bytes, bytearray)):
if isinstance(other, (bytes, bytearray)): # pragma: no cover
other = other.decode('latin-1')
if isinstance(other, (six.text_type, bytes, bytearray, mmap)):
if isinstance(other, (bytes, bytearray, mmap)): # pragma: no cover
other = codecs.latin_1_decode(other)[0]

other = str(other).replace(' ', '')
return any([self.replace(' ', '') == other,
Expand Down
23 changes: 23 additions & 0 deletions pgpy/utils.py
@@ -0,0 +1,23 @@
import mmap

try:
from pathlib import Path
except ImportError:
from pathlib2 import Path

class MMap:
__slots__ = ("path", "f", "m")
def __init__(self, path):
path = Path(path)
self.path = path
self.f = None
self.m = None

def __enter__(self):
self.f = self.path.open("rb")
self.m = mmap.mmap(self.f.fileno(), 0, prot=mmap.PROT_READ)
return self.m

def __exit__(self, *args, **kwargs):
self.m.close()
self.f.close()

0 comments on commit df6b6e6

Please sign in to comment.