Skip to content

Commit

Permalink
Merge pull request #312 from prebuilder/checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Commod0re committed Nov 11, 2022
2 parents b5cc2f3 + b8c28c5 commit 002e1d5
Show file tree
Hide file tree
Showing 4 changed files with 3,075 additions and 32 deletions.
104 changes: 102 additions & 2 deletions pgpy/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
import os
import time
import zlib
import warnings

from collections import namedtuple
from enum import Enum
from enum import IntEnum
from enum import IntFlag
from enum import EnumMeta

from pyasn1.type.univ import ObjectIdentifier

import six
Expand All @@ -19,7 +23,6 @@
from cryptography.hazmat.primitives.ciphers import algorithms

from .decorators import classproperty
from .types import FlagEnum
from ._curves import BrainpoolP256R1, BrainpoolP384R1, BrainpoolP512R1, X25519, Ed25519

__all__ = ['Backend',
Expand All @@ -35,19 +38,41 @@
'SignatureType',
'KeyServerPreferences',
'S2KGNUExtension',
'SecurityIssues',
'String2KeyType',
'TrustLevel',
'KeyFlags',
'Features',
'FlagEnumMeta',
'RevocationKeyClass',
'NotationDataFlags',
'TrustFlags']
'TrustFlags',
'check_assymetric_algo_and_its_parameters',
'is_hash_considered_secure']


# this is 50 KiB
_hashtunedata = bytearray([10, 11, 12, 13, 14, 15, 16, 17] * 128 * 50)


class FlagEnumMeta(EnumMeta):
def __and__(self, other):
return { f for f in iter(self) if f.value & other }

def __rand__(self, other): # pragma: no cover
return self & other


if six.PY2:
class FlagEnum(IntEnum):
__metaclass__ = FlagEnumMeta

else:
namespace = FlagEnumMeta.__prepare__('FlagEnum', (IntEnum,))
FlagEnum = FlagEnumMeta('FlagEnum', (IntEnum,), namespace)



class Backend(Enum):
OpenSSL = openssl.backend

Expand Down Expand Up @@ -343,6 +368,10 @@ class HashAlgorithm(IntEnum):
SHA384 = 0x09
SHA512 = 0x0A
SHA224 = 0x0B
#SHA3_256 = 13
#SHA3_384 = 14
#SHA3_512 = 15


def __init__(self, *args):
super(self.__class__, self).__init__()
Expand All @@ -365,6 +394,9 @@ def is_supported(self):
return True


secondPreimageResistantHashes = {HashAlgorithm.SHA1}
collisionResistantHashses = {HashAlgorithm.SHA256, HashAlgorithm.SHA384, HashAlgorithm.SHA512}

class RevocationReason(IntEnum):
"""Reasons explaining why a key or certificate was revoked."""
#: No reason was specified. This is the default reason.
Expand Down Expand Up @@ -540,3 +572,71 @@ class TrustFlags(FlagEnum):
SubRevoked = 0x40
Disabled = 0x80
PendingCheck = 0x100


class SecurityIssues(IntFlag):
OK = 0
wrongSig = (1 << 0)
expired = (1 << 1)
disabled = (1 << 2)
revoked = (1 << 3)
invalid = (1 << 4)
brokenAssymetricFunc = (1 << 5)
hashFunctionNotCollisionResistant = (1 << 6)
hashFunctionNotSecondPreimageResistant = (1 << 7)
assymetricKeyLengthIsTooShort = (1 << 8)
insecureCurve = (1 << 9)
noSelfSignature = (1 << 10)

# https://safecurves.cr.yp.to/
safeCurves = {
EllipticCurveOID.Curve25519,
EllipticCurveOID.Ed25519,
}

minimumAssymetricKeyLegths = {
PubKeyAlgorithm.RSAEncryptOrSign: 2048,
PubKeyAlgorithm.RSASign: 2048,
PubKeyAlgorithm.ElGamal: 2048,
PubKeyAlgorithm.DSA: 2048,

PubKeyAlgorithm.ECDSA: safeCurves,
PubKeyAlgorithm.EdDSA: safeCurves,
PubKeyAlgorithm.ECDH: safeCurves,
}



def is_hash_considered_secure(hash):
if hash in collisionResistantHashses:
return SecurityIssues.OK

warnings.warn("Hash function " + repr(hash) + " is not considered collision resistant")
issues = SecurityIssues.hashFunctionNotCollisionResistant

if hash not in secondPreimageResistantHashes:
issues |= hashFunctionNotSecondPreimageResistant

return issues

def check_assymetric_algo_and_its_parameters(algo, size):
if algo in minimumAssymetricKeyLegths:
minLOrSetOfSecureCurves = minimumAssymetricKeyLegths[algo]
if isinstance(minLOrSetOfSecureCurves, set): # ECC
curve = size
safeCurvesForThisAlg = minLOrSetOfSecureCurves
if curve in safeCurvesForThisAlg:
return SecurityIssues.OK
else:
warnings.warn("Curve " + repr(curve) + " is not considered secure for " + repr(algo))
return SecurityIssues.insecureCurve
else:
minL = minLOrSetOfSecureCurves
if size < minL:
warnings.warn("Assymetric algo " + repr(algo) + " needs key at least of " + repr(minL) + " bits effective length to be considered secure")
return SecurityIssues.assymetricKeyLengthIsTooShort
else:
return SecurityIssues.OK
else:
warnings.warn("Assymetric algo " + repr(algo) + " is not considered secure")
return SecurityIssues.brokenAssymetricFunc
88 changes: 84 additions & 4 deletions pgpy/pgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from .constants import RevocationReason
from .constants import SignatureType
from .constants import SymmetricKeyAlgorithm
from .constants import SecurityIssues
from .constants import check_assymetric_algo_and_its_parameters
from .constants import is_hash_considered_secure

from .decorators import KeyAction

Expand Down Expand Up @@ -171,6 +174,13 @@ def hash_algorithm(self):
The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
"""
return self._signature.halg


def check_primitives(self):
return is_hash_considered_secure(self.hash_algorithm)

def check_soundness(self):
return self.check_primitives()

@property
def is_expired(self):
Expand Down Expand Up @@ -1627,6 +1637,7 @@ def __init__(self):
self._signatures = SorteDeque()
self._uids = SorteDeque()
self._sibling = None
self._self_verified = None
self._require_usage_flags = True

def __bytearray__(self):
Expand Down Expand Up @@ -2350,6 +2361,60 @@ def bind(self, key, **prefs):

return self._sign(key, sig, **prefs)

def is_considered_insecure(self, self_verifying=False):
res = self.check_soundness(self_verifying=self_verifying)

for sk in self.subkeys.values():
res |= sk.check_soundness(self_verifying=self_verifying)
return res

def self_verify(self):
selfSigs = list(self.self_signatures)
res = SecurityIssues.OK
if selfSigs:
for s in selfSigs:
if not self.verify(self, s):
res |= SecurityIssues.invalid
break
else:
return SecurityIssues.noSelfSignature
return res

def _do_self_signatures_verification(self):
try:
self._self_verified = SecurityIssues.OK
self._self_verified = self.self_verify()
except:
self._self_verified = None
raise

@property
def self_verified(self):
warnings.warn("TODO: Self-sigs verification is not yet working because self-sigs are not parsed!!!")
return SecurityIssues.OK

if self._self_verified is None:
self._do_self_signatures_verification()

return self._self_verified

def check_primitives(self):
return check_assymetric_algo_and_its_parameters(self.key_algorithm, self.key_size)

def check_management(self, self_verifying=False):
res = self.self_verified
if self.is_expired:
warnings.warn("Key " + repr(self) + " has expired at " + str(self.expires_at))
res |= SecurityIssues.expired

warnings.warn("TODO: Revocation checks are not yet implemented!!!")
warnings.warn("TODO: Flags (s.a. `disabled`) checks are not yet implemented!!!")
res |= int(bool(list(self.revocation_signatures))) * SecurityIssues.revoked
return res

def check_soundness(self, self_verifying=False):
return self.check_management(self_verifying) | self.check_primitives()

def verify(self, subject, signature=None):
"""
Verify a subject with a signature using this key.
Expand Down Expand Up @@ -2412,11 +2477,26 @@ def _filter_sigs(sigs):
sigv &= self.subkeys[sig.signer].verify(subj, sig)

else:
verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
if verified is NotImplemented:
raise NotImplementedError(sig.key_algorithm)
if isinstance(subj, PGPKey):
self_verifying = signerFp == subj.fingerprint
else:
self_verifying = False

subkey_issues = self.check_soundness(self_verifying)
signature_issues = self.check_primitives()

if self_verifying:
signature_issues &= ~SecurityIssues.hashFunctionNotCollisionResistant

issues = signature_issues | subkey_issues
if issues:
sigv.add_sigsubj(sig, self, subj, issues)
else:
verified = self._key.verify(sig.hashdata(subj), sig.__sig__, getattr(hashes, sig.hash_algorithm.name)())
if verified is NotImplemented:
raise NotImplementedError(sig.key_algorithm)

sigv.add_sigsubj(sig, self, subj, verified)
sigv.add_sigsubj(sig, self, subj, SecurityIssues.wrongSig if not verified else SecurityIssues.OK)

return sigv

Expand Down

0 comments on commit 002e1d5

Please sign in to comment.