Skip to content

Commit

Permalink
fix: mypy lint for rfc7797
Browse files Browse the repository at this point in the history
  • Loading branch information
lepture committed Jul 17, 2023
1 parent 8a0a1d4 commit cc3ae9b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ jobs:

- name: Install dependencies
run: pip install -r requirements.txt

- name: flake8 lint
run: flake8 src/joserfc

- name: mypy lint
run: mypy

test:
needs: lint
runs-on: ubuntu-latest
Expand Down
12 changes: 6 additions & 6 deletions src/joserfc/rfc7515/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def extract_general_json(value: GeneralJSONSerialization) -> GeneralJSONSignatur
members = [__signature_to_member(sig) for sig in signatures]
obj = GeneralJSONSignature(members, payload)
obj.signatures = signatures
obj.segments.update({"payload": payload_segment})
obj.segments = {"payload": payload_segment}
return obj


Expand All @@ -98,17 +98,17 @@ def extract_flattened_json(value: FlattenedJSONSerialization) -> FlattenedJSONSi
payload = urlsafe_b64decode(payload_segment)
except (TypeError, ValueError, binascii.Error):
raise DecodeError("Invalid payload")
_sig: JSONSignatureDict = {
"protected": value["protected"],
"signature": value["signature"],
}

_sig: JSONSignatureDict = {"signature": value["signature"]}
if "protected" in value:
_sig["protected"] = value["protected"]
if "header" in value:
_sig["header"] = value["header"]

member = __signature_to_member(_sig)
obj = FlattenedJSONSignature(member, payload)
obj.signature = _sig
obj.segments.update({"payload": payload_segment})
obj.segments = {"payload": payload_segment}
return obj


Expand Down
28 changes: 15 additions & 13 deletions src/joserfc/rfc7797/json.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import typing as t
from ..rfc7515.json import verify_signature
from ..rfc7515.types import JSONSignatureDict
from ..jws import (
HeaderDict,
HeaderMember,
Expand All @@ -15,7 +17,6 @@
json_b64encode,
json_b64decode,
urlsafe_b64encode,
urlsafe_b64decode,
)
from ..errors import BadSignatureError
from .registry import JWSRegistry
Expand Down Expand Up @@ -53,7 +54,7 @@ def serialize_json(
signing_input = b".".join([protected_segment, to_bytes(payload)])
signature = urlsafe_b64encode(alg.sign(signing_input, key))

rv = {
rv: FlattenedJSONSerialization = {
"payload": to_unicode(payload),
"signature": to_unicode(signature),
}
Expand All @@ -80,17 +81,15 @@ def deserialize_json(
if headers["b64"] is True:
return _deserialize_json(value, public_key, registry=registry)

registry.check_header(headers)
key = guess_key(public_key, obj.member)
alg = registry.get_alg(headers["alg"])
signing_input = b".".join([obj.segments["header"], obj.payload])
sig = urlsafe_b64decode(obj.segments["signature"])
if not alg.verify(signing_input, sig, key):
payload_segment = obj.segments["payload"]
find_key = lambda d: guess_key(public_key, d)
assert obj.signature is not None
if not verify_signature(obj.member, obj.signature, payload_segment, registry, find_key):
raise BadSignatureError()
return obj


def _extract_json(value: FlattenedJSONSignature):
def _extract_json(value: FlattenedJSONSerialization):
if "signatures" in value:
return None

Expand All @@ -109,8 +108,11 @@ def _extract_json(value: FlattenedJSONSignature):

payload = to_bytes(value["payload"])
obj = FlattenedJSONSignature(member, payload)
obj.segments = {
"header": protected_segment,
"signature": to_bytes(value["signature"]),
}
_sig: JSONSignatureDict = {"signature": value["signature"]}
if "protected" in value:
_sig["protected"] = value["protected"]
if "header" in value:
_sig["header"] = value["header"]
obj.signature = _sig
obj.segments = {"payload": payload}
return obj

0 comments on commit cc3ae9b

Please sign in to comment.