-
Notifications
You must be signed in to change notification settings - Fork 19
/
auth.py
208 lines (173 loc) · 7.77 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""Python Flask API Auth0 integration example
"""
import json
from typing import Optional, Mapping, List, Dict, Any, Set
import jwt
import requests
from jwt.algorithms import RSAAlgorithm
from xcube.webapi.errors import ServiceAuthError, ServiceConfigError
class AuthConfig:
def __init__(self, domain: str, audience: str, algorithms: List[str]):
self._domain = domain
self._audience = audience
self._algorithms = algorithms
@property
def domain(self) -> str:
return self._domain
@property
def issuer(self) -> str:
return f"https://{self.domain}/"
@property
def well_known_jwks(self) -> str:
return f"https://{self.domain}/.well-known/jwks.json"
@property
def audience(self) -> str:
return self._audience
@property
def algorithms(self) -> List[str]:
return self._algorithms
@classmethod
def from_config(cls, config: Dict[str, Any]) -> Optional['AuthConfig']:
authentication = config.get('Authentication')
if not authentication:
return None
domain = authentication.get('Domain')
if not domain:
raise ServiceConfigError('Missing key "Domain" in section "Authentication"')
audience = authentication.get('Audience')
if not audience:
raise ServiceConfigError('Missing key "Audience" in section "Authentication"')
algorithms = authentication.get('Algorithms', ['RS256'])
if not algorithms:
raise ServiceConfigError('Value for key "Algorithms" in section "Authentication" must not be empty')
return AuthConfig(domain, audience, algorithms)
class AuthMixin:
@property
def auth_config(self) -> Optional[AuthConfig]:
# noinspection PyUnresolvedReferences
return AuthConfig.from_config(self.service_context.config)
@property
def granted_scopes(self) -> Set[str]:
id_token = self.get_id_token(require_auth=False)
if not id_token:
return set()
return set(id_token.get('permissions', []))
def get_access_token(self, require_auth: bool = False) -> Optional[str]:
"""Obtains the access token from the Authorization Header
"""
# noinspection PyUnresolvedReferences
auth = self.request.headers.get("Authorization", None)
if not auth:
if require_auth:
raise ServiceAuthError("Authorization header missing",
log_message="Authorization header is expected")
return None
parts = auth.split()
if parts[0].lower() != "bearer":
raise ServiceAuthError("Invalid header",
log_message='Authorization header must start with "Bearer"')
elif len(parts) == 1:
raise ServiceAuthError("Invalid header",
log_message="Bearer token not found")
elif len(parts) > 2:
raise ServiceAuthError("Invalid header",
log_message="Authorization header must be Bearer token")
return parts[1]
def get_id_token(self, require_auth: bool = False) -> Optional[Mapping[str, str]]:
"""
Decodes the access token is valid.
"""
access_token = self.get_access_token(require_auth=require_auth)
if access_token is None:
return None
auth_config = self.auth_config
if auth_config is None:
if require_auth:
raise ServiceAuthError("Invalid header",
log_message="Received access token, "
"but this server doesn't support authentication.")
return None
try:
unverified_header = jwt.get_unverified_header(access_token)
except jwt.InvalidTokenError:
raise ServiceAuthError("Invalid header",
log_message="Invalid header. Use an RS256 signed JWT Access Token")
if unverified_header["alg"] != "RS256": # e.g. "HS256"
raise ServiceAuthError("Invalid header",
log_message="Invalid header. Use an RS256 signed JWT Access Token")
# TODO: read jwks from cache
response = requests.get(auth_config.well_known_jwks)
jwks = json.loads(response.content)
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
break
if rsa_key:
try:
id_token = jwt.decode(
access_token,
# TODO: this is stupid: we convert rsa_key to JWT JSON only to produce the public key JSON string
RSAAlgorithm.from_jwk(json.dumps(rsa_key)),
algorithms=auth_config.algorithms,
audience=auth_config.audience,
issuer=auth_config.issuer
)
except jwt.ExpiredSignatureError:
raise ServiceAuthError("Token expired",
log_message="Token is expired")
except jwt.InvalidTokenError:
raise ServiceAuthError("Invalid claims",
log_message="Incorrect claims, please check the audience and issuer")
except Exception:
raise ServiceAuthError("Invalid header",
log_message="Unable to parse authentication token.")
return id_token
raise ServiceAuthError("Invalid header",
log_message="Unable to find appropriate key")
def assert_scopes(required_scopes: Set[str],
granted_scopes: Set[str]):
"""
Assert scopes.
Raise ServiceAuthError if one of *required_scopes* is not in *granted_scopes*.
:param required_scopes: The list of required scopes
:param granted_scopes: The set of granted scopes
"""
missing_scope = _get_missing_scope(required_scopes, granted_scopes)
if missing_scope is not None:
raise ServiceAuthError('Missing permission', log_message=f'Missing permission {missing_scope}')
def check_scopes(required_scopes: Set[str],
granted_scopes: Set[str],
is_substitute: bool = False) -> bool:
"""
Check scopes.
This function is used to filter out a resource's sub-resources for which a given client has no permission.
If one of *required_scopes* is not in *granted_scopes*, fail.
If *granted_scopes* exists and *is_substitute*, fail too.
Else succeed.
:param required_scopes: The list of required scopes
:param granted_scopes: The set of granted scopes
:param is_substitute: True, if the resource to be checked is a substitute.
:return: True, if scopes are ok.
"""
return _get_missing_scope(required_scopes, granted_scopes, is_substitute=is_substitute) is None
def _get_missing_scope(required_scopes: Set[str],
granted_scopes: Set[str],
is_substitute: bool = False) -> Optional[str]:
for required_scope in required_scopes:
if required_scope not in granted_scopes:
# If the required scope is not a granted scope, fail
return required_scope
# If there are granted scopes then the client is authorized,
# hence fail for substitute resources (e.g. demo resources) as there is usually
# a better (non-demo) resource that replaces it.
if granted_scopes and is_substitute:
return '<is_substitute>'
# All ok.
return None