Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds-failover: plumbing the API and adding integration test #34761

Merged
merged 10 commits into from
Jun 28, 2024
59 changes: 44 additions & 15 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "source/common/common/assert.h"
#include "source/common/protobuf/utility.h"

#include "absl/status/status.h"

namespace Envoy {
namespace Config {

Expand Down Expand Up @@ -68,11 +70,13 @@ namespace {
/**
* Check the grpc_services and cluster_names for API config sanity. Throws on error.
* @param api_config_source the config source to validate.
* @param max_grpc_services the maximal number of grpc services allowed.
* @return an invalid status when an API config has the wrong number of gRPC
* services or cluster names, depending on expectations set by its API type.
*/
absl::Status
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_config_source,
int max_grpc_services) {
const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC ||
api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::DELTA_GRPC);
Expand All @@ -89,10 +93,10 @@ checkApiConfigSourceNames(const envoy::config::core::v3::ApiConfigSource& api_co
fmt::format("{}::(DELTA_)GRPC must not have a cluster name specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
}
if (api_config_source.grpc_services().size() > 1) {
return absl::InvalidArgumentError(
fmt::format("{}::(DELTA_)GRPC must have a single gRPC service specified: {}",
api_config_source.GetTypeName(), api_config_source.DebugString()));
if (api_config_source.grpc_services_size() > max_grpc_services) {
return absl::InvalidArgumentError(fmt::format(
"{}::(DELTA_)GRPC must have no more than {} gRPC services specified: {}",
api_config_source.GetTypeName(), max_grpc_services, api_config_source.DebugString()));
}
} else {
if (!api_config_source.grpc_services().empty()) {
Expand Down Expand Up @@ -133,7 +137,9 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC) {
return absl::OkStatus();
}
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be computed in the `checkApiConfigSourceNames function instead of being passed as function param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will require the checkApiConfigSourceNames() function to become a method (rather than a function), or the runtime passed to it, and I'm not sure if it's worth it as this runtime-flag will disappear in the future.
If you feel strongly about this, then I'll pass the runtime as a second param.


const bool is_grpc =
(api_config_source.api_type() == envoy::config::core::v3::ApiConfigSource::GRPC);
Expand All @@ -153,6 +159,14 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
primary_clusters, api_config_source.grpc_services()[0].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") &&
api_config_source.grpc_services_size() == 2 &&
api_config_source.grpc_services()[1].has_envoy_grpc()) {
// If an Envoy failover gRPC exists, we validate its cluster name.
RETURN_IF_NOT_OK(Utility::validateClusterName(
primary_clusters, api_config_source.grpc_services()[1].envoy_grpc().cluster_name(),
api_config_source.GetTypeName()));
}
}
// Otherwise, there is no cluster name to validate.
return absl::OkStatus();
Expand All @@ -161,15 +175,23 @@ absl::Status Utility::checkApiConfigSourceSubscriptionBackingCluster(
absl::optional<std::string>
Utility::getGrpcControlPlane(const envoy::config::core::v3::ApiConfigSource& api_config_source) {
if (api_config_source.grpc_services_size() > 0) {
// Only checking for the first entry in grpc_services, because Envoy's xDS implementation
// currently only considers the first gRPC endpoint and ignores any other xDS management servers
// specified in an ApiConfigSource.
std::string res = "";
// In case more than one grpc service is defined, concatenate the names for
// a unique GrpcControlPlane identifier.
if (api_config_source.grpc_services(0).has_envoy_grpc()) {
return api_config_source.grpc_services(0).envoy_grpc().cluster_name();
res = api_config_source.grpc_services(0).envoy_grpc().cluster_name();
} else if (api_config_source.grpc_services(0).has_google_grpc()) {
res = api_config_source.grpc_services(0).google_grpc().target_uri();
}
if (api_config_source.grpc_services(0).has_google_grpc()) {
return api_config_source.grpc_services(0).google_grpc().target_uri();
// Concatenate the failover gRPC service.
if (api_config_source.grpc_services_size() == 2) {
if (api_config_source.grpc_services(1).has_envoy_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).envoy_grpc().cluster_name());
} else if (api_config_source.grpc_services(1).has_google_grpc()) {
absl::StrAppend(&res, ",", api_config_source.grpc_services(1).google_grpc().target_uri());
}
}
return res;
}
return absl::nullopt;
}
Expand Down Expand Up @@ -204,8 +226,10 @@ Utility::parseRateLimitSettings(const envoy::config::core::v3::ApiConfigSource&
absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSource(
Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source, Stats::Scope& scope,
bool skip_cluster_check) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(api_config_source));
bool skip_cluster_check, int grpc_service_idx) {
RETURN_IF_NOT_OK(checkApiConfigSourceNames(
api_config_source,
Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support") ? 2 : 1));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above.


if (api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::GRPC &&
api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
Expand All @@ -214,8 +238,13 @@ absl::StatusOr<Grpc::AsyncClientFactoryPtr> Utility::factoryForGrpcApiConfigSour
api_config_source.DebugString()));
}

if (grpc_service_idx >= api_config_source.grpc_services_size()) {
// No returned factory in case there's no entry.
return nullptr;
}

envoy::config::core::v3::GrpcService grpc_service;
grpc_service.MergeFrom(api_config_source.grpc_services(0));
grpc_service.MergeFrom(api_config_source.grpc_services(grpc_service_idx));

return async_client_manager.factoryForGrpcService(grpc_service, scope, skip_cluster_check);
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,15 @@ class Utility {
* @param async_client_manager gRPC async client manager.
* @param api_config_source envoy::config::core::v3::ApiConfigSource. Must have config type GRPC.
* @param skip_cluster_check whether to skip cluster validation.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory.
* @param grpc_service_idx index of the grpc service in the api_config_source. If there's no entry
* in the given index, a nullptr factory will be returned.
* @return Grpc::AsyncClientFactoryPtr gRPC async client factory, or nullptr if there's no
* grpc_service in the given index.
*/
static absl::StatusOr<Grpc::AsyncClientFactoryPtr>
factoryForGrpcApiConfigSource(Grpc::AsyncClientManager& async_client_manager,
const envoy::config::core::v3::ApiConfigSource& api_config_source,
Stats::Scope& scope, bool skip_cluster_check);
Stats::Scope& scope, bool skip_cluster_check, int grpc_service_idx);

/**
* Translate opaque config from google.protobuf.Any to defined proto message.
Expand Down
61 changes: 43 additions & 18 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,25 @@ getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostCons

bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
absl::string_view cluster_name) {
bool blocking_ads_cluster = false;
if (bootstrap.dynamic_resources().has_ads_config()) {
const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
// We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
// initialization if it uses an Envoy cluster that needs to be initialized first. We don't
// depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
return (ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
blocking_ads_cluster =
(ads_config_source.grpc_services_size() > 0 &&
ads_config_source.grpc_services(0).has_envoy_grpc() &&
ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
// Validate the failover server if there is one.
blocking_ads_cluster |=
(ads_config_source.grpc_services_size() == 2 &&
ads_config_source.grpc_services(1).has_envoy_grpc() &&
ads_config_source.grpc_services(1).envoy_grpc().cluster_name() == cluster_name);
}
}
return false;
return blocking_ads_cluster;
}

} // namespace
Expand Down Expand Up @@ -447,14 +456,22 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
ads_mux_ = factory->create(factory_or_error.value()->createUncachedRawAsyncClient(), nullptr,
dispatcher_, random_, *stats_.rootScope(),
dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
THROW_IF_STATUS_NOT_OK(factory_failover_or_error, throw);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this large code block is is identical to one below, could be factored out into a dedicated function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few differences between the initializations of SotW and delta, and in this PR I don't want to make major changes - I'm keeping the same pattern as was prior to this PR.
We are planning to work more on this code base, so we will definitely change it later.

} else {
absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
RETURN_IF_NOT_OK(status);
Expand All @@ -470,12 +487,20 @@ ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bo
if (!factory) {
return absl::InvalidArgumentError(fmt::format("{} not found", name));
}
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false, 1);
THROW_IF_STATUS_NOT_OK(factory_failover_or_error, throw);
factory_failover = std::move(factory_failover_or_error.value());
}
ads_mux_ = factory->create(
factory_or_error.value()->createUncachedRawAsyncClient(), nullptr, dispatcher_, random_,
*stats_.rootScope(), dyn_resources.ads_config(), local_info_,
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
factory_failover ? factory_failover->createUncachedRawAsyncClient() : nullptr,
dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
std::move(custom_config_validators), std::move(backoff_strategy),
makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
}
Expand Down Expand Up @@ -567,7 +592,7 @@ absl::Status ClusterManagerImpl::initializeSecondaryClusters(
absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
RETURN_IF_NOT_OK(status);
auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
*async_client_manager_, load_stats_config, *stats_.rootScope(), false);
*async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
local_info_, *this, *stats_.rootScope(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_*/ nullptr,
factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down
21 changes: 19 additions & 2 deletions source/extensions/config_subscription/grpc/grpc_mux_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,25 @@ GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) {
grpc_mux_context.rate_limit_settings_);
},
/*failover_stream_creator=*/
// TODO(adisuissa): implement when failover is fully plumbed.
absl::nullopt,
grpc_mux_context.failover_async_client_
? absl::make_optional(
[&grpc_mux_context](
GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>*
callbacks)
-> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse> {
return std::make_unique<
GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
envoy::service::discovery::v3::DiscoveryResponse>>(
callbacks, std::move(grpc_mux_context.failover_async_client_),
grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
grpc_mux_context.scope_,
// TODO(adisuissa): the backoff strategy for the failover should
// be the same as the primary source.
std::make_unique<FixedBackOffStrategy>(500),
grpc_mux_context.rate_limit_settings_);
})
: absl::nullopt,
/*grpc_mux_callbacks=*/*this,
/*dispatch=*/grpc_mux_context.dispatcher_);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionDat
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/sotwGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down Expand Up @@ -75,15 +75,15 @@ DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::Subscripti
THROW_IF_STATUS_NOT_OK(strategy_or_error, throw);
JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

auto factory_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true);
THROW_IF_STATUS_NOT_OK(factory_or_error, throw);
auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0);
THROW_IF_STATUS_NOT_OK(factory_primary_or_error, throw);
absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
Utility::parseRateLimitSettings(api_config_source);
THROW_IF_STATUS_NOT_OK(rate_limit_settings_or_error, throw);
GrpcMuxContext grpc_mux_context{
/*async_client_=*/factory_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr,
/*async_client_=*/factory_primary_or_error.value()->createUncachedRawAsyncClient(),
/*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
/*dispatcher_=*/data.dispatcher_,
/*service_method_=*/deltaGrpcMethod(data.type_url_),
/*local_info_=*/data.local_info_,
Expand Down
Loading
Loading