Skip to content

Commit

Permalink
Added roles management for user (#27)
Browse files Browse the repository at this point in the history
* Services before refactor

* Added services settings

* Disabled create_service function

* Part of changes - faced problem with geoserver - potential issue

* Added setting self and master passwords

* Settings simple implementation

* Settings Subclass implementation - seems working needs tests

* Settings full implementation

* Added create user method

* Fixed href to delete users

* Minor fixes in users

* Added roles management for user

* Added roles management for user

* Fixed serialize

* Fixed accept rewrite

* Cleaned data printing on save

* - Test fixes

* - Test fixes

* - Test fixes

Co-authored-by: Alessio Fabiani <alessio.fabiani@geo-solutions.it>
  • Loading branch information
jendrusk and Alessio Fabiani committed Feb 21, 2022
1 parent 301be5d commit 18fdf31
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 17 deletions.
71 changes: 65 additions & 6 deletions src/geoserver/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ def delete(self, config_object, purge=None, recurse=False):
"Content-type": "application/xml",
"Accept": "application/xml"
}

resp = self.http_request(rest_url, method='delete', headers=headers)
if resp.status_code != 200:
raise FailedRequestError(f'Failed to make DELETE request: {resp.status_code}, {resp.text}')
Expand Down Expand Up @@ -1332,11 +1331,42 @@ def get_services(self, ogc_type="wms"):
data = self.get_xml(f"{self.service_url}/services/{ogc_type}/workspaces/{ws.name}/settings")
services.append(service_from_index(self, data))
except FailedRequestError as e:
logger.debug(f"Not found {ogc_type} service for workspace {ws.name}"
)

logger.debug(f"Not found {ogc_type} service for workspace {ws.name}")
return services

def create_user(self, username, password):

users = self.get_users(names=username)
if len(users) > 0:
logging.warning(f"User {username} already exists")
tmp_cat = Catalog(service_url=self.service_url, username=username, password=password)
try:
tmp_cat.get_version()
except FailedRequestError as e:
logger.error("And we probably have incorrect password")
raise FailedRequestError

return users[0]

xml = (
"<user>"
"<userName>{username}</userName>"
"<password>{password}</password>"
"<enabled>true</enabled>"
"</user>"
).format(username=username, password=password)

headers = {"Content-Type": "application/xml"}
users_url = f"{self.service_url}/security/usergroup/users/"

resp = self.http_request(users_url, method='post', data=xml, headers=headers)
if resp.status_code not in (200, 201, 202):
raise FailedRequestError(f'Failed to create user {username} : {resp.status_code}, {resp.text}')

self._cache.pop(f"{self.service_url}/security/usergroup/users/", None)
users = self.get_users(names=username)
return users[0] if users else None

def get_users(self, names=None):
'''
Returns a list of users in the catalog.
Expand All @@ -1354,8 +1384,7 @@ def get_users(self, names=None):
users.extend([user_from_index(self, node) for node in data.findall("user")])

if users and names:
return ([ws for ws in users if ws.name in names])

return ([ws for ws in users if ws.user_name in names])
return users

def get_master_pwd(self):
Expand Down Expand Up @@ -1411,3 +1440,33 @@ def set_my_pwd(self, new_pwd):

def get_global_settings(self):
return GlobalSettings(self)

def get_roles(self):
url = f"{self.service_url}/security/roles"
resp = self.get_xml(rest_url=url)
roles = [x.text for x in resp.findall("role")]
return roles

def get_roles_user(self, username):
url = f"{self.service_url}/security/roles/user/{username}"
resp = self.get_xml(rest_url=url)
roles = [x.text for x in resp.findall("role")]
return roles

def add_role_user(self, rolename, username):
url = f"{self.service_url}/security/roles/role/{rolename}/user/{username}"
resp = self.http_request(url, method="post")

if resp.status_code != 200:
raise FailedRequestError(resp.content)

self._cache.clear()

def del_role_user(self, rolename, username):
url = f"{self.service_url}/security/roles/role/{rolename}/user/{username}"
resp = self.http_request(url, method="delete")

if resp.status_code != 200:
raise FailedRequestError(resp.content)

self._cache.clear()
3 changes: 2 additions & 1 deletion src/geoserver/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def __init__(self, dom):
"pngAcceleration": write_bool("pngAcceleration"),
"jpegAcceleration": write_bool("jpegAcceleration"),
"allowNativeMosaic": write_bool("allowNativeMosaic"),
"allowNativeWarp": write_bool("allowNativeWarp")}
"allowNativeWarp": write_bool("allowNativeWarp")
}


class CoverageAccess(StaticResourceInfo):
Expand Down
22 changes: 13 additions & 9 deletions src/geoserver/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,19 @@ def serialize(self, builder):
self.dirty['advertised'] = self.advertised

for k, writer in self.writers.items():

attr = getattr(self, k)

if issubclass(type(attr), StaticResourceInfo) and attr.dirty:
attr.serialize_all(builder)
else:
if k in self.dirty or self.write_all:
val = self.dirty[k] if self.dirty.get(k) else attr
writer(builder, val)
if hasattr(self, k) and issubclass(type(getattr(self, k)), StaticResourceInfo):
attr = getattr(self, k)
if attr.dirty:
attr.serialize_all(builder)
elif k in self.dirty:
val = self.dirty[k]
writer(builder, val)
elif self.write_all:
attr = None
elif self.write_all and hasattr(self, k):
attr = getattr(self, k)
val = self.dirty[k] if self.dirty.get(k) else attr
writer(builder, val)

def serialize_all(self, builder):
builder.start(self.resource_type, dict())
Expand Down
78 changes: 77 additions & 1 deletion test/securitytests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import subprocess
import re
import time
from geoserver.catalog import Catalog
from geoserver.catalog import Catalog, FailedRequestError
from geoserver.security import User

if GSPARAMS['GEOSERVER_HOME']:
dest = GSPARAMS['DATA_DIR']
Expand Down Expand Up @@ -74,6 +75,7 @@


class SecurityTests(unittest.TestCase):

def setUp(self):
self.cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD'])
self.bkp_cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD'])
Expand All @@ -84,6 +86,12 @@ def setUp(self):
def tearDown(self) -> None:
self.bkp_cat.set_master_pwd(self.bkp_masterpwd)
self.bkp_cat.set_my_pwd(self.bkp_my_pwd)
usrs = [x for x in self.cat.get_users(names="test_usr12")]
if len(usrs) > 0:
try:
self.cat.delete(usrs[0])
except FailedRequestError as e:
print(f"User DELETE endpoint not available! {e}")

def test_get_users(self):
users = self.cat.get_users()
Expand All @@ -107,3 +115,71 @@ def test_set_my_pwd(self):
self.assertIsNotNone(new_pwd)
self.assertEqual(new_pwd, test_pwd)
self.assertEqual(self.cat.password, test_pwd)

def test_create_user(self):
test_user = User(user_name='test_usr12', catalog=self.cat)
test_pass = 'test_pas12'
users = self.cat.get_users(names=test_user.user_name)
if len(users) > 0:
try:
self.cat.delete(test_user)
except FailedRequestError as e:
print(f"User DELETE endpoint not available! {e}")
users = self.cat.get_users(names=test_user.user_name)
self.assertEqual(len(users), 0, msg="Test user exists and I cant delete it")
self.cat.create_user(username=test_user.user_name, password=test_pass)

users = self.cat.get_users(names=test_user.user_name)
self.assertEqual(len(users), 1, msg="Test user was not created")

tmp_cat = Catalog(service_url=self.cat.service_url, username=test_user.user_name, password=test_pass)
try:
tmp_cat.get_users()
except Exception as e:
print(f"Some problem with new user, exc: {e}")

def test_create_existing_user(self):
test_user = User(user_name='test_usr12', catalog=self.cat)
test_pass = 'test_pas12'
self.cat.create_user(username=test_user.user_name, password=test_pass)
users = self.cat.get_users(names=test_user.user_name)
self.assertEqual(len(users), 1, msg="User creation before test failed")
self.cat.create_user(username=test_user.user_name, password=test_pass)


class RolesTests(unittest.TestCase):

def setUp(self) -> None:
self.cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD'])
self.test_usr = "test_usr_role"
self.test_pwd = "test_usr_role"

def tearDown(self) -> None:
usr = User(catalog=self.cat, user_name=self.test_usr)
if usr:
try:
self.cat.delete(usr)
except FailedRequestError as e:
print(f"User DELETE endpoint not available! {e}")

def test_get_all_roles(self):
roles = self.cat.get_roles()
self.assertGreater(len(roles), 0)
self.assertIn("ADMIN", roles)

def test_get_roles_user(self):
roles = self.cat.get_roles_user(username="admin")
self.assertGreater(len(roles), 0)
self.assertIn("ADMIN", roles)

def test_add_del_role_user(self):
self.cat.create_user(username=self.test_usr, password=self.test_pwd)
self.cat.add_role_user(rolename="ADMIN", username=self.test_usr)
usr_roles = self.cat.get_roles_user(username=self.test_usr)
self.assertIn("ADMIN", usr_roles)
tmp_cat = Catalog(service_url=self.cat.service_url, username=self.test_usr, password=self.test_pwd)
# If role added user should get response for this
tmp_cat.get_users()
self.cat.del_role_user(rolename="ADMIN", username=self.test_usr)
usr_roles = self.cat.get_roles_user(username=self.test_usr)
self.assertNotIn("ADMIN", usr_roles)

0 comments on commit 18fdf31

Please sign in to comment.