Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for AES GCM. #41

Merged
merged 3 commits into from Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.rst
Expand Up @@ -4,6 +4,9 @@ Changes
Unreleased
----------

- Add support for AES-GCM (A128GCM, A192GCM and A256GCM). `#41 <https://github.com/dajiaji/python-cwt/pull/41>`__
- Make key optional for KeyBuilder.from_symmetric_key. `#41 <https://github.com/dajiaji/python-cwt/pull/41>`__

Version 0.3.0
-------------

Expand Down
6 changes: 4 additions & 2 deletions cwt/key_builder.py
Expand Up @@ -32,7 +32,7 @@
from .cose_key import COSEKey
from .key_types.ec2 import EC2Key
from .key_types.okp import OKPKey
from .key_types.symmetric import AESCCMKey, HMACKey
from .key_types.symmetric import AESCCMKey, AESGCMKey, HMACKey


class KeyBuilder:
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(self, options: Optional[Dict[str, Any]] = None):

def from_symmetric_key(
self,
key: Union[bytes, str],
key: Union[bytes, str] = b"",
alg: Union[int, str] = "HMAC 256/256",
kid: Union[bytes, str] = b"",
) -> COSEKey:
Expand Down Expand Up @@ -105,6 +105,8 @@ def from_symmetric_key(
kid = kid.encode("utf-8")
if kid:
cose_key[2] = kid
if alg_id in [1, 2, 3]:
return AESGCMKey(cose_key)
if alg_id in [4, 5, 6, 7]:
return HMACKey(cose_key)
if alg_id in [10, 11, 12, 13, 30, 31, 32, 33]:
Expand Down
77 changes: 71 additions & 6 deletions cwt/key_types/symmetric.py
@@ -1,12 +1,15 @@
import hashlib
import hmac
from secrets import token_bytes
from typing import Any, Dict, Optional

from cryptography.hazmat.primitives.ciphers.aead import AESCCM
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESGCM

from ..cose_key import COSEKey
from ..exceptions import DecodeError, EncodeError, VerifyError

_CWT_DEFAULT_HMAC_KEY_SIZE = 32 # bytes


class SymmetricKey(COSEKey):
""""""
Expand All @@ -22,11 +25,10 @@ def __init__(self, cose_key: Dict[int, Any]):
raise ValueError("kty(1) should be Symmetric(4).")

# Validate k.
if -1 not in cose_key:
raise ValueError("k(-1) not found.")
if -1 in cose_key and not isinstance(cose_key[-1], bytes):
raise ValueError("k(-1) should be bytes(bstr).")
self._key = cose_key[-1]
if -1 in cose_key:
if not isinstance(cose_key[-1], bytes):
raise ValueError("k(-1) should be bytes(bstr).")
self._key = cose_key[-1]

if 3 not in cose_key:
raise ValueError("alg(3) not found.")
Expand All @@ -42,6 +44,8 @@ def __init__(self, cose_key: Dict[int, Any]):

self._hash_alg = None
self._trunc = 0
if not self._key:
self._key = token_bytes(_CWT_DEFAULT_HMAC_KEY_SIZE)

# Validate alg.
if self._alg == 4: # HMAC 256/64
Expand Down Expand Up @@ -85,55 +89,71 @@ def __init__(self, cose_key: Dict[int, Any]):

# Validate alg.
if self._alg == 10: # AES-CCM-16-64-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-16-64-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 13
elif self._alg == 11: # AES-CCM-16-64-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-16-64-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 13
elif self._alg == 12: # AES-CCM-64-64-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-64-64-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 7
elif self._alg == 13: # AES-CCM-64-64-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-64-64-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 7
elif self._alg == 30: # AES-CCM-16-128-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-16-128-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 13
elif self._alg == 31: # AES-CCM-16-128-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-16-128-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 13
elif self._alg == 32: # AES-CCM-64-128-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-64-128-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 7
elif self._alg == 33: # AES-CCM-64-128-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-64-128-256 key should be 32 bytes."
Expand Down Expand Up @@ -164,3 +184,48 @@ def decrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> byte
return self._cipher.decrypt(nonce, msg, aad)
except Exception as err:
raise DecodeError("Failed to decrypt.") from err


class AESGCMKey(SymmetricKey):
""""""

def __init__(self, cose_key: Dict[int, Any]):
""""""
super().__init__(cose_key)

self._cipher: AESGCM

# Validate alg.
if self._alg == 1: # A128GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError("The length of A128GCM key should be 16 bytes.")
elif self._alg == 2: # A192GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=192)
if len(self._key) != 24:
raise ValueError("The length of A192GCM key should be 24 bytes.")
elif self._alg == 3: # A256GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError("The length of A256GCM key should be 32 bytes.")
else:
raise ValueError(f"Unsupported or unknown alg(3) for AES GCM: {self._alg}.")
self._cipher = AESGCM(self._key)
return

def encrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes:
""""""
try:
return self._cipher.encrypt(nonce, msg, aad)
except Exception as err:
raise EncodeError("Failed to encrypt.") from err

def decrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes:
""""""
try:
return self._cipher.decrypt(nonce, msg, aad)
except Exception as err:
raise DecodeError("Failed to decrypt.") from err
6 changes: 3 additions & 3 deletions docs/algorithms.rst
Expand Up @@ -79,11 +79,11 @@ COSE Algorithms
+------------------------+--------+-------+-----------------------------------------------------+
| ... |
+------------------------+--------+-------+-----------------------------------------------------+
| A128GCM | | 1 | AES-GCM mode w/ 128-bit key, 128-bit tag |
| A128GCM | | 1 | AES-GCM mode w/ 128-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| A192GCM | | 2 | AES-GCM mode w/ 192-bit key, 128-bit tag |
| A192GCM | | 2 | AES-GCM mode w/ 192-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| A256GCM | | 3 | AES-GCM mode w/ 256-bit key, 128-bit tag |
| A256GCM | | 3 | AES-GCM mode w/ 256-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| HMAC 256/64 | ✅ | 4 | HMAC w/ SHA-256 truncated to 64 bits |
+------------------------+--------+-------+-----------------------------------------------------+
Expand Down
21 changes: 21 additions & 0 deletions tests/test_cwt.py
Expand Up @@ -247,6 +247,27 @@ def test_cwt_encode_and_encrypt_with_valid_alg_aes_ccm(self, ctx, alg, nonce, ke
assert 2 in decoded and decoded[2] == "someone"
assert 7 in decoded and decoded[7] == b"123"

@pytest.mark.parametrize(
"alg, key",
[
("A128GCM", token_bytes(16)),
("A192GCM", token_bytes(24)),
("A256GCM", token_bytes(32)),
],
)
def test_cwt_encode_and_encrypt_with_valid_alg_aes_gcm(self, ctx, alg, key):
""""""
enc_key = cose_key.from_symmetric_key(key, alg=alg)
token = ctx.encode_and_encrypt(
{1: "https://as.example", 2: "someone", 7: b"123"},
enc_key,
nonce=token_bytes(12),
)
decoded = ctx.decode(token, enc_key)
assert 1 in decoded and decoded[1] == "https://as.example"
assert 2 in decoded and decoded[2] == "someone"
assert 7 in decoded and decoded[7] == b"123"

def test_cwt_encode_and_encrypt_with_tagged(self, ctx):
""""""
key = token_bytes(16)
Expand Down
33 changes: 30 additions & 3 deletions tests/test_key_builder.py
Expand Up @@ -45,12 +45,39 @@ def test_key_builder_from_symmetric_key_hmac(self, ctx, alg):

@pytest.mark.parametrize(
"alg",
["xxx", 3, 8, 9, 34],
[
"HMAC 256/64",
"HMAC 256/256",
"HMAC 384/384",
"HMAC 512/512",
"A128GCM",
"A192GCM",
"A256GCM",
"AES-CCM-16-64-128",
"AES-CCM-16-64-256",
"AES-CCM-64-64-128",
"AES-CCM-64-64-256",
"AES-CCM-16-128-128",
"AES-CCM-16-128-256",
"AES-CCM-64-128-128",
"AES-CCM-64-128-256",
],
)
def test_key_builder_from_symmetric_key_without_key(self, ctx, alg):
try:
k = ctx.from_symmetric_key(alg=alg)
assert k.kty == 4
except Exception:
pytest.fail("from_symmetric_key should not fail.")

@pytest.mark.parametrize(
"alg",
["xxx", 0, 8, 9, 34],
)
def test_key_builder_from_symmetric_key_with_invalid_alg(self, ctx, alg):
with pytest.raises(ValueError) as err:
res = ctx.from_symmetric_key("mysecretpassword", alg=alg)
pytest.fail("from_symmetric_key should fail: res=%s" % vars(res))
ctx.from_symmetric_key("mysecretpassword", alg=alg)
pytest.fail("from_symmetric_key should fail.")
assert f"Unsupported or unknown alg({alg})." in str(err.value)

@pytest.mark.parametrize(
Expand Down