diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 263cc8491046..0c7ff418977b 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -43,11 +43,21 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, Stats::Store& loadCluster(cluster, stats, dns_resolver, ssl_context_manager, runtime, random); } + Optional local_cluster_name; + if (config.hasObject("local_cluster_name")) { + local_cluster_name.value(config.getString("local_cluster_name")); + if (get(local_cluster_name.value()) == nullptr) { + throw EnvoyException( + fmt::format("local cluster '{}' must be defined", local_cluster_name.value())); + } + } + tls.set(thread_local_slot_, - [this, &stats, &runtime, &random, local_zone_name, local_address]( + [this, &stats, &runtime, &random, local_zone_name, local_address, local_cluster_name]( Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr { return ThreadLocal::ThreadLocalObjectPtr{new ThreadLocalClusterManagerImpl( - *this, dispatcher, runtime, random, local_zone_name, local_address)}; + *this, dispatcher, runtime, random, local_zone_name, local_address, + local_cluster_name)}; }); // To avoid threading issues, for those clusters that start with hosts already in them (like @@ -209,12 +219,29 @@ Http::AsyncClient& ClusterManagerImpl::httpAsyncClientForCluster(const std::stri ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const std::string& local_zone_name, - const std::string& local_address) + const std::string& local_address, const Optional& local_cluster_name) : parent_(parent), dispatcher_(dispatcher) { + // If local cluster is defined then we need to initialize it first. + if (local_cluster_name.valid()) { + auto& local_cluster = parent.primary_clusters_[local_cluster_name.value()]; + thread_local_clusters_[local_cluster_name.value()].reset( + new ClusterEntry(*this, *local_cluster, runtime, random, parent.stats_, dispatcher, + local_zone_name, local_address, nullptr)); + } + + const HostSet* local_host_set = + local_cluster_name.valid() ? &thread_local_clusters_[local_cluster_name.value()]->host_set_ + : nullptr; + for (auto& cluster : parent.primary_clusters_) { - thread_local_clusters_[cluster.first].reset(new ClusterEntry(*this, *cluster.second, runtime, - random, parent.stats_, dispatcher, - local_zone_name, local_address)); + // If local cluster name is set then we already initialized this cluster. + if (local_cluster_name.valid() && local_cluster_name.value() == cluster.first) { + continue; + } + + thread_local_clusters_[cluster.first].reset( + new ClusterEntry(*this, *cluster.second, runtime, random, parent.stats_, dispatcher, + local_zone_name, local_address, local_host_set)); } for (auto& cluster : thread_local_clusters_) { @@ -281,7 +308,8 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::shutdown() { ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( ThreadLocalClusterManagerImpl& parent, const Cluster& cluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Stats::Store& stats_store, Event::Dispatcher& dispatcher, - const std::string& local_zone_name, const std::string& local_address) + const std::string& local_zone_name, const std::string& local_address, + const HostSet* local_host_set) : parent_(parent), primary_cluster_(cluster), http_async_client_( cluster, stats_store, dispatcher, local_zone_name, parent.parent_, runtime, random, @@ -289,15 +317,17 @@ ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry( switch (cluster.lbType()) { case LoadBalancerType::LeastRequest: { - lb_.reset(new LeastRequestLoadBalancer(host_set_, cluster.stats(), runtime, random)); + lb_.reset( + new LeastRequestLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random)); break; } case LoadBalancerType::Random: { - lb_.reset(new RandomLoadBalancer(host_set_, cluster.stats(), runtime, random)); + lb_.reset(new RandomLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random)); break; } case LoadBalancerType::RoundRobin: { - lb_.reset(new RoundRobinLoadBalancer(host_set_, cluster.stats(), runtime, random)); + lb_.reset( + new RoundRobinLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random)); break; } } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 1f54f7aa89b9..9a472c38222f 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -74,7 +74,8 @@ class ClusterManagerImpl : public ClusterManager { ClusterEntry(ThreadLocalClusterManagerImpl& parent, const Cluster& cluster, Runtime::Loader& runtime, Runtime::RandomGenerator& random, Stats::Store& stats_store, Event::Dispatcher& dispatcher, - const std::string& local_zone_name, const std::string& local_address); + const std::string& local_zone_name, const std::string& local_address, + const HostSet* local_host_set); Http::ConnectionPool::Instance* connPool(ResourcePriority priority); @@ -90,7 +91,8 @@ class ClusterManagerImpl : public ClusterManager { ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const std::string& local_zone_name, - const std::string& local_address); + const std::string& local_address, + const Optional& local_cluster_name); void drainConnPools(HostPtr old_host, ConnPoolsContainer& container); static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts, ConstHostVectorPtr healthy_hosts, @@ -98,7 +100,7 @@ class ClusterManagerImpl : public ClusterManager { ConstHostVectorPtr local_zone_healthy_hosts, const std::vector& hosts_added, const std::vector& hosts_removed, - ThreadLocal::Instance& tls, uint32_t thead_local_slot); + ThreadLocal::Instance& tls, uint32_t thread_local_slot); // ThreadLocal::ThreadLocalObject void shutdown() override; diff --git a/source/common/upstream/load_balancer_impl.cc b/source/common/upstream/load_balancer_impl.cc index 8fed50577334..4eca6d431e71 100644 --- a/source/common/upstream/load_balancer_impl.cc +++ b/source/common/upstream/load_balancer_impl.cc @@ -79,10 +79,11 @@ ConstHostPtr RoundRobinLoadBalancer::chooseHost() { return hosts_to_use[rr_index_++ % hosts_to_use.size()]; } -LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats, - Runtime::Loader& runtime, +LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set, + const HostSet* local_host_set, + ClusterStats& stats, Runtime::Loader& runtime, Runtime::RandomGenerator& random) - : LoadBalancerBase(host_set, stats, runtime, random) { + : LoadBalancerBase(host_set, local_host_set, stats, runtime, random) { host_set.addMemberUpdateCb( [this](const std::vector&, const std::vector& hosts_removed) -> void { if (last_host_) { diff --git a/source/common/upstream/load_balancer_impl.h b/source/common/upstream/load_balancer_impl.h index 048e66bb843b..0e02286109ae 100644 --- a/source/common/upstream/load_balancer_impl.h +++ b/source/common/upstream/load_balancer_impl.h @@ -11,9 +11,10 @@ namespace Upstream { */ class LoadBalancerBase { protected: - LoadBalancerBase(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime, - Runtime::RandomGenerator& random) - : stats_(stats), runtime_(runtime), random_(random), host_set_(host_set) {} + LoadBalancerBase(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats, + Runtime::Loader& runtime, Runtime::RandomGenerator& random) + : stats_(stats), runtime_(runtime), random_(random), host_set_(host_set), + local_host_set_(local_host_set) {} /** * Pick the host list to use (healthy or all depending on how many in the set are not healthy). @@ -26,6 +27,7 @@ class LoadBalancerBase { private: const HostSet& host_set_; + const HostSet* local_host_set_; }; /** @@ -33,9 +35,10 @@ class LoadBalancerBase { */ class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase { public: - RoundRobinLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime, + RoundRobinLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_, + ClusterStats& stats, Runtime::Loader& runtime, Runtime::RandomGenerator& random) - : LoadBalancerBase(host_set, stats, runtime, random) {} + : LoadBalancerBase(host_set, local_host_set_, stats, runtime, random) {} // Upstream::LoadBalancer ConstHostPtr chooseHost() override; @@ -59,7 +62,8 @@ class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase { */ class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase { public: - LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime, + LeastRequestLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_, + ClusterStats& stats, Runtime::Loader& runtime, Runtime::RandomGenerator& random); // Upstream::LoadBalancer @@ -75,9 +79,9 @@ class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase { */ class RandomLoadBalancer : public LoadBalancer, LoadBalancerBase { public: - RandomLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime, - Runtime::RandomGenerator& random) - : LoadBalancerBase(host_set, stats, runtime, random) {} + RandomLoadBalancer(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats, + Runtime::Loader& runtime, Runtime::RandomGenerator& random) + : LoadBalancerBase(host_set, local_host_set, stats, runtime, random) {} // Upstream::LoadBalancer ConstHostPtr chooseHost() override; diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index c8c009007bac..751072a9dcdc 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -80,6 +80,65 @@ TEST_F(ClusterManagerImplTest, UnknownClusterType) { EXPECT_THROW(create(loader), EnvoyException); } +TEST_F(ClusterManagerImplTest, LocalClusterNotDefined) { + std::string json = R"EOF( + { + "local_cluster_name": "new_cluster", + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 250, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11001"}] + }, + { + "name": "cluster_2", + "connect_timeout_ms": 250, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11002"}] + }] + } + )EOF"; + + Json::StringLoader loader(json); + EXPECT_THROW(create(loader), EnvoyException); +} + +TEST_F(ClusterManagerImplTest, LocalClusterDefined) { + std::string json = R"EOF( + { + "local_cluster_name": "new_cluster", + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 250, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11001"}] + }, + { + "name": "cluster_2", + "connect_timeout_ms": 250, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11002"}] + }, + { + "name": "new_cluster", + "connect_timeout_ms": 250, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://127.0.0.1:11002"}] + }] + } + )EOF"; + + Json::StringLoader loader(json); + create(loader); +} + TEST_F(ClusterManagerImplTest, DuplicateCluster) { std::string json = R"EOF( { diff --git a/test/common/upstream/load_balancer_impl_test.cc b/test/common/upstream/load_balancer_impl_test.cc index 596b0c8fcf8c..a6b564ef116d 100644 --- a/test/common/upstream/load_balancer_impl_test.cc +++ b/test/common/upstream/load_balancer_impl_test.cc @@ -23,7 +23,7 @@ class RoundRobinLoadBalancerTest : public testing::Test { NiceMock random_; Stats::IsolatedStoreImpl stats_store_; ClusterStats stats_; - RoundRobinLoadBalancer lb_{cluster_, stats_, runtime_, random_}; + RoundRobinLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_}; }; TEST_F(RoundRobinLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); } @@ -196,7 +196,7 @@ class LeastRequestLoadBalancerTest : public testing::Test { NiceMock random_; Stats::IsolatedStoreImpl stats_store_; ClusterStats stats_; - LeastRequestLoadBalancer lb_{cluster_, stats_, runtime_, random_}; + LeastRequestLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_}; }; TEST_F(LeastRequestLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); } @@ -354,7 +354,7 @@ class RandomLoadBalancerTest : public testing::Test { NiceMock random_; Stats::IsolatedStoreImpl stats_store_; ClusterStats stats_; - RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_}; + RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_}; }; TEST_F(RandomLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); } diff --git a/test/common/upstream/load_balancer_simulation_test.cc b/test/common/upstream/load_balancer_simulation_test.cc index d33300e28bb4..bca5896a3472 100644 --- a/test/common/upstream/load_balancer_simulation_test.cc +++ b/test/common/upstream/load_balancer_simulation_test.cc @@ -119,7 +119,7 @@ class DISABLED_SimulationTest : public testing::Test { Stats::IsolatedStoreImpl stats_store_; ClusterStats stats_; // TODO: make per originating host load balancer. - RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_}; + RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_}; }; TEST_F(DISABLED_SimulationTest, strictlyEqualDistribution) {