Skip to content

Commit

Permalink
Add a progress bar indicator for record processing
Browse files Browse the repository at this point in the history
As record processing is an operation that can take an arbitrary amount
of time depending on the size of the file, some users can be frustrated
because they don't know how much time the processing is going to take.

To help with this, add a progress bar indicator that periodically
updates while we are processing records in the `FileReader`. The class is
updated strategically to avoid impacting the performance of the actual
processing.

Signed-off-by: Pablo Galindo <pablogsal@gmail.com>
  • Loading branch information
pablogsal committed May 19, 2022
1 parent 8623bfb commit ecb37f5
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 28 deletions.
1 change: 1 addition & 0 deletions news/111.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a progress bar indicator to the record processing phases in the different reporters so users can have an approximate idea of how much time processing the result files will take.
4 changes: 3 additions & 1 deletion src/memray/_memray.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ def start_thread_trace(frame: FrameType, event: str, arg: Any) -> None: ...
class FileReader:
@property
def metadata(self) -> Metadata: ...
def __init__(self, file_name: Union[str, Path]) -> None: ...
def __init__(
self, file_name: Union[str, Path], *, report_progress: bool = False
) -> None: ...
def get_allocation_records(self) -> Iterable[AllocationRecord]: ...
def get_high_watermark_allocation_records(
self,
Expand Down
120 changes: 99 additions & 21 deletions src/memray/_memray.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ cimport cython
import threading
from datetime import datetime

from rich.progress import Progress

from _memray.logging cimport setLogThreshold
from _memray.record_reader cimport RecordReader
from _memray.record_reader cimport RecordResult
Expand Down Expand Up @@ -335,21 +337,80 @@ def start_thread_trace(frame, event, arg):
return start_thread_trace


cdef class ProgressIndicator:
cdef bool _report_progress
cdef object _indicator
cdef object _context_manager
cdef object _task
cdef object _task_description
cdef object _total
cdef size_t _update_interval

def __init__(self,
str task_description,
object total,
bool report_progress=True,
size_t refresh_per_second=60,
float update_ratio=0.05,
):
self._report_progress = report_progress
self._total = total
# Some arbitrary number of recorsd to keep the bar updating
self._update_interval = 200
self._task_description = task_description
self._task = None
self._context_manager = None
if total is not None and total > 1.0 / update_ratio:
self._update_interval = <size_t>(total * update_ratio)
if report_progress:
self._indicator = Progress(
refresh_per_second=refresh_per_second,
transient=True,
)

def __enter__(self):
if not self._report_progress:
return self
self._context_manager = self._indicator.__enter__()
self._task = self._context_manager.add_task(
f"[blue]{self._task_description}...",
total=self._total
)
return self

def __exit__(self ,type, value, traceback):
if not self._report_progress:
return
return self._context_manager.__exit__(type, value, traceback)

cdef void update(self, size_t n_processed) except*:
if not self._report_progress:
return
assert(self._context_manager is not None)
if n_processed % self._update_interval == 0:
self._context_manager.update(
self._task,
advance=self._update_interval
)


cdef class FileReader:
cdef cppstring _path

cdef object _file
cdef vector[_MemoryRecord] _memory_records
cdef HighWatermark _high_watermark
cdef object _header
cdef bool _report_progress

def __cinit__(self, object file_name):
def __cinit__(self, object file_name, *, bool report_progress=False):
try:
self._file = open(file_name)
except OSError as exc:
raise OSError(f"Could not open file {file_name}: {exc.strerror}") from None

self._path = "/proc/self/fd/" + str(self._file.fileno())
self._report_progress = report_progress

# Initial pass to populate _header, _high_watermark, and _memory_records.
cdef shared_ptr[RecordReader] reader_sp = make_shared[RecordReader](
Expand All @@ -366,18 +427,27 @@ cdef class FileReader:
n_memory_records_approx = (stats["end_time"] - stats["start_time"]) / 10
self._memory_records.reserve(n_memory_records_approx)

cdef object total = stats['n_allocations'] or None
cdef HighWatermarkFinder finder
while True:
PyErr_CheckSignals()
ret = reader.nextRecord()
if ret == RecordResult.RecordResultAllocationRecord:
finder.processAllocation(reader.getLatestAllocation())
elif ret == RecordResult.RecordResultMemoryRecord:
self._memory_records.push_back(reader.getLatestMemoryRecord())
else:
break
self._high_watermark = finder.getHighWatermark()

cdef ProgressIndicator progress_indicator = ProgressIndicator(
"Calculating high watermark",
total=total,
report_progress=self._report_progress
)
with progress_indicator:
while True:
PyErr_CheckSignals()
ret = reader.nextRecord()
if ret == RecordResult.RecordResultAllocationRecord:
finder.processAllocation(reader.getLatestAllocation())
elif ret == RecordResult.RecordResultMemoryRecord:
self._memory_records.push_back(reader.getLatestMemoryRecord())
else:
break
progress_indicator.update(reader.getProcessedAllocations())
self._high_watermark = finder.getHighWatermark()

def __dealloc__(self):
self.close()

Expand Down Expand Up @@ -408,16 +478,24 @@ cdef class FileReader:
)
cdef RecordReader* reader = reader_sp.get()

while records_to_process > 0:
PyErr_CheckSignals()
ret = reader.nextRecord()
if ret == RecordResult.RecordResultAllocationRecord:
aggregator.addAllocation(reader.getLatestAllocation())
records_to_process -= 1
elif ret == RecordResult.RecordResultMemoryRecord:
pass
else:
break
cdef ProgressIndicator progress_indicator = ProgressIndicator(
"Processing allocation records",
total=records_to_process,
report_progress=self._report_progress
)

with progress_indicator:
while records_to_process > 0:
PyErr_CheckSignals()
ret = reader.nextRecord()
if ret == RecordResult.RecordResultAllocationRecord:
aggregator.addAllocation(reader.getLatestAllocation())
records_to_process -= 1
progress_indicator.update(records_to_process)
elif ret == RecordResult.RecordResultMemoryRecord:
pass
else:
break

for elem in Py_ListFromSnapshotAllocationRecords(
aggregator.getSnapshotAllocations(merge_threads)
Expand Down
8 changes: 8 additions & 0 deletions src/memray/_memray/record_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ RecordReader::nextRecord()
if (d_input->is_open()) LOG(ERROR) << "Failed to process allocation record";
return RecordResult::ERROR;
}
d_processed_allocations += 1;
return RecordResult::ALLOCATION_RECORD;
} break;
case RecordType::ALLOCATION_WITH_NATIVE: {
Expand All @@ -452,6 +453,7 @@ RecordReader::nextRecord()
}
return RecordResult::ERROR;
}
d_processed_allocations += 1;
return RecordResult::ALLOCATION_RECORD;
} break;
case RecordType::MEMORY_RECORD: {
Expand Down Expand Up @@ -635,6 +637,12 @@ RecordReader::getLatestMemoryRecord() const noexcept
return d_latest_memory_record;
}

size_t
RecordReader::getProcessedAllocations() const noexcept
{
return d_processed_allocations;
}

PyObject*
RecordReader::dumpAllRecords()
{
Expand Down
3 changes: 3 additions & 0 deletions src/memray/_memray/record_reader.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <assert.h>
#include <atomic>
#include <fstream>
#include <functional>
#include <limits>
Expand Down Expand Up @@ -50,6 +51,7 @@ class RecordReader
std::string getThreadName(thread_id_t tid);
Allocation getLatestAllocation() const noexcept;
MemoryRecord getLatestMemoryRecord() const noexcept;
size_t getProcessedAllocations() const noexcept;

private:
// Aliases
Expand Down Expand Up @@ -83,6 +85,7 @@ class RecordReader
std::unordered_map<thread_id_t, std::string> d_thread_names;
Allocation d_latest_allocation;
MemoryRecord d_latest_memory_record;
std::atomic<size_t> d_processed_allocations{0};

// Methods
[[nodiscard]] bool parseFramePush(FramePush* record);
Expand Down
1 change: 1 addition & 0 deletions src/memray/_memray/record_reader.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ cdef extern from "record_reader.h" namespace "memray::api":
string getThreadName(long int tid) except+
Allocation getLatestAllocation()
MemoryRecord getLatestMemoryRecord()
size_t getProcessedAllocations()
2 changes: 1 addition & 1 deletion src/memray/commands/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def write_report(
merge_threads: Optional[bool] = None,
) -> None:
try:
reader = FileReader(os.fspath(result_path))
reader = FileReader(os.fspath(result_path), report_progress=True)
if show_memory_leaks:
snapshot = reader.get_leaked_allocation_records(
merge_threads=merge_threads if merge_threads is not None else True
Expand Down
2 changes: 1 addition & 1 deletion src/memray/commands/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None
result_path = Path(args.results)
if not result_path.exists() or not result_path.is_file():
raise MemrayCommandError(f"No such file: {args.results}", exit_code=1)
reader = FileReader(os.fspath(args.results))
reader = FileReader(os.fspath(args.results), report_progress=True)
try:
if args.include_all_allocations:
snapshot = iter(reader.get_allocation_records())
Expand Down
2 changes: 1 addition & 1 deletion src/memray/commands/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None
result_path = Path(args.results)
if not result_path.exists() or not result_path.is_file():
raise MemrayCommandError(f"No such file: {args.results}", exit_code=1)
reader = FileReader(os.fspath(args.results))
reader = FileReader(os.fspath(args.results), report_progress=True)
try:
snapshot = iter(
reader.get_high_watermark_allocation_records(merge_threads=True)
Expand Down
2 changes: 1 addition & 1 deletion src/memray/commands/tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run(self, args: argparse.Namespace, parser: argparse.ArgumentParser) -> None
result_path = Path(args.results)
if not result_path.exists() or not result_path.is_file():
raise MemrayCommandError(f"No such file: {args.results}", exit_code=1)
reader = FileReader(os.fspath(args.results))
reader = FileReader(os.fspath(args.results), report_progress=True)
try:
snapshot = iter(
reader.get_high_watermark_allocation_records(merge_threads=False)
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_highwatermark_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_tracker_and_reporter_interactions_for_peak(self, tmp_path, merge_thread

# THEN
calls = [
call(os.fspath(result_path)),
call(os.fspath(result_path), report_progress=True),
call().get_high_watermark_allocation_records(merge_threads=merge_threads),
call().get_memory_records(),
]
Expand Down Expand Up @@ -190,7 +190,7 @@ def test_tracker_and_reporter_interactions_for_leak(self, tmp_path, merge_thread

# THEN
calls = [
call(os.fspath(result_path)),
call(os.fspath(result_path), report_progress=True),
call().get_leaked_allocation_records(merge_threads=merge_threads),
call().get_memory_records(),
]
Expand Down

0 comments on commit ecb37f5

Please sign in to comment.