Skip to content

Commit

Permalink
local rate limit: add cross local cluster rate limit support (envoypr…
Browse files Browse the repository at this point in the history
…oxy#34276)

* local rate limit: add cross local cluster rate limit support

Signed-off-by: wbpcode <wbphub@live.com>

* change log

Signed-off-by: wbpcode <wbphub@live.com>

* fix typo

Signed-off-by: wbpcode <wbphub@live.com>

* add integration tests

Signed-off-by: wbpcode <wbphub@live.com>

* fix test

Signed-off-by: wbpcode <wbphub@live.com>

* main thread assert

Signed-off-by: wbpcode <wbphub@live.com>

* Update api/envoy/extensions/common/ratelimit/v3/ratelimit.proto

Co-authored-by: Adi (Suissa) Peleg <adip@google.com>
Signed-off-by: code <wangbaiping@corp.netease.com>

* Update api/envoy/extensions/filters/http/local_ratelimit/v3/local_rate_limit.proto

Co-authored-by: Adi (Suissa) Peleg <adip@google.com>
Signed-off-by: code <wangbaiping@corp.netease.com>

* address comments

Signed-off-by: wbpcode <wbphub@live.com>

* remove macro after envoyproxy#34766

Signed-off-by: wbpcode <wbphub@live.com>

* resolve conflicts after merging main

Signed-off-by: wbpcode <wbphub@live.com>

---------

Signed-off-by: wbpcode <wbphub@live.com>
Signed-off-by: code <wangbaiping@corp.netease.com>
Co-authored-by: wbpcode <wbphub@live.com>
Co-authored-by: Adi (Suissa) Peleg <adip@google.com>
Signed-off-by: antoniovleonti <leonti@google.com>
  • Loading branch information
3 people authored and antoniovleonti committed Jun 26, 2024
1 parent ee2ecc4 commit 6a5d8dc
Show file tree
Hide file tree
Showing 14 changed files with 616 additions and 22 deletions.
12 changes: 12 additions & 0 deletions api/envoy/extensions/common/ratelimit/v3/ratelimit.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,15 @@ message LocalRateLimitDescriptor {
// Token Bucket algorithm for local ratelimiting.
type.v3.TokenBucket token_bucket = 2 [(validate.rules).message = {required: true}];
}

// Configuration used to enable local cluster level rate limiting where the token buckets
// will be shared across all the Envoy instances in the local cluster.
// A share will be calculated dynamically based on the membership of the local cluster
// and the configuration. When the limiter refills the token bucket, the share will be
// applied. By default, the token bucket will be shared evenly.
//
// See :ref:`local cluster name
// <envoy_v3_api_field_config.bootstrap.v3.ClusterManager.local_cluster_name>` for more context
// about the local cluster.
message LocalClusterRateLimit {
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// Local Rate limit :ref:`configuration overview <config_http_filters_local_rate_limit>`.
// [#extension: envoy.filters.http.local_ratelimit]

// [#next-free-field: 16]
// [#next-free-field: 17]
message LocalRateLimit {
// The human readable prefix to use when emitting stats.
string stat_prefix = 1 [(validate.rules).string = {min_len: 1}];
Expand Down Expand Up @@ -110,6 +110,23 @@ message LocalRateLimit {
// If unspecified, the default value is false.
bool local_rate_limit_per_downstream_connection = 11;

// Enables local cluster level rate limiting if and only if this is set explicitly. For example,
// consider an Envoy gateway that contains N Envoy instances and a rate limit rule of X tokens
// per second. If this is set, the total rate limit of the whole gateway will always be X tokens
// per second regardless of how N changes. If this is not set, the total rate limit of the whole
// gateway will be N * X tokens per second.
//
// .. note::
// This should never be set if ``local_rate_limit_per_downstream_connection`` is set to
// true, because if per connection rate limiting is enabled, we assume that the token buckets
// should never be shared across Envoy instances.
//
// .. note::
// This only works when the :ref:`local cluster name
// <envoy_v3_api_field_config.bootstrap.v3.ClusterManager.local_cluster_name>` is set and
// the related cluster is defined in the bootstrap configuration.
common.ratelimit.v3.LocalClusterRateLimit local_cluster_rate_limit = 16;

// Defines the standard version to use for X-RateLimit headers emitted by the filter.
//
// Disabled by default.
Expand Down
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ new_features:
change: |
Added support to healthcheck with ProxyProtocol in TCP Healthcheck by setting
:ref:`health_check_config <envoy_v3_api_field_config.core.v3.HealthCheck.TcpHealthCheck.proxy_protocol_config>`.
- area: local_rate_limit
change: |
Added support for :ref:`local cluster rate limit
<envoy_v3_api_field_extensions.filters.http.local_ratelimit.v3.LocalRateLimit.local_cluster_rate_limit>`.
If set, the token buckets of the local rate limit will be shared across all the Envoy instances in the local
cluster.
- area: ext_authz
change: |
added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,78 @@ namespace Filters {
namespace Common {
namespace LocalRateLimit {

SINGLETON_MANAGER_REGISTRATION(local_ratelimit_share_provider_manager);

// Share monitor that gives this instance an even 1/N share of every token bucket, where N is the
// total membership of the local cluster as of the last update.
class DefaultEvenShareMonitor : public ShareProviderManager::ShareMonitor {
public:
  // Scales the configured tokens-per-fill by the current share and rounds the result up.
  uint32_t tokensPerFill(uint32_t origin_tokens_per_fill) const override {
    ASSERT_IS_MAIN_OR_TEST_THREAD();
    const double scaled_tokens = origin_tokens_per_fill * share_;
    return std::ceil(scaled_tokens);
  }

  // Recomputes the share from the cluster's total membership counter. A membership of zero
  // leaves this instance with the whole bucket (share of 1.0).
  void onLocalClusterUpdate(const Upstream::Cluster& cluster) override {
    ASSERT_IS_MAIN_OR_TEST_THREAD();
    const auto member_count = cluster.info()->endpointStats().membership_total_.value();
    if (member_count == 0) {
      share_ = 1.0;
    } else {
      share_ = 1.0 / member_count;
    }
  }

private:
  // Fraction of each token bucket owned by this instance.
  double share_{1.0};
};

// Creates the share monitor, subscribes it to membership updates of the local cluster and seeds
// it with the cluster's current membership.
ShareProviderManager::ShareProviderManager(Event::Dispatcher& main_dispatcher,
                                           const Upstream::Cluster& cluster)
    : main_dispatcher_(main_dispatcher), cluster_(cluster) {
  // Create the monitor before registering the callback: the callback dereferences
  // share_monitor_, so it must never be observable while the pointer is still null.
  share_monitor_ = std::make_shared<DefaultEvenShareMonitor>();

  // It's safe to capture the local cluster reference here because the local cluster is
  // guaranteed to be a static cluster and should never be removed.
  handle_ = cluster_.prioritySet().addMemberUpdateCb([this](const auto&, const auto&) {
    share_monitor_->onLocalClusterUpdate(cluster_);
    return absl::OkStatus();
  });

  // Seed the share with the membership that is already known at construction time.
  share_monitor_->onLocalClusterUpdate(cluster_);
}

ShareProviderManager::~ShareProviderManager() {
  // Move the callback handle into a no-op task posted to the main dispatcher, so that the handle
  // is released (and the callback thereby unregistered) on the main dispatcher thread.
  auto released_handle = std::move(handle_);
  main_dispatcher_.post([h = std::move(released_handle)]() {});
}

// Returns the share provider to use for the given configuration. The configuration is currently
// ignored and the single shared monitor is always returned.
ShareProviderSharedPtr
ShareProviderManager::getShareProvider(const ProtoLocalClusterRateLimit& /*config*/) const {
  // TODO(wbpcode): we may want to support custom share provider in the future based on the
  // configuration.
  ShareProviderSharedPtr provider = share_monitor_;
  return provider;
}

// Looks up the ShareProviderManager through the singleton manager, supplying a factory that
// builds it from the local cluster. The factory yields nullptr when no local cluster name is
// configured or when the named cluster cannot be found.
ShareProviderManagerSharedPtr ShareProviderManager::singleton(Event::Dispatcher& dispatcher,
                                                              Upstream::ClusterManager& cm,
                                                              Singleton::Manager& manager) {
  auto create_manager = [&dispatcher, &cm]() -> Singleton::InstanceSharedPtr {
    const auto& name = cm.localClusterName();
    if (!name.has_value()) {
      return nullptr;
    }

    auto local_cluster = cm.clusters().getCluster(name.value());
    if (!local_cluster.has_value()) {
      return nullptr;
    }

    // The constructor is private, so the instance is created with an explicit new here.
    return ShareProviderManagerSharedPtr{
        new ShareProviderManager(dispatcher, local_cluster.value().get())};
  };

  return manager.getTyped<ShareProviderManager>(
      SINGLETON_MANAGER_REGISTERED_NAME(local_ratelimit_share_provider_manager), create_manager);
}

LocalRateLimiterImpl::LocalRateLimiterImpl(
const std::chrono::milliseconds fill_interval, const uint32_t max_tokens,
const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher,
const Protobuf::RepeatedPtrField<
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors,
bool always_consume_default_token_bucket)
bool always_consume_default_token_bucket, ShareProviderSharedPtr shared_provider)
: fill_timer_(fill_interval > std::chrono::milliseconds(0)
? dispatcher.createTimer([this] { onFillTimer(); })
: nullptr),
time_source_(dispatcher.timeSource()),
time_source_(dispatcher.timeSource()), share_provider_(std::move(shared_provider)),
always_consume_default_token_bucket_(always_consume_default_token_bucket) {
if (fill_timer_ && fill_interval < std::chrono::milliseconds(50)) {
throw EnvoyException("local rate limit token bucket fill timer must be >= 50ms");
Expand Down Expand Up @@ -105,13 +167,20 @@ void LocalRateLimiterImpl::onFillTimer() {

void LocalRateLimiterImpl::onFillTimerHelper(TokenState& tokens,
const RateLimit::TokenBucket& bucket) {

uint32_t tokens_per_fill = bucket.tokens_per_fill_;
if (share_provider_ != nullptr) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
tokens_per_fill = share_provider_->tokensPerFill(tokens_per_fill);
}

// Relaxed consistency is used for all operations because we don't care about ordering, just the
// final atomic correctness.
uint32_t expected_tokens = tokens.tokens_.load(std::memory_order_relaxed);
uint32_t new_tokens_value;
do {
// expected_tokens is either initialized above or reloaded during the CAS failure below.
new_tokens_value = std::min(bucket.max_tokens_, expected_tokens + bucket.tokens_per_fill_);
new_tokens_value = std::min(bucket.max_tokens_, expected_tokens + tokens_per_fill);

// Testing hook.
synchronizer_.syncPoint("on_fill_timer_pre_cas");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "envoy/event/timer.h"
#include "envoy/extensions/common/ratelimit/v3/ratelimit.pb.h"
#include "envoy/ratelimit/ratelimit.h"
#include "envoy/singleton/instance.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/thread_synchronizer.h"
#include "source/common/protobuf/protobuf.h"
Expand All @@ -16,14 +18,52 @@ namespace Filters {
namespace Common {
namespace LocalRateLimit {

using ProtoLocalClusterRateLimit = envoy::extensions::common::ratelimit::v3::LocalClusterRateLimit;

// Interface that maps a configured tokens-per-fill value to the number of tokens this instance
// should actually add on each fill.
class ShareProvider {
public:
virtual ~ShareProvider() = default;
// Returns the tokens-per-fill to apply, given the originally configured value.
virtual uint32_t tokensPerFill(uint32_t origin_tokens_per_fill) const PURE;
};
using ShareProviderSharedPtr = std::shared_ptr<ShareProvider>;

class ShareProviderManager;
using ShareProviderManagerSharedPtr = std::shared_ptr<ShareProviderManager>;

// Singleton that owns the share monitor for the local cluster and hands it out as a share
// provider. Instances are created only through singleton().
class ShareProviderManager : public Singleton::Instance {
public:
  // Returns the share provider to use for the given local cluster rate limit configuration.
  ShareProviderSharedPtr getShareProvider(const ProtoLocalClusterRateLimit& config) const;
  ~ShareProviderManager() override;

  // Obtains the manager through the given singleton manager, using the cluster manager to
  // locate the local cluster.
  // NOTE(review): the result is presumably null when no local cluster is available; callers
  // should check it before use.
  static ShareProviderManagerSharedPtr singleton(Event::Dispatcher& dispatcher,
                                                 Upstream::ClusterManager& cm,
                                                 Singleton::Manager& manager);

  // A share provider that is additionally notified about updates of the local cluster.
  class ShareMonitor : public ShareProvider {
  public:
    // Called with the local cluster so that the monitor can refresh its share.
    virtual void onLocalClusterUpdate(const Upstream::Cluster& cluster) PURE;
  };
  using ShareMonitorSharedPtr = std::shared_ptr<ShareMonitor>;

private:
  // Private so that instances can only be created via singleton().
  ShareProviderManager(Event::Dispatcher& main_dispatcher, const Upstream::Cluster& cluster);

  Event::Dispatcher& main_dispatcher_;
  const Upstream::Cluster& cluster_;
  // Handle of the member update callback registered on the local cluster.
  Envoy::Common::CallbackHandlePtr handle_;
  ShareMonitorSharedPtr share_monitor_;
};

class LocalRateLimiterImpl {
public:
LocalRateLimiterImpl(
const std::chrono::milliseconds fill_interval, const uint32_t max_tokens,
const uint32_t tokens_per_fill, Event::Dispatcher& dispatcher,
const Protobuf::RepeatedPtrField<
envoy::extensions::common::ratelimit::v3::LocalRateLimitDescriptor>& descriptors,
bool always_consume_default_token_bucket = true);
bool always_consume_default_token_bucket = true,
ShareProviderSharedPtr shared_provider = nullptr);
~LocalRateLimiterImpl();

bool requestAllowed(absl::Span<const RateLimit::LocalDescriptor> request_descriptors) const;
Expand Down Expand Up @@ -84,6 +124,9 @@ class LocalRateLimiterImpl {
TokenState tokens_;
absl::flat_hash_set<LocalDescriptorImpl, LocalDescriptorHash, LocalDescriptorEqual> descriptors_;
std::vector<LocalDescriptorImpl> sorted_descriptors_;

ShareProviderSharedPtr share_provider_;

mutable Thread::ThreadSynchronizer synchronizer_; // Used for testing only.
const bool always_consume_default_token_bucket_{};

Expand Down
9 changes: 5 additions & 4 deletions source/extensions/filters/http/local_ratelimit/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ Http::FilterFactoryCb LocalRateLimitFilterConfig::createFilterFactoryFromProtoTy

FilterConfigSharedPtr filter_config = std::make_shared<FilterConfig>(
proto_config, server_context.localInfo(), server_context.mainThreadDispatcher(),
context.scope(), server_context.runtime());
server_context.clusterManager(), server_context.singletonManager(), context.scope(),
server_context.runtime());
return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<Filter>(filter_config));
};
Expand All @@ -29,9 +30,9 @@ Router::RouteSpecificFilterConfigConstSharedPtr
LocalRateLimitFilterConfig::createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& proto_config,
Server::Configuration::ServerFactoryContext& context, ProtobufMessage::ValidationVisitor&) {
return std::make_shared<const FilterConfig>(proto_config, context.localInfo(),
context.mainThreadDispatcher(), context.scope(),
context.runtime(), true);
return std::make_shared<const FilterConfig>(
proto_config, context.localInfo(), context.mainThreadDispatcher(), context.clusterManager(),
context.singletonManager(), context.scope(), context.runtime(), true);
}

/**
Expand Down
31 changes: 27 additions & 4 deletions source/extensions/filters/http/local_ratelimit/local_ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const std::string& PerConnectionRateLimiter::key() {

FilterConfig::FilterConfig(
const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& config,
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, Stats::Scope& scope,
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
Upstream::ClusterManager& cm, Singleton::Manager& singleton_manager, Stats::Scope& scope,
Runtime::Loader& runtime, const bool per_route)
: dispatcher_(dispatcher), status_(toErrorCode(config.status().code())),
stats_(generateStats(config.stat_prefix(), scope)),
Expand All @@ -37,9 +38,6 @@ FilterConfig::FilterConfig(
config.has_always_consume_default_token_bucket()
? config.always_consume_default_token_bucket().value()
: true),
rate_limiter_(new Filters::Common::LocalRateLimit::LocalRateLimiterImpl(
fill_interval_, max_tokens_, tokens_per_fill_, dispatcher, descriptors_,
always_consume_default_token_bucket_)),
local_info_(local_info), runtime_(runtime),
filter_enabled_(
config.has_filter_enabled()
Expand Down Expand Up @@ -74,6 +72,31 @@ FilterConfig::FilterConfig(
if (per_route && !config.has_token_bucket()) {
throw EnvoyException("local rate limit token bucket must be set for per filter configs");
}

Filters::Common::LocalRateLimit::ShareProviderSharedPtr share_provider;
if (config.has_local_cluster_rate_limit()) {
if (rate_limit_per_connection_) {
throw EnvoyException("local_cluster_rate_limit is set and "
"local_rate_limit_per_downstream_connection is set to true");
}
if (!cm.localClusterName().has_value()) {
throw EnvoyException("local_cluster_rate_limit is set but no local cluster name is present");
}

// If the local cluster name is set then the relevant cluster must exist or the cluster
// manager will fail to initialize.
share_provider_manager_ = Filters::Common::LocalRateLimit::ShareProviderManager::singleton(
dispatcher, cm, singleton_manager);
if (!share_provider_manager_) {
throw EnvoyException("local_cluster_rate_limit is set but no local cluster is present");
}

share_provider = share_provider_manager_->getShareProvider(config.local_cluster_rate_limit());
}

rate_limiter_ = std::make_unique<Filters::Common::LocalRateLimit::LocalRateLimiterImpl>(
fill_interval_, max_tokens_, tokens_per_fill_, dispatcher, descriptors_,
always_consume_default_token_bucket_, std::move(share_provider));
}

bool FilterConfig::requestAllowed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig {
public:
FilterConfig(const envoy::extensions::filters::http::local_ratelimit::v3::LocalRateLimit& config,
const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher,
Upstream::ClusterManager& cm, Singleton::Manager& singleton_manager,
Stats::Scope& scope, Runtime::Loader& runtime, bool per_route = false);
~FilterConfig() override {
// Ensure that the LocalRateLimiterImpl instance will be destroyed on the thread where its inner
Expand Down Expand Up @@ -139,6 +140,7 @@ class FilterConfig : public Router::RouteSpecificFilterConfig {
descriptors_;
const bool rate_limit_per_connection_;
const bool always_consume_default_token_bucket_{};
Filters::Common::LocalRateLimit::ShareProviderManagerSharedPtr share_provider_manager_;
std::unique_ptr<Filters::Common::LocalRateLimit::LocalRateLimiterImpl> rate_limiter_;
const LocalInfo::LocalInfo& local_info_;
Runtime::Loader& runtime_;
Expand Down
4 changes: 4 additions & 0 deletions test/extensions/filters/common/local_ratelimit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ envoy_cc_test(
name = "local_ratelimit_test",
srcs = ["local_ratelimit_test.cc"],
deps = [
"//source/common/singleton:manager_impl_lib",
"//source/extensions/filters/common/local_ratelimit:local_ratelimit_lib",
"//test/mocks/event:event_mocks",
"//test/mocks/upstream:cluster_manager_mocks",
"//test/mocks/upstream:cluster_priority_set_mocks",
"//test/test_common:utility_lib",
],
)
Loading

0 comments on commit 6a5d8dc

Please sign in to comment.