Skip to content

Commit

Permalink
Merge pull request #41 from dajiaji/add-support-for-aes-gcm
Browse files Browse the repository at this point in the history
Add support for AES GCM.
  • Loading branch information
dajiaji committed Apr 29, 2021
2 parents 61c12d4 + 7fe2eb9 commit 1890472
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Expand Up @@ -4,6 +4,9 @@ Changes
Unreleased
----------

- Add support for AES-GCM (A128GCM, A192GCM and A256GCM). `#41 <https://github.com/dajiaji/python-cwt/pull/41>`__
- Make key optional for KeyBuilder.from_symmetric_key. `#41 <https://github.com/dajiaji/python-cwt/pull/41>`__

Version 0.3.0
-------------

Expand Down
6 changes: 4 additions & 2 deletions cwt/key_builder.py
Expand Up @@ -32,7 +32,7 @@
from .cose_key import COSEKey
from .key_types.ec2 import EC2Key
from .key_types.okp import OKPKey
from .key_types.symmetric import AESCCMKey, HMACKey
from .key_types.symmetric import AESCCMKey, AESGCMKey, HMACKey


class KeyBuilder:
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(self, options: Optional[Dict[str, Any]] = None):

def from_symmetric_key(
self,
key: Union[bytes, str],
key: Union[bytes, str] = b"",
alg: Union[int, str] = "HMAC 256/256",
kid: Union[bytes, str] = b"",
) -> COSEKey:
Expand Down Expand Up @@ -105,6 +105,8 @@ def from_symmetric_key(
kid = kid.encode("utf-8")
if kid:
cose_key[2] = kid
if alg_id in [1, 2, 3]:
return AESGCMKey(cose_key)
if alg_id in [4, 5, 6, 7]:
return HMACKey(cose_key)
if alg_id in [10, 11, 12, 13, 30, 31, 32, 33]:
Expand Down
77 changes: 71 additions & 6 deletions cwt/key_types/symmetric.py
@@ -1,12 +1,15 @@
import hashlib
import hmac
from secrets import token_bytes
from typing import Any, Dict, Optional

from cryptography.hazmat.primitives.ciphers.aead import AESCCM
from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESGCM

from ..cose_key import COSEKey
from ..exceptions import DecodeError, EncodeError, VerifyError

_CWT_DEFAULT_HMAC_KEY_SIZE = 32 # bytes


class SymmetricKey(COSEKey):
""""""
Expand All @@ -22,11 +25,10 @@ def __init__(self, cose_key: Dict[int, Any]):
raise ValueError("kty(1) should be Symmetric(4).")

# Validate k.
if -1 not in cose_key:
raise ValueError("k(-1) not found.")
if -1 in cose_key and not isinstance(cose_key[-1], bytes):
raise ValueError("k(-1) should be bytes(bstr).")
self._key = cose_key[-1]
if -1 in cose_key:
if not isinstance(cose_key[-1], bytes):
raise ValueError("k(-1) should be bytes(bstr).")
self._key = cose_key[-1]

if 3 not in cose_key:
raise ValueError("alg(3) not found.")
Expand All @@ -42,6 +44,8 @@ def __init__(self, cose_key: Dict[int, Any]):

self._hash_alg = None
self._trunc = 0
if not self._key:
self._key = token_bytes(_CWT_DEFAULT_HMAC_KEY_SIZE)

# Validate alg.
if self._alg == 4: # HMAC 256/64
Expand Down Expand Up @@ -85,55 +89,71 @@ def __init__(self, cose_key: Dict[int, Any]):

# Validate alg.
if self._alg == 10: # AES-CCM-16-64-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-16-64-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 13
elif self._alg == 11: # AES-CCM-16-64-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-16-64-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 13
elif self._alg == 12: # AES-CCM-64-64-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-64-64-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 7
elif self._alg == 13: # AES-CCM-64-64-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-64-64-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key, tag_length=8)
self._nonce_len = 7
elif self._alg == 30: # AES-CCM-16-128-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-16-128-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 13
elif self._alg == 31: # AES-CCM-16-128-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-16-128-256 key should be 32 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 13
elif self._alg == 32: # AES-CCM-64-128-128
if not self._key:
self._key = AESCCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError(
"The length of AES-CCM-64-128-128 key should be 16 bytes."
)
self._cipher = AESCCM(self._key)
self._nonce_len = 7
elif self._alg == 33: # AES-CCM-64-128-256
if not self._key:
self._key = AESCCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError(
"The length of AES-CCM-64-128-256 key should be 32 bytes."
Expand Down Expand Up @@ -164,3 +184,48 @@ def decrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> byte
return self._cipher.decrypt(nonce, msg, aad)
except Exception as err:
raise DecodeError("Failed to decrypt.") from err


class AESGCMKey(SymmetricKey):
""""""

def __init__(self, cose_key: Dict[int, Any]):
""""""
super().__init__(cose_key)

self._cipher: AESGCM

# Validate alg.
if self._alg == 1: # A128GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=128)
if len(self._key) != 16:
raise ValueError("The length of A128GCM key should be 16 bytes.")
elif self._alg == 2: # A192GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=192)
if len(self._key) != 24:
raise ValueError("The length of A192GCM key should be 24 bytes.")
elif self._alg == 3: # A256GCM
if not self._key:
self._key = AESGCM.generate_key(bit_length=256)
if len(self._key) != 32:
raise ValueError("The length of A256GCM key should be 32 bytes.")
else:
raise ValueError(f"Unsupported or unknown alg(3) for AES GCM: {self._alg}.")
self._cipher = AESGCM(self._key)
return

def encrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes:
""""""
try:
return self._cipher.encrypt(nonce, msg, aad)
except Exception as err:
raise EncodeError("Failed to encrypt.") from err

def decrypt(self, msg: bytes, nonce: bytes, aad: Optional[bytes] = None) -> bytes:
""""""
try:
return self._cipher.decrypt(nonce, msg, aad)
except Exception as err:
raise DecodeError("Failed to decrypt.") from err
6 changes: 3 additions & 3 deletions docs/algorithms.rst
Expand Up @@ -79,11 +79,11 @@ COSE Algorithms
+------------------------+--------+-------+-----------------------------------------------------+
| ... |
+------------------------+--------+-------+-----------------------------------------------------+
| A128GCM | | 1 | AES-GCM mode w/ 128-bit key, 128-bit tag |
| A128GCM | | 1 | AES-GCM mode w/ 128-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| A192GCM | | 2 | AES-GCM mode w/ 192-bit key, 128-bit tag |
| A192GCM | | 2 | AES-GCM mode w/ 192-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| A256GCM | | 3 | AES-GCM mode w/ 256-bit key, 128-bit tag |
| A256GCM | | 3 | AES-GCM mode w/ 256-bit key, 128-bit tag |
+------------------------+--------+-------+-----------------------------------------------------+
| HMAC 256/64 || 4 | HMAC w/ SHA-256 truncated to 64 bits |
+------------------------+--------+-------+-----------------------------------------------------+
Expand Down
21 changes: 21 additions & 0 deletions tests/test_cwt.py
Expand Up @@ -247,6 +247,27 @@ def test_cwt_encode_and_encrypt_with_valid_alg_aes_ccm(self, ctx, alg, nonce, ke
assert 2 in decoded and decoded[2] == "someone"
assert 7 in decoded and decoded[7] == b"123"

@pytest.mark.parametrize(
"alg, key",
[
("A128GCM", token_bytes(16)),
("A192GCM", token_bytes(24)),
("A256GCM", token_bytes(32)),
],
)
def test_cwt_encode_and_encrypt_with_valid_alg_aes_gcm(self, ctx, alg, key):
""""""
enc_key = cose_key.from_symmetric_key(key, alg=alg)
token = ctx.encode_and_encrypt(
{1: "https://as.example", 2: "someone", 7: b"123"},
enc_key,
nonce=token_bytes(12),
)
decoded = ctx.decode(token, enc_key)
assert 1 in decoded and decoded[1] == "https://as.example"
assert 2 in decoded and decoded[2] == "someone"
assert 7 in decoded and decoded[7] == b"123"

def test_cwt_encode_and_encrypt_with_tagged(self, ctx):
""""""
key = token_bytes(16)
Expand Down
33 changes: 30 additions & 3 deletions tests/test_key_builder.py
Expand Up @@ -45,12 +45,39 @@ def test_key_builder_from_symmetric_key_hmac(self, ctx, alg):

@pytest.mark.parametrize(
"alg",
["xxx", 3, 8, 9, 34],
[
"HMAC 256/64",
"HMAC 256/256",
"HMAC 384/384",
"HMAC 512/512",
"A128GCM",
"A192GCM",
"A256GCM",
"AES-CCM-16-64-128",
"AES-CCM-16-64-256",
"AES-CCM-64-64-128",
"AES-CCM-64-64-256",
"AES-CCM-16-128-128",
"AES-CCM-16-128-256",
"AES-CCM-64-128-128",
"AES-CCM-64-128-256",
],
)
def test_key_builder_from_symmetric_key_without_key(self, ctx, alg):
try:
k = ctx.from_symmetric_key(alg=alg)
assert k.kty == 4
except Exception:
pytest.fail("from_symmetric_key should not fail.")

@pytest.mark.parametrize(
"alg",
["xxx", 0, 8, 9, 34],
)
def test_key_builder_from_symmetric_key_with_invalid_alg(self, ctx, alg):
with pytest.raises(ValueError) as err:
res = ctx.from_symmetric_key("mysecretpassword", alg=alg)
pytest.fail("from_symmetric_key should fail: res=%s" % vars(res))
ctx.from_symmetric_key("mysecretpassword", alg=alg)
pytest.fail("from_symmetric_key should fail.")
assert f"Unsupported or unknown alg({alg})." in str(err.value)

@pytest.mark.parametrize(
Expand Down

0 comments on commit 1890472

Please sign in to comment.