From 274050249cbf3974b236540e92a5a6d1414e51da Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Mon, 30 Mar 2020 18:18:19 -0400 Subject: [PATCH 1/7] Add set -e to setup.sh --- canine/localization/base.py | 3 ++- canine/localization/nfs.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 79fb3560..b283cc2c 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -537,6 +537,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ line.rstrip() for line in [ '#!/bin/bash', + 'set -e', 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), @@ -545,7 +546,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\n' + ) + '\ncd $CANINE_JOB_ROOT\nunset -e\n' teardown_script = '\n'.join( line.rstrip() for line in [ diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 67729de1..0664fd58 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -213,6 +213,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ line.rstrip() for line in [ '#!/bin/bash', + 'set -e', 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), @@ -221,7 +222,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\n' + ) + 
'\ncd $CANINE_JOB_ROOT\nunset -e\n' teardown_script = '\n'.join( line.rstrip() for line in [ From 6a0637e0abca42394f2abaeced987bb436f9cdec Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 31 Mar 2020 14:21:39 -0400 Subject: [PATCH 2/7] You undo `set -e` with `set +e`, not `unset -e` Gotta love bash --- canine/localization/base.py | 2 +- canine/localization/nfs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index b283cc2c..9543126f 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -546,7 +546,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\nunset -e\n' + ) + '\ncd $CANINE_JOB_ROOT\nset +e\n' teardown_script = '\n'.join( line.rstrip() for line in [ diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 0664fd58..e7afb230 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -222,7 +222,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\nunset -e\n' + ) + '\ncd $CANINE_JOB_ROOT\nset +e\n' teardown_script = '\n'.join( line.rstrip() for line in [ From c4cde6232b39981e62df10230393b9866ee65343 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Tue, 31 Mar 2020 19:25:25 -0400 Subject: [PATCH 3/7] Check if we have permission to access gs:// before attempting to stream --- canine/localization/base.py | 1 + canine/localization/nfs.py | 1 + 2 files changed, 2 insertions(+) diff --git a/canine/localization/base.py b/canine/localization/base.py index 9543126f..6d27cbc7 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -499,6 +499,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, 
str]) -> typ job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) extra_tasks += [ + 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), 'mkfifo {}'.format(dest.computepath), "gsutil {} cat {} > {} &".format( diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index e7afb230..ea395b46 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -175,6 +175,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) extra_tasks += [ + 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), 'mkfifo {}'.format(dest.computepath), "gsutil {} cat {} > {} &".format( From 2896beadf06d87603070fc338df8e78c797ea412 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Wed, 1 Apr 2020 15:41:58 -0400 Subject: [PATCH 4/7] Split setup.sh into setup and localization steps Previously, setup.sh did three things: * Create directory structure for the task * Export environment variables for the task * Perform any localization tasks (i.e., gsutil cp/cat) for the task We need to be able to catch errors arising from the third (localization) step. However, the first two steps need to be sourced in entrypoint.sh, since they export variables that must be available to script.sh when it's subsequently invoked in entrypoint.sh. Sourcing all three steps makes it difficult to recover any error codes from the localization step. Thus, we now source the first two steps in entrypoint.sh (via setup.sh), and break the localization tasks out into their own script (localization.sh), which entrypoint.sh runs. 
By recovering the exit code of localization.sh, we can infer whether the localization tasks failed. Our previous approach of wrapping everything in setup.sh with set -e/set +e was problematic because it prevents teardown.sh from ever running -- since set -e gets *sourced*, any errors during localization halt entrypoint.sh immediately. --- canine/localization/base.py | 33 ++++++++++++++++------- canine/localization/local.py | 19 ++++++++++--- canine/localization/nfs.py | 50 ++++++++++++++++++++++++++--------- canine/localization/remote.py | 17 +++++++++--- canine/orchestrator.py | 3 +++ 5 files changed, 92 insertions(+), 30 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 6d27cbc7..1b12c1fc 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -483,14 +483,18 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm value ) - def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]: + def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]: """ - Returns a tuple of (setup script, teardown script) for the given job id. + Returns a tuple of (setup script, localization script, teardown script) for the given job id.
Must call after pre-scanning inputs """ + + # generate job variable, exports, and localization_tasks arrays + # - job variables and exports are set when setup.sh is _sourced_ + # - localization tasks are run when localization.sh is _run_ job_vars = [] exports = [] - extra_tasks = [ + localization_tasks = [ 'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi' ] compute_env = self.environment('compute') @@ -498,7 +502,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ if val.type == 'stream': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), 'mkfifo {}'.format(dest.computepath), @@ -515,7 +519,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ elif val.type == 'download': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', shlex.quote(val.path), @@ -534,11 +538,12 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ )) else: print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) + + # generate setup script setup_script = '\n'.join( line.rstrip() for line in [ '#!/bin/bash', - 'set -e', 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), @@ -546,8 +551,16 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', - ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\nset +e\n' + ] + exports + ) + '\ncd $CANINE_JOB_ROOT\n' + + # generate localization script + localization_script = '\n'.join([ + "#!/bin/bash", + "set -e" + ] + localization_tasks) + "\nset +e" + + # generate teardown script teardown_script = '\n'.join( line.rstrip() for line in [ @@ -565,7 +578,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ), ] ) - return setup_script, teardown_script + return setup_script, localization_script, teardown_script @abc.abstractmethod def __enter__(self): @@ -602,7 +615,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. 
For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup diff --git a/canine/localization/local.py b/canine/localization/local.py index 6a7e2d1c..3c51cfba 100644 --- a/canine/localization/local.py +++ b/canine/localization/local.py @@ -94,7 +94,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup @@ -113,18 +113,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty jobId, )) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) - # Setup: + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with open(script_path.localpath, 'w') as w: w.write(setup_script) os.chmod(script_path.localpath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with open(script_path.localpath, 'w') as w: + w.write(localization_script) + os.chmod(script_path.localpath, 0o775) + # 
Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with open(script_path.localpath, 'w') as w: w.write(teardown_script) os.chmod(script_path.localpath, 0o775) + + # copy delocalization script os.symlink( os.path.join( os.path.dirname(__file__), diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index ea395b46..a98a28e6 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -101,7 +101,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup @@ -138,18 +138,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty jobId, )) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) - # Setup: + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with open(script_path.localpath, 'w') as w: w.write(setup_script) os.chmod(script_path.localpath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with open(script_path.localpath, 'w') as w: + w.write(localization_script) + os.chmod(script_path.localpath, 0o775) + # Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with 
open(script_path.localpath, 'w') as w: w.write(teardown_script) os.chmod(script_path.localpath, 0o775) + + # copy delocalization script shutil.copyfile( os.path.join( os.path.dirname(__file__), @@ -159,14 +170,18 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty ) return self.finalize_staging_dir(inputs) - def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]: + def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]: """ - Returns a tuple of (setup script, teardown script) for the given job id. + Returns a tuple of (setup script, localization script, teardown script) for the given job id. Must call after pre-scanning inputs """ + + # generate job variable, exports, and localization_tasks arrays + # - job variables and exports are set when setup.sh is _sourced_ + # - localization tasks are run when localization.sh is _run_ job_vars = [] exports = [] - extra_tasks = [ + localization_tasks = [ 'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi' ] compute_env = self.environment('compute') @@ -174,7 +189,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ if val.type == 'stream': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), 'mkfifo {}'.format(dest.computepath), @@ -191,7 +206,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ elif val.type == 'download': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', shlex.quote(val.path), @@ -210,11 +225,12 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ )) else: print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) + + # generate setup script setup_script = '\n'.join( line.rstrip() for line in [ '#!/bin/bash', - 'set -e', 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), @@ -222,8 +238,16 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', - ] + exports + extra_tasks - ) + '\ncd $CANINE_JOB_ROOT\nset +e\n' + ] + exports + ) + '\ncd $CANINE_JOB_ROOT\n' + + # generate localization script + localization_script = '\n'.join([ + "#!/bin/bash", + "set -e" + ] + localization_tasks) + "\nset +e" + + # generate teardown script teardown_script = '\n'.join( line.rstrip() for line in [ @@ -241,7 +265,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ), ] ) - return setup_script, teardown_script + return setup_script, localization_script, teardown_script def delocalize(self, patterns: typing.Dict[str, str], output_dir: typing.Optional[str] = None) -> typing.Dict[str, typing.Dict[str, str]]: """ diff --git a/canine/localization/remote.py b/canine/localization/remote.py index 4b593f91..f8b7e784 100644 --- a/canine/localization/remote.py +++ b/canine/localization/remote.py @@ -79,18 +79,29 @@ def localize(self, inputs: typing.Dict[str, 
typing.Dict[str, str]], patterns: ty ) ) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) - # Setup: + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with transport.open(script_path.controllerpath, 'w') as w: w.write(setup_script) transport.chmod(script_path.controllerpath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with transport.open(script_path.controllerpath, 'w') as w: + w.write(localization_script) + transport.chmod(script_path.controllerpath, 0o775) + # Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with transport.open(script_path.controllerpath, 'w') as w: w.write(teardown_script) transport.chmod(script_path.controllerpath, 0o775) + + # copy delocalization script transport.send( os.path.join( os.path.dirname(__file__), diff --git a/canine/orchestrator.py b/canine/orchestrator.py index e00991ee..854e5d98 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -45,8 +45,11 @@ export CANINE_OUTPUT="{{CANINE_OUTPUT}}" export CANINE_JOBS="{{CANINE_JOBS}}" source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh +$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh +LOCALIZER_JOB_RC=$? {{pipeline_script}} CANINE_JOB_RC=$? 
+[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh exit $CANINE_JOB_RC """.format(version=version) From 67f33eeff8b625eb33f74207631b1ce6a1c042c7 Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Thu, 2 Apr 2020 13:21:18 -0400 Subject: [PATCH 5/7] Update tests --- canine/test/test_localizer_batched.py | 6 +++--- canine/test/test_localizer_nfs.py | 6 +++--- canine/test/test_orchestrator.py | 3 +++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/canine/test/test_localizer_batched.py b/canine/test/test_localizer_batched.py index 3a55e787..3a6d1340 100644 --- a/canine/test/test_localizer_batched.py +++ b/canine/test/test_localizer_batched.py @@ -292,7 +292,7 @@ def test_setup_teardown(self): 'string-incommon': Localization(None, os.urandom(8).hex()), # no localization. Setup teardown exports as string } - setup_text, teardown_text = localizer.job_setup_teardown( + setup_text, localization_text, teardown_text = localizer.job_setup_teardown( jobId=str(jid), patterns=output_patterns ) @@ -329,7 +329,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) elif value.type == 'download': src = path @@ -341,7 +341,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) if isinstance(path, PathType): path = path.computepath diff --git a/canine/test/test_localizer_nfs.py b/canine/test/test_localizer_nfs.py index 1253d01a..25673ac0 100644 --- a/canine/test/test_localizer_nfs.py +++ b/canine/test/test_localizer_nfs.py @@ -124,7 +124,7 @@ def test_setup_teardown(self): 'string-incommon': Localization(None, os.urandom(8).hex()), # no localization. 
Setup teardown exports as string } - setup_text, teardown_text = localizer.job_setup_teardown( + setup_text, localization_text, teardown_text = localizer.job_setup_teardown( jobId=str(jid), patterns=output_patterns ) @@ -161,7 +161,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) elif value.type == 'download': src = path @@ -173,7 +173,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) if isinstance(path, PathType): path = path.computepath diff --git a/canine/test/test_orchestrator.py b/canine/test/test_orchestrator.py index d0716c5c..92e310b3 100644 --- a/canine/test/test_orchestrator.py +++ b/canine/test/test_orchestrator.py @@ -94,8 +94,11 @@ def test_localization(self): 'export CANINE_OUTPUT="/mnt/nfs/canine/outputs"\n' 'export CANINE_JOBS="/mnt/nfs/canine/jobs"\n' 'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh\n' + '$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh\n' + 'LOCALIZER_JOB_RC=$?\n' '/mnt/nfs/canine/script.sh\n' 'CANINE_JOB_RC=$?\n' + '[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC\n' 'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh\n' 'exit $CANINE_JOB_RC\n'.format(version=version) ) From e7671d9bc9e275db5c98b4fff5ca46270a36494f Mon Sep 17 00:00:00 2001 From: Julian Hess Date: Fri, 3 Apr 2020 10:33:17 -0400 Subject: [PATCH 6/7] Append trailing newline --- canine/localization/base.py | 2 +- canine/localization/nfs.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/canine/localization/base.py b/canine/localization/base.py index 1b12c1fc..7f072169 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -558,7 +558,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ localization_script = '\n'.join([ "#!/bin/bash", "set -e" - ] + localization_tasks) + "\nset +e" + ] + localization_tasks) + "\nset +e\n" # generate teardown script 
teardown_script = '\n'.join( diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index a98a28e6..c37dbac2 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -245,7 +245,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ localization_script = '\n'.join([ "#!/bin/bash", "set -e" - ] + localization_tasks) + "\nset +e" + ] + localization_tasks) + "\nset +e\n" # generate teardown script teardown_script = '\n'.join( From 2e682a4da570f7c3cc6657ae2b180bacba97504a Mon Sep 17 00:00:00 2001 From: Aaron Graubert Date: Fri, 17 Apr 2020 16:14:28 -0400 Subject: [PATCH 7/7] Fixed bad merge of nfs and missing args to example pipeline --- canine/localization/nfs.py | 104 +-------------------------------- examples/example_pipeline.yaml | 2 + 2 files changed, 4 insertions(+), 102 deletions(-) diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 5538c2dd..4e5263af 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -137,13 +137,13 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty # Now localize job setup, localization, and teardown scripts setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) - # Setup: + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with open(script_path.localpath, 'w') as w: w.write(setup_script) os.chmod(script_path.localpath, 0o775) - # Localization: + # Localization: script_path = self.reserve_path('jobs', jobId, 'localization.sh') with open(script_path.localpath, 'w') as w: w.write(localization_script) @@ -165,106 +165,6 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty ) return self.finalize_staging_dir(inputs) -<<<<<<< HEAD - def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]: - """ - Returns a tuple of (setup script, localization script, teardown script) for 
the given job id. - Must call after pre-scanning inputs - """ - - # generate job variable, exports, and localization_tasks arrays - # - job variables and exports are set when setup.sh is _sourced_ - # - localization tasks are run when localization.sh is _run_ - job_vars = [] - exports = [] - localization_tasks = [ - 'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi' - ] - compute_env = self.environment('compute') - for key, val in self.inputs[jobId].items(): - if val.type == 'stream': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - localization_tasks += [ - 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), - 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.computepath), - 'mkfifo {}'.format(dest.computepath), - "gsutil {} cat {} > {} &".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath - ) - ] - exports.append('export {}="{}"'.format( - key, - dest.computepath - )) - elif val.type == 'download': - job_vars.append(shlex.quote(key)) - dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - localization_tasks += [ - "if [[ ! 
-e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( - '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', - shlex.quote(val.path), - dest.computepath - ) - ] - exports.append('export {}="{}"'.format( - key, - dest.computepath - )) - elif val.type is None: - job_vars.append(shlex.quote(key)) - exports.append('export {}={}'.format( - key, - shlex.quote(val.path.computepath if isinstance(val.path, PathType) else val.path) - )) - else: - print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) - - # generate setup script - setup_script = '\n'.join( - line.rstrip() - for line in [ - '#!/bin/bash', - 'export CANINE_JOB_VARS={}'.format(':'.join(job_vars)), - 'export CANINE_JOB_INPUTS="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'inputs')), - 'export CANINE_JOB_ROOT="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), - 'export CANINE_JOB_SETUP="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'setup.sh')), - 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), - 'mkdir -p $CANINE_JOB_INPUTS', - 'mkdir -p $CANINE_JOB_ROOT', - ] + exports - ) + '\ncd $CANINE_JOB_ROOT\n' - - # generate localization script - localization_script = '\n'.join([ - "#!/bin/bash", - "set -e" - ] + localization_tasks) + "\nset +e\n" - - # generate teardown script - teardown_script = '\n'.join( - line.rstrip() - for line in [ - '#!/bin/bash', - 'if [[ -d {0} ]]; then cd {0}; fi'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'workspace')), - # 'mv ../stderr ../stdout .', - 'if which python3 2>/dev/null >/dev/null; then python3 {0} {1} {2} {3}; else python {0} {1} {2} {3}; fi'.format( - os.path.join(compute_env['CANINE_ROOT'], 'delocalization.py'), - compute_env['CANINE_OUTPUT'], - jobId, - ' '.join( - '-p {} {}'.format(name, shlex.quote(pattern)) - for name, 
pattern in patterns.items() - ) - ), - ] - ) - return setup_script, localization_script, teardown_script - -======= ->>>>>>> master def delocalize(self, patterns: typing.Dict[str, str], output_dir: typing.Optional[str] = None) -> typing.Dict[str, typing.Dict[str, str]]: """ Delocalizes output from all jobs. diff --git a/examples/example_pipeline.yaml b/examples/example_pipeline.yaml index 70824316..ff22ec1b 100644 --- a/examples/example_pipeline.yaml +++ b/examples/example_pipeline.yaml @@ -13,3 +13,5 @@ inputs: backend: type: Dummy n_workers: 1 +localization: + staging_dir: /mnt/nfs/canine