This repository has been archived by the owner on Nov 13, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
/
__init__.py
224 lines (172 loc) · 8.74 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
Functions for generating and verifying JSON Web Tokens.
"""
from datetime import datetime, timedelta
from calendar import timegm
from os import urandom
from jwcrypto.jws import JWS, JWSHeaderRegistry
from jwcrypto.common import base64url_encode, base64url_decode, \
json_encode, json_decode
class _JWTError(Exception):
""" Exception raised if claim doesn't pass. Private to this module because
jwcrypto throws many exceptions too. """
pass
def generate_jwt(claims, priv_key=None,
algorithm='PS512', lifetime=None, expires=None,
not_before=None,
jti_size=16, other_headers=None):
"""
Generate a JSON Web Token.
:param claims: The claims you want included in the signature.
:type claims: dict
:param priv_key: The private key to be used to sign the token. Note: if you pass ``None`` then the token will be returned with an empty cryptographic signature and :obj:`algorithm` will be forced to the value ``none``.
:type priv_key: `jwcrypto.jwk.JWK <https://jwcrypto.readthedocs.io/en/latest/jwk.html>`_
:param algorithm: The algorithm to use for generating the signature. ``RS256``, ``RS384``, ``RS512``, ``PS256``, ``PS384``, ``PS512``, ``ES256``, ``ES384``, ``ES512``, ``HS256``, ``HS384``, ``HS512`` and ``none`` are supported.
:type algorithm: str
:param lifetime: How long the token is valid for.
:type lifetime: datetime.timedelta
:param expires: When the token expires (if :obj:`lifetime` isn't specified)
:type expires: datetime.datetime
:param not_before: When the token is valid from. Defaults to current time (if ``None`` is passed).
:type not_before: datetime.datetime
:param jti_size: Size in bytes of the unique token ID to put into the token (can be used to detect replay attacks). Defaults to 16 (128 bits). Specify 0 or ``None`` to omit the JTI from the token.
:type jti_size: int
:param other_headers: Any headers other than "typ" and "alg" may be specified, they will be included in the header.
:type other_headers: dict
:rtype: unicode
:returns: The JSON Web Token. Note this includes a header, the claims and a cryptographic signature. The following extra claims are added, per the `JWT spec <http://self-issued.info/docs/draft-ietf-oauth-json-web-token.html>`_:
- **exp** (*IntDate*) -- The UTC expiry date and time of the token, in number of seconds from 1970-01-01T0:0:0Z UTC.
- **iat** (*IntDate*) -- The UTC date and time at which the token was generated.
- **nbf** (*IntDate*) -- The UTC valid-from date and time of the token.
- **jti** (*str*) -- A unique identifier for the token.
:raises:
ValueError: If other_headers contains either the "typ" or "alg" header
"""
header = {
'typ': 'JWT',
'alg': algorithm if priv_key else 'none'
}
if other_headers is not None:
redefined_keys = set(header.keys()) & set(other_headers.keys())
if redefined_keys:
raise ValueError('other_headers re-specified the headers: {}'.format(', '.join(redefined_keys)))
header.update(other_headers)
claims = dict(claims)
now = datetime.utcnow()
if jti_size:
claims['jti'] = base64url_encode(urandom(jti_size))
claims['nbf'] = timegm((not_before or now).utctimetuple())
claims['iat'] = timegm(now.utctimetuple())
if lifetime:
claims['exp'] = timegm((now + lifetime).utctimetuple())
elif expires:
claims['exp'] = timegm(expires.utctimetuple())
if header['alg'] == 'none':
signature = ''
else:
token = JWS(json_encode(claims))
token.add_signature(priv_key, protected=header)
signature = json_decode(token.serialize())['signature']
return u'%s.%s.%s' % (
base64url_encode(json_encode(header)),
base64url_encode(json_encode(claims)),
signature
)
#pylint: disable=R0912,too-many-locals
def verify_jwt(jwt,
pub_key=None,
allowed_algs=None,
iat_skew=timedelta(),
checks_optional=False,
ignore_not_implemented=False):
"""
Verify a JSON Web Token.
:param jwt: The JSON Web Token to verify.
:type jwt: str or unicode
:param pub_key: The public key to be used to verify the token. Note: if you pass ``None`` and **allowed_algs** contains ``none`` then the token's signature will not be verified.
:type pub_key: `jwcrypto.jwk.JWK <https://jwcrypto.readthedocs.io/en/latest/jwk.html>`_
:param allowed_algs: Algorithms expected to be used to sign the token. The ``in`` operator is used to test membership.
:type allowed_algs: list or NoneType (meaning an empty list)
:param iat_skew: The amount of leeway to allow between the issuer's clock and the verifier's clock when verifiying that the token was generated in the past. Defaults to no leeway.
:type iat_skew: datetime.timedelta
:param checks_optional: If ``False``, then the token must contain the **typ** header property and the **iat**, **nbf** and **exp** claim properties.
:type checks_optional: bool
:param ignore_not_implemented: If ``False``, then the token must *not* contain the **jku**, **jwk**, **x5u**, **x5c** or **x5t** header properties.
:type ignore_not_implemented: bool
:rtype: tuple
:returns: ``(header, claims)`` if the token was verified successfully. The token must pass the following tests:
- Its header must contain a property **alg** with a value in **allowed_algs**.
- Its signature must verify using **pub_key** (unless its algorithm is ``none`` and ``none`` is in **allowed_algs**).
- If the corresponding property is present or **checks_optional** is ``False``:
- Its header must contain a property **typ** with the value ``JWT``.
- Its claims must contain a property **iat** which represents a date in the past (taking into account :obj:`iat_skew`).
- Its claims must contain a property **nbf** which represents a date in the past.
- Its claims must contain a property **exp** which represents a date in the future.
:raises: If the token failed to verify.
"""
if allowed_algs is None:
allowed_algs = []
if not isinstance(allowed_algs, list):
# jwcrypto only supports list of allowed algorithms
raise _JWTError('allowed_algs must be a list')
header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
alg = parsed_header.get('alg')
if alg is None:
raise _JWTError('alg header not present')
if alg not in allowed_algs:
raise _JWTError('algorithm not allowed: ' + alg)
if not ignore_not_implemented:
for k in parsed_header:
if k not in JWSHeaderRegistry:
raise _JWTError('unknown header: ' + k)
if not JWSHeaderRegistry[k].supported:
raise _JWTError('header not implemented: ' + k)
if pub_key:
token = JWS()
token.allowed_algs = allowed_algs
token.deserialize(jwt, pub_key)
elif 'none' not in allowed_algs:
raise _JWTError('no key but none alg not allowed')
parsed_claims = json_decode(base64url_decode(claims))
utcnow = datetime.utcnow()
now = timegm(utcnow.utctimetuple())
typ = parsed_header.get('typ')
if typ is None:
if not checks_optional:
raise _JWTError('typ header not present')
elif typ != 'JWT':
raise _JWTError('typ header is not JWT')
iat = parsed_claims.get('iat')
if iat is None:
if not checks_optional:
raise _JWTError('iat claim not present')
elif iat > timegm((utcnow + iat_skew).utctimetuple()):
raise _JWTError('issued in the future')
nbf = parsed_claims.get('nbf')
if nbf is None:
if not checks_optional:
raise _JWTError('nbf claim not present')
elif nbf > now:
raise _JWTError('not yet valid')
exp = parsed_claims.get('exp')
if exp is None:
if not checks_optional:
raise _JWTError('exp claim not present')
elif exp <= now:
raise _JWTError('expired')
return parsed_header, parsed_claims
#pylint: enable=R0912
def process_jwt(jwt):
"""
Process a JSON Web Token without verifying it.
Call this before :func:`verify_jwt` if you need access to the header or claims in the token before verifying it. For example, the claims might identify the issuer such that you can retrieve the appropriate public key.
:param jwt: The JSON Web Token to verify.
:type jwt: str or unicode
:rtype: tuple
:returns: ``(header, claims)``
"""
header, claims, _ = jwt.split('.')
parsed_header = json_decode(base64url_decode(header))
parsed_claims = json_decode(base64url_decode(claims))
return parsed_header, parsed_claims