Skip to content

Commit

Permalink
Add Signer for COSE.encode_and_sign.
Browse files Browse the repository at this point in the history
  • Loading branch information
dajiaji committed Jun 4, 2021
1 parent 7fdb889 commit 7687f2a
Show file tree
Hide file tree
Showing 11 changed files with 654 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Expand Up @@ -4,6 +4,7 @@ Changes
Unreleased
----------

- Add Signer for encode_and_sign function. `#114 <https://github.com/dajiaji/python-cwt/pull/114>`__
- Divide CWT options into independent parameters. `#113 <https://github.com/dajiaji/python-cwt/pull/113>`__

Version 0.8.1
Expand Down
2 changes: 2 additions & 0 deletions cwt/__init__.py
Expand Up @@ -13,6 +13,7 @@
from .encrypted_cose_key import EncryptedCOSEKey
from .exceptions import CWTError, DecodeError, EncodeError, VerifyError
from .recipient import Recipient
from .signer import Signer

__version__ = "0.8.1"
__title__ = "cwt"
Expand All @@ -37,6 +38,7 @@
"EncryptedCOSEKey",
"Claims",
"Recipient",
"Signer",
"CWTError",
"EncodeError",
"DecodeError",
Expand Down
98 changes: 59 additions & 39 deletions cwt/const.py
Expand Up @@ -69,40 +69,6 @@
"WalnutDSA": 6, # WalnutDSA public key
}

# JWK Parameters
JWK_PARAMS_COMMON = {
"kty": 1,
"kid": 2,
"alg": 3,
# "use": *,
"key_ops": 4,
}

JWK_PARAMS_OKP = {
# "crv": -1,
"x": -2,
"d": -4,
}

JWK_PARAMS_EC = {
# "crv": -1,
"x": -2,
"y": -3,
"d": -4,
}

JWK_PARAMS_RSA = {
"n": -1,
"e": -2,
"d": -3,
"p": -4,
"q": -5,
"dp": -6,
"dq": -7,
"qi": -8,
"oth": -9,
}

COSE_KEY_PARAMS_SYMMETRIC = {
"k": -1,
}
Expand Down Expand Up @@ -192,6 +158,19 @@
# etc.
}

# COSE Algorithms for Signature with OKP.
COSE_ALGORITHMS_SIG_OKP = {
"EdDSA": -8,
}

# COSE Algorithms for Signature with EC2.
COSE_ALGORITHMS_SIG_EC2 = {
"ES256K": -47,
"ES512": -36,
"ES384": -35,
"ES256": -7,
}

# COSE Algorithms for Signature with RSA.
COSE_ALGORITHMS_SIG_RSA = {
"R1": -65535, # RSASSA-PKCS1-v1_5 using SHA-1 (No plan to support)
Expand Down Expand Up @@ -226,6 +205,40 @@
"HS512": 7,
}

# JWK Parameters
JWK_PARAMS_COMMON = {
"kty": 1,
"kid": 2,
"alg": 3,
# "use": *,
"key_ops": 4,
}

JWK_PARAMS_OKP = {
# "crv": -1,
"x": -2,
"d": -4,
}

JWK_PARAMS_EC = {
# "crv": -1,
"x": -2,
"y": -3,
"d": -4,
}

JWK_PARAMS_RSA = {
"n": -1,
"e": -2,
"d": -3,
"p": -4,
"q": -5,
"dp": -6,
"dq": -7,
"qi": -8,
"oth": -9,
}

JWK_TYPES = {
"OKP": 1,
"EC": 2, # EC2
Expand Down Expand Up @@ -256,16 +269,23 @@
}

# COSE Algorithms for recipients.
COSE_ALGORITHMS_RECIPIENT = dict(COSE_ALGORITHMS_CKDM, **COSE_ALGORITHMS_KEY_WRAP)
COSE_ALGORITHMS_RECIPIENT = {**COSE_ALGORITHMS_CKDM, **COSE_ALGORITHMS_KEY_WRAP}

# COSE Algorithms for Symmetric Keys.
COSE_ALGORITHMS_SYMMETRIC = dict(COSE_ALGORITHMS_MAC, **COSE_ALGORITHMS_CEK)
COSE_ALGORITHMS_SYMMETRIC = {**COSE_ALGORITHMS_MAC, **COSE_ALGORITHMS_CEK}

# COSE Algorithms for RSA Keys.
COSE_ALGORITHMS_RSA = dict(COSE_ALGORITHMS_SIG_RSA)
COSE_ALGORITHMS_RSA = {**COSE_ALGORITHMS_SIG_RSA}

# COSE Algorithms for RSA Keys.
COSE_ALGORITHMS_SIGNATURE = {
**COSE_ALGORITHMS_SIG_OKP,
**COSE_ALGORITHMS_SIG_EC2,
**COSE_ALGORITHMS_SIG_RSA,
}

# All of Supported COSE Algorithms.
COSE_ALGORITHMS = dict(COSE_ALGORITHMS_RSA, **COSE_ALGORITHMS_SYMMETRIC)
COSE_ALGORITHMS = {**COSE_ALGORITHMS_SIGNATURE, **COSE_ALGORITHMS_SYMMETRIC}

# COSE Named Algorithms for converting from JWK-like key.
COSE_NAMED_ALGORITHMS_SUPPORTED = dict(JOSE_ALGORITHMS_SUPPORTED, **COSE_ALGORITHMS)
COSE_NAMED_ALGORITHMS_SUPPORTED = {**JOSE_ALGORITHMS_SUPPORTED, **COSE_ALGORITHMS}
27 changes: 15 additions & 12 deletions cwt/cose.py
Expand Up @@ -7,6 +7,7 @@
from .cose_key_interface import COSEKeyInterface
from .recipient_interface import RecipientInterface
from .recipients import Recipients
from .signer import Signer
from .utils import to_cose_header


Expand Down Expand Up @@ -119,9 +120,10 @@ def encode_and_mac(
def encode_and_sign(
self,
payload: bytes,
key: Union[COSEKeyInterface, List[COSEKeyInterface]],
key: Optional[COSEKeyInterface] = None,
protected: Optional[Union[dict, bytes]] = None,
unprotected: Optional[dict] = None,
signers: List[Signer] = [],
external_aad: bytes = b"",
out: str = "",
) -> Union[bytes, CBORTag]:
Expand All @@ -130,10 +132,13 @@ def encode_and_sign(
Args:
payload (bytes): A content to be signed.
key (Union[COSEKeyInterface, List[COSEKeyInterface]]): One or more COSE keys as signing keys.
key (COSEKeyInterface): A signing key for single signer cases. When the ``signers``
parameter is set, this ``key`` will be ignored and should not be set.
protected (Optional[Union[dict, bytes]]): Parameters that are to be cryptographically
protected.
unprotected (Optional[dict]): Parameters that are not cryptographically protected.
signers (Optional[List[Signer]]): A list of signer information objects for multiple
signer cases.
external_aad(bytes): External additional authenticated data supplied by application.
out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If ``"cbor2/CBORTag"``
is specified. This function will return encoded data as
Expand All @@ -150,9 +155,9 @@ def encode_and_sign(
)
u = to_cose_header(unprotected)

ctx = "Signature" if not isinstance(key, COSEKeyInterface) else "Signature1"
if isinstance(key, COSEKeyInterface) and isinstance(p, dict):
if self._alg_auto_inclusion:
ctx = "Signature" if signers else "Signature1"
if not signers and key is not None:
if isinstance(p, dict) and self._alg_auto_inclusion:
p[1] = key.alg
if self._kid_auto_inclusion and key.kid:
u[4] = key.kid
Expand All @@ -164,20 +169,18 @@ def encode_and_sign(
b_protected = self._dumps(p) if p else b""

# Signature1
if isinstance(key, COSEKeyInterface):
if not signers and key is not None:
sig_structure = [ctx, b_protected, external_aad, payload]
sig = key.sign(self._dumps(sig_structure))
res = CBORTag(18, [b_protected, u, payload, sig])
return res if out == "cbor2/CBORTag" else self._dumps(res)

# Signature
sigs = []
for k in key:
p_header = self._dumps({1: k.alg})
u_header = {4: k.kid} if k.kid else {}
sig_structure = [ctx, b_protected, p_header, external_aad, payload]
sig = k.sign(self._dumps(sig_structure))
sigs.append([p_header, u_header, sig])
for s in signers:
sig_structure = [ctx, b_protected, s.protected, external_aad, payload]
s.sign(self._dumps(sig_structure))
sigs.append([s.protected, s.unprotected, s.signature])
res = CBORTag(98, [b_protected, u, payload, sigs])
return res if out == "cbor2/CBORTag" else self._dumps(res)

Expand Down
25 changes: 17 additions & 8 deletions cwt/cwt.py
Expand Up @@ -11,6 +11,7 @@
from .cose_key_interface import COSEKeyInterface
from .exceptions import DecodeError, VerifyError
from .recipient_interface import RecipientInterface
from .signer import Signer

CWT_DEFAULT_EXPIRES_IN = 3600 # 1 hour
CWT_DEFAULT_LEEWAY = 60 # 1 min
Expand Down Expand Up @@ -84,6 +85,7 @@ def encode(
key: COSEKeyInterface,
nonce: bytes = b"",
recipients: Optional[List[RecipientInterface]] = None,
signers: List[Signer] = [],
tagged: bool = False,
) -> bytes:
"""
Expand Down Expand Up @@ -112,14 +114,14 @@ def encode(
EncodeError: Failed to encode the claims.
"""
if isinstance(claims, Claims):
return self._encode(claims, key, nonce, recipients, tagged)
return self._encode(claims, key, nonce, recipients, signers, tagged)
if isinstance(claims, str):
claims = claims.encode("utf-8")
if isinstance(claims, bytes):
try:
claims = Claims.from_json(claims, self._claim_names)
except ValueError:
return self._encode(claims, key, nonce, recipients, tagged)
return self._encode(claims, key, nonce, recipients, signers, tagged)
else:
# Following code causes mypy error:
# for k, v in claims.items():
Expand All @@ -133,7 +135,7 @@ def encode(
json_claims[k] = v
if json_claims:
claims = Claims.from_json(json_claims, self._claim_names)
return self._encode(claims, key, nonce, recipients, tagged)
return self._encode(claims, key, nonce, recipients, signers, tagged)

def encode_and_mac(
self,
Expand Down Expand Up @@ -174,7 +176,8 @@ def encode_and_mac(
def encode_and_sign(
self,
claims: Union[Claims, Dict[int, Any], bytes],
key: Union[COSEKeyInterface, List[COSEKeyInterface]],
key: Optional[COSEKeyInterface] = None,
signers: List[Signer] = [],
tagged: bool = False,
) -> bytes:
"""
Expand All @@ -183,8 +186,11 @@ def encode_and_sign(
Args:
claims (Claims, Union[Dict[int, Any], bytes]): A CWT claims object or byte
string.
key (Union[COSEKeyInterface, List[COSEKeyInterface]]): A COSE key or a list of the keys used
to sign claims.
key (Optional[COSEKeyInterface]): A COSE key or a list of the keys used to sign claims.
When the ``signers`` parameter is set, this ``key`` parameter will be ignored and
should not be set.
signers (List[Signer]): A list of signer information objects for multiple
signer cases.
tagged (bool): An indicator whether the response is wrapped by CWT tag(61)
or not.
Returns:
Expand All @@ -199,7 +205,9 @@ def encode_and_sign(
claims = claims.to_dict()
self._set_default_value(claims)
b_claims = self._dumps(claims)
res = self._cose.encode_and_sign(b_claims, key, {}, {}, out="cbor2/CBORTag")
res = self._cose.encode_and_sign(
b_claims, key, {}, {}, signers=signers, out="cbor2/CBORTag"
)
if tagged:
return self._dumps(CBORTag(CWT.CBOR_TAG, res))
return self._dumps(res)
Expand Down Expand Up @@ -310,12 +318,13 @@ def _encode(
key: COSEKeyInterface,
nonce: bytes = b"",
recipients: Optional[List[RecipientInterface]] = None,
signers: List[Signer] = [],
tagged: bool = False,
) -> bytes:
if COSE_KEY_OPERATION_VALUES["sign"] in key.key_ops:
if [ops for ops in key.key_ops if ops in [3, 4, 9, 10]]:
raise ValueError("The key operation could not be specified.")
return self.encode_and_sign(claims, key, tagged)
return self.encode_and_sign(claims, key, signers, tagged)
if COSE_KEY_OPERATION_VALUES["encrypt"] in key.key_ops:
if [ops for ops in key.key_ops if ops in [1, 2, 9, 10]]:
raise ValueError("The key operation could not be specified.")
Expand Down

0 comments on commit 7687f2a

Please sign in to comment.