diff --git a/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto b/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto index 9b5d9a7b91af..73d729adc269 100644 --- a/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto +++ b/api/envoy/extensions/common/ratelimit/v3/ratelimit.proto @@ -130,3 +130,15 @@ message LocalRateLimitDescriptor { // Token Bucket algorithm for local ratelimiting. type.v3.TokenBucket token_bucket = 2 [(validate.rules).message = {required: true}]; } + +// Configuration used to enable local cluster level rate limiting where the token buckets +// will be shared across all the Envoy instances in the local cluster. +// A share will be calculated based on the membership of the local cluster dynamically +// and the configuration. When the limiter refilling the token bucket, the share will be +// applied. By default, the token bucket will be shared evenly. +// +// See :ref:`local cluster name +// ` for more context +// about local cluster. +message LocalClusterRateLimit { +} diff --git a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto index c253d049731c..a32475f352f3 100644 --- a/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto +++ b/api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto @@ -22,7 +22,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; // Local Rate limit :ref:`configuration overview `. // [#extension: envoy.filters.http.local_ratelimit] -// [#next-free-field: 16] +// [#next-free-field: 17] message LocalRateLimit { // The human readable prefix to use when emitting stats. string stat_prefix = 1 [(validate.rules).string = {min_len: 1}]; @@ -110,6 +110,23 @@ message LocalRateLimit { // If unspecified, the default value is false. bool local_rate_limit_per_downstream_connection = 11; + // Enables the local cluster level rate limiting, iff this is set explicitly. For example, + // given an Envoy gateway that contains N Envoy instances and a rate limit rule X tokens + // per second. If this is set, the total rate limit of whole gateway will always be X tokens + // per second regardless of how N changes. If this is not set, the total rate limit of whole + // gateway will be N * X tokens per second. + // + // .. note:: + // This should never be set if the ``local_rate_limit_per_downstream_connection`` is set to + // true. Because if per connection rate limiting is enabled, we assume that the token buckets + // should never be shared across Envoy instances. + // + // .. note:: + // This only works when the :ref:`local cluster name + // ` is set and + // the related cluster is defined in the bootstrap configuration. + common.ratelimit.v3.LocalClusterRateLimit local_cluster_rate_limit = 16; + // Defines the standard version to use for X-RateLimit headers emitted by the filter. // // Disabled by default. diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9cf87762debb..374a85bf222d 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -318,6 +318,12 @@ new_features: change: | Added support to healthcheck with ProxyProtocol in TCP Healthcheck by setting :ref:`health_check_config `. +- area: local_rate_limit + change: | + Added support for :ref:`local cluster rate limit + `. + If set, the token buckets of the local rate limit will be shared across all the Envoy instances in the local + cluster. - area: ext_authz change: | added diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc index 603a61eca0d5..fbda5622e33f 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.cc @@ -13,16 +13,78 @@ namespace Filters { namespace Common { namespace LocalRateLimit { +SINGLETON_MANAGER_REGISTRATION(local_ratelimit_share_provider_manager); + +class DefaultEvenShareMonitor : public ShareProviderManager::ShareMonitor { +public: + uint32_t tokensPerFill(uint32_t origin_tokens_per_fill) const override { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + return std::ceil(origin_tokens_per_fill * share_); + } + void onLocalClusterUpdate(const Upstream::Cluster& cluster) override { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + const auto number = cluster.info()->endpointStats().membership_total_.value(); + share_ = number == 0 ? 1.0 : 1.0 / number; + } + +private: + double share_{1.0}; +}; + +ShareProviderManager::ShareProviderManager(Event::Dispatcher& main_dispatcher, + const Upstream::Cluster& cluster) + : main_dispatcher_(main_dispatcher), cluster_(cluster) { + // It's safe to capture the local cluster reference here because the local cluster is + // guaranteed to be static cluster and should never be removed. + handle_ = cluster_.prioritySet().addMemberUpdateCb([this](const auto&, const auto&) { + share_monitor_->onLocalClusterUpdate(cluster_); + return absl::OkStatus(); + }); + share_monitor_ = std::make_shared(); + share_monitor_->onLocalClusterUpdate(cluster_); +} + +ShareProviderManager::~ShareProviderManager() { + // Ensure the callback is unregistered on the main dispatcher thread. + main_dispatcher_.post([h = std::move(handle_)]() {}); +} + +ShareProviderSharedPtr +ShareProviderManager::getShareProvider(const ProtoLocalClusterRateLimit&) const { + // TODO(wbpcode): we may want to support custom share provider in the future based on the + // configuration. + return share_monitor_; +} + +ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher& dispatcher, + Upstream::ClusterManager& cm, + Singleton::Manager& manager) { + return manager.getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(local_ratelimit_share_provider_manager), + [&dispatcher, &cm]() -> Singleton::InstanceSharedPtr { + const auto& local_cluster_name = cm.localClusterName(); + if (!local_cluster_name.has_value()) { + return nullptr; + } + auto cluster = cm.clusters().getCluster(local_cluster_name.value()); + if (!cluster.has_value()) { + return nullptr; + } + return ShareProviderManagerSharedPtr{ + new ShareProviderManager(dispatcher, cluster.value().get())}; + }); +} + LocalRateLimiterImpl::LocalRateLimiterImpl( const std::chrono::milliseconds fill_interval, const uint32_t max_tokens, const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher, const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, - bool always_consume_default_token_bucket) + bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider) : fill_timer_(fill_interval > std::chrono::milliseconds(0) ? dispatcher.createTimer([this] { onFillTimer(); }) : nullptr), - time_source_(dispatcher.timeSource()), + time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)), always_consume_default_token_bucket_(always_consume_default_token_bucket) { if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) { throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms"); @@ -105,13 +167,20 @@ void LocalRateLimiterImpl::onFillTimer() { void LocalRateLimiterImpl::onFillTimerHelper(TokenState& tokens, const RateLimit::TokenBucket& bucket) { + + uint32_t tokens_per_fill = bucket.tokens_per_fill_; + if (share_provider_ != nullptr) { + ASSERT_IS_MAIN_OR_TEST_THREAD(); + tokens_per_fill = share_provider_->tokensPerFill(tokens_per_fill); + } + // Relaxed consistency is used for all operations because we don't care about ordering, just the // final atomic correctness. uint32_t expected_tokens = tokens.tokens_.load(std::memory_order_relaxed); uint32_t new_tokens_value; do { // expected_tokens is either initialized above or reloaded during the CAS failure below. - new_tokens_value = std::min(bucket.max_tokens_, expected_tokens + bucket.tokens_per_fill_); + new_tokens_value = std::min(bucket.max_tokens_, expected_tokens + tokens_per_fill); // Testing hook. synchronizer_.syncPoint("on_fill_timer_pre_cas"); diff --git a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h index c0cc182a492a..11997cc684f6 100644 --- a/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h +++ b/source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h @@ -6,6 +6,8 @@ #include "envoy/event/timer.h" #include "envoy/extensions/common/ratelimit/v3/ratelimit.pb.h" #include "envoy/ratelimit/ratelimit.h" +#include "envoy/singleton/instance.h" +#include "envoy/upstream/cluster_manager.h" #include "source/common/common/thread_synchronizer.h" #include "source/common/protobuf/protobuf.h" @@ -16,6 +18,43 @@ namespace Filters { namespace Common { namespace LocalRateLimit { +using ProtoLocalClusterRateLimit = envoy::extensions::common::ratelimit::v3::LocalClusterRateLimit; + +class ShareProvider { +public: + virtual ~ShareProvider() = default; + virtual uint32_t tokensPerFill(uint32_t origin_tokens_per_fill) const PURE; +}; +using ShareProviderSharedPtr = std::shared_ptr; + +class ShareProviderManager; +using ShareProviderManagerSharedPtr = std::shared_ptr; + +class ShareProviderManager : public Singleton::Instance { +public: + ShareProviderSharedPtr getShareProvider(const ProtoLocalClusterRateLimit& config) const; + ~ShareProviderManager() override; + + static ShareProviderManagerSharedPtr singleton(Event::Dispatcher& dispatcher, + Upstream::ClusterManager& cm, + Singleton::Manager& manager); + + class ShareMonitor : public ShareProvider { + public: + virtual void onLocalClusterUpdate(const Upstream::Cluster& cluster) PURE; + }; + using ShareMonitorSharedPtr = std::shared_ptr; + +private: + ShareProviderManager(Event::Dispatcher& main_dispatcher, const Upstream::Cluster& cluster); + + Event::Dispatcher& main_dispatcher_; + const Upstream::Cluster& cluster_; + Envoy::Common::CallbackHandlePtr handle_; + ShareMonitorSharedPtr share_monitor_; +}; +using ShareProviderManagerSharedPtr = std::shared_ptr; + class LocalRateLimiterImpl { public: LocalRateLimiterImpl( @@ -23,7 +62,8 @@ class LocalRateLimiterImpl { const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher, const Protobuf::RepeatedPtrField< envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors, - bool always_consume_default_token_bucket = true); + bool always_consume_default_token_bucket = true, + ShareProviderSharedPtr shared_provider = nullptr); ~LocalRateLimiterImpl(); bool requestAllowed(absl::Span request_descriptors) const; @@ -84,6 +124,9 @@ class LocalRateLimiterImpl { TokenState tokens_; absl::flat_hash_set descriptors_; std::vector sorted_descriptors_; + + ShareProviderSharedPtr share_provider_; + mutable Thread::ThreadSynchronizer synchronizer_; // Used for testing only. const bool always_consume_default_token_bucket_{}; diff --git a/source/extensions/filters/http/local_ratelimit/config.cc b/source/extensions/filters/http/local_ratelimit/config.cc index cbf719cae489..e0990cf4598f 100644 --- a/source/extensions/filters/http/local_ratelimit/config.cc +++ b/source/extensions/filters/http/local_ratelimit/config.cc @@ -19,7 +19,8 @@ Http::FilterFactoryCb LocalRateLimitFilterConfig::createFilterFactoryFromProtoTy FilterConfigSharedPtr filter_config = std::make_shared( proto_config, server_context.localInfo(), server_context.mainThreadDispatcher(), - context.scope(), server_context.runtime()); + server_context.clusterManager(), server_context.singletonManager(), context.scope(), + server_context.runtime()); return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { callbacks.addStreamFilter(std::make_shared(filter_config)); }; @@ -29,9 +30,9 @@ Router::RouteSpecificFilterConfigConstSharedPtr LocalRateLimitFilterConfig::createRouteSpecificFilterConfigTyped( const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& proto_config, Server::Configuration::ServerFactoryContext& context, ProtobufMessage::ValidationVisitor&) { - return std::make_shared(proto_config, context.localInfo(), - context.mainThreadDispatcher(), context.scope(), - context.runtime(), true); + return std::make_shared( + proto_config, context.localInfo(), context.mainThreadDispatcher(), context.clusterManager(), + context.singletonManager(), context.scope(), context.runtime(), true); } /** diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc index 6cc9a1de9d7c..e92274029cec 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.cc @@ -23,7 +23,8 @@ const std::string& PerConnectionRateLimiter::key() { FilterConfig::FilterConfig( const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& config, - const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, Stats::Scope& scope, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Upstream::ClusterManager& cm, Singleton::Manager& singleton_manager, Stats::Scope& scope, Runtime::Loader& runtime, const bool per_route) : dispatcher_(dispatcher), status_(toErrorCode(config.status().code())), stats_(generateStats(config.stat_prefix(), scope)), @@ -37,9 +38,6 @@ FilterConfig::FilterConfig( config.has_always_consume_default_token_bucket() ? config.always_consume_default_token_bucket().value() : true), - rate_limiter_(new Filters::Common::LocalRateLimit::LocalRateLimiterImpl( - fill_interval_, max_tokens_, tokens_per_fill_, dispatcher, descriptors_, - always_consume_default_token_bucket_)), local_info_(local_info), runtime_(runtime), filter_enabled_( config.has_filter_enabled() @@ -74,6 +72,31 @@ FilterConfig::FilterConfig( if (per_route && !config.has_token_bucket()) { throw EnvoyException("local rate limit token bucket must be set for per filter configs"); } + + Filters::Common::LocalRateLimit::ShareProviderSharedPtr share_provider; + if (config.has_local_cluster_rate_limit()) { + if (rate_limit_per_connection_) { + throw EnvoyException("local_cluster_rate_limit is set and " + "local_rate_limit_per_downstream_connection is set to true"); + } + if (!cm.localClusterName().has_value()) { + throw EnvoyException("local_cluster_rate_limit is set but no local cluster name is present"); + } + + // If the local cluster name is set then the relevant cluster must exist or the cluster + // manager will fail to initialize. + share_provider_manager_ = Filters::Common::LocalRateLimit::ShareProviderManager::singleton( + dispatcher, cm, singleton_manager); + if (!share_provider_manager_) { + throw EnvoyException("local_cluster_rate_limit is set but no local cluster is present"); + } + + share_provider = share_provider_manager_->getShareProvider(config.local_cluster_rate_limit()); + } + + rate_limiter_ = std::make_unique( + fill_interval_, max_tokens_, tokens_per_fill_, dispatcher, descriptors_, + always_consume_default_token_bucket_, std::move(share_provider)); } bool FilterConfig::requestAllowed( diff --git a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h index e816da64e37f..6bad1e57636b 100644 --- a/source/extensions/filters/http/local_ratelimit/local_ratelimit.h +++ b/source/extensions/filters/http/local_ratelimit/local_ratelimit.h @@ -73,6 +73,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig { public: FilterConfig(const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& config, const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Upstream::ClusterManager& cm, Singleton::Manager& singleton_manager, Stats::Scope& scope, Runtime::Loader& runtime, bool per_route = false); ~FilterConfig() override { // Ensure that the LocalRateLimiterImpl instance will be destroyed on the thread where its inner @@ -139,6 +140,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig { descriptors_; const bool rate_limit_per_connection_; const bool always_consume_default_token_bucket_{}; + Filters::Common::LocalRateLimit::ShareProviderManagerSharedPtr share_provider_manager_; std::unique_ptr rate_limiter_; const LocalInfo::LocalInfo& local_info_; Runtime::Loader& runtime_; diff --git a/test/extensions/filters/common/local_ratelimit/BUILD b/test/extensions/filters/common/local_ratelimit/BUILD index 96bd5d38a495..e9f44173cb40 100644 --- a/test/extensions/filters/common/local_ratelimit/BUILD +++ b/test/extensions/filters/common/local_ratelimit/BUILD @@ -12,7 +12,11 @@ envoy_cc_test( name = "local_ratelimit_test", srcs = ["local_ratelimit_test.cc"], deps = [ + "//source/common/singleton:manager_impl_lib", "//source/extensions/filters/common/local_ratelimit:local_ratelimit_lib", "//test/mocks/event:event_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/mocks/upstream:cluster_priority_set_mocks", + "//test/test_common:utility_lib", ], ) diff --git a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc index 9c1ead992a0c..0971998fa62d 100644 --- a/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc +++ b/test/extensions/filters/common/local_ratelimit/local_ratelimit_test.cc @@ -1,6 +1,10 @@ +#include "source/common/singleton/manager_impl.h" #include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" #include "test/mocks/event/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/mocks/upstream/cluster_priority_set.h" +#include "test/test_common/thread_factory_for_test.h" #include "test/test_common/utility.h" #include "gmock/gmock.h" @@ -15,6 +19,86 @@ namespace Filters { namespace Common { namespace LocalRateLimit { +TEST(ShareProviderManagerTest, ShareProviderManagerTest) { + NiceMock cm; + NiceMock dispatcher; + Singleton::ManagerImpl manager; + + NiceMock priority_set; + cm.local_cluster_name_ = "local_cluster"; + cm.initializeClusters({"local_cluster"}, {}); + + const auto* mock_local_cluster = cm.active_clusters_.at("local_cluster").get(); + + EXPECT_CALL(*mock_local_cluster, prioritySet()).WillOnce(ReturnRef(priority_set)); + EXPECT_CALL(priority_set, addMemberUpdateCb(_)); + + // Set the membership total to 2. + mock_local_cluster->info_->endpoint_stats_.membership_total_.set(2); + + ShareProviderManagerSharedPtr share_provider_manager = + ShareProviderManager::singleton(dispatcher, cm, manager); + EXPECT_NE(share_provider_manager, nullptr); + + auto provider = share_provider_manager->getShareProvider(ProtoLocalClusterRateLimit()); + EXPECT_NE(provider, nullptr); + + EXPECT_EQ(1, provider->tokensPerFill(1)); // At least 1 token per fill. + EXPECT_EQ(1, provider->tokensPerFill(2)); + EXPECT_EQ(2, provider->tokensPerFill(4)); + EXPECT_EQ(4, provider->tokensPerFill(8)); + + // Set the membership total to 4. + mock_local_cluster->info_->endpoint_stats_.membership_total_.set(4); + priority_set.runUpdateCallbacks(0, {}, {}); + + EXPECT_EQ(1, provider->tokensPerFill(1)); // At least 1 token per fill. + EXPECT_EQ(1, provider->tokensPerFill(4)); + EXPECT_EQ(2, provider->tokensPerFill(8)); + EXPECT_EQ(4, provider->tokensPerFill(16)); + + // Set the membership total to 0. + mock_local_cluster->info_->endpoint_stats_.membership_total_.set(0); + priority_set.runUpdateCallbacks(0, {}, {}); + + EXPECT_EQ(1, provider->tokensPerFill(1)); // At least 1 token per fill. + EXPECT_EQ(2, provider->tokensPerFill(2)); + EXPECT_EQ(4, provider->tokensPerFill(4)); + EXPECT_EQ(8, provider->tokensPerFill(8)); + + // Set the membership total to 1. + mock_local_cluster->info_->endpoint_stats_.membership_total_.set(1); + priority_set.runUpdateCallbacks(0, {}, {}); + + EXPECT_EQ(1, provider->tokensPerFill(1)); // At least 1 token per fill. + EXPECT_EQ(2, provider->tokensPerFill(2)); + EXPECT_EQ(4, provider->tokensPerFill(4)); + EXPECT_EQ(8, provider->tokensPerFill(8)); + + // Destroy the share provider manager. + // This is used to ensure the share provider is still safe to use even + // the share provider manager is destroyed. But note this should never + // happen in real production because the share provider manager should + // have longer life cycle than the limiter. + share_provider_manager.reset(); + + // Set the membership total to 4 again. + mock_local_cluster->info_->endpoint_stats_.membership_total_.set(4); + priority_set.runUpdateCallbacks(0, {}, {}); + + // The provider should still work but the value should not change. + EXPECT_EQ(1, provider->tokensPerFill(1)); // At least 1 token per fill. + EXPECT_EQ(2, provider->tokensPerFill(2)); + EXPECT_EQ(4, provider->tokensPerFill(4)); + EXPECT_EQ(8, provider->tokensPerFill(8)); +} + +class MockShareProvider : public ShareProvider { +public: + MockShareProvider() = default; + MOCK_METHOD(uint32_t, tokensPerFill, (uint32_t origin_tokens_per_fill), (const)); +}; + class LocalRateLimiterImplTest : public testing::Test { public: void initializeTimer() { @@ -24,12 +108,13 @@ class LocalRateLimiterImplTest : public testing::Test { } void initialize(const std::chrono::milliseconds fill_interval, const uint32_t max_tokens, - const uint32_t tokens_per_fill) { + const uint32_t tokens_per_fill, ShareProviderSharedPtr share_provider = nullptr) { initializeTimer(); - rate_limiter_ = std::make_shared( - fill_interval, max_tokens, tokens_per_fill, dispatcher_, descriptors_); + rate_limiter_ = + std::make_shared(fill_interval, max_tokens, tokens_per_fill, + dispatcher_, descriptors_, true, share_provider); } Thread::ThreadSynchronizer& synchronizer() { return rate_limiter_->synchronizer_; } @@ -154,6 +239,40 @@ TEST_F(LocalRateLimiterImplTest, TokenBucketMultipleTokensPerFill) { EXPECT_FALSE(rate_limiter_->requestAllowed(route_descriptors_)); } +// Verify token bucket functionality with max tokens and tokens per fill > 1 and +// share provider is used. +TEST_F(LocalRateLimiterImplTest, TokenBucketMultipleTokensPerFillWithShareProvider) { + auto share_provider = std::make_shared(); + EXPECT_CALL(*share_provider, tokensPerFill(_)) + .WillRepeatedly(testing::Invoke([](uint32_t tokens) { return tokens / 2; })); + + // Final tokens per fill is 2/2 = 1. + initialize(std::chrono::milliseconds(200), 2, 2, share_provider); + + // The limiter will be initialized with max tokens and it will not be shared. + // So, the initial tokens is 2. + // 2 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(route_descriptors_)); + EXPECT_TRUE(rate_limiter_->requestAllowed(route_descriptors_)); + EXPECT_FALSE(rate_limiter_->requestAllowed(route_descriptors_)); + + // The tokens per fill will be handled by the share provider and it will be 1. + // 0 -> 1 tokens + EXPECT_CALL(*fill_timer_, enableTimer(std::chrono::milliseconds(200), nullptr)); + fill_timer_->invokeCallback(); + + // 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(route_descriptors_)); + + // 0 -> 1 tokens + EXPECT_CALL(*fill_timer_, enableTimer(std::chrono::milliseconds(200), nullptr)); + fill_timer_->invokeCallback(); + + // 1 -> 0 tokens + EXPECT_TRUE(rate_limiter_->requestAllowed(route_descriptors_)); + EXPECT_FALSE(rate_limiter_->requestAllowed(route_descriptors_)); +} + // Verify token bucket functionality with max tokens > tokens per fill. TEST_F(LocalRateLimiterImplTest, TokenBucketMaxTokensGreaterThanTokensPerFill) { initialize(std::chrono::milliseconds(200), 2, 1); diff --git a/test/extensions/filters/http/local_ratelimit/BUILD b/test/extensions/filters/http/local_ratelimit/BUILD index 5d997f8345e2..ba14d61d122d 100644 --- a/test/extensions/filters/http/local_ratelimit/BUILD +++ b/test/extensions/filters/http/local_ratelimit/BUILD @@ -16,10 +16,13 @@ envoy_extension_cc_test( srcs = ["filter_test.cc"], extension_names = ["envoy.filters.http.local_ratelimit"], deps = [ + "//source/common/singleton:manager_impl_lib", "//source/extensions/filters/http/local_ratelimit:local_ratelimit_lib", "//test/common/stream_info:test_util", "//test/mocks/http:http_mocks", "//test/mocks/local_info:local_info_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/test_common:utility_lib", "@envoy_api//envoy/extensions/filters/http/local_ratelimit/v3:pkg_cc_proto", ], ) diff --git a/test/extensions/filters/http/local_ratelimit/config_test.cc b/test/extensions/filters/http/local_ratelimit/config_test.cc index 37c3a991e256..885418422f99 100644 --- a/test/extensions/filters/http/local_ratelimit/config_test.cc +++ b/test/extensions/filters/http/local_ratelimit/config_test.cc @@ -2,6 +2,7 @@ #include "source/extensions/filters/http/local_ratelimit/local_ratelimit.h" #include "test/mocks/server/mocks.h" +#include "test/mocks/upstream/priority_set.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -306,12 +307,148 @@ stat_prefix: test NiceMock context; - EXPECT_CALL(context.dispatcher_, createTimer_(_)); EXPECT_THROW(factory.createRouteSpecificFilterConfig(*proto_config, context, ProtobufMessage::getNullValidationVisitor()), EnvoyException); } +TEST(Factory, LocalClusterRateLimitAndLocalRateLimitPerDownstreamConnection) { + const std::string config_yaml = R"( +stat_prefix: test +token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s +filter_enabled: + runtime_key: test_enabled + default_value: + numerator: 100 + denominator: HUNDRED +filter_enforced: + runtime_key: test_enforced + default_value: + numerator: 100 + denominator: HUNDRED +local_cluster_rate_limit: {} +local_rate_limit_per_downstream_connection: true +)"; + + LocalRateLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_THROW_WITH_MESSAGE( + factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException, + "local_cluster_rate_limit is set and local_rate_limit_per_downstream_connection is set to " + "true"); +} + +TEST(Factory, LocalClusterRateLimitAndWithoutLocalClusterName) { + const std::string config_yaml = R"( +stat_prefix: test +token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s +filter_enabled: + runtime_key: test_enabled + default_value: + numerator: 100 + denominator: HUNDRED +filter_enforced: + runtime_key: test_enforced + default_value: + numerator: 100 + denominator: HUNDRED +local_cluster_rate_limit: {} +)"; + + LocalRateLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + + EXPECT_THROW_WITH_MESSAGE( + factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException, "local_cluster_rate_limit is set but no local cluster name is present"); +} + +TEST(Factory, LocalClusterRateLimitAndWithoutLocalCluster) { + const std::string config_yaml = R"( +stat_prefix: test +token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s +filter_enabled: + runtime_key: test_enabled + default_value: + numerator: 100 + denominator: HUNDRED +filter_enforced: + runtime_key: test_enforced + default_value: + numerator: 100 + denominator: HUNDRED +local_cluster_rate_limit: {} +)"; + + LocalRateLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + context.cluster_manager_.local_cluster_name_ = "local_cluster"; + + EXPECT_THROW_WITH_MESSAGE( + factory.createRouteSpecificFilterConfig(*proto_config, context, + ProtobufMessage::getNullValidationVisitor()), + EnvoyException, "local_cluster_rate_limit is set but no local cluster is present"); +} + +TEST(Factory, LocalClusterRateLimit) { + const std::string config_yaml = R"( +stat_prefix: test +token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s +filter_enabled: + runtime_key: test_enabled + default_value: + numerator: 100 + denominator: HUNDRED +filter_enforced: + runtime_key: test_enforced + default_value: + numerator: 100 + denominator: HUNDRED +local_cluster_rate_limit: {} +)"; + + LocalRateLimitFilterConfig factory; + ProtobufTypes::MessagePtr proto_config = factory.createEmptyRouteConfigProto(); + TestUtility::loadFromYaml(config_yaml, *proto_config); + + NiceMock context; + context.cluster_manager_.local_cluster_name_ = "local_cluster"; + context.cluster_manager_.initializeClusters({"local_cluster"}, {}); + + NiceMock priority_set; + const auto* local_cluster = context.cluster_manager_.active_clusters_.at("local_cluster").get(); + EXPECT_CALL(*local_cluster, prioritySet()).WillOnce(ReturnRef(priority_set)); + + EXPECT_CALL(context.dispatcher_, createTimer_(_)); + EXPECT_NO_THROW(factory.createRouteSpecificFilterConfig( + *proto_config, context, ProtobufMessage::getNullValidationVisitor())); +} + } // namespace LocalRateLimitFilter } // namespace HttpFilters } // namespace Extensions diff --git a/test/extensions/filters/http/local_ratelimit/filter_test.cc b/test/extensions/filters/http/local_ratelimit/filter_test.cc index f5b7e121a121..6994dc46d034 100644 --- a/test/extensions/filters/http/local_ratelimit/filter_test.cc +++ b/test/extensions/filters/http/local_ratelimit/filter_test.cc @@ -1,9 +1,12 @@ #include "envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.pb.h" +#include "source/common/singleton/manager_impl.h" #include "source/extensions/filters/http/local_ratelimit/local_ratelimit.h" #include "test/mocks/http/mocks.h" #include "test/mocks/local_info/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/test_common/thread_factory_for_test.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -75,8 +78,9 @@ class FilterTest : public testing::Test { envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit config; TestUtility::loadFromYaml(yaml, config); - config_ = std::make_shared(config, local_info_, dispatcher_, *stats_.rootScope(), - runtime_, per_route); + config_ = + std::make_shared(config, local_info_, dispatcher_, cm_, singleton_manager_, + *stats_.rootScope(), runtime_, per_route); filter_ = std::make_shared(config_); filter_->setDecoderFilterCallbacks(decoder_callbacks_); @@ -100,6 +104,9 @@ class FilterTest : public testing::Test { NiceMock dispatcher_; NiceMock runtime_; NiceMock local_info_; + NiceMock cm_; + Singleton::ManagerImpl singleton_manager_; + std::shared_ptr config_; std::shared_ptr filter_; std::shared_ptr filter_2_; diff --git a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc index 7c9834d9d3ae..9afa92925b70 100644 --- a/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc +++ b/test/extensions/filters/http/local_ratelimit/local_ratelimit_integration_test.cc @@ -1,3 +1,5 @@ +#include "source/extensions/filters/common/local_ratelimit/local_ratelimit_impl.h" + #include "test/integration/http_protocol_integration.h" #include "gtest/gtest.h" @@ -18,8 +20,8 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime const std::string& initial_route_config) { // Set this flag to true to create fake upstream for xds_cluster. create_xds_upstream_ = true; - // Create static clusters. - createClusters(); + // Create static XDS cluster. + createXdsCluster(); config_helper_.prependFilter(filter_config); @@ -60,7 +62,38 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime registerTestServerPorts({"http"}); } - void createClusters() { + void initializeFilterWithLocalCluster(const std::string& filter_config, + const std::string& initial_local_cluster_endpoints) { + config_helper_.prependFilter(filter_config); + + // Set this flag to true to create fake upstream for xds_cluster. + create_xds_upstream_ = true; + // Create static XDS cluster. + createXdsCluster(); + + // Create local cluster. + createLocalCluster(); + + on_server_init_function_ = [&]() { + AssertionResult result = + fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + + EXPECT_TRUE(compareSotwDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "", + {"local_cluster"}, true)); + sendSotwDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, + {TestUtility::parseYaml( + initial_local_cluster_endpoints)}, + "1"); + }; + initialize(); + } + + void createXdsCluster() { config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { auto* xds_cluster = bootstrap.mutable_static_resources()->add_clusters(); xds_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); @@ -69,6 +102,34 @@ class LocalRateLimitFilterIntegrationTest : public Event::TestUsingSimulatedTime }); } + void createLocalCluster() { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + // Set local cluster name to "local_cluster". + bootstrap.mutable_cluster_manager()->set_local_cluster_name("local_cluster"); + + // Create local cluster. + auto* local_cluster = bootstrap.mutable_static_resources()->add_clusters(); + local_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + local_cluster->set_name("local_cluster"); + local_cluster->clear_load_assignment(); + + // This should be EDS cluster to load endpoints dynamically. + local_cluster->set_type(::envoy::config::cluster::v3::Cluster::EDS); + local_cluster->mutable_eds_cluster_config()->set_service_name("local_cluster"); + local_cluster->mutable_eds_cluster_config()->mutable_eds_config()->set_resource_api_version( + envoy::config::core::v3::ApiVersion::V3); + envoy::config::core::v3::ApiConfigSource* eds_api_config_source = + local_cluster->mutable_eds_cluster_config() + ->mutable_eds_config() + ->mutable_api_config_source(); + eds_api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC); + eds_api_config_source->set_transport_api_version(envoy::config::core::v3::V3); + envoy::config::core::v3::GrpcService* grpc_service = + eds_api_config_source->add_grpc_services(); + grpc_service->mutable_envoy_grpc()->set_cluster_name("xds_cluster"); + }); + } + void cleanUpXdsConnection() { if (xds_connection_ != nullptr) { AssertionResult result = xds_connection_->close(); @@ -107,6 +168,61 @@ name: envoy.filters.http.local_ratelimit local_rate_limit_per_downstream_connection: {} )EOF"; + const std::string filter_config_with_local_cluster_rate_limit_ = + R"EOF( +name: envoy.filters.http.local_ratelimit +typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.local_ratelimit.v3.LocalRateLimit + stat_prefix: http_local_rate_limiter + token_bucket: + max_tokens: 1 + tokens_per_fill: 1 + fill_interval: 1000s + filter_enabled: + runtime_key: local_rate_limit_enabled + default_value: + numerator: 100 + denominator: HUNDRED + filter_enforced: + runtime_key: local_rate_limit_enforced + default_value: + numerator: 100 + denominator: HUNDRED + response_headers_to_add: + - append_action: OVERWRITE_IF_EXISTS_OR_ADD + header: + key: x-local-rate-limit + value: 'true' + local_cluster_rate_limit: {} +)EOF"; + + const std::string initial_local_cluster_endpoints_ = R"EOF( +cluster_name: local_cluster +endpoints: +- lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 80 +)EOF"; + + const std::string update_local_cluster_endpoints_ = R"EOF( +cluster_name: local_cluster +endpoints: +- lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 80 + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 81 +)EOF"; + const std::string initial_route_config_ = R"EOF( name: basic_routes virtual_hosts: @@ -315,5 +431,40 @@ TEST_P(LocalRateLimitFilterIntegrationTest, BasicTestPerRouteAndRds) { cleanUpXdsConnection(); } +TEST_P(LocalRateLimitFilterIntegrationTest, TestLocalClusterRateLimit) { + initializeFilterWithLocalCluster(filter_config_with_local_cluster_rate_limit_, + initial_local_cluster_endpoints_); + + auto share_provider_manager = + test_server_->server() + .singletonManager() + .getTyped( + "local_ratelimit_share_provider_manager_singleton"); + ASSERT(share_provider_manager != nullptr); + auto share_provider = share_provider_manager->getShareProvider({}); + + test_server_->waitForGaugeEq("cluster.local_cluster.membership_total", 1); + simTime().advanceTimeWait(std::chrono::milliseconds(1)); + + EXPECT_EQ(1, share_provider->tokensPerFill(1)); + EXPECT_EQ(2, share_provider->tokensPerFill(2)); + EXPECT_EQ(4, share_provider->tokensPerFill(4)); + + sendSotwDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, + {TestUtility::parseYaml( + update_local_cluster_endpoints_)}, + "2"); + + test_server_->waitForGaugeEq("cluster.local_cluster.membership_total", 2); + simTime().advanceTimeWait(std::chrono::milliseconds(1)); + + EXPECT_EQ(1, share_provider->tokensPerFill(1)); + EXPECT_EQ(1, share_provider->tokensPerFill(2)); + EXPECT_EQ(2, share_provider->tokensPerFill(4)); + + cleanUpXdsConnection(); +} + } // namespace } // namespace Envoy