Skip to content

Commit

Permalink
add user authentication/authorization methods
Browse files Browse the repository at this point in the history
allows to configure plugins for user authentication/authorization. Includes methods to authenticate
based on OIDC token (see issue galaxyproject/galaxy#15526)
  • Loading branch information
SergeyYakubov committed Apr 10, 2023
1 parent b4fd6ef commit 8d75c68
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 20 deletions.
16 changes: 16 additions & 0 deletions app.yml.sample
Expand Up @@ -120,6 +120,22 @@
## Maximum number of seconds to sleep between each retry.
#amqp_publish_retry_interval_max: 60


## configure user authentication/authorization plugins
## parameters depend on auth type. Authentication plugin should return a username
## and authorization plugin should authorize this user
#user_auth:
# authentication:
# - type: oidc
# oidc_jwks_url: https://login.microsoftonline.com/xxx/discovery/v2.0/keys
# oidc_provider: azure
# oidc_username_in_token: preferred_username
# oidc_username_template: *.
# authorization:
# - type: userlist
# userlist_allowed_users:
# - xxx

## *Experimental*. Enable file caching by specifing a directory here.
## Directory used to store incoming file cache. It works fine for HTTP
## transfer, have not tested with staging by coping. Also there is no
Expand Down
21 changes: 17 additions & 4 deletions pulsar/client/client.py
Expand Up @@ -100,6 +100,7 @@ def __init__(self, destination_params, job_id):
setattr(self, attr, destination_params.get(attr, None))
self.env = destination_params.get("env", [])
self.files_endpoint = destination_params.get("files_endpoint", None)
self.token_endpoint = destination_params.get("token_endpoint", None)

default_file_action = self.destination_params.get("default_file_action", "transfer")
if default_file_action not in actions:
Expand Down Expand Up @@ -166,7 +167,8 @@ def __init__(self, destination_params, job_id, job_manager_interface):
super().__init__(destination_params, job_id)
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
Expand All @@ -190,6 +192,8 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
if job_config and 'touch_outputs' in job_config:
# message clients pass the entire job config
launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']})
if token_endpoint is not None:
launch_params["token_endpoint"] = json_dumps({'token_endpoint': token_endpoint})

if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
Expand Down Expand Up @@ -344,7 +348,8 @@ def __init__(self, destination_params, job_id, client_manager):
self.client_manager = client_manager
self.amqp_key_prefix = self.destination_params.get("amqp_key_prefix")

def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config, dynamic_file_sources):
def _build_setup_message(self, command_line, dependencies_description, env, remote_staging, job_config,
dynamic_file_sources, token_endpoint):
"""
"""
launch_params = dict(command_line=command_line, job_id=self.job_id)
Expand All @@ -359,6 +364,8 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo
launch_params['remote_staging'] = remote_staging
launch_params['remote_staging']['ssh_key'] = self.ssh_key
launch_params['dynamic_file_sources'] = dynamic_file_sources
launch_params['token_endpoint'] = token_endpoint

if job_config and self.setup_handler.local:
# Setup not yet called, job properties were inferred from
# destination arguments. Hence, must have Pulsar setup job
Expand Down Expand Up @@ -397,7 +404,8 @@ def _build_status_request_message(self):

class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
Expand All @@ -407,6 +415,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
self.client_manager.exchange.publish("setup", launch_params)
log.info("Job published to setup message queue: %s", self.job_id)
Expand All @@ -429,7 +438,8 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.remote_pulsar_path = destination_params["remote_pulsar_path"]
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, dynamic_file_sources=None):
def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
"""
"""
launch_params = self._build_setup_message(
Expand All @@ -439,6 +449,7 @@ def launch(self, command_line, dependencies_description=None, env=None, remote_s
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
base64_message = to_base64_json(launch_params)
submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash")
Expand Down Expand Up @@ -479,6 +490,7 @@ def launch(
job_config=None,
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
) -> Optional[ExternalId]:
"""
Expand All @@ -490,6 +502,7 @@ def launch(
remote_staging=remote_staging,
job_config=job_config,
dynamic_file_sources=dynamic_file_sources,
token_endpoint=token_endpoint,
)
container = None
guest_ports = None
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client/staging/up.py
Expand Up @@ -70,7 +70,7 @@ def submit_job(client, client_job_description, job_config=None):
# potentially duplicated but we don't want to count on remote staging to include this
# it needs to be in the response to Pulsar even Pulsar is inititing staging actions
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources

launch_kwds["token_endpoint"] = client.token_endpoint
# for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
# id from the submission process (e.g. to TES).
launch_response = client.launch(**launch_kwds)
Expand Down
7 changes: 7 additions & 0 deletions pulsar/core.py
Expand Up @@ -9,12 +9,15 @@
from galaxy.tool_util.deps import build_dependency_manager
from galaxy.util.bunch import Bunch


from pulsar import messaging
from pulsar.cache import Cache
from pulsar.manager_factory import build_managers
from pulsar.tools import ToolBox
from pulsar.tools.authorization import get_authorizer

from pulsar.user_auth.manager import UserAuthManager

log = getLogger(__name__)

DEFAULT_PRIVATE_TOKEN = None
Expand All @@ -41,6 +44,7 @@ def __init__(self, **conf):
self.__setup_object_store(conf)
self.__setup_dependency_manager(conf)
self.__setup_job_metrics(conf)
self.__setup_user_auth_manager(conf)
self.__setup_managers(conf)
self.__setup_file_cache(conf)
self.__setup_bind_to_message_queue(conf)
Expand All @@ -66,6 +70,9 @@ def __setup_bind_to_message_queue(self, conf):
queue_state = messaging.bind_app(self, message_queue_url, conf)
self.__queue_state = queue_state

def __setup_user_auth_manager(self, conf):
self.user_auth_manager = UserAuthManager(conf)

def __setup_tool_config(self, conf):
"""
Setups toolbox object and authorization mechanism based
Expand Down
3 changes: 3 additions & 0 deletions pulsar/manager_endpoint_util.py
Expand Up @@ -77,6 +77,8 @@ def submit_job(manager, job_config):
submit_params = job_config.get('submit_params', {})
touch_outputs = job_config.get('touch_outputs', [])
dynamic_file_sources = job_config.get("dynamic_file_sources", None)
token_endpoint = job_config.get("token_endpoint", None)

job_config = None
if setup_params or force_setup:
input_job_id = setup_params.get("job_id", job_id)
Expand Down Expand Up @@ -104,6 +106,7 @@ def submit_job(manager, job_config):
"env": env,
"setup_params": setup_params,
"dynamic_file_sources": dynamic_file_sources,
"token_endpoint": token_endpoint,
}
manager.preprocess_and_launch(job_id, launch_config)
except Exception:
Expand Down
3 changes: 3 additions & 0 deletions pulsar/managers/base/__init__.py
Expand Up @@ -73,6 +73,7 @@ def __init__(self, name, app, **kwds):
self.tmp_dir = kwds.get("tmp_dir", None)
self.debug = str(kwds.get("debug", False)).lower() == "true"
self.authorizer = app.authorizer
self.user_auth_manager = app.user_auth_manager
self.__init_system_properties()
self.__init_env_vars(**kwds)
self.dependency_manager = app.dependency_manager
Expand Down Expand Up @@ -179,6 +180,8 @@ def _check_execution(self, job_id, tool_id, command_line):
log.debug("job_id: {} - Checking authorization of command_line [{}]".format(job_id, command_line))
authorization = self._get_authorization(job_id, tool_id)
job_directory = self._job_directory(job_id)
self.user_auth_manager.authorize(job_id, job_directory)

tool_files_dir = job_directory.tool_files_directory()
for file in self._list_dir(tool_files_dir):
if os.path.isdir(join(tool_files_dir, file)):
Expand Down
Empty file added pulsar/user_auth/__init__.py
Empty file.
66 changes: 66 additions & 0 deletions pulsar/user_auth/manager.py
@@ -0,0 +1,66 @@
from abc import ABC

import inspect


class UserAuthManager(ABC):
"""
Authorization/Authentication manager.
"""

def __init__(self, config):
self._authorization_methods = []
self._authentication_methods = []

try:
user_auth = config.get("user_auth", None)
if not user_auth:
return
authentications = user_auth.pop("authentication", [])
authorizations = user_auth.pop("authorization", [])

for authorization in authorizations:
authorization.update(user_auth)
obj = get_object("pulsar.user_auth.methods." + authorization["type"], "auth_type",
authorization["type"])
self._authorization_methods.append(obj(authorization))

for authentication in authentications:
authentication.update(user_auth)
obj = get_object("pulsar.user_auth.methods." + authentication["type"], "auth_type",
authentication["type"])
self._authentication_methods.append(obj(authentication))
except Exception as e:
raise Exception("cannot read auth configuration") from e

def authorize(self, job_id, job_directory):
authentication_info = self.__authenticate(job_id, job_directory)

if len(self._authorization_methods) == 0:
return True
for method in self._authorization_methods:
res = method.authorize(authentication_info)
if res:
return True

raise Exception("Could not authorize job execution on remote resource")

def __authenticate(self, job_id, job_directory):
if len(self._authentication_methods) == 0:
return {}
for method in self._authentication_methods:
res = method.authenticate(job_directory)
if res:
return res

raise Exception("Could not authenticate job %s" % job_id)


def get_object(module_name, attribute_name, attribute_value):
module = __import__(module_name)
for comp in module_name.split(".")[1:]:
module = getattr(module, comp)
for _, obj in inspect.getmembers(module):
if inspect.isclass(obj) and hasattr(obj, attribute_name) and getattr(obj, attribute_name) == attribute_value:
return obj
raise Exception("Cannot find object %s with attribute %s=%s " % (module_name, attribute_name, attribute_value))
Empty file.
18 changes: 18 additions & 0 deletions pulsar/user_auth/methods/allow_all.py
@@ -0,0 +1,18 @@
from pulsar.user_auth.methods.interface import AuthMethod


class AlwaysAllowAuthMethod(AuthMethod):
"""
Always allow
"""

def __init__(self, _config):
pass

auth_type = "allow_all"

def authorize(self, authentication_info):
return True

def authenticate(self, job_directory):
return {"username": "anonymous"}
15 changes: 15 additions & 0 deletions pulsar/user_auth/methods/interface.py
@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod


class AuthMethod(ABC):
"""
Defines the interface to various authentication/authorization methods.
"""

@abstractmethod
def authorize(self, authentication_info):
raise NotImplementedError("a concrete class should implement this")

@abstractmethod
def authenticate(self, job_directory):
raise NotImplementedError("a concrete class should implement this")
74 changes: 74 additions & 0 deletions pulsar/user_auth/methods/oidc.py
@@ -0,0 +1,74 @@
import requests
import base64
import json
import jwt
import re
from cryptography.hazmat.backends import default_backend
from cryptography.x509 import load_der_x509_certificate

from pulsar.user_auth.methods.interface import AuthMethod

import logging

log = logging.getLogger(__name__)


def get_token(job_directory, provider):
log.debug("Getting OIDC token for provider " + provider + " from Galaxy")
endpoint = job_directory.load_metadata("launch_config")["token_endpoint"]
endpoint = endpoint + "&provider=" + provider
r = requests.get(url=endpoint)
return r.text


class OIDCAuth(AuthMethod):
"""
Authorization based on OIDC tokens
"""
auth_type = "oidc"

def __init__(self, config):
try:
self._provider = config["oidc_provider"]
self._jwks_url = config["oidc_jwks_url"]
self._username_in_token = config["oidc_username_in_token"]
self._username_template = config["oidc_username_template"]

except Exception as e:
raise Exception("cannot read OIDCAuth configuration") from e

def _verify_token(self, token):
try:
# Obtain appropriate cert from JWK URI
key_set = requests.get(self._jwks_url, timeout=5)
encoded_header, rest = token.split('.', 1)
headerobj = json.loads(base64.b64decode(encoded_header + '==').decode('utf8'))
key_id = headerobj['kid']
for key in key_set.json()['keys']:
if key['kid'] == key_id:
x5c = key['x5c'][0]
break
else:
raise jwt.DecodeError('Cannot find kid ' + key_id)
cert = load_der_x509_certificate(base64.b64decode(x5c), default_backend())
# Decode token (exp date is checked automatically)
decoded_token = jwt.decode(
token,
key=cert.public_key(),
algorithms=['RS256'],
options={'exp': True, 'verify_aud': False}
)
return decoded_token
except Exception as error:
raise Exception("Error verifying jwt token") from error

def authorize(self, authentication_info):
raise NotImplementedError("authorization not implemented for this class")

def authenticate(self, job_directory):
token = get_token(job_directory, self._provider)

decoded_token = self._verify_token(token)
user = decoded_token[self._username_in_token]
user = re.match(self._username_template, user).group(0)
return {"username": user}
21 changes: 21 additions & 0 deletions pulsar/user_auth/methods/userlist.py
@@ -0,0 +1,21 @@
from pulsar.user_auth.methods.interface import AuthMethod


class UserListAuth(AuthMethod):
"""
Defines authorization user by username
"""

def __init__(self, config):
try:
self._allowed_users = config["userlist_allowed_users"]
except Exception as e:
raise Exception("cannot read UsernameAuth configuration") from e

auth_type = "userlist"

def authorize(self, authentication_info):
return authentication_info["username"] in self._allowed_users

def authenticate(self, job_directory):
raise NotImplementedError("authentication not implemented for this class")

0 comments on commit 8d75c68

Please sign in to comment.