Skip to content

Commit

Permalink
Add tests and type annotate utils
Browse files Browse the repository at this point in the history
  • Loading branch information
sumit4613 committed Dec 4, 2021
1 parent a95f33a commit 49d51cd
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 56 deletions.
108 changes: 54 additions & 54 deletions drf_user/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Collection of general helper functions."""
from __future__ import annotations

import datetime
from typing import Dict
from typing import Optional

import pytz
from django.http import HttpRequest
Expand All @@ -18,11 +22,11 @@
from drf_user.models import OTPValidation
from drf_user.models import User

user_settings = update_user_settings()
otp_settings = user_settings["OTP"]
user_settings: Dict[str, bool | Dict] = update_user_settings()
otp_settings: Dict[str, str | int] = user_settings["OTP"]


def get_client_ip(request: HttpRequest):
def get_client_ip(request: HttpRequest) -> Optional[str]:
"""
Fetches the IP address of a client from Request and
return in proper format.
Expand All @@ -34,7 +38,7 @@ def get_client_ip(request: HttpRequest):
Returns
-------
ip: str
ip: str | None
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
Expand All @@ -43,7 +47,7 @@ def get_client_ip(request: HttpRequest):
return request.META.get("REMOTE_ADDR")


def datetime_passed_now(source):
def datetime_passed_now(source: datetime.datetime) -> bool:
"""
Compares provided datetime with current time on the basis of Django
settings. Checks source is in future or in past. False if it's in future.
Expand All @@ -63,7 +67,7 @@ def datetime_passed_now(source):
return source <= datetime.datetime.now()


def check_unique(prop, value):
def check_unique(prop: str, value: str) -> bool:
"""
This function checks if the value provided is present in Database
or can be created in DBMS as unique data.
Expand Down Expand Up @@ -92,7 +96,7 @@ def check_unique(prop, value):
return user.count() == 0


def generate_otp(prop, value):
def generate_otp(prop: str, value: str) -> OTPValidation:
"""
This function generates an OTP and saves it into Model. It also
sets various counters, such as send_counter,
Expand Down Expand Up @@ -120,7 +124,7 @@ def generate_otp(prop, value):
5039164
"""
# Create a random number
random_number = User.objects.make_random_password(
random_number: str = User.objects.make_random_password(
length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"]
)

Expand All @@ -129,16 +133,16 @@ def generate_otp(prop, value):
while OTPValidation.objects.filter(otp__exact=random_number).filter(
is_validated=False
):
random_number = User.objects.make_random_password(
random_number: str = User.objects.make_random_password(
length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"]
)

# Get or Create new instance of Model with value of provided value
# and set proper counter.
try:
otp_object = OTPValidation.objects.get(destination=value)
otp_object: OTPValidation = OTPValidation.objects.get(destination=value)
except OTPValidation.DoesNotExist:
otp_object = OTPValidation()
otp_object: OTPValidation = OTPValidation()
otp_object.destination = value
else:
if not datetime_passed_now(otp_object.reactive_at):
Expand All @@ -159,7 +163,7 @@ def generate_otp(prop, value):
return otp_object


def send_otp(value, otpobj, recip):
def send_otp(value: str, otpobj: OTPValidation, recip: str) -> Dict:
"""
This function sends OTP to specified value.
Parameters
Expand All @@ -176,27 +180,22 @@ def send_otp(value, otpobj, recip):
-------
"""
otp = otpobj.otp
otp: str = otpobj.otp

if not datetime_passed_now(otpobj.reactive_at):
raise PermissionDenied(
detail=_("OTP sending not allowed until: " + str(otpobj.reactive_at))
detail=_(f"OTP sending not allowed until: {otpobj.reactive_at}")
)

message = (
"OTP for verifying "
+ otpobj.get_prop_display()
+ ": "
+ value
+ " is "
+ otp
+ ". Don't share this with anyone!"
f"OTP for verifying {otpobj.get_prop_display()}: {value} is {otp}."
f" Don't share this with anyone!"
)

try:
rdata = send_message(message, otp_settings["SUBJECT"], [value], [recip])
rdata: dict = send_message(message, otp_settings["SUBJECT"], [value], [recip])
except ValueError as err:
raise APIException(_("Server configuration error occured: %s") % str(err))
raise APIException(_(f"Server configuration error occurred: {err}"))

otpobj.reactive_at = timezone.now() + datetime.timedelta(
minutes=otp_settings["COOLING_PERIOD"]
Expand All @@ -206,7 +205,7 @@ def send_otp(value, otpobj, recip):
return rdata


def login_user(user: User, request: HttpRequest) -> dict:
def login_user(user: User, request: HttpRequest) -> Dict[str, str]:
"""
This function is used to login a user. It saves the authentication in
AuthTransaction model.
Expand All @@ -221,7 +220,7 @@ def login_user(user: User, request: HttpRequest) -> dict:
dict:
Generated JWT tokens for user.
"""
token = RefreshToken.for_user(user)
token: RefreshToken = RefreshToken.for_user(user)

# Add custom claims
if hasattr(user, "email"):
Expand Down Expand Up @@ -252,7 +251,7 @@ def login_user(user: User, request: HttpRequest) -> dict:
}


def check_validation(value):
def check_validation(value: str) -> bool:
"""
This functions check if given value is already validated via OTP or not.
Parameters
Expand All @@ -272,13 +271,13 @@ def check_validation(value):
"""
try:
otp_object = OTPValidation.objects.get(destination=value)
otp_object: OTPValidation = OTPValidation.objects.get(destination=value)
return otp_object.is_validated
except OTPValidation.DoesNotExist:
return False


def validate_otp(value, otp):
def validate_otp(value: str, otp: int) -> bool:
"""
This function is used to validate the OTP for a particular value.
It also reduces the attempt count by 1 and resets OTP.
Expand All @@ -295,36 +294,37 @@ def validate_otp(value, otp):
"""
try:
# Try to get OTP Object from Model and initialize data dictionary
otp_object = OTPValidation.objects.get(destination=value, is_validated=False)

# Decrement validate_attempt
otp_object.validate_attempt -= 1

if str(otp_object.otp) == str(otp):
otp_object.is_validated = True
otp_object.save()
return True

elif otp_object.validate_attempt <= 0:
generate_otp(otp_object.prop, value)
raise AuthenticationFailed(
detail=_("Incorrect OTP. Attempt exceeded! OTP has been " "reset.")
)

else:
otp_object.save()
raise AuthenticationFailed(
detail=_(
"OTP Validation failed! "
+ str(otp_object.validate_attempt)
+ " attempts left!"
)
)

otp_object: OTPValidation = OTPValidation.objects.get(
destination=value, is_validated=False
)
except OTPValidation.DoesNotExist:
raise NotFound(
detail=_(
"No pending OTP validation request found for provided "
"destination. Kindly send an OTP first"
)
)
# Decrement validate_attempt
otp_object.validate_attempt -= 1

if str(otp_object.otp) == str(otp):
# match otp
otp_object.is_validated = True
otp_object.save()
return True

elif otp_object.validate_attempt <= 0:
# check if attempts exceeded and regenerate otp and raise error
generate_otp(otp_object.prop, value)
raise AuthenticationFailed(
detail=_("Incorrect OTP. Attempt exceeded! OTP has been reset.")
)

else:
# update attempts and raise error
otp_object.save()
raise AuthenticationFailed(
detail=_(
f"OTP Validation failed! {otp_object.validate_attempt} attempts left!"
)
)
44 changes: 44 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime

import pytest
from django.http import HttpRequest
from django.test import TestCase
from django.utils import timezone
from model_bakery import baker
Expand All @@ -10,6 +11,7 @@
from drf_user import utils as utils
from drf_user.models import OTPValidation
from drf_user.models import User
from drf_user.utils import get_client_ip


class TestCheckUnique(TestCase):
Expand Down Expand Up @@ -145,3 +147,45 @@ def test_validate_otp_raises_invalid_otp_exception(self):
"OTP Validation failed! 2 attempts left!",
str(context_manager.exception.detail),
)


class TestGetClientIP(TestCase):
"""get_client_ip test"""

def test_meta_none(self):
"""Check get_client_ip returns None when META is empty"""
request = HttpRequest()
request.META = {}
ip = get_client_ip(request)
self.assertIsNone(ip)

def test_meta_single_with_http_x_forwarded_for(self):
"""Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR"""
request = HttpRequest()
request.META = {
"HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158",
}
result = get_client_ip(request)
self.assertEqual(result, "177.139.233.139")

def test_meta_single_with_remote_addr(self):
"""Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR"""
request = HttpRequest()
request.META = {
"REMOTE_ADDR": "198.84.193.158",
}
result = get_client_ip(request)
self.assertEqual(result, "198.84.193.158")

def test_meta_multi(self):
"""
Check get_client_ip returns ip from HTTP_X_FORWARDED_FOR when
HTTP_X_FORWARDED_FOR and REMOTE_ADDR is present
"""
request = HttpRequest()
request.META = {
"HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158",
"REMOTE_ADDR": "177.139.233.133",
}
result = get_client_ip(request)
self.assertEqual(result, "177.139.233.139")
4 changes: 2 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def test_raise_api_exception_when_email_invalid(self):

self.assertEqual(500, response.status_code)
self.assertEqual(
"Server configuration error occured: Invalid recipient.",
"Server configuration error occurred: Invalid recipient.",
response.json()["detail"],
)

Expand Down Expand Up @@ -584,7 +584,7 @@ def test_login_with_incorrect_email_mobile(self):
# when drf_addons is updated
self.assertEqual(500, response.status_code)
self.assertEqual(
"Server configuration error occured: Invalid recipient.",
"Server configuration error occurred: Invalid recipient.",
response.json()["detail"],
)

Expand Down

0 comments on commit 49d51cd

Please sign in to comment.