Skip to content

Commit

Permalink
save record of failed submission (#411)
Browse files Browse the repository at this point in the history
Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>

---------

Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
njzjz and pre-commit-ci[bot] committed Nov 13, 2023
1 parent f5596e0 commit 77bd939
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 17 deletions.
29 changes: 12 additions & 17 deletions dpdispatcher/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.record import record

# from dpdispatcher.slurm import SlurmResources
# %%
default_strategy = dict(if_cuda_multi_devices=False, ratio_unfinished=0.0)

Expand Down Expand Up @@ -249,9 +249,11 @@ def run_submission(
time.sleep(check_interval)
except (Exception, KeyboardInterrupt, SystemExit) as e:
self.submission_to_json()
record_path = record.write(self)
dlog.exception(e)
dlog.info(f"submission exit: {self.submission_hash}")
dlog.info(f"at {self.machine.context.remote_root}")
dlog.info(f"Submission information is saved in {str(record_path)}.")
dlog.debug(self.serialize())
raise e
else:
Expand Down Expand Up @@ -344,7 +346,6 @@ def update_submission_state(self):
dlog.debug(
f"debug:update_submission_state: job: {job.job_hash}, {job.job_id}, {job.job_state}"
)
# self.submission_to_json()

def handle_unexpected_submission_state(self):
"""Handle unexpected job state of the submission.
Expand All @@ -357,25 +358,15 @@ def handle_unexpected_submission_state(self):
job.handle_unexpected_job_state()
except Exception as e:
self.submission_to_json()
record_path = record.write(self)
raise RuntimeError(
f"Meet errors will handle unexpected submission state.\n"
f"Debug information: remote_root=={self.machine.context.remote_root}.\n"
f"Debug information: submission_hash=={self.submission_hash}.\n"
f"Please check the dirs and scripts in remote_root. "
f"The job information mentioned above may help."
f"Please check error messages above and in remote_root. "
f"The submission information is saved in {str(record_path)}."
) from e

# not used here, submitting job is in handle_unexpected_submission_state.

# def submit_submission(self):
# """submit the job belonging to the submission.
# """
# for job in self.belonging_jobs:
# job.submit_job()
# self.get_submission_state()

# def update_submi

def check_ratio_unfinished(self, ratio_unfinished: float) -> bool:
"""Calculate the ratio of unfinished tasks in the submission.
Expand Down Expand Up @@ -510,6 +501,8 @@ def download_jobs(self):

def clean_jobs(self):
self.machine.context.clean()
assert self.submission_hash is not None
record.remove(self.submission_hash)

def submission_to_json(self):
# self.update_submission_state()
Expand Down Expand Up @@ -851,7 +844,7 @@ def handle_unexpected_job_state(self):
if job_state == JobStatus.terminated:
self.fail_count += 1
dlog.info(
f"job: {self.job_hash} {self.job_id} terminated;"
f"job: {self.job_hash} {self.job_id} terminated; "
f"fail_cout is {self.fail_count}; resubmitting job"
)
retry_count = 3
Expand All @@ -860,7 +853,9 @@ def handle_unexpected_job_state(self):
retry_count = self.machine.retry_count + 1
if (self.fail_count) > 0 and (self.fail_count % retry_count == 0):
last_error_message = self.get_last_error_message()
err_msg = f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times. job_detail:{self}"
err_msg = (
f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times."
)
if last_error_message is not None:
err_msg += f"\nPossible remote error message: {last_error_message}"
raise RuntimeError(err_msg)
Expand Down
79 changes: 79 additions & 0 deletions dpdispatcher/utils/record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
from pathlib import Path
from typing import List


class Record:
"""Record failed or canceled submissions."""

def __init__(self) -> None:
self.record_directory = Path.home() / ".dpdispatcher" / "submission"
self.record_directory.mkdir(parents=True, exist_ok=True)

def get_submissions(self) -> List[str]:
"""Get all stored submission hashes.
Returns
-------
list[str]
List of submission hashes.
"""
return [
f.stem
for f in self.record_directory.iterdir()
if (f.is_file() and f.suffix == ".json")
]

def write(self, submission) -> Path:
"""Write submission data to file.
Parameters
----------
submission : dpdispatcher.Submission
Submission data.
Returns
-------
pathlib.Path
Path to submission data.
"""
submission_path = self.record_directory / f"{submission.submission_hash}.json"
submission_path.write_text(json.dumps(submission.serialize(), indent=2))
return submission_path

def get_submission(self, hash: str, not_exist_ok: bool = False) -> Path:
"""Get submission data by hash.
Parameters
----------
hash : str
Hash of submission data.
Returns
-------
pathlib.Path
Path to submission data.
"""
submission_file = self.record_directory / f"{hash}.json"
if not not_exist_ok and not submission_file.is_file():
raise FileNotFoundError(f"Submission file not found: {submission_file}")
return submission_file

def remove(self, hash: str):
"""Remove submission data by hash.
Call this method when the remote directory is cleaned.
Parameters
----------
hash : str
Hash of submission data.
"""
path = self.get_submission(hash, not_exist_ok=True)
if path.is_file():
path.unlink()


# the record object can be globally used
record = Record()
__all__ = ["record"]
1 change: 1 addition & 0 deletions tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dpdispatcher.submission import Job, Resources, Submission, Task # noqa: F401
from dpdispatcher.utils.hdfs_cli import HDFS # noqa: F401
from dpdispatcher.utils.job_status import JobStatus # noqa: F401
from dpdispatcher.utils.record import record # noqa: F401
from dpdispatcher.utils.utils import RetrySignal, retry # noqa: F401


Expand Down
2 changes: 2 additions & 0 deletions tests/test_run_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Resources,
Submission,
Task,
record,
setUpModule, # noqa: F401
)

Expand Down Expand Up @@ -132,6 +133,7 @@ def test_failed_submission(self):
# macos shell has some issues
if sys.platform == "linux":
self.assertTrue(err_msg in traceback.format_exc())
self.assertTrue(record.get_submission(submission.submission_hash).is_file())

def test_async_run_submission(self):
machine = Machine.load_from_dict(self.machine_dict)
Expand Down

0 comments on commit 77bd939

Please sign in to comment.