Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dependency for get_client_ip #127

Merged
merged 2 commits into from
Dec 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 74 additions & 53 deletions drf_user/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Collection of general helper functions."""
import datetime
from typing import Dict
from typing import Optional
from typing import Union

import pytz
from django.http import HttpRequest
from django.utils import timezone
from django.utils.text import gettext_lazy as _
from drfaddons.utils import get_client_ip
from drfaddons.utils import send_message
from rest_framework.exceptions import APIException
from rest_framework.exceptions import AuthenticationFailed
Expand All @@ -19,11 +21,34 @@
from drf_user.models import OTPValidation
from drf_user.models import User

user_settings = update_user_settings()
otp_settings = user_settings["OTP"]
user_settings: Dict[
str, Union[bool, Dict[str, Union[int, str, bool]]]
] = update_user_settings()
otp_settings: Dict[str, Union[str, int]] = user_settings["OTP"]


def datetime_passed_now(source):
def get_client_ip(request: HttpRequest) -> Optional[str]:
"""
Fetches the IP address of a client from Request and
return in proper format.
Source: https://stackoverflow.com/a/4581997

Parameters
----------
request: django.http.HttpRequest

Returns
-------
ip: str or None
"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
return x_forwarded_for.split(",")[0]
else:
return request.META.get("REMOTE_ADDR")


def datetime_passed_now(source: datetime.datetime) -> bool:
"""
Compares provided datetime with current time on the basis of Django
settings. Checks source is in future or in past. False if it's in future.
Expand All @@ -43,7 +68,7 @@ def datetime_passed_now(source):
return source <= datetime.datetime.now()


def check_unique(prop, value):
def check_unique(prop: str, value: str) -> bool:
"""
This function checks if the value provided is present in Database
or can be created in DBMS as unique data.
Expand Down Expand Up @@ -72,7 +97,7 @@ def check_unique(prop, value):
return user.count() == 0


def generate_otp(prop, value):
def generate_otp(prop: str, value: str) -> OTPValidation:
"""
This function generates an OTP and saves it into Model. It also
sets various counters, such as send_counter,
Expand Down Expand Up @@ -100,7 +125,7 @@ def generate_otp(prop, value):
5039164
"""
# Create a random number
random_number = User.objects.make_random_password(
random_number: str = User.objects.make_random_password(
length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"]
)

Expand All @@ -109,16 +134,16 @@ def generate_otp(prop, value):
while OTPValidation.objects.filter(otp__exact=random_number).filter(
is_validated=False
):
random_number = User.objects.make_random_password(
random_number: str = User.objects.make_random_password(
length=otp_settings["LENGTH"], allowed_chars=otp_settings["ALLOWED_CHARS"]
)

# Get or Create new instance of Model with value of provided value
# and set proper counter.
try:
otp_object = OTPValidation.objects.get(destination=value)
otp_object: OTPValidation = OTPValidation.objects.get(destination=value)
except OTPValidation.DoesNotExist:
otp_object = OTPValidation()
otp_object: OTPValidation = OTPValidation()
otp_object.destination = value
else:
if not datetime_passed_now(otp_object.reactive_at):
Expand All @@ -139,7 +164,7 @@ def generate_otp(prop, value):
return otp_object


def send_otp(value, otpobj, recip):
def send_otp(value: str, otpobj: OTPValidation, recip: str) -> Dict:
"""
This function sends OTP to specified value.
Parameters
Expand All @@ -156,27 +181,22 @@ def send_otp(value, otpobj, recip):
-------

"""
otp = otpobj.otp
otp: str = otpobj.otp

if not datetime_passed_now(otpobj.reactive_at):
raise PermissionDenied(
detail=_("OTP sending not allowed until: " + str(otpobj.reactive_at))
detail=_(f"OTP sending not allowed until: {otpobj.reactive_at}")
)

message = (
"OTP for verifying "
+ otpobj.get_prop_display()
+ ": "
+ value
+ " is "
+ otp
+ ". Don't share this with anyone!"
f"OTP for verifying {otpobj.get_prop_display()}: {value} is {otp}."
f" Don't share this with anyone!"
)

try:
rdata = send_message(message, otp_settings["SUBJECT"], [value], [recip])
rdata: dict = send_message(message, otp_settings["SUBJECT"], [value], [recip])
except ValueError as err:
raise APIException(_("Server configuration error occured: %s") % str(err))
raise APIException(_(f"Server configuration error occurred: {err}"))

otpobj.reactive_at = timezone.now() + datetime.timedelta(
minutes=otp_settings["COOLING_PERIOD"]
Expand All @@ -186,7 +206,7 @@ def send_otp(value, otpobj, recip):
return rdata


def login_user(user: User, request: HttpRequest) -> dict:
def login_user(user: User, request: HttpRequest) -> Dict[str, str]:
"""
This function is used to login a user. It saves the authentication in
AuthTransaction model.
Expand All @@ -201,7 +221,7 @@ def login_user(user: User, request: HttpRequest) -> dict:
dict:
Generated JWT tokens for user.
"""
token = RefreshToken.for_user(user)
token: RefreshToken = RefreshToken.for_user(user)

# Add custom claims
if hasattr(user, "email"):
Expand Down Expand Up @@ -232,7 +252,7 @@ def login_user(user: User, request: HttpRequest) -> dict:
}


def check_validation(value):
def check_validation(value: str) -> bool:
"""
This functions check if given value is already validated via OTP or not.
Parameters
Expand All @@ -252,13 +272,13 @@ def check_validation(value):

"""
try:
otp_object = OTPValidation.objects.get(destination=value)
otp_object: OTPValidation = OTPValidation.objects.get(destination=value)
return otp_object.is_validated
except OTPValidation.DoesNotExist:
return False


def validate_otp(value, otp):
def validate_otp(value: str, otp: int) -> bool:
"""
This function is used to validate the OTP for a particular value.
It also reduces the attempt count by 1 and resets OTP.
Expand All @@ -275,36 +295,37 @@ def validate_otp(value, otp):
"""
try:
# Try to get OTP Object from Model and initialize data dictionary
otp_object = OTPValidation.objects.get(destination=value, is_validated=False)

# Decrement validate_attempt
otp_object.validate_attempt -= 1

if str(otp_object.otp) == str(otp):
otp_object.is_validated = True
otp_object.save()
return True

elif otp_object.validate_attempt <= 0:
generate_otp(otp_object.prop, value)
raise AuthenticationFailed(
detail=_("Incorrect OTP. Attempt exceeded! OTP has been " "reset.")
)

else:
otp_object.save()
raise AuthenticationFailed(
detail=_(
"OTP Validation failed! "
+ str(otp_object.validate_attempt)
+ " attempts left!"
)
)

otp_object: OTPValidation = OTPValidation.objects.get(
destination=value, is_validated=False
)
except OTPValidation.DoesNotExist:
raise NotFound(
detail=_(
"No pending OTP validation request found for provided "
"destination. Kindly send an OTP first"
)
)
# Decrement validate_attempt
otp_object.validate_attempt -= 1

if str(otp_object.otp) == str(otp):
# match otp
otp_object.is_validated = True
otp_object.save()
return True

elif otp_object.validate_attempt <= 0:
# check if attempts exceeded and regenerate otp and raise error
generate_otp(otp_object.prop, value)
raise AuthenticationFailed(
detail=_("Incorrect OTP. Attempt exceeded! OTP has been reset.")
)

else:
# update attempts and raise error
otp_object.save()
raise AuthenticationFailed(
detail=_(
f"OTP Validation failed! {otp_object.validate_attempt} attempts left!"
)
)
2 changes: 1 addition & 1 deletion drf_user/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from django.conf import settings
from django.utils import timezone
from django.utils.text import gettext_lazy as _
from drfaddons.utils import get_client_ip
from drfaddons.utils import JsonResponse
from rest_framework import status
from rest_framework.exceptions import APIException
Expand Down Expand Up @@ -30,6 +29,7 @@
from drf_user.serializers import UserSerializer
from drf_user.utils import check_unique
from drf_user.utils import generate_otp
from drf_user.utils import get_client_ip
from drf_user.utils import login_user
from drf_user.utils import send_otp
from drf_user.utils import validate_otp
Expand Down
44 changes: 44 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime

import pytest
from django.http import HttpRequest
from django.test import TestCase
from django.utils import timezone
from model_bakery import baker
Expand All @@ -10,6 +11,7 @@
from drf_user import utils as utils
from drf_user.models import OTPValidation
from drf_user.models import User
from drf_user.utils import get_client_ip


class TestCheckUnique(TestCase):
Expand Down Expand Up @@ -145,3 +147,45 @@ def test_validate_otp_raises_invalid_otp_exception(self):
"OTP Validation failed! 2 attempts left!",
str(context_manager.exception.detail),
)


class TestGetClientIP(TestCase):
"""get_client_ip test"""

def test_meta_none(self):
"""Check get_client_ip returns None when META is empty"""
request = HttpRequest()
request.META = {}
ip = get_client_ip(request)
self.assertIsNone(ip)

def test_meta_single_with_http_x_forwarded_for(self):
"""Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR"""
request = HttpRequest()
request.META = {
"HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158",
}
result = get_client_ip(request)
self.assertEqual(result, "177.139.233.139")

def test_meta_single_with_remote_addr(self):
"""Check get_client_ip returns first item from HTTP_X_FORWARDED_FOR"""
request = HttpRequest()
request.META = {
"REMOTE_ADDR": "198.84.193.158",
}
result = get_client_ip(request)
self.assertEqual(result, "198.84.193.158")

def test_meta_multi(self):
"""
Check get_client_ip returns ip from HTTP_X_FORWARDED_FOR when
HTTP_X_FORWARDED_FOR and REMOTE_ADDR is present
"""
request = HttpRequest()
request.META = {
"HTTP_X_FORWARDED_FOR": "177.139.233.139, 198.84.193.157, 198.84.193.158",
"REMOTE_ADDR": "177.139.233.133",
}
result = get_client_ip(request)
self.assertEqual(result, "177.139.233.139")
4 changes: 2 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def test_raise_api_exception_when_email_invalid(self):

self.assertEqual(500, response.status_code)
self.assertEqual(
"Server configuration error occured: Invalid recipient.",
"Server configuration error occurred: Invalid recipient.",
response.json()["detail"],
)

Expand Down Expand Up @@ -584,7 +584,7 @@ def test_login_with_incorrect_email_mobile(self):
# when drf_addons is updated
self.assertEqual(500, response.status_code)
self.assertEqual(
"Server configuration error occured: Invalid recipient.",
"Server configuration error occurred: Invalid recipient.",
response.json()["detail"],
)

Expand Down