Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-16981: [C++] Expose jemalloc statistics for logging #13516

Merged
merged 13 commits into from
Sep 21, 2022
23 changes: 22 additions & 1 deletion cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,29 @@ MemoryPool* default_memory_pool() {

#ifndef ARROW_JEMALLOC
// Stub used when Arrow is built without jemalloc (ARROW_JEMALLOC undefined).
// The decay time cannot be configured, so the call always fails with
// NotImplemented, matching the other jemalloc stubs in this section.
Status jemalloc_set_decay_ms(int ms) {
  return Status::NotImplemented("jemalloc support is not built");
}

// Stub used when Arrow is built without jemalloc: no statistic can be read,
// whatever `name` is.
Result<int64_t> jemalloc_get_stat(const char* name) {
  return Result<int64_t>(Status::NotImplemented("jemalloc support is not built"));
}

// Stub used when Arrow is built without jemalloc: there is no peak counter
// to reset.
Status jemalloc_peak_reset() {
  Status unavailable = Status::NotImplemented("jemalloc support is not built");
  return unavailable;
}

// Stub used when Arrow is built without jemalloc: nothing is printed and
// `opts` is ignored.
Status jemalloc_stats_print(const char* opts) {
  Status unavailable = Status::NotImplemented("jemalloc support is not built");
  return unavailable;
}

// Stub used when Arrow is built without jemalloc: the callback is never
// invoked and `opts` is ignored.
Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
  Status unavailable = Status::NotImplemented("jemalloc support is not built");
  return unavailable;
}

// Stub used when Arrow is built without jemalloc: no statistics text is
// available, whatever `opts` is.
Result<std::string> jemalloc_stats_string(const char* opts) {
  return Result<std::string>(Status::NotImplemented("jemalloc support is not built"));
}

#endif

///////////////////////////////////////////////////////////////////////
Expand Down
33 changes: 33 additions & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

#include <atomic>
#include <cstdint>
#include <functional>
rok marked this conversation as resolved.
Show resolved Hide resolved
#include <memory>
#include <string>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/visibility.h"
Expand Down Expand Up @@ -175,6 +177,37 @@ ARROW_EXPORT Status jemalloc_memory_pool(MemoryPool** out);
ARROW_EXPORT
Status jemalloc_set_decay_ms(int ms);

/// \brief Get basic statistics from jemalloc's mallctl.
///
/// See the MALLCTL NAMESPACE section in jemalloc project documentation for
/// available stats.
///
/// \param[in] name the mallctl name of the statistic to read, for example
/// "stats.allocated" or "thread.peak.read"
/// \return the value of the statistic on success, an error Status if the
/// mallctl lookup fails, or NotImplemented if Arrow was built without jemalloc
ARROW_EXPORT
Result<int64_t> jemalloc_get_stat(const char* name);

/// \brief Reset the counter for peak bytes allocated in the calling thread to zero.
///
/// This affects subsequent reads of thread.peak.read, but not the values returned by
/// thread.allocated or thread.deallocated.
///
/// \return Status::OK on success, an error Status if the mallctl call fails, or
/// NotImplemented if Arrow was built without jemalloc
ARROW_EXPORT
Status jemalloc_peak_reset();

/// \brief Print summary statistics in human-readable form to stderr.
///
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param[in] opts option flags forwarded to jemalloc's malloc_stats_print
/// \return Status::OK, or NotImplemented if Arrow was built without jemalloc
ARROW_EXPORT
Status jemalloc_stats_print(const char* opts = "");

/// \brief Print summary statistics in human-readable form using a callback.
///
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param[in] write_cb callback invoked with each piece of output text; if it
/// is empty, nothing is printed
/// \param[in] opts option flags forwarded to jemalloc's malloc_stats_print
/// \return Status::OK, or NotImplemented if Arrow was built without jemalloc
ARROW_EXPORT
Status jemalloc_stats_print(std::function<void(const char*)> write_cb,
const char* opts = "");

/// \brief Get summary statistics in human-readable form.
///
/// See malloc_stats_print documentation in jemalloc project documentation for
/// available opt flags.
///
/// \param[in] opts option flags forwarded to jemalloc's malloc_stats_print
/// \return the statistics text as a string, or NotImplemented if Arrow was
/// built without jemalloc
ARROW_EXPORT
Result<std::string> jemalloc_stats_string(const char* opts = "");

/// \brief Return a process-wide memory pool based on mimalloc.
///
/// May return NotImplemented if mimalloc is not available.
Expand Down
56 changes: 56 additions & 0 deletions cpp/src/arrow/memory_pool_jemalloc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "arrow/memory_pool_internal.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h" // IWYU pragma: keep

// We can't put the jemalloc memory pool implementation into
Expand Down Expand Up @@ -153,4 +154,59 @@ Status jemalloc_set_decay_ms(int ms) {

#undef RETURN_IF_JEMALLOC_ERROR

#ifdef ARROW_JEMALLOC
rok marked this conversation as resolved.
Show resolved Hide resolved
// Read a single 64-bit statistic from jemalloc through mallctl.
//
// `name` is the mallctl name of the statistic (e.g. "stats.allocated").
// Returns the value on success, Invalid if `name` is null, or an IOError
// built from the mallctl error code if a mallctl call fails.
Result<int64_t> jemalloc_get_stat(const char* name) {
  if (name == nullptr) {
    return Status::Invalid("jemalloc statistic name must not be null");
  }

  // These global counters are served from a snapshot cached by mallctl;
  // writing to "epoch" refreshes the snapshot before it is read.
  static const char* const kCachedStats[] = {"stats.allocated", "stats.active",
                                             "stats.metadata",  "stats.resident",
                                             "stats.mapped",    "stats.retained"};
  bool needs_refresh = false;
  for (const char* cached_name : kCachedStats) {
    if (std::strcmp(name, cached_name) == 0) {
      needs_refresh = true;
      break;
    }
  }

  if (needs_refresh) {
    // The value is both written (to trigger the refresh) and read back, so it
    // must be initialized before being handed to mallctl.
    uint64_t epoch = 1;
    size_t epoch_size = sizeof(epoch);
    const int epoch_err = mallctl("epoch", &epoch, &epoch_size, &epoch, sizeof(epoch));
    if (epoch_err) {
      return arrow::internal::IOErrorFromErrno(
          epoch_err, "Failed refreshing jemalloc statistics before retrieving ", name);
    }
  }

  uint64_t value = 0;
  size_t value_size = sizeof(value);
  const int err = mallctl(name, &value, &value_size, nullptr, 0);
  if (err) {
    // Pass the string itself (not its address) so the message names the statistic.
    return arrow::internal::IOErrorFromErrno(err, "Failed retrieving ", name);
  }

  return static_cast<int64_t>(value);
}

// Ask jemalloc to reset the calling thread's peak counter via the
// "thread.peak.reset" mallctl; a non-zero return code becomes an IOError.
Status jemalloc_peak_reset() {
  const int rc = mallctl("thread.peak.reset", nullptr, nullptr, nullptr, 0);
  if (rc) {
    return arrow::internal::IOErrorFromErrno(rc, "Failed resetting thread.peak.");
  }
  return Status::OK();
}

// Collect the text produced by jemalloc_stats_print(callback, opts) into a
// single string. `opts` is forwarded unchanged. Any error reported by the
// printing call is propagated instead of being discarded.
Result<std::string> jemalloc_stats_string(const char* opts) {
  std::string stats;
  // Each chunk handed to the callback is appended in arrival order.
  auto write_cb = [&stats](const char* str) { stats.append(str); };
  ARROW_RETURN_NOT_OK(jemalloc_stats_print(write_cb, opts));
  return stats;
}

// Print jemalloc statistics without a custom writer: both the callback and
// its opaque argument are null, and `opts` is forwarded unchanged.
Status jemalloc_stats_print(const char* opts) {
  malloc_stats_print(/*write_cb=*/nullptr, /*cbopaque=*/nullptr, opts);
  return Status::OK();
}

// Print jemalloc statistics through a user-supplied callback.
//
// `write_cb` receives each chunk of output text; `opts` is forwarded to
// malloc_stats_print unchanged. If `write_cb` is empty nothing is printed.
// Always returns Status::OK.
Status jemalloc_stats_print(std::function<void(const char*)> write_cb, const char* opts) {
  // An empty std::function must never be invoked from jemalloc's C code, so
  // the call is skipped entirely in that case.
  if (!write_cb) {
    return Status::OK();
  }

  // malloc_stats_print takes a plain function pointer plus an opaque pointer;
  // this capture-less lambda recovers the std::function from the opaque
  // pointer and forwards the text to it.
  auto cb_wrapper = [](void* opaque, const char* str) {
    (*static_cast<std::function<void(const char*)>*>(opaque))(str);
  };
  malloc_stats_print(cb_wrapper, &write_cb, opts);
  return Status::OK();
}
#endif

} // namespace arrow
99 changes: 98 additions & 1 deletion cpp/src/arrow/memory_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/config.h"
#include "arrow/util/logging.h"

namespace arrow {

Expand Down Expand Up @@ -168,7 +169,103 @@ TEST(Jemalloc, SetDirtyPageDecayMillis) {
#ifdef ARROW_JEMALLOC
ASSERT_OK(jemalloc_set_decay_ms(0));
#else
ASSERT_RAISES(Invalid, jemalloc_set_decay_ms(0));
ASSERT_RAISES(NotImplemented, jemalloc_set_decay_ms(0));
#endif
}

// Exercises the jemalloc statistics API. With jemalloc built in, it reads
// global and per-thread counters around a few pool allocations, resets the
// thread peak counter, and prints the statistics through every entry point.
// Without jemalloc, every entry point must report NotImplemented.
TEST(Jemalloc, GetAllocationStats) {
#ifdef ARROW_JEMALLOC
  uint8_t* data;
  uint8_t* large_data;
  uint8_t* small_data;
  int64_t allocated, active, metadata, resident, mapped, retained, allocated0, active0,
      metadata0, resident0, mapped0, retained0;
  int64_t thread_allocated, thread_deallocated, thread_peak_read, thread_allocated0,
      thread_deallocated0, thread_peak_read0;
  auto pool = default_memory_pool();
  ABORT_NOT_OK(jemalloc_memory_pool(&pool));
  ASSERT_EQ("jemalloc", pool->backend_name());

  // Record stats before allocating
  ASSERT_OK_AND_ASSIGN(allocated0, jemalloc_get_stat("stats.allocated"));
  ASSERT_OK_AND_ASSIGN(active0, jemalloc_get_stat("stats.active"));
  ASSERT_OK_AND_ASSIGN(metadata0, jemalloc_get_stat("stats.metadata"));
  ASSERT_OK_AND_ASSIGN(resident0, jemalloc_get_stat("stats.resident"));
  ASSERT_OK_AND_ASSIGN(mapped0, jemalloc_get_stat("stats.mapped"));
  ASSERT_OK_AND_ASSIGN(retained0, jemalloc_get_stat("stats.retained"));
  ASSERT_OK_AND_ASSIGN(thread_allocated0, jemalloc_get_stat("thread.allocated"));
  ASSERT_OK_AND_ASSIGN(thread_deallocated0, jemalloc_get_stat("thread.deallocated"));
  ASSERT_OK_AND_ASSIGN(thread_peak_read0, jemalloc_get_stat("thread.peak.read"));

  // Allocate memory
  ASSERT_OK(jemalloc_set_decay_ms(10000));
  ASSERT_OK(pool->Allocate(1025, &data));
  ASSERT_EQ(pool->bytes_allocated(), 1025);
  ASSERT_OK(pool->Reallocate(1025, 1023, &data));
  ASSERT_EQ(pool->bytes_allocated(), 1023);

  // Record stats after allocating
  ASSERT_OK_AND_ASSIGN(allocated, jemalloc_get_stat("stats.allocated"));
  ASSERT_OK_AND_ASSIGN(active, jemalloc_get_stat("stats.active"));
  ASSERT_OK_AND_ASSIGN(metadata, jemalloc_get_stat("stats.metadata"));
  ASSERT_OK_AND_ASSIGN(resident, jemalloc_get_stat("stats.resident"));
  ASSERT_OK_AND_ASSIGN(mapped, jemalloc_get_stat("stats.mapped"));
  ASSERT_OK_AND_ASSIGN(retained, jemalloc_get_stat("stats.retained"));
  ASSERT_OK_AND_ASSIGN(thread_allocated, jemalloc_get_stat("thread.allocated"));
  ASSERT_OK_AND_ASSIGN(thread_deallocated, jemalloc_get_stat("thread.deallocated"));
  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));

  // Check allocated stats pre-allocation
  ASSERT_NEAR(allocated0, 120000, 100000);
  ASSERT_NEAR(active0, 75000, 70000);
  ASSERT_NEAR(metadata0, 3000000, 1000000);
  ASSERT_NEAR(resident0, 3000000, 1000000);
  ASSERT_NEAR(mapped0, 6500000, 1000000);
  ASSERT_NEAR(retained0, 1500000, 2000000);

  // Check allocated stats change due to allocation
  ASSERT_NEAR(allocated - allocated0, 70000, 50000);
  ASSERT_NEAR(active - active0, 100000, 90000);
  ASSERT_NEAR(metadata - metadata0, 500, 460);
  ASSERT_NEAR(resident - resident0, 120000, 110000);
  ASSERT_NEAR(mapped - mapped0, 100000, 90000);
  ASSERT_NEAR(retained - retained0, 0, 40000);

  ASSERT_NEAR(thread_peak_read - thread_peak_read0, 1024, 700);
  ASSERT_NEAR(thread_allocated - thread_allocated0, 2500, 500);
  ASSERT_EQ(thread_deallocated - thread_deallocated0, 1280);

  // Resetting thread peak read metric. Each allocation gets its own pointer
  // so that the earlier buffers stay alive (the expected peak values depend
  // on that) and can still be released at the end of the test.
  ASSERT_OK(pool->Allocate(12560, &large_data));
  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
  ASSERT_NEAR(thread_peak_read, 15616, 1000);
  ASSERT_OK(jemalloc_peak_reset());
  ASSERT_OK(pool->Allocate(1256, &small_data));
  ASSERT_OK_AND_ASSIGN(thread_peak_read, jemalloc_get_stat("thread.peak.read"));
  ASSERT_NEAR(thread_peak_read, 1280, 100);

  // Print statistics to stderr
  ASSERT_OK(jemalloc_stats_print("J"));

  // Read statistics into std::string
  ASSERT_OK_AND_ASSIGN(std::string stats, jemalloc_stats_string("Jax"));

  // Read statistics into std::string with a lambda
  std::string stats2;
  auto write_cb = [&stats2](const char* str) { stats2.append(str); };
  ASSERT_OK(jemalloc_stats_print(write_cb, "Jax"));

  ASSERT_EQ(stats.rfind("{\"jemalloc\":{\"version\"", 0), 0);
  ASSERT_EQ(stats2.rfind("{\"jemalloc\":{\"version\"", 0), 0);
  ASSERT_EQ(stats.substr(0, 100), stats2.substr(0, 100));

  // Release every buffer obtained above so the test neither leaks memory nor
  // leaves the pool's allocation counter non-zero.
  pool->Free(data, 1023);
  pool->Free(large_data, 12560);
  pool->Free(small_data, 1256);
#else
  std::string stats;
  auto write_cb = [&stats](const char* str) { stats.append(str); };
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("thread.peak.read"));
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocated"));
  ASSERT_RAISES(NotImplemented, jemalloc_get_stat("stats.allocatedp"));
  ASSERT_RAISES(NotImplemented, jemalloc_peak_reset());
  ASSERT_RAISES(NotImplemented, jemalloc_stats_print(write_cb, "Jax"));
  ASSERT_RAISES(NotImplemented, jemalloc_stats_print("ax"));
  ASSERT_RAISES(NotImplemented, jemalloc_stats_string("Jax"));
#endif
}

Expand Down