Skip to content

Commit

Permalink
Added support for memory mappings
Browse files Browse the repository at this point in the history
  • Loading branch information
KOLANICH committed Nov 24, 2022
1 parent a0e56ff commit 6cd4f21
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pgpy/pgp.py
Expand Up @@ -14,6 +14,7 @@
import itertools
import operator
import os
from mmap import mmap
import re
import warnings
import weakref
Expand Down Expand Up @@ -2424,7 +2425,7 @@ def verify(self, subject, signature=None):
sspairs = []

# some type checking
if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, str, bytes, bytearray)):
if not isinstance(subject, (type(None), PGPMessage, PGPKey, PGPUID, PGPSignature, str, bytes, bytearray, mmap)):
raise TypeError("Unexpected subject value: {:s}".format(str(type(subject))))
if not isinstance(signature, (type(None), PGPSignature)):
raise TypeError("Unexpected signature value: {:s}".format(str(type(signature))))
Expand Down
31 changes: 16 additions & 15 deletions pgpy/types.py
Expand Up @@ -14,12 +14,16 @@
import warnings
import weakref

from mmap import mmap


from enum import EnumMeta
from enum import IntEnum

from .decorators import sdproperty

from .errors import PGPError
from .utils import MMap

__all__ = ['Armorable',
'ParentRef',
Expand Down Expand Up @@ -81,7 +85,7 @@ def is_ascii(text):
if isinstance(text, str):
return bool(re.match(r'^[ -~\r\n\t]*$', text, flags=re.ASCII))

if isinstance(text, (bytes, bytearray)):
if isinstance(text, (bytes, bytearray, mmap)):
return bool(re.match(br'^[ -~\r\n\t]*$', text, flags=re.ASCII))

raise TypeError("Expected: ASCII input of type str, bytes, or bytearray") # pragma: no cover
Expand All @@ -94,8 +98,8 @@ def is_armor(text):
:raises: :py:exc:`TypeError` if ``text`` is not a ``str``, ``bytes``, or ``bytearray``
:returns: Whether the text is ASCII-armored.
"""
if isinstance(text, (bytes, bytearray)): # pragma: no cover
text = text.decode('latin-1')
if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover
text = codecs.latin_1_decode(text)[0]

return Armorable.__armor_regex.search(text) is not None

Expand All @@ -115,8 +119,8 @@ def ascii_unarmor(text):
m['body'] = bytearray(text)
return m

if isinstance(text, (bytes, bytearray)): # pragma: no cover
text = text.decode('latin-1')
if isinstance(text, (bytes, bytearray, mmap)): # pragma: no cover
text = codecs.latin_1_decode(text)[0]

m = Armorable.__armor_regex.search(text)

Expand Down Expand Up @@ -176,12 +180,9 @@ def magic(self):

@classmethod
def from_file(cls, filename):
with open(filename, 'rb') as file:
obj = cls()
data = bytearray(os.path.getsize(filename))
file.readinto(data)

po = obj.parse(data)
obj = cls()
with MMap(filename) as data:
po = obj.parse(data)

if po is not None:
return (obj, po)
Expand All @@ -191,7 +192,7 @@ def from_file(cls, filename):
@classmethod
def from_blob(cls, blob):
obj = cls()
if (not isinstance(blob, bytes)) and (not isinstance(blob, bytearray)):
if (not isinstance(blob, bytes)) and (not isinstance(blob, (bytearray, mmap))):
po = obj.parse(bytearray(blob, 'latin-1'))

else:
Expand Down Expand Up @@ -691,9 +692,9 @@ def __eq__(self, other):
if isinstance(other, Fingerprint):
return str(self) == str(other)

if isinstance(other, (str, bytes, bytearray)):
if isinstance(other, (bytes, bytearray)): # pragma: no cover
other = other.decode('latin-1')
if isinstance(other, (str, bytes, bytearray, mmap)):
if isinstance(other, (bytes, bytearray, mmap)): # pragma: no cover
other = codecs.latin_1_decode(other)[0]

other = other.replace(' ', '')
return any([str(self) == other,
Expand Down
23 changes: 23 additions & 0 deletions pgpy/utils.py
@@ -0,0 +1,23 @@
import mmap

try:
from pathlib import Path
except ImportError:
from pathlib2 import Path

class MMap:
__slots__ = ("path", "f", "m")
def __init__(self, path):
path = Path(path)
self.path = path
self.f = None
self.m = None

def __enter__(self):
self.f = self.path.open("rb")
self.m = mmap.mmap(self.f.fileno(), 0, prot=mmap.PROT_READ)
return self.m

def __exit__(self, *args, **kwargs):
self.m.close()
self.f.close()

0 comments on commit 6cd4f21

Please sign in to comment.