Skip to content

Commit

Permalink
Merge pull request #109 from letmaik/letmaik/cryptography-interop
Browse files Browse the repository at this point in the history
Add `from_pem_private_key`/`from_pem_public_key` methods
  • Loading branch information
TimothyClaeys committed Dec 15, 2023
2 parents 8883ffd + b9b8f22 commit b4f5f96
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 41 deletions.
19 changes: 19 additions & 0 deletions docs/pycose/keys/cosekey.rst
Expand Up @@ -35,6 +35,25 @@ The :class:`~pycose.keys.cosekey.CoseKey` class can be used to decode serialized
>>> cosekey.d
b'\x8fx\x1a\tSr\xf8[m\x9fa\t\xaeB&\x11sM}\xbf\xa0\x06\x9a-\xf2\x93[\xb2\xe0S\xbf5'

Alternatively, :class:`~pycose.keys.cosekey.CoseKey` objects can be initialized from PEM-encoded keys:

.. _`pyca/cryptography`: https://cryptography.io/

.. doctest::
:pyversion: >= 3.6

>>> from pycose.keys import CoseKey

>>> pem = '-----BEGIN PUBLIC KEY-----\n' \
... 'MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEyIBhex88X7Yrh5Q4hbmsUYpcVWNj\n' \
... 'mx1oE7TPomgpZJcQeNC3bX++GPsIWewWEGGFJKwHtRyfrL61DTTym3Rp8A==\n' \
... '-----END PUBLIC KEY-----\n'

>>> cosekey = CoseKey.from_pem_public_key(pem)
>>> cosekey
<COSE_Key(EC2Key): {'EC2KpY': "b'\\x10x\\xd0\\xb7m' ... (32 B)", 'EC2KpX': "b'\\xc8\\x80a{\\x1f' ... (32 B)", 'EC2KpCurve': 'P256', 'KpKty': 'KtyEC2'}>


Overview
--------

Expand Down
50 changes: 50 additions & 0 deletions pycose/keys/cosekey.py
Expand Up @@ -5,6 +5,8 @@

import cbor2

from cryptography.hazmat.primitives.serialization import NoEncryption, load_pem_private_key, load_pem_public_key

from pycose import utils
from pycose.algorithms import CoseAlgorithm
from pycose.exceptions import CoseException, CoseIllegalKeyType, CoseIllegalAlgorithm, CoseIllegalKeyOps
Expand Down Expand Up @@ -94,6 +96,54 @@ def from_dict(cls, received: dict) -> 'CK':

return key_obj

@staticmethod
def from_pem_private_key(
pem: str,
password: Optional[bytes] = None,
optional_params: Optional[dict] = None
) -> "CoseKey":
"""
Initialize a COSE key from a PEM-encoded private key.
:param pem: PEM-encoded private key.
:param password: Password to decrypt the key.
:return: an initialized CoseKey object.
"""
ext_key = load_pem_private_key(pem.encode(), password)
return CoseKey._from_cryptography_key(ext_key, optional_params)

@staticmethod
def from_pem_public_key(
pem: str,
optional_params: Optional[dict] = None
) -> "CoseKey":
"""
Initialize a COSE key from a PEM-encoded public key.
:param pem: PEM-encoded public key.
:return: an initialized CoseKey object.
"""
ext_key = load_pem_public_key(pem.encode())
return CoseKey._from_cryptography_key(ext_key, optional_params)

@staticmethod
def _from_cryptography_key(
ext_key,
optional_params: Optional[dict] = None
) -> "CoseKey":
"""
Initialize a COSE key from a cryptography key.
:param ext_key: A cryptography key.
:param optional_params: Optional parameters to add to the key.
:return: An initialized COSE Key object.
"""

for key_type in CoseKey._key_types.values():
if key_type._supports_cryptography_key_type(ext_key):
return key_type._from_cryptography_key(ext_key, optional_params)
raise CoseIllegalKeyType(f"Unsupported key type: {type(ext_key)}")

@staticmethod
def base64decode(to_decode: str) -> bytes:
"""
Expand Down
68 changes: 57 additions & 11 deletions pycose/keys/ec2.py
Expand Up @@ -9,10 +9,10 @@
from pycose.keys.keyops import SignOp, VerifyOp, DeriveKeyOp, DeriveBitsOp
from pycose.keys.keyparam import EC2KeyParam, EC2KpCurve, EC2KpX, EC2KpY, EC2KpD, KpKty, KeyParam
from pycose.keys.keytype import KtyEC2
from pycose.keys.curves import CoseCurve

if TYPE_CHECKING:
from pycose.keys.keyops import KEYOPS
from pycose.keys.curves import CoseCurve


@CoseKey.record_kty(KtyEC2)
Expand Down Expand Up @@ -42,6 +42,59 @@ def from_dict(cls, cose_key: dict) -> 'EC2Key':

return cls(crv=curve, x=x, y=y, d=d, optional_params=_optional_params, allow_unknown_key_attrs=True)

@staticmethod
def _from_cryptography_key(
ext_key: Union[ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey],
optional_params: Optional[dict] = None
) -> 'EC2Key':
"""
Returns an initialized COSE Key object of type EC2Key.
:param ext_key: Python cryptography key.
:return: an initialized EC key
"""
if not EC2Key._supports_cryptography_key_type(ext_key):
raise CoseIllegalKeyType(f"Unsupported key type: {type(ext_key)}")

if isinstance(ext_key, ec.EllipticCurvePrivateKey):
priv_nums = ext_key.private_numbers()
pub_nums = priv_nums.public_numbers
else:
priv_nums = None
pub_nums = ext_key.public_numbers()

curves = {
type(curve_cls.curve_obj): curve_cls
for curve_cls in CoseCurve.get_registered_classes().values()
if curve_cls.key_type == KtyEC2
}

if type(pub_nums.curve) not in curves:
raise CoseUnsupportedCurve(f"Unsupported EC Curve: {type(pub_nums.curve)}")
curve = curves[type(pub_nums.curve)]

cose_key = {}
if pub_nums:
cose_key.update(
{
EC2KpCurve: curve,
EC2KpX: pub_nums.x.to_bytes(curve.size, "big"),
EC2KpY: pub_nums.y.to_bytes(curve.size, "big"),
}
)
if priv_nums:
cose_key.update(
{
EC2KpD: priv_nums.private_value.to_bytes(curve.size, "big"),
}
)
if optional_params:
cose_key.update(optional_params)
return EC2Key.from_dict(cose_key)

@staticmethod
def _supports_cryptography_key_type(ext_key) -> bool:
return isinstance(ext_key, (ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey))

@staticmethod
def _key_transform(key: Union[Type['EC2KeyParam'], Type['KeyParam'], str, int], allow_unknown_attrs: bool = False):
return EC2KeyParam.from_id(key, allow_unknown_attrs)
Expand Down Expand Up @@ -220,16 +273,9 @@ def generate_key(cls, crv: Union[Type['CoseCurve'], str, int], optional_params:
if crv.key_type != KtyEC2:
raise CoseUnsupportedCurve(f'Unsupported COSE curve: {crv}')

private_key = ec.generate_private_key(crv.curve_obj, backend=default_backend())
d_value = private_key.private_numbers().private_value
x_coor = private_key.public_key().public_numbers().x
y_coor = private_key.public_key().public_numbers().y

return EC2Key(crv=crv,
d=d_value.to_bytes(crv.size, "big"),
x=x_coor.to_bytes(crv.size, "big"),
y=y_coor.to_bytes(crv.size, "big"),
optional_params=optional_params)
ext_key = ec.generate_private_key(crv.curve_obj, backend=default_backend())

return cls._from_cryptography_key(ext_key, optional_params)

def __delitem__(self, key):
if self._key_transform(key) != KpKty and self._key_transform(key) != EC2KpCurve:
Expand Down
80 changes: 67 additions & 13 deletions pycose/keys/okp.py
@@ -1,18 +1,20 @@
from typing import Optional, Type, Union, List, TYPE_CHECKING

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import PrivateFormat, PublicFormat, Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat, PublicFormat, Encoding, NoEncryption
from cryptography.hazmat.primitives.asymmetric import ed25519, ed448, x25519, x448

from pycose import utils
from pycose.exceptions import CoseUnsupportedCurve, CoseInvalidKey, CoseIllegalKeyType, CoseIllegalKeyOps
from pycose.keys.cosekey import CoseKey, KpKty
from pycose.keys.keyops import KEYOPS, SignOp, VerifyOp, DeriveBitsOp, DeriveKeyOp
from pycose.keys.keyparam import OKPKeyParam, OKPKpCurve, OKPKpX, OKPKpD, KeyParam
from pycose.keys.keytype import KtyOKP
from pycose.keys.curves import CoseCurve
from pycose.keys import curves

if TYPE_CHECKING:
from pycose.keys.keyops import KEYOPS
from pycose.keys.curves import CoseCurve


@CoseKey.record_kty(KtyOKP)
Expand Down Expand Up @@ -41,6 +43,67 @@ def from_dict(cls, cose_key: dict) -> 'OKPKey':

return cls(crv=curve, x=x, d=d, optional_params=_optional_params, allow_unknown_key_attrs=True)

@staticmethod
def _from_cryptography_key(
ext_key: Union[
ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey,
ed448.Ed448PrivateKey, ed448.Ed448PublicKey,
x25519.X25519PrivateKey, x25519.X25519PublicKey,
x448.X448PrivateKey, x448.X448PublicKey
],
optional_params: Optional[dict] = None,
) -> 'OKPKey':
"""
Returns an initialized COSE Key object of type OKPKey.
:param ext_key: Python cryptography key.
:return: an initialized OKP key
"""

curve = OKPKey._curve_from_cryptography_key(ext_key)

if hasattr(ext_key, 'private_bytes'):
priv_bytes = ext_key.private_bytes(
encoding=Encoding.Raw,
format=PrivateFormat.Raw,
encryption_algorithm=NoEncryption(),
)
pub_bytes = ext_key.public_key().public_bytes(
encoding=Encoding.Raw, format=PublicFormat.Raw
)
else:
priv_bytes = None
pub_bytes = ext_key.public_bytes(encoding=Encoding.Raw, format=PublicFormat.Raw)

cose_key = {
OKPKpCurve: curve,
OKPKpX: pub_bytes,
}
if priv_bytes:
cose_key[OKPKpD] = priv_bytes
if optional_params:
cose_key.update(optional_params)
return OKPKey.from_dict(cose_key)

@staticmethod
def _curve_from_cryptography_key(ext_key) -> Type[CoseCurve]:
if isinstance(ext_key, (ed25519.Ed25519PrivateKey, ed25519.Ed25519PublicKey)):
return curves.Ed25519
if isinstance(ext_key, (ed448.Ed448PrivateKey, ed448.Ed448PublicKey)):
return curves.Ed448
if isinstance(ext_key, (x25519.X25519PrivateKey, x25519.X25519PublicKey)):
return curves.X25519
if isinstance(ext_key, (x448.X448PrivateKey, x448.X448PublicKey)):
return curves.X448
raise CoseIllegalKeyType(f"Unsupported key type: {type(ext_key)}")

@classmethod
def _supports_cryptography_key_type(cls, ext_key) -> bool:
try:
cls._curve_from_cryptography_key(ext_key)
except CoseIllegalKeyType:
return False
return True

@staticmethod
def _key_transform(key: Union[Type['OKPKeyParam'], Type['KeyParam'], str, int],
allow_unknown_attrs: bool = False):
Expand Down Expand Up @@ -175,18 +238,9 @@ def generate_key(cls, crv: Union[Type['CoseCurve'], str, int], optional_params:
if crv.key_type != KtyOKP:
raise CoseUnsupportedCurve(f'Unsupported COSE curve: {crv}')

encoding = Encoding(serialization.Encoding.Raw)
private_format = PrivateFormat(serialization.PrivateFormat.Raw)
public_format = PublicFormat(serialization.PublicFormat.Raw)
encryption = serialization.NoEncryption()

private_key = crv.curve_obj.generate()
ext_key = crv.curve_obj.generate()

return OKPKey(
crv=crv,
x=private_key.public_key().public_bytes(encoding, public_format),
d=private_key.private_bytes(encoding, private_format, encryption),
optional_params=optional_params)
return cls._from_cryptography_key(ext_key, optional_params)

def __delitem__(self, key: Union['KeyParam', str, int]):
if self._key_transform(key) != KpKty and self._key_transform(key) != OKPKpCurve:
Expand Down
65 changes: 48 additions & 17 deletions pycose/keys/rsa.py
Expand Up @@ -14,6 +14,9 @@
from pycose.keys.keyops import KEYOPS
from pycose.keys.keyparam import KeyParam

def to_bstr(dec):
blen = (dec.bit_length() + 7) // 8
return dec.to_bytes(blen, byteorder="big")

@CoseKey.record_kty(KtyRSA)
class RSAKey(CoseKey):
Expand Down Expand Up @@ -71,6 +74,47 @@ def from_dict(cls, cose_key: dict) -> 'RSAKey':
optional_params=_optional_params,
allow_unknown_key_attrs=True)

@staticmethod
def _from_cryptography_key(
ext_key: Union[rsa.RSAPrivateKey, rsa.RSAPublicKey],
optional_params: Optional[dict] = None
) -> 'RSAKey':
"""
Returns an initialized COSE Key object of type RSAKey.
:param ext_key: Python cryptography key.
:return: an initialized RSA key
"""
if not RSAKey._supports_cryptography_key_type(ext_key):
raise CoseIllegalKeyType(f"Unsupported key type: {type(ext_key)}")

if isinstance(ext_key, rsa.RSAPrivateKey):
priv_nums = ext_key.private_numbers()
pub_nums = priv_nums.public_numbers
else:
priv_nums = None
pub_nums = ext_key.public_numbers()

cose_key = {
RSAKpE: to_bstr(pub_nums.e),
RSAKpN: to_bstr(pub_nums.n),
}
if priv_nums:
cose_key.update({
RSAKpD: to_bstr(priv_nums.d),
RSAKpP: to_bstr(priv_nums.p),
RSAKpQ: to_bstr(priv_nums.q),
RSAKpDP: to_bstr(priv_nums.dmp1),
RSAKpDQ: to_bstr(priv_nums.dmq1),
RSAKpQInv: to_bstr(priv_nums.iqmp),
})
if optional_params:
cose_key.update(optional_params)
return RSAKey.from_dict(cose_key)

@staticmethod
def _supports_cryptography_key_type(ext_key) -> bool:
return isinstance(ext_key, (rsa.RSAPrivateKey, rsa.RSAPublicKey))

@staticmethod
def _key_transform(key: Union[Type['RSAKeyParam'], Type['KeyParam'], str, int], allow_unknown_attrs: bool = False):
return RSAKeyParam.from_id(key, allow_unknown_attrs)
Expand Down Expand Up @@ -255,8 +299,8 @@ def key_ops(self, new_key_ops: List[Type['KEYOPS']]) -> None:
else:
CoseKey.key_ops.fset(self, new_key_ops)

@staticmethod
def generate_key(key_bits: int, optional_params: dict = None) -> 'RSAKey':
@classmethod
def generate_key(cls, key_bits: int, optional_params: dict = None) -> 'RSAKey':
"""
Generate a random RSAKey COSE key object. The RSA keys have two primes (see section 4 of RFC 8230).
Expand All @@ -266,22 +310,9 @@ def generate_key(key_bits: int, optional_params: dict = None) -> 'RSAKey':
:return: An COSE `RSAKey` key.
"""

key = rsa.generate_private_key(public_exponent=65537, key_size=key_bits, backend=default_backend())

private_numbers = key.private_numbers()
p = private_numbers.p.to_bytes((private_numbers.p.bit_length() + 7) // 8, byteorder='big')
q = private_numbers.q.to_bytes((private_numbers.q.bit_length() + 7) // 8, byteorder='big')
d = private_numbers.d.to_bytes((private_numbers.d.bit_length() + 7) // 8, byteorder='big')
dp = private_numbers.dmp1.to_bytes((private_numbers.dmp1.bit_length() + 7) // 8, byteorder='big')
dq = private_numbers.dmq1.to_bytes((private_numbers.dmq1.bit_length() + 7) // 8, byteorder='big')
qinv = private_numbers.iqmp.to_bytes((private_numbers.iqmp.bit_length() + 7) // 8, byteorder='big')

public_numbers = private_numbers.public_numbers

n = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, byteorder='big')
e = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, byteorder='big')
ext_key = rsa.generate_private_key(public_exponent=65537, key_size=key_bits, backend=default_backend())

return RSAKey(n=n, e=e, d=d, p=p, q=q, dp=dp, dq=dq, qinv=qinv, optional_params=optional_params)
return cls._from_cryptography_key(ext_key, optional_params)

def __repr__(self):
hdr = f'<COSE_Key(RSAKey): {self._key_repr()}>'
Expand Down
4 changes: 4 additions & 0 deletions pycose/keys/symmetric.py
Expand Up @@ -33,6 +33,10 @@ def from_dict(cls, cose_key: dict) -> 'SymmetricKey':

return cls(k=k, optional_params=_optional_params, allow_unknown_key_attrs=True)

@staticmethod
def _supports_cryptography_key_type(ext_key) -> bool:
return False

@staticmethod
def _key_transform(key: Union[Type['SymmetricKeyParam'], Type['KeyParam'], str, int],
allow_unknown_attrs: bool = False):
Expand Down

0 comments on commit b4f5f96

Please sign in to comment.