/
compute_tracker.py
393 lines (320 loc) · 12.2 KB
/
compute_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import atexit
import logging
import multiprocessing
import os
import pickle
import subprocess
import sys
import time
import traceback
from datetime import datetime
from pathlib import Path
from queue import Empty as EmptyQueueException
from subprocess import PIPE, Popen
from sys import platform
import numpy as np
import pandas as pd
import psutil
import ujson as json
from pandas.io.json import json_normalize
from experiment_impact_tracker.cpu import rapl
from experiment_impact_tracker.cpu.common import get_my_cpu_info
from experiment_impact_tracker.cpu.intel import get_intel_power, get_rapl_power
from experiment_impact_tracker.data_info_and_router import (DATA_HEADERS,
INITIAL_INFO)
from experiment_impact_tracker.data_utils import *
from experiment_impact_tracker.emissions.common import \
is_capable_realtime_carbon_intensity
from experiment_impact_tracker.emissions.get_region_metrics import \
get_current_region_info_cached
from experiment_impact_tracker.gpu.nvidia import (get_gpu_info,
get_nvidia_gpu_power)
from experiment_impact_tracker.utils import (get_timestamp, processify,
safe_file_path,
write_json_data_to_file)
SLEEP_TIME = 1
STOP_MESSAGE = "Stop"
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)
def read_latest_stats(log_dir):
    """
    Reads the latest (last) line of the jsonl data file in the given log directory.

    :param log_dir: log directory to read from
    :return: the parsed json of the last line, or None if no data is available
    """
    log_path = os.path.join(log_dir, DATAPATH)
    retry = 0
    # Sometimes get a race condition: the monitor process may be mid-write,
    # leaving a truncated json line, so retry the parse a few times.
    # TODO: should always use filelock for both reads and writes and clean up on termination
    while retry < 3:
        try:
            try:
                last_line = subprocess.check_output(["tail", "-1", log_path])
            except (subprocess.CalledProcessError, OSError):
                # tail failed (e.g. the file doesn't exist yet) or tail itself
                # is unavailable -- treat as "no stats yet" rather than crash.
                return None
            if last_line:
                return json.loads(last_line)
            else:
                return None
        except ValueError:
            # json parse error, most likely a partially written line; retry.
            retry += 1
            if retry >= 3:
                raise
    return None
def _sample_and_log_power(log_dir, initial_info, logger=None):
    """
    Iterates over compatible metrics and logs the relevant information.

    :param log_dir: The log directory to use
    :param initial_info: Any initial information that was gathered
    :param logger: A logger to use; may be None, in which case per-datapoint
        diagnostics are simply not logged (the original crashed on None here)
    :return: collected data (dict mapping header name -> sampled value)
    """
    # This function runs inside the monitor child process (see
    # launch_power_monitor), so the *parent* pid is the tracked experiment.
    current_process = psutil.Process(os.getppid())
    process_ids = [current_process.pid] + [
        child.pid for child in current_process.children(recursive=True)
    ]
    process_ids = list(
        set(process_ids)
    )  # dedupe so that we don't double count by accident

    required_headers = _get_compatible_data_headers(get_current_region_info_cached()[0])

    header_information = {}

    # for all required headers make sure that we hit the corresponding function which gets that info
    # some functions return multiple values in one call (for example one RAPL reading could get multiple things)
    # so in that case we fill in information on multiple headers at once even though they have the same routing
    # information.
    for header in required_headers:
        if header["name"] in header_information:
            # we already got that info from a multi-return function call
            continue
        start = time.time()
        results = header["routing"]["function"](
            process_ids,
            logger=logger,
            region=initial_info["region"]["id"],
            log_dir=log_dir,
        )
        end = time.time()
        # logger is optional -- don't crash the sampling loop over a timing log.
        if logger is not None:
            logger.info(
                "Datapoint {} took {} seconds".format(header["name"], (end - start))
            )

        if isinstance(results, dict):
            # if we return a dict of results, could account for multiple headers
            for header_name, item in results.items():
                header_information[header_name] = item
        else:
            header_information[header["name"]] = results
    header_information["process_ids"] = process_ids
    # once we have gotten all the required info through routing calls for all headers, we log it
    log_path = safe_file_path(os.path.join(log_dir, DATAPATH))

    try:
        write_json_data_to_file(log_path, header_information)
    except Exception:
        # Surface the offending payload before re-raising so failures are debuggable.
        if logger is not None:
            logger.error(header_information)
        raise

    return header_information
@processify
def launch_power_monitor(queue, log_dir, initial_info, logger=None):
    """
    Launches a separate process which monitors metrics

    :param queue: A message queue to pass messages back and forth to the thread
    :param log_dir: The log directory to use
    :param initial_info: Any initial information that was gathered before the thread was launched.
    :param logger: A logger to use; may be None (the original crashed with
        AttributeError on the default None value)
    :return:
    """
    if logger is not None:
        logger.info("Starting process to monitor power")
    while True:
        # Poll (non-blocking) for a shutdown request from the parent process.
        try:
            message = queue.get(block=False)
            if isinstance(message, str):
                if message == STOP_MESSAGE:
                    return
            else:
                # Not addressed to us; put it back for whoever expects it.
                queue.put(message)
        except EmptyQueueException:
            pass
        try:
            _sample_and_log_power(log_dir, initial_info, logger=logger)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            if logger is not None:
                logger.error("Encountered exception within power monitor thread!")
                logger.error("".join(traceback.format_tb(tb)))
            raise
        time.sleep(SLEEP_TIME)
def _get_compatible_data_headers(region=None):
    """
    Given all the data headers check for each one if it is compatible with the current system.

    :param region: The region we're in, required for some checks
    :return: which headers are compatible
    """
    # Reuse the shared _validate_compatabilities helper so header filtering and
    # one-time info gathering (gather_initial_info) use the same check logic.
    return [
        header
        for header in DATA_HEADERS
        if _validate_compatabilities(header["compatability"], region=region)
    ]
def _validate_compatabilities(compatabilities, *args, **kwargs):
"""
Given a list of compatability functions, run the checks
:param compatabilities: a list of compatability functions to call
:param args: any arguments to pass to compatability functions
:param kwargs: any arguments to pass to compatability functions
:return: True if everything compatible, False otherwise
"""
for compatability_fn in compatabilities:
if not compatability_fn(*args, **kwargs):
return False
return True
def gather_initial_info(log_dir: str):
    """Log one time info

    For example, CPU/GPU info, version of this package, region, datetime for start of experiment,
    CO2 estimate data.

    :param log_dir: the log directory to write to
    :return: gathered information
    """
    info_path = safe_file_path(os.path.join(log_dir, INFOPATH))

    # Run every one-time info gatherer from the router whose compatability
    # checks all pass on this system.
    data = {}
    for info_spec in INITIAL_INFO:
        if _validate_compatabilities(info_spec["compatability"]):
            data[info_spec["name"]] = info_spec["routing"]["function"]()

    # Persist the gathered one-time info as a pickle for later analysis.
    with open(info_path, "wb") as info_file:
        pickle.dump(data, info_file)

    # touch datafile to clear out any past cruft and write headers
    data_path = safe_file_path(os.path.join(log_dir, DATAPATH))
    if os.path.exists(data_path):
        os.remove(data_path)
    Path(data_path).touch()

    return data
class ImpactTracker(object):
    """
    Tracks compute-impact metrics for the current process tree by launching a
    separate monitor process that periodically samples compatible metrics and
    appends them as jsonl rows under ``logdir``.

    Can be used as a context manager::

        with ImpactTracker("./log1"):
            do_thing1()
    """

    def __init__(self, logdir):
        self.logdir = logdir
        self._setup_logging()
        self.logger.info("Gathering system info for reproducibility...")
        self.initial_info = gather_initial_info(logdir)
        self.logger.info("Done initial setup and information gathering...")
        self.launched = False

    def _setup_logging(self):
        """
        Private function to set up logging handlers

        :return:
        """
        # Create a custom logger. NOTE: logging.getLogger returns a process-wide
        # singleton for this name, so guard against attaching handlers twice --
        # otherwise every ImpactTracker instantiation duplicates all log output.
        logger = logging.getLogger(
            "experiment_impact_tracker.compute_tracker.ImpactTracker"
        )

        if not logger.handlers:
            # NOTE(review): with this guard, a second tracker with a different
            # logdir keeps logging to the first tracker's file -- confirm this
            # is preferable to the previous duplicate-handler behavior.
            # Create handlers
            c_handler = logging.StreamHandler()
            f_handler = logging.FileHandler(
                safe_file_path(
                    os.path.join(self.logdir, BASE_LOG_PATH, "impact_tracker_log.log")
                )
            )
            c_handler.setLevel(logging.WARNING)
            f_handler.setLevel(logging.ERROR)

            # Create formatters and add it to handlers
            c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
            f_format = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            c_handler.setFormatter(c_format)
            f_handler.setFormatter(f_format)

            # Add handlers to the logger
            logger.addHandler(c_handler)
            logger.addHandler(f_handler)

        self.logger = logger

    def launch_impact_monitor(self):
        """
        Launches the separate process which starts polling for metrics

        :return:
        """
        try:
            # the defaults for multiprocessing changed in python 3.8.
            # OS X multiprocessing starts processes with spawn instead of fork.
            # Only set the method if none has been fixed yet: calling
            # set_start_method twice raises RuntimeError.
            if multiprocessing.get_start_method(allow_none=True) is None:
                multiprocessing.set_start_method("fork")
            self.p, self.queue = launch_power_monitor(
                self.logdir, self.initial_info, self.logger
            )

            def _terminate_monitor_and_log_final_info(p):
                p.terminate()
                log_final_info(self.logdir)

            atexit.register(_terminate_monitor_and_log_final_info, self.p)

            self.launched = True
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            self.logger.error(
                "Encountered exception when launching power monitor thread."
            )
            # Use lazy %-style formatting; passing the objects as bare
            # positional args (as before) made the error handler itself raise.
            self.logger.error(
                "%s %s %s", ex_type, ex_value, "".join(traceback.format_tb(tb))
            )
            raise

    def get_latest_info_and_check_for_errors(self):
        """
        Reads the latest information from the log file and checks for errors that may have occured in the separate
        process

        :return: latest stats
        """
        try:
            message = self.queue.get(block=False)
            if isinstance(message, tuple):
                # The monitor process reports failures as (return, error) tuples;
                # only inspect `error` in this branch (it was previously read
                # even when the message was not a tuple, raising NameError).
                ret, error = message
                if error:
                    ex_type, ex_value, tb_str = error
                    message = "%s (in subprocess)\n%s" % (str(ex_value), tb_str)
                    raise ex_type(message)
            else:
                # Not addressed to us; put it back on the queue.
                self.queue.put(message)
        except EmptyQueueException:
            # Nothing in the message queue
            pass
        return read_latest_stats(self.logdir)

    def __enter__(self):
        """
        Allows the object to function as a context and exit.

        For example,

        with ImpactTracker("./log1"):
            do_thing1()

        with ImpactTracker("./log2"):
            do_thing2()

        :return:
        """
        if self.launched:
            self.logger.error(
                "Cannot enter an impact tracker after it has already been launched! Create a new "
                "impact tracker object, please."
            )
            raise ValueError(
                "Cannot enter an impact tracker after it has already been launched!"
            )
        self.launch_impact_monitor()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """
        Allows the object to function as a context and exit.

        For example,

        with ImpactTracker("./log1"):
            do_thing1()

        with ImpactTracker("./log2"):
            do_thing2()

        :return:
        """
        if exc_type is not None:
            # On exception we do not stop the monitor here; the atexit hook
            # registered in launch_impact_monitor will terminate it on exit.
            return False
        # Clean shutdown of the monitor process on normal exit.
        self.stop()

    def stop(self):
        """
        Stops the monitoring thread

        :return:
        """
        self.logger.info("Requesting thread shutdown.")
        self.queue.put(STOP_MESSAGE)
        # Give the monitor a moment to see the stop message before terminating.
        time.sleep(1)
        self.p.terminate()
        self.logger.info("Starting - Logging final info.")
        # NOTE(review): the atexit hook also calls log_final_info at interpreter
        # exit, so final info may be logged twice -- confirm this is intended.
        log_final_info(self.logdir)
        self.logger.info("Done - Logging final info.")