Not to send any request at GrpcSubscriptionImpl destructor #4178

Closed · wants to merge 1 commit
8 changes: 8 additions & 0 deletions source/common/config/grpc_mux_impl.cc
@@ -28,6 +28,14 @@ GrpcMuxImpl::~GrpcMuxImpl() {
}
}

void GrpcMuxImpl::noMoreRequestSending() {
for (const auto& api_state : api_state_) {
for (auto watch : api_state.second.watches_) {
watch->send_update_ = false;
}
}
}

void GrpcMuxImpl::start() { establishNewStream(); }

void GrpcMuxImpl::setRetryTimer() {
8 changes: 6 additions & 2 deletions source/common/config/grpc_mux_impl.h
@@ -30,6 +30,9 @@ class GrpcMuxImpl : public GrpcMux,
MonotonicTimeSource& time_source = ProdMonotonicTimeSource::instance_);
~GrpcMuxImpl();

  // Do not send any further requests; the object is about to be deleted.
void noMoreRequestSending();

void start() override;
GrpcMuxWatchPtr subscribe(const std::string& type_url, const std::vector<std::string>& resources,
GrpcMuxCallbacks& callbacks) override;
@@ -57,14 +60,14 @@
GrpcMuxWatchImpl(const std::vector<std::string>& resources, GrpcMuxCallbacks& callbacks,
const std::string& type_url, GrpcMuxImpl& parent)
: resources_(resources), callbacks_(callbacks), type_url_(type_url), parent_(parent),
inserted_(true) {
inserted_(true), send_update_(true) {
entry_ = parent.api_state_[type_url].watches_.emplace(
parent.api_state_[type_url].watches_.begin(), this);
}
~GrpcMuxWatchImpl() override {
if (inserted_) {
parent_.api_state_[type_url_].watches_.erase(entry_);
if (!resources_.empty()) {
if (send_update_ && !resources_.empty()) {
parent_.sendDiscoveryRequest(type_url_);
}
}
@@ -75,6 +78,7 @@
GrpcMuxImpl& parent_;
std::list<GrpcMuxWatchImpl*>::iterator entry_;
bool inserted_;
bool send_update_;
};

// Per muxed API state.
1 change: 1 addition & 0 deletions source/common/config/grpc_subscription_impl.h
@@ -19,6 +19,7 @@ class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
: grpc_mux_(node, std::move(async_client), dispatcher, service_method, random),
grpc_mux_subscription_(grpc_mux_, stats) {}
~GrpcSubscriptionImpl() { grpc_mux_.noMoreRequestSending(); }
Member:
This doesn't seem correct; grpc_mux_ might be an ADS connection. When I delete an individual subscriber, e.g. EDS, I shouldn't permanently disable the muxer. It seems like what we had before is the right thing, the gRPC subscription should relinquish its watch by deleting it, and then via RAII it cleans up its state in the gRPC mux. But, from the issue, there is some bug still. Can you deep dive?

Contributor Author:
GrpcSubscriptionImpl owns grpc_mux_; see line 39 of this file.
In its destructor, grpc_mux_ is about to be deleted, so it is fine to mark all of its watches not to send a request when a watch is removed.

Member:
Oh, right, was confusing this with GrpcMuxSubscriptionImpl. It's still not right IMHO, since you're reaching in and dealing with internals in a way that breaks abstractions. The correct solution should clean up via RAII of watchers.

Member (@htuch, Aug 17, 2018):
I had a look at the code a bit more. I think even with your change, there is a race to worry about. The completion thread should still be live when the client is deleted, since that lives in TLS (are you seeing https://github.com/envoyproxy/envoy/blob/master/source/common/grpc/google_async_client_impl.cc#L19 invoked?). The async client should have reset the stream on its way out. I wonder if we have an issue in https://github.com/envoyproxy/envoy/blob/master/source/common/grpc/google_async_client_impl.cc#L363?

Can you provide a full trace level log and backtrace with line numbers? Thanks.

Contributor Author:
I added a trace log to the issue: #4167

Member:
(I'm following up with @qiwzhang on IM offline, it looks like SDS still has an active gRPC subscription after TLS shutdown has begun, this is dangerous and we need to figure out a way to avoid that)


// Config::Subscription
void start(const std::vector<std::string>& resources,
2 changes: 0 additions & 2 deletions test/common/config/grpc_subscription_test_harness.h
@@ -43,8 +43,6 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness {
dispatcher_, random_, *method_descriptor_, stats_));
}

~GrpcSubscriptionTestHarness() { EXPECT_CALL(async_stream_, sendMessage(_, false)); }

void expectSendMessage(const std::vector<std::string>& cluster_names,
const std::string& version) override {
expectSendMessage(cluster_names, version, Grpc::Status::GrpcStatus::Ok, "");
Expand Down