-
Notifications
You must be signed in to change notification settings - Fork 21
/
lsf_status.py
executable file
·237 lines (203 loc) · 7.33 KB
/
lsf_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
import shlex
import sys
import time
from pathlib import Path
from subprocess import CalledProcessError
from typing import List
if not __name__.startswith("tests.src."):
    # Not imported through the ``tests.src`` package (e.g. executed as a
    # script): put this file's directory on sys.path so the sibling modules
    # can be imported by their plain names.
    sys.path.append(str(Path(__file__).parent.absolute()))
    from OSLayer import OSLayer, TailError
    from CookieCutter import CookieCutter
else:
    # Imported as ``tests.src.<module>``: use package-relative imports.
    from .OSLayer import OSLayer, TailError
    from .CookieCutter import CookieCutter
class BjobsError(Exception):
    """Raised when the ``bjobs`` query yields no usable output."""
class UnknownStatusLine(Exception):
    """Raised when the status line found in the job log is not recognised."""
# bjobs status strings that StatusChecker handles explicitly (optionally
# killing the job) instead of looking them up in StatusChecker.STATUS_TABLE.
UNKNOWN = "UNKWN"
ZOMBIE = "ZOMBI"
class StatusChecker:
    """Report the status of one LSF job as success, running or failed.

    ``bjobs`` is queried first. If that does not produce a status within
    ``max_status_checks`` attempts, the tail of the job's output log is
    inspected instead.
    """

    SUCCESS = "success"
    RUNNING = "running"
    FAILED = "failed"
    # Maps the status string printed by bjobs to the status reported here.
    # UNKNOWN and ZOMBIE are deliberately absent: they get dedicated handlers.
    STATUS_TABLE = {
        "PEND": RUNNING,
        "RUN": RUNNING,
        "DONE": SUCCESS,
        "PSUSP": RUNNING,
        "USUSP": RUNNING,
        "SSUSP": RUNNING,
        "WAIT": RUNNING,
        "EXIT": FAILED,
        "POST_DONE": SUCCESS,
        "POST_ERR": FAILED,
    }

    def __init__(
        self,
        jobid: int,
        outlog: str,
        wait_between_tries: float = 0.001,
        max_status_checks: int = 1,
        kill_unknown: bool = False,
        kill_zombie: bool = False,
    ):
        """Store the job to check and the retry/kill policy.

        Args:
            jobid: Identifier of the job, interpolated into the bjobs/bkill
                command lines.
            outlog: Path of the job's output log, used as a fallback source.
            wait_between_tries: Seconds to sleep after a failed bjobs attempt.
            max_status_checks: Number of bjobs attempts before falling back
                to the log.
            kill_unknown: Kill the job when bjobs reports it as ``UNKNOWN``.
            kill_zombie: Kill the job when bjobs reports it as ``ZOMBIE``.
        """
        self._jobid = jobid
        self._outlog = outlog
        self.wait_between_tries = wait_between_tries
        self.max_status_checks = max_status_checks
        self.kill_unknown = kill_unknown
        self.kill_zombie = kill_zombie

    @property
    def jobid(self) -> int:
        """Identifier of the job being checked (read-only)."""
        return self._jobid

    @property
    def outlog(self) -> str:
        """Path of the job's output log (read-only)."""
        return self._outlog

    @property
    def bjobs_query_cmd(self) -> str:
        """Command line that asks bjobs for this job's status column only."""
        return "bjobs -o 'stat' -noheader {jobid}".format(jobid=self.jobid)

    def _handle_unknown_job(self) -> str:
        """Optionally kill a job in the ``UNKNOWN`` state; report RUNNING."""
        if self.kill_unknown:
            print(
                "[lsf profile warning] {unknown} job status detected for {jobid}. "
                "Killing job...".format(unknown=UNKNOWN, jobid=self.jobid),
                file=sys.stderr,
            )
            self._kill_job()
        # we return running regardless so that the zombie job gets cleaned up
        return self.RUNNING

    def _handle_zombie_job(self) -> str:
        """Optionally kill a job in the ``ZOMBIE`` state; report FAILED."""
        if self.kill_zombie:
            print(
                "[lsf profile warning] {zombie} job status detected for {jobid}. "
                "Killing job...".format(zombie=ZOMBIE, jobid=self.jobid),
                file=sys.stderr,
            )
            self._kill_job()
        # zombie jobs are always considered failed as they don't recover
        return self.FAILED

    def _query_status_using_bjobs(self) -> str:
        """Ask bjobs for the job state and translate it.

        Raises:
            BjobsError: bjobs printed nothing on stdout.
            KeyError: bjobs printed a state missing from ``STATUS_TABLE``.
        """
        output_stream, error_stream = OSLayer.run_process(self.bjobs_query_cmd)
        # Normalise once so that the emptiness test, the special-case
        # comparisons and the table lookup all see the same value.
        job_state = output_stream.strip()
        if not job_state:
            raise BjobsError(
                "bjobs error.\nstdout is empty.\nstderr = {stderr}".format(
                    stderr=error_stream
                )
            )
        if job_state == UNKNOWN:
            return self._handle_unknown_job()
        if job_state == ZOMBIE:
            return self._handle_zombie_job()
        return self.STATUS_TABLE[job_state]

    def _get_tail_of_log_file(self) -> List[str]:
        """Return the last lines of the output log, decoded and stripped."""
        # 30 lines gives us the whole LSF completion summary
        tail = OSLayer.tail(self.outlog, num_lines=30)
        # Job output may contain bytes that are not valid UTF-8; replace them
        # instead of letting UnicodeDecodeError abort the status check.
        return [line.decode(errors="replace").strip() for line in tail]

    def _kill_job(self):
        """Run ``bkill -r`` for this job, discarding its output."""
        kill_cmd = "bkill -r {}".format(self.jobid)
        _ = OSLayer.run_process(kill_cmd)

    def _query_status_using_log(self) -> str:
        """Derive the job status from the tail of its output log.

        Returns:
            FAILED if the log cannot be read, RUNNING if the resource usage
            summary has not been written yet, otherwise SUCCESS or FAILED
            according to the status line two lines above the summary header.

        Raises:
            UnknownStatusLine: the status line is unrecognised, or it lies
                before the part of the log that was read.
        """
        try:
            log_tail = self._get_tail_of_log_file()
        except FileNotFoundError:
            print("Log file {} not found".format(self.outlog), file=sys.stderr)
            return self.FAILED
        except TailError as error:
            print("TailError: {}".format(error), file=sys.stderr)
            return self.FAILED

        try:
            summary_index = log_tail.index("Resource usage summary:")
        except ValueError:  # resource usage line not in tail
            return self.RUNNING

        status_line_index = summary_index - 2
        if status_line_index < 0:
            # A negative index would silently wrap around to the END of the
            # tail and read an unrelated line as the status.
            raise UnknownStatusLine(
                "status line lies before the available tail of {}".format(
                    self.outlog
                )
            )

        status_line = log_tail[status_line_index]
        if status_line == "Successfully completed.":
            return self.SUCCESS
        if status_line.startswith("Exited with exit code"):
            return self.FAILED
        raise UnknownStatusLine(status_line)

    def get_status(self) -> str:
        """Return SUCCESS, RUNNING or FAILED for the job.

        bjobs is tried up to ``max_status_checks`` times, sleeping
        ``wait_between_tries`` seconds after each failed attempt. If no
        attempt yields a status the output log is consulted; an
        unrecognised status line there is reported as FAILED.
        """
        status = None
        for _ in range(self.max_status_checks):
            try:
                status = self._query_status_using_bjobs()
                break  # succeeded in getting the status
            except BjobsError as error:
                problem = "BjobsError: {error}".format(error=error)
            except KeyError as error:
                problem = "Unknown job status: {error}".format(error=error)
            except CalledProcessError as error:
                problem = "Error calling bjobs: {error}".format(error=error)
            # Shared tail of all three handlers: report, then wait and retry.
            print("[Predicted exception] " + problem, file=sys.stderr)
            print("Resuming...", file=sys.stderr)
            time.sleep(self.wait_between_tries)

        bjobs_failed = status is None
        if bjobs_failed:
            print(
                "bjobs failed {try_times} times. Checking log...".format(
                    try_times=self.max_status_checks
                ),
                file=sys.stderr,
            )
            try:
                status = self._query_status_using_log()
            except UnknownStatusLine as error:
                print("UnknownStatusLine: {}".format(error), file=sys.stderr)
                status = self.FAILED
        return status
if __name__ == "__main__":
    # Entry point: print the status of the job named on the command line.
    # need to support quoted and unquoted jobid
    # see https://github.com/Snakemake-Profiles/lsf/issues/45
    split_args = shlex.split(" ".join(sys.argv[1:]))
    if len(split_args) < 2:
        # Fail with a readable message instead of a bare IndexError.
        sys.exit("usage: {} <jobid> <outlog>".format(sys.argv[0]))
    jobid = int(split_args[0])
    outlog = split_args[1]

    # Read each configured behaviour once and translate it to a kill flag.
    unknwn_behaviour = CookieCutter.get_unknwn_behaviour()
    kill_unknown = {"wait": False, "kill": True}.get(unknwn_behaviour.lower())
    if kill_unknown is None:
        raise ValueError(
            "Unknown value for {}_behaviour: {}".format(UNKNOWN, unknwn_behaviour)
        )

    zombi_behaviour = CookieCutter.get_zombi_behaviour()
    kill_zombie = {"ignore": False, "kill": True}.get(zombi_behaviour.lower())
    if kill_zombie is None:
        raise ValueError(
            "Unknown value for {}_behaviour: {}".format(ZOMBIE, zombi_behaviour)
        )

    lsf_status_checker = StatusChecker(
        jobid,
        outlog,
        kill_unknown=kill_unknown,
        kill_zombie=kill_zombie,
        wait_between_tries=CookieCutter.get_wait_between_tries(),
        max_status_checks=CookieCutter.get_max_status_checks(),
    )
    print(lsf_status_checker.get_status())