Skip to content

Commit

Permalink
forward the error message of the failed task to the screen (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
njzjz committed Oct 21, 2023
1 parent 685bd07 commit 94a0756
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 4 deletions.
5 changes: 4 additions & 1 deletion dpdispatcher/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
test $? -ne 0 && exit 1
if [ ! -f {task_tag_finished} ] ;then
{command_env} ( {command} ) {log_err_part}
if test $? -eq 0; then touch {task_tag_finished}; else echo 1 > $REMOTE_ROOT/{flag_if_job_task_fail};fi
if test $? -eq 0; then touch {task_tag_finished}; else echo 1 > $REMOTE_ROOT/{flag_if_job_task_fail};tail -v -c 1000 $REMOTE_ROOT/{task_work_path}/{err_file} > $REMOTE_ROOT/{last_err_file};fi
fi &
"""

Expand Down Expand Up @@ -298,6 +298,7 @@ def gen_script_command(self, job):
log_err_part += f"2>>{shlex.quote(task.errlog)} "

flag_if_job_task_fail = job.job_hash + "_flag_if_job_task_fail"
last_err_file = job.job_hash + "_last_err_file"
single_script_command = script_command_template.format(
flag_if_job_task_fail=flag_if_job_task_fail,
command_env=command_env,
Expand All @@ -307,6 +308,8 @@ def gen_script_command(self, job):
command=task.command,
task_tag_finished=task_tag_finished,
log_err_part=log_err_part,
err_file=shlex.quote(task.errlog),
last_err_file=shlex.quote(last_err_file),
)
script_command += single_script_command

Expand Down
3 changes: 3 additions & 0 deletions dpdispatcher/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def gen_script_command(self, job):
log_err_part += f"2>>{shlex.quote(task.errlog)} "

flag_if_job_task_fail = job.job_hash + "_flag_if_job_task_fail"
last_err_file = job.job_hash + "_last_err_file"
single_script_command = script_command_template.format(
flag_if_job_task_fail=flag_if_job_task_fail,
command_env=command_env,
Expand All @@ -278,6 +279,8 @@ def gen_script_command(self, job):
command=task.command,
task_tag_finished=task_tag_finished,
log_err_part=log_err_part,
err_file=shlex.quote(task.errlog),
last_err_file=shlex.quote(last_err_file),
)
if ii % slurm_job_size == 0:
script_command += f"{ii // slurm_job_size})\n"
Expand Down
19 changes: 16 additions & 3 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import uuid
from hashlib import sha1
from typing import Optional

from dargs.dargs import Argument, Variant

Expand Down Expand Up @@ -843,9 +844,11 @@ def handle_unexpected_job_state(self):
if hasattr(self.machine, "retry_count") and self.machine.retry_count > 0:
retry_count = self.machine.retry_count + 1
if (self.fail_count) > 0 and (self.fail_count % retry_count == 0):
raise RuntimeError(
f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times.job_detail:{self}"
)
last_error_message = self.get_last_error_message()
err_msg = f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times. job_detail:{self}"
if last_error_message is not None:
err_msg += f"\nPossible remote error message: {last_error_message}"
raise RuntimeError(err_msg)
self.submit_job()
if self.job_state != JobStatus.unsubmitted:
dlog.info(
Expand Down Expand Up @@ -923,6 +926,16 @@ def job_to_json(self):
self.job_hash + "_job.json", write_str=write_str
)

def get_last_error_message(self) -> Optional[str]:
"""Get last error message when the job is terminated."""
assert self.machine is not None
last_err_file = self.job_hash + "_last_err_file"
if self.machine.context.check_file_exists(last_err_file):
last_error_message = self.machine.context.read_file(last_err_file)
# red color
last_error_message = "\033[31m" + last_error_message + "\033[0m"
return last_error_message


class Resources:
"""Resources is used to describe the machine resources we need to do calculations.
Expand Down
35 changes: 35 additions & 0 deletions tests/test_run_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import random
import shutil
import sys
import traceback

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
__package__ = "tests"
Expand Down Expand Up @@ -98,6 +99,40 @@ def test_run_submission(self):
)
)

def test_failed_submission(self):
machine = Machine.load_from_dict(self.machine_dict)
resources = Resources.load_from_dict(self.resources_dict)

task_list = []
err_msg = "DPDISPATCHER_TEST"
# prevent err_msg directly in commands; we need to check error message
err_msg_shell = "".join([f'"{x}"' for x in err_msg])
for ii in range(1):
task = Task(
command=f'echo "Error!" {err_msg_shell} 1>&2 && exit 1',
task_work_path="./",
forward_files=[],
backward_files=[f"out{ii}.txt"],
outlog=f"out{ii}.txt",
errlog=f"err{ii}.txt",
)
task_list.append(task)

submission = Submission(
work_base="test_dir/",
machine=machine,
resources=resources,
forward_common_files=[],
backward_common_files=[],
task_list=task_list,
)
try:
submission.run_submission(check_interval=2)
except RuntimeError:
# macos shell has some issues
if sys.platform == "linux":
self.assertTrue(err_msg in traceback.format_exc())

def test_async_run_submission(self):
machine = Machine.load_from_dict(self.machine_dict)
resources = Resources.load_from_dict(self.resources_dict)
Expand Down

0 comments on commit 94a0756

Please sign in to comment.