Skip to content

Commit

Permalink
Display users list based on user access limitation (#1742)
Browse files Browse the repository at this point in the history
* Refactor get_queryset method in UserViewSet

* Used get_accessible_facilities instead of subquery
to fetch the facilities linked to the user

* fixed failing tests

* fix queryset

---------

Co-authored-by: Aakash Singh <mail@singhaakash.dev>
Co-authored-by: khavinshankar <khavinshankar@gmail.com>
Co-authored-by: Vignesh Hari <vichuhari100@gmail.com>
  • Loading branch information
4 people committed May 15, 2024
1 parent ae7ef82 commit 23abce0
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 18 deletions.
11 changes: 4 additions & 7 deletions care/facility/tests/test_unlink_district_admins.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ def test_unlink_home_facility_admin_different_district(self):
response = self.client.delete(
"/api/v1/users/" + username + "/clear_home_facility/"
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(
response.json()["facility"],
"Cannot unlink User's Home Facility from other district",
)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.json()["detail"], "Not found.")

def test_unlink_faciltity_admin_same_district(self):
self.client.force_login(self.admin1)
Expand All @@ -80,5 +77,5 @@ def test_unlink_faciltity_admin_different_district(self):
"/api/v1/users/" + username + "/delete_facility/",
{"facility": self.facility2.external_id},
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json()["facility"], "Facility Access not Present")
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.json()["detail"], "Not found.")
33 changes: 32 additions & 1 deletion care/users/api/viewsets/users.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from django.core.cache import cache
from django.db.models import F
from django.db.models import F, Q, Subquery
from django_filters import rest_framework as filters
from drf_spectacular.utils import extend_schema
from dry_rest_permissions.generics import DRYPermissions
Expand All @@ -21,6 +21,7 @@
UserSerializer,
)
from care.users.models import User
from care.utils.cache.cache_allowed_facilities import get_accessible_facilities


def remove_facility_user_cache(user_id):
Expand Down Expand Up @@ -119,6 +120,36 @@ class UserViewSet(
# DRYPermissions(),
# ]

def get_queryset(self):
if self.request.user.is_superuser:
return self.queryset
query = Q(id=self.request.user.id)
if self.request.user.user_type >= User.TYPE_VALUE_MAP["StateReadOnlyAdmin"]:
query |= Q(
state=self.request.user.state,
user_type__lt=User.TYPE_VALUE_MAP["StateAdmin"],
is_superuser=False,
)
elif (
self.request.user.user_type >= User.TYPE_VALUE_MAP["DistrictReadOnlyAdmin"]
):
query |= Q(
district=self.request.user.district,
user_type__lt=User.TYPE_VALUE_MAP["DistrictAdmin"],
is_superuser=False,
)
else:
query |= Q(
id__in=Subquery(
FacilityUser.objects.filter(
facility_id__in=get_accessible_facilities(self.request.user)
).values("user_id")
),
user_type__lt=User.TYPE_VALUE_MAP["DistrictAdmin"],
is_superuser=False,
)
return self.queryset.filter(query)

def get_serializer_class(self):
if self.action == "list":
return UserListSerializer
Expand Down
24 changes: 14 additions & 10 deletions care/users/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,33 @@ def setUpTestData(cls) -> None:
cls.local_body = cls.create_local_body(cls.district)
cls.super_user = cls.create_super_user("su", cls.district)
cls.facility = cls.create_facility(cls.super_user, cls.district, cls.local_body)

cls.user = cls.create_user("staff1", cls.district, home_facility=cls.facility)

cls.data_2 = cls.get_user_data(cls.district)
cls.data_2.update({"username": "user_2", "password": "password"})
cls.user_2 = cls.create_user(**cls.data_2)

cls.data_3 = cls.get_user_data(cls.district)
cls.data_3.update({"username": "user_3", "password": "password"})
cls.user_3 = cls.create_user(**cls.data_3)
cls.link_user_with_facility(cls.user_3, cls.facility, cls.super_user)

def test_user_can_access_url(self):
"""Test user can access the url by location"""
username = self.user.username
response = self.client.get(f"/api/v1/users/{username}/")
self.assertEqual(response.status_code, status.HTTP_200_OK)

def test_user_can_read_all(self):
"""Test user can read all"""
def test_user_can_read_all_users_within_accessible_facility(self):
"""Test user can read all users within the accessible facility"""
response = self.client.get("/api/v1/users/")
# test response code
self.assertEqual(response.status_code, status.HTTP_200_OK)
res_data_json = response.json()
# test total user count
self.assertEqual(res_data_json["count"], 2)
results = res_data_json["results"]
# test presence of usernames
self.assertIn(self.user.id, {r["id"] for r in results})
self.assertIn(self.user_2.id, {r["id"] for r in results})
self.assertIn(self.user_3.id, {r["id"] for r in results})

def test_user_can_modify_themselves(self):
"""Test user can modify the attributes for themselves"""
Expand All @@ -170,7 +174,8 @@ def test_user_cannot_read_others(self):
"""Test 1 user can read the attributes of the other user"""
username = self.data_2["username"]
response = self.client.get(f"/api/v1/users/{username}/")
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.json()["detail"], "Not found.")

def test_user_cannot_modify_others(self):
"""Test a user can't modify others"""
Expand All @@ -183,15 +188,14 @@ def test_user_cannot_modify_others(self):
"password": password,
},
)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.json()["detail"], "Not found.")

def test_user_cannot_delete_others(self):
"""Test a user can't delete others"""
field = "username"
response = self.client.delete(f"/api/v1/users/{self.data_2[field]}/")
# test response code
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
# test backend response(user_2 still exists)
self.assertEqual(
self.data_2[field],
User.objects.get(username=self.data_2[field]).username,
Expand Down

0 comments on commit 23abce0

Please sign in to comment.