Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add the ability to control job conf plugin loading from environment v…
…ariables.

All job configuration parameters can be set using environment variables but currently the actual loading of the plugin cannot. This is problematic for setting up single job confs for multiple environemnts (like done with docker-galaxy-stable) when using plugins that may hang waiting for resources (such as Kubernetes for the K8 runner or a MQ for Pulsar for instance) or may require library dependencies not be setup in every environment the job configuration is deployed in.

xref bgruening/docker-galaxy-stable#328
  • Loading branch information
jmchilton committed Jun 6, 2017
1 parent 9d1a382 commit 7ad5f90
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
6 changes: 5 additions & 1 deletion config/job_conf.xml.sample_advanced
Expand Up @@ -283,7 +283,11 @@
<param id="insecure">true</param>
<!-- True to communicate with Chronos over HTTPS; false otherwise-->
</plugin>

<!-- Additionally any plugin or destination (below) may define an "enabled" parameter that should
evaluate to True or False. When setup using
<param id="enabled" from_environ="<VAR>">DEFAULT</param>
plugins and destinations can be conditionally loaded using environment variables.
-->
</plugins>
<handlers default="handlers">
<!-- Additional job handlers - the id should match the name of a
Expand Down
17 changes: 16 additions & 1 deletion lib/galaxy/jobs/__init__.py
Expand Up @@ -180,6 +180,8 @@ def __parse_job_conf_xml(self, tree):
if plugin.get('type') == 'runner':
workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
runner_kwds = self.__get_params(plugin)
if not self.__is_enabled(runner_kwds):
continue
runner_info = dict(id=plugin.get('id'),
load=plugin.get('load'),
workers=int(workers),
Expand Down Expand Up @@ -223,7 +225,11 @@ def __parse_job_conf_xml(self, tree):
if metrics_elements:
job_metrics.set_destination_conf_element( id, metrics_elements[ 0 ] )
job_destination = JobDestination(**dict(destination.items()))
job_destination['params'] = self.__get_params(destination)
params = self.__get_params(destination)
if not self.__is_enabled(params):
continue

job_destination['params'] = params
job_destination['env'] = self.__get_envs(destination)
destination_resubmits = self.__get_resubmits(destination)
if destination_resubmits:
Expand Down Expand Up @@ -490,6 +496,15 @@ def __get_resubmits(self, parent):
) )
return rval

def __is_enabled(self, params):
"""Check for an enabled parameter - pop it out - and return as boolean."""
enabled = True
if "enabled" in params:
raw_enabled = params.pop("enabled")
enabled = util.asbool(raw_enabled)

return enabled

@property
def default_job_tool_configuration(self):
"""
Expand Down
25 changes: 25 additions & 0 deletions test/unit/jobs/conditional_runners_job_conf.xml
@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<!-- A sample job config that explicitly configures job running the way it is configured by default (if there is no explicit config). -->
<job_conf>
<plugins>
<plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/>
<plugin id="local2" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4">
<param id="enabled" from_environ="LOCAL2_ENABLED">True</param>
</plugin>
<plugin id="local3" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4">
<param id="enabled" from_environ="LOCAL3_ENABLED">False</param>
</plugin>
</plugins>
<handlers>
<handler id="main"/>
</handlers>
<destinations default="local">
<destination id="local" runner="local"/>
<destination id="local2_dest" runner="local2">
<param id="enabled" from_environ="LOCAL2_ENABLED">True</param>
</destination>
<destination id="local3_dest" runner="local3">
<param id="enabled" from_environ="LOCAL3_ENABLED">False</param>
</destination>
</destinations>
</job_conf>
24 changes: 24 additions & 0 deletions test/unit/jobs/test_job_configuration.py
Expand Up @@ -11,6 +11,7 @@
# there are advantages to testing the documentation/examples.
SIMPLE_JOB_CONF = os.path.join( os.path.dirname( __file__ ), "..", "..", "..", "config", "job_conf.xml.sample_basic" )
ADVANCED_JOB_CONF = os.path.join( os.path.dirname( __file__ ), "..", "..", "..", "config", "job_conf.xml.sample_advanced" )
CONDITIONAL_RUNNER_JOB_CONF = os.path.join( os.path.dirname( __file__ ), "conditional_runners_job_conf.xml" )


class JobConfXmlParserTestCase( unittest.TestCase ):
Expand All @@ -27,9 +28,11 @@ def setUp( self ):
self.__write_config_from( SIMPLE_JOB_CONF )
self.app = bunch.Bunch( config=self.config, job_metrics=MockJobMetrics() )
self.__job_configuration = None
self.__original_environ = os.environ.copy()

def tearDown( self ):
shutil.rmtree( self.temp_directory )
os.environ = self.__original_environ

def test_load_simple_runner( self ):
runner_plugin = self.job_config.runner_plugins[ 0 ]
Expand Down Expand Up @@ -137,6 +140,27 @@ def test_macro_expansion( self ):
for name in ["foo_small", "foo_medium", "foo_large", "foo_longrunning"]:
assert self.job_config.destinations[ name ]

def test_conditional_runners( self ):
self.__write_config_from( CONDITIONAL_RUNNER_JOB_CONF )
runner_ids = [ r[ "id" ] for r in self.job_config.runner_plugins ]
assert "local2" in runner_ids
assert "local3" not in runner_ids

assert "local2_dest" in self.job_config.destinations
assert "local3_dest" not in self.job_config.destinations

def test_conditional_runners_from_environ( self ):
self.__write_config_from( CONDITIONAL_RUNNER_JOB_CONF )
os.environ["LOCAL2_ENABLED"] = "False"
os.environ["LOCAL3_ENABLED"] = "True"

runner_ids = [ r[ "id" ] for r in self.job_config.runner_plugins ]
assert "local2" not in runner_ids
assert "local3" in runner_ids

assert "local2_dest" not in self.job_config.destinations
assert "local3_dest" in self.job_config.destinations

# TODO: Add job metrics parsing test.

@property
Expand Down

0 comments on commit 7ad5f90

Please sign in to comment.