This repository has been archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 14
/
token_helper.py
112 lines (74 loc) · 2.94 KB
/
token_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import json
from structlog import get_logger
from jwcrypto import jwe, jwt
from jwcrypto.common import base64url_decode
from jwcrypto.jwe import InvalidJWEData
from jwcrypto.jws import InvalidJWSSignature, InvalidJWSObject
from jwcrypto.jwt import JWTInvalidClaimFormat, JWTMissingClaim, JWTExpired
from app.authentication.invalid_token_exception import InvalidTokenException
logger = get_logger()
def decrypt_jwe(encrypted_token, secret_store, purpose):
try:
jwe_token = jwe.JWE(algs=['RSA-OAEP', 'A256GCM'])
jwe_token.deserialize(encrypted_token)
jwe_kid = extract_kid_from_header(encrypted_token)
logger.info("Decrypting JWE", kid=jwe_kid)
private_jwk = secret_store.get_private_key_by_kid(purpose, jwe_kid).as_jwk()
jwe_token.decrypt(private_jwk)
return jwe_token.payload.decode()
except InvalidJWEData as e:
raise InvalidTokenException(repr(e))
def encrypt_jwe(payload, kid, secret_store, purpose, alg="RSA-OAEP", enc="A256GCM"):
public_jwk = secret_store.get_public_key_by_kid(purpose, kid).as_jwk()
protected_header = {
"alg": alg,
"enc": enc,
"kid": kid,
}
token = jwe.JWE(plaintext=payload, protected=protected_header)
token.add_recipient(public_jwk)
return token.serialize(compact=True)
def decode_jwt(jwt_token, secret_store, purpose, leeway=None):
try:
jwt_kid = extract_kid_from_header(jwt_token)
logger.info("Decoding JWT", kid=jwt_kid)
public_jwk = secret_store.get_public_key_by_kid(purpose, jwt_kid).as_jwk()
check_claims = {
"jti": None,
"exp": None,
"iat": None,
}
signed_token = jwt.JWT(algs=['RS256'], check_claims=check_claims)
if leeway:
signed_token.leeway = leeway
signed_token.deserialize(jwt_token, key=public_jwk)
return json.loads(signed_token.claims)
except (InvalidJWSObject,
InvalidJWSSignature,
JWTInvalidClaimFormat,
JWTMissingClaim,
JWTExpired,
ValueError) as e:
raise InvalidTokenException(repr(e))
def encode_jwt(claims, kid, secret_store, purpose):
private_jwk = secret_store.get_private_key_by_kid(purpose, kid).as_jwk()
header = {
'kid': kid,
'typ': 'jwt',
'alg': 'RS256',
}
token = jwt.JWT(claims=claims, header=header)
token.make_signed_token(private_jwk)
return token.serialize()
def extract_kid_from_header(token):
header = token.split('.')[:1][0]
if header is "":
raise InvalidTokenException("Missing Headers")
try:
protected_header = base64url_decode(header).decode()
protected_header_data = json.loads(protected_header)
if 'kid' in protected_header:
return protected_header_data['kid']
except ValueError:
raise InvalidTokenException("Invalid Header")
raise InvalidTokenException("Missing kid")