Skip to content

Commit

Permalink
Refactoring and review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Sep 8, 2022
1 parent 3ccabc3 commit 9adac01
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 71 deletions.
16 changes: 10 additions & 6 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,18 @@ Status jemalloc_get_stat(const char* name, size_t* out) {
return Status::Invalid("jemalloc support is not built");
};

Status jemalloc_mallctl(const char* name, void* oldp, size_t* oldlenp, void* newp,
size_t newlen) {
return Status::NotImplemented("This Arrow build does not enable jemalloc");
// Fallback definition of jemalloc_get_stat (64-bit variant): performs no lookup,
// ignores both arguments and always returns Status::Invalid.
// NOTE(review): presumably compiled only when jemalloc is disabled (the enclosing
// preprocessor condition is not visible here) - confirm.
Status jemalloc_get_stat(const char* name, uint64_t* out) {
return Status::Invalid("jemalloc support is not built");
}

Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* cbopaque,
const char* opts) {
return Status::NotImplemented("This Arrow build does not enable jemalloc");
// Fallback definition of jemalloc_get_statp: performs no lookup, ignores both
// arguments and always returns Status::Invalid.
// NOTE(review): presumably compiled only when jemalloc is disabled (the enclosing
// preprocessor condition is not visible here) - confirm.
Status jemalloc_get_statp(const char* name, uint64_t* out) {
return Status::Invalid("jemalloc support is not built");
}

// Fallback definition of jemalloc_peak_reset: resets nothing and always returns
// Status::Invalid.
Status jemalloc_peak_reset() { return Status::Invalid("jemalloc support is not built"); }

// Fallback definition of jemalloc_stats_print: prints nothing, ignores `opts` and
// always returns Status::Invalid.
Status jemalloc_stats_print(const char* opts) {
return Status::Invalid("jemalloc support is not built");
}
#endif

Expand Down
54 changes: 16 additions & 38 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"
Expand Down Expand Up @@ -176,46 +177,23 @@ ARROW_EXPORT
Status jemalloc_set_decay_ms(int ms);

/// \brief Get basic allocation statistics from jemalloc
/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
/// available stats.
Status jemalloc_get_stat(const char* name, size_t* out);
Status jemalloc_get_stat(const char* name, uint64_t* out);

/// \brief The mallctl() function provides a general interface for introspecting the
/// memory allocator, as well as setting modifiable parameters and triggering actions.
/// The period-separated name argument specifies a location in a tree-structured
/// namespace; see the MALLCTL NAMESPACE section in jemalloc project documentation for
/// more information on the tree contents. To read a value, pass a pointer via oldp to
/// adequate space to contain the value, and a pointer to its length via oldlenp;
/// otherwise pass NULL and NULL. Similarly, to write a value, pass a pointer to the
/// value via newp, and its length via newlen; otherwise pass NULL and 0.
///
ARROW_EXPORT Status jemalloc_mallctl(const char* name, void* oldp, size_t* oldlenp,
void* newp, size_t newlen);

/// \brief The malloc_stats_print() function writes summary statistics via the write_cb
/// callback function pointer and cbopaque data passed to write_cb, or malloc_message()
/// if write_cb is NULL. The statistics are presented in human-readable form unless “J”
/// is specified as a character within the opts string, in which case the statistics are
/// presented in JSON format. This function can be called repeatedly. General information
/// that never changes during execution can be omitted by specifying “g” as a character
/// within the opts string. Note that malloc_stats_print() uses the mallctl*() functions
/// internally, so inconsistent statistics can be reported if multiple threads use these
/// functions simultaneously. If --enable-stats is specified during configuration, “m”,
/// “d”, and “a” can be specified to omit merged arena, destroyed merged arena, and per
/// arena statistics, respectively; “b” and “l” can be specified to omit per size class
/// statistics for bins and large objects, respectively; “x” can be specified to omit all
/// mutex statistics; “e” can be used to omit extent statistics. Unrecognized characters
/// are silently ignored. Note that thread caching may prevent some statistics from being
/// completely up to date, since extra locking would be required to merge counters that
/// track thread cache operations.
///
/// The malloc_usable_size() function returns the usable size of the allocation pointed to
/// by ptr. The return value may be larger than the size that was requested during
/// allocation. The malloc_usable_size() function is not a mechanism for in-place
/// realloc(); rather it is provided solely as a tool for introspection purposes.
/// Any discrepancy between the requested allocation size and the size reported by
/// malloc_usable_size() should not be depended on, since such behavior is entirely
/// implementation-dependent.
ARROW_EXPORT Status jemalloc_stats_print(void (*write_cb)(void*, const char*),
void* cbopaque, const char* opts);
/// \brief Get an allocation statistic from jemalloc through a pointer-valued entry
///
/// The named mallctl entry must yield a pointer to a 64-bit counter (for example
/// "thread.allocatedp"); the counter it points to is copied into `out`.
/// jemalloc offers such pointers to avoid the overhead of repeated mallctl*() calls.
/// NOTE(review): this wrapper looks the pointer up again on every call, so it does
/// not provide that saving by itself.
Status jemalloc_get_statp(const char* name, uint64_t* out);

/// \brief Reset jemalloc's peak-allocation counter for the calling thread to zero.
///
/// This affects subsequent reads of "thread.peak.read", but not the values returned
/// by "thread.allocated" or "thread.deallocated".
/// \return Status::OK() on success; an error Status if the underlying
/// "thread.peak.reset" mallctl call fails or jemalloc support is not built.
Status jemalloc_peak_reset();

/// \brief Print jemalloc summary statistics.
///
/// The statistics are human-readable unless "J" appears in `opts`, in which case
/// they are emitted as JSON. See the malloc_stats_print() section of the jemalloc
/// documentation for the full list of option characters; unrecognized characters
/// are silently ignored.
/// \param[in] opts option characters forwarded to malloc_stats_print()
ARROW_EXPORT Status jemalloc_stats_print(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
///
Expand Down
45 changes: 35 additions & 10 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,49 @@ Status jemalloc_set_decay_ms(int ms) {

#undef RETURN_IF_JEMALLOC_ERROR

#ifdef ARROW_JEMALLOC
Status jemalloc_get_stat(const char* name, size_t* out) {
size_t sz = sizeof(size_t);
int err = mallctl(name, out, &sz, NULL, 0);
uint64_t epoch;
size_t sz = sizeof(uint64_t);
mallctl("epoch", &epoch, &sz, &epoch, sz);

size_t value;
sz = sizeof(size_t);
int err = mallctl(name, &value, &sz, NULLPTR, 0);
*out = std::move(value);

return err ? arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", &name)
: Status::OK();
};

#ifdef ARROW_JEMALLOC
Status jemalloc_mallctl(const char* name, void* oldp, size_t* oldlenp, void* newp,
size_t newlen) {
int err = mallctl(name, oldp, oldlenp, newp, newlen);
return err ? arrow::internal::IOErrorFromErrno(err, "Memory allocation error.")
/// \brief Read a 64-bit jemalloc statistic through mallctl().
///
/// \param[in] name period-separated mallctl name (e.g. "thread.allocated")
/// \param[out] out receives the statistic; left untouched if the lookup fails
/// \return Status::OK() on success, otherwise the Status built by
/// IOErrorFromErrno from the error code mallctl() returned
Status jemalloc_get_stat(const char* name, uint64_t* out) {
  uint64_t value = 0;
  size_t sz = sizeof(value);
  const int err = mallctl(name, &value, &sz, NULLPTR, 0);
  if (err) {
    // Pass the name itself rather than its address, so the message shows which
    // statistic could not be read instead of a pointer value.
    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", name);
  }
  // Only publish the value once mallctl() has reported success.
  *out = value;
  return Status::OK();
}

/// \brief Read a 64-bit jemalloc statistic exposed as a pointer-valued mallctl entry.
///
/// \param[in] name period-separated mallctl name whose value is a `uint64_t*`
/// (e.g. "thread.allocatedp")
/// \param[out] out receives the counter the returned pointer refers to; left
/// untouched if the lookup fails
/// \return Status::OK() on success, otherwise the Status built by
/// IOErrorFromErrno from the error code mallctl() returned
Status jemalloc_get_statp(const char* name, uint64_t* out) {
  uint64_t* counter = NULLPTR;
  // The entry yields a pointer, so the buffer length handed to mallctl() must be
  // the size of the pointer, not of the counter it points to.
  size_t sz = sizeof(counter);
  const int err = mallctl(name, &counter, &sz, NULLPTR, 0);
  if (err) {
    // Return before touching `counter`: on failure it was never filled in.
    // Pass the name itself rather than its address so the message is readable.
    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", name);
  }
  *out = *counter;
  return Status::OK();
}

/// \brief Invoke jemalloc's "thread.peak.reset" mallctl entry.
///
/// \return Status::OK() when mallctl() reports success, otherwise the Status
/// built by IOErrorFromErrno from the returned error code
Status jemalloc_peak_reset() {
  // The entry takes no input and produces no output, hence the null buffers.
  const int rc = mallctl("thread.peak.reset", NULLPTR, NULLPTR, NULLPTR, 0);
  if (rc) {
    return arrow::internal::IOErrorFromErrno(rc, "Failed resetting thread.peak.");
  }
  return Status::OK();
}

Status jemalloc_stats_print(void (*write_cb)(void*, const char*), void* cbopaque,
const char* opts) {
malloc_stats_print(write_cb, cbopaque, opts);
/// \brief Forward `opts` to jemalloc's malloc_stats_print().
///
/// No write callback or opaque pointer is supplied, so jemalloc chooses where the
/// text goes.
/// \param[in] opts option characters passed through unchanged
/// \return always Status::OK()
Status jemalloc_stats_print(const char* opts) {
  void (*const no_write_cb)(void*, const char*) = NULLPTR;
  void* const no_cbopaque = NULLPTR;
  malloc_stats_print(no_write_cb, no_cbopaque, opts);
  return Status::OK();
}
#endif
Expand Down
100 changes: 83 additions & 17 deletions cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {

Expand Down Expand Up @@ -173,31 +174,96 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
}

TEST(Jemalloc, GetAllocationStats) {
size_t allocated, active, metadata, resident, mapped = 0;
#ifdef ARROW_JEMALLOC
auto pool = MemoryPool::CreateDefault();
uint8_t* data;

ASSERT_OK(pool->Allocate(42, &data));
size_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
metadata0, resident0, mapped0, retained0;
uint64_t thread_allocatedp, thread_deallocatedp, thread_allocatedp0,
thread_deallocatedp0, thread_allocated, thread_deallocated, thread_peak_read,
thread_allocated0, thread_deallocated0, thread_peak_read0;
auto pool = default_memory_pool();
ABORT_NOT_OK(jemalloc_memory_pool(&pool));
ASSERT_EQ("jemalloc", pool->backend_name());

// Record stats before allocating
ASSERT_OK(jemalloc_get_stat("stats.allocated", &allocated0));
ASSERT_OK(jemalloc_get_stat("stats.active", &active0));
ASSERT_OK(jemalloc_get_stat("stats.metadata", &metadata0));
ASSERT_OK(jemalloc_get_stat("stats.resident", &resident0));
ASSERT_OK(jemalloc_get_stat("stats.mapped", &mapped0));
ASSERT_OK(jemalloc_get_stat("stats.retained", &retained0));
ASSERT_OK(jemalloc_get_stat("thread.allocated", &thread_allocated0));
ASSERT_OK(jemalloc_get_stat("thread.deallocated", &thread_deallocated0));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", &thread_peak_read0));
ASSERT_OK(jemalloc_get_statp("thread.allocatedp", &thread_allocatedp0));
ASSERT_OK(jemalloc_get_statp("thread.deallocatedp", &thread_deallocatedp0));

// Allocate memory
ASSERT_OK(jemalloc_set_decay_ms(10000));
ASSERT_OK(pool->Allocate(1256, &data));
ASSERT_EQ(1256, pool->bytes_allocated());
ASSERT_OK(pool->Reallocate(1256, 1214, &data));
ASSERT_EQ(1214, pool->bytes_allocated());

// Record stats after allocating
ASSERT_OK(jemalloc_get_stat("stats.allocated", &allocated));
ASSERT_OK(jemalloc_get_stat("stats.active", &active));
ASSERT_OK(jemalloc_get_stat("stats.metadata", &metadata));
ASSERT_OK(jemalloc_get_stat("stats.resident", &resident));
ASSERT_OK(jemalloc_get_stat("stats.mapped", &mapped));

// TODO
ASSERT_EQ(71424, allocated);
ASSERT_EQ(131072, active);
ASSERT_EQ(2814368, metadata);
ASSERT_EQ(2899968, resident);
ASSERT_EQ(6422528, mapped);

ASSERT_OK(jemalloc_get_stat("stats.retained", &retained));
ASSERT_OK(jemalloc_get_stat("thread.allocated", &thread_allocated));
ASSERT_OK(jemalloc_get_stat("thread.deallocated", &thread_deallocated));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", &thread_peak_read));
ASSERT_OK(jemalloc_get_statp("thread.allocatedp", &thread_allocatedp));
ASSERT_OK(jemalloc_get_statp("thread.deallocatedp", &thread_deallocatedp));

// Reading stats via value return is equivalent to pointer passing
ASSERT_EQ(thread_allocated, thread_allocatedp);
ASSERT_EQ(thread_deallocated, thread_deallocatedp);
ASSERT_EQ(thread_allocated0, thread_allocatedp0);
ASSERT_EQ(thread_deallocated0, thread_deallocatedp0);

// Check allocated stats pre-allocation
ASSERT_EQ(119808, allocated0);
ASSERT_EQ(229376, active0);
ASSERT_EQ(2815136, metadata0);
ASSERT_EQ(3014656, resident0);
ASSERT_EQ(6520832, mapped0);
ASSERT_EQ(0, retained0);

// Check allocated stats change due to allocation
ASSERT_EQ(81920, allocated - allocated0);
ASSERT_EQ(81920, active - active0);
ASSERT_EQ(384, metadata - metadata0);
ASSERT_EQ(81920, resident - resident0);
ASSERT_EQ(81920, mapped - mapped0);
ASSERT_EQ(0, retained - retained0);

ASSERT_EQ(1280, thread_peak_read - thread_peak_read0);
ASSERT_EQ(2560, thread_allocated - thread_allocated0);
ASSERT_EQ(1280, thread_deallocated - thread_deallocated0);

// Resetting thread peak read metric
ASSERT_OK(pool->Allocate(12560, &data));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", &thread_peak_read));
ASSERT_EQ(15616, thread_peak_read);
ASSERT_OK(jemalloc_peak_reset());
ASSERT_OK(pool->Allocate(1256, &data));
ASSERT_OK(jemalloc_get_stat("thread.peak.read", &thread_peak_read));
ASSERT_EQ(1280, thread_peak_read);

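// Print statistics. With no write callback jemalloc emits them through
// malloc_message(), which writes to stderr by default (not stdout).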
ASSERT_OK(jemalloc_stats_print("ax"));
#else
ASSERT_RAISES(IOError, jemalloc_get_stat("stats.allocated", allocated));
ASSERT_RAISES(IOError, jemalloc_get_stat("stats.active", active));
ASSERT_RAISES(IOError, jemalloc_get_stat("stats.metadata", metadata));
ASSERT_RAISES(IOError, jemalloc_get_stat("stats.resident", resident));
ASSERT_RAISES(IOError, jemalloc_get_stat("stats.mapped", mapped));
size_t allocated;
uint64_t thread_peak_read, stats_allocated, stats_allocatedp;
ASSERT_RAISES(Invalid, jemalloc_get_stat("thread.peak.read", &thread_peak_read));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated", &allocated));
ASSERT_RAISES(Invalid, jemalloc_get_stat("stats.allocated", &stats_allocated));
ASSERT_RAISES(Invalid, jemalloc_get_statp("stats.allocatedp", &stats_allocatedp));
ASSERT_RAISES(Invalid, jemalloc_peak_reset());
ASSERT_RAISES(Invalid, jemalloc_stats_print("ax"));
#endif
}

Expand Down

0 comments on commit 9adac01

Please sign in to comment.