def _com_github_circonus_labs_libcircllhist():
    """Declares the libcircllhist repository and binds it as `libcircllhist`.

    The repository is declared through _repository_impl using the in-tree
    libcircllhist.BUILD file, and its `libcircllhist` target is then exposed
    under the `libcircllhist` bind name.
    """
    repository_name = "com_github_circonus_labs_libcircllhist"
    _repository_impl(
        name = repository_name,
        build_file = "@envoy//bazel/external:libcircllhist.BUILD",
    )
    native.bind(
        name = "libcircllhist",
        actual = "@" + repository_name + "//:libcircllhist",
    )
"https://github.com/Cyan4973/xxHash", diff --git a/include/envoy/stats/stats.h b/include/envoy/stats/stats.h index b84e811047fa..b7808c2a15f1 100644 --- a/include/envoy/stats/stats.h +++ b/include/envoy/stats/stats.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -114,6 +115,11 @@ class Metric { * Returns the name of the Metric with the portions designated as tags removed. */ virtual const std::string& tagExtractedName() const PURE; + + /** + * Indicates whether a metric has been used. + */ + virtual bool used() const PURE; }; /** @@ -128,7 +134,6 @@ class Counter : public virtual Metric { virtual void inc() PURE; virtual uint64_t latch() PURE; virtual void reset() PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; @@ -146,12 +151,34 @@ class Gauge : public virtual Metric { virtual void inc() PURE; virtual void set(uint64_t value) PURE; virtual void sub(uint64_t amount) PURE; - virtual bool used() const PURE; virtual uint64_t value() const PURE; }; typedef std::shared_ptr GaugeSharedPtr; +/** + * Holds the computed statistics for a histogram. + */ +class HistogramStatistics { +public: + virtual ~HistogramStatistics() {} + + /** + * Returns summary representation of the histogram. + */ + virtual std::string summary() const PURE; + + /** + * Returns supported quantiles. + */ + virtual const std::vector& supportedQuantiles() const PURE; + + /** + * Returns computed quantile values during the period. + */ + virtual const std::vector& computedQuantiles() const PURE; +}; + /** * A histogram that records values one at a time. * Note: Histograms now incorporate what used to be timers because the only difference between the @@ -167,6 +194,21 @@ class Histogram : public virtual Metric { * Records an unsigned value. If a timer, values are in units of milliseconds. */ virtual void recordValue(uint64_t value) PURE; + + /** + * Merges the histogram values collected during the flush interval. 
+ */ + virtual void merge() PURE; + + /** + * Returns the interval histogram summary statistics for the flush interval. + */ + virtual const HistogramStatistics& intervalStatistics() const PURE; + + /** + * Returns the cumulative histogram summary statistics. + */ + virtual const HistogramStatistics& cumulativeStatistics() const PURE; }; typedef std::shared_ptr HistogramSharedPtr; @@ -194,6 +236,11 @@ class Sink { */ virtual void flushGauge(const Gauge& gauge, uint64_t value) PURE; + /** + * Flush a histogram. + */ + virtual void flushHistogram(const Histogram& histogram) PURE; + /** * This will be called after beginFlush(), some number of flushCounter(), and some number of * flushGauge(). Sinks can use this to optimize writing if desired. @@ -263,10 +310,20 @@ class Store : public Scope { * @return a list of all known gauges. */ virtual std::list gauges() const PURE; + + /** + * @return a list of all known histograms. + */ + virtual std::list histograms() const PURE; }; typedef std::unique_ptr StorePtr; +/** + * Callback invoked when a store's mergeHistogram() runs. + */ +typedef std::function PostMergeCb; + /** * The root of the stat store. */ @@ -294,6 +351,14 @@ class StoreRoot : public Store { * down. */ virtual void shutdownThreading() PURE; + + /** + * Called during the flush process to merge all the thread local histograms. The passed in + * callback will be called on the main thread, but it will happen after the method returns + * which means that the actual flush process will happen on the main thread after this method + * returns. 
+ */ + virtual void mergeHistograms(PostMergeCb merge_complete_cb) PURE; }; typedef std::unique_ptr StoreRootPtr; diff --git a/include/envoy/thread_local/thread_local.h b/include/envoy/thread_local/thread_local.h index 1db262c95720..273e7d454adc 100644 --- a/include/envoy/thread_local/thread_local.h +++ b/include/envoy/thread_local/thread_local.h @@ -46,6 +46,16 @@ class Slot { */ virtual void runOnAllThreads(Event::PostCb cb) PURE; + /** + * Run a callback on all registered threads with a barrier. A shutdown initiated during the + * running of the PostCBs may prevent all_threads_complete_cb from being called. + * @param cb supplies the callback to run on each thread. + * @param all_threads_complete_cb supplies the callback to run on main thread after threads are + * done. + */ + virtual void runOnAllThreadsWithBarrier(Event::PostCb cb, + Event::PostCb all_threads_complete_cb) PURE; + /** * Set thread local data on all threads previously registered via registerThread(). * @param initializeCb supplies the functor that will be called *on each thread*. 
The functor diff --git a/source/common/stats/BUILD b/source/common/stats/BUILD index 1d6698e72c95..5af41e9c29ab 100644 --- a/source/common/stats/BUILD +++ b/source/common/stats/BUILD @@ -12,6 +12,10 @@ envoy_cc_library( name = "stats_lib", srcs = ["stats_impl.cc"], hdrs = ["stats_impl.h"], + external_deps = [ + "abseil_optional", + "libcircllhist", + ], deps = [ "//include/envoy/common:time_interface", "//include/envoy/server:options_interface", diff --git a/source/common/stats/stats_impl.cc b/source/common/stats/stats_impl.cc index 03b88d6b625d..440330c6d734 100644 --- a/source/common/stats/stats_impl.cc +++ b/source/common/stats/stats_impl.cc @@ -273,5 +273,29 @@ void RawStatData::initialize(absl::string_view key) { name_[xfer_size] = '\0'; } +HistogramStatisticsImpl ::HistogramStatisticsImpl(histogram_t* histogram_ptr) + : computed_quantiles_(supported_quantiles_.size(), 0.0) { + hist_approx_quantile(histogram_ptr, supported_quantiles_.data(), supported_quantiles_.size(), + computed_quantiles_.data()); +} + +std::string HistogramStatisticsImpl ::summary() const { + std::vector summary; + for (size_t i = 0; i < supported_quantiles_.size(); ++i) { + summary.push_back( + fmt::format("P{}: {}", 100 * supported_quantiles_[i], computed_quantiles_[i])); + } + return absl::StrJoin(summary, ", "); +} + +/** + * Clears the old computed values and refreshes it with values computed from passed histogram. 
+ */ +void HistogramStatisticsImpl ::refresh(histogram_t* new_histogram_ptr) { + computed_quantiles_.clear(); + hist_approx_quantile(new_histogram_ptr, supported_quantiles_.data(), supported_quantiles_.size(), + computed_quantiles_.data()); +} + } // namespace Stats } // namespace Envoy diff --git a/source/common/stats/stats_impl.h b/source/common/stats/stats_impl.h index 85629d3449ea..0e718ed48a4d 100644 --- a/source/common/stats/stats_impl.h +++ b/source/common/stats/stats_impl.h @@ -21,7 +21,9 @@ #include "common/common/utility.h" #include "common/protobuf/protobuf.h" +#include "absl/strings/str_join.h" #include "absl/strings/string_view.h" +#include "circllhist.h" namespace Envoy { namespace Stats { @@ -167,9 +169,6 @@ class Utility { * RawStatData::size() instead. */ struct RawStatData { - struct Flags { - static const uint8_t Used = 0x1; - }; /** * Due to the flexible-array-length of name_, c-style allocation @@ -284,6 +283,14 @@ class MetricImpl : public virtual Metric { const std::string& tagExtractedName() const override { return tag_extracted_name_; } const std::vector& tags() const override { return tags_; } +protected: + /** + * Flags used by all stats types to figure out whether they have been used. 
+ */ + struct Flags { + static const uint8_t Used = 0x1; + }; + private: const std::string name_; const std::string tag_extracted_name_; @@ -305,13 +312,13 @@ class CounterImpl : public Counter, public MetricImpl { void add(uint64_t amount) override { data_.value_ += amount; data_.pending_increment_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } void inc() override { add(1); } uint64_t latch() override { return data_.pending_increment_.exchange(0); } void reset() override { data_.value_ = 0; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } uint64_t value() const override { return data_.value_; } private: @@ -333,13 +340,13 @@ class GaugeImpl : public Gauge, public MetricImpl { // Stats::Gauge virtual void add(uint64_t amount) override { data_.value_ += amount; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void dec() override { sub(1); } virtual void inc() override { add(1); } virtual void set(uint64_t value) override { data_.value_ = value; - data_.flags_ |= RawStatData::Flags::Used; + data_.flags_ |= Flags::Used; } virtual void sub(uint64_t amount) override { ASSERT(data_.value_ >= amount); @@ -347,13 +354,35 @@ class GaugeImpl : public Gauge, public MetricImpl { data_.value_ -= amount; } virtual uint64_t value() const override { return data_.value_; } - bool used() const override { return data_.flags_ & RawStatData::Flags::Used; } + bool used() const override { return data_.flags_ & Flags::Used; } private: RawStatData& data_; RawStatDataAllocator& alloc_; }; +/** + * Implementation of HistogramStatistics for circllhist. 
+ */ +class HistogramStatisticsImpl : public HistogramStatistics { +public: + HistogramStatisticsImpl() : computed_quantiles_(supported_quantiles_.size(), 0.0) {} + HistogramStatisticsImpl(histogram_t* histogram_ptr); + + HistogramStatisticsImpl(const HistogramStatisticsImpl&) = delete; + HistogramStatisticsImpl& operator=(HistogramStatisticsImpl const&) = delete; + + std::string summary() const override; + const std::vector& supportedQuantiles() const override { return supported_quantiles_; } + const std::vector& computedQuantiles() const override { return computed_quantiles_; } + + void refresh(histogram_t* new_histogram_ptr); + +private: + const std::vector supported_quantiles_ = {0, 0.25, 0.5, 0.75, 0.90, 0.95, 0.99, 0.999, 1}; + std::vector computed_quantiles_; +}; + /** * Histogram implementation for the heap. */ @@ -366,7 +395,22 @@ class HistogramImpl : public Histogram, public MetricImpl { // Stats::Histogram void recordValue(uint64_t value) override { parent_.deliverHistogramToSinks(*this, value); } + // TODO(ramaraochavali): split the Histogram interface in to two - parent and tls. 
+ void merge() override { NOT_IMPLEMENTED; } + + bool used() const override { return true; } + + const HistogramStatistics& intervalStatistics() const override { return interval_statistics_; } + + const HistogramStatistics& cumulativeStatistics() const override { + return cumulative_statistics_; + } + Store& parent_; + +private: + HistogramStatisticsImpl interval_statistics_; + HistogramStatisticsImpl cumulative_statistics_; }; /** @@ -446,6 +490,7 @@ class IsolatedStoreImpl : public Store { // Stats::Store std::list counters() const override { return counters_.toList(); } std::list gauges() const override { return gauges_.toList(); } + std::list histograms() const override { return histograms_.toList(); } private: struct ScopeImpl : public Scope { diff --git a/source/common/stats/thread_local_store.cc b/source/common/stats/thread_local_store.cc index 6d0036c6b1c7..4ab5e053c5ec 100644 --- a/source/common/stats/thread_local_store.cc +++ b/source/common/stats/thread_local_store.cc @@ -61,6 +61,25 @@ std::list ThreadLocalStoreImpl::gauges() const { return ret; } +std::list ThreadLocalStoreImpl::histograms() const { + // Handle de-dup due to overlapping scopes. + std::list ret; + std::unordered_set names; + std::unique_lock lock(lock_); + // TODO(ramaraochavali): incorporate the scopes into the histogram names. 
+ for (ScopeImpl* scope : scopes_) { + for (auto histogram : scope->central_cache_.histograms_) { + const std::string& hist_name = histogram.first; + const ParentHistogramSharedPtr& parent_hist = histogram.second; + if (names.insert(hist_name).second) { + ret.push_back(parent_hist); + } + } + } + + return ret; +} + void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher, ThreadLocal::Instance& tls) { main_thread_dispatcher_ = &main_thread_dispatcher; @@ -75,6 +94,30 @@ void ThreadLocalStoreImpl::shutdownThreading() { shutting_down_ = true; } +void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) { + if (!shutting_down_) { + tls_->runOnAllThreadsWithBarrier( + [this]() -> void { + for (ScopeImpl* scope : scopes_) { + for (auto histogram : tls_->getTyped().scope_cache_[scope].histograms_) { + const TlsHistogramSharedPtr& tls_hist = histogram.second; + tls_hist->beginMerge(); + } + } + }, + [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); }); + } +} + +void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) { + if (!shutting_down_) { + for (HistogramSharedPtr histogram : histograms()) { + histogram->merge(); + } + merge_complete_cb(); + } +} + void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) { std::unique_lock lock(lock_); ASSERT(scopes_.count(scope) == 1); @@ -204,7 +247,7 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { // See comments in counter(). There is no super clean way (via templates or otherwise) to // share this code so I'm leaving it largely duplicated for now. 
std::string final_name = prefix_ + name; - HistogramSharedPtr* tls_ref = nullptr; + TlsHistogramSharedPtr* tls_ref = nullptr; if (!parent_.shutting_down_ && parent_.tls_) { tls_ref = &parent_.tls_->getTyped().scope_cache_[this].histograms_[final_name]; } @@ -214,19 +257,106 @@ Histogram& ThreadLocalStoreImpl::ScopeImpl::histogram(const std::string& name) { } std::unique_lock lock(parent_.lock_); - HistogramSharedPtr& central_ref = central_cache_.histograms_[final_name]; + ParentHistogramSharedPtr& central_ref = central_cache_.histograms_[final_name]; + + std::vector tags; + std::string tag_extracted_name = parent_.getTagsForName(final_name, tags); if (!central_ref) { - std::vector tags; - std::string tag_extracted_name = parent_.getTagsForName(final_name, tags); - central_ref.reset( - new HistogramImpl(final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + // Since MetricImpl only has move constructor, we are explicitly copying here. + std::string central_tag_extracted_name(tag_extracted_name); + std::vector central_tags(tags); + central_ref.reset(new HistogramParentImpl( + final_name, parent_, std::move(central_tag_extracted_name), std::move(central_tags))); } + TlsHistogramSharedPtr hist_tls_ptr(new ThreadLocalHistogramImpl( + final_name, parent_, std::move(tag_extracted_name), std::move(tags))); + central_ref->addTlsHistogram(hist_tls_ptr); if (tls_ref) { - *tls_ref = central_ref; + *tls_ref = hist_tls_ptr; } + return *hist_tls_ptr; +} - return *central_ref; +ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(const std::string& name, Store& parent, + std::string&& tag_extracted_name, + std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), + current_active_(0), flags_(0) { + histograms_[0] = hist_alloc(); + histograms_[1] = hist_alloc(); +} + +ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() { + hist_free(histograms_[0]); + hist_free(histograms_[1]); +} + +void 
ThreadLocalHistogramImpl::recordValue(uint64_t value) { + hist_insert_intscale(histograms_[current_active_], value, 0, 1); + parent_.deliverHistogramToSinks(*this, value); + flags_ |= Flags::Used; +} + +void ThreadLocalHistogramImpl::merge(histogram_t* target) { + histogram_t* hist_array[1]; + hist_array[0] = histograms_[1 - current_active_]; + hist_accumulate(target, hist_array, ARRAY_SIZE(hist_array)); + hist_clear(hist_array[0]); +} + +HistogramParentImpl::HistogramParentImpl(const std::string& name, Store& parent, + std::string&& tag_extracted_name, std::vector&& tags) + : MetricImpl(name, std::move(tag_extracted_name), std::move(tags)), parent_(parent), + interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()), + interval_statistics_(interval_histogram_), cumulative_statistics_(cumulative_histogram_) {} + +bool HistogramParentImpl::used() const { + std::unique_lock lock(merge_lock_); + return usedWorker(); +} + +HistogramParentImpl::~HistogramParentImpl() { + hist_free(interval_histogram_); + hist_free(cumulative_histogram_); +} + +/** + * This method is called during the main stats flush process for each of the histogram. This + * method iterates through the Tls histograms and collects the histogram data of all of them + * in to "interval_histogram_". Then the collected "interval_histogram_" is merged to a + * "cumulative_histogram". More details about threading model at + * https://github.com/envoyproxy/envoy/issues/1965#issuecomment-376672282. 
+ */ +void HistogramParentImpl::merge() { + std::unique_lock lock(merge_lock_); + if (usedWorker()) { + hist_clear(interval_histogram_); + for (TlsHistogramSharedPtr tls_histogram : tls_histograms_) { + tls_histogram->merge(interval_histogram_); + } + histogram_t* hist_array[1]; + hist_array[0] = interval_histogram_; + hist_accumulate(cumulative_histogram_, hist_array, ARRAY_SIZE(hist_array)); + cumulative_statistics_.refresh(cumulative_histogram_); + interval_statistics_.refresh(interval_histogram_); + } +} + +void HistogramParentImpl::addTlsHistogram(TlsHistogramSharedPtr hist_ptr) { + std::unique_lock lock(merge_lock_); + tls_histograms_.emplace_back(hist_ptr); +} + +bool HistogramParentImpl::usedWorker() const { + bool any_tls_used = false; + for (const TlsHistogramSharedPtr tls_histogram : tls_histograms_) { + if (tls_histogram->used()) { + any_tls_used = true; + break; + } + } + return any_tls_used; } } // namespace Stats diff --git a/source/common/stats/thread_local_store.h b/source/common/stats/thread_local_store.h index 030bb4dd4f28..0af7703ff1df 100644 --- a/source/common/stats/thread_local_store.h +++ b/source/common/stats/thread_local_store.h @@ -16,6 +16,77 @@ namespace Envoy { namespace Stats { +/** + * Log Linear Histogram implementation per thread. + */ +class ThreadLocalHistogramImpl : public Histogram, public MetricImpl { +public: + ThreadLocalHistogramImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, + std::vector&& tags); + + virtual ~ThreadLocalHistogramImpl(); + // Stats::Histogram + void recordValue(uint64_t value) override; + + // TODO(ramaraochavali): split the Histogram interface in to two - parent and tls. 
+ void merge() override { NOT_IMPLEMENTED; } + bool used() const override { return flags_ & Flags::Used; } + const HistogramStatistics& intervalStatistics() const override { return interval_statistics_; } + const HistogramStatistics& cumulativeStatistics() const override { + return cumulative_statistics_; + } + + void beginMerge() { current_active_ = 1 - current_active_; } + void merge(histogram_t* target); + + Store& parent_; + +private: + uint64_t current_active_; + histogram_t* histograms_[2]; + std::atomic flags_; + HistogramStatisticsImpl interval_statistics_; + HistogramStatisticsImpl cumulative_statistics_; +}; + +typedef std::shared_ptr TlsHistogramSharedPtr; + +/** + * Log Linear Histogram implementation that is stored in the main thread. + */ +class HistogramParentImpl : public Histogram, public MetricImpl { +public: + HistogramParentImpl(const std::string& name, Store& parent, std::string&& tag_extracted_name, + std::vector&& tags); + + virtual ~HistogramParentImpl(); + + // TODO(ramaraochavali): split the Histogram interface in to two - parent and tls. + void recordValue(uint64_t) override { NOT_IMPLEMENTED; } + bool used() const override; + void merge() override; + const HistogramStatistics& intervalStatistics() const override { return interval_statistics_; } + const HistogramStatistics& cumulativeStatistics() const override { + return cumulative_statistics_; + } + + void addTlsHistogram(TlsHistogramSharedPtr hist_ptr); + + Store& parent_; + std::list tls_histograms_; + +private: + bool usedWorker() const; + + histogram_t* interval_histogram_; + histogram_t* cumulative_histogram_; + HistogramStatisticsImpl interval_statistics_; + HistogramStatisticsImpl cumulative_statistics_; + mutable std::mutex merge_lock_; +}; + +typedef std::shared_ptr ParentHistogramSharedPtr; + /** * Store implementation with thread local caching. 
This implementation supports the following * features: @@ -64,6 +135,7 @@ class ThreadLocalStoreImpl : public StoreRoot { // Stats::Store std::list counters() const override; std::list gauges() const override; + std::list histograms() const override; // Stats::StoreRoot void addSink(Sink& sink) override { timer_sinks_.push_back(sink); } @@ -74,11 +146,19 @@ class ThreadLocalStoreImpl : public StoreRoot { ThreadLocal::Instance& tls) override; void shutdownThreading() override; + void mergeHistograms(PostMergeCb mergeCb) override; + private: struct TlsCacheEntry { std::unordered_map counters_; std::unordered_map gauges_; - std::unordered_map histograms_; + std::unordered_map histograms_; + }; + + struct CentralCacheEntry { + std::unordered_map counters_; + std::unordered_map gauges_; + std::unordered_map histograms_; }; struct ScopeImpl : public Scope { @@ -97,7 +177,7 @@ class ThreadLocalStoreImpl : public StoreRoot { ThreadLocalStoreImpl& parent_; const std::string prefix_; - TlsCacheEntry central_cache_; + CentralCacheEntry central_cache_; }; struct TlsCache : public ThreadLocal::ThreadLocalObject { @@ -113,6 +193,7 @@ class ThreadLocalStoreImpl : public StoreRoot { void clearScopeFromCaches(ScopeImpl* scope); void releaseScopeCrossThread(ScopeImpl* scope); SafeAllocData safeAlloc(const std::string& name); + void mergeInternal(PostMergeCb mergeCb); RawStatDataAllocator& alloc_; Event::Dispatcher* main_thread_dispatcher_{}; diff --git a/source/common/thread_local/thread_local_impl.cc b/source/common/thread_local/thread_local_impl.cc index e1805e3e85f3..f8c3946e4812 100644 --- a/source/common/thread_local/thread_local_impl.cc +++ b/source/common/thread_local/thread_local_impl.cc @@ -91,6 +91,22 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) { cb(); } +void InstanceImpl::runOnAllThreadsWithBarrier(Event::PostCb cb, + Event::PostCb all_threads_complete_cb) { + ASSERT(std::this_thread::get_id() == main_thread_id_); + ASSERT(!shutdown_); + std::shared_ptr> 
worker_count = + std::make_shared>(registered_threads_.size()); + for (Event::Dispatcher& dispatcher : registered_threads_) { + dispatcher.post([this, worker_count, cb, all_threads_complete_cb]() -> void { + cb(); + if (--*worker_count == 0) { + main_thread_dispatcher_->post(all_threads_complete_cb); + } + }); + } +} + void InstanceImpl::SlotImpl::set(InitializeCb cb) { ASSERT(std::this_thread::get_id() == parent_.main_thread_id_); ASSERT(!parent_.shutdown_); diff --git a/source/common/thread_local/thread_local_impl.h b/source/common/thread_local/thread_local_impl.h index 17a835ac5d0d..e15c09757caa 100644 --- a/source/common/thread_local/thread_local_impl.h +++ b/source/common/thread_local/thread_local_impl.h @@ -35,6 +35,9 @@ class InstanceImpl : Logger::Loggable, public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override; void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreadsWithBarrier(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreadsWithBarrier(cb, main_callback); + } void set(InitializeCb cb) override; InstanceImpl& parent_; @@ -48,6 +51,7 @@ class InstanceImpl : Logger::Loggable, public Instance { void removeSlot(SlotImpl& slot); void runOnAllThreads(Event::PostCb cb); + void runOnAllThreadsWithBarrier(Event::PostCb cb, Event::PostCb main_callback); static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object); static thread_local ThreadLocalData thread_local_data_; diff --git a/source/extensions/stat_sinks/common/statsd/statsd.h b/source/extensions/stat_sinks/common/statsd/statsd.h index 78f0a76fc9a7..1b616fddc596 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.h +++ b/source/extensions/stat_sinks/common/statsd/statsd.h @@ -51,6 +51,7 @@ class UdpStatsdSink : public Stats::Sink { void beginFlush() override {} void flushCounter(const Stats::Counter& counter, uint64_t delta) override; void flushGauge(const Stats::Gauge& gauge, 
uint64_t value) override; + void flushHistogram(const Stats::Histogram&) override {} void endFlush() override {} void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override; @@ -87,6 +88,8 @@ class TcpStatsdSink : public Stats::Sink { tls_->getTyped().flushGauge(gauge.name(), value); } + void flushHistogram(const Stats::Histogram&) override {} + void endFlush() override { tls_->getTyped().endFlush(true); } void onHistogramComplete(const Stats::Histogram& histogram, uint64_t value) override { diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc index 5aa68b19a3b8..cf3462e257c6 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.cc @@ -61,6 +61,42 @@ void GrpcMetricsStreamerImpl::ThreadLocalStreamer::send( MetricsServiceSink::MetricsServiceSink(const GrpcMetricsStreamerSharedPtr& grpc_metrics_streamer) : grpc_metrics_streamer_(grpc_metrics_streamer) {} +void MetricsServiceSink::flushCounter(const Stats::Counter& counter, uint64_t) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); + metrics_family->set_name(counter.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* counter_metric = metric->mutable_counter(); + counter_metric->set_value(counter.value()); +} + +void MetricsServiceSink::flushGauge(const Stats::Gauge& gauge, uint64_t value) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); + metrics_family->set_name(gauge.name()); + auto* metric = metrics_family->add_metric(); + 
metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* gauage_metric = metric->mutable_gauge(); + gauage_metric->set_value(value); +} +void MetricsServiceSink::flushHistogram(const Stats::Histogram& histogram) { + io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); + metrics_family->set_type(io::prometheus::client::MetricType::SUMMARY); + metrics_family->set_name(histogram.name()); + auto* metric = metrics_family->add_metric(); + metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); + auto* summary_metric = metric->mutable_summary(); + const Stats::HistogramStatistics& hist_stats = histogram.intervalStatistics(); + size_t index = 0; + for (double supported_quantile : hist_stats.supportedQuantiles()) { + auto* quantile = summary_metric->add_quantile(); + quantile->set_quantile(supported_quantile); + quantile->set_value(hist_stats.computedQuantiles()[index]); + index++; + } +} + } // namespace MetricsService } // namespace StatSinks } // namespace Extensions diff --git a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h index cafaafc9ad03..e16301b40ea3 100644 --- a/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h +++ b/source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h @@ -10,7 +10,7 @@ #include "envoy/thread_local/thread_local.h" #include "envoy/upstream/cluster_manager.h" -#include "common/buffer/buffer_impl.h" +#include "common/stats/stats_impl.h" namespace Envoy { namespace Extensions { @@ -112,25 +112,9 @@ class MetricsServiceSink : public Stats::Sink { void beginFlush() override { message_.clear_envoy_metrics(); } - void flushCounter(const Stats::Counter& counter, uint64_t) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - 
metrics_family->set_type(io::prometheus::client::MetricType::COUNTER); - metrics_family->set_name(counter.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* counter_metric = metric->mutable_counter(); - counter_metric->set_value(counter.value()); - } - - void flushGauge(const Stats::Gauge& gauge, uint64_t value) override { - io::prometheus::client::MetricFamily* metrics_family = message_.add_envoy_metrics(); - metrics_family->set_type(io::prometheus::client::MetricType::GAUGE); - metrics_family->set_name(gauge.name()); - auto* metric = metrics_family->add_metric(); - metric->set_timestamp_ms(std::chrono::system_clock::now().time_since_epoch().count()); - auto* gauage_metric = metric->mutable_gauge(); - gauage_metric->set_value(value); - } + void flushCounter(const Stats::Counter& counter, uint64_t) override; + void flushGauge(const Stats::Gauge& gauge, uint64_t value) override; + void flushHistogram(const Stats::Histogram& histogram) override; void endFlush() override { grpc_metrics_streamer_->send(message_); @@ -140,9 +124,7 @@ class MetricsServiceSink : public Stats::Sink { } } - void onHistogramComplete(const Stats::Histogram&, uint64_t) override { - // TODO : Need to figure out how to map existing histogram to Proto Model - } + void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} private: GrpcMetricsStreamerSharedPtr grpc_metrics_streamer_; diff --git a/source/server/http/admin.cc b/source/server/http/admin.cc index 93393f6b2b12..1e82c40562af 100644 --- a/source/server/http/admin.cc +++ b/source/server/http/admin.cc @@ -36,6 +36,7 @@ #include "common/network/listen_socket_impl.h" #include "common/profiler/profiler.h" #include "common/router/config_impl.h" +#include "common/stats/stats_impl.h" #include "common/upstream/host_utility.h" #include "extensions/access_loggers/file/file_access_log_impl.h" @@ -394,6 +395,7 @@ Http::Code 
AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& respo Http::Code rc = Http::Code::OK; const Http::Utility::QueryParams params = Http::Utility::parseQueryString(url); std::map all_stats; + std::map all_histograms; for (const Stats::CounterSharedPtr& counter : server_.stats().counters()) { all_stats.emplace(counter->name(), counter->value()); } @@ -402,11 +404,21 @@ Http::Code AdminImpl::handlerStats(absl::string_view url, Http::HeaderMap& respo all_stats.emplace(gauge->name(), gauge->value()); } + for (const Stats::HistogramSharedPtr& histogram : server_.stats().histograms()) { + all_histograms.emplace(histogram->name(), + fmt::format("\n\t Interval: {}\n\t Cumulative: {}", + histogram->intervalStatistics().summary(), + histogram->cumulativeStatistics().summary())); + } + if (params.size() == 0) { // No Arguments so use the standard. for (auto stat : all_stats) { response.add(fmt::format("{}: {}\n", stat.first, stat.second)); } + for (auto histogram : all_histograms) { + response.add(fmt::format("{}: {}\n", histogram.first, histogram.second)); + } } else { const std::string format_key = params.begin()->first; const std::string format_value = params.begin()->second; diff --git a/source/server/server.cc b/source/server/server.cc index ac69ba79a483..edcddae76b8b 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -99,8 +99,8 @@ void InstanceImpl::failHealthcheck(bool fail) { server_stats_->live_.set(!fail); } -void InstanceUtil::flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store) { +void InstanceUtil::flushMetricsToSinks(const std::list& sinks, + Stats::Store& store) { for (const auto& sink : sinks) { sink->beginFlush(); } @@ -122,6 +122,14 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list } } + for (const Stats::HistogramSharedPtr& histogram : store.histograms()) { + if (histogram->used()) { + for (const auto& sink : sinks) { + sink->flushHistogram(*histogram); + } + } + } + for (const auto& 
sink : sinks) { sink->endFlush(); } @@ -129,19 +137,21 @@ void InstanceUtil::flushCountersAndGaugesToSinks(const std::list void InstanceImpl::flushStats() { ENVOY_LOG(debug, "flushing stats"); - HotRestart::GetParentStatsInfo info; - restarter_.getParentStats(info); - server_stats_->uptime_.set(time(nullptr) - original_start_time_); - server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + - info.memory_allocated_); - server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); - server_stats_->parent_connections_.set(info.num_connections_); - server_stats_->total_connections_.set(numConnections() + info.num_connections_); - server_stats_->days_until_first_cert_expiring_.set( - sslContextManager().daysUntilFirstCertExpires()); - - InstanceUtil::flushCountersAndGaugesToSinks(config_->statsSinks(), stats_store_); - stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + // TODO(ramaraochavali): consider adding different flush interval for histograms. 
+ stats_store_.mergeHistograms([this]() -> void { + HotRestart::GetParentStatsInfo info; + restarter_.getParentStats(info); + server_stats_->uptime_.set(time(nullptr) - original_start_time_); + server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() + + info.memory_allocated_); + server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved()); + server_stats_->parent_connections_.set(info.num_connections_); + server_stats_->total_connections_.set(numConnections() + info.num_connections_); + server_stats_->days_until_first_cert_expiring_.set( + sslContextManager().daysUntilFirstCertExpires()); + InstanceUtil::flushMetricsToSinks(config_->statsSinks(), stats_store_); + stat_flush_timer_->enableTimer(config_->statsFlushInterval()); + }); } void InstanceImpl::getParentStats(HotRestart::GetParentStatsInfo& info) { diff --git a/source/server/server.h b/source/server/server.h index 645f0a006746..60c2b148f270 100644 --- a/source/server/server.h +++ b/source/server/server.h @@ -82,13 +82,13 @@ class InstanceUtil : Logger::Loggable { static Runtime::LoaderPtr createRuntime(Instance& server, Server::Configuration::Initial& config); /** - * Helper for flushing counters and gauges to sinks. This takes care of calling beginFlush(), - * latching of counters and flushing, flushing of gauges, and calling endFlush(), on each sink. + * Helper for flushing counters, gauges and hisograms to sinks. This takes care of calling + * beginFlush(), latching of counters and flushing, flushing of gauges, and calling endFlush(), on + * each sink. * @param sinks supplies the list of sinks. * @param store supplies the store to flush. */ - static void flushCountersAndGaugesToSinks(const std::list& sinks, - Stats::Store& store); + static void flushMetricsToSinks(const std::list& sinks, Stats::Store& store); /** * Load a bootstrap config from either v1 or v2 and perform validation. 
@@ -208,5 +208,5 @@ class InstanceImpl : Logger::Loggable, public Instance { std::unique_ptr file_logger_; }; -} // Server +} // namespace Server } // namespace Envoy diff --git a/test/common/stats/thread_local_store_test.cc b/test/common/stats/thread_local_store_test.cc index 9b352c236d48..d5d0f1227d3a 100644 --- a/test/common/stats/thread_local_store_test.cc +++ b/test/common/stats/thread_local_store_test.cc @@ -80,6 +80,77 @@ class StatsThreadLocalStoreTest : public testing::Test, public RawStatDataAlloca store_->addSink(sink_); } + std::vector h1_cumulative_values, h2_cumulative_values, h1_interval_values, + h2_interval_values; + + int validateMerge() { + + std::shared_ptr> merge_called = std::make_shared>(false); + store_->mergeHistograms([merge_called]() -> void { *merge_called = true; }); + + EXPECT_TRUE(*merge_called); + + std::list histogram_list = store_->histograms(); + + histogram_t* hist1_cumulative = hist_alloc(); + for (uint64_t value : h1_cumulative_values) { + hist_insert_intscale(hist1_cumulative, value, 0, 1); + } + + histogram_t* hist2_cumulative = hist_alloc(); + for (uint64_t value : h2_cumulative_values) { + hist_insert_intscale(hist2_cumulative, value, 0, 1); + } + + histogram_t* hist1_interval = hist_alloc(); + for (uint64_t value : h1_interval_values) { + hist_insert_intscale(hist1_interval, value, 0, 1); + } + + histogram_t* hist2_interval = hist_alloc(); + for (uint64_t value : h2_interval_values) { + hist_insert_intscale(hist2_interval, value, 0, 1); + } + + HistogramStatisticsImpl h1_cumulative_statistics(hist1_cumulative); + HistogramStatisticsImpl h2_cumulative_statistics(hist2_cumulative); + HistogramStatisticsImpl h1_interval_statistics(hist1_interval); + HistogramStatisticsImpl h2_interval_statistics(hist2_interval); + + for (const Stats::HistogramSharedPtr& histogram : histogram_list) { + if (histogram->name().find("h1") != std::string::npos) { + EXPECT_EQ(histogram->cumulativeStatistics().summary(), 
h1_cumulative_statistics.summary()); + EXPECT_EQ(histogram->intervalStatistics().summary(), h1_interval_statistics.summary()); + } else { + EXPECT_EQ(histogram->cumulativeStatistics().summary(), h2_cumulative_statistics.summary()); + EXPECT_EQ(histogram->intervalStatistics().summary(), h2_interval_statistics.summary()); + } + } + + hist_free(hist1_cumulative); + hist_free(hist2_cumulative); + hist_free(hist1_interval); + hist_free(hist2_interval); + + h1_interval_values.clear(); + h2_interval_values.clear(); + + return histogram_list.size(); + } + + void expectCallAndAccumulate(Histogram& histogram, uint64_t record_value) { + EXPECT_CALL(sink_, onHistogramComplete(Ref(histogram), record_value)); + histogram.recordValue(record_value); + + if (histogram.name().find("h1") != std::string::npos) { + h1_cumulative_values.push_back(record_value); + h1_interval_values.push_back(record_value); + } else { + h2_cumulative_values.push_back(record_value); + h2_interval_values.push_back(record_value); + } + } + MOCK_METHOD1(alloc, RawStatData*(const std::string& name)); MOCK_METHOD1(free, void(RawStatData& data)); @@ -101,7 +172,7 @@ TEST_F(StatsThreadLocalStoreTest, NoTls) { EXPECT_EQ(&g1, &store_->gauge("g1")); Histogram& h1 = store_->histogram("h1"); - EXPECT_EQ(&h1, &store_->histogram("h1")); + // EXPECT_EQ(&h1, &store_->histogram("h1")); EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 200)); h1.recordValue(200); EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 100)); @@ -189,6 +260,152 @@ TEST_F(StatsThreadLocalStoreTest, BasicScope) { // Includes overflow stat. 
EXPECT_CALL(*this, free(_)).Times(5); } +TEST_F(StatsThreadLocalStoreTest, BasicSingleHistogramMerge) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + Histogram& h1 = store_->histogram("h1"); + EXPECT_EQ("h1", h1.name()); + + expectCallAndAccumulate(h1, 0); + expectCallAndAccumulate(h1, 43); + expectCallAndAccumulate(h1, 41); + expectCallAndAccumulate(h1, 415); + expectCallAndAccumulate(h1, 2201); + expectCallAndAccumulate(h1, 3201); + expectCallAndAccumulate(h1, 125); + expectCallAndAccumulate(h1, 13); + + EXPECT_EQ(1, validateMerge()); + + store_->shutdownThreading(); + tls_.shutdownThread(); + + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); +} + +TEST_F(StatsThreadLocalStoreTest, BasicMultiHistogramMerge) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + expectCallAndAccumulate(h1, 1); + expectCallAndAccumulate(h2, 1); + expectCallAndAccumulate(h2, 2); + + EXPECT_EQ(2, validateMerge()); + + store_->shutdownThreading(); + tls_.shutdownThread(); + + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); +} + +TEST_F(StatsThreadLocalStoreTest, MultiHistogramMultipleMerges) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = store_->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("h2", h2.name()); + + // Insert one value in to one histogram and validate + expectCallAndAccumulate(h1, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert value into second histogram and validate that it is merged properly. + expectCallAndAccumulate(h2, 1); + EXPECT_EQ(2, validateMerge()); + + // Insert more values into both the histograms and validate that it is merged properly. 
+ expectCallAndAccumulate(h1, 2); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 3); + EXPECT_EQ(2, validateMerge()); + + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); + + // Do not insert any value and validate that intervalSummary is empty for both the histograms and + // cumulativeSummary has right values. + EXPECT_EQ(2, validateMerge()); + + store_->shutdownThreading(); + tls_.shutdownThread(); + + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); +} + +TEST_F(StatsThreadLocalStoreTest, BasicScopeHistogramMerge) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + expectCallAndAccumulate(h1, 2); + expectCallAndAccumulate(h2, 2); + EXPECT_EQ(2, validateMerge()); + + store_->shutdownThreading(); + + tls_.shutdownThread(); + + // Includes overflow stat. 
+ EXPECT_CALL(*this, free(_)); +} + +TEST_F(StatsThreadLocalStoreTest, BasicHistogramUsed) { + InSequence s; + store_->initializeThreading(main_thread_dispatcher_, tls_); + + ScopePtr scope1 = store_->createScope("scope1."); + + Histogram& h1 = store_->histogram("h1"); + Histogram& h2 = scope1->histogram("h2"); + EXPECT_EQ("h1", h1.name()); + EXPECT_EQ("scope1.h2", h2.name()); + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h1), 1)); + h1.recordValue(1); + + std::list histogram_list = store_->histograms(); + + for (const Stats::HistogramSharedPtr& histogram : histogram_list) { + if (histogram->name().find("h1") != std::string::npos) { + EXPECT_TRUE(histogram->used()); + } else { + EXPECT_FALSE(histogram->used()); + } + } + + EXPECT_CALL(sink_, onHistogramComplete(Ref(h2), 2)); + h2.recordValue(2); + + for (const Stats::HistogramSharedPtr& histogram : histogram_list) { + EXPECT_TRUE(histogram->used()); + } + + store_->shutdownThreading(); + + tls_.shutdownThread(); + + // Includes overflow stat. + EXPECT_CALL(*this, free(_)); +} TEST_F(StatsThreadLocalStoreTest, ScopeDelete) { InSequence s; diff --git a/test/common/thread_local/thread_local_impl_test.cc b/test/common/thread_local/thread_local_impl_test.cc index 8641bb6508b5..a425a340c652 100644 --- a/test/common/thread_local/thread_local_impl_test.cc +++ b/test/common/thread_local/thread_local_impl_test.cc @@ -82,6 +82,32 @@ TEST_F(ThreadLocalInstanceImplTest, All) { tls_.shutdownThread(); } +// Validate ThreadLocal::runOnAllThreadsWithBarrier's behavior. +TEST_F(ThreadLocalInstanceImplTest, RunOnAllThreadsWithBarrier) { + + SlotPtr tlsptr = tls_.allocateSlot(); + + EXPECT_CALL(thread_dispatcher_, post(_)); + EXPECT_CALL(main_dispatcher_, post(_)); + + // Ensure that the thread local call back and all_thread_complete call back are called. 
+ std::shared_ptr> all_threads_complete = + std::make_shared>(false); + std::shared_ptr> thread_local_calls = + std::make_shared>(0); + + tlsptr->runOnAllThreadsWithBarrier([thread_local_calls]() -> void { ++*thread_local_calls; }, + [all_threads_complete, thread_local_calls]() -> void { + EXPECT_EQ(*thread_local_calls, 1); + *all_threads_complete = true; + }); + + EXPECT_TRUE(*all_threads_complete); + + tls_.shutdownGlobalThreading(); + tls_.shutdownThread(); +} + // Validate ThreadLocal::InstanceImpl's dispatcher() behavior. TEST(ThreadLocalInstanceImplDispatcherTest, Dispatcher) { InstanceImpl tls; diff --git a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc index b8adeb1700f6..1bb9398f333e 100644 --- a/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc +++ b/test/extensions/stats_sinks/metrics_service/grpc_metrics_service_impl_test.cc @@ -6,6 +6,7 @@ #include "test/mocks/thread_local/mocks.h" using namespace std::chrono_literals; + using testing::InSequence; using testing::Invoke; using testing::NiceMock; @@ -110,6 +111,10 @@ TEST(MetricsServiceSinkTest, CheckSendCall) { NiceMock gauge; gauge.name_ = "test_gauge"; sink.flushGauge(gauge, 1); + + NiceMock histogram; + histogram.name_ = "test_histogram"; + sink.flushHistogram(histogram); EXPECT_CALL(*streamer_, send(_)); sink.endFlush(); diff --git a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc index c81aea51577c..a812e53cbd44 100644 --- a/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc +++ b/test/extensions/stats_sinks/metrics_service/metrics_service_integration_test.cc @@ -68,6 +68,7 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, request_msg.envoy_metrics(); bool known_counter_exists = false; bool 
known_gauge_exists = false; + bool known_histogram_exists = false; for (::io::prometheus::client::MetricFamily metrics_family : envoy_metrics) { if (metrics_family.name() == "cluster.cluster_0.membership_change" && metrics_family.type() == ::io::prometheus::client::MetricType::COUNTER) { @@ -79,13 +80,21 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, known_gauge_exists = true; EXPECT_EQ(1, metrics_family.metric(0).gauge().value()); } + if (metrics_family.name() == "cluster.cluster_0.upstream_rq_time" && + metrics_family.type() == ::io::prometheus::client::MetricType::SUMMARY) { + known_histogram_exists = true; + Stats::HistogramStatisticsImpl empty_statistics; + EXPECT_EQ(metrics_family.metric(0).summary().quantile_size(), + empty_statistics.supportedQuantiles().size()); + } ASSERT(metrics_family.metric(0).has_timestamp_ms()); - if (known_counter_exists && known_gauge_exists) { + if (known_counter_exists && known_gauge_exists && known_histogram_exists) { break; } } EXPECT_TRUE(known_counter_exists); EXPECT_TRUE(known_gauge_exists); + EXPECT_TRUE(known_histogram_exists); } void cleanup() { @@ -102,18 +111,29 @@ class MetricsServiceIntegrationTest : public HttpIntegrationTest, INSTANTIATE_TEST_CASE_P(IpVersionsClientType, MetricsServiceIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); -// Test a basic full access logging flow. +// Test a basic metric service flow. TEST_P(MetricsServiceIntegrationTest, BasicFlow) { initialize(); + // Send an empty request so that histogram values merged for cluster_0. 
+ codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/test/long/url"}, + {":scheme", "http"}, + {":authority", "host"}, + {"x-lyft-user-id", "123"}}; + sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0); + waitForMetricsServiceConnection(); waitForMetricsStream(); waitForMetricsRequest(); + // Send an empty response and end the stream. This should never happen but make sure nothing // breaks and we make a new stream on a follow up request. metrics_service_request_->startGrpcStream(); envoy::service::metrics::v2::StreamMetricsResponse response_msg; metrics_service_request_->sendGrpcMessage(response_msg); metrics_service_request_->finishGrpcStream(Grpc::Status::Ok); + switch (clientType()) { case Grpc::ClientType::EnvoyGrpc: test_server_->waitForGaugeEq("cluster.metrics_service.upstream_rq_active", 0); diff --git a/test/integration/server.h b/test/integration/server.h index 15591dcd213c..ed11b28bc05f 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -176,11 +176,17 @@ class TestIsolatedStoreImpl : public StoreRoot { return store_.gauges(); } + std::list histograms() const override { + std::unique_lock lock(lock_); + return store_.histograms(); + } + // Stats::StoreRoot void addSink(Sink&) override {} void setTagProducer(TagProducerPtr&&) override {} void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {} void shutdownThreading() override {} + void mergeHistograms(PostMergeCb) override {} private: mutable std::mutex lock_; diff --git a/test/mocks/stats/mocks.cc b/test/mocks/stats/mocks.cc index cb107c441ce3..22d8a96dfa1f 100644 --- a/test/mocks/stats/mocks.cc +++ b/test/mocks/stats/mocks.cc @@ -5,6 +5,7 @@ using testing::Invoke; using testing::NiceMock; +using testing::Return; using testing::ReturnRef; using testing::_; @@ -33,6 +34,8 @@ MockHistogram::MockHistogram() { })); ON_CALL(*this, 
tagExtractedName()).WillByDefault(ReturnRef(name_)); ON_CALL(*this, tags()).WillByDefault(ReturnRef(tags_)); + ON_CALL(*this, intervalStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); + ON_CALL(*this, cumulativeStatistics()).WillByDefault(ReturnRef(*histogram_stats_)); } MockHistogram::~MockHistogram() {} diff --git a/test/mocks/stats/mocks.h b/test/mocks/stats/mocks.h index d70478d5b6a3..d41747b4063e 100644 --- a/test/mocks/stats/mocks.h +++ b/test/mocks/stats/mocks.h @@ -65,13 +65,20 @@ class MockHistogram : public Histogram { // creates a deadlock in gmock and is an unintended use of mock functions. const std::string& name() const override { return name_; }; + void merge() override {} + MOCK_CONST_METHOD0(tagExtractedName, const std::string&()); MOCK_CONST_METHOD0(tags, const std::vector&()); MOCK_METHOD1(recordValue, void(uint64_t value)); + MOCK_CONST_METHOD0(used, bool()); + MOCK_CONST_METHOD0(cumulativeStatistics, const HistogramStatistics&()); + MOCK_CONST_METHOD0(intervalStatistics, const HistogramStatistics&()); std::string name_; std::vector tags_; Store* store_; + std::shared_ptr histogram_stats_ = + std::make_shared(); }; class MockSink : public Sink { @@ -82,6 +89,7 @@ class MockSink : public Sink { MOCK_METHOD0(beginFlush, void()); MOCK_METHOD2(flushCounter, void(const Counter& counter, uint64_t delta)); MOCK_METHOD2(flushGauge, void(const Gauge& gauge, uint64_t value)); + MOCK_METHOD1(flushHistogram, void(const Histogram& histogram)); MOCK_METHOD0(endFlush, void()); MOCK_METHOD2(onHistogramComplete, void(const Histogram& histogram, uint64_t value)); }; @@ -100,6 +108,7 @@ class MockStore : public Store { MOCK_METHOD1(gauge, Gauge&(const std::string&)); MOCK_CONST_METHOD0(gauges, std::list()); MOCK_METHOD1(histogram, Histogram&(const std::string& name)); + MOCK_CONST_METHOD0(histograms, std::list()); testing::NiceMock counter_; std::vector> histograms_; diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index 
fb9d3ab01331..22210a60ae60 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -28,6 +28,11 @@ class MockInstance : public Instance { SlotPtr allocateSlot_() { return SlotPtr{new SlotImpl(*this, current_slot_++)}; } void runOnAllThreads_(Event::PostCb cb) { cb(); } + void runOnAllThreadsWithBarrier(Event::PostCb cb, Event::PostCb main_callback) { + cb(); + main_callback(); + } + void shutdownThread_() { shutdown_ = true; // Reverse order which is same as the production code. @@ -53,6 +58,9 @@ class MockInstance : public Instance { // ThreadLocal::Slot ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; } void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); } + void runOnAllThreadsWithBarrier(Event::PostCb cb, Event::PostCb main_callback) override { + parent_.runOnAllThreadsWithBarrier(cb, main_callback); + } void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); } MockInstance& parent_; diff --git a/test/server/server_test.cc b/test/server/server_test.cc index 80f62cb4c918..06b0afe019ec 100644 --- a/test/server/server_test.cc +++ b/test/server/server_test.cc @@ -36,7 +36,7 @@ TEST(ServerInstanceUtil, flushHelper) { std::list sinks; sinks.emplace_back(std::move(sink)); - InstanceUtil::flushCountersAndGaugesToSinks(sinks, store); + InstanceUtil::flushMetricsToSinks(sinks, store); } class RunHelperTest : public testing::Test {