-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(authentication): implement JWT authentication
- Users get JWT token on successful registration - Users get JWT token on successful login - Users need JWT token to access endpoints requiring authentication [finishes #164046245]
- Loading branch information
Showing
9 changed files
with
245 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,84 @@ | ||
# import jwt | ||
# | ||
# from django.conf import settings | ||
# | ||
# from rest_framework import authentication, exceptions | ||
# | ||
# from .models import User | ||
import jwt | ||
|
||
from django.conf import settings | ||
|
||
from rest_framework import authentication, exceptions | ||
|
||
from .models import User | ||
|
||
|
||
"""Configure JWT Here""" | ||
|
||
|
||
class JWTAuthentication(authentication.BaseAuthentication): | ||
auth_header_prefix = 'Bearer'.lower() | ||
|
||
def authenticate(self, request): | ||
""" | ||
This method will be called everytime an endpoint is accessed. | ||
This method can return 'None' when we want authentication to | ||
fail, for example when no authentication credentials are provided. | ||
It can also return `(user, token)` when authentication is successful. | ||
If we encounter an error, we raise an `AuthenticationFailed` error. | ||
""" | ||
request.user = None | ||
|
||
# we need the auth_header, which should be a list containing two | ||
# elements: 1) the name of the authentication header ('Bearer' in our | ||
# case) and 2) the JWT token. | ||
auth_header = authentication.get_authorization_header(request).split() | ||
|
||
if not auth_header: | ||
# if we get no authentication header, we do not attempt to | ||
# authenticate | ||
return None | ||
|
||
if len(auth_header) == 1: | ||
# We expect the length to be 2, so this is an invalid header. Do | ||
# not attempt to authenticate. | ||
return None | ||
|
||
elif len(auth_header) > 2: | ||
# Invalid token header. The length must be 2. Do not attempt | ||
# to authenticate | ||
return None | ||
|
||
# We have to decode both the prefix and token because they are in bytes, | ||
# and the JWT library we use can't handly bytes. | ||
prefix = auth_header[0].decode('utf-8') | ||
token = auth_header[1].decode('utf-8') | ||
|
||
if prefix.lower() != self.auth_header_prefix: | ||
# The auth header prefix should only be 'Bearer'. If otherwise, | ||
# don't attempt to authenticate | ||
return None | ||
|
||
# We can now attempt to authenticate after performing the above checks. | ||
return self._authenticate_credentials(request, token) | ||
|
||
def _authenticate_credentials(self, request, token): | ||
""" | ||
We will try to authenticate the token. If authentication is successful | ||
we return (user, token), otherwise we return an `AuthenticationFailed` | ||
error. | ||
""" | ||
try: | ||
payload = jwt.decode(token, settings.SECRET_KEY) | ||
except Exception: | ||
msg = 'Ivalid token provided. Authentication failure.' | ||
raise exceptions.AuthenticationFailed(msg) | ||
|
||
try: | ||
user = User.objects.get(pk=payload['id']) | ||
except User.DoesNotExist: | ||
msg = 'User matching this token was not found.' | ||
raise exceptions.AuthenticationFailed(msg) | ||
|
||
if not user.is_active: | ||
msg = 'Forbidden! This user has been deactivated.' | ||
raise exceptions.AuthenticationFailed(msg) | ||
|
||
return (user, token) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
from django.test import TestCase | ||
from django.urls import reverse | ||
|
||
from rest_framework import status | ||
from rest_framework.test import APIClient, APIRequestFactory | ||
|
||
from authors.apps.authentication.models import User | ||
from authors.apps.authentication.backends import JWTAuthentication | ||
|
||
|
||
class JWTAuthenticationTest(TestCase): | ||
"""Test the JWT Authentication implementation""" | ||
def setUp(self): | ||
self.user = User.objects.create(username='user1', email='user1@mail.com', password='password') | ||
self.login_data = {'user': { | ||
'email': 'user2@mail.com', | ||
'password': 'password' | ||
}} | ||
self.user_token = self.user.token | ||
self.user.save() | ||
self.client = APIClient() | ||
|
||
def test_user_gets_a_token_when_they_log_in(self): | ||
"""Users should get a token when they successfully log in""" | ||
self.client.post(reverse('authentication:register'), {'user': { | ||
'email': 'user2@mail.com', | ||
'username': 'user2', | ||
'password': 'password' | ||
}}, format='json') | ||
res = self.client.post(reverse('authentication:login'), self.login_data, format='json') | ||
self.assertEqual(res.status_code, status.HTTP_200_OK) | ||
self.assertIn('token', res.data) | ||
|
||
def test_if_user_passes_valid_token_to_access_secured_endpoint(self): | ||
"""Test if a user can access a secured endpoint after providing a valid token""" | ||
headers = {'HTTP_AUTHORIZATION': f'Bearer {self.user_token}'} | ||
res = self.client.get(reverse('authentication:get users'), **headers) | ||
self.assertEqual(res.status_code, status.HTTP_200_OK) | ||
|
||
def test_failure_if_user_passes_no_token(self): | ||
"""Test if a user can access a secured endpoint without providing a token""" | ||
res = self.client.get(reverse('authentication:get users')) | ||
self.assertEqual(res.status_code, status.HTTP_403_FORBIDDEN) | ||
self.assertEqual(res.data['detail'], 'Authentication credentials were not provided.') | ||
|
||
def test_failure_if_user_provides_invalid_token(self): | ||
"""Test if an invalid token can be decoded""" | ||
fake_token = self.user_token + 'ivalid' | ||
headers = {'HTTP_AUTHORIZATION': f'Bearer {fake_token}'} | ||
res = self.client.get(reverse('authentication:get users'), **headers) | ||
self.assertEqual(res.status_code, status.HTTP_403_FORBIDDEN) | ||
self.assertEqual(res.data['detail'], 'Ivalid token provided. Authentication failure.') | ||
|
||
def test_failure_if_user_does_not_exist(self): | ||
"""We register a user to get the token, then delete the user from the database. When a user tries to pass the token to access the endpoint, they should be forbidden from proceeding.""" | ||
test_user = User.objects.create(username='test_user', email='test_user@mail.com', password='password') | ||
test_token = test_user.token | ||
test_user.delete() | ||
client = APIClient() | ||
headers = {'HTTP_AUTHORIZATION': f'Bearer {test_token}'} | ||
res = client.get(reverse('authentication:get users'), **headers) | ||
self.assertEqual(res.status_code, status.HTTP_403_FORBIDDEN) | ||
self.assertEqual(res.data['detail'], 'User matching this token was not found.') | ||
|
||
def test_failure_because_user_is_inactive(self): | ||
"""Test if an inactive user can be authenticated""" | ||
inactive_user = User.objects.create(username='inactive_one', email='inactive@mail.com', password='password') | ||
inactive_user.is_active = False | ||
inactive_user.save() | ||
headers = {'HTTP_AUTHORIZATION': f'Bearer {inactive_user.token}'} | ||
res = self.client.get(reverse('authentication:get users'), **headers) | ||
self.assertEqual(res.status_code, status.HTTP_403_FORBIDDEN) | ||
self.assertEqual(res.data['detail'], 'Forbidden! This user has been deactivated.') | ||
|
||
def test_authentication_failure_because_header_is_None(self): | ||
"""Test if authentication fails when a request has authorization | ||
headers with a length of 0""" | ||
jwt_auth = JWTAuthentication() | ||
factory = APIRequestFactory() | ||
request = factory.get(reverse('authentication:get users')) | ||
request.META['HTTP_AUTHORIZATION'] = '' | ||
res = jwt_auth.authenticate(request) | ||
self.assertEqual(res, None) | ||
|
||
def test_authentication_failure_because_header_length_is_1(self): | ||
"""Test if authentication fails when a request has authorization | ||
headers with a length of 1""" | ||
jwt_auth = JWTAuthentication() | ||
factory = APIRequestFactory() | ||
request = factory.get(reverse('authentication:get users')) | ||
request.META['HTTP_AUTHORIZATION'] = 'length' | ||
res = jwt_auth.authenticate(request) | ||
self.assertEqual(res, None) | ||
|
||
def test_authentication_failure_if_header_length_is_greater_than_2(self): | ||
"""Test if authentication fails when a request has authorization | ||
headers with a length greater than 2""" | ||
jwt_auth = JWTAuthentication() | ||
factory = APIRequestFactory() | ||
request = factory.get(reverse('authentication:get users')) | ||
request.META['HTTP_AUTHORIZATION'] = b'length is greater than 2' | ||
res = jwt_auth.authenticate(request) | ||
self.assertEqual(res, None) | ||
|
||
def test_authentication_failure_if_prefixes_mismatch(self): | ||
"""We unit test our authentication method to see if the method | ||
returns `None` when the prefixes mismatch""" | ||
jwt_auth = JWTAuthentication() | ||
factory = APIRequestFactory() | ||
request = factory.get(reverse('authentication:get users')) | ||
request.META['HTTP_AUTHORIZATION'] = 'Token, {}'.format(self.user_token) | ||
res = jwt_auth.authenticate(request) | ||
self.assertEqual(res, None) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters