diff --git a/config/galaxy.ini.sample b/config/galaxy.ini.sample index c1a6fee70012..eaa02e41aafc 100644 --- a/config/galaxy.ini.sample +++ b/config/galaxy.ini.sample @@ -301,6 +301,11 @@ paste.app_factory = galaxy.web.buildapp:app_factory # Set to false to disable various checks Galaxy will do to ensure it # can run job scripts before attempting to execute or submit them. #check_job_script_integrity = True +# Number of checks to execute if check_job_script_integrity is enabled. +#check_job_script_integrity_count = 35 +# Time to sleep between checks if check_job_script_integrity is enabled (in seconds). +#check_job_script_integrity_sleep = .25 + # Citation related caching. Tool citations information maybe fetched from # external sources such as http://dx.doi.org/ by Galaxy - the following diff --git a/lib/galaxy/jobs/command_factory.py b/lib/galaxy/jobs/command_factory.py index 6d9a2b946668..e1762a7eef1c 100644 --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -1,9 +1,13 @@ from os import getcwd -from os import chmod from os.path import join from os.path import abspath from galaxy import util +from galaxy.jobs.runners.util.job_script import ( + INTEGRITY_INJECTION, + write_script, + check_script_integrity, +) from logging import getLogger log = getLogger( __name__ ) @@ -79,11 +83,12 @@ def build_command( def __externalize_commands(job_wrapper, commands_builder, remote_command_params, script_name="tool_script.sh"): local_container_script = join( job_wrapper.working_directory, script_name ) tool_commands = commands_builder.build() - with open( local_container_script, "w" ) as f: - script_contents = u"#!%s\n%s" % (DEFAULT_SHELL, tool_commands) - f.write(script_contents.encode(util.DEFAULT_ENCODING)) - chmod( local_container_script, 0755 ) - + config = job_wrapper.app.config + integrity_injection = "" + if check_script_integrity(config): + integrity_injection = INTEGRITY_INJECTION + script_contents = u"#!%s\n%s%s" % (DEFAULT_SHELL, integrity_injection, tool_commands) + write_script(local_container_script, script_contents, config) commands = local_container_script if 'working_directory' in remote_command_params: commands = "%s %s" % (DEFAULT_SHELL, join(remote_command_params['working_directory'], script_name)) diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index 3582678ed1ea..21a9ca13042b 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -20,6 +20,7 @@ from galaxy.util import ParamsWithSpecs from galaxy.util import ExecutionTimer from galaxy.util.bunch import Bunch +from galaxy.jobs.runners.util.job_script import write_script from galaxy.jobs.runners.util.job_script import job_script from galaxy.jobs.runners.util.env import env_to_statement @@ -306,41 +307,7 @@ def get_job_file(self, job_wrapper, **kwds): return job_script(**options) def write_executable_script( self, path, contents, mode=0o755 ): - with open( path, 'w' ) as f: - if isinstance(contents, unicode): - contents = contents.encode("UTF-8") - f.write( contents ) - os.chmod( path, mode ) - self._handle_script_integrity( path ) - - def _handle_script_integrity( self, path ): - if not getattr( self.app.config, "check_job_script_integrity", True ): - return - - script_integrity_verified = False - for i in range(10): - try: - proc = subprocess.Popen( [path], shell=True, env={"ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ": "1"} ) - proc.wait() - if proc.returncode == 42: - script_integrity_verified = True - break - - # Else we will sync and wait to see if the script becomes - # executable. - try: - # sync file system to avoid "Text file busy" problems. - # These have occurred both in Docker containers and on EC2 clusters - # under high load. - subprocess.check_call(["/bin/sync"]) - except Exception: - pass - time.sleep( .25 ) - except Exception: - pass - - if not script_integrity_verified: - raise Exception("Failed to write job script, could not verify job script integrity.") + write_script( path, contents, self.app.config, mode=mode ) def _complete_terminal_job( self, ajs, **kwargs ): if ajs.job_wrapper.get_state() != model.Job.states.DELETED: diff --git a/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh b/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh index 5820de9e4a1f..9e2285bd1ce2 100644 --- a/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh +++ b/lib/galaxy/jobs/runners/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -1,12 +1,6 @@ #!/bin/sh -# The following block can be used by the job creation system -# to ensure this script is runnable before running it directly -# or submitting it to a cluster manager. -if [ -n "$ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ" ]; then - exit 42 -fi - +$integrity_injection $headers $slots_statement export GALAXY_SLOTS diff --git a/lib/galaxy/jobs/runners/util/job_script/__init__.py b/lib/galaxy/jobs/runners/util/job_script/__init__.py index 6556290a6ef7..37e96af8cc26 100644 --- a/lib/galaxy/jobs/runners/util/job_script/__init__.py +++ b/lib/galaxy/jobs/runners/util/job_script/__init__.py @@ -1,5 +1,9 @@ +import os from string import Template +import subprocess +import time from pkg_resources import resource_string + from galaxy.util import unicodify DEFAULT_JOB_FILE_TEMPLATE = Template( @@ -13,6 +17,19 @@ GALAXY_SLOTS="1" """ +INTEGRITY_INJECTION = """ +# The following block can be used by the job system +# to ensure this script is runnable before actually attempting +# to run it. +if [ -n "$ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ" ]; then + exit 42 +fi +""" + +INTEGRITY_SYNC_COMMAND = "/bin/sync" +DEFAULT_INTEGRITY_CHECK = True +DEFAULT_INTEGRITY_COUNT = 35 +DEFAULT_INTEGRITY_SLEEP = .25 REQUIRED_TEMPLATE_PARAMS = ['working_directory', 'command', 'exit_code_path'] OPTIONAL_TEMPLATE_PARAMS = { 'galaxy_lib': None, @@ -22,6 +39,7 @@ 'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT, 'instrument_pre_commands': '', 'instrument_post_commands': '', + 'integrity_injection': INTEGRITY_INJECTION, } @@ -67,3 +85,56 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): if not isinstance(template, Template): template = Template(template) return template.safe_substitute(template_params) + + +def check_script_integrity(config): + return getattr(config, "check_job_script_integrity", DEFAULT_INTEGRITY_CHECK) + + +def write_script(path, contents, config, mode=0o755): + with open(path, 'w') as f: + if isinstance(contents, unicode): + contents = contents.encode("UTF-8") + f.write(contents) + os.chmod(path, mode) + _handle_script_integrity(path, config) + + +def _handle_script_integrity(path, config): + if not check_script_integrity(config): + return + + script_integrity_verified = False + count = getattr(config, "check_job_script_integrity_count", DEFAULT_INTEGRITY_COUNT) + sleep_amt = getattr(config, "check_job_script_integrity_sleep", DEFAULT_INTEGRITY_SLEEP) + for i in range(count): + try: + proc = subprocess.Popen([path], shell=True, env={"ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ": "1"}) + proc.wait() + if proc.returncode == 42: + script_integrity_verified = True + break + + # Else we will sync and wait to see if the script becomes + # executable. + try: + # sync file system to avoid "Text file busy" problems. + # These have occurred both in Docker containers and on EC2 clusters + # under high load. + subprocess.check_call(INTEGRITY_SYNC_COMMAND) + except Exception: + pass + time.sleep(sleep_amt) + except Exception: + pass + + if not script_integrity_verified: + raise Exception("Failed to write job script, could not verify job script integrity.") + + +__all__ = [ + 'check_script_integrity', + 'job_script', + 'write_script', + 'INTEGRITY_INJECTION', +] diff --git a/test/unit/jobs/test_command_factory.py b/test/unit/jobs/test_command_factory.py index b88fac37187d..9623c1099dd8 100644 --- a/test/unit/jobs/test_command_factory.py +++ b/test/unit/jobs/test_command_factory.py @@ -163,6 +163,11 @@ def __init__(self, job_dir): self.working_directory = job_dir self.prepare_input_files_cmds = None self.commands_in_new_shell = False + self.app = Bunch( + config=Bunch( + check_job_script_integrity=False, + ) + ) def get_command_line(self): return self.command_line