Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple signer #35

Merged
merged 4 commits into from Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 19 additions & 9 deletions cwt/cose.py
Expand Up @@ -82,11 +82,11 @@ def encode_and_sign(
sigs = []
for k in key:
p_header = self._dumps({1: k.alg})
u_header = self._dumps({4: k.kid} if k.kid else {})
u_header = {4: k.kid} if k.kid else {}
sig_structure = [ctx, b_protected, p_header, b"", b_payload]
sig = k.sign(self._dumps(sig_structure))
sigs.append(self._dumps([p_header, u_header, sig]))
res = CBORTag(18, [b_protected, unprotected, b_payload, sigs])
sigs.append([p_header, u_header, sig])
res = CBORTag(98, [b_protected, unprotected, b_payload, sigs])
return res if out == "cbor2/CBORTag" else self._dumps(res)

def encode_and_encrypt(
Expand Down Expand Up @@ -154,8 +154,10 @@ def decode(self, data: Union[bytes, CBORTag], key: COSEKey) -> Dict[int, Any]:
if not isinstance(data.value, list) or len(data.value) != 4:
raise ValueError("Invalid Signature1 format.")

msg = self._dumps(["Signature1", data.value[0], b"", data.value[2]])
key.verify(msg, data.value[3])
to_be_signed = self._dumps(
["Signature1", data.value[0], b"", data.value[2]]
)
key.verify(to_be_signed, data.value[3])
return self._loads(data.value[2])

# Signature
Expand All @@ -165,8 +167,16 @@ def decode(self, data: Union[bytes, CBORTag], key: COSEKey) -> Dict[int, Any]:
sigs = data.value[3]
if not isinstance(sigs, list):
raise ValueError("Invalid Signature format.")

msg = self._dumps(["Signature", data.value[0], b"", data.value[2]])
raise NotImplementedError()

for sig in sigs:
if not isinstance(sig, list) or len(sig) != 3:
raise ValueError("Invalid Signature format.")
uh = sig[1]
if uh[4] != key.kid:
continue
to_be_signed = self._dumps(
["Signature", data.value[0], sig[0], b"", data.value[2]]
)
key.verify(to_be_signed, sig[2])
return self._loads(data.value[2])
raise ValueError("Verification key not found.")
raise ValueError(f"Unsupported or unknown CBOR tag({data.tag}).")
83 changes: 81 additions & 2 deletions tests/test_cwt.py
Expand Up @@ -309,18 +309,58 @@ def test_cwt_encode_and_sign_with_tagged(self, ctx):
assert 2 in decoded and decoded[2] == "someone"
assert 7 in decoded and decoded[7] == b"123"

def test_cwt_encode_and_sign_with_multiple_signatures(self, ctx):
""""""
with open(key_path("private_key_es256.pem")) as key_file:
private_key_1 = cose_key.from_pem(key_file.read(), kid="1")
with open(key_path("public_key_es256.pem")) as key_file:
public_key_1 = cose_key.from_pem(key_file.read(), kid="1")
with open(key_path("private_key_ed25519.pem")) as key_file:
private_key_2 = cose_key.from_pem(key_file.read(), kid="2")
with open(key_path("public_key_ed25519.pem")) as key_file:
public_key_2 = cose_key.from_pem(key_file.read(), kid="2")
token = ctx.encode_and_sign(
{1: "https://as.example", 2: "someone", 7: b"123"},
[private_key_1, private_key_2],
)
decoded = ctx.decode(token, public_key_1)
assert isinstance(decoded, dict)
assert 1 in decoded and decoded[1] == "https://as.example"
decoded = ctx.decode(token, public_key_2)
assert isinstance(decoded, dict)
assert 1 in decoded and decoded[1] == "https://as.example"

def test_cwt_encode_and_encrypt_with_invalid_nonce(self, ctx):
""""""
enc_key = cose_key.from_symmetric_key(token_bytes(16), alg="AES-CCM-16-64-128")
with pytest.raises(ValueError) as err:
res = ctx.encode_and_encrypt(
ctx.encode_and_encrypt(
{1: "https://as.example", 2: "someone", 7: b"123"},
enc_key,
nonce=token_bytes(7), # should be 13
)
pytest.fail("encode_and_encrypt should fail: res=%s" % vars(res))
pytest.fail("encode_and_encrypt should fail.")
assert "The length of nonce should be" in str(err.value)

def test_cwt_encode_and_sign_with_signatures_kid_mismatch(self, ctx):
""""""
with open(key_path("private_key_es256.pem")) as key_file:
private_key_1 = cose_key.from_pem(key_file.read(), kid="1")
with open(key_path("public_key_es256.pem")) as key_file:
public_key_1 = cose_key.from_pem(key_file.read(), kid="3")
with open(key_path("private_key_ed25519.pem")) as key_file:
private_key_2 = cose_key.from_pem(key_file.read(), kid="2")
with open(key_path("public_key_ed25519.pem")) as key_file:
cose_key.from_pem(key_file.read(), kid="2")
token = ctx.encode_and_sign(
{1: "https://as.example", 2: "someone", 7: b"123"},
[private_key_1, private_key_2],
)
with pytest.raises(ValueError) as err:
ctx.decode(token, public_key_1)
pytest.fail("decode should fail.")
assert "Verification key not found." in str(err.value)

def test_cwt_decode_with_invalid_mac_key(self, ctx):
""""""
key = cose_key.from_symmetric_key("mysecret")
Expand Down Expand Up @@ -390,6 +430,45 @@ def test_cwt_decode_with_invalid_tagged_cwt(self, ctx):
pytest.fail("decode should fail.")
assert "Unsupported or unknown CBOR tag(62)." in str(err.value)

@pytest.mark.parametrize(
"invalid, msg",
[
(
cbor2.dumps(CBORTag(98, [])),
"Invalid Signature format.",
),
(
cbor2.dumps(CBORTag(98, {})),
"Invalid Signature format.",
),
(
cbor2.dumps(CBORTag(98, b"")),
"Invalid Signature format.",
),
(
cbor2.dumps(CBORTag(98, 123)),
"Invalid Signature format.",
),
(
cbor2.dumps(CBORTag(98, [b"", b"", b"", b""])),
"Invalid Signature format.",
),
(
cbor2.dumps(CBORTag(98, [b"", b"", b"", [b""]])),
"Invalid Signature format.",
),
],
)
def test_cwt_decode_with_invalid_sinatures(self, ctx, invalid, msg):
""""""
with open(key_path("public_key_es256.pem")) as key_file:
public_key = cose_key.from_pem(key_file.read(), kid="1")

with pytest.raises(ValueError) as err:
ctx.decode(invalid, public_key)
pytest.fail("decode should fail.")
assert msg in str(err.value)

@pytest.mark.parametrize(
"claims",
[
Expand Down