Skip to content

Commit

Permalink
xds: decouple xds channel creation and bootstrapping (#7764)
Browse files Browse the repository at this point in the history
This change fixes the problem of mismatched lifecycle of the xDS channel and XdsClient. Reading the bootstrap will determine and create the ChannelCredentials for each specified xDS server. An exception will be thrown if any xDS server specifies some channel_creds type that is not supported, not just for the first server (which is the only one to be used now). Reading the bootstrap also determines the xDS protocol version. The xDS channel will have the same lifecycle as the XdsClient instance: an xDS channel is created at the first call of getObject() and is shut down at the same time as the XdsClient is shutting down. A new xDS channel will be created when the ObjectPool creates a new XdsClient instance.
  • Loading branch information
voidzcy committed Dec 29, 2020
1 parent 67ad786 commit cddc1a5
Show file tree
Hide file tree
Showing 20 changed files with 341 additions and 482 deletions.
25 changes: 16 additions & 9 deletions xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand Down Expand Up @@ -80,7 +81,8 @@ public void uncaughtException(Thread t, Throwable e) {
private final MessagePrinter msgPrinter = new MessagePrinter();
private final InternalLogId logId;
private final XdsLogger logger;
private final XdsChannel xdsChannel;
private final ManagedChannel channel;
private final boolean useProtocolV3;
private final ScheduledExecutorService timeService;
private final BackoffPolicy.Provider backoffPolicyProvider;
private final Stopwatch stopwatch;
Expand All @@ -107,9 +109,11 @@ public void uncaughtException(Thread t, Throwable e) {
@Nullable
private ScheduledHandle rpcRetryTimer;

AbstractXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
this.xdsChannel = checkNotNull(channel, "channel");
AbstractXdsClient(ManagedChannel channel, boolean useProtocolV3, Node node,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.channel = checkNotNull(channel, "channel");
this.useProtocolV3 = useProtocolV3;
this.node = checkNotNull(node, "node");
this.timeService = checkNotNull(timeService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
Expand Down Expand Up @@ -175,7 +179,6 @@ final void shutdown() {
public void run() {
shutdown = true;
logger.log(XdsLogLevel.INFO, "Shutting down");
xdsChannel.getManagedChannel().shutdown();
if (adsStream != null) {
adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
}
Expand All @@ -187,6 +190,11 @@ public void run() {
});
}

@Override
boolean isShutDown() {
return shutdown;
}

@Override
public String toString() {
return logId.toString();
Expand Down Expand Up @@ -295,7 +303,7 @@ protected final boolean isInBackoff() {
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
if (xdsChannel.isUseProtocolV3()) {
if (useProtocolV3) {
adsStream = new AdsStreamV3();
} else {
adsStream = new AdsStreamV2();
Expand Down Expand Up @@ -559,8 +567,7 @@ private final class AdsStreamV2 extends AbstractAdsStream {
void start() {
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc
.AggregatedDiscoveryServiceStub stub =
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(
xdsChannel.getManagedChannel());
io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
@Override
Expand Down Expand Up @@ -638,7 +645,7 @@ private final class AdsStreamV3 extends AbstractAdsStream {
@Override
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(xdsChannel.getManagedChannel());
AggregatedDiscoveryServiceGrpc.newStub(channel);
StreamObserver<DiscoveryResponse> responseReader = new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(final DiscoveryResponse response) {
Expand Down
115 changes: 56 additions & 59 deletions xds/src/main/java/io/grpc/xds/Bootstrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.ChannelCredentials;
import io.grpc.InsecureChannelCredentials;
import io.grpc.Internal;
import io.grpc.TlsChannelCredentials;
import io.grpc.alts.GoogleDefaultChannelCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.GrpcUtil.GrpcBuildVersion;
import io.grpc.internal.JsonParser;
Expand Down Expand Up @@ -48,13 +52,17 @@ public abstract class Bootstrapper {
private static final String LOG_PREFIX = "xds-bootstrap";
private static final String BOOTSTRAP_PATH_SYS_ENV_VAR = "GRPC_XDS_BOOTSTRAP";
private static final String BOOTSTRAP_PATH_SYS_PROPERTY_VAR = "io.grpc.xds.bootstrap";
private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
@VisibleForTesting
static boolean enableV3Protocol = Boolean.parseBoolean(
System.getenv("GRPC_XDS_EXPERIMENTAL_V3_SUPPORT"));
@VisibleForTesting
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";

private static final Bootstrapper DEFAULT_INSTANCE = new Bootstrapper() {
@Override
public BootstrapInfo readBootstrap() throws XdsInitializationException {
public BootstrapInfo bootstrap() throws XdsInitializationException {
String filePathSource = BOOTSTRAP_PATH_SYS_ENV_VAR;
String filePath = System.getenv(filePathSource);
if (filePath == null) {
Expand Down Expand Up @@ -86,7 +94,7 @@ public static Bootstrapper getInstance() {
/**
* Returns configurations from bootstrap.
*/
public abstract BootstrapInfo readBootstrap() throws XdsInitializationException;
public abstract BootstrapInfo bootstrap() throws XdsInitializationException;

/** Parses a raw string into {@link BootstrapInfo}. */
@VisibleForTesting
Expand All @@ -108,36 +116,35 @@ static BootstrapInfo parseConfig(String rawData) throws XdsInitializationExcepti
throw new XdsInitializationException("Invalid bootstrap: 'xds_servers' does not exist.");
}
logger.log(XdsLogLevel.INFO, "Configured with {0} xDS servers", rawServerConfigs.size());
// TODO(chengyuanzhang): require at least one server URI.
List<Map<String, ?>> serverConfigList = JsonUtil.checkObjectList(rawServerConfigs);
for (Map<String, ?> serverConfig : serverConfigList) {
String serverUri = JsonUtil.getString(serverConfig, "server_uri");
if (serverUri == null) {
throw new XdsInitializationException(
"Invalid bootstrap: missing 'xds_servers'");
throw new XdsInitializationException("Invalid bootstrap: missing 'server_uri'");
}
logger.log(XdsLogLevel.INFO, "xDS server URI: {0}", serverUri);
List<ChannelCreds> channelCredsOptions = new ArrayList<>();

List<?> rawChannelCredsList = JsonUtil.getList(serverConfig, "channel_creds");
if (rawChannelCredsList == null || rawChannelCredsList.isEmpty()) {
throw new XdsInitializationException(
"Invalid bootstrap: server " + serverUri + " 'channel_creds' required");
}
List<Map<String, ?>> channelCredsList = JsonUtil.checkObjectList(rawChannelCredsList);
for (Map<String, ?> channelCreds : channelCredsList) {
String type = JsonUtil.getString(channelCreds, "type");
if (type == null) {
throw new XdsInitializationException(
"Invalid bootstrap: server " + serverUri + " with 'channel_creds' type unspecified");
}
logger.log(XdsLogLevel.INFO, "Channel credentials option: {0}", type);
ChannelCreds creds = new ChannelCreds(type, JsonUtil.getObject(channelCreds, "config"));
channelCredsOptions.add(creds);
ChannelCredentials channelCredentials =
parseChannelCredentials(JsonUtil.checkObjectList(rawChannelCredsList), serverUri);
if (channelCredentials == null) {
throw new XdsInitializationException(
"Server " + serverUri + ": no supported channel credentials found");
}

boolean useProtocolV3 = false;
List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
useProtocolV3 = enableV3Protocol
&& serverFeatures.contains(XDS_V3_SERVER_FEATURE);
}
servers.add(new ServerInfo(serverUri, channelCredsOptions, serverFeatures));
servers.add(new ServerInfo(serverUri, channelCredentials, useProtocolV3));
}

Node.Builder nodeBuilder = Node.newBuilder();
Expand Down Expand Up @@ -208,65 +215,55 @@ static <T> T checkForNull(T value, String fieldName) throws XdsInitializationExc
return value;
}

/**
* Data class containing channel credentials configurations for xDS protocol communication.
*/
// TODO(chengyuanzhang): May need more complex structure for channel creds config representation.
@Immutable
static class ChannelCreds {
private final String type;
@Nullable
private final Map<String, ?> config;

@VisibleForTesting
ChannelCreds(String type, @Nullable Map<String, ?> config) {
this.type = type;
this.config = config;
}

String getType() {
return type;
}

@Nullable
Map<String, ?> getConfig() {
if (config != null) {
return Collections.unmodifiableMap(config);
@Nullable
private static ChannelCredentials parseChannelCredentials(List<Map<String, ?>> jsonList,
String serverUri) throws XdsInitializationException {
for (Map<String, ?> channelCreds : jsonList) {
String type = JsonUtil.getString(channelCreds, "type");
if (type == null) {
throw new XdsInitializationException(
"Invalid bootstrap: server " + serverUri + " with 'channel_creds' type unspecified");
}
switch (type) {
case "google_default":
return GoogleDefaultChannelCredentials.create();
case "insecure":
return InsecureChannelCredentials.create();
case "tls":
return TlsChannelCredentials.create();
default:
}
return null;
}
return null;
}

/**
* Data class containing xDS server information, such as server URI and channel credential
* options to be used for communication.
* Data class containing xDS server information, such as server URI and channel credentials
* to be used for communication.
*/
@Immutable
static class ServerInfo {
private final String serverUri;
private final List<ChannelCreds> channelCredsList;
@Nullable
private final List<String> serverFeatures;
private final String target;
private final ChannelCredentials channelCredentials;
private final boolean useProtocolV3;

@VisibleForTesting
ServerInfo(String serverUri, List<ChannelCreds> channelCredsList, List<String> serverFeatures) {
this.serverUri = serverUri;
this.channelCredsList = channelCredsList;
this.serverFeatures = serverFeatures;
ServerInfo(String target, ChannelCredentials channelCredentials, boolean useProtocolV3) {
this.target = checkNotNull(target, "target");
this.channelCredentials = checkNotNull(channelCredentials, "channelCredentials");
this.useProtocolV3 = useProtocolV3;
}

String getServerUri() {
return serverUri;
String getTarget() {
return target;
}

List<ChannelCreds> getChannelCredentials() {
return Collections.unmodifiableList(channelCredsList);
ChannelCredentials getChannelCredentials() {
return channelCredentials;
}

List<String> getServerFeatures() {
return serverFeatures == null
? Collections.<String>emptyList()
: Collections.unmodifiableList(serverFeatures);
boolean isUseProtocolV3() {
return useProtocolV3;
}
}

Expand Down
12 changes: 7 additions & 5 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager;
import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
Expand Down Expand Up @@ -89,11 +90,12 @@ final class ClientXdsClient extends AbstractXdsClient {
private final LoadReportClient lrsClient;
private boolean reportingLoad;

ClientXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
lrsClient = new LoadReportClient(loadStatsManager, channel, node, getSyncContext(),
timeService, backoffPolicyProvider, stopwatchSupplier);
ClientXdsClient(ManagedChannel channel, boolean useProtocolV3, Node node,
ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
super(channel, useProtocolV3, node, timeService, backoffPolicyProvider, stopwatchSupplier);
lrsClient = new LoadReportClient(loadStatsManager, channel, useProtocolV3, node,
getSyncContext(), timeService, backoffPolicyProvider, stopwatchSupplier);
}

@Override
Expand Down
17 changes: 10 additions & 7 deletions xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.EnvoyProtoData.ClusterStats;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -52,7 +52,8 @@
final class LoadReportClient {
private final InternalLogId logId;
private final XdsLogger logger;
private final XdsChannel xdsChannel;
private final ManagedChannel channel;
private final boolean useProtocolV3;
private final Node node;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService timerService;
Expand All @@ -70,14 +71,16 @@ final class LoadReportClient {

LoadReportClient(
LoadStatsManager loadStatsManager,
XdsChannel xdsChannel,
ManagedChannel channel,
boolean useProtocolV3,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService scheduledExecutorService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) {
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
this.xdsChannel = checkNotNull(xdsChannel, "xdsChannel");
this.channel = checkNotNull(channel, "xdsChannel");
this.useProtocolV3 = useProtocolV3;
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
Expand Down Expand Up @@ -152,7 +155,7 @@ private void startLrsRpc() {
return;
}
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
if (xdsChannel.isUseProtocolV3()) {
if (useProtocolV3) {
lrsStream = new LrsStreamV3();
} else {
lrsStream = new LrsStreamV2();
Expand Down Expand Up @@ -331,7 +334,7 @@ public void run() {
};
io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceStub
stubV2 = io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.newStub(
xdsChannel.getManagedChannel());
channel);
lrsRequestWriterV2 = stubV2.withWaitForReady().streamLoadStats(lrsResponseReaderV2);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
Expand Down Expand Up @@ -396,7 +399,7 @@ public void run() {
}
};
LoadReportingServiceStub stubV3 =
LoadReportingServiceGrpc.newStub(xdsChannel.getManagedChannel());
LoadReportingServiceGrpc.newStub(channel);
lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
Expand Down
8 changes: 5 additions & 3 deletions xds/src/main/java/io/grpc/xds/ServerXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.envoyproxy.envoy.config.listener.v3.FilterChain;
import io.envoyproxy.envoy.config.listener.v3.FilterChainMatch;
import io.envoyproxy.envoy.config.listener.v3.Listener;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
Expand Down Expand Up @@ -63,16 +64,17 @@ final class ServerXdsClient extends AbstractXdsClient {
private ScheduledHandle ldsRespTimer;

ServerXdsClient(
XdsChannel channel,
ManagedChannel channel,
boolean useProtocolV3,
Node node,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
boolean useNewApiForListenerQuery,
String instanceIp,
String grpcServerResourceId) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery;
super(channel, useProtocolV3, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.useNewApiForListenerQuery = useProtocolV3 && useNewApiForListenerQuery;
this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server";
}
Expand Down

0 comments on commit cddc1a5

Please sign in to comment.