Skip to content

Commit

Permalink
xds-failover: prevent sending initial_resource_versions when moving f…
Browse files Browse the repository at this point in the history
…rom fallback to primary (envoyproxy#35692)

Signed-off-by: Adi Suissa-Peleg <adip@google.com>
  • Loading branch information
adisuissa authored Aug 23, 2024
1 parent d7e0d60 commit 93099c6
Show file tree
Hide file tree
Showing 27 changed files with 306 additions and 170 deletions.
5 changes: 4 additions & 1 deletion envoy/config/grpc_mux.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ template <class ResponseProto> class GrpcStreamCallbacks {
/**
* For the GrpcStream to prompt the context to take appropriate action in response to
* failure to establish the gRPC stream.
* @param next_attempt_may_send_initial_resource_version true if the next reconnection
* attempt will be to the same source that Envoy was most recently connected to, false
* otherwise (used to pass primary/failover reconnection information to the GrpcMux).
*/
virtual void onEstablishmentFailure() PURE;
virtual void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) PURE;

/**
* For the GrpcStream to pass received protos to the context.
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/null_grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class NullGrpcMuxImpl : public GrpcMux,

void onWriteable() override {}
void onStreamEstablished() override {}
void onEstablishmentFailure() override {}
void onEstablishmentFailure(bool) override {}
void onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&&,
ControlPlaneStats&) override {}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ bool DeltaSubscriptionState::subscriptionUpdatePending() const {
return must_send_discovery_request_;
}

// Prepares the subscription state for a newly established stream: stores whether the requests
// built for that stream may populate initial_resource_versions, and clears the flag that records
// whether any request was already sent on the current stream.
void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) {
  should_send_initial_resource_versions_ = should_send_initial_resource_versions;
  any_request_sent_yet_in_current_stream_ = false;
}

UpdateAck DeltaSubscriptionState::handleResponse(
const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
// We *always* copy the response's nonce into the next request, even if we're going to make that
Expand Down Expand Up @@ -286,21 +291,25 @@ DeltaSubscriptionState::getNextRequestAckless() {
// Also, since this might be a new server, we must explicitly state *all* of our subscription
// interest.
for (auto const& [resource_name, resource_state] : requested_resource_state_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.isWaitingForServer()) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
if (should_send_initial_resource_versions_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.isWaitingForServer()) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
}
// We are going over a list of resources that we are interested in, so add them to
// resource_names_subscribe.
names_added_.insert(resource_name);
}
for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_version;
}
for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_version;
if (should_send_initial_resource_versions_) {
for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_version;
}
for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
(*request.mutable_initial_resource_versions())[resource_name] = resource_version;
}
}
// If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
// empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {
// Whether there was a change in our subscription interest we have yet to inform the server of.
bool subscriptionUpdatePending() const;

void markStreamFresh() { any_request_sent_yet_in_current_stream_ = false; }
// Marks the stream as fresh for the next reconnection attempt. If
// should_send_initial_resource_versions is true, then the first request sent
// on the new stream will also populate the initial_resource_versions field
// (if there are relevant resources).
void markStreamFresh(bool should_send_initial_resource_versions);

UpdateAck handleResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message);

Expand Down Expand Up @@ -169,6 +173,7 @@ class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> {

bool in_initial_legacy_wildcard_{true};
bool any_request_sent_yet_in_current_stream_{};
bool should_send_initial_resource_versions_{true};
bool must_send_discovery_request_{};

// Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent.
Expand Down
37 changes: 24 additions & 13 deletions source/extensions/config_subscription/grpc/grpc_mux_failover.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
Event::Dispatcher& dispatcher)
: grpc_mux_callbacks_(grpc_mux_callbacks), primary_callbacks_(*this),
primary_grpc_stream_(std::move(primary_stream_creator(&primary_callbacks_))),
connection_state_(ConnectionState::None), ever_connected_to_primary_(false) {
connection_state_(ConnectionState::None), ever_connected_to_primary_(false),
previously_connected_to_(ConnectedTo::None) {
ASSERT(primary_grpc_stream_ != nullptr);
if (failover_stream_creator.has_value()) {
ENVOY_LOG(warn, "Using xDS-Failover. Note that the implementation is currently considered "
Expand Down Expand Up @@ -202,7 +203,7 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
parent_.grpc_mux_callbacks_.onStreamEstablished();
}

void onEstablishmentFailure() override {
void onEstablishmentFailure(bool) override {
// This will be called when the primary stream fails to establish a connection, or after the
// connection was closed.
ASSERT(parent_.connectingToOrConnectedToPrimary());
Expand All @@ -221,11 +222,11 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
"in a row. Attempting to connect to the failover stream.");
// This will close the stream and prevent the retry timer from
// reconnecting to the primary source.
// TODO(adisuissa): need to ensure that when moving between primary and failover,
// the initial_resource_versions that are sent are empty. This will be
// done in a followup PR.
parent_.primary_grpc_stream_->closeStream();
parent_.grpc_mux_callbacks_.onEstablishmentFailure();
// Next attempt will be to the failover, set the value that
// determines whether to set initial_resource_versions or not.
parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
ConnectedTo::Failover);
parent_.connection_state_ = ConnectionState::ConnectingToFailover;
parent_.failover_grpc_stream_->establishNewStream();
return;
Expand All @@ -237,7 +238,10 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
ENVOY_LOG_MISC(trace, "Not trying to connect to failover. Will try again to reconnect to the "
"primary (upon retry).");
parent_.connection_state_ = ConnectionState::ConnectingToPrimary;
parent_.grpc_mux_callbacks_.onEstablishmentFailure();
// Next attempt will be to the primary, set the value that
// determines whether to set initial_resource_versions or not.
parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
ConnectedTo::Primary);
}

void onDiscoveryResponse(ResponseProtoPtr<ResponseType>&& message,
Expand All @@ -249,6 +253,7 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
parent_.ever_connected_to_primary_ = true;
primary_consecutive_failures_ = 0;
parent_.connection_state_ = ConnectionState::ConnectedToPrimary;
parent_.previously_connected_to_ = ConnectedTo::Primary;
parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats);
}

Expand Down Expand Up @@ -278,7 +283,7 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
parent_.grpc_mux_callbacks_.onStreamEstablished();
}

void onEstablishmentFailure() override {
void onEstablishmentFailure(bool) override {
// This will be called when the failover stream fails to establish a connection, or after the
// connection was closed.
ASSERT(parent_.connectingToOrConnectedToFailover());
Expand All @@ -288,12 +293,13 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
"before). Attempting to connect to the primary stream.");

// This will close the stream and prevent the retry timer from
// reconnecting to the failover source.
// TODO(adisuissa): need to ensure that when moving between primary and failover,
// the initial_resource_versions that are sent are empty. This will be
// done in a followup PR.
// reconnecting to the failover source. The next attempt will be to the
// primary source.
parent_.failover_grpc_stream_->closeStream();
parent_.grpc_mux_callbacks_.onEstablishmentFailure();
// Next attempt will be to the primary, set the value that
// determines whether to set initial_resource_versions or not.
parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ ==
ConnectedTo::Primary);
// Set the connection state to None; when the retry timer expires, Envoy
// will try to connect to the primary source.
parent_.connection_state_ = ConnectionState::None;
Expand All @@ -312,6 +318,7 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
// Received a response from the failover. The failover is now considered available (no going
// back to the primary will be attempted).
parent_.connection_state_ = ConnectionState::ConnectedToFailover;
parent_.previously_connected_to_ = ConnectedTo::Failover;
parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats);
}

Expand Down Expand Up @@ -395,6 +402,10 @@ class GrpcMuxFailover : public GrpcStreamInterface<RequestType, ResponseType>,
// primary or failover source. Envoy is considered successfully connected to a source
// once it receives a response from it.
bool ever_connected_to_primary_{false};

// The xDS source a response was most recently received from. None is the value the constructor
// initializes previously_connected_to_ with.
enum class ConnectedTo { None, Primary, Failover };
// Used to track the most recent source that Envoy was connected to. Set to Primary or Failover
// when the corresponding stream callbacks receive a discovery response, and compared against
// the source of the next attempt to decide the flag passed to onEstablishmentFailure().
ConnectedTo previously_connected_to_;
};

} // namespace Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ void GrpcMuxImpl::onStreamEstablished() {
}
}

void GrpcMuxImpl::onEstablishmentFailure() {
void GrpcMuxImpl::onEstablishmentFailure(bool) {
for (const auto& api_state : api_state_) {
for (auto watch : api_state.second->watches_) {
watch->callbacks_.onConfigUpdateFailed(
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/config_subscription/grpc/grpc_mux_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class GrpcMuxImpl : public GrpcMux,

// Config::GrpcStreamCallbacks
void onStreamEstablished() override;
void onEstablishmentFailure() override;
void onEstablishmentFailure(bool) override;
void
onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
ControlPlaneStats& control_plane_stats) override;
Expand Down
7 changes: 5 additions & 2 deletions source/extensions/config_subscription/grpc/grpc_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
if (stream_ == nullptr) {
ENVOY_LOG(debug, "Unable to establish new stream to configuration server {}",
async_client_.destination());
callbacks_->onEstablishmentFailure();
callbacks_->onEstablishmentFailure(true);
setRetryTimer();
return;
}
Expand Down Expand Up @@ -112,7 +112,10 @@ class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
logClose(status, message);
stream_ = nullptr;
control_plane_stats_.connected_state_.set(0);
callbacks_->onEstablishmentFailure();
// By default Envoy will reconnect to the same server, so pass true here.
// This will be overridden by the mux-failover if Envoy will reconnect to a
// different server.
callbacks_->onEstablishmentFailure(true);
// Only retry the timer if not intentionally closed by Envoy.
if (!stream_intentionally_closed_) {
setRetryTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ void NewGrpcMuxImpl::onDiscoveryResponse(
void NewGrpcMuxImpl::onStreamEstablished() {
for (auto& [type_url, subscription] : subscriptions_) {
UNREFERENCED_PARAMETER(type_url);
subscription->sub_state_.markStreamFresh();
subscription->sub_state_.markStreamFresh(should_send_initial_resource_versions_);
}
pausable_ack_queue_.clear();
trySendDiscoveryRequests();
}

void NewGrpcMuxImpl::onEstablishmentFailure() {
void NewGrpcMuxImpl::onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) {
// If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
// call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
// crash, the iteration needs to dance around a little: collect pointers to all
Expand All @@ -208,6 +208,7 @@ void NewGrpcMuxImpl::onEstablishmentFailure() {
}
}
} while (all_subscribed.size() != subscriptions_.size());
should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version;
}

void NewGrpcMuxImpl::onWriteable() { trySendDiscoveryRequests(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class NewGrpcMuxImpl

void onStreamEstablished() override;

void onEstablishmentFailure() override;
void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override;

void onWriteable() override;

Expand Down Expand Up @@ -210,6 +210,9 @@ class NewGrpcMuxImpl
XdsConfigTrackerOptRef xds_config_tracker_;
EdsResourcesCachePtr eds_resources_cache_;

// Used to track whether initial_resource_versions should be populated on the
// next reconnection.
bool should_send_initial_resource_versions_{true};
bool started_{false};
// True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
// true because it may contain dangling pointers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ bool DeltaSubscriptionState::subscriptionUpdatePending() const {
return dynamicContextChanged();
}

// Prepares the subscription state for a newly established stream: stores whether the requests
// built for that stream may populate initial_resource_versions, and clears the flag that records
// whether any request was already sent on the current stream.
void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) {
  should_send_initial_resource_versions_ = should_send_initial_resource_versions;
  any_request_sent_yet_in_current_stream_ = false;
}

bool DeltaSubscriptionState::isHeartbeatResource(
const envoy::service::discovery::v3::Resource& resource) const {
if (!supports_heartbeats_) {
Expand Down Expand Up @@ -244,21 +249,25 @@ DeltaSubscriptionState::getNextRequestInternal() {
// Also, since this might be a new server, we must explicitly state *all* of our subscription
// interest.
for (auto const& [resource_name, resource_state] : requested_resource_state_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.isWaitingForServer()) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
if (should_send_initial_resource_versions_) {
// Populate initial_resource_versions with the resource versions we currently have.
// Resources we are interested in, but are still waiting to get any version of from the
// server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
if (!resource_state.isWaitingForServer()) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
}
}
// We are going over a list of resources that we are interested in, so add them to
// resource_names_subscribe.
names_added_.insert(resource_name);
}
for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_version;
}
for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_version;
if (should_send_initial_resource_versions_) {
for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_version;
}
for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
(*request->mutable_initial_resource_versions())[resource_name] = resource_version;
}
}
// If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
// empty.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DeltaSubscriptionState
// Whether there was a change in our subscription interest we have yet to inform the server of.
bool subscriptionUpdatePending() const override;

void markStreamFresh() override { any_request_sent_yet_in_current_stream_ = false; }
void markStreamFresh(bool should_send_initial_resource_versions) override;

void ttlExpiryCallback(const std::vector<std::string>& expired) override;

Expand Down Expand Up @@ -100,6 +100,7 @@ class DeltaSubscriptionState

bool in_initial_legacy_wildcard_{true};
bool any_request_sent_yet_in_current_stream_{};
bool should_send_initial_resource_versions_{true};

// Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent.
// TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ template <class S, class F, class RQ, class RS>
void GrpcMuxImpl<S, F, RQ, RS>::handleEstablishedStream() {
ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established");
for (auto& [type_url, subscription_state] : subscriptions_) {
subscription_state->markStreamFresh();
subscription_state->markStreamFresh(should_send_initial_resource_versions_);
}
setAnyRequestSentYetInCurrentStream(false);
maybeUpdateQueueSizeStat(0);
Expand All @@ -283,7 +283,8 @@ void GrpcMuxImpl<S, F, RQ, RS>::handleEstablishedStream() {
}

template <class S, class F, class RQ, class RS>
void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure() {
void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure(
bool next_attempt_may_send_initial_resource_version) {
ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish");
// If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
// call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
Expand All @@ -302,6 +303,7 @@ void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure() {
}
}
} while (all_subscribed.size() != subscriptions_.size());
should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version;
}

template <class S, class F, class RQ, class RS>
Expand Down
Loading

0 comments on commit 93099c6

Please sign in to comment.