Skip to content

Commit

Permalink
jemalloc_stats_print can output std::string.
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 12, 2022
1 parent d1b68ed commit eb06ca9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 54 deletions.
8 changes: 7 additions & 1 deletion cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,15 @@ Result<uint64_t> jemalloc_get_stat(const char* name) {

Status jemalloc_peak_reset() { return Status::Invalid("jemalloc support is not built"); }

Status jemalloc_stats_print(const char* opts) {
Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* cbopaque,
const char* opts) {
return Status::Invalid("jemalloc support is not built");
}

Result<std::string> jemalloc_stats_print(const char* opts) {
return Status::Invalid("jemalloc support is not built");
}

#endif

///////////////////////////////////////////////////////////////////////
Expand Down
14 changes: 11 additions & 3 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,26 @@ Status jemalloc_set_decay_ms(int ms);
/// \brief Get basic statistics from jemalloc's mallctl.
/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
/// available stats.
ARROW_EXPORT
Result<uint64_t> jemalloc_get_stat(const char* name);

/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
/// This affects subsequent calls to thread.peak.read, but not the values returned by
/// thread.allocated or thread.deallocated.
ARROW_EXPORT
Status jemalloc_peak_reset();

/// \brief Print summary statistics in human-readable form.
///
/// \brief Print summary statistics in human-readable form to stderr.
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
ARROW_EXPORT
Status jemalloc_stats_print(void (*write_cb)(void* opaque, const char* buf),
void* cbopaque, const char* opts = "");

/// \brief Get summary statistics in human-readable form.
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
ARROW_EXPORT Status jemalloc_stats_print(const char* opts = "");
Result<std::string> jemalloc_stats_print(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
///
Expand Down
58 changes: 42 additions & 16 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,18 @@ Result<uint64_t> jemalloc_get_stat(const char* name) {
int err;
uint64_t value;

if (std::strcmp(name, "thread.allocatedp") == 0 ||
std::strcmp(name, "thread.deallocatedp") == 0) {
uint64_t* tmp_value;
err = mallctl(name, &tmp_value, &sz, NULLPTR, 0);
value = *tmp_value;
} else if (std::strcmp(name, "stats.allocated") == 0 ||
std::strcmp(name, "stats.active") == 0 ||
std::strcmp(name, "stats.metadata") == 0 ||
std::strcmp(name, "stats.resident") == 0 ||
std::strcmp(name, "stats.mapped") == 0 ||
std::strcmp(name, "stats.retained") == 0) {
if (std::strcmp(name, "stats.allocated") == 0 ||
std::strcmp(name, "stats.active") == 0 ||
std::strcmp(name, "stats.metadata") == 0 ||
std::strcmp(name, "stats.resident") == 0 ||
std::strcmp(name, "stats.mapped") == 0 ||
std::strcmp(name, "stats.retained") == 0) {
uint64_t epoch;
mallctl("epoch", &epoch, &sz, &epoch, sz);
err = mallctl(name, &value, &sz, NULLPTR, 0);
} else {
err = mallctl(name, &value, &sz, NULLPTR, 0);
}

err = mallctl(name, &value, &sz, NULLPTR, 0);

if (err) {
return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name);
}
Expand All @@ -191,8 +185,40 @@ Status jemalloc_peak_reset() {
: Status::OK();
}

Status jemalloc_stats_print(const char* opts) {
malloc_stats_print(NULLPTR, NULLPTR, opts);
typedef struct {
char* buf;
size_t len;
} parser_t;

void write_cb(void* opaque, const char* str) {
parser_t* parser = reinterpret_cast<parser_t*>(opaque);
size_t len = strlen(str);
char* buf = (parser->buf == NULL)
? reinterpret_cast<char*>(mallocx(len + 1, MALLOCX_TCACHE_NONE))
: reinterpret_cast<char*>(
rallocx(parser->buf, parser->len + len + 1, MALLOCX_TCACHE_NONE));
if (buf == NULL) {
ARROW_LOG(ERROR) << "Unexpected input appending failure";
}
memcpy(&buf[parser->len], str, len + 1);
parser->buf = buf;
parser->len += len;
}

Result<std::string> jemalloc_stats_print(const char* opts) {
parser_t parser = parser_t{NULL, 0};
malloc_stats_print(write_cb, static_cast<void*>(&parser), opts);
std::string stats = parser.buf;
if (parser.buf != NULL) {
dallocx(parser.buf, MALLOCX_TCACHE_NONE);
}
return stats;
}

Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* cbopaque,
const char* opts) {
malloc_stats_print(reinterpret_cast<void (*)(void*, const char*)>(write_cb), cbopaque,
opts);
return Status::OK();
}
#endif
Expand Down
62 changes: 28 additions & 34 deletions cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ TEST(Jemalloc, GetAllocationStats) {
uint8_t* data;
uint64_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
metadata0, resident0, mapped0, retained0;
uint64_t thread_allocatedp, thread_deallocatedp, thread_allocatedp0,
thread_deallocatedp0, thread_allocated, thread_deallocated, thread_peak_read,
thread_allocated0, thread_deallocated0, thread_peak_read0;
uint64_t thread_allocated, thread_deallocated, thread_peak_read, thread_allocated0,
thread_deallocated0, thread_peak_read0;
auto pool = default_memory_pool();
ABORT_NOT_OK(jemalloc_memory_pool(&pool));
ASSERT_EQ("jemalloc", pool->backend_name());
Expand All @@ -195,15 +194,13 @@ TEST(Jemalloc, GetAllocationStats) {
ASSERT_OK_AND_ASSIGN(thread_allocated0, jemalloc_get_stat("thread.allocated"));
ASSERT_OK_AND_ASSIGN(thread_deallocated0, jemalloc_get_stat("thread.deallocated"));
ASSERT_OK_AND_ASSIGN(thread_peak_read0, jemalloc_get_stat("thread.peak.read"));
ASSERT_OK_AND_ASSIGN(thread_allocatedp0, jemalloc_get_stat("thread.allocatedp"));
ASSERT_OK_AND_ASSIGN(thread_deallocatedp0, jemalloc_get_stat("thread.deallocatedp"));

// Allocate memory
ASSERT_OK(jemalloc_set_decay_ms(10000));
ASSERT_OK(pool->Allocate(1256, &data));
ASSERT_EQ(1256, pool->bytes_allocated());
ASSERT_EQ(pool->bytes_allocated(), 1256);
ASSERT_OK(pool->Reallocate(1256, 1214, &data));
ASSERT_EQ(1214, pool->bytes_allocated());
ASSERT_EQ(pool->bytes_allocated(), 1214);

// Record stats after allocating
ASSERT_OK_AND_ASSIGN(allocated, jemalloc_get_stat("stats.allocated"));
Expand All @@ -215,46 +212,43 @@ TEST(Jemalloc, GetAllocationStats) {
ASSERT_OK_AND_ASSIGN(thread_allocated, jemalloc_get_stat("thread.allocated"));
ASSERT_OK_AND_ASSIGN(thread_deallocated, jemalloc_get_stat("thread.deallocated"));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_OK_AND_ASSIGN(thread_allocatedp, jemalloc_get_stat("thread.allocatedp"));
ASSERT_OK_AND_ASSIGN(thread_deallocatedp, jemalloc_get_stat("thread.deallocatedp"));

// Reading stats via value return is equivalent to pointer passing
ASSERT_EQ(thread_allocated, thread_allocatedp);
ASSERT_EQ(thread_deallocated, thread_deallocatedp);
ASSERT_EQ(thread_allocated0, thread_allocatedp0);
ASSERT_EQ(thread_deallocated0, thread_deallocatedp0);

// Check allocated stats pre-allocation
ASSERT_EQ(71424, allocated0);
ASSERT_EQ(131072, active0);
ASSERT_EQ(2814368, metadata0);
ASSERT_EQ(2899968, resident0);
ASSERT_EQ(6422528, mapped0);
ASSERT_EQ(0, retained0);
ASSERT_NEAR(allocated0, 122624, 100000);
ASSERT_NEAR(active0, 131072, 10000);
ASSERT_NEAR(metadata0, 3000000, 1000000);
ASSERT_NEAR(resident0, 3000000, 1000000);
ASSERT_NEAR(mapped0, 6500000, 1000000);
ASSERT_NEAR(retained0, 0, 100);

// Check allocated stats change due to allocation
ASSERT_EQ(81920, allocated - allocated0);
ASSERT_EQ(81920, active - active0);
ASSERT_EQ(384, metadata - metadata0);
ASSERT_EQ(98304, resident - resident0);
ASSERT_EQ(81920, mapped - mapped0);
ASSERT_EQ(0, retained - retained0);
ASSERT_NEAR(allocated - allocated0, 81920, 1000);
ASSERT_NEAR(active - active0, 81920, 1000);
ASSERT_NEAR(metadata - metadata0, 384, 100);
ASSERT_NEAR(resident - resident0, 98304, 1000);
ASSERT_NEAR(mapped - mapped0, 81920, 1000);
ASSERT_NEAR(retained - retained0, 0, 0);

ASSERT_EQ(1280, thread_peak_read - thread_peak_read0);
ASSERT_EQ(2560, thread_allocated - thread_allocated0);
ASSERT_EQ(1280, thread_deallocated - thread_deallocated0);
ASSERT_EQ(thread_peak_read - thread_peak_read0, 1280);
ASSERT_EQ(thread_allocated - thread_allocated0, 2560);
ASSERT_EQ(thread_deallocated - thread_deallocated0, 1280);

// Resetting thread peak read metric
ASSERT_OK(pool->Allocate(12560, &data));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_EQ(15616, thread_peak_read);
ASSERT_NEAR(thread_peak_read, 15616, 1000);
ASSERT_OK(jemalloc_peak_reset());
ASSERT_OK(pool->Allocate(1256, &data));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_EQ(1280, thread_peak_read);
ASSERT_NEAR(thread_peak_read, 1280, 100);

// Print statistics to stderr
ASSERT_OK(jemalloc_stats_print(NULLPTR, NULLPTR, "J"));

// Read statistics into std::string
ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_print("Jax"));
ASSERT_TRUE(stats.rfind("{\"jemalloc\":{\"version\"", 0) == 0);

// Print statistics to stdout
ASSERT_OK(jemalloc_stats_print("ax"));
#else
ASSERT_RAISES(Invalid, jemalloc_get_stat("thread.peak.read"));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated"));
Expand Down

0 comments on commit eb06ca9

Please sign in to comment.