Skip to content

Commit

Permalink
Move xds classes for Stubby to xds.client package (#10912)
Browse files Browse the repository at this point in the history
* Move bootstrap, XdsClient, load reporting, XdsLogger and XdsResourceType to xds.client package.
  • Loading branch information
larry-safran committed Feb 27, 2024
1 parent 78b3972 commit 8087977
Show file tree
Hide file tree
Showing 82 changed files with 1,022 additions and 755 deletions.
1 change: 1 addition & 0 deletions xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ tasks.named("javadoc").configure {
exclude 'com/google/security/**'
exclude 'io/envoyproxy/**'
// Need to clean up the class structure to reduce how much is exposed
exclude 'io/grpc/xds/client/**'
exclude 'io/grpc/xds/*LoadBalancer*'
exclude 'io/grpc/xds/Bootstrapper.java'
exclude 'io/grpc/xds/Envoy*'
Expand Down
6 changes: 4 additions & 2 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
Expand Down
31 changes: 23 additions & 8 deletions xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
Expand Down Expand Up @@ -106,13 +109,19 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
Attributes attributes = resolvedAddresses.getAttributes();
if (xdsClientPool == null) {
xdsClientPool = attributes.get(InternalXdsAttributes.XDS_CLIENT_POOL);
assert xdsClientPool != null;
xdsClient = xdsClientPool.getObject();
}
if (callCounterProvider == null) {
callCounterProvider = attributes.get(InternalXdsAttributes.CALL_COUNTER_PROVIDER);
}

ClusterImplConfig config =
(ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
if (config == null) {
return Status.INTERNAL.withDescription("No cluster configuration found");
}

if (cluster == null) {
cluster = config.cluster;
edsServiceName = config.edsServiceName;
Expand All @@ -125,6 +134,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
}
}

childLbHelper.updateDropPolicies(config.dropCategories);
childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
Expand Down Expand Up @@ -205,8 +215,12 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) {
if (locality == null) {
locality = Locality.create("", "", "");
}
final ClusterLocalityStats localityStats = lrsServerInfo == null ? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, edsServiceName, locality);
final ClusterLocalityStats localityStats =
(lrsServerInfo == null)
? null
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
edsServiceName, locality);

Attributes attrs = args.getAttributes().toBuilder().set(
ATTR_CLUSTER_LOCALITY_STATS, localityStats).build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
Expand Down Expand Up @@ -285,7 +299,8 @@ private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsCo
}
sslContextProviderSupplier =
tlsContext != null
? new SslContextProviderSupplier(tlsContext, xdsClient.getTlsContextManager())
? new SslContextProviderSupplier(tlsContext,
(TlsContextManager) xdsClient.getSecurityConfig())
: null;
}

Expand Down Expand Up @@ -348,7 +363,7 @@ public String toString() {

private static final class CountingStreamTracerFactory extends
ClientStreamTracer.Factory {
private ClusterLocalityStats stats;
private final ClusterLocalityStats stats;
private final AtomicLong inFlights;
@Nullable
private final ClientStreamTracer.Factory delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.util.MultiChildLoadBalancer;
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
Expand All @@ -52,9 +51,13 @@
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.internal.ServiceConfigUtil.PolicySelection;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyServerProtoData.OutlierDetection;
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.client.Bootstrapper.ServerInfo;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down
9 changes: 5 additions & 4 deletions xds/src/main/java/io/grpc/xds/CsdsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import io.grpc.StatusException;
import io.grpc.internal.ObjectPool;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.XdsClient.ResourceMetadata;
import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.client.XdsResourceType;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down
1 change: 1 addition & 0 deletions xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext;
import io.grpc.Internal;
import io.grpc.xds.client.EnvoyProtoData;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down
138 changes: 138 additions & 0 deletions xds/src/main/java/io/grpc/xds/GrpcBootstrapperImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.grpc.ChannelCredentials;
import io.grpc.internal.JsonUtil;
import io.grpc.xds.client.BootstrapperImpl;
import io.grpc.xds.client.XdsInitializationException;
import io.grpc.xds.client.XdsLogger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

class GrpcBootstrapperImpl extends BootstrapperImpl {
private static final String BOOTSTRAP_PATH_SYS_ENV_VAR = "GRPC_XDS_BOOTSTRAP";
private static final String BOOTSTRAP_PATH_SYS_PROPERTY = "io.grpc.xds.bootstrap";
private static final String BOOTSTRAP_CONFIG_SYS_ENV_VAR = "GRPC_XDS_BOOTSTRAP_CONFIG";
private static final String BOOTSTRAP_CONFIG_SYS_PROPERTY = "io.grpc.xds.bootstrapConfig";
@VisibleForTesting
String bootstrapPathFromEnvVar = System.getenv(BOOTSTRAP_PATH_SYS_ENV_VAR);
@VisibleForTesting
String bootstrapPathFromSysProp = System.getProperty(BOOTSTRAP_PATH_SYS_PROPERTY);
@VisibleForTesting
String bootstrapConfigFromEnvVar = System.getenv(BOOTSTRAP_CONFIG_SYS_ENV_VAR);
@VisibleForTesting
String bootstrapConfigFromSysProp = System.getProperty(BOOTSTRAP_CONFIG_SYS_PROPERTY);

GrpcBootstrapperImpl() {
super();
}

@Override
public BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationException {
return super.bootstrap(rawData);
}

/**
* Gets the bootstrap config as JSON. Searches the config (or file of config) with the
* following order:
*
* <ol>
* <li>A filesystem path defined by environment variable "GRPC_XDS_BOOTSTRAP"</li>
* <li>A filesystem path defined by Java System Property "io.grpc.xds.bootstrap"</li>
* <li>Environment variable value of "GRPC_XDS_BOOTSTRAP_CONFIG"</li>
* <li>Java System Property value of "io.grpc.xds.bootstrapConfig"</li>
* </ol>
*/
@Override
protected String getJsonContent() throws XdsInitializationException, IOException {
String jsonContent;
String filePath =
bootstrapPathFromEnvVar != null ? bootstrapPathFromEnvVar : bootstrapPathFromSysProp;
if (filePath != null) {
logger.log(XdsLogger.XdsLogLevel.INFO, "Reading bootstrap file from {0}", filePath);
jsonContent = reader.readFile(filePath);
logger.log(XdsLogger.XdsLogLevel.INFO, "Reading bootstrap from " + filePath);
} else {
jsonContent = bootstrapConfigFromEnvVar != null
? bootstrapConfigFromEnvVar : bootstrapConfigFromSysProp;
}
if (jsonContent == null) {
throw new XdsInitializationException(
"Cannot find bootstrap configuration\n"
+ "Environment variables searched:\n"
+ "- " + BOOTSTRAP_PATH_SYS_ENV_VAR + "\n"
+ "- " + BOOTSTRAP_CONFIG_SYS_ENV_VAR + "\n\n"
+ "Java System Properties searched:\n"
+ "- " + BOOTSTRAP_PATH_SYS_PROPERTY + "\n"
+ "- " + BOOTSTRAP_CONFIG_SYS_PROPERTY + "\n\n");
}

return jsonContent;
}

@Override
protected Object getImplSpecificConfig(Map<String, ?> serverConfig, String serverUri)
throws XdsInitializationException {
return getChannelCredentials(serverConfig, serverUri);
}

private static ChannelCredentials getChannelCredentials(Map<String, ?> serverConfig,
String serverUri)
throws XdsInitializationException {
List<?> rawChannelCredsList = JsonUtil.getList(serverConfig, "channel_creds");
if (rawChannelCredsList == null || rawChannelCredsList.isEmpty()) {
throw new XdsInitializationException(
"Invalid bootstrap: server " + serverUri + " 'channel_creds' required");
}
ChannelCredentials channelCredentials =
parseChannelCredentials(JsonUtil.checkObjectList(rawChannelCredsList), serverUri);
if (channelCredentials == null) {
throw new XdsInitializationException(
"Server " + serverUri + ": no supported channel credentials found");
}
return channelCredentials;
}

@Nullable
private static ChannelCredentials parseChannelCredentials(List<Map<String, ?>> jsonList,
String serverUri)
throws XdsInitializationException {
for (Map<String, ?> channelCreds : jsonList) {
String type = JsonUtil.getString(channelCreds, "type");
if (type == null) {
throw new XdsInitializationException(
"Invalid bootstrap: server " + serverUri + " with 'channel_creds' type unspecified");
}
XdsCredentialsProvider provider = XdsCredentialsRegistry.getDefaultRegistry()
.getProvider(type);
if (provider != null) {
Map<String, ?> config = JsonUtil.getObject(channelCreds, "config");
if (config == null) {
config = ImmutableMap.of();
}

return provider.newChannelCredentials(config);
}
}
return null;
}
}
13 changes: 11 additions & 2 deletions xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import io.grpc.CallOptions;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.xds.client.Bootstrapper;
import io.grpc.xds.client.XdsTransportFactory;
import java.util.concurrent.TimeUnit;

final class GrpcXdsTransportFactory implements XdsTransportFactory {
Expand All @@ -51,7 +54,7 @@ static class GrpcXdsTransport implements XdsTransport {

public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
ChannelCredentials channelCredentials = (ChannelCredentials) serverInfo.implSpecificConfig();
this.channel = Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
Expand All @@ -67,7 +70,13 @@ public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
String fullMethodName,
MethodDescriptor.Marshaller<ReqT> reqMarshaller,
MethodDescriptor.Marshaller<RespT> respMarshaller) {
return new XdsStreamingCall<>(fullMethodName, reqMarshaller, respMarshaller);
Context prevContext = Context.ROOT.attach();
try {
return new XdsStreamingCall<>(fullMethodName, reqMarshaller, respMarshaller);
} finally {
Context.ROOT.detach(prevContext);
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.grpc.Internal;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
import java.util.Map;

/**
Expand Down
2 changes: 2 additions & 0 deletions xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.grpc.NameResolver;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.internal.security.SslContextProviderSupplier;

/**
Expand Down
5 changes: 3 additions & 2 deletions xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
import io.grpc.LoadBalancerRegistry;
import io.grpc.internal.JsonParser;
import io.grpc.xds.LoadBalancerConfigFactory.LoadBalancingPolicyConverter.MaxRecursionReachedException;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsResourceType.ResourceInvalidException;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
import java.io.IOException;
import java.util.Map;

Expand Down

0 comments on commit 8087977

Please sign in to comment.