Skip to content

Commit

Permalink
[Fixes #4990] fix set_layers_permissions managament command
Browse files Browse the repository at this point in the history
  • Loading branch information
gioscarda committed Oct 4, 2019
1 parent 578224c commit ad0af3d
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 186 deletions.
187 changes: 4 additions & 183 deletions geonode/layers/management/commands/set_layers_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@

from django.core.management.base import BaseCommand
from argparse import RawTextHelpFormatter
from geonode.layers.models import Layer
from django.db.models import Q
from django.contrib.auth.models import Group
from django.contrib.auth import get_user_model
from geonode.layers.utils import set_layers_permissions


class Command(BaseCommand):
Expand All @@ -43,27 +40,6 @@ class Command(BaseCommand):
To unset permissions use the '--delete (-d)' option.
"""

READ_PERMISSIONS = [
'view_resourcebase'
]

WRITE_PERMISSIONS = [
'change_layer_data',
'change_layer_style',
'change_resourcebase_metadata'
]

DOWNLOAD_PERMISSIONS = [
'download_resourcebase'
]

OWNER_PERMISSIONS = [
'change_resourcebase',
'delete_resourcebase',
'change_resourcebase_permissions',
'publish_resourcebase'
]

def create_parser(self, *args, **kwargs):
parser = super(Command, self).create_parser(*args, **kwargs)
parser.formatter_class = RawTextHelpFormatter
Expand Down Expand Up @@ -127,161 +103,6 @@ def handle(self, *args, **options):
users_usernames = options.get('users')
groups_names = options.get('groups')
delete_flag = options.get('delete_flag')
# Processing information
if not resources_names:
# If resources is None we consider all the existing layer
resources = Layer.objects.all()
else:
try:
resources = Layer.objects.filter(Q(title__in=resources_names) | Q(name__in=resources_names))
except Layer.DoesNotExist:
self.stdout.write(
'Warning - No resources have been found with these names: %s.' % (
", ".join(resources_names)
)
)
if not resources:
self.stdout.write("Err - No resources have been found. No update operations have been executed.")
else:
# PERMISSIONS
if not permissions_name:
self.stdout.write("Err - No permissions have been provided.")
else:
permissions = []
if permissions_name.lower() in ('read', 'r'):
if not delete_flag:
permissions = self.READ_PERMISSIONS
else:
permissions = self.READ_PERMISSIONS + self.WRITE_PERMISSIONS \
+ self.DOWNLOAD_PERMISSIONS + self.OWNER_PERMISSIONS
elif permissions_name.lower() in ('write', 'w'):
if not delete_flag:
permissions = self.READ_PERMISSIONS + self.WRITE_PERMISSIONS
else:
permissions = self.WRITE_PERMISSIONS
elif permissions_name.lower() in ('download', 'd'):
if not delete_flag:
permissions = self.READ_PERMISSIONS + self.DOWNLOAD_PERMISSIONS
else:
permissions = self.DOWNLOAD_PERMISSIONS
elif permissions_name.lower() in ('owner', 'o'):
if not delete_flag:
permissions = self.READ_PERMISSIONS + self.WRITE_PERMISSIONS \
+ self.DOWNLOAD_PERMISSIONS + self.OWNER_PERMISSIONS
else:
permissions = self.OWNER_PERMISSIONS
if not permissions:
self.stdout.write(
"Err - Permission must match one of these values: read (r), write (w), download (d), owner (o)."
)
else:
if not users_usernames and not groups_names:
self.stdout.write(
"Err - At least one user or one group must be provided."
)
else:
# USERS
users = []
if users_usernames:
User = get_user_model()
for username in users_usernames:
try:
user = User.objects.get(username=username)
users.append(user)
except User.DoesNotExist:
self.stdout.write(
'Warning! - The user {} does not exists. '
'It has been skipped.'.format(username)
)
# GROUPS
groups = []
if groups_names:
for group_name in groups_names:
try:
group = Group.objects.get(name=group_name)
groups.append(group)
except Group.DoesNotExist:
self.stdout.write(
'Warning! - The group {} does not exists. '
'It has been skipped.'.format(group_name)
)
if not users and not groups:
self.stdout.write(
'Err - Neither users nor groups corresponding to the typed names have been found. '
'No update operations have been executed.'
)
else:
# RESOURCES
for resource in resources:
# Existing permissions on the resource
perm_spec = resource.get_all_level_info()
self.stdout.write(
"Initial permissions info for the resource %s:\n%s" % (resource.title, str(perm_spec))
)
for u in users:
uname = u.username
# Add permissions
if not delete_flag:
# Check the permission already exists
if uname not in perm_spec["users"]:
perm_spec["users"][uname] = permissions
else:
u_perms_list = perm_spec["users"][uname]
base_set = set(u_perms_list)
target_set = set(permissions)
perm_spec["users"][uname] = list(base_set | target_set)
# Delete permissions
else:
# Skip resource owner
if u != resource.owner:
uname = u.username
if uname in perm_spec["users"]:
u_perms_set = set()
for up in perm_spec["users"][uname]:
if up not in permissions:
u_perms_set.add(up)
perm_spec["users"][uname] = list(u_perms_set)
else:
self.stdout.write(
"Warning! - The user %s does not have "
"any permission on the layer %s. "
"It has been skipped." % (u, resource.title)
)
else:
self.stdout.write(
"Warning! - The user %s is the layer %s owner, "
"so its permissions can't be changed. "
"It has been skipped." % (u, resource.title)
)
for g in groups:
gname = g.name
# Add permissions
if not delete_flag:
# Check the permission already exists
if gname not in perm_spec["groups"]:
perm_spec["groups"][gname] = permissions
else:
g_perms_list = perm_spec["groups"][gname]
base_set = set(g_perms_list)
target_set = set(permissions)
perm_spec["groups"][gname] = list(base_set | target_set)
# Delete permissions
else:
if gname in perm_spec["groups"]:
g_perms_set = set()
for gp in perm_spec["groups"][gname]:
if gp not in permissions:
g_perms_set.add(gp)
perm_spec["groups"][gname] = list(g_perms_set)
else:
self.stdout.write(
"Warning! - The group %s does not have any permission on the layer %s. "
"It has been skipped." % (g, resource.title)
)
# Set final permissions
resource.set_permissions(perm_spec)
self.stdout.write(
"Final permissions info for the resource %s:\n"
"%s" % (resource.title, str(perm_spec))
)
self.stdout.write("Permissions successfully updated!")
set_layers_permissions(
permissions_name, resources_names, users_usernames, groups_names, delete_flag
)
33 changes: 33 additions & 0 deletions geonode/layers/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from geonode.tests.utils import NotificationsTestsHelper
from geonode.layers.populate_layers_data import create_layer_data
from geonode.base.enumerations import CHARSETS
from geonode.layers import utils

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1306,3 +1307,35 @@ def test_geonode_rest_layer_uploader(self):
settings.OGC_SERVER['default']['BACKEND']
)
_l.delete()


class SetLayersPermissions(GeoNodeBaseTestSupport):

type = 'layer'

def setUp(self):
super(SetLayersPermissions, self).setUp()
create_layer_data()
self.username = 'test_username'
self.passwd = 'test_password'
self.user = get_user_model().objects.create(
username=self.username
)

@on_ogc_backend(geoserver.BACKEND_PACKAGE)
def test_assign_remove_permissions(self):
# Assing
layer = Layer.objects.all().first()
perm_spec = layer.get_all_level_info()
self.assertNotIn(self.user, perm_spec["users"])
utils.set_layers_permissions("write", None, [self.username], None, None)
layer_after = Layer.objects.get(name=layer.name)
perm_spec = layer_after.get_all_level_info()
for perm in utils.WRITE_PERMISSIONS:
self.assertIn(perm, perm_spec["users"][self.user])
# Remove
utils.set_layers_permissions("write", None, [self.username], None, True)
layer_after = Layer.objects.get(name=layer.name)
perm_spec = layer_after.get_all_level_info()
for perm in utils.WRITE_PERMISSIONS:
self.assertNotIn(perm, perm_spec["users"][self.user])

0 comments on commit ad0af3d

Please sign in to comment.