-
Notifications
You must be signed in to change notification settings - Fork 142
/
dsort.py
111 lines (100 loc) · 3.58 KB
/
dsort.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
import logging
import time
from typing import Dict
from aistore.sdk.const import (
HTTP_METHOD_POST,
URL_PATH_DSORT,
HTTP_METHOD_GET,
DEFAULT_DSORT_WAIT_TIMEOUT,
HTTP_METHOD_DELETE,
DSORT_ABORT,
DSORT_UUID,
)
from aistore.sdk.dsort_types import JobInfo
from aistore.sdk.errors import Timeout
from aistore.sdk.utils import validate_file, probing_frequency
class Dsort:
    """
    Manage jobs for the dSort extension: https://github.com/NVIDIA/aistore/blob/main/docs/cli/dsort.md

    Args:
        client (Client): Client object used to send requests
        dsort_id (str, optional): ID of the dSort job this object refers to. Defaults to an empty string
    """

    def __init__(self, client: "Client", dsort_id: str = ""):
        self._dsort_id = dsort_id
        self._client = client

    @property
    def dsort_id(self) -> str:
        """
        ID of the dSort job tracked by this object
        """
        return self._dsort_id

    def start(self, spec_file: str) -> str:
        """
        Submit a new dSort job described by a JSON spec file

        Args:
            spec_file (str): Location of the JSON file holding the dSort job specification

        Returns:
            dSort job ID (the text of the response to the start request), which is also stored on this object
        """
        validate_file(spec_file)
        with open(spec_file, "r", encoding="utf-8") as spec_handle:
            job_spec = json.load(spec_handle)
        response = self._client.request(
            HTTP_METHOD_POST, path=URL_PATH_DSORT, json=job_spec
        )
        self._dsort_id = response.text
        return self._dsort_id

    def abort(self):
        """
        Send a request to abort this dSort job
        """
        abort_path = f"{URL_PATH_DSORT}/{DSORT_ABORT}"
        query = {DSORT_UUID: [self._dsort_id]}
        self._client.request(HTTP_METHOD_DELETE, path=abort_path, params=query)

    def get_job_info(self) -> Dict[str, JobInfo]:
        """
        Fetch info for this dSort job

        Returns:
            Dictionary of job info for all jobs associated with this dsort
        """
        return self._client.request_deserialize(
            HTTP_METHOD_GET,
            path=URL_PATH_DSORT,
            res_model=Dict[str, JobInfo],
            params={DSORT_UUID: [self._dsort_id]},
        )

    def wait(
        self,
        timeout: int = DEFAULT_DSORT_WAIT_TIMEOUT,
        verbose: bool = True,
    ):
        """
        Poll the dSort job until it finishes or is aborted

        The job info is re-fetched on every iteration. The wait ends as soon as any returned job
        reports it was aborted, or once every returned job reports its shard creation phase finished
        (an empty result therefore counts as finished). Elapsed time is tracked by summing the sleep
        intervals between polls, not by reading a clock.

        Args:
            timeout (int, optional): The maximum time to wait for the job, in seconds.
                Defaults to DEFAULT_DSORT_WAIT_TIMEOUT.
            verbose (bool, optional): Whether to emit wait-status log messages

        Raises:
            requests.RequestException: "There was an ambiguous exception that occurred while handling..."
            requests.ConnectionError: Connection error
            requests.ConnectTimeout: Timed out connecting to AIStore
            requests.ReadTimeout: Timed out waiting response from AIStore
            errors.Timeout: Timeout while waiting for the job to finish
        """
        log = logging.getLogger(f"{__name__}.wait")
        # Status messages are silenced by disabling this dedicated logger
        log.disabled = not verbose
        interval = probing_frequency(timeout)
        waited = 0
        while waited <= timeout:
            all_done = True
            for info in self.get_job_info().values():
                metrics = info.metrics
                if metrics.aborted:
                    log.info("DSort job '%s' aborted", self._dsort_id)
                    return
                # Shard creation is the last phase, so a job is done once that phase has finished
                if not metrics.shard_creation.finished:
                    all_done = False
            if all_done:
                log.info("DSort job '%s' finished", self._dsort_id)
                return
            log.info("Waiting on dsort job '%s'...", self._dsort_id)
            time.sleep(interval)
            waited += interval
        raise Timeout("dsort job to finish")