Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delta cds: add on-demand cds support. #8984

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions include/envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ class GrpcMux {
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) PURE;
/**
* The only difference between addToWatch() and addOrUpdateWatch() is that the 'resources' here
* means the *extra* resources we interested in.
*/
virtual Watch* addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) PURE;

/**
* Cleanup of a Watch* added by addOrUpdateWatch(). Receiving a Watch* from addOrUpdateWatch()
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ class Subscription {
* be passed to std::set_difference, which must be given sorted collections.
*/
virtual void updateResourceInterest(const std::set<std::string>& update_to_these_names) PURE;

/**
* Add the resources to fetch.
* @param resources vector of resource names to fetch.
*/
virtual void addToResourceInterest(const std::set<std::string>&){};
};

using SubscriptionPtr = std::unique_ptr<Subscription>;
Expand Down
14 changes: 14 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ class ClusterManager {
* @return Config::SubscriptionFactory& the subscription factory.
*/
virtual Config::SubscriptionFactory& subscriptionFactory() PURE;

virtual void updateClusterInterest(const std::set<std::string>& update_to_these_names) PURE;

virtual void addToClusterInterest(const std::set<std::string>& add_these_names) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand All @@ -250,6 +254,16 @@ class CdsApi {
* @return std::string last accepted version from fetch.
*/
virtual const std::string versionInfo() const PURE;

/**
* Update watch set of cluster resources interested.
*/
virtual void updateClusterInterest(const std::set<std::string>& update_to_these_names) PURE;

/**
* Add watch set of cluster resources interested.
*/
virtual void addToClusterInterest(const std::set<std::string>& add_these_names) PURE;
};

using CdsApiPtr = std::unique_ptr<CdsApi>;
Expand Down
41 changes: 33 additions & 8 deletions source/common/config/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,24 @@ Watch* GrpcMuxImpl::addOrUpdateWatch(const std::string& type_url, Watch* watch,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
if (watch == nullptr) {
return addWatch(type_url, resources, callbacks, init_fetch_timeout);
} else {
updateWatch(type_url, watch, resources);
return watch;
watch = addWatch(type_url, callbacks, init_fetch_timeout);
}
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
return watch;
}

Watch* GrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
if (watch == nullptr) {
watch = addWatch(type_url, callbacks, init_fetch_timeout);
}
// addToWatch() queues a discovery request for any of *extra* 'resources' we are not yet
// subscribed.
addToWatch(type_url, watch, resources);
return watch;
}

void GrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
Expand Down Expand Up @@ -95,8 +108,7 @@ void GrpcMuxImpl::handleStreamEstablishmentFailure() {
} while (all_subscribed.size() != subscriptions_.size());
}

Watch* GrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks,
Watch* GrpcMuxImpl::addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) {
auto watch_map = watch_maps_.find(type_url);
if (watch_map == watch_maps_.end()) {
Expand All @@ -108,11 +120,24 @@ Watch* GrpcMuxImpl::addWatch(const std::string& type_url, const std::set<std::st
}

Watch* watch = watch_map->second->addWatch(callbacks);
// updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
updateWatch(type_url, watch, resources);
return watch;
}

void GrpcMuxImpl::addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources) {
ASSERT(watch != nullptr);
SubscriptionState& sub = subscriptionStateFor(type_url);
WatchMap& watch_map = watchMapFor(type_url);

auto added_removed = watch_map.addToWatchInterest(watch, resources);
sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);

// Tell the server about our change in interest, if any.
if (sub.subscriptionUpdatePending()) {
trySendDiscoveryRequests();
}
}

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
// subscription will enqueue and attempt to send an appropriate discovery request.
Expand Down
16 changes: 14 additions & 2 deletions source/common/config/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ class GrpcMuxImpl : public GrpcMux, Logger::Loggable<Logger::Id::config> {
Watch* addOrUpdateWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) override;

Watch* addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout) override;

void removeWatch(const std::string& type_url, Watch* watch) override;

void pause(const std::string& type_url) override;
Expand Down Expand Up @@ -67,8 +72,11 @@ class GrpcMuxImpl : public GrpcMux, Logger::Loggable<Logger::Id::config> {
const LocalInfo::LocalInfo& local_info() const { return local_info_; }

private:
Watch* addWatch(const std::string& type_url, const std::set<std::string>& resources,
SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout);
Watch* addWatch(const std::string& type_url, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout);

void addToWatch(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources);

// Updates the list of resource names watched by the given watch. If an added name is new across
// the whole subscription, or if a removed name has no other watch interested in it, then the
Expand Down Expand Up @@ -188,6 +196,10 @@ class NullGrpcMuxImpl : public GrpcMux {
SubscriptionCallbacks&, std::chrono::milliseconds) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
Watch* addToWatch(const std::string&, Watch*, const std::set<std::string>&,
SubscriptionCallbacks&, std::chrono::milliseconds) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
void removeWatch(const std::string&, Watch*) override {
throw EnvoyException("ADS must be configured to support an ADS config source");
}
Expand Down
5 changes: 5 additions & 0 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ void GrpcSubscriptionImpl::updateResourceInterest(
stats_.update_attempt_.inc();
}

void GrpcSubscriptionImpl::addToResourceInterest(const std::set<std::string>& add_these_names) {
watch_ = grpc_mux_->addToWatch(type_url_, watch_, add_these_names, *this, init_fetch_timeout_);
stats_.update_attempt_.inc();
}

// Config::SubscriptionCallbacks
void GrpcSubscriptionImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down
1 change: 1 addition & 0 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class GrpcSubscriptionImpl : public Subscription, public SubscriptionCallbacks {
// Config::Subscription
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void addToResourceInterest(const std::set<std::string>& add_these_names) override;

// Config::SubscriptionCallbacks (all pass through to callbacks_!)
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down
11 changes: 11 additions & 0 deletions source/common/config/watch_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ AddedRemoved WatchMap::updateWatchInterest(Watch* watch,
findRemovals(newly_removed_from_watch, watch));
}

AddedRemoved WatchMap::addToWatchInterest(Watch* watch,
const std::set<std::string>& add_these_names) {
std::vector<std::string> newly_added_to_watch(add_these_names.begin(), add_these_names.end());
std::vector<std::string> newly_removed_from_watch;
watch->resource_names_.insert(add_these_names.begin(), add_these_names.end());
std::set<std::string> additions = add_these_names;

return AddedRemoved(findAdditions(newly_added_to_watch, watch),
findRemovals(newly_removed_from_watch, watch));
}

absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
absl::flat_hash_set<Watch*> ret = wildcard_watches_;
const auto watches_interested = watch_interest_.find(resource_name);
Expand Down
2 changes: 2 additions & 0 deletions source/common/config/watch_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class WatchMap : public SubscriptionCallbacks, public Logger::Loggable<Logger::I
// Y will be in removed_.
AddedRemoved updateWatchInterest(Watch* watch,
const std::set<std::string>& update_to_these_names);
// Adds the extra set of resource names that the given watch should watch.
AddedRemoved addToWatchInterest(Watch* watch, const std::set<std::string>& add_these_names);

// Expects that the watch to be removed has already had all of its resource names removed via
// updateWatchInterest().
Expand Down
8 changes: 8 additions & 0 deletions source/common/upstream/cds_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ class CdsApiImpl : public CdsApi,
}
const std::string versionInfo() const override { return system_version_info_; }

void updateClusterInterest(const std::set<std::string>& update_to_these_names) override {
subscription_->updateResourceInterest(update_to_these_names);
}

void addToClusterInterest(const std::set<std::string>& add_these_names) override {
subscription_->addToResourceInterest(add_these_names);
}

private:
// Config::SubscriptionCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
Expand Down
8 changes: 8 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

Config::SubscriptionFactory& subscriptionFactory() override { return subscription_factory_; }

void updateClusterInterest(const std::set<std::string>& update_to_these_names) override {
cds_api_->updateClusterInterest(update_to_these_names);
}

void addToClusterInterest(const std::set<std::string>& add_these_names) override {
cds_api_->addToClusterInterest(add_these_names);
}

protected:
virtual void postThreadLocalDrainConnections(const Cluster& cluster,
const HostVector& hosts_removed);
Expand Down
6 changes: 6 additions & 0 deletions test/common/config/delta_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ TEST_F(DeltaSubscriptionImplTest, PauseHoldsRequest) {
subscription_->resume();
}

TEST_F(DeltaSubscriptionImplTest, AddResourceCauseRequest) {
startSubscription({});
expectSendMessage({"name1", "name2"}, {}, Grpc::Status::WellKnownGrpcStatus::Ok, "", {});
subscription_->addToResourceInterest({"name1", "name2"});
}

TEST_F(DeltaSubscriptionImplTest, ResponseCausesAck) {
startSubscription({"name1"});
deliverConfigUpdate({"name1"}, "someversion", true);
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/config/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class MockGrpcMux : public GrpcMux {
Watch*(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout));
MOCK_METHOD5(addToWatch,
Watch*(const std::string& type_url, Watch* watch,
const std::set<std::string>& resources, SubscriptionCallbacks& callbacks,
std::chrono::milliseconds init_fetch_timeout));
MOCK_METHOD2(removeWatch, void(const std::string& type_url, Watch* watch));
MOCK_METHOD1(pause, void(const std::string& type_url));
MOCK_METHOD1(resume, void(const std::string& type_url));
Expand Down
4 changes: 4 additions & 0 deletions test/mocks/upstream/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ class MockClusterManager : public ClusterManager {
MOCK_METHOD1(addThreadLocalClusterUpdateCallbacks_,
ClusterUpdateCallbacksHandle*(ClusterUpdateCallbacks& callbacks));
MOCK_METHOD0(subscriptionFactory, Config::SubscriptionFactory&());
MOCK_METHOD1(updateClusterInterest, void(const std::set<std::string>&));
MOCK_METHOD1(addToClusterInterest, void(const std::set<std::string>&));

NiceMock<Http::ConnectionPool::MockInstance> conn_pool_;
NiceMock<Http::MockAsyncClient> async_client_;
Expand Down Expand Up @@ -382,6 +384,8 @@ class MockCdsApi : public CdsApi {
MOCK_METHOD0(initialize, void());
MOCK_METHOD1(setInitializedCb, void(std::function<void()> callback));
MOCK_CONST_METHOD0(versionInfo, const std::string());
MOCK_METHOD1(updateClusterInterest, void(const std::set<std::string>&));
MOCK_METHOD1(addToClusterInterest, void(const std::set<std::string>&));

std::function<void()> initialized_callback_;
};
Expand Down