Skip to content

Commit

Permalink
Merge pull request #73 from dajiaji/refine-cose
Browse files Browse the repository at this point in the history
Refine COSE API.
  • Loading branch information
dajiaji committed May 9, 2021
2 parents 46afe88 + 2fbe5ff commit 80f062d
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 94 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
Expand Up @@ -4,6 +4,11 @@ Changes
Unreleased
----------

- Add support for bytes-formatted protected header. `#73 <https://github.com/dajiaji/python-cwt/pull/73>`__
- Derive alg from kty and crv on from_jwk. `#73 <https://github.com/dajiaji/python-cwt/pull/73>`__
- Add alg_auto_inclusion. `#73 <https://github.com/dajiaji/python-cwt/pull/73>`__
- Move nonce generation from CWT to COSE. `#73 <https://github.com/dajiaji/python-cwt/pull/73>`__
- Re-order arguments of COSE API. `#73 <https://github.com/dajiaji/python-cwt/pull/73>`__
- Add support for COSE algorithm names for KeyBuilder.from_jwk. `#72 <https://github.com/dajiaji/python-cwt/pull/72>`__
- Add tests based on COSE WG examples. `#72 <https://github.com/dajiaji/python-cwt/pull/72>`__
- Move parameter auto-gen function from CWT to COSE. `#72 <https://github.com/dajiaji/python-cwt/pull/72>`__
Expand Down
103 changes: 70 additions & 33 deletions cwt/cose.py
Expand Up @@ -21,38 +21,42 @@ def __init__(self, options: Optional[Dict[str, Any]] = None):
Args:
options (Optional[Dict[str, Any]]): Options for the initial configuration
of COSE. At this time, ``kid_auto_inclusion`` (default value: ``True`` )
is only supported. If it is ``True`` and a ``kid`` can be identified,
``kid`` parameter is automatically set to the unprotected header.
of COSE. At this time, ``kid_auto_inclusion`` (default value: ``True``)
and ``alg_auto_inclusion`` are supported.
"""
self._recipients_builder = RecipientsBuilder()
self._kid_auto_inclusion = True
self._alg_auto_inclusion = True
if not options:
return
if "kid_auto_inclusion" in options:
if not isinstance(options["kid_auto_inclusion"], bool):
raise ValueError("kid_auto_inclusion should be bool.")
self._kid_auto_inclusion = options["kid_auto_inclusion"]
if "alg_auto_inclusion" in options:
if not isinstance(options["alg_auto_inclusion"], bool):
raise ValueError("alg_auto_inclusion should be bool.")
self._alg_auto_inclusion = options["alg_auto_inclusion"]

def encode_and_mac(
self,
protected: Dict[int, Any],
unprotected: Dict[int, Any],
payload: bytes,
key: COSEKey,
protected: Union[Dict[int, Any], bytes] = {},
unprotected: Dict[int, Any] = {},
recipients: Optional[List[Recipient]] = None,
out: str = "",
) -> Union[bytes, CBORTag]:
"""
Encodes data with MAC.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
payload (bytes): A content to be MACed.
key (COSEKey): A COSE key as a MAC Authentication key.
protected (Union[Dict[int, Any], bytes]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (bytes): A content to be MACed.
key (COSEKey): A COSE key as a MAC Authentication key.
recipients (Optional[List[Recipient]]): A list of recipient information structures.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
Expand All @@ -66,13 +70,18 @@ def encode_and_mac(
"""

ctx = "MAC0" if not recipients else "MAC"
b_protected = b""

# MAC0
if not recipients:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
b_protected = self._dumps(protected)
if isinstance(protected, bytes):
b_protected = protected
else:
if self._alg_auto_inclusion:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
b_protected = self._dumps(protected)
mac_structure = [ctx, b_protected, b"", payload]
tag = key.sign(self._dumps(mac_structure))
res = CBORTag(17, [b_protected, unprotected, payload, tag])
Expand All @@ -83,14 +92,20 @@ def encode_and_mac(
for rec in recipients:
recs.append(rec.to_list())
if recipients[0].alg == -6:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
if not isinstance(protected, bytes):
if self._alg_auto_inclusion:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
else:
raise NotImplementedError(
"Algorithms other than direct are not supported for recipients."
)
b_protected = self._dumps(protected) if protected else b""

if isinstance(protected, bytes):
b_protected = protected
else:
b_protected = self._dumps(protected) if protected else b""
mac_structure = [ctx, b_protected, b"", payload]
tag = key.sign(self._dumps(mac_structure))
cose_mac: List[Any] = [b_protected, unprotected, payload, tag]
Expand All @@ -100,22 +115,22 @@ def encode_and_mac(

def encode_and_sign(
self,
protected: Dict[int, Any],
unprotected: Dict[int, Any],
payload: bytes,
key: Union[COSEKey, List[COSEKey]],
protected: Union[Dict[int, Any], bytes] = {},
unprotected: Dict[int, Any] = {},
out: str = "",
) -> Union[bytes, CBORTag]:
"""
Encodes data with signing.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
payload (bytes): A content to be signed.
key (Union[COSEKey, List[COSEKey]]): One or more COSE keys as signing keys.
protected (Union[Dict[int, Any], bytes]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (bytes): A content to be signed.
key (Union[COSEKey, List[COSEKey]]): One or more COSE keys as signing keys.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
`cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s ``CBORTag`` object.
Expand All @@ -128,12 +143,17 @@ def encode_and_sign(
"""

ctx = "Signature" if not isinstance(key, COSEKey) else "Signature1"
if isinstance(key, COSEKey):
protected[1] = key.alg
if isinstance(key, COSEKey) and isinstance(protected, dict):
if self._alg_auto_inclusion:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid

b_protected = self._dumps(protected) if protected else b""
b_protected = b""
if isinstance(protected, bytes):
b_protected = protected
else:
b_protected = self._dumps(protected) if protected else b""

# Signature1
if isinstance(key, COSEKey):
Expand All @@ -145,6 +165,7 @@ def encode_and_sign(
# Signature
sigs = []
for k in key:
print(f"alg={k.alg}")
p_header = self._dumps({1: k.alg})
u_header = {4: k.kid} if k.kid else {}
sig_structure = [ctx, b_protected, p_header, b"", payload]
Expand All @@ -155,10 +176,10 @@ def encode_and_sign(

def encode_and_encrypt(
self,
protected: Dict[int, Any],
unprotected: Dict[int, Any],
payload: bytes,
key: COSEKey,
protected: Union[Dict[int, Any], bytes] = {},
unprotected: Dict[int, Any] = {},
nonce: bytes = b"",
recipients: Optional[List[Recipient]] = None,
out: str = "",
Expand All @@ -167,12 +188,12 @@ def encode_and_encrypt(
Encodes data with encryption.
Args:
protected (Dict[int, Any]): Parameters that are to be cryptographically
payload (bytes): A content to be encrypted.
key (COSEKey): A COSE key as an encryption key.
protected (Union[Dict[int, Any], bytes]): Parameters that are to be cryptographically
protected.
unprotected (Dict[int, Any]): Parameters that are not cryptographically
protected.
payload (bytes): A content to be encrypted.
key (COSEKey): A COSE key as an encryption key.
nonce (bytes): A nonce for encryption.
recipients (Optional[List[Recipient]]): A list of recipient information structures.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
Expand All @@ -188,13 +209,25 @@ def encode_and_encrypt(

ctx = "Encrypt0" if not recipients else "Encrypt"

if not nonce:
try:
nonce = key.generate_nonce()
except NotImplementedError:
raise ValueError(
"Nonce generation is not supported for the key. Set a nonce explicitly."
)

# Encrypt0
if not recipients:
protected[1] = key.alg
if isinstance(protected, bytes):
b_protected = protected
else:
if self._alg_auto_inclusion:
protected[1] = key.alg
b_protected = self._dumps(protected) if protected else b""
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
unprotected[5] = nonce
b_protected = self._dumps(protected) if protected else b""
enc_structure = [ctx, b_protected, b""]
aad = self._dumps(enc_structure)
ciphertext = key.encrypt(payload, nonce, aad)
Expand All @@ -206,15 +239,19 @@ def encode_and_encrypt(
for rec in recipients:
recs.append(rec.to_list())
if recipients[0].alg == -6:
protected[1] = key.alg
if not isinstance(protected, bytes) and self._alg_auto_inclusion:
protected[1] = key.alg
if self._kid_auto_inclusion and key.kid:
unprotected[4] = key.kid
unprotected[5] = nonce
else:
raise NotImplementedError(
"Algorithms other than direct are not supported for recipients."
)
b_protected = self._dumps(protected) if protected else b""
if isinstance(protected, bytes):
b_protected = protected
else:
b_protected = self._dumps(protected) if protected else b""
enc_structure = [ctx, b_protected, b""]
aad = self._dumps(enc_structure)
ciphertext = key.encrypt(payload, nonce, aad)
Expand Down
26 changes: 5 additions & 21 deletions cwt/cwt.py
Expand Up @@ -168,11 +168,9 @@ def encode_and_mac(
else:
claims = claims.to_dict()
self._set_default_value(claims)
protected: Dict[int, Any] = {}
unprotected: Dict[int, Any] = {}
b_claims = self._dumps(claims)
res = self._cose.encode_and_mac(
protected, unprotected, b_claims, key, recipients, out="cbor2/CBORTag"
b_claims, key, {}, {}, recipients, out="cbor2/CBORTag"
)
if tagged:
return self._dumps(CBORTag(CWT.CBOR_TAG, res))
Expand Down Expand Up @@ -205,12 +203,8 @@ def encode_and_sign(
else:
claims = claims.to_dict()
self._set_default_value(claims)
protected: Dict[int, Any] = {}
unprotected: Dict[int, Any] = {}
b_claims = self._dumps(claims)
res = self._cose.encode_and_sign(
protected, unprotected, b_claims, key, out="cbor2/CBORTag"
)
res = self._cose.encode_and_sign(b_claims, key, {}, {}, out="cbor2/CBORTag")
if tagged:
return self._dumps(CBORTag(CWT.CBOR_TAG, res))
return self._dumps(res)
Expand Down Expand Up @@ -245,26 +239,16 @@ def encode_and_encrypt(
else:
claims = claims.to_dict()
self._set_default_value(claims)
protected: Dict[int, Any] = {}
unprotected: Dict[int, Any] = {}
if not nonce:
try:
nonce = key.generate_nonce()
except NotImplementedError:
raise ValueError(
"Nonce generation is not supported for the key. Set a nonce explicitly."
)

b_claims: bytes = b""
b_claims = b""
if isinstance(claims, dict):
b_claims = self._dumps(claims)
else:
b_claims = claims
res = self._cose.encode_and_encrypt(
protected,
unprotected,
b_claims,
key,
{},
{},
nonce,
recipients,
out="cbor2/CBORTag",
Expand Down
18 changes: 15 additions & 3 deletions cwt/key_builder.py
Expand Up @@ -248,12 +248,24 @@ def from_jwk(self, data: Union[str, bytes, Dict[str, Any]]) -> COSEKey:
cose_key[-1] = JWK_ELLIPTIC_CURVES[jwk["crv"]]

if cose_key[1] == 1: # OKP
if 3 not in cose_key:
cose_key[3] = -8 # EdDSA
for k, v in jwk.items():
if k not in JWK_PARAMS_OKP:
continue
cose_key[JWK_PARAMS_OKP[k]] = base64url_decode(v)

else: # EC2
if 3 not in cose_key:
if cose_key[-1] == 1: # P-256
cose_key[3] = -7
elif cose_key[-1] == 2: # P-384
cose_key[3] = -35
elif cose_key[-1] == 3: # P-521
cose_key[3] = -36
else:
# cose_key[-1] == 8 # secp256k1
cose_key[3] = -47
for k, v in jwk.items():
if k not in JWK_PARAMS_EC:
continue
Expand Down Expand Up @@ -510,11 +522,11 @@ def to_encrypted_cose_key(
unprotected[5] = nonce
b_payload = self._dumps(key.to_dict())
res: CBORTag = self._cose.encode_and_encrypt(
protected,
unprotected,
b_payload,
encryption_key,
nonce,
protected,
unprotected,
nonce=nonce,
out="cbor2/CBORTag",
)
return res.value
Expand Down
3 changes: 1 addition & 2 deletions tests/keys/private_key_ed25519.json
Expand Up @@ -4,6 +4,5 @@
"use": "sig",
"crv": "Ed25519",
"kid": "Ed25519-01",
"x": "2E6dX83gqD_D0eAmqnaHe1TC1xuld6iAKXfw2OVATr0",
"alg": "EdDSA"
"x": "2E6dX83gqD_D0eAmqnaHe1TC1xuld6iAKXfw2OVATr0"
}
3 changes: 1 addition & 2 deletions tests/keys/private_key_ed448.json
Expand Up @@ -4,6 +4,5 @@
"use": "sig",
"crv": "Ed448",
"kid": "Ed448-01",
"x": "25isUWIosUkM2ynOPFP5t7BbwM1_iFQmKBpHvA0hgXpRX6yyu-nq6BBmpS3J0DYTlZIoA4qwgSqA",
"alg": "EdDSA"
"x": "25isUWIosUkM2ynOPFP5t7BbwM1_iFQmKBpHvA0hgXpRX6yyu-nq6BBmpS3J0DYTlZIoA4qwgSqA"
}
3 changes: 1 addition & 2 deletions tests/keys/private_key_es256.json
Expand Up @@ -5,6 +5,5 @@
"crv": "P-256",
"kid": "P-256-01",
"x": "-eZXC6nV-xgthy8zZMCN8pcYSeE2XfWWqckA2fsxHPc",
"y": "BGU5soLgsu_y7GN2I3EPUXS9EZ7Sw0qif-V70JtInFI",
"alg": "ES256"
"y": "BGU5soLgsu_y7GN2I3EPUXS9EZ7Sw0qif-V70JtInFI"
}
3 changes: 1 addition & 2 deletions tests/keys/private_key_es256k.json
Expand Up @@ -5,6 +5,5 @@
"crv": "secp256k1",
"kid": "secp256k1-01",
"x": "6_W-2tVwr9SU0Ts9sFmZgt8JrlxDi8d6G5ltwL2futU",
"y": "mEOme4oiQsXnNwtlfyH5hEkvLDxrtqDUY0UXBIMRuL0",
"alg": "ES256K"
"y": "mEOme4oiQsXnNwtlfyH5hEkvLDxrtqDUY0UXBIMRuL0"
}
3 changes: 1 addition & 2 deletions tests/keys/private_key_es384.json
Expand Up @@ -5,6 +5,5 @@
"crv": "P-384",
"kid": "P-384-01",
"x": "_XyN9woHaS0mPimSW-etwJMEDSzxIMjp4PjezavU8SHJoClz1bQrcmPb1ZJxHxhI",
"y": "GCNfc32p9sRotx7u2oDGJ3Eqz6q5zPHLdizNn83oRsUTN31eCWfGLHWRury3xF50",
"alg": "ES384"
"y": "GCNfc32p9sRotx7u2oDGJ3Eqz6q5zPHLdizNn83oRsUTN31eCWfGLHWRury3xF50"
}
3 changes: 1 addition & 2 deletions tests/keys/private_key_es512.json
Expand Up @@ -5,6 +5,5 @@
"crv": "P-521",
"kid": "P-521-01",
"x": "APkZitSJMJUMB-iPCt47sWu_CrnUHg6IAR4qjmHON-2u41Rjg6DNOS0LZYJJt-AVH5NgGVi8ElIfjo71b9HXCTOc",
"y": "ASx-Cb--149HJ-e1KlSaY-1BOhwOdcTkxSt8BGbW7_hnGfzHsoXM3ywwNcp1Yad-FHUKwmCyMelMQEn2Rh4V2l3I",
"alg": "ES512"
"y": "ASx-Cb--149HJ-e1KlSaY-1BOhwOdcTkxSt8BGbW7_hnGfzHsoXM3ywwNcp1Yad-FHUKwmCyMelMQEn2Rh4V2l3I"
}

0 comments on commit 80f062d

Please sign in to comment.