Merge pull request #7091 from natefoo/handler-assignment-fixes
Configurable job/workflow handler assignment methods
jmchilton committed Dec 12, 2018
2 parents 1fd3fcc + e9aec17 commit 7bed9f0
Showing 33 changed files with 927 additions and 268 deletions.
5 changes: 2 additions & 3 deletions config/galaxy.yml.sample
@@ -1563,9 +1563,8 @@ galaxy:
# failing jobs are just failed outright.
#default_job_resubmission_condition: null

# In multiprocess configurations, notification between processes about
# new jobs must be done via the database. In single process
# configurations, this can be done in memory, which is a bit quicker.
# This option is deprecated; use the `mem-self` handler assignment
# option in the job configuration instead.
#track_jobs_in_database: true

# This enables splitting of jobs into tasks, if specified by the
66 changes: 59 additions & 7 deletions config/job_conf.xml.sample_advanced
@@ -332,24 +332,76 @@
used by Galaxy if no environment variable of the specified name is found.
-->
</plugins>
<handlers default="handlers">
<!-- Additional job handlers - the id should match the name of a
[server:<id>] in galaxy.ini.
<handlers>
<!-- Job handler processes - for a full discussion of job handlers, see the documentation at:
https://docs.galaxyproject.org/en/latest/admin/scaling.html
The <handlers> container tag takes two optional attributes:
<handlers assign_with="method" default="id_or_tag"/>
- `assign_with` - How jobs should be assigned to handlers. The value can be a single method or a
comma-separated list that will be tried in order. The default depends on whether any handlers and a job
handler "pool" (such as uWSGI mules in a `job-handlers` farm) are configured. Valid methods are:
- `mem-self` - Jobs are assigned to the web worker that received the tool execution request from the
user via an internal in-memory queue. If a tool is configured to use a specific handler, that
configuration is ignored; the process that creates the job *always* handles it. This can be
slightly faster than `db-self` but only makes sense in single process environments without
dedicated job handlers. This option replaces the former `track_jobs_in_database` option in
galaxy.yml.
- `db-self` - Like `mem-self` but assignment occurs by setting a new job's 'handler' column in the
database to the process that created the job at the time it is created. Additionally, if a tool is
configured to use a specific handler (ID or tag), that handler is assigned (tags by
`db-preassign`). This is the default if no handlers are defined and no `job-handlers` pool is
present (the default for a completely unconfigured Galaxy).
- `db-preassign` - Jobs are assigned a handler by selecting one at random from the configured tag or
default handlers. This occurs by setting a new job's 'handler' column in the database to the
chosen handler ID (hence "preassign"ment). This is the default if handlers are defined and no
`job-handlers` pool is present.
- `uwsgi-mule-message` - Jobs are assigned a handler via uWSGI mule messaging (see uWSGI
documentation). A mule in the `job-handlers` (if sent to the default tag) or `job-handlers.<tag>`
farm will receive the message and assign itself. This is the default if a `job-handlers` pool is
present and no handlers are configured.
In the event that both a `job-handlers` pool is present and handlers are configured, the default is
`uwsgi-mule-message,db-preassign`. At present, only `uwsgi-mule-message` is capable of deferring
handler assignment to a later method (which would occur in the event that a tool is configured to use
a tag for which there is not a matching pool).
In all cases, if a tool is configured to use a specific handler (by ID, not tag), configured assignment
methods are ignored and that handler is directly assigned in the job's 'handler' column at job creation
time.
- `default` - An ID or tag of the handler(s) that should handle any jobs not assigned to a specific
handler (which is probably most of them). If unset, the default is any untagged handlers plus any
handlers in the `job-handlers` (no tag) pool.
Note that in the event such a mixed configuration environment exists (both a `job-handlers` pool (farm)
and untagged handlers are configured), the default value of
`assign_with="uwsgi-mule-message,db-preassign"` would prevent any of the configured handlers from being
assigned since `uwsgi-mule-message` is the preferred assignment method.
-->
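<!-- Example (an illustrative sketch, not part of the shipped defaults): a deployment with both a
     `job-handlers` mule farm and explicitly defined handlers could set the mixed-mode default
     explicitly, trying mule messaging first and falling back to database preassignment. The handler
     IDs and tag below are examples only.
     <handlers assign_with="uwsgi-mule-message,db-preassign" default="handlers">
         <handler id="handler0" tags="handlers"/>
         <handler id="handler1" tags="handlers"/>
     </handlers>
-->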
<!-- Explicitly defined job handlers - the id should match the handler process's `server_name`. For webless
handlers, this is the value of the `server-name` argument to `galaxy-main`.
-->
<handler id="handler0" tags="handlers"/>
<handler id="handler1" tags="handlers"/>
<handler id="handler0"/>
<handler id="handler1"/>
<!-- Handlers will load all plugins defined in the <plugins> collection
above by default, but can be limited to a subset using <plugin>
tags. This is useful for heterogeneous environments where the DRMAA
plugin would need to be loaded more than once with different
configs.
-->
<handler id="sge_handler">
<plugin id="sge"/>
</handler>
<!-- Handlers are grouped by defining (comma-separated) tags -->
<handler id="special_handler0" tags="special_handlers"/>
<handler id="special_handler1" tags="special_handlers"/>
<handler id="trackster_handler"/>
</handlers>
<destinations default="local">
<!-- Destinations define details about remote resources and how jobs
27 changes: 22 additions & 5 deletions config/workflow_schedulers_conf.xml.sample
@@ -13,12 +13,29 @@

<!-- Handlers (Galaxy server processes that perform the scheduling work) can
be defined here in the same format as in job_conf.xml. By default, the
handlers defined in job_conf.xml will be used (or `main` if there is no
job_conf.xml). -->
handlers defined in job_conf.xml will be used (or the web process that
receives the workflow scheduling request if handlers are not configured
in job_conf.xml).
The options here are the same as those documented for <handlers> in
job_conf.xml.sample_advanced, with two exceptions:
- If a uWSGI farm named `workflow-schedulers` is present, it will be
preferred, followed by `job-handlers`. If any untagged handlers are
defined in this configuration they are eligible to schedule workflows
in addition to any matching mules.
- If uWSGI farms are present, the default assignment method is
`db-preassign` rather than `uwsgi-mule-message`, because `db-preassign`
is deterministic. All workflows scheduled in a single history will be
assigned to the same handler, ensuring they are scheduled serially
(preventing their outputs from being interleaved in the history). You
can override this by explicitly setting
`assign_with="uwsgi-mule-message"`.
-->
<!--
<handlers default="handlers">
<handler id="handler0" tags="handlers"/>
<handler id="handler1" tags="handlers"/>
<handlers>
<handler id="handler0"/>
<handler id="handler1"/>
</handlers>
-->
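<!-- Example (an illustrative sketch, not part of the shipped sample): to prefer mule messaging for
     workflow scheduling despite the interleaving caveat described above:
     <handlers assign_with="uwsgi-mule-message"/>
-->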

6 changes: 2 additions & 4 deletions doc/source/admin/galaxy_options.rst
@@ -3277,10 +3277,8 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~

:Description:
In multiprocess configurations, notification between processes
about new jobs must be done via the database. In single process
configurations, this can be done in memory, which is a bit
quicker.
This option is deprecated; use the `mem-self` handler assignment
option in the job configuration instead.
:Default: ``true``
:Type: bool

2 changes: 1 addition & 1 deletion doc/source/admin/jobs.md
@@ -40,7 +40,7 @@ workers

The `<handlers>` configuration element defines which Galaxy server processes (when [running multiple server processes](scaling.html)) should be used for running jobs, and how to group those processes.

The handlers configuration may define a ``default`` attribute. This is the handler(s) that should be used if no explicit handler is defined for a job and is required if more than one handler is defined.
The handlers configuration may define a ``default`` attribute. This is the handler(s) that should be used if no explicit handler is defined for a job. If unset, any untagged handlers will be used by default.

The collection contains `<handler>` elements.
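For example, a minimal sketch (handler IDs here are illustrative) that groups two handlers under a default tag:

```xml
<handlers default="handlers">
    <handler id="handler0" tags="handlers"/>
    <handler id="handler1" tags="handlers"/>
</handlers>
```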

20 changes: 16 additions & 4 deletions doc/source/admin/scaling.md
@@ -94,8 +94,20 @@
directly by the master uWSGI process. As a benefit of using mule messaging, only job handlers that are alive will be
selected to run jobs.

This is the recommended deployment strategy for Galaxy servers that run web servers and job handlers **on the same
host**.
This is the recommended deployment strategy.

```eval_rst
.. important::
If using **Zerg Mode** or running more than one uWSGI *master* process, do not use **uWSGI + Mules**. Doing so can
cause jobs to be executed by multiple handlers when recovering unassigned jobs at Galaxy server startup.
Running multiple master processes is a rare configuration, typically only used to load balance the web application
across multiple hosts. Note that multiple master processes is not the same thing as the ``processes`` uWSGI
configuration option, which is perfectly safe to set when using job handler mules.
For these scenarios, **uWSGI + Webless** is the recommended deployment strategy.
```

### uWSGI for web serving and Webless Galaxy applications as job handlers

@@ -110,8 +122,8 @@
are [managed by the administrator](#starting-and-stopping). Because the handler is randomly assigned by the web worker
when the job is submitted via the UI/API, jobs may be assigned to dead handlers.

This is the recommended deployment strategy for Galaxy servers that run web servers and job handlers **on different
hosts**.
This is the recommended deployment strategy when **Zerg Mode** is used, and for Galaxy servers that run web servers and
job handlers **on different hosts**.
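Under this strategy, handlers are defined explicitly in `job_conf.xml`, with each `id` matching the `server-name` argument passed to `galaxy-main`. A minimal sketch (handler IDs here are illustrative):

```xml
<handlers>
    <handler id="handler0"/>
    <handler id="handler1"/>
</handlers>
```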

## Legacy Deployment Options

Expand Down
5 changes: 5 additions & 0 deletions lib/galaxy/app.py
@@ -212,6 +212,11 @@ def postfork_sentry_client():
# Must be initialized after job_config.
self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager(self)

# Must be initialized after any component that might make use of stack messaging is configured. Alternatively if
# it becomes more commonly needed we could create a prefork function registration method like we do with
# postfork functions.
self.application_stack.init_late_prefork()

self.containers = {}
if self.config.enable_beta_containers_interface:
self.containers = build_container_interfaces(
2 changes: 1 addition & 1 deletion lib/galaxy/config.py
@@ -154,7 +154,7 @@ def find_root(kwargs):


class Configuration(object):
deprecated_options = ('database_file', )
deprecated_options = ('database_file', 'track_jobs_in_database')

def __init__(self, **kwargs):
self.config_dict = kwargs
15 changes: 15 additions & 0 deletions lib/galaxy/exceptions/__init__.py
@@ -185,6 +185,15 @@ class InternalServerError(MessageException):
err_code = error_codes.INTERNAL_SERVER_ERROR


class ToolExecutionError(MessageException):
status_code = 500
err_code = error_codes.TOOL_EXECUTION_ERROR

def __init__(self, err_msg, type="error", job=None):
super(ToolExecutionError, self).__init__(err_msg, type)
self.job = job


class NotImplemented(MessageException):
status_code = 501
err_code = error_codes.NOT_IMPLEMENTED
@@ -221,3 +230,9 @@ def __init__(self, msg=None, image=None, command=None, **kwargs):
super(ContainerRunError, self).__init__(msg, **kwargs)
self.image = image
self.command = command


class HandlerAssignmentError(Exception):
def __init__(self, msg=None, obj=None, **kwargs):
super(HandlerAssignmentError, self).__init__(msg, **kwargs)
self.obj = obj
5 changes: 5 additions & 0 deletions lib/galaxy/exceptions/error_codes.json
@@ -144,6 +144,11 @@
"code": 500003,
"message": "Error in a configuration file."
},
{
"name": "TOOL_EXECUTION_ERROR",
"code": 500004,
"message": "Tool execution failed due to an internal server error."
},
{
"name": "NOT_IMPLEMENTED",
"code": 501001,
63 changes: 21 additions & 42 deletions lib/galaxy/jobs/__init__.py
@@ -115,6 +115,8 @@ class JobConfiguration(ConfiguresHandlers):
These features are configured in the job configuration, by default, ``job_conf.xml``
"""
DEFAULT_BASE_HANDLER_POOLS = ('job-handlers',)

DEFAULT_NWORKERS = 4

JOB_RESOURCE_CONDITIONAL_XML = """<conditional name="__job_resource">
@@ -135,6 +137,8 @@ def __init__(self, app):
self.handlers = {}
self.handler_runner_plugins = {}
self.default_handler_id = None
self.handler_assignment_methods = None
self.handler_assignment_methods_configured = False
self.destinations = {}
self.destination_tags = {}
self.default_destination_id = None
@@ -150,7 +154,6 @@ def __init__(self, app):
output_size=None,
destination_user_concurrent_jobs={},
destination_total_concurrent_jobs={})
self._is_handler = None

default_resubmits = []
default_resubmit_condition = self.app.config.default_job_resubmission_condition
@@ -170,8 +173,7 @@ def __init__(self, app):
tree = load(job_config_file)
self.__parse_job_conf_xml(tree)
except IOError:
log.warning('Job configuration "%s" does not exist, using default'
' job configuration (this server will run jobs)',
log.warning('Job configuration "%s" does not exist, using default job configuration',
self.app.config.job_config_file)
self.__set_default_job_conf()
except Exception as e:
@@ -212,23 +214,15 @@ def __parse_job_conf_xml(self, tree):

# Parse handlers
handlers_conf = root.find('handlers')
self._init_handler_assignment_methods(handlers_conf)
self._init_handlers(handlers_conf)

# Determine the default handler(s)
try:
self.default_handler_id = self._get_default(self.app.config, handlers_conf, list(self.handlers.keys()))
except Exception:
pass
# For tests, this may not exist
try:
base_server_name = self.app.config.base_server_name
except AttributeError:
base_server_name = self.app.config.get('base_server_name', None)
if (self.default_handler_id is None
or (len(self.handlers) == 1 and base_server_name == next(iter(self.handlers.keys())))):
# Shortcut for compatibility with existing job confs that use the default handlers block,
# there are no defined handlers, or there's only one handler and it's this server
self.__set_default_job_handler()
if not self.handler_assignment_methods_configured:
self._set_default_handler_assignment_methods()
else:
self.app.application_stack.init_job_handling(self)
log.info("Job handler assignment methods set to: %s", ', '.join(self.handler_assignment_methods))
for tag, handlers in [(t, h) for t, h in self.handlers.items() if isinstance(h, list)]:
log.info("Tag [%s] handlers: %s", tag, ', '.join(handlers))

# Parse destinations
destinations = root.find('destinations')
@@ -269,7 +263,8 @@ def __parse_job_conf_xml(self, tree):
self.destinations[tag].append(job_destination)

# Determine the default destination
self.default_destination_id = self._get_default(self.app.config, destinations, list(self.destinations.keys()))
self.default_destination_id = self._get_default(
self.app.config, destinations, list(self.destinations.keys()), auto=True)

# Parse resources...
resources = root.find('resources')
@@ -346,32 +341,16 @@ def __set_default_job_conf(self):
if self.app.config.use_tasked_jobs:
self.runner_plugins.append(dict(id='tasks', load='tasks', workers=DEFAULT_LOCAL_WORKERS))
# Set the handlers
self.__set_default_job_handler()
self._init_handler_assignment_methods()
if not self.handler_assignment_methods_configured:
self._set_default_handler_assignment_methods()
else:
self.app.application_stack.init_job_handling(self)
# Set the destination
self.default_destination_id = 'local'
self.destinations['local'] = [JobDestination(id='local', runner='local')]
log.debug('Done loading job configuration')

def __set_default_job_handler(self):
# Called when self.default_handler_id is None
if self.app.application_stack.has_pool(self.app.application_stack.pools.JOB_HANDLERS):
if (self.app.application_stack.in_pool(self.app.application_stack.pools.JOB_HANDLERS)):
log.info("Found job handler pool managed by application stack, this server (%s) is a member of pool: "
"%s", self.app.config.server_name, self.app.application_stack.pools.JOB_HANDLERS)
self._is_handler = True
else:
log.info("Found job handler pool managed by application stack, this server (%s) will submit jobs to "
"pool: %s", self.app.config.server_name, self.app.application_stack.pools.JOB_HANDLERS)
else:
log.info('Did not find job handler pool managed by application stack, this server (%s) will handle jobs '
'submitted to it', self.app.config.server_name)
self._is_handler = True
self.app.application_stack.register_postfork_function(self.make_self_default_handler)

def make_self_default_handler(self):
self.default_handler_id = self.app.config.server_name
self.handlers[self.app.config.server_name] = [self.app.config.server_name]

def get_tool_resource_xml(self, tool_id, tool_type):
""" Given a tool id, return XML elements describing parameters to
insert into job resources.
@@ -484,7 +463,7 @@ def default_job_tool_configuration(self):
:returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination
"""
return JobToolConfiguration(id='default', handler=self.default_handler_id, destination=self.default_destination_id)
return JobToolConfiguration(id='_default_', handler=self.default_handler_id, destination=self.default_destination_id)

# Called upon instantiation of a Tool object
def get_job_tool_configurations(self, ids):
Expand Down
8 changes: 0 additions & 8 deletions lib/galaxy/jobs/handler.py
@@ -743,14 +743,6 @@ def put(self, job_id, tool_id):
if not self.track_jobs_in_database:
self.queue.put((job_id, tool_id))
self.sleeper.wake()
else:
# Workflow invocations farmed out to workers will submit jobs through here. If a handler is unassigned, we
will submit for one, or else claim it ourselves. TODO: This should be moved to a higher level as it's now
# implemented here and in MessageJobQueue
job = self.sa_session.query(model.Job).get(job_id)
if job.handler is None and self.app.application_stack.has_pool(self.app.application_stack.pools.JOB_HANDLERS):
msg = JobHandlerMessage(task='setup', job_id=job_id)
self.app.application_stack.send_message(self.app.application_stack.pools.JOB_HANDLERS, msg)

def shutdown(self):
"""Attempts to gracefully shut down the worker thread"""
