diff --git a/src/geoserver/catalog.py b/src/geoserver/catalog.py index be34983..5c820a0 100644 --- a/src/geoserver/catalog.py +++ b/src/geoserver/catalog.py @@ -227,7 +227,6 @@ def delete(self, config_object, purge=None, recurse=False): "Content-type": "application/xml", "Accept": "application/xml" } - resp = self.http_request(rest_url, method='delete', headers=headers) if resp.status_code != 200: raise FailedRequestError(f'Failed to make DELETE request: {resp.status_code}, {resp.text}') @@ -1332,11 +1331,42 @@ def get_services(self, ogc_type="wms"): data = self.get_xml(f"{self.service_url}/services/{ogc_type}/workspaces/{ws.name}/settings") services.append(service_from_index(self, data)) except FailedRequestError as e: - logger.debug(f"Not found {ogc_type} service for workspace {ws.name}" - ) - + logger.debug(f"Not found {ogc_type} service for workspace {ws.name}") return services + def create_user(self, username, password): + + users = self.get_users(names=username) + if len(users) > 0: + logging.warning(f"User {username} already exists") + tmp_cat = Catalog(service_url=self.service_url, username=username, password=password) + try: + tmp_cat.get_version() + except FailedRequestError as e: + logger.error("And we probably have incorrect password") + raise FailedRequestError + + return users[0] + + xml = ( + "" + "{username}" + "{password}" + "true" + "" + ).format(username=username, password=password) + + headers = {"Content-Type": "application/xml"} + users_url = f"{self.service_url}/security/usergroup/users/" + + resp = self.http_request(users_url, method='post', data=xml, headers=headers) + if resp.status_code not in (200, 201, 202): + raise FailedRequestError(f'Failed to create user {username} : {resp.status_code}, {resp.text}') + + self._cache.pop(f"{self.service_url}/security/usergroup/users/", None) + users = self.get_users(names=username) + return users[0] if users else None + def get_users(self, names=None): ''' Returns a list of users in the catalog. @@ -1354,8 +1384,7 @@ def get_users(self, names=None): users.extend([user_from_index(self, node) for node in data.findall("user")]) if users and names: - return ([ws for ws in users if ws.name in names]) - + return ([ws for ws in users if ws.user_name in names]) return users def get_master_pwd(self): @@ -1411,3 +1440,33 @@ def set_my_pwd(self, new_pwd): def get_global_settings(self): return GlobalSettings(self) + + def get_roles(self): + url = f"{self.service_url}/security/roles" + resp = self.get_xml(rest_url=url) + roles = [x.text for x in resp.findall("role")] + return roles + + def get_roles_user(self, username): + url = f"{self.service_url}/security/roles/user/{username}" + resp = self.get_xml(rest_url=url) + roles = [x.text for x in resp.findall("role")] + return roles + + def add_role_user(self, rolename, username): + url = f"{self.service_url}/security/roles/role/{rolename}/user/{username}" + resp = self.http_request(url, method="post") + + if resp.status_code != 200: + raise FailedRequestError(resp.content) + + self._cache.clear() + + def del_role_user(self, rolename, username): + url = f"{self.service_url}/security/roles/role/{rolename}/user/{username}" + resp = self.http_request(url, method="delete") + + if resp.status_code != 200: + raise FailedRequestError(resp.content) + + self._cache.clear() diff --git a/src/geoserver/settings.py b/src/geoserver/settings.py index 7c1b22d..1234876 100644 --- a/src/geoserver/settings.py +++ b/src/geoserver/settings.py @@ -112,7 +112,8 @@ def __init__(self, dom): "pngAcceleration": write_bool("pngAcceleration"), "jpegAcceleration": write_bool("jpegAcceleration"), "allowNativeMosaic": write_bool("allowNativeMosaic"), - "allowNativeWarp": write_bool("allowNativeWarp")} + "allowNativeWarp": write_bool("allowNativeWarp") + } class CoverageAccess(StaticResourceInfo): diff --git a/src/geoserver/support.py b/src/geoserver/support.py index 928cc62..7d5cfe2 100644 --- a/src/geoserver/support.py +++ b/src/geoserver/support.py @@ -274,15 +274,19 @@ def serialize(self, builder): self.dirty['advertised'] = self.advertised for k, writer in self.writers.items(): - - attr = getattr(self, k) - - if issubclass(type(attr), StaticResourceInfo) and attr.dirty: - attr.serialize_all(builder) - else: - if k in self.dirty or self.write_all: - val = self.dirty[k] if self.dirty.get(k) else attr - writer(builder, val) + if hasattr(self, k) and issubclass(type(getattr(self, k)), StaticResourceInfo): + attr = getattr(self, k) + if attr.dirty: + attr.serialize_all(builder) + elif k in self.dirty: + val = self.dirty[k] + writer(builder, val) + elif self.write_all: + attr = None + elif self.write_all and hasattr(self, k): + attr = getattr(self, k) + val = self.dirty[k] if self.dirty.get(k) else attr + writer(builder, val) def serialize_all(self, builder): builder.start(self.resource_type, dict()) diff --git a/test/securitytests.py b/test/securitytests.py index e77d282..fd67e0f 100644 --- a/test/securitytests.py +++ b/test/securitytests.py @@ -7,7 +7,8 @@ import subprocess import re import time -from geoserver.catalog import Catalog +from geoserver.catalog import Catalog, FailedRequestError +from geoserver.security import User if GSPARAMS['GEOSERVER_HOME']: dest = GSPARAMS['DATA_DIR'] @@ -74,6 +75,7 @@ class SecurityTests(unittest.TestCase): + def setUp(self): self.cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD']) self.bkp_cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD']) @@ -84,6 +86,12 @@ def setUp(self): def tearDown(self) -> None: self.bkp_cat.set_master_pwd(self.bkp_masterpwd) self.bkp_cat.set_my_pwd(self.bkp_my_pwd) + usrs = [x for x in self.cat.get_users(names="test_usr12")] + if len(usrs) > 0: + try: + self.cat.delete(usrs[0]) + except FailedRequestError as e: + print(f"User DELETE endpoint not available! {e}") def test_get_users(self): users = self.cat.get_users() @@ -107,3 +115,71 @@ def test_set_my_pwd(self): self.assertIsNotNone(new_pwd) self.assertEqual(new_pwd, test_pwd) self.assertEqual(self.cat.password, test_pwd) + + def test_create_user(self): + test_user = User(user_name='test_usr12', catalog=self.cat) + test_pass = 'test_pas12' + users = self.cat.get_users(names=test_user.user_name) + if len(users) > 0: + try: + self.cat.delete(test_user) + except FailedRequestError as e: + print(f"User DELETE endpoint not available! {e}") + users = self.cat.get_users(names=test_user.user_name) + self.assertEqual(len(users), 0, msg="Test user exists and I cant delete it") + self.cat.create_user(username=test_user.user_name, password=test_pass) + + users = self.cat.get_users(names=test_user.user_name) + self.assertEqual(len(users), 1, msg="Test user was not created") + + tmp_cat = Catalog(service_url=self.cat.service_url, username=test_user.user_name, password=test_pass) + try: + tmp_cat.get_users() + except Exception as e: + print(f"Some problem with new user, exc: {e}") + + def test_create_existing_user(self): + test_user = User(user_name='test_usr12', catalog=self.cat) + test_pass = 'test_pas12' + self.cat.create_user(username=test_user.user_name, password=test_pass) + users = self.cat.get_users(names=test_user.user_name) + self.assertEqual(len(users), 1, msg="User creation before test failed") + self.cat.create_user(username=test_user.user_name, password=test_pass) + + +class RolesTests(unittest.TestCase): + + def setUp(self) -> None: + self.cat = Catalog(GSPARAMS['GSURL'], username=GSPARAMS['GSUSER'], password=GSPARAMS['GSPASSWORD']) + self.test_usr = "test_usr_role" + self.test_pwd = "test_usr_role" + + def tearDown(self) -> None: + usr = User(catalog=self.cat, user_name=self.test_usr) + if usr: + try: + self.cat.delete(usr) + except FailedRequestError as e: + print(f"User DELETE endpoint not available! {e}") + + def test_get_all_roles(self): + roles = self.cat.get_roles() + self.assertGreater(len(roles), 0) + self.assertIn("ADMIN", roles) + + def test_get_roles_user(self): + roles = self.cat.get_roles_user(username="admin") + self.assertGreater(len(roles), 0) + self.assertIn("ADMIN", roles) + + def test_add_del_role_user(self): + self.cat.create_user(username=self.test_usr, password=self.test_pwd) + self.cat.add_role_user(rolename="ADMIN", username=self.test_usr) + usr_roles = self.cat.get_roles_user(username=self.test_usr) + self.assertIn("ADMIN", usr_roles) + tmp_cat = Catalog(service_url=self.cat.service_url, username=self.test_usr, password=self.test_pwd) + # If role added user should get response for this + tmp_cat.get_users() + self.cat.del_role_user(rolename="ADMIN", username=self.test_usr) + usr_roles = self.cat.get_roles_user(username=self.test_usr) + self.assertNotIn("ADMIN", usr_roles)