Skip to content

Commit

Permalink
fix: handle case where multiple perms exist with the same name on dif…
Browse files Browse the repository at this point in the history
…ferent models
  • Loading branch information
JCourt1 committed May 25, 2022
1 parent 020fb1d commit cef55f6
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 26 deletions.
12 changes: 12 additions & 0 deletions guardian/ctypes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import QuerySet
from django.utils.module_loading import import_string

from guardian.conf import settings as guardian_settings
Expand All @@ -10,5 +11,16 @@ def get_content_type(obj):
return get_content_type_function(obj)


def get_content_type_from_iterable_or_object(iterable_or_object):
if isinstance(iterable_or_object, list):
ctype = get_content_type(iterable_or_object[0])
elif isinstance(iterable_or_object, QuerySet):
ctype = get_content_type(iterable_or_object.model)
else:
ctype = get_content_type(iterable_or_object)

return ctype


def get_default_content_type(obj):
return ContentType.objects.get_for_model(obj)
18 changes: 4 additions & 14 deletions guardian/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.db.models.functions import Cast
from django.db.models.query import QuerySet
from guardian.core import ObjectPermissionChecker
from guardian.ctypes import get_content_type
from guardian.ctypes import get_content_type, get_content_type_from_iterable_or_object
from guardian.exceptions import ObjectNotPersisted
from django.contrib.auth.models import Permission

Expand Down Expand Up @@ -74,16 +74,6 @@ def _generate_create_kwargs(self, permission, ctype, obj=None, user_or_group=Non

return kwargs

def _get_content_type(self, iterable_or_object):
if isinstance(iterable_or_object, list):
ctype = get_content_type(iterable_or_object[0])
elif isinstance(iterable_or_object, QuerySet):
ctype = get_content_type(iterable_or_object.model)
else:
ctype = get_content_type(iterable_or_object)

return ctype

def _get_obj_list(self, queryset):
if isinstance(queryset, list):
objs = queryset
Expand All @@ -107,7 +97,7 @@ def assign_perm(self, perm, user_or_group, obj):
if getattr(obj, 'pk', None) is None:
raise ObjectNotPersisted("Object %s needs to be persisted first"
% obj)
ctype = self._get_content_type(obj)
ctype = get_content_type_from_iterable_or_object(obj)
permission = self._retrieve_perms(ctype, [perm])[0]
kwargs = self._generate_create_kwargs(permission, ctype, obj=obj, user_or_group=user_or_group)
obj_perm, _ = self.get_or_create(**kwargs)
Expand Down Expand Up @@ -138,7 +128,7 @@ def assign_perms_to_many_for_many(self, perms, users_or_groups, queryset, commit
"""
Bulk assigns given ``perms`` for all objects ``obj`` to a set of users or a set of groups.
"""
ctype = self._get_content_type(queryset)
ctype = get_content_type_from_iterable_or_object(queryset)
permissions = self._retrieve_perms(ctype, perms)
objects = self._get_obj_list(queryset)

Expand Down Expand Up @@ -253,7 +243,7 @@ def bulk_remove_perms(self, perms, user_or_group_or_iterable, iterable_or_object
is that ``post_delete`` signals would NOT be fired.
"""

ctype = self._get_content_type(iterable_or_object)
ctype = get_content_type_from_iterable_or_object(iterable_or_object)
if isinstance(user_or_group_or_iterable, (list, QuerySet)):
users_or_groups = user_or_group_or_iterable
else:
Expand Down
23 changes: 12 additions & 11 deletions guardian/shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
UUIDField,
)
from guardian.core import ObjectPermissionChecker
from guardian.ctypes import get_content_type
from guardian.ctypes import get_content_type, get_content_type_from_iterable_or_object
from guardian.exceptions import MixedContentTypeError, WrongAppError, MultipleIdentityAndObjectError
from guardian.utils import get_anonymous_user, get_group_obj_perms_model, get_identity, get_user_obj_perms_model
GroupObjectPermission = get_group_obj_perms_model()
Expand Down Expand Up @@ -135,12 +135,13 @@ def assign_perm(perm, user_or_group, obj=None):
return model.objects.assign_perm(perm, group, obj)


def _get_perms_objs(perms, are_global=False):
def _get_perms_objs(perms, obj=None):
filters_to_or = []
permissions = []
ctype = get_content_type_from_iterable_or_object(obj) if obj else None
for perm in perms:
if not isinstance(perm, Permission):
if are_global:
if not ctype:
try:
app_label, codename = perm.split(".", 1)
except ValueError:
Expand All @@ -152,11 +153,11 @@ def _get_perms_objs(perms, are_global=False):
Q(content_type__app_label=app_label) & Q(codename=codename)
)
else:
codename = perm
if "." in perm:
app_label, codename = perm.split(".", 1)
filter = Q(content_type__app_label=app_label) & Q(codename=codename)
else:
filter = Q(codename=perm)
app_label, codename = codename.split(".", 1)

filter = Q(content_type=ctype) & Q(codename=codename)
filters_to_or.append(filter)
else:
permissions.append(perm)
Expand Down Expand Up @@ -188,7 +189,7 @@ def bulk_assign_perms(perms, user_or_group, obj=None, commit=True):
"committing them, since the reference to the user or group is not part of the permissions returned."
)

perms = _get_perms_objs(perms, are_global=True)
perms = _get_perms_objs(perms, obj=None)

if user:
if isinstance(user, (QuerySet, list)):
Expand All @@ -205,7 +206,7 @@ def bulk_assign_perms(perms, user_or_group, obj=None, commit=True):
group.permissions.add(*perms)
return perms

perms = _get_perms_objs(perms)
perms = _get_perms_objs(perms, obj=obj)

if isinstance(obj, (QuerySet, list)):
if user:
Expand Down Expand Up @@ -338,7 +339,7 @@ def bulk_remove_perms(perms, user_or_group_or_iterable=None, obj=None, commit=Tr
"committing them, since the reference to the user or group is not part of the permissions returned."
)

perms = _get_perms_objs(perms, are_global=True)
perms = _get_perms_objs(perms, obj=None)

if user:
if isinstance(user, (QuerySet, list)):
Expand All @@ -355,7 +356,7 @@ def bulk_remove_perms(perms, user_or_group_or_iterable=None, obj=None, commit=Tr
group.permissions.remove(*perms)
return perms

perms = _get_perms_objs(perms)
perms = _get_perms_objs(perms, obj=obj)

if isinstance(obj, (QuerySet, list)):
if user:
Expand Down
84 changes: 83 additions & 1 deletion guardian/testapp/tests/test_shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from guardian.exceptions import NotUserNorGroup
from guardian.exceptions import WrongAppError
from guardian.exceptions import MultipleIdentityAndObjectError
from guardian.testapp.models import CharPKModel, ChildTestModel, UUIDPKModel
from guardian.testapp.models import CharPKModel, ChildTestModel, UUIDPKModel, Project, Post
from guardian.testapp.tests.test_core import ObjectPermissionTestCase
from guardian.models import Group, Permission
from guardian.utils import get_identity
Expand Down Expand Up @@ -681,6 +681,38 @@ def test_assign_multiple_perms_from_multiple_groups_for_queryset(self):
unaffected_extra_objs=[],
)

def test_add_when_multiple_models_with_same_perm(self):
"""
The reason this is tested is because there was a bug in this case.
The scenario is that we have a permission with a certain name (here "add_post") that exists on two models in
the same app.
"""
assert ContentType.objects.get(model="post")
project_ctype = ContentType.objects.get(model="project")
Permission.objects.create(
codename='add_post',
name='Can add post',
content_type=project_ctype
)
post = Post.objects.create(title='Post1')
project = Project.objects.create(name='Project1')
auth_entities = [self.group, self.group_2]
perms_to_add = ["add_post"]

self._test_bulk_add_object_perms(
perms_to_add=perms_to_add,
affected_auth_entities=auth_entities,
obj=[project],
unaffected_extra_objs=[],
)

self._check_auth_entities_perms_on_objs(
auth_entities,
[post],
perms_to_add,
have_perms=False
)


class BulkRemovePermTest(BulkPermTestBase):
"""
Expand Down Expand Up @@ -1074,6 +1106,56 @@ def test_remove_multiple_perms_from_multiple_groups_for_queryset(self):
unaffected_extra_objs=[],
)

def test_remove_when_multiple_models_with_same_perm(self):
"""
The reason this is tested is because there was a bug in this case.
The scenario is that we have a permission with a certain name (here "add_post") that exists on two models in
the same app.
"""
assert ContentType.objects.get(model="post")
project_ctype = ContentType.objects.get(model="project")
Permission.objects.create(
codename='add_post',
name='Can add post',
content_type=project_ctype
)
post = Post.objects.create(title='Post1')
project = Project.objects.create(name='Project1')
auth_entities = [self.group, self.group_2]
perms_iterable = ["add_post"]

# check that perms on post have been removed, but not on project
bulk_assign_perms(perms_iterable, auth_entities, [post])
bulk_assign_perms(perms_iterable, auth_entities, [project])
self._check_auth_entities_perms_on_objs(
auth_entities,
[post],
perms_iterable,
have_perms=True
)

bulk_remove_perms(
perms_iterable,
auth_entities,
[post]
)

# check that perms on post have been removed, but not on project

self._check_auth_entities_perms_on_objs(
auth_entities,
[post],
perms_iterable,
have_perms=False
)

self._check_auth_entities_perms_on_objs(
auth_entities,
[project],
perms_iterable,
have_perms=True
)


class RemovePermTest(ObjectPermissionTestCase):
"""
Expand Down

0 comments on commit cef55f6

Please sign in to comment.