diff --git a/canine/localization/base.py b/canine/localization/base.py index a91d73d4..72fcf3d2 100644 --- a/canine/localization/base.py +++ b/canine/localization/base.py @@ -506,14 +506,18 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm value ) - def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]: + def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]: """ - Returns a tuple of (setup script, teardown script) for the given job id. + Returns a tuple of (setup script, localization script, teardown script) for the given job id. Must call after pre-scanning inputs """ + + # generate job variable, exports, and localization_tasks arrays + # - job variables and exports are set when setup.sh is _sourced_ + # - localization tasks are run when localization.sh is _run_ job_vars = [] exports = [] - extra_tasks = [ + localization_tasks = [ 'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi' ] compute_env = self.environment('remote') @@ -521,7 +525,8 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ if val.type == 'stream': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ + 'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)), 'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.remotepath), 'mkfifo {}'.format(dest.remotepath), "gsutil {} cat {} > {} &".format( @@ -537,7 +542,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ elif val.type == 'download': job_vars.append(shlex.quote(key)) dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path))) - extra_tasks += [ + localization_tasks += [ "if [[ ! -e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format( '-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '', shlex.quote(val.path), @@ -556,6 +561,8 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ )) else: print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr) + + # generate setup script setup_script = '\n'.join( line.rstrip() for line in [ @@ -567,8 +574,16 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ 'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')), 'mkdir -p $CANINE_JOB_INPUTS', 'mkdir -p $CANINE_JOB_ROOT', - ] + exports + extra_tasks + ] + exports ) + '\ncd $CANINE_JOB_ROOT\n' + + # generate localization script + localization_script = '\n'.join([ + "#!/bin/bash", + "set -e" + ] + localization_tasks) + "\nset +e\n" + + # generate teardown script teardown_script = '\n'.join( line.rstrip() for line in [ @@ -586,7 +601,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ ), ] ) - return setup_script, teardown_script + return setup_script, localization_script, teardown_script @abc.abstractmethod def __enter__(self): @@ -623,7 +638,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup diff --git a/canine/localization/local.py b/canine/localization/local.py index 4b58409d..89480d70 100644 --- a/canine/localization/local.py +++ b/canine/localization/local.py @@ -82,7 +82,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup @@ -101,18 +101,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty jobId, )) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) - # Setup: + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with open(script_path.localpath, 'w') as w: w.write(setup_script) os.chmod(script_path.localpath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with open(script_path.localpath, 'w') as w: + w.write(localization_script) + os.chmod(script_path.localpath, 0o775) + # Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with open(script_path.localpath, 'w') as w: w.write(teardown_script) os.chmod(script_path.localpath, 0o775) + + # copy delocalization script os.symlink( os.path.join( os.path.dirname(__file__), diff --git a/canine/localization/nfs.py b/canine/localization/nfs.py index 48077352..4e5263af 100644 --- a/canine/localization/nfs.py +++ b/canine/localization/nfs.py @@ -96,7 +96,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty 3 phase task: 1) Pre-scan inputs to determine proper localization strategy for all inputs 2) Begin localizing job inputs. For each job, check the predetermined strategy - and set up the job's setup and teardown scripts + and set up the job's setup, localization, and teardown scripts 3) Finally, finalize the localization. This may include broadcasting the staging directory or copying a batch of gsutil files Returns the remote staging directory, which is now ready for final startup @@ -133,18 +133,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty jobId, )) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with open(script_path.localpath, 'w') as w: w.write(setup_script) os.chmod(script_path.localpath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with open(script_path.localpath, 'w') as w: + w.write(localization_script) + os.chmod(script_path.localpath, 0o775) + # Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with open(script_path.localpath, 'w') as w: w.write(teardown_script) os.chmod(script_path.localpath, 0o775) + + # copy delocalization script shutil.copyfile( os.path.join( os.path.dirname(__file__), diff --git a/canine/localization/remote.py b/canine/localization/remote.py index 115f99a7..2b2e3367 100644 --- a/canine/localization/remote.py +++ b/canine/localization/remote.py @@ -79,13 +79,22 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty ) ) self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport) - # Now localize job setup and teardown scripts - setup_script, teardown_script = self.job_setup_teardown(jobId, patterns) + + # Now localize job setup, localization, and teardown scripts + setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns) + # Setup: script_path = self.reserve_path('jobs', jobId, 'setup.sh') with transport.open(script_path.remotepath, 'w') as w: w.write(setup_script) transport.chmod(script_path.remotepath, 0o775) + + # Localization: + script_path = self.reserve_path('jobs', jobId, 'localization.sh') + with transport.open(script_path.remotepath, 'w') as w: + w.write(localization_script) + transport.chmod(script_path.remotepath, 0o775) + # Teardown: script_path = self.reserve_path('jobs', jobId, 'teardown.sh') with transport.open(script_path.remotepath, 'w') as w: diff --git a/canine/orchestrator.py b/canine/orchestrator.py index 35d1cf58..2a59007a 100644 --- a/canine/orchestrator.py +++ b/canine/orchestrator.py @@ -45,8 +45,11 @@ export CANINE_OUTPUT="{{CANINE_OUTPUT}}" export CANINE_JOBS="{{CANINE_JOBS}}" source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh +$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh +LOCALIZER_JOB_RC=$? {{pipeline_script}} CANINE_JOB_RC=$? +[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh exit $CANINE_JOB_RC """.format(version=version) diff --git a/canine/test/test_localizer_batched.py b/canine/test/test_localizer_batched.py index 4a5684ac..159caa6d 100644 --- a/canine/test/test_localizer_batched.py +++ b/canine/test/test_localizer_batched.py @@ -276,7 +276,7 @@ def test_setup_teardown(self): 'string-incommon': Localization(None, os.urandom(8).hex()), # no localization. Setup teardown exports as string } - setup_text, teardown_text = localizer.job_setup_teardown( + setup_text, localization_text, teardown_text = localizer.job_setup_teardown( jobId=str(jid), patterns=output_patterns ) @@ -313,7 +313,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) elif value.type == 'download': src = path @@ -325,7 +325,7 @@ def test_setup_teardown(self): src=src, dest=path ), - setup_text + localization_text ) if isinstance(path, PathType): path = path.remotepath diff --git a/canine/test/test_localizer_nfs.py b/canine/test/test_localizer_nfs.py index e95adc1a..2adbf321 100644 --- a/canine/test/test_localizer_nfs.py +++ b/canine/test/test_localizer_nfs.py @@ -94,7 +94,7 @@ def test_localize_file(self): self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc'))) self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc', 'test'))) - for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').controllerpath)): + for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').remotepath)): with self.subTest(dirname=ldirpath): self.assertEqual(os.path.basename(ldirpath), os.path.basename(rdirpath)) self.assertListEqual(sorted(ldirnames), sorted(rdirnames)) diff --git a/canine/test/test_orchestrator.py b/canine/test/test_orchestrator.py index d0716c5c..92e310b3 100644 --- a/canine/test/test_orchestrator.py +++ b/canine/test/test_orchestrator.py @@ -94,8 +94,11 @@ def test_localization(self): 'export CANINE_OUTPUT="/mnt/nfs/canine/outputs"\n' 'export CANINE_JOBS="/mnt/nfs/canine/jobs"\n' 'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh\n' + '$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh\n' + 'LOCALIZER_JOB_RC=$?\n' '/mnt/nfs/canine/script.sh\n' 'CANINE_JOB_RC=$?\n' + '[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC\n' 'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh\n' 'exit $CANINE_JOB_RC\n'.format(version=version) ) diff --git a/examples/example_pipeline.yaml b/examples/example_pipeline.yaml index 70824316..ff22ec1b 100644 --- a/examples/example_pipeline.yaml +++ b/examples/example_pipeline.yaml @@ -13,3 +13,5 @@ inputs: backend: type: Dummy n_workers: 1 +localization: + staging_dir: /mnt/nfs/canine