Skip to content

Commit

Permalink
Refactoring and review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 9, 2022
1 parent 29232eb commit d1b68ed
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 76 deletions.
10 changes: 1 addition & 9 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,15 +666,7 @@ Status jemalloc_set_decay_ms(int ms) {
return Status::Invalid("jemalloc support is not built");
}

Status jemalloc_get_stat(const char* name, size_t& out) {
return Status::Invalid("jemalloc support is not built");
}

Status jemalloc_get_stat(const char* name, uint64_t& out) {
return Status::Invalid("jemalloc support is not built");
}

Status jemalloc_get_statp(const char* name, uint64_t& out) {
Result<uint64_t> jemalloc_get_stat(const char* name) {
return Status::Invalid("jemalloc support is not built");
}

Expand Down
13 changes: 4 additions & 9 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,10 @@ ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);
ARROW_EXPORT
Status jemalloc_set_decay_ms(int ms);

/// \brief Get basic allocation statistics from jemalloc
/// \brief Get basic statistics from jemalloc's mallctl.
/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
/// available stats.
Status jemalloc_get_stat(const char* name, size_t& out);
Status jemalloc_get_stat(const char* name, uint64_t& out);

/// \brief Get basic allocation statistics from jemalloc by passing a pointer
/// This is useful for avoiding the overhead of repeated copy calls.
Status jemalloc_get_statp(const char* name, uint64_t& out);
Result<uint64_t> jemalloc_get_stat(const char* name);

/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
/// This affects subsequent calls to thread.peak.read, but not the values returned by
Expand All @@ -193,8 +188,8 @@ Status jemalloc_peak_reset();

/// \brief Print summary statistics in human-readable form.
///
/// See malloc_stats_print
/// documentation in jemalloc project documentation for available opt flags.
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
ARROW_EXPORT Status jemalloc_stats_print(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
Expand Down
50 changes: 23 additions & 27 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,38 +155,34 @@ Status jemalloc_set_decay_ms(int ms) {
#undef RETURN_IF_JEMALLOC_ERROR

#ifdef ARROW_JEMALLOC
Status jemalloc_get_stat(const char* name, size_t& out) {
uint64_t epoch;
Result<uint64_t> jemalloc_get_stat(const char* name) {
size_t sz = sizeof(uint64_t);
mallctl("epoch", &epoch, &sz, &epoch, sz);

size_t value;
sz = sizeof(size_t);
int err = mallctl(name, &value, &sz, NULLPTR, 0);
out = std::move(value);

return err ? arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name)
: Status::OK();
}

Status jemalloc_get_stat(const char* name, uint64_t& out) {
int err;
uint64_t value;
size_t sz = sizeof(uint64_t);
int err = mallctl(name, &value, &sz, NULLPTR, 0);
out = value;

return err ? arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name)
: Status::OK();
}
if (std::strcmp(name, "thread.allocatedp") == 0 ||
std::strcmp(name, "thread.deallocatedp") == 0) {
uint64_t* tmp_value;
err = mallctl(name, &tmp_value, &sz, NULLPTR, 0);
value = *tmp_value;
} else if (std::strcmp(name, "stats.allocated") == 0 ||
std::strcmp(name, "stats.active") == 0 ||
std::strcmp(name, "stats.metadata") == 0 ||
std::strcmp(name, "stats.resident") == 0 ||
std::strcmp(name, "stats.mapped") == 0 ||
std::strcmp(name, "stats.retained") == 0) {
uint64_t epoch;
mallctl("epoch", &epoch, &sz, &epoch, sz);
err = mallctl(name, &value, &sz, NULLPTR, 0);
} else {
err = mallctl(name, &value, &sz, NULLPTR, 0);
}

Status jemalloc_get_statp(const char* name, uint64_t& out) {
uint64_t* value;
size_t sz = sizeof(uint64_t);
int err = mallctl(name, &value, &sz, NULLPTR, 0);
out = *value;
if (err) {
return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name);
}

return err ? arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name)
: Status::OK();
return value;
}

Status jemalloc_peak_reset() {
Expand Down
60 changes: 29 additions & 31 deletions cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
TEST(Jemalloc, GetAllocationStats) {
#ifdef ARROW_JEMALLOC
uint8_t* data;
size_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
uint64_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
metadata0, resident0, mapped0, retained0;
uint64_t thread_allocatedp, thread_deallocatedp, thread_allocatedp0,
thread_deallocatedp0, thread_allocated, thread_deallocated, thread_peak_read,
Expand All @@ -186,17 +186,17 @@ TEST(Jemalloc, GetAllocationStats) {
ASSERT_EQ("jemalloc", pool->backend_name());

// Record stats before allocating
ASSERT_OK(jemalloc_get_stat("stats.allocated", allocated0));
ASSERT_OK(jemalloc_get_stat("stats.active", active0));
ASSERT_OK(jemalloc_get_stat("stats.metadata", metadata0));
ASSERT_OK(jemalloc_get_stat("stats.resident", resident0));
ASSERT_OK(jemalloc_get_stat("stats.mapped", mapped0));
ASSERT_OK(jemalloc_get_stat("stats.retained", retained0));
ASSERT_OK(jemalloc_get_stat("thread.allocated", thread_allocated0));
ASSERT_OK(jemalloc_get_stat("thread.deallocated", thread_deallocated0));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", thread_peak_read0));
ASSERT_OK(jemalloc_get_statp("thread.allocatedp", thread_allocatedp0));
ASSERT_OK(jemalloc_get_statp("thread.deallocatedp", thread_deallocatedp0));
ASSERT_OK_AND_ASSIGN(allocated0, jemalloc_get_stat("stats.allocated"));
ASSERT_OK_AND_ASSIGN(active0, jemalloc_get_stat("stats.active"));
ASSERT_OK_AND_ASSIGN(metadata0, jemalloc_get_stat("stats.metadata"));
ASSERT_OK_AND_ASSIGN(resident0, jemalloc_get_stat("stats.resident"));
ASSERT_OK_AND_ASSIGN(mapped0, jemalloc_get_stat("stats.mapped"));
ASSERT_OK_AND_ASSIGN(retained0, jemalloc_get_stat("stats.retained"));
ASSERT_OK_AND_ASSIGN(thread_allocated0, jemalloc_get_stat("thread.allocated"));
ASSERT_OK_AND_ASSIGN(thread_deallocated0, jemalloc_get_stat("thread.deallocated"));
ASSERT_OK_AND_ASSIGN(thread_peak_read0, jemalloc_get_stat("thread.peak.read"));
ASSERT_OK_AND_ASSIGN(thread_allocatedp0, jemalloc_get_stat("thread.allocatedp"));
ASSERT_OK_AND_ASSIGN(thread_deallocatedp0, jemalloc_get_stat("thread.deallocatedp"));

// Allocate memory
ASSERT_OK(jemalloc_set_decay_ms(10000));
Expand All @@ -206,17 +206,17 @@ TEST(Jemalloc, GetAllocationStats) {
ASSERT_EQ(1214, pool->bytes_allocated());

// Record stats after allocating
ASSERT_OK(jemalloc_get_stat("stats.allocated", allocated));
ASSERT_OK(jemalloc_get_stat("stats.active", active));
ASSERT_OK(jemalloc_get_stat("stats.metadata", metadata));
ASSERT_OK(jemalloc_get_stat("stats.resident", resident));
ASSERT_OK(jemalloc_get_stat("stats.mapped", mapped));
ASSERT_OK(jemalloc_get_stat("stats.retained", retained));
ASSERT_OK(jemalloc_get_stat("thread.allocated", thread_allocated));
ASSERT_OK(jemalloc_get_stat("thread.deallocated", thread_deallocated));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", thread_peak_read));
ASSERT_OK(jemalloc_get_statp("thread.allocatedp", thread_allocatedp));
ASSERT_OK(jemalloc_get_statp("thread.deallocatedp", thread_deallocatedp));
ASSERT_OK_AND_ASSIGN(allocated, jemalloc_get_stat("stats.allocated"));
ASSERT_OK_AND_ASSIGN(active, jemalloc_get_stat("stats.active"));
ASSERT_OK_AND_ASSIGN(metadata, jemalloc_get_stat("stats.metadata"));
ASSERT_OK_AND_ASSIGN(resident, jemalloc_get_stat("stats.resident"));
ASSERT_OK_AND_ASSIGN(mapped, jemalloc_get_stat("stats.mapped"));
ASSERT_OK_AND_ASSIGN(retained, jemalloc_get_stat("stats.retained"));
ASSERT_OK_AND_ASSIGN(thread_allocated, jemalloc_get_stat("thread.allocated"));
ASSERT_OK_AND_ASSIGN(thread_deallocated, jemalloc_get_stat("thread.deallocated"));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_OK_AND_ASSIGN(thread_allocatedp, jemalloc_get_stat("thread.allocatedp"));
ASSERT_OK_AND_ASSIGN(thread_deallocatedp, jemalloc_get_stat("thread.deallocatedp"));

// Reading stats via value return is equivalent to pointer passing
ASSERT_EQ(thread_allocated, thread_allocatedp);
Expand Down Expand Up @@ -246,22 +246,20 @@ TEST(Jemalloc, GetAllocationStats) {

// Resetting thread peak read metric
ASSERT_OK(pool->Allocate(12560, &data));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", thread_peak_read));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_EQ(15616, thread_peak_read);
ASSERT_OK(jemalloc_peak_reset());
ASSERT_OK(pool->Allocate(1256, &data));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", thread_peak_read));
ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
ASSERT_EQ(1280, thread_peak_read);

// Print statistics to stdout
ASSERT_OK(jemalloc_stats_print("ax"));
#else
size_t allocated;
uint64_t thread_peak_read, stats_allocated, stats_allocatedp;
ASSERT_RAISES(Invalid, jemalloc_get_stat("thread.peak.read", &thread_peak_read));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated", &allocated));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated", &stats_allocated));
ASSERT_RAISES(Invalid, jemalloc_get_statp("stats.allocatedp", &stats_allocatedp));
ASSERT_RAISES(Invalid, jemalloc_get_stat("thread.peak.read"));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated"));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated"));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocatedp"));
ASSERT_RAISES(Invalid, jemalloc_peak_reset());
ASSERT_RAISES(Invalid, jemalloc_stats_print("ax"));
#endif
Expand Down

0 comments on commit d1b68ed

Please sign in to comment.