-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
182 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,69 @@ | ||
# import jwt | ||
# | ||
# from django.conf import settings | ||
# | ||
# from rest_framework import authentication, exceptions | ||
# | ||
# from .models import User | ||
""" | ||
JWT Configuration module | ||
""" | ||
import jwt | ||
import json | ||
|
||
"""Configure JWT Here""" | ||
from django.conf import settings | ||
from django.http import HttpResponse | ||
|
||
from rest_framework import authentication, exceptions | ||
from rest_framework.authentication import get_authorization_header, BaseAuthentication | ||
|
||
class JWTAuthentication: | ||
def authenticate(self, argument2): | ||
pass | ||
from .models import User | ||
from authors.settings import SECRET_KEY | ||
|
||
def authenticate_header(self, argument2): | ||
pass | ||
|
||
class JWTAuthentication(BaseAuthentication): | ||
""" | ||
JWT Authentication helper class | ||
:param: BaseAuthentication: | ||
""" | ||
def authenticate(self, request): | ||
""" | ||
:param: request: request object to decode and authenticate | ||
""" | ||
|
||
auth = get_authorization_header(request).split() | ||
|
||
#Check if token and is present | ||
if not auth or auth[0].lower() != b'bearer': | ||
return None | ||
|
||
#Check if the correct credemtials were passed in 'request' parameter | ||
if len(auth) == 1: | ||
msg = 'Invalid token header. No credentials provided.' | ||
raise exceptions.AuthenticationFailed(msg) | ||
#Check if token header has correct number of segments | ||
elif len(auth) > 2: | ||
msg = 'Invalid token header' | ||
raise exceptions.AuthenticationFailed(msg) | ||
|
||
#Decode token sucessfully, else catch some errors; | ||
#errors can be due to expired signature, in decoding | ||
#token validity or a user not existing | ||
|
||
token = auth[1] | ||
try: | ||
payload = jwt.decode(token, SECRET_KEY) | ||
email = payload['email'] | ||
|
||
user = User.objects.get( | ||
email=email, | ||
is_active=True | ||
) | ||
#Except cuustom errors while using the token | ||
except Exception as e: | ||
if e.__class__.__name__ == 'DecodeError': | ||
raise exceptions.AuthenticationFailed('cannot decode token') | ||
if e.__class__.__name__ == 'ExpiredSignatureError': | ||
raise exceptions.AuthenticationFailed('Token has expired') | ||
return (user, token) | ||
|
||
|
||
def authenticate_header(self, request): | ||
""" | ||
Authenticate header prefix | ||
:param: request: object request to add prefix | ||
""" | ||
return 'Bearer' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
""" | ||
Token related tests module | ||
""" | ||
import json | ||
|
||
from rest_framework.views import status | ||
|
||
from .base import BaseTest | ||
|
||
class TokenTestcase(BaseTest): | ||
"""Test tokens testcase""" | ||
|
||
|
||
def test_token_without_bearer(self): | ||
""" | ||
test token must have a 'Bearer' prefix | ||
""" | ||
self.client.credentials(HTTP_AUTHORIZATION="Token akldjfakjdlfjs") | ||
|
||
get_user = self.client.get( | ||
self.USER_URL | ||
) | ||
self.output = json.loads(get_user.content)['user']['detail'] | ||
self.assertEqual(get_user.status_code, status.HTTP_401_UNAUTHORIZED) | ||
self.assertEqual(self.output, 'Authentication credentials were not provided.') | ||
|
||
|
||
def test_token_without_string_token(self): | ||
""" | ||
test token must have a token string | ||
""" | ||
self.client.credentials(HTTP_AUTHORIZATION="Bearer ") | ||
|
||
get_user = self.client.get( | ||
self.USER_URL | ||
) | ||
self.output = json.loads(get_user.content)['user']['detail'] | ||
self.assertEqual(get_user.status_code, status.HTTP_401_UNAUTHORIZED) | ||
self.assertEqual(self.output, 'Invalid token header. No credentials provided.') | ||
|
||
|
||
def test_token_with_invalid_content(self): | ||
""" | ||
test token must not have invalid content | ||
""" | ||
self.client.credentials(HTTP_AUTHORIZATION="Bearer kksdjkskjjds dsjkdksjd ") | ||
|
||
get_user = self.client.get( | ||
self.USER_URL | ||
) | ||
self.output = json.loads(get_user.content)['user']['detail'] | ||
self.assertEqual(get_user.status_code, status.HTTP_401_UNAUTHORIZED) | ||
self.assertEqual(self.output, 'Invalid token header') | ||
|
||
|
||
|
||
def test_authenticates_user(self): | ||
""" | ||
Test sucessful token generation on login | ||
""" | ||
register = self.client.post( | ||
self.SIGN_UP_URL, | ||
self.user_data, | ||
format="json") | ||
login = self.client.post( | ||
self.SIGN_IN_URL, | ||
self.user_data, | ||
format="json") | ||
token = json.loads(login.content)['user']['token'] | ||
self.client.credentials(HTTP_AUTHORIZATION="Bearer "+token) | ||
self.assertIn('token', login.data) | ||
self.assertEqual(token, login.data['token']) | ||
|
||
|
||
def test_invalid_token(self): | ||
""" | ||
Test secured endpoint must have a valid token | ||
""" | ||
register = self.client.post( | ||
self.SIGN_UP_URL, | ||
self.user_data, | ||
format="json", | ||
) | ||
login = self.client.post( | ||
self.SIGN_IN_URL, | ||
self.user_data, | ||
format="json") | ||
|
||
token = json.loads(login.content)['user']['token'] | ||
|
||
#tamper with the token authorizarion header | ||
self.client.credentials(HTTP_AUTHORIZATION="Bearer " + 'token') | ||
|
||
#try acessing a secured endpoint | ||
get_user = self.client.get( | ||
self.USER_URL | ||
) | ||
|
||
self.assertTrue('cannot decode token', json.loads(get_user.content)['user']['detail']) |