Skip to content

Commit

Permalink
[Backport 3.3.x][Fixes #8937] User with perms can edit approved and p…
Browse files Browse the repository at this point in the history
…ublished resources when Advanced workflow is enabled (#8978)

* - [Fixes #8937] User with perms can edit approved and published resources when Advanced workflow is enabled

* - Add advanced workflow permissions test

* - check if owner is group manager

* - modify tests

* - get resource group on getting obj_group_managers

* - remove unused class and tests

* - update assign_owner permissions

* - Align test cases with master

* - Align test cases with master

* - Align test cases with master

Co-authored-by: afabiani <alessio.fabiani@gmail.com>
Co-authored-by: afabiani <alessio.fabiani@geo-solutions.it>
  • Loading branch information
3 people committed Mar 29, 2022
1 parent 7804448 commit af6f7d9
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 225 deletions.
6 changes: 3 additions & 3 deletions geonode/base/populate_test_data.py
Expand Up @@ -314,7 +314,7 @@ def dump_models(path=None):
f.write(result)


def create_single_layer(name, keywords=None, owner=None, group=None):
def create_single_layer(name, keywords=None, owner=None, group=None, **kwargs):
admin, created = get_user_model().objects.get_or_create(username='admin')
if created:
admin.is_superuser = True
Expand Down Expand Up @@ -342,9 +342,9 @@ def create_single_layer(name, keywords=None, owner=None, group=None):
storeType="dataStore",
resource_type="layer",
typename=f"geonode:{title}",
group=group
group=group,
**kwargs
)

layer.save()

if isinstance(keywords, list):
Expand Down
73 changes: 2 additions & 71 deletions geonode/base/tests.py
Expand Up @@ -26,9 +26,9 @@
from unittest.mock import patch, Mock
from imagekit.cachefiles.backends import Simple

from guardian.shortcuts import assign_perm, get_perms
from guardian.shortcuts import assign_perm

from geonode.base.utils import OwnerRightsRequestViewUtils, ManageResourceOwnerPermissions
from geonode.base.utils import OwnerRightsRequestViewUtils
from geonode.base.templatetags.base_tags import display_change_perms_button
from geonode.documents.models import Document
from geonode.layers.models import Layer
Expand Down Expand Up @@ -664,75 +664,6 @@ def test_maintenance_true(self):
self.assertEqual(response.status_code, 503, 'User is allowed to get index page')


class TestOwnerPermissionManagement(TestCase):
"""
Only Layers has custom permissions so this is the only model which is tested.
Models are always treat in the same way
"""

def setUp(self):
User = get_user_model()
self.user = User.objects.create(username='test', email='test@test.com')
self.la = Layer.objects.create(owner=self.user, title='test', is_approved=True)

@override_settings(ADMIN_MODERATE_UPLOADS=True)
def test_owner_has_no_permissions(self):
l_manager = ManageResourceOwnerPermissions(self.la)
l_manager.set_owner_permissions_according_to_workflow()

self.assertEqual(self._retrieve_resource_perms_definition(self.la, ['read', 'download']).sort(),
get_perms(self.user, self.la.get_self_resource()).sort()
)

@override_settings(ADMIN_MODERATE_UPLOADS=False)
def test_user_has_own_permissions(self):
l_manager = ManageResourceOwnerPermissions(self.la)
l_manager.set_owner_permissions_according_to_workflow()

self.assertEqual(self._retrieve_resource_perms_definition(self.la).sort(),
get_perms(self.user, self.la.get_self_resource()).sort()
)

@override_settings(ADMIN_MODERATE_UPLOADS=True)
def test_user_has_permissions_restored(self):
self.la.is_approved = False
self.la.save()
l_manager = ManageResourceOwnerPermissions(self.la)
l_manager.set_owner_permissions_according_to_workflow()

self.assertEqual(self._retrieve_resource_perms_definition(self.la).sort(),
get_perms(self.user, self.la.get_self_resource()).sort()
)

@override_settings(ADMIN_MODERATE_UPLOADS=True)
def test_remove_and_add_perms(self):
l_manager = ManageResourceOwnerPermissions(self.la)
l_manager.set_owner_permissions_according_to_workflow()

self.assertEqual(self._retrieve_resource_perms_definition(self.la, ['read', 'download']).sort(),
get_perms(self.user, self.la.get_self_resource()).sort()
)

self.la.is_approved = False
self.la.save()

l_manager.set_owner_permissions_according_to_workflow()

self.assertEqual(self._retrieve_resource_perms_definition(self.la).sort(),
get_perms(self.user, self.la.get_self_resource()).sort()
)

def _retrieve_resource_perms_definition(self, resource, perm_key_bundle=[]):
ret = []
if perm_key_bundle:
for key in perm_key_bundle:
ret.extend(resource.BASE_PERMISSIONS.get(key, []))
ret.extend(resource.PERMISSIONS.get(key, []))
else:
[ret.extend(r) for r in list(resource.BASE_PERMISSIONS.values()) + list(resource.PERMISSIONS.values())]
return ret


class TestOwnerRightsRequestUtils(TestCase):

def setUp(self):
Expand Down
44 changes: 1 addition & 43 deletions geonode/base/utils.py
Expand Up @@ -33,9 +33,8 @@
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
# Geonode functionality
from guardian.shortcuts import get_perms, remove_perm, assign_perm

# Geonode functionality
from geonode.layers.models import Layer
from geonode.base.models import ResourceBase, Link, Configuration
from geonode.thumbs.utils import (
Expand Down Expand Up @@ -170,47 +169,6 @@ def is_admin_publish_mode():
return settings.ADMIN_MODERATE_UPLOADS


class ManageResourceOwnerPermissions:
def __init__(self, resource):
self.resource = resource

def set_owner_permissions_according_to_workflow(self):
if self.resource.is_approved and OwnerRightsRequestViewUtils.is_admin_publish_mode():
self._disable_owner_write_permissions()
else:
self._restore_owner_permissions()

def _disable_owner_write_permissions(self):

for perm in get_perms(self.resource.owner, self.resource.get_self_resource()):
remove_perm(perm, self.resource.owner, self.resource.get_self_resource())

for perm in get_perms(self.resource.owner, self.resource):
remove_perm(perm, self.resource.owner, self.resource)

for perm in self.resource.BASE_PERMISSIONS.get('read') + self.resource.BASE_PERMISSIONS.get('download'):
if not settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS:
assign_perm(perm, self.resource.owner, self.resource.get_self_resource())
elif perm not in {'change_resourcebase_permissions', 'publish_resourcebase'}:
assign_perm(perm, self.resource.owner, self.resource.get_self_resource())

def _restore_owner_permissions(self):

for perm_list in self.resource.BASE_PERMISSIONS.values():
for perm in perm_list:
if not settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS:
assign_perm(perm, self.resource.owner, self.resource.get_self_resource())
elif perm not in {'change_resourcebase_permissions', 'publish_resourcebase'}:
assign_perm(perm, self.resource.owner, self.resource.get_self_resource())

for perm_list in self.resource.PERMISSIONS.values():
for perm in perm_list:
if not settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS:
assign_perm(perm, self.resource.owner, self.resource)
elif perm not in {'change_resourcebase_permissions', 'publish_resourcebase'}:
assign_perm(perm, self.resource.owner, self.resource)


def validate_extra_metadata(data, instance):
if not data:
return data
Expand Down
4 changes: 0 additions & 4 deletions geonode/documents/views.py
Expand Up @@ -35,7 +35,6 @@
from django.db.models import F
from django.forms.utils import ErrorList

from geonode.base.utils import ManageResourceOwnerPermissions
from geonode.decorators import check_keyword_write_perms
from geonode.documents.utils import get_download_response
from geonode.utils import resolve_object
Expand Down Expand Up @@ -98,9 +97,6 @@ def document_detail(request, docid):
if not document:
raise Http404(_("Not found"))

permission_manager = ManageResourceOwnerPermissions(document)
permission_manager.set_owner_permissions_according_to_workflow()

# Add metadata_author or poc if missing
document.add_missing_metadata_author_or_poc()

Expand Down
4 changes: 1 addition & 3 deletions geonode/layers/views.py
Expand Up @@ -101,7 +101,7 @@
from geonode.geoserver.helpers import (
ogc_server_settings,
set_layer_style)
from geonode.base.utils import ManageResourceOwnerPermissions

from geonode.tasks.tasks import set_permissions

from celery.utils.log import get_logger
Expand Down Expand Up @@ -363,8 +363,6 @@ def layer_detail(request, layername, template='layers/layer_detail.html'):
raise Http404(_("Not found"))
if not layer:
raise Http404(_("Not found"))
permission_manager = ManageResourceOwnerPermissions(layer)
permission_manager.set_owner_permissions_according_to_workflow()

# Add metadata_author or poc if missing
layer.add_missing_metadata_author_or_poc()
Expand Down
5 changes: 0 additions & 5 deletions geonode/maps/views.py
Expand Up @@ -75,8 +75,6 @@

from dal import autocomplete

from geonode.base.utils import ManageResourceOwnerPermissions

if check_ogc_backend(geoserver.BACKEND_PACKAGE):
# FIXME: The post service providing the map_status object
# should be moved to geonode.geoserver.
Expand Down Expand Up @@ -129,9 +127,6 @@ def map_detail(request, mapid, template='maps/map_detail.html'):
if not map_obj:
raise Http404(_("Not found"))

permission_manager = ManageResourceOwnerPermissions(map_obj)
permission_manager.set_owner_permissions_according_to_workflow()

# Add metadata_author or poc if missing
map_obj.add_missing_metadata_author_or_poc()

Expand Down
77 changes: 51 additions & 26 deletions geonode/security/models.py
Expand Up @@ -46,11 +46,10 @@
from geonode.groups.conf import settings as groups_settings

from .permissions import (
ADMIN_PERMISSIONS,
LAYER_ADMIN_PERMISSIONS,
VIEW_PERMISSIONS,
SERVICE_PERMISSIONS
)
ADMIN_PERMISSIONS,
SERVICE_PERMISSIONS,
LAYER_ADMIN_PERMISSIONS)

from .utils import (
_get_gf_services,
Expand All @@ -59,6 +58,7 @@
get_users_with_perms,
set_owner_permissions,
get_user_obj_perms_model,
get_owner_permissions_according_to_workflow,
remove_object_permissions,
purge_geofence_layer_rules,
sync_geofence_with_guardian,
Expand Down Expand Up @@ -184,22 +184,28 @@ def get_group_managers(self, user_groups):
- The list of "group managers" of the groups above
"""
obj_group_managers = []
obj_user_groups = []

perm_spec = {"groups": {}}
if self.group:
if GroupProfile.objects.filter(group=self.group).exists():
obj_user_groups.append(self.group.name)
if user_groups:
for _user_group in user_groups:
if not skip_registered_members_common_group(Group.objects.get(name=_user_group)):
try:
_group_profile = GroupProfile.objects.get(slug=_user_group)
managers = _group_profile.get_managers()
if managers:
for manager in managers:
if manager not in obj_group_managers and not manager.is_superuser:
obj_group_managers.append(manager)
except GroupProfile.DoesNotExist:
tb = traceback.format_exc()
logger.debug(tb)

# assign view permissions to group resources
obj_user_groups.extend(list(user_groups))

for group in obj_user_groups:
if not skip_registered_members_common_group(Group.objects.get(name=group)):
try:
_group_profile = GroupProfile.objects.get(slug=group)
managers = _group_profile.get_managers()
if managers:
for manager in managers:
if manager not in obj_group_managers and not manager.is_superuser:
obj_group_managers.append(manager)
except GroupProfile.DoesNotExist:
tb = traceback.format_exc()
logger.debug(tb)

if self.group and settings.RESOURCE_PUBLISHING:
perm_spec['groups'][self.group] = VIEW_PERMISSIONS

Expand Down Expand Up @@ -626,6 +632,11 @@ def get_workflow_perms(self, perm_spec=None):
if settings.ADMIN_MODERATE_UPLOADS or settings.RESOURCE_PUBLISHING:
# permissions = self._resolve_resource_permissions(resource=self, permissions=perm_spec)
# default permissions for resource owner and group managers

admin_perms = ADMIN_PERMISSIONS.copy()
if self.polymorphic_ctype.name == 'layer':
admin_perms += LAYER_ADMIN_PERMISSIONS

anonymous_group = Group.objects.get(name='anonymous')
registered_members_group_name = groups_settings.REGISTERED_MEMBERS_GROUP_NAME
user_groups = Group.objects.filter(
Expand All @@ -635,14 +646,10 @@ def get_workflow_perms(self, perm_spec=None):
if group_managers:
for group_manager in group_managers:
prev_perms = perm_spec['users'].get(group_manager, []) if isinstance(perm_spec['users'], dict) else []
# AF: Should be a manager being able to change the dataset data and style too by default?
# For the time being let's give to the manager "management" perms only.
# if self.polymorphic_ctype.name == 'layer':
# perm_spec['users'][group_manager] = list(
# set(prev_perms + VIEW_PERMISSIONS + ADMIN_PERMISSIONS + LAYER_ADMIN_PERMISSIONS))
# else:
perm_spec['users'][group_manager] = list(
set(prev_perms + VIEW_PERMISSIONS + ADMIN_PERMISSIONS))
prev_perms += VIEW_PERMISSIONS + admin_perms.copy()
if self.is_published or (settings.RESOURCE_PUBLISHING and not settings.ADMIN_MODERATE_UPLOADS):
prev_perms.remove('publish_resourcebase')
perm_spec['users'][group_manager] = list(set(prev_perms))

if member_group_perm:
for gr, perm in member_group_perm['groups'].items():
Expand All @@ -663,6 +670,24 @@ def get_workflow_perms(self, perm_spec=None):
prev_perms = perm_spec['groups'].get(anonymous_group, []) if isinstance(perm_spec['groups'], dict) else []
perm_spec['groups'][anonymous_group] = list(set(prev_perms + VIEW_PERMISSIONS))

if settings.ADMIN_MODERATE_UPLOADS and settings.RESOURCE_PUBLISHING:
if self.is_approved or self.is_published:
# Assign view and download permissions to owner and other users with edit permissions
edit_perms = ADMIN_PERMISSIONS + LAYER_ADMIN_PERMISSIONS
new_perms = {"users": {}, "groups": {}}
for user, perms in perm_spec["users"].items():
# if owner is a group manager, take the group manager permissions set above
if user in group_managers or user.is_superuser:
new_perms["users"][user] = perms
else:
new_perms["users"][user] = [perm for perm in perms if perm not in edit_perms]
for group, perms in perm_spec["groups"].items():
new_perms["groups"][group] = [perm for perm in perms if perm not in edit_perms]
perm_spec = new_perms
else:
# restore owner perms
owner_permissions = get_owner_permissions_according_to_workflow(self)
perm_spec["users"][self.owner] = owner_permissions
return perm_spec

def get_user_perms(self, user):
Expand Down

0 comments on commit af6f7d9

Please sign in to comment.