diff --git a/app.yml.sample b/app.yml.sample index 85d68ad6..096202d2 100644 --- a/app.yml.sample +++ b/app.yml.sample @@ -120,6 +120,22 @@ ## Maximum number of seconds to sleep between each retry. #amqp_publish_retry_interval_max: 60 + +## Configure user authentication/authorization plugins. +## Parameters depend on the plugin type. An authentication plugin should return a username +## and an authorization plugin should authorize that user. +#user_auth: +# authentication: +# - type: oidc +# oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys +# oidc_provider: azure +# oidc_username_in_token: preferred_username +# oidc_username_template: ".*" +# authorization: +# - type: userlist +# userlist_allowed_users: +# - xxx + ## *Experimental*. Enable file caching by specifing a directory here. ## Directory used to store incoming file cache. It works fine for HTTP ## transfer, have not tested with staging by coping. Also there is no diff --git a/pulsar/core.py b/pulsar/core.py index 4f124067..d5426736 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -9,12 +9,15 @@ from galaxy.tool_util.deps import build_dependency_manager from galaxy.util.bunch import Bunch + from pulsar import messaging from pulsar.cache import Cache from pulsar.manager_factory import build_managers from pulsar.tools import ToolBox from pulsar.tools.authorization import get_authorizer +from pulsar.user_auth.manager import UserAuthManager + log = getLogger(__name__) DEFAULT_PRIVATE_TOKEN = None @@ -41,6 +44,7 @@ def __init__(self, **conf): self.__setup_object_store(conf) self.__setup_dependency_manager(conf) self.__setup_job_metrics(conf) + self.__setup_user_auth_manager(conf) self.__setup_managers(conf) self.__setup_file_cache(conf) self.__setup_bind_to_message_queue(conf) @@ -66,6 +70,9 @@ def __setup_bind_to_message_queue(self, conf): queue_state = messaging.bind_app(self, message_queue_url, conf) self.__queue_state = queue_state + def __setup_user_auth_manager(self, conf): +
self.user_auth_manager = UserAuthManager(conf) + def __setup_tool_config(self, conf): """ Setups toolbox object and authorization mechanism based diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 2bbe0e8e..5c89eb66 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -73,6 +73,7 @@ def __init__(self, name, app, **kwds): self.tmp_dir = kwds.get("tmp_dir", None) self.debug = str(kwds.get("debug", False)).lower() == "true" self.authorizer = app.authorizer + self.user_auth_manager = app.user_auth_manager self.__init_system_properties() self.__init_env_vars(**kwds) self.dependency_manager = app.dependency_manager @@ -179,6 +180,8 @@ def _check_execution(self, job_id, tool_id, command_line): log.debug("job_id: {} - Checking authorization of command_line [{}]".format(job_id, command_line)) authorization = self._get_authorization(job_id, tool_id) job_directory = self._job_directory(job_id) + self.user_auth_manager.authorize(job_id, job_directory) + tool_files_dir = job_directory.tool_files_directory() for file in self._list_dir(tool_files_dir): if os.path.isdir(join(tool_files_dir, file)): diff --git a/pulsar/user_auth/__init__.py b/pulsar/user_auth/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/manager.py b/pulsar/user_auth/manager.py new file mode 100644 index 00000000..08b16f77 --- /dev/null +++ b/pulsar/user_auth/manager.py @@ -0,0 +1,66 @@ +from abc import ABC + +import inspect + + +class UserAuthManager(ABC): + """ + Authorization/Authentication manager. 
+ """ + + def __init__(self, config): + self._authorization_methods = [] + self._authentication_methods = [] + + try: + user_auth = config.get("user_auth", None) + if not user_auth: + return + authentications = user_auth.pop("authentication", []) + authorizations = user_auth.pop("authorization", []) + + for authorization in authorizations: + authorization.update(user_auth) + obj = get_object("pulsar.user_auth.methods." + authorization["type"], "auth_type", + authorization["type"]) + self._authorization_methods.append(obj(authorization)) + + for authentication in authentications: + authentication.update(user_auth) + obj = get_object("pulsar.user_auth.methods." + authentication["type"], "auth_type", + authentication["type"]) + self._authentication_methods.append(obj(authentication)) + except Exception as e: + raise Exception("cannot read auth configuration") from e + + def authorize(self, job_id, job_directory): + authentication_info = self.__authenticate(job_id, job_directory) + + if len(self._authorization_methods) == 0: + return True + for method in self._authorization_methods: + res = method.authorize(authentication_info) + if res: + return True + + raise Exception("Could not authorize job execution on remote resource") + + def __authenticate(self, job_id, job_directory): + if len(self._authentication_methods) == 0: + return {} + for method in self._authentication_methods: + res = method.authenticate(job_directory) + if res: + return res + + raise Exception("Could not authenticate job %s" % job_id) + + +def get_object(module_name, attribute_name, attribute_value): + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + for _, obj in inspect.getmembers(module): + if inspect.isclass(obj) and hasattr(obj, attribute_name) and getattr(obj, attribute_name) == attribute_value: + return obj + raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value)) diff --git 
a/pulsar/user_auth/methods/__init__.py b/pulsar/user_auth/methods/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pulsar/user_auth/methods/allow_all.py b/pulsar/user_auth/methods/allow_all.py new file mode 100644 index 00000000..c51c521f --- /dev/null +++ b/pulsar/user_auth/methods/allow_all.py @@ -0,0 +1,18 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class AlwaysAllowAuthMethod(AuthMethod): + """ + Always allow + """ + + def __init__(self, _config): + pass + + auth_type = "allow_all" + + def authorize(self, authentication_info): + return True + + def authenticate(self, job_directory): + return {"username": "anonymous"} diff --git a/pulsar/user_auth/methods/interface.py b/pulsar/user_auth/methods/interface.py new file mode 100644 index 00000000..983ca9ca --- /dev/null +++ b/pulsar/user_auth/methods/interface.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class AuthMethod(ABC): + """ + Defines the interface to various authentication/authorization methods. 
+ """ + + @abstractmethod + def authorize(self, authentication_info): + raise NotImplementedError("a concrete class should implement this") + + @abstractmethod + def authenticate(self, job_directory): + raise NotImplementedError("a concrete class should implement this") diff --git a/pulsar/user_auth/methods/oidc.py b/pulsar/user_auth/methods/oidc.py new file mode 100644 index 00000000..5a14957d --- /dev/null +++ b/pulsar/user_auth/methods/oidc.py @@ -0,0 +1,74 @@ +import requests +import base64 +import json +import jwt +import re +from cryptography.hazmat.backends import default_backend +from cryptography.x509 import load_der_x509_certificate + +from pulsar.user_auth.methods.interface import AuthMethod + +import logging + +log = logging.getLogger(__name__) + + +def get_token(job_directory, provider): + log.debug("Getting OIDC token for provider " + provider + " from Galaxy") + endpoint = job_directory.load_metadata("launch_config")["token_endpoint"] + endpoint = endpoint + "&provider=" + provider + r = requests.get(url=endpoint) + return r.text + + +class OIDCAuth(AuthMethod): + """ + Authorization based on OIDC tokens + """ + auth_type = "oidc" + + def __init__(self, config): + try: + self._provider = config["oidc_provider"] + self._jwks_url = config["oidc_jwks_url"] + self._username_in_token = config["oidc_username_in_token"] + self._username_template = config["oidc_username_template"] + + except Exception as e: + raise Exception("cannot read OIDCAuth configuration") from e + + def _verify_token(self, token): + try: + # Obtain appropriate cert from JWK URI + key_set = requests.get(self._jwks_url, timeout=5) + encoded_header, rest = token.split('.', 1) + headerobj = json.loads(base64.b64decode(encoded_header + '==').decode('utf8')) + key_id = headerobj['kid'] + for key in key_set.json()['keys']: + if key['kid'] == key_id: + x5c = key['x5c'][0] + break + else: + raise jwt.DecodeError('Cannot find kid ' + key_id) + cert = 
load_der_x509_certificate(base64.b64decode(x5c), default_backend()) + # Decode token (exp date is checked automatically) + decoded_token = jwt.decode( + token, + key=cert.public_key(), + algorithms=['RS256'], + options={'exp': True, 'verify_aud': False} + ) + return decoded_token + except Exception as error: + raise Exception("Error verifying jwt token") from error + + def authorize(self, authentication_info): + raise NotImplementedError("authorization not implemented for this class") + + def authenticate(self, job_directory): + token = get_token(job_directory, self._provider) + + decoded_token = self._verify_token(token) + user = decoded_token[self._username_in_token] + user = re.match(self._username_template, user).group(0) + return {"username": user} diff --git a/pulsar/user_auth/methods/userlist.py b/pulsar/user_auth/methods/userlist.py new file mode 100644 index 00000000..30f3c449 --- /dev/null +++ b/pulsar/user_auth/methods/userlist.py @@ -0,0 +1,21 @@ +from pulsar.user_auth.methods.interface import AuthMethod + + +class UserListAuth(AuthMethod): + """ + Defines authorization user by username + """ + + def __init__(self, config): + try: + self._allowed_users = config["userlist_allowed_users"] + except Exception as e: + raise Exception("cannot read UsernameAuth configuration") from e + + auth_type = "userlist" + + def authorize(self, authentication_info): + return authentication_info["username"] in self._allowed_users + + def authenticate(self, job_directory): + raise NotImplementedError("authentication not implemented for this class") diff --git a/requirements.txt b/requirements.txt index de5a11d4..ea638357 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ galaxy-util paramiko typing-extensions pydantic-tes>=0.1.5 +pyjwt ## Uncomment if using DRMAA queue manager. 
#drmaa diff --git a/test/manager_test.py b/test/manager_test.py index 147456c9..09491b95 100644 --- a/test/manager_test.py +++ b/test/manager_test.py @@ -2,7 +2,7 @@ from os.path import join -from .test_utils import BaseManagerTestCase +from .test_utils import BaseManagerTestCase, get_failing_user_auth_manager class ManagerTest(BaseManagerTestCase): @@ -34,6 +34,12 @@ def test_unauthorized_command_line(self): with self.assertRaises(Exception): self.manager.launch(job_id, 'python') + def test_unauthorized_user(self): + self.manager.user_auth_manager = get_failing_user_auth_manager() + job_id = self.manager.setup_job("123", "tool1", "1.0.0") + with self.assertRaises(Exception): + self.manager.launch(job_id, 'python') + def test_id_assigners(self): self._set_manager(assign_ids="galaxy") job_id = self.manager.setup_job("123", "tool1", "1.0.0") diff --git a/test/persistence_test.py b/test/persistence_test.py index 99367a4f..3c90868f 100644 --- a/test/persistence_test.py +++ b/test/persistence_test.py @@ -7,7 +7,7 @@ from pulsar.tools.authorization import get_authorizer from .test_utils import ( temp_directory, - TestDependencyManager + TestDependencyManager, get_test_user_auth_manager ) from galaxy.job_metrics import NULL_JOB_INSTRUMENTER from galaxy.util.bunch import Bunch @@ -125,6 +125,7 @@ def _app(): staging_directory=staging_directory, persistence_directory=staging_directory, authorizer=get_authorizer(None), + user_auth_manager=get_test_user_auth_manager(), dependency_manager=TestDependencyManager(), job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER), object_store=None, diff --git a/test/test_utils.py b/test/test_utils.py index f8f88fb8..2e99932f 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -32,6 +32,7 @@ from pulsar.managers.util import drmaa from pulsar.tools import ToolBox from pulsar.managers.base import JobDirectory +from pulsar.user_auth.manager import UserAuthManager from unittest import TestCase, skip @@ -171,6 +172,7 @@ def 
setUp(self): self.app = minimal_app_for_managers() self.staging_directory = self.app.staging_directory self.authorizer = self.app.authorizer + self.user_auth_manager = self.app.user_auth_manager def tearDown(self): rmtree(self.staging_directory) @@ -225,13 +227,26 @@ def minimal_app_for_managers(): staging_directory = temp_directory_persist(prefix='minimal_app_') rmtree(staging_directory) authorizer = TestAuthorizer() + user_auth_manager = get_test_user_auth_manager() return Bunch(staging_directory=staging_directory, authorizer=authorizer, job_metrics=NullJobMetrics(), dependency_manager=TestDependencyManager(), + user_auth_manager=user_auth_manager, object_store=object()) +def get_test_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], "authorization": [{"type": "allow_all"}]}} + return UserAuthManager(config) + + +def get_failing_user_auth_manager(): + config = {"user_auth": {"authentication": [{"type": "allow_all"}], + "authorization": [{"type": "userlist", "userlist_allowed_users": []}]}} + return UserAuthManager(config) + + class NullJobMetrics: def __init__(self): @@ -282,11 +297,11 @@ def __init__(self, global_conf={}, app_conf={}, test_conf={}, web=True): @contextmanager def new_app(self): with test_pulsar_app( - self.global_conf, - self.app_conf, - self.test_conf, - staging_directory=self.staging_directory, - web=self.web, + self.global_conf, + self.app_conf, + self.test_conf, + staging_directory=self.staging_directory, + web=self.web, ) as app: yield app @@ -309,11 +324,11 @@ def restartable_pulsar_app_provider(**kwds): @nottest @contextmanager def test_pulsar_app( - global_conf={}, - app_conf={}, - test_conf={}, - staging_directory=None, - web=True, + global_conf={}, + app_conf={}, + test_conf={}, + staging_directory=None, + web=True, ): clean_staging_directory = False if staging_directory is None: @@ -420,7 +435,6 @@ def skip_without_drmaa(f): def _which(program): - def is_exe(fpath): return isfile(fpath) and 
access(fpath, X_OK) @@ -487,7 +501,6 @@ def dump_other_threads(): # Extracted from: https://github.com/python/cpython/blob/ # 937ee9e745d7ff3c2010b927903c0e2a83623324/Lib/test/support/__init__.py class EnvironmentVarGuard: - """Class to help protect the environment variable properly. Can be used as a context manager.""" diff --git a/test/user_authorization_test.py b/test/user_authorization_test.py new file mode 100644 index 00000000..3696a243 --- /dev/null +++ b/test/user_authorization_test.py @@ -0,0 +1,20 @@ +from unittest import TestCase + +from test.test_utils import get_test_user_auth_manager, get_failing_user_auth_manager + + +class UserAuthorizationTestCase(TestCase): + + def setUp(self): + self.authorizer = get_test_user_auth_manager() + self.failing_authorizer = get_failing_user_auth_manager() + + def test_passes(self): + self.authorizer.authorize("123", None) + + def test_fails(self): + with self.unauthorized_expectation(): + self.failing_authorizer.authorize("123", None) + + def unauthorized_expectation(self): + return self.assertRaises(Exception)