Skip to content

Commit

Permalink
Quote file and folder names for space handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
TorecLuik committed May 8, 2024
1 parent 880771e commit b05ed17
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 76 deletions.
76 changes: 29 additions & 47 deletions biomero/slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,23 +235,23 @@ class SlurmClient(Connection):
_DEFAULT_SLURM_CONVERTERS_PATH = "my-scratch/singularity_images/converters"
_DEFAULT_SLURM_GIT_SCRIPT_PATH = "slurm-scripts"
_OUT_SEP = "--split--"
_VERSION_CMD = "ls -h {slurm_images_path}/{image_path} | grep -oP '(?<=\-|\_)(v.+|latest)(?=.simg|.sif)'"
_VERSION_CMD = "ls -h '{slurm_images_path}/{image_path}' | grep -oP '(?<=\-|\_)(v.+|latest)(?=.simg|.sif)'"
# Note: grep returns exit code 1 if no match is found!
# This translates into an UnexpectedExit error, so mute it if you
# don't care about an empty result,
# as done below with the "|| :".
# The data listing can legitimately be empty.
_DATA_CMD = "ls -h {slurm_data_path} | grep -oP '.+(?=.zip)' || :"
_DATA_CMD = "ls -h '{slurm_data_path}' | grep -oP '.+(?=.zip)' || :"
_ALL_JOBS_CMD = "sacct --starttime {start_time} --endtime {end_time} --state {states} -o {columns} -n -X "
_ZIP_CMD = "7z a -y {filename} -tzip {data_location}/data/out"
_ZIP_CMD = "7z a -y '{filename}' -tzip '{data_location}/data/out'"
_ACTIVE_JOBS_CMD = "squeue -u $USER --nohead --format %F"
_JOB_STATUS_CMD = "sacct -n -o JobId,State,End -X -j {slurm_job_id}"
# TODO: move all commands to a similar format.
# Then maybe allow overriding them from slurm-config.ini
_LOGFILE = "omero-{slurm_job_id}.log"
_CONVERTER_LOGFILE = "slurm-{slurm_job_id}_*.out"
_TAIL_LOG_CMD = "tail -n {n} {log_file} | strings"
_LOGFILE_DATA_CMD = "cat {log_file} | perl -wne '/Running [\w-]+? Job w\/ .+? \| .+? \| (.+?) \|.*/i and print$1'"
_CONVERTER_LOGFILE = "'slurm-{slurm_job_id}'_*.out"
_TAIL_LOG_CMD = "tail -n {n} '{log_file}' | strings"
_LOGFILE_DATA_CMD = "cat '{log_file}' | perl -wne '/Running [\w-]+? Job w\/ .+? \| .+? \| (.+?) \|.*/i and print$1'"

def __init__(self,
host=_DEFAULT_HOST,
Expand Down Expand Up @@ -435,7 +435,7 @@ def setup_container_images(self):
if self.slurm_model_paths:
modelpaths = " ".join(self.slurm_model_paths.values())
# mkdir cellprofiler imagej ...
r = self.run_commands([f"mkdir -p {modelpaths}"])
r = self.run_commands([f"mkdir -p '{modelpaths}'"])
if not r.ok:
raise SSHException(r)

Expand Down Expand Up @@ -494,7 +494,7 @@ def setup_converters(self):
"""
convert_cmds = []
if self.slurm_converters_path:
convert_cmds.append(f"mkdir -p {self.slurm_converters_path}")
convert_cmds.append(f"mkdir -p '{self.slurm_converters_path}'")
r = self.run_commands(convert_cmds)
# copy generic job array script over to slurm
convert_job_local = files("resources").joinpath(
Expand Down Expand Up @@ -526,8 +526,8 @@ def setup_converters(self):
# EDIT -- NO, then we can't update! Force rebuild!
# download /build new container
convert_cmds.append(
f"singularity build -F {convert_name}.sif {convert_def} >> sing.log 2>&1 ; echo 'finished {convert_name}.sif' &")
r = self.run_commands(convert_cmds)
f"singularity build -F '{convert_name}.sif' {convert_def} >> sing.log 2>&1 ; echo 'finished {convert_name}.sif' &")
_ = self.run_commands(convert_cmds)

def setup_job_scripts(self):
"""
Expand Down Expand Up @@ -569,13 +569,13 @@ def setup_directories(self):
dir_cmds = []
# a. data
if self.slurm_data_path:
dir_cmds.append(f"mkdir -p {self.slurm_data_path}")
dir_cmds.append(f"mkdir -p '{self.slurm_data_path}'")
# b. scripts
if self.slurm_script_path:
dir_cmds.append(f"mkdir -p {self.slurm_script_path}")
dir_cmds.append(f"mkdir -p '{self.slurm_script_path}'")
# c. workflows
if self.slurm_images_path:
dir_cmds.append(f"mkdir -p {self.slurm_images_path}")
dir_cmds.append(f"mkdir -p '{self.slurm_images_path}'")
r = self.run_commands(dir_cmds)
if not r.ok:
raise SSHException(r)
Expand Down Expand Up @@ -703,13 +703,13 @@ def cleanup_tmp_files(self,
cmds = []
# zip
if filename:
rmzip = f"rm {filename}.*"
rmzip = f"rm '{filename}'.*"
cmds.append(rmzip)
# log
if logfile is None:
logfile = self._LOGFILE
logfile = logfile.format(slurm_job_id=slurm_job_id)
rmlog = f"rm {logfile}"
rmlog = f"rm '{logfile}'"
cmds.append(rmlog)
# converter logs
clog = self._CONVERTER_LOGFILE
Expand All @@ -722,12 +722,12 @@ def cleanup_tmp_files(self,
data_location = self.extract_data_location_from_log(logfile)

if data_location:
rmdata = f"rm -rf {data_location} {data_location}.*"
rmdata = f"rm -rf '{data_location}' '{data_location}'.*"
cmds.append(rmdata)

# convert config file
config_file = f"config_{os.path.basename(data_location)}.txt"
rmconfig = f"rm {config_file}"
rmconfig = f"rm '{config_file}'"
cmds.append(rmconfig)
else:
logger.warning(f"Could not extract data location from log {logfile}. Skipping cleanup.")
Expand Down Expand Up @@ -1049,24 +1049,6 @@ def unpack_data(self, zipfile: str,
logger.info(f"Unpacking {zipfile} on Slurm")
return self.run_commands([cmd], env=env)

def convert_data(self, zipfile: str,
                 env: Optional[Dict[str, str]] = None) -> Result:
    """
    Runs the conversion command for a zipped file on the remote
    Slurm cluster.

    Args:
        zipfile (str): The name of the zipped file to be converted.
        env (Dict[str, str], optional): Optional environment variables
            to set when running the command. Defaults to None.

    Returns:
        Result: The result of the command.
    """
    # NOTE(review): get_convert_command is not visible in this view --
    # confirm it exists before relying on this method.
    cmd = self.get_convert_command(zipfile)
    logger.info(f"Converting {zipfile} on Slurm")
    # Run the single command remotely, passing the optional environment.
    return self.run_commands([cmd], env=env)

def generate_slurm_job_for_workflow(self,
workflow: str,
substitutes: Dict[str, str],
Expand Down Expand Up @@ -1149,7 +1131,7 @@ def update_slurm_scripts(self,
# ensure all dirs exist remotely
full_path = self.slurm_script_path+"/"+job_path
job_dir, _ = os.path.split(full_path)
self.run(f"mkdir -p {job_dir}")
self.run(f"mkdir -p '{job_dir}'")
# copy to remote file
result = self.put(local=io.StringIO(job_script),
remote=full_path)
Expand Down Expand Up @@ -1254,7 +1236,7 @@ def run_conversion_workflow_job(self, folder_name: str,
conversion_cmd, sbatch_env = self.get_conversion_command(
data_path, config_file, source_format, target_format)
commands = [
f"find {data_path}/data/in -name '*.{source_format}' | awk '{{print NR, $0}}' > {config_file}",
f"find '{data_path}/data/in' -name '*.{source_format}' | awk '{{print NR, $0}}' > {config_file}",
f"N=$(wc -l < \"{config_file}\")",
f"echo \"Number of .{source_format} files: $N\"",
conversion_cmd
Expand Down Expand Up @@ -1290,7 +1272,7 @@ def get_update_slurm_scripts_command(self) -> str:
A string containing the Git command
to update the Slurm scripts.
"""
update_cmd = f"git -C {self.slurm_script_path} pull"
update_cmd = f"git -C '{self.slurm_script_path}' pull"
return update_cmd

def check_job_status(self,
Expand Down Expand Up @@ -1629,7 +1611,7 @@ def get_workflow_command(self,
job_params.append(email_param)
job_param = "".join(job_params)
sbatch_cmd = f"sbatch{job_param} --output=omero-%j.log \
{self.slurm_script_path}/{job_script}"
'{self.slurm_script_path}/{job_script}'"

return sbatch_cmd, env

Expand Down Expand Up @@ -1673,7 +1655,7 @@ def get_conversion_command(self, data_path: str,
"CONFIG_FILE": f"{config_file}"
}

conversion_cmd = "sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N $SCRIPT_PATH/convert_job_array.sh"
conversion_cmd = "sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N '$SCRIPT_PATH/convert_job_array.sh'"
# conversion_cmd_waiting = "sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N --wait $SCRIPT_PATH/convert_job_array.sh"

return conversion_cmd, sbatch_env
Expand Down Expand Up @@ -1861,13 +1843,13 @@ def get_unzip_command(self, zipfile: str,
The command to extract the specified
filetypes from the zip file.
"""
unzip_cmd = f"mkdir {self.slurm_data_path}/{zipfile} \
{self.slurm_data_path}/{zipfile}/data \
{self.slurm_data_path}/{zipfile}/data/in \
{self.slurm_data_path}/{zipfile}/data/out \
{self.slurm_data_path}/{zipfile}/data/gt; \
7z x -y -o{self.slurm_data_path}/{zipfile}/data/in \
{self.slurm_data_path}/{zipfile}.zip {filter_filetypes}"
unzip_cmd = f"mkdir '{self.slurm_data_path}/{zipfile}' \
'{self.slurm_data_path}/{zipfile}/data' \
'{self.slurm_data_path}/{zipfile}/data/in' \
'{self.slurm_data_path}/{zipfile}/data/out' \
'{self.slurm_data_path}/{zipfile}/data/gt'; \
7z x -y -o'{self.slurm_data_path}/{zipfile}/data/in' \
'{self.slurm_data_path}/{zipfile}.zip' {filter_filetypes}"

return unzip_cmd

Expand Down
58 changes: 29 additions & 29 deletions tests/unit/test_slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ def test_get_unzip_command(slurm_client):
zipfile = "example"
filter_filetypes = "*.zarr *.tiff *.tif"
expected_command = (
f"mkdir {slurm_data_path}/{zipfile} \
{slurm_data_path}/{zipfile}/data \
{slurm_data_path}/{zipfile}/data/in \
{slurm_data_path}/{zipfile}/data/out \
{slurm_data_path}/{zipfile}/data/gt; \
7z x -y -o{slurm_data_path}/{zipfile}/data/in \
{slurm_data_path}/{zipfile}.zip {filter_filetypes}"
f"mkdir '{slurm_data_path}/{zipfile}' \
'{slurm_data_path}/{zipfile}/data' \
'{slurm_data_path}/{zipfile}/data/in' \
'{slurm_data_path}/{zipfile}/data/out' \
'{slurm_data_path}/{zipfile}/data/gt'; \
7z x -y -o'{slurm_data_path}/{zipfile}/data/in' \
'{slurm_data_path}/{zipfile}.zip' {filter_filetypes}"
)

# WHEN
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_zip_data_on_slurm_server(mock_run_commands, mock_logger, slurm_client):

# THEN
mock_run_commands.assert_called_once_with(
[f"7z a -y {filename} -tzip {data_location}/data/out"], env=None)
[f"7z a -y '{filename}' -tzip '{data_location}/data/out'"], env=None)
assert result.ok is True
assert result.stdout == ""
mock_logger.info.assert_called_with(
Expand Down Expand Up @@ -195,7 +195,7 @@ def test_get_workflow_command(slurm_client,
slurm_client.slurm_script_path = "/path/to/slurm_script"

expected_sbatch_cmd = f"sbatch --param3=value3 --param4=value4 --time={time_limit} --mail-user={email} --output=omero-%j.log \
{slurm_client.slurm_script_path}/job_script.sh"
'{slurm_client.slurm_script_path}/job_script.sh'"
expected_env = {
"DATA_PATH": "/path/to/slurm_data/input_data_folder",
"IMAGE_PATH": "/path/to/slurm_images/workflow_path",
Expand Down Expand Up @@ -229,7 +229,7 @@ def test_run_conversion_workflow_job(mock_result, mock_run_commands, slurm_clien
expected_config_file = f"config_{folder_name}.txt"
expected_data_path = f"{slurm_client.slurm_data_path}/{folder_name}"
expected_conversion_cmd, expected_sbatch_env = (
"sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N $SCRIPT_PATH/convert_job_array.sh",
"sbatch --job-name=conversion --export=ALL,CONFIG_PATH=\"$PWD/$CONFIG_FILE\" --array=1-$N '$SCRIPT_PATH/convert_job_array.sh'",
{
"DATA_PATH": f"{expected_data_path}",
"CONVERSION_PATH": f"{slurm_client.slurm_converters_path}",
Expand All @@ -239,7 +239,7 @@ def test_run_conversion_workflow_job(mock_result, mock_run_commands, slurm_clien
}
)
expected_commands = [
f"find {expected_data_path}/data/in -name '*.{source_format}' | awk '{{print NR, $0}}' > {expected_config_file}",
f"find '{expected_data_path}/data/in' -name '*.{source_format}' | awk '{{print NR, $0}}' > {expected_config_file}",
f"N=$(wc -l < \"{expected_config_file}\")",
f"echo \"Number of .{source_format} files: $N\"",
expected_conversion_cmd
Expand Down Expand Up @@ -466,7 +466,7 @@ def test_extract_data_location_from_log_exc(mock_run_commands,

# THEN
mock_run_commands.assert_called_with(
[f"cat {logfile} | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])
[f"cat '{logfile}' | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])


@patch('biomero.slurm_client.SlurmClient.run_commands')
Expand All @@ -485,7 +485,7 @@ def test_extract_data_location_from_log_2(mock_run_commands,
# THEN
assert data_location == expected_data_location
mock_run_commands.assert_called_with(
[f"cat omero-{slurm_job_id}.log | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])
[f"cat 'omero-{slurm_job_id}.log' | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])


@patch('biomero.slurm_client.SlurmClient.run_commands')
Expand All @@ -505,7 +505,7 @@ def test_extract_data_location_from_log(mock_run_commands,
# THEN
assert data_location == expected_data_location
mock_run_commands.assert_called_with(
[f"cat {logfile} | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])
[f"cat '{logfile}' | perl -wne '/Running [\\w-]+? Job w\\/ .+? \\| .+? \\| (.+?) \\|.*/i and print$1'"])


def test_get_job_status_command(slurm_client):
Expand Down Expand Up @@ -694,7 +694,7 @@ def test_update_slurm_scripts(mock_generate_job, mock_workflow_params_to_subs,
"workflow_name", {'PARAMS': '--param1 $PARAM1_NAME'})

# Assert that the remote directories are created
mock_run.assert_called_with("mkdir -p scriptpath")
mock_run.assert_called_with("mkdir -p 'scriptpath'")

# Assert that the job script is copied to the remote location
mock_put.assert_called_once_with(
Expand Down Expand Up @@ -853,11 +853,11 @@ def test_cleanup_tmp_files_loc(mock_extract_data_location, mock_run_commands,
# THEN
mock_extract_data_location.assert_not_called()
mock_run_commands.assert_called_once_with([
f"rm {filename}.*",
f"rm {logfile}",
f"rm slurm-{slurm_job_id}_*.out",
f"rm -rf {data_location} {data_location}.*",
f"rm config_path.txt"
f"rm '{filename}'.*",
f"rm '{logfile}'",
f"rm 'slurm-{slurm_job_id}'_*.out",
f"rm -rf '{data_location}' '{data_location}'.*",
f"rm 'config_path.txt'"
], sep=' ; ')

assert result.ok is True
Expand Down Expand Up @@ -887,11 +887,11 @@ def test_cleanup_tmp_files(mock_extract_data_location, mock_run_commands,
# THEN
mock_extract_data_location.assert_called_once_with(logfile)
mock_run_commands.assert_called_once_with([
f"rm {filename}.*",
f"rm {logfile}",
f"rm slurm-{slurm_job_id}_*.out",
f"rm -rf {found_location} {found_location}.*",
f"rm config_path.txt"
f"rm '{filename}'.*",
f"rm '{logfile}'",
f"rm 'slurm-{slurm_job_id}'_*.out",
f"rm -rf '{found_location}' '{found_location}'.*",
f"rm 'config_path.txt'"
], sep=' ; ')

assert result.ok is True
Expand Down Expand Up @@ -1003,7 +1003,7 @@ def test_setup_slurm_notok(mock_run, mock_validate):
# 1 create dirs
mock_run.assert_called()
mock_run.assert_any_call(
f'mkdir -p {dpath} && mkdir -p {spath} && mkdir -p {ipath}', env={})
f"mkdir -p '{dpath}' && mkdir -p '{spath}' && mkdir -p '{ipath}'", env={})


@patch('biomero.slurm_client.io.StringIO')
Expand Down Expand Up @@ -1053,7 +1053,7 @@ def test_setup_slurm(_mock_CachedSession,
mock_run.assert_called()
# 1 create dirs
mock_run.assert_any_call(
[f'mkdir -p {dpath}', f'mkdir -p {spath}', f'mkdir -p {ipath}'])
[f"mkdir -p '{dpath}'", f"mkdir -p '{spath}'", f"mkdir -p '{ipath}'"])
# 2 git
mock_run.assert_any_call(
['rm -rf "$LOCALREPO"', 'git clone "$REPOSRC" "$LOCALREPO" 2> /dev/null'],
Expand All @@ -1063,10 +1063,10 @@ def test_setup_slurm(_mock_CachedSession,
_mock_Connection_put.assert_called()
# mock_run.assert_any_call(f"mkdir -p {cpath}")
mock_run.assert_any_call(
[f"singularity build -F {convert_name}.sif {convert_def} >> sing.log 2>&1 ; echo 'finished {convert_name}.sif' &"])
[f"singularity build -F '{convert_name}.sif' {convert_def} >> sing.log 2>&1 ; echo 'finished {convert_name}.sif' &"])

# 4 images
mock_run.assert_any_call([f"mkdir -p {modelpaths}"])
mock_run.assert_any_call([f"mkdir -p '{modelpaths}'"])
_mock_Connection_put.assert_called_with(
local=mock_stringio(), remote=f'{ipath}/{script_name}')
mock_run.assert_any_call([f"time sh {script_name}"])
Expand Down

0 comments on commit b05ed17

Please sign in to comment.