diff --git a/statshouse.hpp b/statshouse.hpp
index 6a9dd3f..7fab93e 100644
--- a/statshouse.hpp
+++ b/statshouse.hpp
@@ -82,6 +82,12 @@ class string_view {
 
 #endif
 
+enum { TL_MAX_TINY_STRING_LEN = 253 };
+
+inline string_view shorten(string_view x) { // view of at most the first TL_MAX_TINY_STRING_LEN bytes of x
+	return string_view{x.data(), std::min(x.size(), static_cast<size_t>(TL_MAX_TINY_STRING_LEN))}; // NOTE(review): assumes size() returns size_t - confirm
+}
+
 namespace wyhash {
 // https://github.com/wangyi-fudan/wyhash
 
@@ -409,7 +415,6 @@ class TransportUDPBase {
 		TL_INT_SIZE = 4,
 		TL_LONG_SIZE = 8,
 		TL_DOUBLE_SIZE = 8,
-		TL_MAX_TINY_STRING_LEN = 253,
 		TL_BIG_STRING_LEN = 0xffffff,
 		TL_BIG_STRING_MARKER = 0xfe,
 		TL_STATSHOUSE_METRICS_BATCH_TAG = 0x56580239,
@@ -474,7 +479,6 @@ class TransportUDPBase {
 		std::memcpy(&v64, &v, sizeof(v));
 		return v64;
 	}
-	static string_view shorten(string_view x) { return string_view{x.data(), std::min(x.size(), TL_MAX_TINY_STRING_LEN)}; }
 	// Trim is VERY costly, 2x slowdown even when no actual trim performed. We do not want to punish good guys, and for bad guys
 	// we have 'err_header_too_big' usage meta metric
 	// static char * pack_string_trim(char * begin, const char * end, const char *str, size_t len) {
@@ -1366,9 +1370,27 @@ class Registry {
 		std::function logger;
 		bool incremental_flush_disabled{false};
 		bool sampling_disabled{false};
+		std::unordered_map<uint64_t, int> metric_sample_prob; // use add_metric_sample_prob
+
+		void add_metric_sample_prob(string_view metric_name, int keep_percent) { // registers a keep percentage for one metric
+			const string_view sn = shorten(metric_name);
+			if (sn.size() == 0) { // an empty name is ignored
+				return;
+			}
+			const uint64_t h = wyhash::wyhash(sn.data(), sn.size(), 0, wyhash::_wyp); // same hash call as the lookup in Registry::metric_sample_prob_mult
+			int p = keep_percent;
+			if (p < 0) { // clamp to [0, 100]
+				p = 0;
+			}
+			if (p > 100) {
+				p = 100;
+			}
+			metric_sample_prob[h] = p; // a later call for the same name overwrites the earlier value
+		}
 	};
 	explicit Registry(const options &o)
-		: max_bucket_size{o.max_bucket_size}
+		: metric_sample_prob(o.metric_sample_prob)
+		, max_bucket_size{o.max_bucket_size}
 		, time_external{0}
 		, metrics_logging_enabled{false}
 		, sampling_disabled{o.sampling_disabled}
@@ -1594,6 +1616,11 @@ class Registry {
 		}
 		explicit operator bool() const { return ptr.get() != nullptr; }
 		bool write_count(double count, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(ptr->key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			count *= m; // kept events are weighted up by 100 / keep_percent
 			registry->log_count(ptr->key, count, timestamp);
 			if (timestamp) {
 				std::lock_guard transport_lock{registry->transport_mu};
@@ -1613,6 +1640,14 @@ class Registry {
 			return write_values(&value, 1, 1, timestamp);
 		}
 		bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(ptr->key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			if (count == 0 && m != 1) { // 0 is the "not sampled" shorthand (see `sampling` below); keep it untouched unless we rescale
+				count = static_cast<double>(values_count); // one event per value, not one event per call
+			}
+			count *= m;
 			registry->log_values(ptr->key, values, values_count, count, timestamp);
 			auto sampling = count != 0 && count != values_count;
 			if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
@@ -1633,6 +1668,11 @@ class Registry {
 			return write_unique(&value, 1, 1, timestamp);
 		}
 		bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(ptr->key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			count *= m; // kept events are weighted up by 100 / keep_percent
 			registry->log_unique(ptr->key, values, values_count, count, timestamp);
 			auto sampling = count != 0 && count != values_count;
 			if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
@@ -1699,6 +1739,11 @@ class Registry {
 			return WaterlevelMetricRef{registry, key};
 		}
 		bool write_count(double count, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			count *= m; // kept events are weighted up by 100 / keep_percent
 			registry->log_count(key, count, timestamp);
 			if (timestamp) {
 				std::lock_guard transport_lock{registry->transport_mu};
@@ -1711,6 +1756,14 @@ class Registry {
 			return write_values(&value, 1, 1, timestamp);
 		}
 		bool write_values(const double *values, size_t values_count, double count = 0, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			if (count == 0 && m != 1) { // 0 is the "not sampled" shorthand (see `sampling` below); keep it untouched unless we rescale
+				count = static_cast<double>(values_count); // one event per value, not one event per call
+			}
+			count *= m;
 			registry->log_values(key, values, values_count, count, timestamp);
 			auto sampling = count != 0 && count != values_count;
 			if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
@@ -1724,6 +1777,11 @@ class Registry {
 			return write_unique(&value, 1, 1, timestamp);
 		}
 		bool write_unique(const uint64_t *values, size_t values_count, double count, uint32_t timestamp = 0) const {
+			const double m = registry->metric_sample_prob_mult(key);
+			if (m <= 0) { // sampled out: nothing is recorded, the call still reports success
+				return true;
+			}
+			count *= m; // kept events are weighted up by 100 / keep_percent
 			registry->log_unique(key, values, values_count, count, timestamp);
 			auto sampling = count != 0 && count != values_count;
 			if (sampling || timestamp || values_count >= registry->max_bucket_size.load(std::memory_order_relaxed)) {
@@ -2006,6 +2064,27 @@ class Registry {
 		uint32_t t{time_external};
 		return t != 0 ? t : static_cast(std::time(nullptr));
 	}
+	double metric_sample_prob_mult(const TransportUDPBase::MetricBuilder &key) const { // 0 = drop this event, otherwise the factor to multiply count by
+		if (metric_sample_prob.empty()) { // nothing configured: every event is kept unscaled
+			return 1;
+		}
+		const string_view name = key.metric_name();
+		const uint64_t h = wyhash::wyhash(name.data(), name.size(), 0, wyhash::_wyp);
+		const auto it = metric_sample_prob.find(h);
+		if (it == metric_sample_prob.end()) { // this metric has no configured percentage
+			return 1;
+		}
+		int p = it->second;
+		thread_local uint64_t seed = 0;
+		if (!seed) { // first use on this thread (or state wrapped to 0): seed with a non-zero value
+			seed = static_cast<uint64_t>(std::random_device{}()) | 1ull;
+		}
+		const int roll = static_cast<int>(wyhash::wy2u0k(wyhash::wyrand(&seed), 100)); // presumably uniform in [0, 100) per wyhash - confirm
+		if (roll >= p) { // also covers p == 0 as long as roll is non-negative
+			return 0;
+		}
+		return 100.0 / static_cast<double>(p); // p > roll here, so no division by zero for roll >= 0
+	}
 	inline void log_count(const TransportUDPBase::MetricBuilder &key, double count, uint32_t timestamp) const {
 		if (!metrics_logging_enabled.load(std::memory_order_relaxed)) return;
 		log_message_writer w{};
@@ -2146,6 +2225,7 @@ class Registry {
 		std::atomic freelist_size;
 		std::atomic bucket_count;
 	};
+	std::unordered_map<uint64_t, int> metric_sample_prob;
 	std::mutex mu;
 	mutable std::mutex transport_mu;
 	std::atomic max_bucket_size;