Skip to content

Commit

Permalink
eds: avoid sending too many ClusterLoadAssignment requests (#7976)
Browse files Browse the repository at this point in the history
While initializing secondary clusters, a ClusterLoadAssignment request is sent to Istio Pilot for
each initialized cluster, with the cluster's name appended to the request's resource_names
list. With a huge number of clusters (e.g. 10k clusters), this behavior slows down Envoy's
initialization and consumes a ton of memory. This change pauses the ADS mux for ClusterLoadAssignment to avoid that.

Risk Level: Medium
Testing: tiny change, no test case added

Fixes #7955

Signed-off-by: lhuang8 <lhuang8@ebay.com>
  • Loading branch information
l8huang authored and htuch committed Aug 25, 2019
1 parent 6ff0bce commit 4d78ff5
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 15 deletions.
2 changes: 1 addition & 1 deletion source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class NullGrpcMuxImpl : public GrpcMux {
}
void pause(const std::string&) override {}
void resume(const std::string&) override {}
bool paused(const std::string&) const override { NOT_REACHED_GCOVR_EXCL_LINE; }
bool paused(const std::string&) const override { return false; }
};

} // namespace Config
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void CdsApiImpl::onConfigUpdate(
cm_.adsMux().pause(Config::TypeUrl::get().ClusterLoadAssignment);
Cleanup eds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().ClusterLoadAssignment); });

ENVOY_LOG(info, "cds: add {} cluster(s), remove {} cluster(s)", added_resources.size(),
removed_resources.size());

std::vector<std::string> exception_msgs;
std::unordered_set<std::string> cluster_names;
bool any_applied = false;
Expand Down
36 changes: 25 additions & 11 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ void ClusterManagerInitHelper::removeCluster(Cluster& cluster) {
maybeFinishInitialize();
}

void ClusterManagerInitHelper::initializeSecondaryClusters() {
started_secondary_initialize_ = true;
// Cluster::initialize() method can modify the list of secondary_init_clusters_ to remove
// the item currently being initialized, so we eschew range-based-for and do this complicated
// dance to increment the iterator before calling initialize.
for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
Cluster* cluster = *iter;
++iter;
ENVOY_LOG(debug, "initializing secondary cluster {}", cluster->info()->name());
cluster->initialize([cluster, this] { onClusterInit(*cluster); });
}
}

void ClusterManagerInitHelper::maybeFinishInitialize() {
// Do not do anything if we are still doing the initial static load or if we are waiting for
// CDS initialize.
Expand All @@ -121,15 +134,16 @@ void ClusterManagerInitHelper::maybeFinishInitialize() {
if (!secondary_init_clusters_.empty()) {
if (!started_secondary_initialize_) {
ENVOY_LOG(info, "cm init: initializing secondary clusters");
started_secondary_initialize_ = true;
// Cluster::initialize() method can modify the list of secondary_init_clusters_ to remove
// the item currently being initialized, so we eschew range-based-for and do this complicated
// dance to increment the iterator before calling initialize.
for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
Cluster* cluster = *iter;
++iter;
ENVOY_LOG(debug, "initializing secondary cluster {}", cluster->info()->name());
cluster->initialize([cluster, this] { onClusterInit(*cluster); });
// If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
// should already be paused by CdsApiImpl::onConfigUpdate(). Check for that to
// avoid pausing ClusterLoadAssignment twice.
if (cm_.adsMux().paused(Config::TypeUrl::get().ClusterLoadAssignment)) {
initializeSecondaryClusters();
} else {
cm_.adsMux().pause(Config::TypeUrl::get().ClusterLoadAssignment);
Cleanup eds_resume(
[this] { cm_.adsMux().resume(Config::TypeUrl::get().ClusterLoadAssignment); });
initializeSecondaryClusters();
}
}

Expand Down Expand Up @@ -188,7 +202,7 @@ ClusterManagerImpl::ClusterManagerImpl(
: factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()),
random_(random), bind_config_(bootstrap.cluster_manager().upstream_bind_config()),
local_info_(local_info), cm_stats_(generateStats(stats)),
init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }),
init_helper_(*this, [this](Cluster& cluster) { onClusterInit(cluster); }),
config_tracker_entry_(
admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })),
time_source_(main_thread_dispatcher.timeSource()), dispatcher_(main_thread_dispatcher),
Expand Down Expand Up @@ -496,7 +510,7 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& clust
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
ENVOY_LOG(info, "add/update cluster {} during init", cluster_name);
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
Expand Down
7 changes: 5 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
* @param per_cluster_init_callback supplies the callback to call when a cluster has itself
* initialized. The cluster manager can use this for post-init processing.
*/
ClusterManagerInitHelper(const std::function<void(Cluster&)>& per_cluster_init_callback)
: per_cluster_init_callback_(per_cluster_init_callback) {}
ClusterManagerInitHelper(ClusterManager& cm,
const std::function<void(Cluster&)>& per_cluster_init_callback)
: cm_(cm), per_cluster_init_callback_(per_cluster_init_callback) {}

enum class State {
// Initial state. During this state all static clusters are loaded. Any phase 1 clusters
Expand All @@ -128,9 +129,11 @@ class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
State state() const { return state_; }

private:
void initializeSecondaryClusters();
void maybeFinishInitialize();
void onClusterInit(Cluster& cluster);

ClusterManager& cm_;
std::function<void(Cluster& cluster)> per_cluster_init_callback_;
CdsApi* cds_{};
std::function<void()> initialized_callback_;
Expand Down
59 changes: 58 additions & 1 deletion test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include "gtest/gtest.h"

using testing::_;
using testing::ByRef;
using testing::Eq;
using testing::InSequence;
using testing::Invoke;
using testing::Mock;
Expand Down Expand Up @@ -2752,7 +2754,8 @@ class ClusterManagerInitHelperTest : public testing::Test {
public:
MOCK_METHOD1(onClusterInit, void(Cluster& cluster));

ClusterManagerInitHelper init_helper_{[this](Cluster& cluster) { onClusterInit(cluster); }};
NiceMock<MockClusterManager> cm_;
ClusterManagerInitHelper init_helper_{cm_, [this](Cluster& cluster) { onClusterInit(cluster); }};
};

TEST_F(ClusterManagerInitHelperTest, ImmediateInitialize) {
Expand Down Expand Up @@ -2824,6 +2827,60 @@ TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) {
cluster2.initialize_callback_();
}

// If secondary cluster initialization is triggered outside of CdsApiImpl::onConfigUpdate()'s
// callback flow, ClusterLoadAssignment requests are not yet paused when
// ClusterManagerInitHelper::maybeFinishInitialize() runs. This case tests that
// ClusterLoadAssignment requests are paused and resumed properly around the initialization.
TEST_F(ClusterManagerInitHelperTest, InitSecondaryWithoutEdsPaused) {
// All expectations below must be satisfied in the order in which they are declared.
InSequence s;

ReadyWatcher cm_initialized;
init_helper_.setInitializedCb([&]() -> void { cm_initialized.ready(); });

// Register a single cluster that reports the Secondary initialize phase.
NiceMock<MockClusterMockPrioritySet> cluster1;
ON_CALL(cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
init_helper_.addCluster(cluster1);

// The ADS mux reports ClusterLoadAssignment as not paused, so the expected sequence is:
// pause, initialize the cluster, then resume.
const auto& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
EXPECT_CALL(cm_.ads_mux_, paused(Eq(ByRef(type_url)))).WillRepeatedly(Return(false));
EXPECT_CALL(cm_.ads_mux_, pause(Eq(ByRef(type_url))));
EXPECT_CALL(cluster1, initialize(_));
EXPECT_CALL(cm_.ads_mux_, resume(Eq(ByRef(type_url))));

init_helper_.onStaticLoadComplete();

// Firing the cluster's initialize callback should invoke the per-cluster callback and then
// the overall initialized callback.
EXPECT_CALL(*this, onClusterInit(Ref(cluster1)));
EXPECT_CALL(cm_initialized, ready());
cluster1.initialize_callback_();
}

// If secondary cluster initialization is triggered inside of CdsApiImpl::onConfigUpdate()'s
// callback flow, that is, the CDS response didn't have any primary cluster, sending
// ClusterLoadAssignment requests should already be paused by CdsApiImpl::onConfigUpdate().
// This case tests that ClusterLoadAssignment requests aren't paused a second time.
TEST_F(ClusterManagerInitHelperTest, InitSecondaryWithEdsPaused) {
// All expectations below must be satisfied in the order in which they are declared.
InSequence s;

ReadyWatcher cm_initialized;
init_helper_.setInitializedCb([&]() -> void { cm_initialized.ready(); });

// Register a single cluster that reports the Secondary initialize phase.
NiceMock<MockClusterMockPrioritySet> cluster1;
ON_CALL(cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
init_helper_.addCluster(cluster1);

// The ADS mux reports ClusterLoadAssignment as already paused, so the helper must neither
// pause nor resume it; the cluster is still expected to be initialized.
const auto& type_url = Config::TypeUrl::get().ClusterLoadAssignment;
EXPECT_CALL(cm_.ads_mux_, paused(Eq(ByRef(type_url)))).WillRepeatedly(Return(true));
EXPECT_CALL(cm_.ads_mux_, pause(Eq(ByRef(type_url)))).Times(0);
EXPECT_CALL(cluster1, initialize(_));
EXPECT_CALL(cm_.ads_mux_, resume(Eq(ByRef(type_url)))).Times(0);

init_helper_.onStaticLoadComplete();

// Firing the cluster's initialize callback should invoke the per-cluster callback and then
// the overall initialized callback.
EXPECT_CALL(*this, onClusterInit(Ref(cluster1)));
EXPECT_CALL(cm_initialized, ready());
cluster1.initialize_callback_();
}

TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
InSequence s;

Expand Down

0 comments on commit 4d78ff5

Please sign in to comment.