-
Notifications
You must be signed in to change notification settings - Fork 660
/
authentication.py
178 lines (140 loc) · 6.08 KB
/
authentication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from typing import Optional, Set, Tuple, TypeVar
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser
from django.utils.translation import gettext_lazy as _
from rest_framework import HTTP_HEADER_ENCODING, authentication
from rest_framework.request import Request
from .exceptions import AuthenticationFailed, InvalidToken, TokenError
from .models import TokenUser
from .settings import api_settings
from .tokens import Token
from .utils import get_md5_hash_password
AUTH_HEADER_TYPES = api_settings.AUTH_HEADER_TYPES
if not isinstance(api_settings.AUTH_HEADER_TYPES, (list, tuple)):
AUTH_HEADER_TYPES = (AUTH_HEADER_TYPES,)
AUTH_HEADER_TYPE_BYTES: Set[bytes] = {
h.encode(HTTP_HEADER_ENCODING) for h in AUTH_HEADER_TYPES
}
AuthUser = TypeVar("AuthUser", AbstractBaseUser, TokenUser)
class JWTAuthentication(authentication.BaseAuthentication):
"""
An authentication plugin that authenticates requests through a JSON web
token provided in a request header.
"""
www_authenticate_realm = "api"
media_type = "application/json"
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.user_model = get_user_model()
def authenticate(self, request: Request) -> Optional[Tuple[AuthUser, Token]]:
header = self.get_header(request)
if header is None:
return None
raw_token = self.get_raw_token(header)
if raw_token is None:
return None
validated_token = self.get_validated_token(raw_token)
return self.get_user(validated_token), validated_token
def authenticate_header(self, request: Request) -> str:
return '{} realm="{}"'.format(
AUTH_HEADER_TYPES[0],
self.www_authenticate_realm,
)
def get_header(self, request: Request) -> bytes:
"""
Extracts the header containing the JSON web token from the given
request.
"""
header = request.META.get(api_settings.AUTH_HEADER_NAME)
if isinstance(header, str):
# Work around django test client oddness
header = header.encode(HTTP_HEADER_ENCODING)
return header
def get_raw_token(self, header: bytes) -> Optional[bytes]:
"""
Extracts an unvalidated JSON web token from the given "Authorization"
header value.
"""
parts = header.split()
if len(parts) == 0:
# Empty AUTHORIZATION header sent
return None
if parts[0] not in AUTH_HEADER_TYPE_BYTES:
# Assume the header does not contain a JSON web token
return None
if len(parts) != 2:
raise AuthenticationFailed(
_("Authorization header must contain two space-delimited values"),
code="bad_authorization_header",
)
return parts[1]
def get_validated_token(self, raw_token: bytes) -> Token:
"""
Validates an encoded JSON web token and returns a validated token
wrapper object.
"""
messages = []
for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
try:
return AuthToken(raw_token)
except TokenError as e:
messages.append(
{
"token_class": AuthToken.__name__,
"token_type": AuthToken.token_type,
"message": e.args[0],
}
)
raise InvalidToken(
{
"detail": _("Given token not valid for any token type"),
"messages": messages,
}
)
def get_user(self, validated_token: Token) -> AuthUser:
"""
Attempts to find and return a user using the given validated token.
"""
try:
user_id = validated_token[api_settings.USER_ID_CLAIM]
except KeyError:
raise InvalidToken(_("Token contained no recognizable user identification"))
try:
user = self.user_model.objects.get(**{api_settings.USER_ID_FIELD: user_id})
except self.user_model.DoesNotExist:
raise AuthenticationFailed(_("User not found"), code="user_not_found")
if not user.is_active:
raise AuthenticationFailed(_("User is inactive"), code="user_inactive")
if api_settings.CHECK_REVOKE_TOKEN:
if validated_token.get(
api_settings.REVOKE_TOKEN_CLAIM
) != get_md5_hash_password(user.password):
raise AuthenticationFailed(
_("The user's password has been changed."), code="password_changed"
)
return user
class JWTStatelessUserAuthentication(JWTAuthentication):
"""
An authentication plugin that authenticates requests through a JSON web
token provided in a request header without performing a database lookup to obtain a user instance.
"""
def get_user(self, validated_token: Token) -> AuthUser:
"""
Returns a stateless user object which is backed by the given validated
token.
"""
if api_settings.USER_ID_CLAIM not in validated_token:
# The TokenUser class assumes tokens will have a recognizable user
# identifier claim.
raise InvalidToken(_("Token contained no recognizable user identification"))
return api_settings.TOKEN_USER_CLASS(validated_token)
JWTTokenUserAuthentication = JWTStatelessUserAuthentication
def default_user_authentication_rule(user: AuthUser) -> bool:
# Prior to Django 1.10, inactive users could be authenticated with the
# default `ModelBackend`. As of Django 1.10, the `ModelBackend`
# prevents inactive users from authenticating. App designers can still
# allow inactive users to authenticate by opting for the new
# `AllowAllUsersModelBackend`. However, we explicitly prevent inactive
# users from authenticating to enforce a reasonable policy and provide
# sensible backwards compatibility with older Django versions.
return user is not None and user.is_active