From 06357d2b4379e431dd88bf25064f656e39fa89e9 Mon Sep 17 00:00:00 2001 From: Kunal Mehta Date: Fri, 29 Sep 2023 16:05:24 -0400 Subject: [PATCH] WIP: pretty_bad_protocol: Remove unused code This is a very basic first pass, removing code that is clearly unused. --- securedrop/pretty_bad_protocol/_meta.py | 3 - securedrop/pretty_bad_protocol/_parsers.py | 179 --------- securedrop/pretty_bad_protocol/_trust.py | 45 --- securedrop/pretty_bad_protocol/_util.py | 29 -- securedrop/pretty_bad_protocol/gnupg.py | 445 +-------------------- securedrop/tests/test_encryption.py | 10 +- 6 files changed, 8 insertions(+), 703 deletions(-) diff --git a/securedrop/pretty_bad_protocol/_meta.py b/securedrop/pretty_bad_protocol/_meta.py index 368f37e854..a69189d6dc 100644 --- a/securedrop/pretty_bad_protocol/_meta.py +++ b/securedrop/pretty_bad_protocol/_meta.py @@ -125,10 +125,7 @@ class GPGBase: "import": _parsers.ImportResult, "export": _parsers.ExportResult, "list": _parsers.ListKeys, - "sign": _parsers.Sign, "verify": _parsers.Verify, - "expire": _parsers.KeyExpirationResult, - "signing": _parsers.KeySigningResult, "packets": _parsers.ListPackets, } diff --git a/securedrop/pretty_bad_protocol/_parsers.py b/securedrop/pretty_bad_protocol/_parsers.py index e5ff49b921..868d644abb 100644 --- a/securedrop/pretty_bad_protocol/_parsers.py +++ b/securedrop/pretty_bad_protocol/_parsers.py @@ -876,105 +876,6 @@ def progress(status_code): # type: ignore[no-untyped-def] return value -class KeyExpirationInterface: - """Interface that guards against misuse of --edit-key combined with --command-fd""" - - def __init__(self, expiration_time, passphrase=None): # type: ignore[no-untyped-def] - self._passphrase = passphrase - self._expiration_time = expiration_time - self._clean_key_expiration_option() - - def _clean_key_expiration_option(self): # type: ignore[no-untyped-def] - """validates the expiration option supplied""" - allowed_entry = re.findall(r"^(\d+)(|w|m|y)$", self._expiration_time) - if not allowed_entry: - raise UsageError("Key expiration option: %s is not valid" % self._expiration_time) - - def _input_passphrase(self, _input): # type: ignore[no-untyped-def] - if self._passphrase: - return f"{_input}{self._passphrase}\n" - return _input - - def _main_key_command(self): # type: ignore[no-untyped-def] - main_key_input = "expire\n%s\n" % self._expiration_time - return self._input_passphrase(main_key_input) - - def _sub_key_command(self, sub_key_number): # type: ignore[no-untyped-def] - sub_key_input = "key %d\nexpire\n%s\n" % (sub_key_number, self._expiration_time) - return self._input_passphrase(sub_key_input) - - def gpg_interactive_input(self, sub_keys_number): # type: ignore[no-untyped-def] - """processes series of inputs normally supplied on --edit-key but passed through stdin - this ensures that no other --edit-key command is actually passing through. - """ - deselect_sub_key = "key 0\n" - - _input = self._main_key_command() - for sub_key_number in range(1, sub_keys_number + 1): - _input += self._sub_key_command(sub_key_number) + deselect_sub_key - return "%ssave\n" % _input - - -class KeyExpirationResult: - """Handle status messages for key expiry - It does not really have a job, but just to conform to the API - """ - - def __init__(self, gpg): # type: ignore[no-untyped-def] - self._gpg = gpg - self.status = "ok" - - def _handle_status(self, key, value): # type: ignore[no-untyped-def] - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ( - "USERID_HINT", - "NEED_PASSPHRASE", - "GET_HIDDEN", - "SIGEXPIRED", - "KEYEXPIRED", - "GOOD_PASSPHRASE", - "GOT_IT", - "GET_LINE", - ): - pass - elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"): - self.status = key.replace("_", " ").lower() - else: - self.status = "failed" - raise ValueError("Unknown status message: %r" % key) - - -class KeySigningResult: - """Handle status messages for key singing""" - - def __init__(self, gpg): # type: ignore[no-untyped-def] - self._gpg = gpg - self.status = "ok" - - def _handle_status(self, key, value): # type: ignore[no-untyped-def] - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ( - "USERID_HINT", - "NEED_PASSPHRASE", - "ALREADY_SIGNED", - "GOOD_PASSPHRASE", - "GOT_IT", - "GET_BOOL", - ): - pass - elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"): - self.status = "{}: {}".format(key.replace("_", " ").lower(), value) - else: - self.status = "failed" - raise ValueError(f"Key signing, unknown status message: {key!r} ::{value}") - - class GenKey: """Handle status messages for key generation. @@ -1089,86 +990,6 @@ def _handle_status(self, key, value): # type: ignore[no-untyped-def] raise ValueError("Unknown status message: %r" % key) -class Sign: - """Parse GnuPG status messages for signing operations. - - :param gpg: An instance of :class:`gnupg.GPG`. - """ - - #: The type of signature created. - sig_type = None - #: The algorithm used to create the signature. - sig_algo = None - #: The hash algorithm used to create the signature. - sig_hash_also = None - #: The fingerprint of the signing keyid. - fingerprint = None - #: The timestamp on the signature. - timestamp = None - #: xxx fill me in - what = None - status = None - - def __init__(self, gpg): # type: ignore[no-untyped-def] - self._gpg = gpg - - def __nonzero__(self): # type: ignore[no-untyped-def] - """Override the determination for truthfulness evaluation. - - :rtype: bool - :returns: True if we have a valid signature, False otherwise. - """ - return self.fingerprint is not None - - __bool__ = __nonzero__ - - def __str__(self): # type: ignore[no-untyped-def] - return self.data.decode(self._gpg._encoding, self._gpg._decode_errors) - - def _handle_status(self, key, value): # type: ignore[no-untyped-def] - """Parse a status code from the attached GnuPG process. - - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. - """ - if key in ( - "USERID_HINT", - "NEED_PASSPHRASE", - "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", - "MISSING_PASSPHRASE", - "PINENTRY_LAUNCHED", - "BEGIN_SIGNING", - "CARDCTRL", - "INV_SGNR", - "SIGEXPIRED", - "KEY_CONSIDERED", - ): - self.status = key.replace("_", " ").lower() - elif key == "SIG_CREATED": - ( - self.sig_type, - self.sig_algo, - self.sig_hash_algo, - self.what, - self.timestamp, - self.fingerprint, - ) = value.split() - elif key == "KEYEXPIRED": - self.status = "skipped signing key, key expired" - if (value is not None) and (len(value) > 0): - self.status += f" on {str(value)}" - elif key == "KEYREVOKED": - self.status = "skipped signing key, key revoked" - if (value is not None) and (len(value) > 0): - self.status += f" on {str(value)}" - elif key == "NODATA": - self.status = nodata(value) - elif key == "PROGRESS": - self.status = progress(value.split(" ", 1)[0]) - else: - raise ValueError("Unknown status message: %r" % key) - - class ListKeys(list): """Handle status messages for --list-keys. diff --git a/securedrop/pretty_bad_protocol/_trust.py b/securedrop/pretty_bad_protocol/_trust.py index 03eef6a232..f87407390f 100644 --- a/securedrop/pretty_bad_protocol/_trust.py +++ b/securedrop/pretty_bad_protocol/_trust.py @@ -42,51 +42,6 @@ def _create_trustdb(cls): # type: ignore[no-untyped-def] cls.fix_trustdb(trustdb) -def export_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def] - """Export ownertrust to a trustdb file. - - If there is already a file named :file:`trustdb.gpg` in the current GnuPG - homedir, it will be renamed to :file:`trustdb.gpg.bak`. - - :param string trustdb: The path to the trustdb.gpg file. If not given, - defaults to ``'trustdb.gpg'`` in the current GnuPG - homedir. - """ - if trustdb is None: - trustdb = os.path.join(cls.homedir, "trustdb.gpg") - - try: - os.rename(trustdb, trustdb + ".bak") - except OSError as err: - log.debug(str(err)) - - export_proc = cls._open_subprocess(["--export-ownertrust"]) - tdb = open(trustdb, "wb") - _util._threaded_copy_data(export_proc.stdout, tdb) - export_proc.wait() - - -def import_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def] - """Import ownertrust from a trustdb file. - - :param str trustdb: The path to the trustdb.gpg file. If not given, - defaults to :file:`trustdb.gpg` in the current GnuPG - homedir. - """ - if trustdb is None: - trustdb = os.path.join(cls.homedir, "trustdb.gpg") - - import_proc = cls._open_subprocess(["--import-ownertrust"]) - - try: - tdb = open(trustdb, "rb") - except OSError: - log.error("trustdb file %s does not exist!" % trustdb) - - _util._threaded_copy_data(tdb, import_proc.stdin) - import_proc.wait() - - def fix_trustdb(cls, trustdb=None): # type: ignore[no-untyped-def] """Attempt to repair a broken trustdb.gpg file. diff --git a/securedrop/pretty_bad_protocol/_util.py b/securedrop/pretty_bad_protocol/_util.py index f8e49438a8..5867e16e95 100644 --- a/securedrop/pretty_bad_protocol/_util.py +++ b/securedrop/pretty_bad_protocol/_util.py @@ -18,9 +18,7 @@ """Extra utilities for python-gnupg.""" -import io import os -import re import threading from datetime import datetime from io import BytesIO @@ -28,16 +26,7 @@ from . import _logger -# These are all the classes which are stream-like; they are used in -# :func:`_is_stream`. -_STREAMLIKE_TYPES = [io.IOBase] - - # Directory shortcuts: -# we don't want to use this one because it writes to the install dir: -# _here = getabsfile(currentframe()).rsplit(os.path.sep, 1)[0] -_here = os.path.join(os.getcwd(), "pretty_bad_protocol") # current dir -_test = os.path.join(os.path.join(_here, "test"), "tmp") # ./tests/tmp _user = os.environ.get("HOME") # $HOME # Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all @@ -54,20 +43,12 @@ # that. Otherwise, we'll use the current directory + /gnupghome. _user = os.path.sep.join([_user, "gnupghome"]) -_ugpg = os.path.join(_user, ".gnupg") # $HOME/.gnupg _conf = os.path.join(os.path.join(_user, ".config"), "python-gnupg") # $HOME/.config/python-gnupg # Logger is disabled by default log = _logger.create_logger(0) -#: Compiled regex for determining a GnuPG binary's version: -_VERSION_STRING_REGEX = re.compile(r"(\d)(\.)(\d)(\.)(\d+)") - - -class GnuPGVersionError(ValueError): - """Raised when we couldn't parse GnuPG's version info.""" - def _copy_data(instream, outstream): # type: ignore[no-untyped-def] """Copy data from one stream to another. @@ -315,16 +296,6 @@ def _is_file(filename): # type: ignore[no-untyped-def] return False -def _is_stream(input): # type: ignore[no-untyped-def] - """Check that the input is a byte stream. - - :param input: An object provided for reading from or writing to. - :rtype: bool - :returns: True if :param:input is a stream, False if otherwise. - """ - return isinstance(input, tuple(_STREAMLIKE_TYPES)) - - def _is_list_or_tuple(instance): # type: ignore[no-untyped-def] """Check that ``instance`` is a list or tuple. diff --git a/securedrop/pretty_bad_protocol/gnupg.py b/securedrop/pretty_bad_protocol/gnupg.py index c16d2fd413..2466a194dd 100644 --- a/securedrop/pretty_bad_protocol/gnupg.py +++ b/securedrop/pretty_bad_protocol/gnupg.py @@ -29,7 +29,6 @@ import encodings import functools import os -import re import tempfile import textwrap import time @@ -38,8 +37,8 @@ #: see :pep:`328` http://docs.python.org/2.5/whatsnew/pep-328.html from . import _trust, _util from ._meta import GPGBase -from ._parsers import KeyExpirationInterface, _fix_unsafe -from ._util import _is_list_or_tuple, _is_stream, _make_binary_stream, log +from ._parsers import _fix_unsafe +from ._util import _is_list_or_tuple, _make_binary_stream, log class GPG(GPGBase): @@ -190,126 +189,7 @@ def fix_trustdb(self, trustdb=None): # type: ignore[no-untyped-def] # For backward compatibility with python-gnupg<=1.3.1: _fix_trustdb = fix_trustdb - @functools.wraps(_trust.import_ownertrust) - def import_ownertrust(self, trustdb=None): # type: ignore[no-untyped-def] - _trust.import_ownertrust(self) - - # For backward compatibility with python-gnupg<=1.3.1: - _import_ownertrust = import_ownertrust - - @functools.wraps(_trust.export_ownertrust) - def export_ownertrust(self, trustdb=None): # type: ignore[no-untyped-def] - _trust.export_ownertrust(self) - - # For backward compatibility with python-gnupg<=1.3.1: - _export_ownertrust = export_ownertrust - - def sign(self, data, **kwargs): # type: ignore[no-untyped-def] - """Create a signature for a message string or file. - - Note that this method is not for signing other keys. (In GnuPG's - terms, what we all usually call 'keysigning' is actually termed - 'certification'...) Even though they are cryptographically the same - operation, GnuPG differentiates between them, presumedly because these - operations are also the same as the decryption operation. If the - ``key_usage``s ``C (certification)``, ``S (sign)``, and ``E - (encrypt)``, were all the same key, the key would "wear down" through - frequent signing usage -- since signing data is usually done often -- - meaning that the secret portion of the keypair, also used for - decryption in this scenario, would have a statistically higher - probability of an adversary obtaining an oracle for it (or for a - portion of the rounds in the cipher algorithm, depending on the family - of cryptanalytic attack used). - - In simpler terms: this function isn't for signing your friends' keys, - it's for something like signing an email. - - :type data: :obj:`str` or :obj:`file` - :param data: A string or file stream to sign. - :param str default_key: The key to sign with. - :param str passphrase: The passphrase to pipe to stdin. - :param bool clearsign: If True, create a cleartext signature. - :param bool detach: If True, create a detached signature. - :param bool binary: If True, do not ascii armour the output. - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - :command:`$ gpg --with-colons --list-config digestname`. - The default, if unspecified, is ``'SHA512'``. - """ - if "default_key" in kwargs: - log.info("Signing message '{!r}' with keyid: {}".format(data, kwargs["default_key"])) - else: - log.warn("No 'default_key' given! Using first key on secring.") - - if hasattr(data, "read"): - result = self._sign_file(data, **kwargs) - elif not _is_stream(data): - stream = _make_binary_stream(data, self._encoding) - result = self._sign_file(stream, **kwargs) - stream.close() - else: - log.warn(f"Unable to sign message '{data}' with type {type(data)}") - result = None - return result - - def verify(self, data): # type: ignore[no-untyped-def] - """Verify the signature on the contents of the string ``data``. - - >>> gpg = GPG(homedir="doctests") - >>> input = gpg.gen_key_input(Passphrase='foo') - >>> key = gpg.gen_key(input) - >>> assert key - >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='bar') - >>> assert not sig - >>> sig = gpg.sign('hello',keyid=key.fingerprint,passphrase='foo') - >>> assert sig - >>> verify = gpg.verify(sig.data) - >>> assert verify - - """ - f = _make_binary_stream(data, self._encoding) - result = self.verify_file(f) - f.close() - return result - - def verify_file(self, file, sig_file=None): # type: ignore[no-untyped-def] - """Verify the signature on the contents of a file or file-like - object. Can handle embedded signatures as well as detached - signatures. If using detached signatures, the file containing the - detached signature should be specified as the ``sig_file``. - - :param file file: A file descriptor object. - - :param str sig_file: A file containing the GPG signature data for - ``file``. If given, ``file`` is verified via this detached - signature. Its type will be checked with :func:`_util._is_file`. - """ - - result = self._result_map["verify"](self) - - if sig_file is None: - log.debug("verify_file(): Handling embedded signature") - args = ["--verify"] - proc = self._open_subprocess(args) - writer = _util._threaded_copy_data(file, proc.stdin) - self._collect_output(proc, result, writer, stdin=proc.stdin) - else: - if not _util._is_file(sig_file): - log.debug("verify_file(): '%r' is not a file" % sig_file) - return result - log.debug("verify_file(): Handling detached verification") - sig_fh = None - try: - sig_fh = open(sig_file, "rb") - args = ["--verify %s -" % sig_fh.name] - proc = self._open_subprocess(args) - writer = _util._threaded_copy_data(file, proc.stdin) - self._collect_output(proc, result, writer, stdin=proc.stdin) - finally: - if sig_fh and not sig_fh.closed: - sig_fh.close() - return result - + # TODO: this is only used by tests def import_keys(self, key_data): # type: ignore[no-untyped-def] """ Import the key_data into our keyring. @@ -356,24 +236,6 @@ def import_keys(self, key_data): # type: ignore[no-untyped-def] data.close() return result - def recv_keys(self, *keyids, **kwargs): # type: ignore[no-untyped-def] - """Import keys from a keyserver. - - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key = gpg.recv_keys('3FF0DB166A7476EA', keyserver='hkp://pgp.mit.edu') - >>> assert key - - :param str keyids: Each ``keyids`` argument should be a string - containing a keyid to request. - :param str keyserver: The keyserver to request the ``keyids`` from; - defaults to `gnupg.GPG.keyserver`. - """ - if keyids: - keys = " ".join([key for key in keyids]) - return self._recv_keys(keys, **kwargs) - else: - log.error("No keyids requested for --recv-keys!") - def delete_keys(self, fingerprints, secret=False, subkeys=False): # type: ignore[no-untyped-def] # noqa """Delete a key, or list of keys, from the current keyring. @@ -492,87 +354,6 @@ def list_packets(self, raw_data): # type: ignore[no-untyped-def] self._handle_io(args, _make_binary_stream(raw_data, self._encoding), result) return result - def sign_key(self, keyid, default_key=None, passphrase=None): # type: ignore[no-untyped-def] # noqa - """sign (an imported) public key - keyid, with default secret key - - >>> import gnupg - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_input = gpg.gen_key_input() - >>> key = gpg.gen_key(key_input) - >>> gpg.sign_key(key['fingerprint']) - >>> gpg.list_sigs(key['fingerprint']) - - :param str keyid: key shortID, longID, fingerprint or email_address - :param str passphrase: passphrase used when creating the key, leave None otherwise - - :returns: The result giving status of the key signing... - success can be verified by gpg.list_sigs(keyid) - """ - - args = [] - input_command = "" - if passphrase: - passphrase_arg = "--passphrase-fd 0" - input_command = "%s\n" % passphrase - args.append(passphrase_arg) - - if default_key: - args.append(str("--default-key %s" % default_key)) - - args.extend(["--command-fd 0", "--sign-key %s" % keyid]) - - p = self._open_subprocess(args) - result = self._result_map["signing"](self) - confirm_command = "%sy\n" % input_command - p.stdin.write(confirm_command.encode()) - self._collect_output(p, result, stdin=p.stdin) - return result - - def list_sigs(self, *keyids): # type: ignore[no-untyped-def] - """Get the signatures for each of the ``keyids``. - - >>> import gnupg - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_input = gpg.gen_key_input() - >>> key = gpg.gen_key(key_input) - >>> assert key.fingerprint - - :rtype: dict - :returns: res.sigs is a dictionary whose keys are the uids and whose - values are a set of signature keyids. - """ - return self._process_keys(keyids) - - def check_sigs(self, *keyids): # type: ignore[no-untyped-def] - """Validate the signatures for each of the ``keyids``. - - :rtype: dict - :returns: res.certs is a dictionary whose keys are the uids and whose - values are a set of signature keyids. - """ - return self._process_keys(keyids, check_sig=True) - - def _process_keys(self, keyids, check_sig=False): # type: ignore[no-untyped-def] - - if len(keyids) > self._batch_limit: - raise ValueError( - "List signatures is limited to %d keyids simultaneously" % self._batch_limit - ) - - args = ["--with-colons", "--fixed-list-mode"] - arg = "--check-sigs" if check_sig else "--list-sigs" - - if len(keyids): - arg += " " + " ".join(keyids) - - args.append(arg) - - proc = self._open_subprocess(args) - result = self._result_map["list"](self) - self._collect_output(proc, result, stdin=proc.stdin) - self._parse_keys(result) - return result - def _parse_keys(self, result): # type: ignore[no-untyped-def] lines = result.data.decode(self._encoding, self._decode_errors).splitlines() valid_keywords = "pub uid sec fpr sub sig rev".split() @@ -589,48 +370,6 @@ def _parse_keys(self, result): # type: ignore[no-untyped-def] if keyword in valid_keywords: getattr(result, keyword)(L) - def expire(self, keyid, expiration_time="1y", passphrase=None, expire_subkeys=True): # type: ignore[no-untyped-def] # noqa - """Changes GnuPG key expiration by passing in new time period (from now) through - subprocess's stdin - - >>> import gnupg - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_input = gpg.gen_key_input() - >>> key = gpg.gen_key(key_input) - >>> gpg.expire(key.fingerprint, '2w', 'good passphrase') - - :param str keyid: key shortID, longID, email_address or fingerprint - :param str expiration_time: 0 or number of days (d), or weeks (*w) , or months (*m) - or years (*y) for when to expire the key, from today. - :param str passphrase: passphrase used when creating the key, leave None otherwise - :param bool expire_subkeys: to indicate whether the subkeys will also change the - expiration time by the same period -- default is True - - :returns: The result giving status of the change in expiration... - the new expiration date can be obtained by .list_keys() - """ - - passphrase = passphrase.encode(self._encoding) if passphrase else passphrase - - try: - sub_keys_number = len(self.list_sigs(keyid)[0]["subkeys"]) if expire_subkeys else 0 - except IndexError: - sub_keys_number = 0 - - expiration_input = KeyExpirationInterface( - expiration_time, passphrase - ).gpg_interactive_input(sub_keys_number) - - args = ["--command-fd 0", "--edit-key %s" % keyid] - p = self._open_subprocess(args) - p.stdin.write(expiration_input.encode()) - - result = self._result_map["expire"](self) - p.stdin.write(expiration_input.encode()) - - self._collect_output(p, result, stdin=p.stdin) - return result - def gen_key(self, input): # type: ignore[no-untyped-def] """Generate a GnuPG key through batch file key generation. See :meth:`GPG.gen_key_input()` for creating the control input. @@ -975,113 +714,6 @@ def gen_key_input(self, separate_keyring=False, save_batchfile=False, testing=Fa return out - def encrypt(self, data, *recipients, **kwargs): # type: ignore[no-untyped-def] - """Encrypt the message contained in ``data`` to ``recipients``. - - :param str data: The file or bytestream to encrypt. - - :param str recipients: The recipients to encrypt to. Recipients must - be specified keyID/fingerprint. Care should be taken in Python2.x - to make sure that the given fingerprint is in fact a string and - not a unicode object. Multiple recipients may be specified by - doing ``GPG.encrypt(data, fpr1, fpr2, fpr3)`` etc. - - :param str default_key: The keyID/fingerprint of the key to use for - signing. If given, ``data`` will be encrypted and signed. - - :param str passphrase: If given, and ``default_key`` is also given, - use this passphrase to unlock the secret portion of the - ``default_key`` to sign the encrypted ``data``. Otherwise, if - ``default_key`` is not given, but ``symmetric=True``, then use - this passphrase as the passphrase for symmetric - encryption. Signing and symmetric encryption should *not* be - combined when sending the ``data`` to other recipients, else the - passphrase to the secret key would be shared with them. - - :param bool armor: If True, ascii armor the output; otherwise, the - output will be in binary format. (Default: True) - - :param bool encrypt: If True, encrypt the ``data`` using the - ``recipients`` public keys. (Default: True) - - :param bool symmetric: If True, encrypt the ``data`` to ``recipients`` - using a symmetric key. See the ``passphrase`` parameter. Symmetric - encryption and public key encryption can be used simultaneously, - and will result in a ciphertext which is decryptable with either - the symmetric ``passphrase`` or one of the corresponding private - keys. - - :param bool always_trust: If True, ignore trust warnings on recipient - keys. If False, display trust warnings. (default: True) - - :param str output: The output file to write to. If not specified, the - encrypted output is returned, and thus should be stored as an - object in Python. For example: - - >>> import shutil - >>> import gnupg - >>> if os.path.exists("doctests"): - ... shutil.rmtree("doctests") - >>> gpg = gnupg.GPG(homedir="doctests") - >>> key_settings = gpg.gen_key_input(key_type='RSA', - ... key_length=1024, - ... key_usage='ESCA', - ... passphrase='foo') - >>> key = gpg.gen_key(key_settings) - >>> message = "The crow flies at midnight." - >>> encrypted = str(gpg.encrypt(message, key.fingerprint)) - >>> assert encrypted != message - >>> assert not encrypted.isspace() - >>> decrypted = str(gpg.decrypt(encrypted, passphrase='foo')) - >>> assert not decrypted.isspace() - >>> decrypted - 'The crow flies at midnight.' - - - :param bool throw_keyids: If True, make all **recipients** keyids be - zero'd out in packet information. This is the same as using - **hidden_recipients** for all **recipients**. (Default: False). - - :param list hidden_recipients: A list of recipients that should have - their keyids zero'd out in packet information. - - :param str cipher_algo: The cipher algorithm to use. To see available - algorithms with your version of GnuPG, do: - :command:`$ gpg --with-colons --list-config ciphername`. - The default ``cipher_algo``, if unspecified, is ``'AES256'``. - - :param str digest_algo: The hash digest to use. Again, to see which - hashes your GnuPG is capable of using, do: - :command:`$ gpg --with-colons --list-config digestname`. - The default, if unspecified, is ``'SHA512'``. - - :param str compress_algo: The compression algorithm to use. Can be one - of ``'ZLIB'``, ``'BZIP2'``, ``'ZIP'``, or ``'Uncompressed'``. - - .. seealso:: :meth:`._encrypt` - """ - if _is_stream(data): - stream = data - else: - stream = _make_binary_stream(data, self._encoding) - result = self._encrypt(stream, recipients, **kwargs) - stream.close() - return result - - def decrypt(self, message, **kwargs): # type: ignore[no-untyped-def] - """Decrypt the contents of a string or file-like object ``message``. - - :type message: file or str or :class:`io.BytesIO` - :param message: A string or file-like object to decrypt. - :param bool always_trust: Instruct GnuPG to ignore trust checks. - :param str passphrase: The passphrase for the secret key used for decryption. - :param str output: A filename to write the decrypted output to. - """ - stream = _make_binary_stream(message, self._encoding) - result = self.decrypt_file(stream, **kwargs) - stream.close() - return result - def decrypt_file(self, filename, always_trust=False, passphrase=None, output=None): # type: ignore[no-untyped-def] # noqa """Decrypt the contents of a file-like object ``filename`` . @@ -1101,74 +733,3 @@ def decrypt_file(self, filename, always_trust=False, passphrase=None, output=Non self._handle_io(args, filename, result, passphrase, binary=True) log.debug("decrypt result: %r", result.data) return result - - -class GPGUtilities: - """Extra tools for working with GnuPG.""" - - def __init__(self, gpg): # type: ignore[no-untyped-def] - """Initialise extra utility functions.""" - self._gpg = gpg - - def find_key_by_email(self, email, secret=False): # type: ignore[no-untyped-def] - """Find user's key based on their email address. - - :param str email: The email address to search for. - :param bool secret: If True, search through secret keyring. - """ - for key in self.list_keys(secret=secret): - for uid in key["uids"]: - if re.search(email, uid): - return key - raise LookupError("GnuPG public key for email %s not found!" % email) - - def find_key_by_subkey(self, subkey): # type: ignore[no-untyped-def] - """Find a key by a fingerprint of one of its subkeys. - - :param str subkey: The fingerprint of the subkey to search for. - """ - for key in self.list_keys(): - for sub in key["subkeys"]: - if sub[0] == subkey: - return key - raise LookupError("GnuPG public key for subkey %s not found!" % subkey) - - def send_keys(self, keyserver, *keyids): # type: ignore[no-untyped-def] - """Send keys to a keyserver.""" - result = self._result_map["list"](self) - log.debug("send_keys: %r", keyids) - data = _util._make_binary_stream("", self._encoding) - args = ["--keyserver", keyserver, "--send-keys"] - args.extend(keyids) - self._handle_io(args, data, result, binary=True) - log.debug("send_keys result: %r", result.__dict__) - data.close() - return result - - def encrypted_to(self, raw_data): # type: ignore[no-untyped-def] - """Return the key to which raw_data is encrypted to.""" - # TODO: make this support multiple keys. - result = self._gpg.list_packets(raw_data) - if not result.key: - raise LookupError("Content is not encrypted to a GnuPG key!") - try: - return self.find_key_by_keyid(result.key) - except: # noqa: E722 - return self.find_key_by_subkey(result.key) - - def is_encrypted_sym(self, raw_data): # type: ignore[no-untyped-def] - result = self._gpg.list_packets(raw_data) - return bool(result.need_passphrase_sym) - - def is_encrypted_asym(self, raw_data): # type: ignore[no-untyped-def] - result = self._gpg.list_packets(raw_data) - return bool(result.key) - - def is_encrypted(self, raw_data): # type: ignore[no-untyped-def] - return self.is_encrypted_asym(raw_data) or self.is_encrypted_sym(raw_data) - - -if __name__ == "__main__": - from .test import test_gnupg - - test_gnupg.main() diff --git a/securedrop/tests/test_encryption.py b/securedrop/tests/test_encryption.py index 8dc6fde524..6291075076 100644 --- a/securedrop/tests/test_encryption.py +++ b/securedrop/tests/test_encryption.py @@ -243,11 +243,11 @@ def test_gpg_encrypt_and_decrypt_journalist_reply( ) # Amd the reply can't be decrypted without providing the source1's gpg secret - result = encryption_mgr._gpg.decrypt( - # For GPG 2.1+, a non-null passphrase _must_ be passed to decrypt() - encrypted_reply, - passphrase="test 123", - ) + with encrypted_reply_path.open("rb") as fobj: + result = encryption_mgr._gpg.decrypt_file( + fobj, + passphrase="test 123", + ) assert not result.ok # And the journalist is able to decrypt their reply