diff --git a/include/cassandra.h b/include/cassandra.h index bbe9c6c1c..570acb1be 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -2934,6 +2934,28 @@ CASS_EXPORT void cass_cluster_set_monitor_reporting_interval(CassCluster* cluster, unsigned interval_secs); +/** + * Sets the amount of time after which metric histograms should be refreshed. + * Upon refresh histograms are reset to zero, effectively dropping any history to + * that point. Refresh occurs when a snapshot is requested so this value should + * be thought of as a minimum time to refresh. + * + * If refresh is not enabled the driver will continue to accumulate histogram + * data over the life of a session; this is the default behaviour and replicates + * the behaviour of previous versions. + * + * Note that the specified interval must be > 0 otherwise CASS_ERROR_LIB_BAD_PARAMS + * will be returned. + * + * @public @memberof CassCluster + * + * @param cluster + * @param refresh_interval Minimum interval (in milliseconds) between histogram refreshes + */ +CASS_EXPORT CassError +cass_cluster_set_histogram_refresh_interval(CassCluster* cluster, + unsigned refresh_interval); + /*********************************************************************************** * * Session diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp index 9fac5b326..408e3f82e 100644 --- a/src/cluster_config.cpp +++ b/src/cluster_config.cpp @@ -577,6 +577,14 @@ void cass_cluster_set_monitor_reporting_interval(CassCluster* cluster, unsigned cluster->config().set_monitor_reporting_interval_secs(interval_secs); } +CassError cass_cluster_set_histogram_refresh_interval(CassCluster* cluster, unsigned refresh_interval) { + if (refresh_interval == 0) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + cluster->config().set_cluster_histogram_refresh_interval(refresh_interval); + return CASS_OK; +} + void cass_cluster_free(CassCluster* cluster) { delete cluster->from(); } } // extern "C" diff --git a/src/config.hpp b/src/config.hpp index 
6d34cd6b2..19ee0daca 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -74,7 +74,8 @@ class Config { , is_client_id_set_(false) , host_listener_(new DefaultHostListener()) , monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS) - , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) { + , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) + , histogram_refresh_interval_(CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH) { profiles_.set_empty_key(String()); // Assign the defaults to the cluster profile @@ -392,6 +393,12 @@ class Config { } } + unsigned cluster_histogram_refresh_interval() const { return histogram_refresh_interval_; } + + void set_cluster_histogram_refresh_interval(unsigned refresh_interval) { + histogram_refresh_interval_ = refresh_interval; + } + private: void init_profiles(); @@ -441,6 +448,7 @@ class Config { unsigned monitor_reporting_interval_secs_; CloudSecureConnectionConfig cloud_secure_connection_config_; ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_; + unsigned histogram_refresh_interval_; /* Minimum time (ms) between metric histogram refreshes; defaults to CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH (0), meaning never refresh. */ }; }}} // namespace datastax::internal::core diff --git a/src/constants.hpp b/src/constants.hpp index 2d2a743a7..371c74602 100644 --- a/src/constants.hpp +++ b/src/constants.hpp @@ -142,6 +142,7 @@ #define CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS 15 #define CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS 3 #define CASS_DEFAULT_TRACING_CONSISTENCY CASS_CONSISTENCY_ONE +#define CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH 0 // Request-level defaults #define CASS_DEFAULT_CONSISTENCY CASS_CONSISTENCY_LOCAL_ONE diff --git a/src/metrics.hpp b/src/metrics.hpp index 59bee6f07..6cf028f7b 100644 --- a/src/metrics.hpp +++ b/src/metrics.hpp @@ -23,6 +23,7 @@ #include "allocated.hpp" #include "atomic.hpp" #include "constants.hpp" +#include "get_time.hpp" #include "scoped_lock.hpp" #include "scoped_ptr.hpp" #include "utils.hpp" @@ -268,9 +269,15 @@ 
class Metrics : public Allocated { int64_t percentile_999th; }; - Histogram(ThreadState* thread_state) + Histogram(ThreadState* thread_state, unsigned refresh_interval = CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH) : thread_state_(thread_state) - , histograms_(new PerThreadHistogram[thread_state->max_threads()]) { + , histograms_(new PerThreadHistogram[thread_state->max_threads()]) + , zero_snapshot_(Snapshot {0,0,0,0,0,0,0,0,0,0}) { + // The first refresh window starts now; the cached snapshot is all zeros until get_snapshot() rebuilds it. + refresh_interval_ = refresh_interval; + refresh_timestamp_ = get_time_since_epoch_ms(); + cached_snapshot_ = zero_snapshot_; + hdr_init(1LL, HIGHEST_TRACKABLE_VALUE, 3, &histogram_); uv_mutex_init(&mutex_); } @@ -286,38 +293,76 @@ class Metrics : public Allocated { void get_snapshot(Snapshot* snapshot) const { ScopedMutex l(&mutex_); - hdr_histogram* h = histogram_; - for (size_t i = 0; i < thread_state_->max_threads(); ++i) { - histograms_[i].add(h); + + // In the "no refresh" case (the default) fall back to the old behaviour; add per-thread + // histograms to histogram_ (without any clearing of data) and return what's there. + if (refresh_interval_ == CASS_DEFAULT_HISTOGRAM_REFRESH_INTERVAL_NO_REFRESH) { + + for (size_t i = 0; i < thread_state_->max_threads(); ++i) { + histograms_[i].add(histogram_); + } + + if (histogram_->total_count == 0) { + // There is no data; default to 0 for the stats. + copy_snapshot(zero_snapshot_, snapshot); + } else { + histogram_to_snapshot(histogram_, snapshot); + } + return; } - if (h->total_count == 0) { - // There is no data; default to 0 for the stats. 
- snapshot->max = 0; - snapshot->min = 0; - snapshot->mean = 0; - snapshot->stddev = 0; - snapshot->median = 0; - snapshot->percentile_75th = 0; - snapshot->percentile_95th = 0; - snapshot->percentile_98th = 0; - snapshot->percentile_99th = 0; - snapshot->percentile_999th = 0; - } else { - snapshot->max = hdr_max(h); - snapshot->min = hdr_min(h); - snapshot->mean = static_cast<int64_t>(hdr_mean(h)); - snapshot->stddev = static_cast<int64_t>(hdr_stddev(h)); - snapshot->median = hdr_value_at_percentile(h, 50.0); - snapshot->percentile_75th = hdr_value_at_percentile(h, 75.0); - snapshot->percentile_95th = hdr_value_at_percentile(h, 95.0); - snapshot->percentile_98th = hdr_value_at_percentile(h, 98.0); - snapshot->percentile_99th = hdr_value_at_percentile(h, 99.0); - snapshot->percentile_999th = hdr_value_at_percentile(h, 99.9); + // Refresh interval is in use. If we've exceeded the interval clear histogram_, + // compute a new aggregate histogram and build (and cache) a new snapshot. Otherwise + // just return the cached version. 
+ uint64_t now = get_time_since_epoch_ms(); + if (now - refresh_timestamp_ >= refresh_interval_) { + + hdr_reset(histogram_); + + for (size_t i = 0; i < thread_state_->max_threads(); ++i) { + histograms_[i].add(histogram_); + } + + if (histogram_->total_count == 0) { + copy_snapshot(zero_snapshot_, &cached_snapshot_); + } else { + histogram_to_snapshot(histogram_, &cached_snapshot_); + } + refresh_timestamp_ = now; } + + copy_snapshot(cached_snapshot_, snapshot); } private: + // Copies every field of `from` into `*to`; `from` is taken by reference to avoid a temporary copy. + void copy_snapshot(const Snapshot& from, Snapshot* to) const { + to->min = from.min; + to->max = from.max; + to->mean = from.mean; + to->stddev = from.stddev; + to->median = from.median; + to->percentile_75th = from.percentile_75th; + to->percentile_95th = from.percentile_95th; + to->percentile_98th = from.percentile_98th; + to->percentile_99th = from.percentile_99th; + to->percentile_999th = from.percentile_999th; + } + // Fills `*to` with the min/max/mean/stddev and percentile values read from `h`. + void histogram_to_snapshot(hdr_histogram* h, Snapshot* to) const { + to->min = hdr_min(h); + to->max = hdr_max(h); + to->mean = static_cast<int64_t>(hdr_mean(h)); + to->stddev = static_cast<int64_t>(hdr_stddev(h)); + to->median = hdr_value_at_percentile(h, 50.0); + to->percentile_75th = hdr_value_at_percentile(h, 75.0); + to->percentile_95th = hdr_value_at_percentile(h, 95.0); + to->percentile_98th = hdr_value_at_percentile(h, 98.0); + to->percentile_99th = hdr_value_at_percentile(h, 99.0); + to->percentile_999th = hdr_value_at_percentile(h, 99.9); + } + + class WriterReaderPhaser { public: WriterReaderPhaser() @@ -409,14 +454,19 @@ class Metrics : public Allocated { hdr_histogram* histogram_; mutable uv_mutex_t mutex_; + unsigned refresh_interval_; + mutable uint64_t refresh_timestamp_; + mutable Snapshot cached_snapshot_; + const Snapshot zero_snapshot_; + private: DISALLOW_COPY_AND_ASSIGN(Histogram); }; - Metrics(size_t max_threads) + Metrics(size_t max_threads, unsigned histogram_refresh_interval) : thread_state_(max_threads) - , request_latencies(&thread_state_) - , 
speculative_request_latencies(&thread_state_) + , request_latencies(&thread_state_, histogram_refresh_interval) + , speculative_request_latencies(&thread_state_, histogram_refresh_interval) , request_rates(&thread_state_) , total_connections(&thread_state_) , connection_timeouts(&thread_state_) @@ -447,6 +497,8 @@ class Metrics : public Allocated { Counter connection_timeouts; Counter request_timeouts; + unsigned histogram_refresh_interval; // NOTE(review): the constructor changes in this patch forward the parameter to the histograms but never assign this member; initialize it or remove it (TODO confirm). + private: DISALLOW_COPY_AND_ASSIGN(Metrics); }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 6ccb4f7c5..4cdc29ea2 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -97,7 +97,7 @@ Future::Ptr SessionBase::connect(const Config& config, const String& keyspace) { random_.reset(); } - metrics_.reset(new Metrics(config.thread_count_io() + 1)); + metrics_.reset(new Metrics(config.thread_count_io() + 1, config.cluster_histogram_refresh_interval())); cluster_.reset(); ClusterConnector::Ptr connector( diff --git a/tests/src/unit/tests/test_metrics.cpp b/tests/src/unit/tests/test_metrics.cpp index 16218f80d..28d3e5260 100644 --- a/tests/src/unit/tests/test_metrics.cpp +++ b/tests/src/unit/tests/test_metrics.cpp @@ -144,6 +144,92 @@ TEST(MetricsUnitTest, HistogramEmpty) { EXPECT_EQ(snapshot.percentile_999th, 0); } +TEST(MetricsUnitTest, HistogramWithRefreshInterval) { + unsigned refresh_interval = 1000; + Metrics::ThreadState thread_state(1); + Metrics::Histogram histogram(&thread_state, refresh_interval); + + Metrics::Histogram::Snapshot snapshot; + + // Retrieval before the first interval runs will simply return zeros + histogram.get_snapshot(&snapshot); + EXPECT_EQ(snapshot.min, 0); + EXPECT_EQ(snapshot.max, 0); + EXPECT_EQ(snapshot.median, 0); + EXPECT_EQ(snapshot.percentile_75th, 0); + EXPECT_EQ(snapshot.percentile_95th, 0); + EXPECT_EQ(snapshot.percentile_98th, 0); + EXPECT_EQ(snapshot.percentile_99th, 0); + EXPECT_EQ(snapshot.percentile_999th, 0); + EXPECT_EQ(snapshot.mean, 0); + 
EXPECT_EQ(snapshot.stddev, 0); + + // Values added during the first interval (or for that matter any + // interval) will be buffered in per-thread counters and will be + // included in the next generated snapshot + for (uint64_t i = 1; i <= 100; ++i) { + histogram.record_value(i); + } + test::Utils::msleep(1.2 * refresh_interval); + // The refresh interval has now elapsed, so this call rebuilds the cached snapshot. + histogram.get_snapshot(&snapshot); + EXPECT_EQ(snapshot.min, 1); + EXPECT_EQ(snapshot.max, 100); + EXPECT_EQ(snapshot.median, 50); + EXPECT_EQ(snapshot.percentile_75th, 75); + EXPECT_EQ(snapshot.percentile_95th, 95); + EXPECT_EQ(snapshot.percentile_98th, 98); + EXPECT_EQ(snapshot.percentile_99th, 99); + EXPECT_EQ(snapshot.percentile_999th, 100); + EXPECT_EQ(snapshot.mean, 50); + EXPECT_EQ(snapshot.stddev, 28); + + // Generated snapshot should only include values added within + // the current interval + test::Utils::msleep(1.2 * refresh_interval); + for (uint64_t i = 101; i <= 200; ++i) { + histogram.record_value(i); + } + + histogram.get_snapshot(&snapshot); + EXPECT_EQ(snapshot.min, 101); + EXPECT_EQ(snapshot.max, 200); + EXPECT_EQ(snapshot.median, 150); + EXPECT_EQ(snapshot.percentile_75th, 175); + EXPECT_EQ(snapshot.percentile_95th, 195); + EXPECT_EQ(snapshot.percentile_98th, 198); + EXPECT_EQ(snapshot.percentile_99th, 199); + EXPECT_EQ(snapshot.percentile_999th, 200); + EXPECT_EQ(snapshot.mean, 150); + EXPECT_EQ(snapshot.stddev, 28); +} + +// Variant of the case above. 
If we have no requests for the entirety +// of the refresh interval make sure the stats return zero +TEST(MetricsUnitTest, HistogramWithRefreshIntervalNoActivity) { + unsigned refresh_interval = 1000; + Metrics::ThreadState thread_state(1); + Metrics::Histogram histogram(&thread_state, refresh_interval); + + Metrics::Histogram::Snapshot snapshot; + + // Initial refresh interval (where we always return zero) + another interval of + // no activity + test::Utils::msleep(2.2 * refresh_interval); + // No values were recorded, so the refreshed snapshot is expected to be all zeros. + histogram.get_snapshot(&snapshot); + EXPECT_EQ(snapshot.min, 0); + EXPECT_EQ(snapshot.max, 0); + EXPECT_EQ(snapshot.median, 0); + EXPECT_EQ(snapshot.percentile_75th, 0); + EXPECT_EQ(snapshot.percentile_95th, 0); + EXPECT_EQ(snapshot.percentile_98th, 0); + EXPECT_EQ(snapshot.percentile_99th, 0); + EXPECT_EQ(snapshot.percentile_999th, 0); + EXPECT_EQ(snapshot.mean, 0); + EXPECT_EQ(snapshot.stddev, 0); +} + TEST(MetricsUnitTest, HistogramWithThreads) { HistogramThreadArgs args[NUM_THREADS];