Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cluster manager know about local cluster (optional) #149

Merged
merged 3 commits into from
Oct 14, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,21 @@ ClusterManagerImpl::ClusterManagerImpl(const Json::Object& config, Stats::Store&
loadCluster(cluster, stats, dns_resolver, ssl_context_manager, runtime, random);
}

Optional<std::string> local_cluster_name;
if (config.hasObject("local_cluster_name")) {
local_cluster_name.value(config.getString("local_cluster_name"));
if (get(local_cluster_name.value()) == nullptr) {
throw EnvoyException(
fmt::format("local cluster '{}' must be defined", local_cluster_name.value()));
}
}

tls.set(thread_local_slot_,
[this, &stats, &runtime, &random, local_zone_name, local_address](
[this, &stats, &runtime, &random, local_zone_name, local_address, local_cluster_name](
Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectPtr {
return ThreadLocal::ThreadLocalObjectPtr{new ThreadLocalClusterManagerImpl(
*this, dispatcher, runtime, random, local_zone_name, local_address)};
*this, dispatcher, runtime, random, local_zone_name, local_address,
local_cluster_name)};
});

// To avoid threading issues, for those clusters that start with hosts already in them (like
Expand Down Expand Up @@ -209,12 +219,29 @@ Http::AsyncClient& ClusterManagerImpl::httpAsyncClientForCluster(const std::stri
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, const std::string& local_zone_name,
const std::string& local_address)
const std::string& local_address, const Optional<std::string>& local_cluster_name)
: parent_(parent), dispatcher_(dispatcher) {
// If local cluster is defined then we need to initialize it first.
if (local_cluster_name.valid()) {
auto& local_cluster = parent.primary_clusters_[local_cluster_name.value()];
thread_local_clusters_[local_cluster_name.value()].reset(
new ClusterEntry(*this, *local_cluster, runtime, random, parent.stats_, dispatcher,
local_zone_name, local_address, nullptr));
}

const ClusterEntry* local_cluster = local_cluster_name.valid()
? thread_local_clusters_[local_cluster_name.value()].get()
: nullptr;

for (auto& cluster : parent.primary_clusters_) {
thread_local_clusters_[cluster.first].reset(new ClusterEntry(*this, *cluster.second, runtime,
random, parent.stats_, dispatcher,
local_zone_name, local_address));
// If local cluster name is set then we already initialized this cluster.
if (local_cluster_name.valid() && local_cluster_name == cluster.first) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: local_cluster_name.value()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

continue;
}

thread_local_clusters_[cluster.first].reset(
new ClusterEntry(*this, *cluster.second, runtime, random, parent.stats_, dispatcher,
local_zone_name, local_address, local_cluster));
}

for (auto& cluster : thread_local_clusters_) {
Expand Down Expand Up @@ -281,23 +308,31 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::shutdown() {
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
ThreadLocalClusterManagerImpl& parent, const Cluster& cluster, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const std::string& local_zone_name, const std::string& local_address)
const std::string& local_zone_name, const std::string& local_address,
const ClusterEntry* local_cluster)
: parent_(parent), primary_cluster_(cluster),
http_async_client_(
cluster, stats_store, dispatcher, local_zone_name, parent.parent_, runtime, random,
Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent.parent_)}, local_address) {

const HostSet* local_host_set = nullptr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unless you are going to store const ClusterEntry* somewhere just pass const HostSet* to this function then you avoid doing the same if check again here as you do above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if (local_cluster) {
local_host_set = &local_cluster->host_set_;
}

switch (cluster.lbType()) {
case LoadBalancerType::LeastRequest: {
lb_.reset(new LeastRequestLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(
new LeastRequestLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
case LoadBalancerType::Random: {
lb_.reset(new RandomLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(new RandomLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
case LoadBalancerType::RoundRobin: {
lb_.reset(new RoundRobinLoadBalancer(host_set_, cluster.stats(), runtime, random));
lb_.reset(
new RoundRobinLoadBalancer(host_set_, local_host_set, cluster.stats(), runtime, random));
break;
}
}
Expand Down
8 changes: 5 additions & 3 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class ClusterManagerImpl : public ClusterManager {
ClusterEntry(ThreadLocalClusterManagerImpl& parent, const Cluster& cluster,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
Stats::Store& stats_store, Event::Dispatcher& dispatcher,
const std::string& local_zone_name, const std::string& local_address);
const std::string& local_zone_name, const std::string& local_address,
const ClusterEntry* local_cluster);

Http::ConnectionPool::Instance* connPool(ResourcePriority priority);

Expand All @@ -90,15 +91,16 @@ class ClusterManagerImpl : public ClusterManager {
ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
Runtime::Loader& runtime, Runtime::RandomGenerator& random,
const std::string& local_zone_name,
const std::string& local_address);
const std::string& local_address,
const Optional<std::string>& local_cluster_name);
void drainConnPools(HostPtr old_host, ConnPoolsContainer& container);
static void updateClusterMembership(const std::string& name, ConstHostVectorPtr hosts,
ConstHostVectorPtr healthy_hosts,
ConstHostVectorPtr local_zone_hosts,
ConstHostVectorPtr local_zone_healthy_hosts,
const std::vector<HostPtr>& hosts_added,
const std::vector<HostPtr>& hosts_removed,
ThreadLocal::Instance& tls, uint32_t thead_local_slot);
ThreadLocal::Instance& tls, uint32_t thread_local_slot);

// ThreadLocal::ThreadLocalObject
void shutdown() override;
Expand Down
7 changes: 4 additions & 3 deletions source/common/upstream/load_balancer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ ConstHostPtr RoundRobinLoadBalancer::chooseHost() {
return hosts_to_use[rr_index_++ % hosts_to_use.size()];
}

LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats,
Runtime::Loader& runtime,
LeastRequestLoadBalancer::LeastRequestLoadBalancer(const HostSet& host_set,
const HostSet* local_host_set,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {
: LoadBalancerBase(host_set, local_host_set, stats, runtime, random) {
host_set.addMemberUpdateCb(
[this](const std::vector<HostPtr>&, const std::vector<HostPtr>& hosts_removed) -> void {
if (last_host_) {
Expand Down
22 changes: 13 additions & 9 deletions source/common/upstream/load_balancer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace Upstream {
*/
class LoadBalancerBase {
protected:
LoadBalancerBase(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: stats_(stats), runtime_(runtime), random_(random), host_set_(host_set) {}
LoadBalancerBase(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats,
Runtime::Loader& runtime, Runtime::RandomGenerator& random)
: stats_(stats), runtime_(runtime), random_(random), host_set_(host_set),
local_host_set_(local_host_set) {}

/**
* Pick the host list to use (healthy or all depending on how many in the set are not healthy).
Expand All @@ -26,16 +27,18 @@ class LoadBalancerBase {

private:
const HostSet& host_set_;
const HostSet* local_host_set_;
};

/**
* Implementation of LoadBalancer that performs RR selection across the hosts in the cluster.
*/
class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
RoundRobinLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
RoundRobinLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {}
: LoadBalancerBase(host_set, local_host_set_, stats, runtime, random) {}

// Upstream::LoadBalancer
ConstHostPtr chooseHost() override;
Expand All @@ -59,7 +62,8 @@ class RoundRobinLoadBalancer : public LoadBalancer, LoadBalancerBase {
*/
class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
LeastRequestLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
LeastRequestLoadBalancer(const HostSet& host_set, const HostSet* local_host_set_,
ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random);

// Upstream::LoadBalancer
Expand All @@ -75,9 +79,9 @@ class LeastRequestLoadBalancer : public LoadBalancer, LoadBalancerBase {
*/
class RandomLoadBalancer : public LoadBalancer, LoadBalancerBase {
public:
RandomLoadBalancer(const HostSet& host_set, ClusterStats& stats, Runtime::Loader& runtime,
Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, stats, runtime, random) {}
RandomLoadBalancer(const HostSet& host_set, const HostSet* local_host_set, ClusterStats& stats,
Runtime::Loader& runtime, Runtime::RandomGenerator& random)
: LoadBalancerBase(host_set, local_host_set, stats, runtime, random) {}

// Upstream::LoadBalancer
ConstHostPtr chooseHost() override;
Expand Down
59 changes: 59 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,65 @@ TEST_F(ClusterManagerImplTest, UnknownClusterType) {
EXPECT_THROW(create(loader), EnvoyException);
}

TEST_F(ClusterManagerImplTest, LocalClusterNotDefined) {
std::string json = R"EOF(
{
"local_cluster_name": "new_cluster",
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
},
{
"name": "cluster_2",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
}]
}
)EOF";

Json::StringLoader loader(json);
EXPECT_THROW(create(loader), EnvoyException);
}

TEST_F(ClusterManagerImplTest, LocalClusterDefined) {
std::string json = R"EOF(
{
"local_cluster_name": "new_cluster",
"clusters": [
{
"name": "cluster_1",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11001"}]
},
{
"name": "cluster_2",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
},
{
"name": "new_cluster",
"connect_timeout_ms": 250,
"type": "static",
"lb_type": "round_robin",
"hosts": [{"url": "tcp://127.0.0.1:11002"}]
}]
}
)EOF";

Json::StringLoader loader(json);
create(loader);
}

TEST_F(ClusterManagerImplTest, DuplicateCluster) {
std::string json = R"EOF(
{
Expand Down
6 changes: 3 additions & 3 deletions test/common/upstream/load_balancer_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class RoundRobinLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
RoundRobinLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RoundRobinLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(RoundRobinLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down Expand Up @@ -196,7 +196,7 @@ class LeastRequestLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
LeastRequestLoadBalancer lb_{cluster_, stats_, runtime_, random_};
LeastRequestLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(LeastRequestLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down Expand Up @@ -354,7 +354,7 @@ class RandomLoadBalancerTest : public testing::Test {
NiceMock<Runtime::MockRandomGenerator> random_;
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(RandomLoadBalancerTest, NoHosts) { EXPECT_EQ(nullptr, lb_.chooseHost()); }
Expand Down
2 changes: 1 addition & 1 deletion test/common/upstream/load_balancer_simulation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DISABLED_SimulationTest : public testing::Test {
Stats::IsolatedStoreImpl stats_store_;
ClusterStats stats_;
// TODO: make per originating host load balancer.
RandomLoadBalancer lb_{cluster_, stats_, runtime_, random_};
RandomLoadBalancer lb_{cluster_, nullptr, stats_, runtime_, random_};
};

TEST_F(DISABLED_SimulationTest, strictlyEqualDistribution) {
Expand Down