/
jwt_auth.py
139 lines (115 loc) · 5.92 KB
/
jwt_auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from django.conf import settings
from django.utils import timezone
from rest_framework import exceptions, serializers
from rest_framework.authentication import CSRFCheck
from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.serializers import TokenRefreshSerializer
def set_jwt_access_cookie(response, access_token):
from rest_framework_simplejwt.settings import api_settings as jwt_settings
cookie_name = getattr(settings, 'JWT_AUTH_COOKIE', None)
access_token_expiration = (timezone.now() + jwt_settings.ACCESS_TOKEN_LIFETIME)
cookie_secure = getattr(settings, 'JWT_AUTH_SECURE', False)
cookie_httponly = getattr(settings, 'JWT_AUTH_HTTPONLY', True)
cookie_samesite = getattr(settings, 'JWT_AUTH_SAMESITE', 'Lax')
if cookie_name:
response.set_cookie(
cookie_name,
access_token,
expires=access_token_expiration,
secure=cookie_secure,
httponly=cookie_httponly,
samesite=cookie_samesite,
)
def set_jwt_refresh_cookie(response, refresh_token):
from rest_framework_simplejwt.settings import api_settings as jwt_settings
refresh_token_expiration = (timezone.now() + jwt_settings.REFRESH_TOKEN_LIFETIME)
refresh_cookie_name = getattr(settings, 'JWT_AUTH_REFRESH_COOKIE', None)
refresh_cookie_path = getattr(settings, 'JWT_AUTH_REFRESH_COOKIE_PATH', '/')
cookie_secure = getattr(settings, 'JWT_AUTH_SECURE', False)
cookie_httponly = getattr(settings, 'JWT_AUTH_HTTPONLY', True)
cookie_samesite = getattr(settings, 'JWT_AUTH_SAMESITE', 'Lax')
if refresh_cookie_name:
response.set_cookie(
refresh_cookie_name,
refresh_token,
expires=refresh_token_expiration,
secure=cookie_secure,
httponly=cookie_httponly,
samesite=cookie_samesite,
path=refresh_cookie_path,
)
def set_jwt_cookies(response, access_token, refresh_token):
set_jwt_access_cookie(response, access_token)
set_jwt_refresh_cookie(response, refresh_token)
def unset_jwt_cookies(response):
cookie_name = getattr(settings, 'JWT_AUTH_COOKIE', None)
refresh_cookie_name = getattr(settings, 'JWT_AUTH_REFRESH_COOKIE', None)
refresh_cookie_path = getattr(settings, 'JWT_AUTH_REFRESH_COOKIE_PATH', '/')
if cookie_name:
response.delete_cookie(cookie_name)
if refresh_cookie_name:
response.delete_cookie(refresh_cookie_name, path=refresh_cookie_path)
class CookieTokenRefreshSerializer(TokenRefreshSerializer):
refresh = serializers.CharField(required=False, help_text='WIll override cookie.')
def extract_refresh_token(self):
request = self.context['request']
if 'refresh' in request.data and request.data['refresh'] != '':
return request.data['refresh']
cookie_name = getattr(settings, 'JWT_AUTH_REFRESH_COOKIE', None)
if cookie_name and cookie_name in request.COOKIES:
return request.COOKIES.get(cookie_name)
else:
from rest_framework_simplejwt.exceptions import InvalidToken
raise InvalidToken('No valid refresh token found.')
def validate(self, attrs):
attrs['refresh'] = self.extract_refresh_token()
return super().validate(attrs)
def get_refresh_view():
""" Returns a Token Refresh CBV without a circular import """
from rest_framework_simplejwt.settings import api_settings as jwt_settings
from rest_framework_simplejwt.views import TokenRefreshView
class RefreshViewWithCookieSupport(TokenRefreshView):
serializer_class = CookieTokenRefreshSerializer
def finalize_response(self, request, response, *args, **kwargs):
if response.status_code == 200 and 'access' in response.data:
set_jwt_access_cookie(response, response.data['access'])
response.data['access_token_expiration'] = (timezone.now() + jwt_settings.ACCESS_TOKEN_LIFETIME)
if response.status_code == 200 and 'refresh' in response.data:
set_jwt_refresh_cookie(response, response.data['refresh'])
return super().finalize_response(request, response, *args, **kwargs)
return RefreshViewWithCookieSupport
class JWTCookieAuthentication(JWTAuthentication):
"""
An authentication plugin that hopefully authenticates requests through a JSON web
token provided in a request cookie (and through the header as normal, with a
preference to the header).
"""
def enforce_csrf(self, request):
"""
Enforce CSRF validation for session based authentication.
"""
check = CSRFCheck()
# populates request.META['CSRF_COOKIE'], which is used in process_view()
check.process_request(request)
reason = check.process_view(request, None, (), {})
if reason:
# CSRF failed, bail with explicit error message
raise exceptions.PermissionDenied(f'CSRF Failed: {reason}')
def authenticate(self, request):
cookie_name = getattr(settings, 'JWT_AUTH_COOKIE', None)
header = self.get_header(request)
if header is None:
if cookie_name:
raw_token = request.COOKIES.get(cookie_name)
if getattr(settings, 'JWT_AUTH_COOKIE_ENFORCE_CSRF_ON_UNAUTHENTICATED', False): #True at your own risk
self.enforce_csrf(request)
elif raw_token is not None and getattr(settings, 'JWT_AUTH_COOKIE_USE_CSRF', False):
self.enforce_csrf(request)
else:
return None
else:
raw_token = self.get_raw_token(header)
if raw_token is None:
return None
validated_token = self.get_validated_token(raw_token)
return self.get_user(validated_token), validated_token