Skip to content

Commit

Permalink
[Fixes #10302] Incorrect permissions assigned on cloning a resource (#…
Browse files Browse the repository at this point in the history
…10309) (#10328)

* [Fixes #10302] Incorrect permissions assigned on cloning a resource

* [Fixes #10302] Incorrect permissions assigned on cloning a resource

* [Fixes #10302] Incorrect permissions assigned on cloning a resource

* [Fixes #10302] test fix broken tests

* [Fixes #10302] fix flake8 formatting

* [Fixes #10302] test fix broken tests

* [Fixes #10302] fix flake8

* - Fix test case

* [Fixes #10302] fix flake8

Co-authored-by: Alessio Fabiani <alessio.fabiani@geosolutionsgroup.com>

Co-authored-by: mattiagiupponi <51856725+mattiagiupponi@users.noreply.github.com>
Co-authored-by: Alessio Fabiani <alessio.fabiani@geosolutionsgroup.com>
  • Loading branch information
3 people committed Nov 22, 2022
1 parent 1f25962 commit c527b3e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
47 changes: 47 additions & 0 deletions geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sys
import json
import logging
from django.test import override_settings
import gisdata

from PIL import Image
Expand Down Expand Up @@ -2420,6 +2421,52 @@ def test_resource_service_copy_with_perms_dataset(self):
)
self._assertCloningWithPerms(resource)

@patch.dict(os.environ, {"ASYNC_SIGNALS": "False"})
@override_settings(ASYNC_SIGNALS=False)
def test_resource_service_copy_with_perms_dataset_set_default_perms(self):
with self.settings(
ASYNC_SIGNALS=False
):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files_as_dict, _ = get_files(files)
resource = Dataset.objects.create(
owner=get_user_model().objects.get(username='admin'),
name='test_copy_with_perms',
store='geonode_data',
subtype="vector",
alternate="geonode:test_copy_with_perms",
resource_type="dataset",
uuid=str(uuid4()),
files=list(files_as_dict.values())
)
_perms = {
'users': {
"bobby": ['base.add_resourcebase', 'base.download_resourcebase']
},
"groups": {
"anonymous": ["base.view_resourcebase", "base.download_resourcebae"]
}
}
resource.set_permissions(_perms)
# checking that bobby is in the original dataset perms list
self.assertTrue('bobby' in 'bobby' in [x.username for x in resource.get_all_level_info().get("users", [])])
# copying the resource, should remove the perms for bobby
# only the default perms should be available
copy_url = reverse('base-resources-resource-service-copy', kwargs={'pk': resource.pk})

self.assertTrue(self.client.login(username="admin", password="admin"))

response = self.client.put(copy_url)
self.assertEqual(response.status_code, 200)

resouce_service_dispatcher.apply((response.json().get("execution_id"),))

self.assertEqual('finished', self.client.get(response.json().get("status_url")).json().get("status"))
_resource = Dataset.objects.filter(title__icontains="test_copy_with_perms").last()
self.assertIsNotNone(_resource)
self.assertFalse('bobby' in 'bobby' in [x.username for x in _resource.get_all_level_info().get("users", [])])
self.assertTrue('admin' in 'admin' in [x.username for x in _resource.get_all_level_info().get("users", [])])

def test_resource_service_copy_with_perms_doc(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files_as_dict, _ = get_files(files)
Expand Down
13 changes: 1 addition & 12 deletions geonode/resource/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,9 +460,8 @@ def copy(self, instance: ResourceBase, /, uuid: str = None, owner: settings.AUTH
try:
instance.set_processing_state(enumerations.STATE_RUNNING)
with transaction.atomic():
_owner = owner or instance.get_real_instance().owner
_perms = copy.copy(instance.get_real_instance().get_all_level_info())
_resource = copy.copy(instance.get_real_instance())
_resource.owner = owner or instance.get_real_instance().owner
_resource.pk = _resource.id = None
_resource.uuid = uuid or str(uuid4())
try:
Expand Down Expand Up @@ -509,16 +508,6 @@ def copy(self, instance: ResourceBase, /, uuid: str = None, owner: settings.AUTH
if _resource:
try:
to_update.update(defaults)
if 'user' in to_update:
to_update.pop('user')
# We need to remove any public access to the cloned dataset here
if 'users' in _perms and ("AnonymousUser" in _perms['users'] or get_anonymous_user() in _perms['users']):
anonymous_user = "AnonymousUser" if "AnonymousUser" in _perms['users'] else get_anonymous_user()
_perms['users'].pop(anonymous_user)
if 'groups' in _perms and ("anonymous" in _perms['groups'] or Group.objects.get(name='anonymous') in _perms['groups']):
anonymous_group = 'anonymous' if 'anonymous' in _perms['groups'] else Group.objects.get(name='anonymous')
_perms['groups'].pop(anonymous_group)
self.set_permissions(_resource.uuid, instance=_resource, owner=_owner, permissions=_perms)
# Refresh from DB
_resource.refresh_from_db()
return self.update(_resource.uuid, _resource, vals=to_update)
Expand Down

0 comments on commit c527b3e

Please sign in to comment.