diff --git a/jwt/jwa.py b/jwt/jwa.py index 2d40458..392daa1 100644 --- a/jwt/jwa.py +++ b/jwt/jwa.py @@ -31,21 +31,24 @@ class AbstractSigningAlgorithm: def sign(self, message: bytes, key: AbstractJWKBase) -> bytes: - raise NotImplementedError() + raise NotImplementedError() # pragma: no cover def verify(self, message: bytes, key: AbstractJWKBase, signature: bytes) -> bool: - raise NotImplementedError() + raise NotImplementedError() # pragma: no cover -class NoneSigningAlgorithm(AbstractSigningAlgorithm): +class NoneAlgorithm(AbstractSigningAlgorithm): def sign(self, message: bytes, key: AbstractJWKBase) -> bytes: return b'' def verify(self, message: bytes, key: AbstractJWKBase, signature: bytes) -> bool: - return signature == b'' + return hmac.compare_digest(signature, b'') + + +none = NoneAlgorithm() class HMACAlgorithm(AbstractSigningAlgorithm): @@ -108,7 +111,7 @@ def verify(self, message: bytes, key: AbstractJWKBase, def supported_signing_algorithms(): return { - 'none': NoneSigningAlgorithm(), + 'none': none, 'HS256': HS256, 'HS384': HS384, 'HS512': HS512, diff --git a/jwt/jwk.py b/jwt/jwk.py index 4f7ce36..9fea91e 100644 --- a/jwt/jwk.py +++ b/jwt/jwk.py @@ -140,14 +140,18 @@ def is_sign_key(self) -> bool: def sign(self, message: bytes, hash_fun: Callable = None, **options) -> bytes: - signer = self.keyobj.signer(padding.PKCS1_v1_5(), hash_fun()) + signer = self.keyobj.signer(padding.PKCS1v15(), hash_fun()) signer.update(message) return signer.finalize() def verify(self, message: bytes, signature: bytes, hash_fun: Callable = None, **options) -> bool: - verifier = self.keyobj.verifier( - signature, padding.PKCS1_v1_5(), hash_fun()) + if self.is_sign_key(): + pubkey = self.keyobj.public_key() + else: + pubkey = self.keyobj + verifier = pubkey.verifier( + signature, padding.PKCS1v15(), hash_fun()) verifier.update(message) try: verifier.verify() diff --git a/jwt/jws.py b/jwt/jws.py index d4018bc..60142b1 100644 --- a/jwt/jws.py +++ b/jwt/jws.py @@ -44,12 +44,15 @@ def _retrieve_alg(self, alg: str) -> AbstractSigningAlgorithm: def encode(self, message: bytes, key: AbstractJWKBase = None, alg='HS256', optional_headers: dict = None) -> str: - if alg not in self._supported_algs: + if alg not in self._supported_algs: # pragma: no cover raise JWSEncodeError('unsupported algorithm: {}'.format(alg)) alg_impl = self._retrieve_alg(alg) - header = {} - header_b64 = b64encode(json.dumps(header).encode('ascii')) + header = optional_headers and optional_headers.copy() or {} + header['alg'] = alg + + header_b64 = b64encode( + json.dumps(header, separators=(',', ':')).encode('ascii')) message_b64 = b64encode(message) signing_message = header_b64 + '.' + message_b64 diff --git a/jwt/jwt.py b/jwt/jwt.py index 9c986f0..675233b 100644 --- a/jwt/jwt.py +++ b/jwt/jwt.py @@ -37,6 +37,9 @@ def encode(self, payload: dict, key: AbstractJWKBase = None, alg='HS256', message = json.dumps(payload).encode('utf-8') except ValueError as why: raise JWTEncodeError('payload must be able to encode in JSON') + + optional_headers = optional_headers and optional_headers.copy() or {} + optional_headers['typ'] = 'JWT' try: return self._jws.encode(message, key, alg, optional_headers) except JWSEncodeError as why: diff --git a/jwt/tests/test_jwa.py b/jwt/tests/test_jwa.py new file mode 100644 index 0000000..4cd8648 --- /dev/null +++ b/jwt/tests/test_jwa.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017 Gehirn Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from unittest import TestCase + +from jwt.jwa import ( + HS256, + none, +) +from jwt.jwk import jwk_from_dict +from jwt.utils import b64decode + +from .helper import load_testdata + + +class NoneTest(TestCase): + + def setUp(self): + self.message = ( + b'eyJhbGciOiJub25lIn0' + b'.' + b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0dHA6Ly9leGFt' + b'cGxlLmNvbS9pc19yb290Ijp0cnVlfQ' + ) + + def test_sign(self): + signature = none.sign(self.message, None) + self.assertEqual(signature, b'') + + def test_verify(self): + self.assertTrue(none.verify(self.message, None, b'')) + + +class HS256Test(TestCase): + + def setUp(self): + self.key = jwk_from_dict(json.loads(load_testdata('oct.json', 'r'))) + self.signature = b64decode( + 'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk' + ) + + self.message = ( + b'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9' + b'.' + b'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0dHA6Ly9leGFt' + b'cGxlLmNvbS9pc19yb290Ijp0cnVlfQ' + ) + + def test_sign(self): + signature = HS256.sign(self.message, self.key) + self.assertEqual(signature, self.signature) + + def test_verify(self): + self.assertTrue(HS256.verify(self.message, self.key, self.signature)) diff --git a/jwt/tests/test_jws.py b/jwt/tests/test_jws.py new file mode 100644 index 0000000..6d48678 --- /dev/null +++ b/jwt/tests/test_jws.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017 Gehirn Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from unittest import TestCase + +from jwt.jws import JWS +from jwt.jwk import jwk_from_dict + +from .helper import load_testdata + + +class JWSTest(TestCase): + + def setUp(self): + self.inst = JWS() + self.key = jwk_from_dict( + json.loads(load_testdata('rsa_privkey.json', 'r'))) + self.pubkey = jwk_from_dict( + json.loads(load_testdata('rsa_pubkey.json', 'r'))) + + self.message = ( + b'{"iss":"joe",\r\n' + b' "exp":1300819380,\r\n' + b' "http://example.com/is_root":true}' + ) + self.compact_jws = ( + 'eyJhbGciOiJSUzI1NiJ9' + '.' + 'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0dHA6Ly9leGFt' + 'cGxlLmNvbS9pc19yb290Ijp0cnVlfQ' + '.' + 'cC4hiUPoj9Eetdgtv3hF80EGrhuB__dzERat0XF9g2VtQgr9PJbu3XOiZj5RZmh7' + 'AAuHIm4Bh-0Qc_lF5YKt_O8W2Fp5jujGbds9uJdbF9CUAr7t1dnZcAcQjbKBYNX4' + 'BAynRFdiuB--f_nZLgrnbyTyWzO75vRK5h6xBArLIARNPvkSjtQBMHlb1L07Qe7K' + '0GarZRmB_eSN9383LcOLn6_dO--xi12jzDwusC-eOkHWEsqtFZESc6BfI7noOPqv' + 'hJ1phCnvWh6IeYI2w9QOYEUipUTI8np6LbgGY9Fs98rqVt5AXLIhWkWywlVmtVrB' + 'p0igcN_IoypGlUPQGe77Rw' + ) + + def test_encode(self): + compact_jws = self.inst.encode(self.message, self.key, alg='RS256') + self.assertEqual(compact_jws, self.compact_jws) + + def test_decode(self): + message = self.inst.decode(self.compact_jws, self.key) + self.assertEqual(message, self.message) + + def test_decode_pubkey(self): + message = self.inst.decode(self.compact_jws, self.pubkey) + self.assertEqual(message, self.message) diff --git a/jwt/tests/test_jwt.py b/jwt/tests/test_jwt.py new file mode 100644 index 0000000..cc82d90 --- /dev/null +++ b/jwt/tests/test_jwt.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2017 Gehirn Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from unittest import TestCase + +from jwt.jwk import jwk_from_dict +from jwt.jwt import JWT + +from .helper import load_testdata + + +class JWTTest(TestCase): + + def setUp(self): + self.inst = JWT() + self.key = jwk_from_dict( + json.loads(load_testdata('oct.json', 'r'))) + + self.message = { + 'iss': 'joe', + 'exp': 1300819380, + 'http://example.com/is_root': True, + } + + self.compact_jws = ( + 'eyJ0eXAiOiJKV1QiLA0KICJhbGciOiJIUzI1NiJ9' + '.' + 'eyJpc3MiOiJqb2UiLA0KICJleHAiOjEzMDA4MTkzODAsDQogImh0dHA6Ly9leGFt' + 'cGxlLmNvbS9pc19yb290Ijp0cnVlfQ' + '.' + 'dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk' + ) + + def test_decode(self): + message = self.inst.decode(self.compact_jws, self.key) + self.assertEqual(message, self.message) diff --git a/jwt/tests/test_utils.py b/jwt/tests/test_utils.py index 7d0aad6..4cc0ffa 100644 --- a/jwt/tests/test_utils.py +++ b/jwt/tests/test_utils.py @@ -1,4 +1,18 @@ # -*- coding: utf-8 -*- +# +# Copyright 2017 Gehirn Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from jwt.utils import ( b64encode,