Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 83 additions & 3 deletions statshouse.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ class string_view {

#endif

enum { TL_MAX_TINY_STRING_LEN = 253 };

inline string_view shorten(string_view x) {
return string_view{x.data(), std::min(x.size(), static_cast<size_t>(TL_MAX_TINY_STRING_LEN))};
}

namespace wyhash {
// https://github.com/wangyi-fudan/wyhash

Expand Down Expand Up @@ -409,7 +415,6 @@ class TransportUDPBase {
TL_INT_SIZE = 4,
TL_LONG_SIZE = 8,
TL_DOUBLE_SIZE = 8,
TL_MAX_TINY_STRING_LEN = 253,
TL_BIG_STRING_LEN = 0xffffff,
TL_BIG_STRING_MARKER = 0xfe,
TL_STATSHOUSE_METRICS_BATCH_TAG = 0x56580239,
Expand Down Expand Up @@ -474,7 +479,6 @@ class TransportUDPBase {
std::memcpy(&v64, &v, sizeof(v));
return v64;
}
static string_view shorten(string_view x) { return string_view{x.data(), std::min<size_t>(x.size(), TL_MAX_TINY_STRING_LEN)}; }
// Trim is VERY costly, 2x slowdown even when no actual trim performed. We do not want to punish good guys, and for bad guys
// we have 'err_header_too_big' usage meta metric
// static char * pack_string_trim(char * begin, const char * end, const char *str, size_t len) {
Expand Down Expand Up @@ -1366,9 +1370,27 @@ class Registry {
std::function<int (const char *)> logger;
bool incremental_flush_disabled{false};
bool sampling_disabled{false};
std::unordered_map<uint64_t, int> metric_sample_prob; // use add_metric_sample_prob

void add_metric_sample_prob(string_view metric_name, int keep_percent) {
const string_view sn = shorten(metric_name);
if (sn.size() == 0) {
return;
}
const uint64_t h = wyhash::wyhash(sn.data(), sn.size(), 0, wyhash::_wyp);
int p = keep_percent;
if (p < 0) {
p = 0;
}
if (p > 100) {
p = 100;
}
metric_sample_prob[h] = p;
}
};
explicit Registry(const options &o)
: max_bucket_size{o.max_bucket_size}
: metric_sample_prob(o.metric_sample_prob)
, max_bucket_size{o.max_bucket_size}
, time_external{0}
, metrics_logging_enabled{false}
, sampling_disabled{o.sampling_disabled}
Expand Down Expand Up @@ -1594,6 +1616,11 @@ class Registry {
}
explicit operator bool() const { return ptr.get() != nullptr; }
bool write_count(double count, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(ptr->key);
if (m <= 0) {
return true;
}
count *= m;
registry->log_count(ptr->key, count, timestamp);
if (timestamp) {
std::lock_guard<std::mutex> transport_lock{registry->transport_mu};
Expand All @@ -1613,6 +1640,14 @@ class Registry {
return write_values(&value, 1, 1, timestamp);
}
bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(ptr->key);
if (m <= 0) {
return true;
}
if (count == 0) {
count = 1;
}
count *= m;
registry->log_values(ptr->key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand All @@ -1633,6 +1668,11 @@ class Registry {
return write_unique(&value, 1, 1, timestamp);
}
bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(ptr->key);
if (m <= 0) {
return true;
}
count *= m;
registry->log_unique(ptr->key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand Down Expand Up @@ -1699,6 +1739,11 @@ class Registry {
return WaterlevelMetricRef{registry, key};
}
bool write_count(double count, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(key);
if (m <= 0) {
return true;
}
count *= m;
registry->log_count(key, count, timestamp);
if (timestamp) {
std::lock_guard<std::mutex> transport_lock{registry->transport_mu};
Expand All @@ -1711,6 +1756,14 @@ class Registry {
return write_values(&value, 1, 1, timestamp);
}
bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(key);
if (m <= 0) {
return true;
}
if (count == 0) {
count = 1;
}
count *= m;
registry->log_values(key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand All @@ -1724,6 +1777,11 @@ class Registry {
return write_unique(&value, 1, 1, timestamp);
}
bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const {
const double m = registry->metric_sample_prob_mult(key);
if (m <= 0) {
return true;
}
count *= m;
registry->log_unique(key, values, values_count, count, timestamp);
auto sampling = count != 0 && count != values_count;
if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
Expand Down Expand Up @@ -2006,6 +2064,27 @@ class Registry {
uint32_t t{time_external};
return t != 0 ? t : static_cast<uint32_t>(std::time(nullptr));
}
double metric_sample_prob_mult(const TransportUDPBase::MetricBuilder &key) const {
if (metric_sample_prob.empty()) {
return 1;
}
const string_view name = key.metric_name();
const uint64_t h = wyhash::wyhash(name.data(), name.size(), 0, wyhash::_wyp);
const auto it = metric_sample_prob.find(h);
if (it == metric_sample_prob.end()) {
return 1;
}
int p = it->second;
thread_local uint64_t seed = 0;
if (!seed) {
seed = static_cast<uint64_t>(std::random_device{}()) | 1ull;
}
const int roll = static_cast<int>(wyhash::wy2u0k(wyhash::wyrand(&seed), 100));
if (roll >= p) {
return 0;
}
return 100.0 / static_cast<double>(p);
}
inline void log_count(const TransportUDPBase::MetricBuilder &key, double count, uint32_t timestamp) const {
if (!metrics_logging_enabled.load(std::memory_order_relaxed)) return;
log_message_writer w{};
Expand Down Expand Up @@ -2146,6 +2225,7 @@ class Registry {
std::atomic<size_t> freelist_size;
std::atomic<size_t> bucket_count;
};
std::unordered_map<uint64_t, int> metric_sample_prob;
std::mutex mu;
mutable std::mutex transport_mu;
std::atomic<size_t> max_bucket_size;
Expand Down