Skip to content

Commit

Permalink
Merge pull request #45 from julianhess/set_eee
Browse files Browse the repository at this point in the history
Handle failures during `setup.sh`
  • Loading branch information
agraubert committed Apr 17, 2020
2 parents fcbb115 + 2e682a4 commit bdfaaec
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 21 deletions.
31 changes: 23 additions & 8 deletions canine/localization/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,22 +506,27 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
value
)

def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]:
def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]:
"""
Returns a tuple of (setup script, teardown script) for the given job id.
Returns a tuple of (setup script, localization script, teardown script) for the given job id.
Must call after pre-scanning inputs
"""

# generate the job_vars, exports, and localization_tasks lists
# - job variables and exports are set when setup.sh is _sourced_
# - localization tasks are run when localization.sh is _run_
job_vars = []
exports = []
extra_tasks = [
localization_tasks = [
'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi'
]
compute_env = self.environment('remote')
for key, val in self.inputs[jobId].items():
if val.type == 'stream':
job_vars.append(shlex.quote(key))
dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path)))
extra_tasks += [
localization_tasks += [
'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)),
'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.remotepath),
'mkfifo {}'.format(dest.remotepath),
"gsutil {} cat {} > {} &".format(
Expand All @@ -537,7 +542,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
elif val.type == 'download':
job_vars.append(shlex.quote(key))
dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path)))
extra_tasks += [
localization_tasks += [
"if [[ ! -e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format(
'-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '',
shlex.quote(val.path),
Expand All @@ -556,6 +561,8 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
))
else:
print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr)

# generate setup script
setup_script = '\n'.join(
line.rstrip()
for line in [
Expand All @@ -567,8 +574,16 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')),
'mkdir -p $CANINE_JOB_INPUTS',
'mkdir -p $CANINE_JOB_ROOT',
] + exports + extra_tasks
] + exports
) + '\ncd $CANINE_JOB_ROOT\n'

# generate localization script
localization_script = '\n'.join([
"#!/bin/bash",
"set -e"
] + localization_tasks) + "\nset +e\n"

# generate teardown script
teardown_script = '\n'.join(
line.rstrip()
for line in [
Expand All @@ -586,7 +601,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
),
]
)
return setup_script, teardown_script
return setup_script, localization_script, teardown_script

@abc.abstractmethod
def __enter__(self):
Expand Down Expand Up @@ -623,7 +638,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand Down
19 changes: 15 additions & 4 deletions canine/localization/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand All @@ -101,18 +101,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
jobId,
))
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)
# Setup:

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with open(script_path.localpath, 'w') as w:
w.write(setup_script)
os.chmod(script_path.localpath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with open(script_path.localpath, 'w') as w:
w.write(localization_script)
os.chmod(script_path.localpath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with open(script_path.localpath, 'w') as w:
w.write(teardown_script)
os.chmod(script_path.localpath, 0o775)

# symlink delocalization script
os.symlink(
os.path.join(
os.path.dirname(__file__),
Expand Down
17 changes: 14 additions & 3 deletions canine/localization/nfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand Down Expand Up @@ -133,18 +133,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
jobId,
))
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with open(script_path.localpath, 'w') as w:
w.write(setup_script)
os.chmod(script_path.localpath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with open(script_path.localpath, 'w') as w:
w.write(localization_script)
os.chmod(script_path.localpath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with open(script_path.localpath, 'w') as w:
w.write(teardown_script)
os.chmod(script_path.localpath, 0o775)

# copy delocalization script
shutil.copyfile(
os.path.join(
os.path.dirname(__file__),
Expand Down
13 changes: 11 additions & 2 deletions canine/localization/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,22 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
)
)
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with transport.open(script_path.remotepath, 'w') as w:
w.write(setup_script)
transport.chmod(script_path.remotepath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with transport.open(script_path.remotepath, 'w') as w:
w.write(localization_script)
transport.chmod(script_path.remotepath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with transport.open(script_path.remotepath, 'w') as w:
Expand Down
3 changes: 3 additions & 0 deletions canine/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
export CANINE_OUTPUT="{{CANINE_OUTPUT}}"
export CANINE_JOBS="{{CANINE_JOBS}}"
source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh
$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh
LOCALIZER_JOB_RC=$?
{{pipeline_script}}
CANINE_JOB_RC=$?
[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC
source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh
exit $CANINE_JOB_RC
""".format(version=version)
Expand Down
6 changes: 3 additions & 3 deletions canine/test/test_localizer_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def test_setup_teardown(self):
'string-incommon': Localization(None, os.urandom(8).hex()), # no localization. Setup teardown exports as string
}

setup_text, teardown_text = localizer.job_setup_teardown(
setup_text, localization_text, teardown_text = localizer.job_setup_teardown(
jobId=str(jid),
patterns=output_patterns
)
Expand Down Expand Up @@ -313,7 +313,7 @@ def test_setup_teardown(self):
src=src,
dest=path
),
setup_text
localization_text
)
elif value.type == 'download':
src = path
Expand All @@ -325,7 +325,7 @@ def test_setup_teardown(self):
src=src,
dest=path
),
setup_text
localization_text
)
if isinstance(path, PathType):
path = path.remotepath
Expand Down
2 changes: 1 addition & 1 deletion canine/test/test_localizer_nfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_localize_file(self):
self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc')))
self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc', 'test')))

for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').controllerpath)):
for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').remotepath)):
with self.subTest(dirname=ldirpath):
self.assertEqual(os.path.basename(ldirpath), os.path.basename(rdirpath))
self.assertListEqual(sorted(ldirnames), sorted(rdirnames))
Expand Down
3 changes: 3 additions & 0 deletions canine/test/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ def test_localization(self):
'export CANINE_OUTPUT="/mnt/nfs/canine/outputs"\n'
'export CANINE_JOBS="/mnt/nfs/canine/jobs"\n'
'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh\n'
'$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh\n'
'LOCALIZER_JOB_RC=$?\n'
'/mnt/nfs/canine/script.sh\n'
'CANINE_JOB_RC=$?\n'
'[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC\n'
'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh\n'
'exit $CANINE_JOB_RC\n'.format(version=version)
)
Expand Down
2 changes: 2 additions & 0 deletions examples/example_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ inputs:
backend:
type: Dummy
n_workers: 1
localization:
staging_dir: /mnt/nfs/canine

0 comments on commit bdfaaec

Please sign in to comment.