Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle failures during setup.sh #45

Merged
merged 8 commits into from
Apr 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions canine/localization/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,22 +506,27 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
value
)

def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str]:
def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typing.Tuple[str, str, str]:
"""
Returns a tuple of (setup script, teardown script) for the given job id.
Returns a tuple of (setup script, localization script, teardown script) for the given job id.
Must call after pre-scanning inputs
"""

# generate job variable, exports, and localization_tasks arrays
# - job variables and exports are set when setup.sh is _sourced_
# - localization tasks are run when localization.sh is _run_
job_vars = []
exports = []
extra_tasks = [
localization_tasks = [
'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi'
]
compute_env = self.environment('remote')
for key, val in self.inputs[jobId].items():
if val.type == 'stream':
job_vars.append(shlex.quote(key))
dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path)))
extra_tasks += [
localization_tasks += [
'gsutil ls {} > /dev/null'.format(shlex.quote(val.path)),
'if [[ -e {0} ]]; then rm {0}; fi'.format(dest.remotepath),
'mkfifo {}'.format(dest.remotepath),
"gsutil {} cat {} > {} &".format(
Expand All @@ -537,7 +542,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
elif val.type == 'download':
job_vars.append(shlex.quote(key))
dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path)))
extra_tasks += [
localization_tasks += [
"if [[ ! -e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format(
'-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '',
shlex.quote(val.path),
Expand All @@ -556,6 +561,8 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
))
else:
print("Unknown localization command:", val.type, "skipping", key, val.path, file=sys.stderr)

# generate setup script
setup_script = '\n'.join(
line.rstrip()
for line in [
Expand All @@ -567,8 +574,16 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
'export CANINE_JOB_TEARDOWN="{}"'.format(os.path.join(compute_env['CANINE_JOBS'], jobId, 'teardown.sh')),
'mkdir -p $CANINE_JOB_INPUTS',
'mkdir -p $CANINE_JOB_ROOT',
] + exports + extra_tasks
] + exports
) + '\ncd $CANINE_JOB_ROOT\n'

# generate localization script
localization_script = '\n'.join([
"#!/bin/bash",
"set -e"
] + localization_tasks) + "\nset +e\n"

# generate teardown script
teardown_script = '\n'.join(
line.rstrip()
for line in [
Expand All @@ -586,7 +601,7 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
),
]
)
return setup_script, teardown_script
return setup_script, localization_script, teardown_script

@abc.abstractmethod
def __enter__(self):
Expand Down Expand Up @@ -623,7 +638,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand Down
19 changes: 15 additions & 4 deletions canine/localization/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand All @@ -101,18 +101,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
jobId,
))
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)
# Setup:

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with open(script_path.localpath, 'w') as w:
w.write(setup_script)
os.chmod(script_path.localpath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with open(script_path.localpath, 'w') as w:
w.write(localization_script)
os.chmod(script_path.localpath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with open(script_path.localpath, 'w') as w:
w.write(teardown_script)
os.chmod(script_path.localpath, 0o775)

# symlink delocalization script
os.symlink(
os.path.join(
os.path.dirname(__file__),
Expand Down
17 changes: 14 additions & 3 deletions canine/localization/nfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
3 phase task:
1) Pre-scan inputs to determine proper localization strategy for all inputs
2) Begin localizing job inputs. For each job, check the predetermined strategy
and set up the job's setup and teardown scripts
and set up the job's setup, localization, and teardown scripts
3) Finally, finalize the localization. This may include broadcasting the
staging directory or copying a batch of gsutil files
Returns the remote staging directory, which is now ready for final startup
Expand Down Expand Up @@ -133,18 +133,29 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
jobId,
))
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with open(script_path.localpath, 'w') as w:
w.write(setup_script)
os.chmod(script_path.localpath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with open(script_path.localpath, 'w') as w:
w.write(localization_script)
os.chmod(script_path.localpath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with open(script_path.localpath, 'w') as w:
w.write(teardown_script)
os.chmod(script_path.localpath, 0o775)

# copy delocalization script
shutil.copyfile(
os.path.join(
os.path.dirname(__file__),
Expand Down
13 changes: 11 additions & 2 deletions canine/localization/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,22 @@ def localize(self, inputs: typing.Dict[str, typing.Dict[str, str]], patterns: ty
)
)
self.prepare_job_inputs(jobId, data, common_dests, overrides, transport=transport)
# Now localize job setup and teardown scripts
setup_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Now localize job setup, localization, and teardown scripts
setup_script, localization_script, teardown_script = self.job_setup_teardown(jobId, patterns)

# Setup:
script_path = self.reserve_path('jobs', jobId, 'setup.sh')
with transport.open(script_path.remotepath, 'w') as w:
w.write(setup_script)
transport.chmod(script_path.remotepath, 0o775)

# Localization:
script_path = self.reserve_path('jobs', jobId, 'localization.sh')
with transport.open(script_path.remotepath, 'w') as w:
w.write(localization_script)
transport.chmod(script_path.remotepath, 0o775)

# Teardown:
script_path = self.reserve_path('jobs', jobId, 'teardown.sh')
with transport.open(script_path.remotepath, 'w') as w:
Expand Down
3 changes: 3 additions & 0 deletions canine/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
export CANINE_OUTPUT="{{CANINE_OUTPUT}}"
export CANINE_JOBS="{{CANINE_JOBS}}"
source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh
$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh
LOCALIZER_JOB_RC=$?
{{pipeline_script}}
CANINE_JOB_RC=$?
[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC
source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh
exit $CANINE_JOB_RC
""".format(version=version)
Expand Down
6 changes: 3 additions & 3 deletions canine/test/test_localizer_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def test_setup_teardown(self):
'string-incommon': Localization(None, os.urandom(8).hex()), # no localization. Setup teardown exports as string
}

setup_text, teardown_text = localizer.job_setup_teardown(
setup_text, localization_text, teardown_text = localizer.job_setup_teardown(
jobId=str(jid),
patterns=output_patterns
)
Expand Down Expand Up @@ -313,7 +313,7 @@ def test_setup_teardown(self):
src=src,
dest=path
),
setup_text
localization_text
)
elif value.type == 'download':
src = path
Expand All @@ -325,7 +325,7 @@ def test_setup_teardown(self):
src=src,
dest=path
),
setup_text
localization_text
)
if isinstance(path, PathType):
path = path.remotepath
Expand Down
2 changes: 1 addition & 1 deletion canine/test/test_localizer_nfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_localize_file(self):
self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc')))
self.assertTrue(transport.isdir(os.path.join(self.localizer.staging_dir, 'dirc', 'test')))

for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').controllerpath)):
for (ldirpath, ldirnames, lfilenames), (rdirpath, rdirnames, rfilenames) in zip(os.walk(os.path.dirname(__file__)), transport.walk(self.localizer.reserve_path('dirc', 'test').remotepath)):
with self.subTest(dirname=ldirpath):
self.assertEqual(os.path.basename(ldirpath), os.path.basename(rdirpath))
self.assertListEqual(sorted(ldirnames), sorted(rdirnames))
Expand Down
3 changes: 3 additions & 0 deletions canine/test/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,11 @@ def test_localization(self):
'export CANINE_OUTPUT="/mnt/nfs/canine/outputs"\n'
'export CANINE_JOBS="/mnt/nfs/canine/jobs"\n'
'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/setup.sh\n'
'$CANINE_JOBS/$SLURM_ARRAY_TASK_ID/localization.sh\n'
'LOCALIZER_JOB_RC=$?\n'
'/mnt/nfs/canine/script.sh\n'
'CANINE_JOB_RC=$?\n'
'[[ $LOCALIZER_JOB_RC != 0 ]] && CANINE_JOB_RC=$LOCALIZER_JOB_RC || CANINE_JOB_RC=$CANINE_JOB_RC\n'
'source $CANINE_JOBS/$SLURM_ARRAY_TASK_ID/teardown.sh\n'
'exit $CANINE_JOB_RC\n'.format(version=version)
)
Expand Down
2 changes: 2 additions & 0 deletions examples/example_pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ inputs:
backend:
type: Dummy
n_workers: 1
localization:
staging_dir: /mnt/nfs/canine