Trial tracemalloc as memory tracker (replacement v2 PR) #5946

Closed
wants to merge 4 commits
Changes from 2 commits
49 changes: 46 additions & 3 deletions benchmarks/benchmarks/__init__.py
@@ -6,6 +6,9 @@

from os import environ
import resource
import tracemalloc

import numpy as np

ARTIFICIAL_DIM_SIZE = int(10e3) # For all artificial cubes, coords etc.

@@ -66,24 +69,44 @@ class TrackAddedMemoryAllocation:

"""

    _DEFAULT_RESULT_MINIMUM_MB = 5.0
    _DEFAULT_RESULT_ROUND_DP = 1

def __init__(self, use_tracemalloc=False, result_min_mb=None, result_round_dp=None):
self._use_tracemalloc = use_tracemalloc
if result_min_mb is None:
result_min_mb = self._DEFAULT_RESULT_MINIMUM_MB
self.RESULT_MINIMUM_MB = result_min_mb
if result_round_dp is None:
result_round_dp = self._DEFAULT_RESULT_ROUND_DP
self.RESULT_ROUND_DP = result_round_dp

    @staticmethod
    def process_resident_memory_mb():
        # N.B. ru_maxrss is reported in kilobytes on Linux (but in bytes on
        # macOS), so the /1024 scaling assumes the Linux convention.
        return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

    def __enter__(self):
        if self._use_tracemalloc:
            self.mb_before = 0
            tracemalloc.start()
        else:
            self.mb_before = self.process_resident_memory_mb()
        return self

    def __exit__(self, *_):
        if self._use_tracemalloc:
            # get_traced_memory() returns (current, peak) in bytes: report
            # the peak, converted to Mb.
            _, peak_mem = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            self.mb_after = peak_mem * 1.0 / 1024**2
        else:
            self.mb_after = self.process_resident_memory_mb()

def addedmem_mb(self):
"""Return measured memory growth, in Mb."""
result = self.mb_after - self.mb_before
# Small results are too vulnerable to noise being interpreted as signal.
result = max(self.RESULT_MINIMUM_MB, result)
result = np.round(result, self.RESULT_ROUND_DP)
return result

@staticmethod
@@ -124,3 +147,23 @@ def on_demand_benchmark(benchmark_object):
"""
if "ON_DEMAND_BENCHMARKS" in environ:
return benchmark_object
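
(For context on this pre-existing helper: a suite decorated this way is only collected when the variable is set, e.g. `ON_DEMAND_BENCHMARKS=1 asv run`. A sketch, with an illustrative class name:)

# Hypothetical example: with ON_DEMAND_BENCHMARKS unset, the decorator
# returns None, so ASV never sees the class.
@on_demand_benchmark
class OnDemandSuite:
    def time_something(self):
        sum(range(10**6))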


def memtrace_benchmark(use_tracemalloc=False, result_min_mb=None):
    # Call which returns a decorator == 'decorator with args'.
    # N.B. embeds the call arguments in the closure of the decorator returned.
    from functools import wraps

    def decorator(decorated_func):
        assert decorated_func.__name__.startswith("track_")

        @wraps(decorated_func)
        def wrapper(*args, **kwargs):
            with TrackAddedMemoryAllocation(
                use_tracemalloc=use_tracemalloc, result_min_mb=result_min_mb
            ) as tracer:
                decorated_func(*args, **kwargs)
            # A 'track_' benchmark must return the tracked value: here, the
            # measured memory growth.
            return tracer.addedmem_mb()

        return wrapper

    return decorator
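
For illustration (not part of the diff), a minimal sketch of both measurement paths, assuming the `benchmarks` package above is importable and using an arbitrary array size:

import numpy as np

from benchmarks import TrackAddedMemoryAllocation, memtrace_benchmark

# Context-manager form: tracemalloc reports the *peak* of its traced
# allocations between start() and get_traced_memory().
with TrackAddedMemoryAllocation(use_tracemalloc=True, result_min_mb=0.0) as tracer:
    data = np.zeros((2000, 2000))  # ~30 Mb of float64 allocations
print(f"tracemalloc peak: {tracer.addedmem_mb()} Mb")

# Decorator form, as would be applied to an ASV-style 'track_' benchmark;
# the wrapper returns the measured memory growth, not the function result.
@memtrace_benchmark(use_tracemalloc=True, result_min_mb=0.0)
def track_zeros_allocation():
    return np.zeros((2000, 2000))

print(f"decorated result: {track_zeros_allocation()} Mb")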
122 changes: 122 additions & 0 deletions benchmarks/benchmarks/memtrace_evaluation/__init__.py
@@ -0,0 +1,122 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Benchmarks to evaluate tracemalloc/rss methods of memory measurement."""

from .. import TrackAddedMemoryAllocation
from .memory_exercising_task import SampleParallelTask


class MemcheckCommon:
# Basic controls over the test calculation
default_params = {
"measure": "tracemalloc", # alternate: "rss"
"runtype": "threads", # alternate: "processes"
"ysize": 10000,
"nx": 2000,
"nblocks": 6,
"nworkers": 3,
}

def _setup(self, **kwargs):
params = self.default_params.copy()
params.update(kwargs)
measure = params["measure"]
runtype = params["runtype"]
ysize = params["ysize"]
nx = params["nx"]
nblocks = params["nblocks"]
nworkers = params["nworkers"]

nyfull = ysize // nblocks
Contributor:
It looks like you're applying this division twice; I think that's why adding blocks has such an exaggerated effect on performance.

Member (Author):
Oh, yes I see!
Hopefully that may explain the peculiar behaviour of the timings too.
I will fix this and re-investigate ...

Member (Author):
84ff4ad fixes, I think.

use_processes = {"threads": False, "processes": True}[runtype]
self.task = SampleParallelTask(
n_blocks=nblocks,
outerdim=nyfull // nblocks,
innerdim=nx,
n_workers=nworkers,
use_process_workers=use_processes,
)
self.use_tracemalloc = {"tracemalloc": True, "rss": False}[measure]

def run_time_calc(self):
# This usage is a bit awkward, as we don't really care about the runtype here.
self.task.perform()

def run_addedmem_calc(self):
with TrackAddedMemoryAllocation(
use_tracemalloc=self.use_tracemalloc,
result_min_mb=0.0,
) as tracer:
self.task.perform()
return tracer.addedmem_mb()


def memory_units_mib(func):
    # ASV reads the 'unit' attribute of a tracked function for display.
    func.unit = "MiB"
    return func


class MemcheckRunstyles(MemcheckCommon):
# only some are parametrised, or it's just too complicated!
param_names = [
"measure",
"runtype",
"ysize",
]
params = [
# measure
["tracemalloc", "rss"],
# runtype
["threads", "processes"],
# ysize
[10000, 40000],
]

def setup(self, measure, runtype, ysize):
self._setup(measure=measure, runtype=runtype, ysize=ysize)

def time_calc(self, measure, runtype, ysize):
self.run_time_calc()

@memory_units_mib
def track_addmem_calc(self, measure, runtype, ysize):
return self.run_addedmem_calc()


class MemcheckBlocksAndWorkers(MemcheckCommon):
# only some are parametrised, or it's just too complicated!
param_names = [
"nblocks",
"nworkers",
]
params = [
# nblocks
[1, 4, 9],
# nworkers
[1, 2, 3, 4],
]

    def setup(self, nblocks, nworkers):
        # Copy, to avoid mutating the class-level dict shared with other suites.
        self.default_params = {**self.default_params, "ysize": 20000}
        self._setup(
            nblocks=nblocks,
            nworkers=nworkers,
        )

def time_calc(self, nblocks, nworkers):
self.run_time_calc()

@memory_units_mib
def track_addmem_calc(self, nblocks, nworkers):
return self.run_addedmem_calc()


class MemcheckBlocksAndWorkers_Rss(MemcheckBlocksAndWorkers):
    def setup(self, nblocks, nworkers):
        # As above, copy rather than mutate the shared class-level dict.
        self.default_params = {**self.default_params, "measure": "rss"}
        super().setup(
            nblocks=nblocks,
            nworkers=nworkers,
        )
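
As a usage note (again not part of the diff): ASV drives these classes itself, but a quick manual sanity check might look like the following sketch, assuming the evaluation package is importable as shown:

from benchmarks.memtrace_evaluation import (
    MemcheckBlocksAndWorkers,
    MemcheckRunstyles,
)

# Mimic what ASV does: one setup() call per parameter combination,
# then the tracked method with the same arguments.
bench = MemcheckRunstyles()
bench.setup(measure="tracemalloc", runtype="threads", ysize=10000)
print(bench.track_addmem_calc("tracemalloc", "threads", 10000), "MiB")

bw = MemcheckBlocksAndWorkers()
bw.setup(nblocks=4, nworkers=2)
print(bw.track_addmem_calc(4, 2), "MiB")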
94 changes: 94 additions & 0 deletions benchmarks/benchmarks/memtrace_evaluation/memory_exercising_task.py
@@ -0,0 +1,94 @@
# Copyright Iris contributors
#
# This file is part of Iris and is released under the BSD license.
# See LICENSE in the root of the repository for full licensing details.
"""Provide a standard parallel calculation for testing the memory tracing."""

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np

"""
the basic operation is to for each worker to construct a (NY, NX) numpy
random array, of which it calculates and returns the mean(axis=0)
--> (NX,) result
The results are then collected --> (N_BLOCKS, NX),
and a mean over all calculated --> (NX,)
The final (single-value) result is the *minimum* of that.
"""

# _SHOW_DEBUG = True
_SHOW_DEBUG = False


def debug(msg):
if _SHOW_DEBUG:
print(msg)


def subtask_operation(arg):
i_task, ny, nx = arg
debug(f"\nRunning #{i_task}({ny}, {nx}) ..")
data = np.random.uniform(0.0, 1.0, size=(ny, nx)) # noqa: NPY002
sub_result = data.mean(axis=0)
debug(f"\n.. completed #{i_task}")
return sub_result


class SampleParallelTask:
def __init__(
self,
n_blocks=5,
outerdim=1000,
innerdim=250,
n_workers=4,
use_process_workers=False,
):
self.n_blocks = n_blocks
self.outerdim = outerdim
self.innerdim = innerdim
self.n_workers = n_workers
if use_process_workers:
self.pool_type = ProcessPoolExecutor
else:
self.pool_type = ThreadPoolExecutor
self._setup_calc()

def _setup_calc(self):
self._pool = self.pool_type(self.n_workers)

def perform(self):
partial_results = self._pool.map(
subtask_operation,
[
(i_task + 1, self.outerdim, self.innerdim)
for i_task in range(self.n_blocks)
],
)
combined = np.stack(list(partial_results))
result = np.mean(combined, axis=0)
result = result.min()
return result


if __name__ == "__main__":
nb = 12
nw = 3
ny, nx = 1000, 200
dims = (ny, nx)
use_processes = False
typ = "process" if use_processes else "thread"
msg = f"Starting: blocks={nb} workers={nw} size={dims} type={typ}"
print(msg)
calc = SampleParallelTask(
n_blocks=nb,
outerdim=ny,
innerdim=nx,
n_workers=nw,
use_process_workers=use_processes,
)
debug("Created.")
debug("Run..")
result = calc.perform()
debug("\n.. Run DONE.")
debug(f"result = {result}")