-
Notifications
You must be signed in to change notification settings - Fork 116
/
submission.py
76 lines (67 loc) · 2.74 KB
/
submission.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
#
import argparse
import os
import time
import traceback
from pathlib import Path
from typing import Union
try: # loading numpy before loading the pickle, to avoid unexpected interactions
# pylint: disable=unused-import
import numpy # type: ignore # noqa
except ImportError:
pass
from . import job_environment, utils
from .logger import get_logger
def process_job(folder: Union[Path, str]) -> None:
    """Loads a pickled job, runs it and pickles the output.

    Parameter
    ---------
    folder: Path/str
        path of the folder where the job pickle is stored (with a name containing its uuid)

    Side-effect
    -----------
    Creates a pickled output file next to the job file: a ("success", result)
    tuple on success, or an ("error", traceback_string) tuple on failure.

    Raises
    ------
    RuntimeError
        if the submitted pickle does not appear within the wait window.
    """
    os.environ["SUBMITIT_FOLDER"] = str(folder)
    env = job_environment.JobEnvironment()
    paths = env.paths
    logger = get_logger()
    logger.info(f"Starting with {env}")
    logger.info(f"Loading pickle: {paths.submitted_pickle}")
    # The submitted pickle may land slightly after the job starts (written by the
    # submitting process); poll once per second for up to `wait_time` seconds.
    wait_time = 60
    for _ in range(wait_time):
        if paths.submitted_pickle.exists():
            break
        time.sleep(1)
    if not paths.submitted_pickle.exists():
        raise RuntimeError(
            f"Waited for {wait_time} seconds but could not find submitted jobs in path:\n{paths.submitted_pickle}"
        )
    try:
        delayed = utils.DelayedSubmission.load(paths.submitted_pickle)
        # Reuse the JobEnvironment created above (the original redundantly built
        # a second, identical instance here).
        env._handle_signals(paths, delayed)
        result = delayed.result()
        logger.info("Job completed successfully")
        del delayed  # if it blocks here, you have a race condition that must be solved!
        # Save to a temporary path first, then move, so readers never observe a
        # partially-written result file.
        with utils.temporary_save_path(paths.result_pickle) as tmppath:
            utils.cloudpickle_dump(("success", result), tmppath)
        del result
        logger.info("Exiting after successful completion")
    except Exception as error:  # TODO: check pickle methods for capturing traceback; pickling and raising
        # Best effort: persist the traceback for the submitting process; if even
        # that fails, log both errors rather than masking the original one.
        try:
            with utils.temporary_save_path(paths.result_pickle) as tmppath:
                utils.cloudpickle_dump(("error", traceback.format_exc()), tmppath)
        except Exception as dumperror:
            logger.error(f"Could not dump error:\n{error}\n\nbecause of {dumperror}")
        logger.error("Submitted job triggered an exception")
        raise  # bare raise preserves the original traceback without re-chaining
def submitit_main() -> None:
    """Command-line entry point: parse the target folder and process its job."""
    cli = argparse.ArgumentParser(description="Run a job")
    cli.add_argument("folder", type=str, help="Folder where the jobs are stored (in subfolder)")
    namespace = cli.parse_args()
    process_job(namespace.folder)