Skip to content

Commit

Permalink
[GR-44520] Add benchmark RSS percentile metrics using psrecord.
Browse files Browse the repository at this point in the history
PullRequest: mx/1571
  • Loading branch information
peter-hofer committed Mar 3, 2023
2 parents 18b5c73 + 8614bc5 commit a8233f6
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 96 deletions.
2 changes: 1 addition & 1 deletion mx.py
Original file line number Diff line number Diff line change
Expand Up @@ -18371,7 +18371,7 @@ def alarm_handler(signum, frame):
abort(1, killsig=signal.SIGINT)

# The version must be updated for every PR (checked in CI) and the comment should reflect the PR's issue
version = VersionSpec("6.16.1") # GR-44550
version = VersionSpec("6.16.2") # GR-44520 add benchmark RSS percentile metrics using psrecord.

currentUmask = None
_mx_start_datetime = datetime.utcnow()
Expand Down
252 changes: 157 additions & 95 deletions mx_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ def __init__(self, *args, **kwargs):
self._desired_version = None
self._suite_dimensions = {}
self._command_mapper_hooks = []
self._trackers = []
self._currently_running_benchmark = None

def name(self):
Expand Down Expand Up @@ -432,6 +433,15 @@ def register_command_mapper_hook(self, name, func):
"""
self._command_mapper_hooks.append((name, func, self))

def register_tracker(self, name, tracker_type):
tracker = tracker_type(self)
self._trackers.append(tracker)

def hook(cmd, suite):
assert suite == self
return tracker.map_command(cmd)
self.register_command_mapper_hook(name, hook)

def version(self):
"""The suite version selected for execution which is either the :defaultSuiteVerion:
or the :desiredVersion: if any.
Expand Down Expand Up @@ -1120,7 +1130,10 @@ def compiled(pat):
return []

datapoints = []
rules = self.rules(out, benchmarks, bmSuiteArgs) + _get_trackers_rules(self, bmSuiteArgs) + extraRules
rules = self.rules(out, benchmarks, bmSuiteArgs)
for t in self._trackers:
rules += t.get_rules(bmSuiteArgs)
rules += extraRules

for rule in rules:
# pass working directory to rule without changing the signature of parse
Expand Down Expand Up @@ -2545,55 +2558,6 @@ def build_url():
return ""


def get_rss_parse_rule(suite, bmSuiteArgs):
if mx_benchmark_compatibility().bench_suite_needs_suite_args():
suite_name = suite.benchSuiteName(bmSuiteArgs)
else:
suite_name = suite.benchSuiteName()
if mx.get_os() == "linux":
# Output of 'time -v' on linux contains:
# Maximum resident set size (kbytes): 511336
rule = [
StdOutRule(
r"Maximum resident set size \(kbytes\): (?P<rss>[0-9]+)",
{
"benchmark": suite.currently_running_benchmark(),
"bench-suite": suite_name,
"config.vm-flags": ' '.join(suite.vmArgs(bmSuiteArgs)),
"metric.name": "max-rss",
"metric.value": ("<rss>", lambda x: int(float(x)/(1024))),
"metric.unit": "MB",
"metric.type": "numeric",
"metric.score-function": "id",
"metric.better": "lower",
"metric.iteration": 0
}
)
]
elif mx.get_os() == "darwin":
# Output of 'time -l' on linux contains (size in bytes):
# 523608064 maximum resident set size
rule = [
StdOutRule(
r"(?P<rss>[0-9]+)\s+maximum resident set size",
{
"benchmark": suite.currently_running_benchmark(),
"bench-suite": suite_name,
"config.vm-flags": ' '.join(suite.vmArgs(bmSuiteArgs)),
"metric.name": "max-rss",
"metric.value": ("<rss>", lambda x: int(float(x)/(1024*1024))),
"metric.unit": "MB",
"metric.type": "numeric",
"metric.score-function": "id",
"metric.better": "lower",
"metric.iteration": 0
}
)
]
else:
rule = []
return rule

_use_tracker = True

def enable_tracker():
Expand All @@ -2604,58 +2568,154 @@ def disable_tracker():
global _use_tracker
_use_tracker = False

def rss_hook(cmd, bmSuite):
"""
Tracks the max resident memory size used by the process using the 'time' command.
class Tracker(object):
def __init__(self, bmSuite):
self.bmSuite = bmSuite

:param list[str] cmd: the input command to modify
:param BenchmarkSuite bmSuite: the benchmark suite to which this command corresponds to if any
:return:
"""
if not _use_tracker:
return cmd
if mx.get_os() == "linux":
prefix = ["time", "-v"]
elif mx.get_os() == "darwin":
prefix = ["time", "-l"]
else:
mx.log(f"Ignoring the 'rss' tracker since it is not supported on {mx.get_os()}")
prefix = []
return prefix + cmd
def map_command(self, cmd):
raise NotImplementedError()

def get_rules(self, bmSuiteArgs):
raise NotImplementedError()

class RssTracker(Tracker):
def map_command(self, cmd):
"""
Tracks the max resident memory size used by the process using the 'time' command.
def psrecord_hook(cmd, bmSuite):
"""
Delegates the command execution to 'psrecord' that will also capture memory and CPU consumption of the process.
:param list[str] cmd: the input command to modify
:return:
"""
if not _use_tracker:
return cmd
if mx.get_os() == "linux":
prefix = ["time", "-v"]
elif mx.get_os() == "darwin":
prefix = ["time", "-l"]
else:
mx.log(f"Ignoring the 'rss' tracker since it is not supported on {mx.get_os()}")
prefix = []
return prefix + cmd

:param list[str] cmd: the input command to modify
:param BenchmarkSuite bmSuite: the benchmark suite to which this command corresponds to if any
:return:
"""
if not _use_tracker:
return cmd
def get_rules(self, bmSuiteArgs):
if mx_benchmark_compatibility().bench_suite_needs_suite_args():
suite_name = self.bmSuite.benchSuiteName(bmSuiteArgs)
else:
suite_name = self.bmSuite.benchSuiteName()
if mx.get_os() == "linux":
# Output of 'time -v' on linux contains:
# Maximum resident set size (kbytes): 511336
rules = [
StdOutRule(
r"Maximum resident set size \(kbytes\): (?P<rss>[0-9]+)",
{
"benchmark": self.bmSuite.currently_running_benchmark(),
"bench-suite": suite_name,
"config.vm-flags": ' '.join(self.bmSuite.vmArgs(bmSuiteArgs)),
"metric.name": "max-rss",
"metric.value": ("<rss>", lambda x: int(float(x)/(1024))),
"metric.unit": "MB",
"metric.type": "numeric",
"metric.score-function": "id",
"metric.better": "lower",
"metric.iteration": 0
}
)
]
elif mx.get_os() == "darwin":
# Output of 'time -l' on linux contains (size in bytes):
# 523608064 maximum resident set size
rules = [
StdOutRule(
r"(?P<rss>[0-9]+)\s+maximum resident set size",
{
"benchmark": self.bmSuite.currently_running_benchmark(),
"bench-suite": suite_name,
"config.vm-flags": ' '.join(self.bmSuite.vmArgs(bmSuiteArgs)),
"metric.name": "max-rss",
"metric.value": ("<rss>", lambda x: int(float(x)/(1024*1024))),
"metric.unit": "MB",
"metric.type": "numeric",
"metric.score-function": "id",
"metric.better": "lower",
"metric.iteration": 0
}
)
]
else:
rules = []
return rules

if mx.run(["psrecord", "-h"], nonZeroIsFatal=False, out=mx.OutputCapture(), err=mx.OutputCapture()) != 0:
mx.abort("Memory tracking requires the 'psrecord' dependency. Install it with: 'pip install psrecord'")

import datetime
bench_name = bmSuite.currently_running_benchmark() if bmSuite else "benchmark"
if bmSuite:
bench_name = f"{bmSuite.name()}-{bench_name}"
ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
text_output = os.path.join(os.getcwd(), f"ps_{bench_name}_{ts}.txt")
plot_output = os.path.join(os.getcwd(), f"ps_{bench_name}_{ts}.png")
return ["psrecord", "--log", text_output, "--plot", plot_output, "--include-children", " ".join(cmd)]
class PsrecordTracker(Tracker):
def map_command(self, cmd):
"""
Delegates the command execution to 'psrecord' that will also capture memory and CPU consumption of the process.
:param list[str] cmd: the input command to modify
"""
if not _use_tracker:
return cmd

_available_trackers = {
"rss": rss_hook,
"psrecord": psrecord_hook
}
import datetime
bench_name = self.bmSuite.currently_running_benchmark() if self.bmSuite else "benchmark"
if self.bmSuite:
bench_name = f"{self.bmSuite.name()}-{bench_name}"
ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
text_output = os.path.join(os.getcwd(), f"ps_{bench_name}_{ts}.txt")
plot_output = os.path.join(os.getcwd(), f"ps_{bench_name}_{ts}.png")
if mx.run(["psrecord", "-h"], nonZeroIsFatal=False, out=mx.OutputCapture(), err=mx.OutputCapture()) != 0:
mx.abort("Memory tracking requires the 'psrecord' dependency. Install it with: 'pip install psrecord'")

self.most_recent_text_output = text_output
return ["psrecord", "--interval", "0.050", "--log", text_output, "--plot", plot_output, "--include-children", " ".join(cmd)]

def get_rules(self, bmSuiteArgs):
return [PsrecordTracker.PsrecordRule(self, bmSuiteArgs)]

class PsrecordRule(CSVBaseRule):
def __init__(self, tracker, bmSuiteArgs, **kwargs):
replacement = {
"benchmark": tracker.bmSuite.currently_running_benchmark(),
"bench-suite": tracker.bmSuite.benchSuiteName(bmSuiteArgs) if mx_benchmark_compatibility().bench_suite_needs_suite_args() else tracker.bmSuite.benchSuiteName(),
"config.vm-flags": ' '.join(tracker.bmSuite.vmArgs(bmSuiteArgs)),
"metric.name": ("<metric_name>", str),
"metric.value": ("<metric_value>", int),
"metric.unit": "MB",
"metric.type": "numeric",
"metric.score-function": "id",
"metric.better": "lower",
"metric.iteration": 0
}
super().__init__(["elapsed-wall", "cpu%", "rss_mb", "vsz_mb"],
replacement, delimiter=' ', skipinitialspace=True, **kwargs)
self.tracker = tracker

def getCSVFiles(self, text):
file = self.tracker.most_recent_text_output
return [file] if file else []

def parseResults(self, text):
rows = super().parseResults(text)
rows = rows[1:] # first row contains malformatted (unquoted) headings
if not rows:
return []
values = sorted(float(r["rss_mb"]) for r in rows)

def _get_trackers_rules(suite, bmSuiteArgs):
return get_rss_parse_rule(suite, bmSuiteArgs)
def pc(k): # k-percentile with linear interpolation between closest ranks
x = (len(values) - 1) * k / 100
fr = int(x)
cl = int(x + 0.5)
v = values[fr] if fr == cl else values[fr] * (cl - x) + values[cl] * (x - fr)
return {"metric_name": "rss_p" + str(k), "metric_value": str(int(v))}

return [pc(100), pc(99), pc(95), pc(90), pc(75), pc(50), pc(25)]


_available_trackers = {
"rss": RssTracker,
"psrecord": PsrecordTracker,
}


class BenchmarkExecutor(object):
Expand Down Expand Up @@ -2979,8 +3039,7 @@ def benchmark(self, mxBenchmarkArgs, bmSuiteArgs, returnSuiteAndResults=False):
raise ValueError(f"Unknown tracker '{mxBenchmarkArgs.tracker}'. Use one of: {', '.join(_available_trackers.keys())}")
if suite:
mx.log(f"Registering tracker: {mxBenchmarkArgs.tracker}")
suite.register_command_mapper_hook(mxBenchmarkArgs.tracker,
_available_trackers[mxBenchmarkArgs.tracker])
suite.register_tracker(mxBenchmarkArgs.tracker, _available_trackers[mxBenchmarkArgs.tracker])

if mxBenchmarkArgs.list:
if mxBenchmarkArgs.benchmark and suite:
Expand Down Expand Up @@ -3067,7 +3126,10 @@ def benchmark(self, mxBenchmarkArgs, bmSuiteArgs, returnSuiteAndResults=False):
results.extend(self.execute(suite, benchnames, mxBenchmarkArgs, suiteArgs, fork_number=fork_num))
except (BenchmarkFailureError, RuntimeError):
failures_seen = True
failed_benchmarks.append(f"{suite.name()}:{benchnames[0]}")
if benchnames and len(benchnames) > 0:
failed_benchmarks.append(f"{suite.name()}:{benchnames[0]}")
else:
failed_benchmarks.append(f"{suite.name()}")
mx.log(traceback.format_exc())
if mxBenchmarkArgs.fail_fast:
mx.abort("Aborting execution since a failure happened and --fail-fast is enabled")
Expand Down

0 comments on commit a8233f6

Please sign in to comment.