-
Notifications
You must be signed in to change notification settings - Fork 2
/
AcceleratorLogParser.py
executable file
·401 lines (338 loc) · 16.1 KB
/
AcceleratorLogParser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
#!/usr/bin/env python3
'''
Parse Altair Accelerator logfiles and write job information to a yaml file.
Development notes:
NC uses postgres.
Get a list of tables:
nc cmd vovsql_query -e "select * from pg_catalog.pg_tables where schemaname != 'pg_catalog' and schemaname != 'information_schema'"
public metadata rtdamgr {} t f f f
public loadinfo rtdamgr {} t f f f
public users rtdamgr {} t f t f
public hosts rtdamgr {} t f t f
public jobclasses rtdamgr {} t f t f
public projects rtdamgr {} t f t f
public osgroups rtdamgr {} t f t f
public fsgroups rtdamgr {} t f t f
public tools rtdamgr {} t f t f
public resources rtdamgr {} t f t f
public grabbeds rtdamgr {} t f t f
public statuses rtdamgr {} t f t f
public jobs rtdamgr {} t f t f
pg_temp_2 import_jobs_1916 rtdamgr {} t f f f
pg_temp_2 import_jobs_normalized_1916 rtdamgr {} t f f f
Get list of columns in the table:
nc cmd vovsql_query -e "select column_name, data_type from INFORMATION_SCHEMA.COLUMNS where table_name = 'jobs'"
id integer
loadinfo_id integer
jobclassid integer
projectid integer
fsgroupid integer
userid integer
osgroupid integer
toolid integer
resourcesid integer
grabbedid integer
exehostid integer
statusid integer
spriority smallint
submittime integer
starttime integer
endtime integer
deadline integer
exitstatus smallint
maxram integer
maxvm integer
cputime real
susptime integer
failcode integer
fstokens integer
Get columns we need for analysis:
nc cmd vovsql_query -e "select id, submittime, starttime, endtime, exitstatus, maxram, maxvm, cputime, susptime from jobs"
Get list of columns in the resources table:
nc cmd vovsql_query -e "select column_name, data_type from INFORMATION_SCHEMA.COLUMNS where table_name = 'resources'"
id integer
name {character varying}
Get list of resources
nc cmd vovsql_query -e "select id, name from resources"
Get list of columns in the users table:
nc cmd vovsql_query -e "select column_name, data_type from INFORMATION_SCHEMA.COLUMNS where table_name = 'users'"
id integer
name {character varying}
Get list of users in the users table:
nc cmd vovsql_query -e "select id, name from users"
id integer
name {character varying}
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
'''
import argparse
import logging
from math import ceil
from MemoryUtils import MEM_MB, MEM_GB, mem_string_to_float, mem_string_to_int
from os import makedirs, path, remove
from os.path import dirname
import re
from SchedulerJobInfo import SchedulerJobInfo
from SchedulerLogParser import SchedulerLogParser, logger as SchedulerLogParser_logger
import subprocess # nosec
from subprocess import CalledProcessError, check_output # nosec
from textwrap import dedent
from VersionCheck import logger as VersionCheck_logger, VersionCheck
# Module-level logger that writes formatted records to stderr.
# Level defaults to INFO; main() raises it to DEBUG when --debug is given.
logger = logging.getLogger(__file__)
logger_formatter = logging.Formatter('%(levelname)s:%(asctime)s: %(message)s')
logger_streamHandler = logging.StreamHandler()
logger_streamHandler.setFormatter(logger_formatter)
logger.addHandler(logger_streamHandler)
logger.setLevel(logging.INFO)
class AcceleratorLogParser(SchedulerLogParser):
    '''
    Parse Accelerator vovsql_query output to get job completion information.

    Job records come either from a previously captured vovsql_query output file
    (sql_input_file) or by running vovsql_query directly and capturing its
    output (sql_output_file).
    '''

    # By default, jobs only limited by core count, not memory, if memory request not specified.
    DEFAULT_MEMORY_GB = 0.0

    # (column, format) pairs selected from the jobs/resources tables.
    # Formats: 'd' = integer, 'f' = float, 's' = string (left unconverted).
    _VOVSQL_JOBS_COLUMN_TUPLES = [
        ('jobs.id', 'd'),
        ('jobs.submittime', 'd'),
        ('jobs.starttime', 'd'),
        ('jobs.endtime', 'd'),
        ('resources.name', 's'),
        ('jobs.exitstatus', 'd'),
        ('jobs.maxram', 'd'),
        ('jobs.maxvm', 'd'),
        ('jobs.cputime', 'f'),
        ('jobs.susptime', 'd'),
    ]
    _VOVSQL_COLUMNS = [field_tuple[0] for field_tuple in _VOVSQL_JOBS_COLUMN_TUPLES]
    _VOVSQL_QUERY = f"select {', '.join(_VOVSQL_COLUMNS)} from jobs inner join resources on jobs.resourcesid=resources.id" # nosec
    _VOVSQL_QUERY_COMMAND_LIST = [
        "nc", "cmd", "vovsql_query", "-e", _VOVSQL_QUERY
    ]
    _VOVSQL_QUERY_COMMAND = f"nc cmd vovsql_query -e \"{_VOVSQL_QUERY}\""

    # Precompiled patterns for lines to skip in the vovsql_query output.
    _BLANK_LINE_RE = re.compile(r'^\s*$')
    _COMMENT_LINE_RE = re.compile(r'^\s*#')

    def __init__(self, default_mem_gb: float=0.0, sql_input_file: str=None, sql_output_file: str=None, output_csv: str=None, starttime: str=None, endtime: str=None):
        '''
        Constructor

        Args:
            default_mem_gb (float): Default memory request (GB) for jobs without one. Falls back to DEFAULT_MEMORY_GB when falsy.
            sql_input_file (str): File with the output of vovsql_query so can process output offline.
            sql_output_file (str): File where vovsql_query output will be written. Can be used to process job data on another machine without vovsql_query access.
            output_csv (str): CSV file where parsed jobs will be written.
            starttime (str): Select jobs after the specified time
            endtime (str): Select jobs before the specified time
        Raises:
            ValueError: If both or neither of sql_input_file and sql_output_file are given.
            FileNotFoundError: If sql_input_file doesn't exist.
        '''
        super().__init__(None, output_csv, starttime, endtime)
        if default_mem_gb:
            self._default_mem_gb = default_mem_gb
            logger.debug(f"self._default_mem_gb set to {self._default_mem_gb}")
        else:
            self._default_mem_gb = AcceleratorLogParser.DEFAULT_MEMORY_GB
        self._sql_input_file = sql_input_file
        self._sql_output_file = sql_output_file
        # Exactly one of the two file arguments must be supplied.
        if sql_input_file and sql_output_file:
            raise ValueError("Cannot specify sql_input_file and sql_output_file.")
        if not (sql_input_file or sql_output_file):
            raise ValueError("Must specify either sql_input_file or sql_output_file")
        if sql_input_file and not path.exists(sql_input_file):
            raise FileNotFoundError(f"sql_input_file doesn't exist: {sql_input_file}")
        if sql_output_file:
            sql_output_dir = dirname(sql_output_file)
            # dirname is '' when sql_output_file has no directory component;
            # makedirs('') would raise, so only create a real directory.
            if sql_output_dir and not path.exists(sql_output_dir):
                makedirs(sql_output_dir)
        self._sql_fh = None          # Open handle on the sql data file while parsing
        self._line_number = 0        # 1-based line counter for debug messages
        self._next_job_fields = {}
        self._eof = False            # True once all lines have been consumed

    def parse_jobs(self) -> None:
        '''
        Parse all the jobs from the Accelerator vovsql_query command.

        Each parsed job that falls within the configured time window is written
        to the output CSV (when one was configured).

        Returns:
            None
        '''
        job = True
        while job:
            job = self.parse_job()
            if job:
                if self._output_csv_fh:
                    self.write_job_to_csv(job)
        logger.debug(f"Parsed {self.num_output_jobs()} jobs.")

    def parse_job(self):
        '''
        Parse the next job from the vovsql_query output.

        Lazily runs vovsql_query (when no sql_input_file was given) and opens
        the data file on first call. Skips blank and comment lines and jobs
        outside the configured time window.

        Returns:
            SchedulerJobInfo: The next job in the time window, or None at EOF.
        '''
        if self._eof:
            return None
        if not self._sql_fh:
            if not self._sql_input_file:
                self._call_vovsql_query()
            self._sql_fh = open(self._sql_input_file, 'r')
        while True:
            line = self._sql_fh.readline()
            self._line_number += 1
            if line == '':
                logger.debug(f"Hit EOF at line {self._line_number}")
                self._eof = True
                self._clean_up()
                return None
            line = line.strip()
            logger.debug(f"line {self._line_number}: '{line}'")
            if self._BLANK_LINE_RE.match(line):
                logger.debug("    Skipping blank line")
                continue
            if self._COMMENT_LINE_RE.match(line):
                logger.debug("    Skipping comment line")
                continue
            # Fields are space-separated, but a brace-delimited field ({...})
            # may itself contain spaces; rejoin those pieces into one field.
            raw_fields = line.split(' ')
            fields = []
            while len(raw_fields):
                field = raw_fields.pop(0)
                if field.startswith('{'):
                    while not field.endswith('}'):
                        field += ' ' + raw_fields.pop(0)
                fields.append(field)
            logger.debug(f"    {len(fields)} fields: {fields}")
            job_fields = {}
            for field_name, field_format in self._VOVSQL_JOBS_COLUMN_TUPLES:
                field_value = fields.pop(0)
                logger.debug(f"    {field_name}: '{field_value}'")
                if field_value:
                    try:
                        if field_format == 'd':
                            field_value = mem_string_to_int(field_value)
                        elif field_format == 'f':
                            field_value = mem_string_to_float(field_value)
                        elif field_format == 's':
                            pass
                        else:
                            raise ValueError(f"Invalid format of {field_format} for field {field_name}")
                    except ValueError:
                        logger.exception(f"Unable to convert field {field_name} to format {field_format}: {field_value}")
                        raise
                job_fields[field_name] = field_value
            logger.debug(f"    job_id: {job_fields['jobs.id']}")
            job = self._create_job_from_job_fields(job_fields)
            if self._job_in_time_window(job):
                self._num_input_jobs += 1
                return job

    def _call_vovsql_query(self):
        '''
        Call vovsql_query and save its output to self._sql_output_file.

        On success, self._sql_input_file is pointed at the captured output so
        that parse_job can read it back.

        Raises:
            RuntimeError: If sql_input_file was given (nothing to query).
        '''
        if self._sql_input_file:
            raise RuntimeError("Cannot call _call_vovsql_query when sql_input_file given for input")
        logger.debug(f"Calling vovsql_query:\n{' '.join(self._VOVSQL_QUERY_COMMAND_LIST)}")
        # Context manager guarantees the output file is flushed and closed
        # even if subprocess.run raises.
        with open(self._sql_output_file, 'w') as sql_write_fh:
            rc = subprocess.run(self._VOVSQL_QUERY_COMMAND_LIST, stdout=sql_write_fh, stderr=subprocess.STDOUT, encoding='UTF-8').returncode # nosec
        if rc:
            logger.error(f"vovsql_query failed with rc={rc}. See {self._sql_output_file}")
            exit(1)
        self._sql_input_file = self._sql_output_file

    def _create_job_from_job_fields(self, job_fields):
        '''
        Build a SchedulerJobInfo from one parsed row of vovsql_query output.

        Args:
            job_fields (dict): Column name to converted value, keyed by the
                names in _VOVSQL_JOBS_COLUMN_TUPLES.

        Returns:
            SchedulerJobInfo: The job record.
        '''
        resources = job_fields['resources.name']
        # Get number of requested cores from the resources field.
        # If not found then set the default to 1 core.
        matches = re.search(r'CORES/(\d+)', resources)
        if matches:
            num_cores = int(matches.group(1))
            logger.debug(f"num_cores={num_cores}")
        else:
            num_cores = 1
            logger.debug(f"num_cores not found. Default to {num_cores}")
        # jobs.maxram is in MB; convert to bytes.
        maxram = job_fields['jobs.maxram'] * MEM_MB
        logger.debug(f"maxram={maxram}")
        # Get the amount of memory requested for the job.
        # If not found then use the maximum of the default memory request or 110% of the actual memory used.
        matches = re.search(r'RAM/(\d+)', resources)
        if matches:
            max_mem_gb = float(matches.group(1)) * MEM_MB / MEM_GB
            logger.debug(f"max_mem_gb={max_mem_gb}")
        else:
            # Request max of default memory or 10% more than used
            max_mem_gb = max(self._default_mem_gb, round(maxram * 1.10 / MEM_GB, 3))
            logger.debug(f"max_mem_gb not found default to {max_mem_gb}")
        num_hosts = 1
        job = SchedulerJobInfo(
            job_id = job_fields['jobs.id'],
            resource_request = resources,
            num_cores = num_cores,
            max_mem_gb = max_mem_gb,
            num_hosts = num_hosts,
            submit_time = job_fields['jobs.submittime'],
            start_time = job_fields['jobs.starttime'],
            finish_time = job_fields['jobs.endtime'],
            exit_status = job_fields['jobs.exitstatus'],
            ru_maxrss = maxram,
            ru_utime = job_fields['jobs.cputime'],
        )
        return job

    def _clean_up(self):
        '''
        Clean up after the last vovsql_query line has been read.
        '''
        # Close the data file handle so it isn't leaked.
        if self._sql_fh:
            self._sql_fh.close()
            self._sql_fh = None
        # NOTE(review): _call_vovsql_query sets _sql_input_file, so this branch
        # never fires after a live query; preserved as-is to avoid deleting a
        # user-requested --sql-output-file.
        if not self._sql_input_file:
            # Delete the tmp file
            remove(self._sql_output_file)
def main():
    '''
    Command-line entry point for the Altair Accelerator log parser.

    Parses arguments, optionally prints the data-collection command, verifies
    the environment (git version, Altair nc availability), then parses the job
    data and writes it to the output CSV.
    '''
    parser = argparse.ArgumentParser(description="Parse Altair Accelerator logs.", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    accelerator_mutex_group = parser.add_mutually_exclusive_group(required=True)
    accelerator_mutex_group.add_argument("--show-data-collection-cmd", action='store_const', const=True, default=False, help="Show the command to create the SQL_OUTPUT_FILE.")
    accelerator_mutex_group.add_argument("--sql-output-file", help=f"File where the output of sql query will be written. Cannot be used with --sql-input-file. Required if --sql-input-file not set.")
    accelerator_mutex_group.add_argument("--sql-input-file", help="File with the output of sql query so can process it offline. Cannot be used with --sql-output-file. Required if --sql-output-file not set.")
    parser.add_argument("--output-csv", help="CSV file with parsed job completion records")
    parser.add_argument("--default-mem-gb", required=False, default=AcceleratorLogParser.DEFAULT_MEMORY_GB, help="Default amount of memory (in GB) requested for jobs that do not have a memory request.")
    parser.add_argument("--starttime", help="Select jobs after the specified time. Format YYYY-MM-DDTHH:MM:SS")
    parser.add_argument("--endtime", help="Select jobs before the specified time. Format YYYY-MM-DDTHH:MM:SS")
    parser.add_argument("--disable-version-check", action='store_const', const=True, default=False, help="Disable git version check")
    parser.add_argument("--debug", '-d', action='store_const', const=True, default=False, help="Enable debug mode")
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
        SchedulerLogParser_logger.setLevel(logging.DEBUG)
        VersionCheck_logger.setLevel(logging.DEBUG)
    if not args.disable_version_check and not VersionCheck().check_git_version():
        exit(1)
    if args.show_data_collection_cmd:
        # BUGFIX: a previously captured file is parsed with --sql-input-file,
        # not --sql-output-file (the constructor rejects re-use of the output
        # option for an existing file).
        print(dedent(f"""\
            Run the following command to save the job information to a file:
                {AcceleratorLogParser._VOVSQL_QUERY_COMMAND} > SQL_OUTPUT_FILE
            Then you can parse SQL_OUTPUT_FILE using the following command:
                {__file__} --sql-input-file SQL_OUTPUT_FILE --output-csv OUTPUT_CSV
            """))
        exit(0)
    # Redundant with the required mutex group, but kept as a defensive check.
    if not (args.sql_output_file or args.sql_input_file):
        logger.error("one of the arguments --sql-output-file --sql-input-file is required")
        exit(1)
    if not args.output_csv:
        logger.error("the following arguments are required: --output-csv")
        exit(1)
    logger.info('Started Altair Accelerator log parser')
    if args.sql_input_file:
        if not path.exists(args.sql_input_file):
            logger.error(f"sql input file doesn't exist: {args.sql_input_file}")
            exit(1)
        logger.info(f"Accelerator job data will be read from {args.sql_input_file} instead of calling nc vovsql_query.")
    else:
        # Sanity-check that Altair's nc (not netcat) is on the PATH before
        # attempting a live query.
        try:
            result = subprocess.run(["nc", "-h"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='UTF-8') # nosec
        except FileNotFoundError as e:
            logger.error(f"Cannot find nc command. You must set up the Accelerator environment (source vovrc.sh) or specify --sql-input-file.")
            exit(1)
        except CalledProcessError as e:
            logger.error(f"'nc -h' failed. You must set up the Accelerator environment (source vovrc.sh) or specify --sql-input-file.")
            print(f"returncode: {e.returncode}")
            print(f"output:\n{e.stdout}")
            exit(1)
        output = result.stdout
        # Altair nc exits with rc 2 from 'nc -h' and mentions the vendor name.
        if result.returncode != 2 or 'Altair Engineering.' not in output:
            logger.error(f"Unexpected result from 'nc -h'\nreturncode: expected 2, actual {result.returncode}\noutput:\n{output}")
            if 'Usage: nc' in output:
                logger.error(f"'nc -h' called ncat (netcat), not Altair nc.")
            logger.info(f"'nc -h' failed. You must set up the Accelerator environment (source vovrc.sh) or specify --sql-input-file.")
            exit(1)
        logger.info(f"Sql job data will be written to {args.sql_output_file}")
    logger.info(f"Parsed job information will be written to {args.output_csv}")
    acceleratorLogParser = AcceleratorLogParser(default_mem_gb=float(args.default_mem_gb), sql_input_file=args.sql_input_file, sql_output_file=args.sql_output_file, output_csv=args.output_csv, starttime=args.starttime, endtime=args.endtime)
    acceleratorLogParser.parse_jobs()
# Script entry point: run the parser only when executed directly, not on import.
if __name__ == '__main__':
    main()