Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 20 additions & 30 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -153,17 +149,16 @@ private void handleClusterDiscovered() {
}
if (clusterState.isLeaf) {
DiscoveryMechanism instance;
if (clusterState.result instanceof EdsClusterConfig) {
EdsClusterConfig clusterConfig = (EdsClusterConfig) clusterState.result;
instance = DiscoveryMechanism.forEds(clusterState.name, clusterConfig.edsServiceName,
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests,
clusterConfig.upstreamTlsContext);
if (clusterState.result.clusterType() == ClusterType.EDS) {
instance = DiscoveryMechanism.forEds(
clusterState.name, clusterState.result.edsServiceName(),
clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext());
} else { // logical DNS
LogicalDnsClusterConfig clusterConfig =
(LogicalDnsClusterConfig) clusterState.result;
instance = DiscoveryMechanism.forLogicalDns(clusterState.name,
clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests,
clusterConfig.upstreamTlsContext);
instance = DiscoveryMechanism.forLogicalDns(
clusterState.name, clusterState.result.lrsServerName(),
clusterState.result.maxConcurrentRequests(),
clusterState.result.upstreamTlsContext());
}
instances.add(instance);
} else {
Expand All @@ -183,7 +178,7 @@ private void handleClusterDiscovered() {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return;
}
String endpointPickingPolicy = root.result.lbPolicy;
String endpointPickingPolicy = root.result.lbPolicy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of ring_hash, endpointPickingPolicy should be appended with _experimental.
new PolicySelection(endpointPickingLbProvider, null) should not pass a non-null config.
It's fine if you do it when implementing the balancer.

LoadBalancerProvider localityPickingLbProvider =
lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded
LoadBalancerProvider endpointPickingLbProvider =
Expand Down Expand Up @@ -212,7 +207,7 @@ private final class ClusterState implements CdsResourceWatcher {
@Nullable
private Map<String, ClusterState> childClusterStates;
@Nullable
private ClusterConfig result;
private CdsUpdate result;
// Following fields are effectively final.
private boolean isLeaf;
private boolean discovered;
Expand Down Expand Up @@ -281,15 +276,15 @@ public void run() {
if (shutdown) {
return;
}
logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update.clusterConfig;
if (update.clusterType == ClusterType.AGGREGATE) {
result = update;
if (update.clusterType() == ClusterType.AGGREGATE) {
isLeaf = false;
AggregateClusterConfig clusterConfig = (AggregateClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}", update.clusterName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}",
update.clusterName(), update.prioritizedClusterNames());
Map<String, ClusterState> newChildStates = new LinkedHashMap<>();
for (String cluster : clusterConfig.prioritizedClusterNames) {
for (String cluster : update.prioritizedClusterNames()) {
if (childClusterStates == null || !childClusterStates.containsKey(cluster)) {
ClusterState childState = new ClusterState(cluster);
childState.start();
Expand All @@ -304,18 +299,13 @@ public void run() {
}
}
childClusterStates = newChildStates;
} else if (update.clusterType == ClusterType.EDS) {
} else if (update.clusterType() == ClusterType.EDS) {
isLeaf = true;
EdsClusterConfig clusterConfig = (EdsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}",
update.clusterName, clusterConfig.edsServiceName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
update.clusterName(), update.edsServiceName());
} else { // logical DNS
isLeaf = true;
LogicalDnsClusterConfig clusterConfig =
(LogicalDnsClusterConfig) update.clusterConfig;
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName);
logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig);
logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName());
}
handleClusterDiscovered();
}
Expand Down
140 changes: 85 additions & 55 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CaseFormat;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
Expand All @@ -34,6 +35,7 @@
import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy;
import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig;
import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions;
import io.envoyproxy.envoy.config.core.v3.RoutingPriority;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
Expand Down Expand Up @@ -65,11 +67,9 @@
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig;
import io.grpc.xds.XdsClient.CdsUpdate.HashFunction;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.net.InetSocketAddress;
import java.util.ArrayList;
Expand All @@ -96,6 +96,8 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 =
"type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2"
+ ".HttpConnectionManager";
Expand Down Expand Up @@ -468,10 +470,42 @@ static StructOrError<RouteAction> parseRouteAction(
timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration());
}
}
List<ClusterWeight> weightedClusters;
List<HashPolicy> hashPolicies = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy config
: proto.getHashPolicyList()) {
HashPolicy policy = null;
boolean terminal = config.getTerminal();
switch (config.getPolicySpecifierCase()) {
case HEADER:
io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header headerCfg =
config.getHeader();
Pattern regEx = null;
String regExSubstitute = null;
if (headerCfg.hasRegexRewrite() && headerCfg.getRegexRewrite().hasPattern()
&& headerCfg.getRegexRewrite().getPattern().hasGoogleRe2()) {
regEx = Pattern.compile(headerCfg.getRegexRewrite().getPattern().getRegex());
regExSubstitute = headerCfg.getRegexRewrite().getSubstitution();
}
policy = HashPolicy.forHeader(
terminal, headerCfg.getHeaderName(), regEx, regExSubstitute);
break;
case FILTER_STATE:
if (config.getFilterState().getKey().equals(HASH_POLICY_FILTER_STATE_KEY)) {
policy = HashPolicy.forChannelId(terminal);
}
break;
default:
// Ignore
}
if (policy != null) {
hashPolicies.add(policy);
}
}

switch (proto.getClusterSpecifierCase()) {
case CLUSTER:
return StructOrError.fromStruct(RouteAction.forCluster(proto.getCluster(), timeoutNano));
return StructOrError.fromStruct(RouteAction.forCluster(
proto.getCluster(), hashPolicies, timeoutNano));
case CLUSTER_HEADER:
return null;
case WEIGHTED_CLUSTERS:
Expand All @@ -480,7 +514,7 @@ static StructOrError<RouteAction> parseRouteAction(
if (clusterWeights.isEmpty()) {
return StructOrError.fromError("No cluster found in weighted cluster list");
}
weightedClusters = new ArrayList<>();
List<ClusterWeight> weightedClusters = new ArrayList<>();
for (io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight clusterWeight
: clusterWeights) {
StructOrError<ClusterWeight> clusterWeightOrError = parseClusterWeight(clusterWeight);
Expand All @@ -492,7 +526,7 @@ static StructOrError<RouteAction> parseRouteAction(
}
// TODO(chengyuanzhang): validate if the sum of weights equals to total weight.
return StructOrError.fromStruct(RouteAction.forWeightedClusters(
weightedClusters, timeoutNano));
weightedClusters, hashPolicies, timeoutNano));
case CLUSTERSPECIFIER_NOT_SET:
default:
return StructOrError.fromError(
Expand Down Expand Up @@ -713,30 +747,44 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
if (!cdsResourceSubscribers.containsKey(clusterName)) {
continue;
}
// The lb_policy field must be set to ROUND_ROBIN.
if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": unsupported Lb policy: " + cluster.getLbPolicy());
return;
}
String lbPolicy = "round_robin";
CdsUpdate update = null;
StructOrError<CdsUpdate.Builder> structOrError;
switch (cluster.getClusterDiscoveryTypeCase()) {
case TYPE:
update = parseNonAggregateCluster(cluster, nonce, lbPolicy, edsResources);
structOrError = parseNonAggregateCluster(cluster, edsResources);
break;
case CLUSTER_TYPE:
update = parseAggregateCluster(cluster, nonce, lbPolicy);
structOrError = parseAggregateCluster(cluster);
break;
case CLUSTERDISCOVERYTYPE_NOT_SET:
default:
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": cluster discovery type unspecified");
return;
}
if (update == null) {
if (structOrError.getErrorDetail() != null) {
nackResponse(ResourceType.CDS, nonce, structOrError.errorDetail);
return;
}
cdsUpdates.put(clusterName, update);
CdsUpdate.Builder updateBuilder = structOrError.getStruct();
String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to(
CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name());
if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
HashFunction hashFunction;
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) {
hashFunction = HashFunction.XX_HASH;
} else {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": unsupported ring hash function: "
+ lbConfig.getHashFunction());
return;
}
updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(),
lbConfig.getMaximumRingSize().getValue(), hashFunction);
} else {
updateBuilder.lbPolicy(lbPolicy);
}
cdsUpdates.put(clusterName, updateBuilder.build());
}
ackResponse(ResourceType.CDS, versionInfo, nonce);

Expand All @@ -756,18 +804,13 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
}
}

/**
* Parses CDS resource for an aggregate cluster into {@link io.grpc.xds.XdsClient.CdsUpdate}.
* Returns {@code null} and nack the response with the given nonce if the resource is invalid.
*/
private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lbPolicy) {
private static StructOrError<CdsUpdate.Builder> parseAggregateCluster(Cluster cluster) {
String clusterName = cluster.getName();
CustomClusterType customType = cluster.getClusterType();
String typeName = customType.getName();
if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) {
nackResponse(ResourceType.CDS, nonce,
return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported custom cluster type: " + typeName);
return null;
}
io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig clusterConfig;
Any unpackedClusterConfig = customType.getTypedConfig();
Expand All @@ -779,31 +822,23 @@ private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lb
clusterConfig = unpackedClusterConfig.unpack(
io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class);
} catch (InvalidProtocolBufferException e) {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": invalid cluster config: " + e);
StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e);
return null;
}
AggregateClusterConfig config =
new AggregateClusterConfig(lbPolicy, clusterConfig.getClustersList());
return new CdsUpdate(clusterName, ClusterType.AGGREGATE, config);
return StructOrError.fromStruct(CdsUpdate.forAggregate(
clusterName, clusterConfig.getClustersList()));
}

/**
* Parses CDS resource for a non-aggregate cluster (EDS or Logical DNS) into {@link
* io.grpc.xds.XdsClient.CdsUpdate}. Returns {@code null} and nack the response with the given
* nonce if the resource is invalid.
*/
private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String lbPolicy,
Set<String> edsResources) {
private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
Cluster cluster, Set<String> edsResources) {
String clusterName = cluster.getName();
String lrsServerName = null;
Long maxConcurrentRequests = null;
UpstreamTlsContext upstreamTlsContext = null;
if (cluster.hasLrsServer()) {
if (!cluster.getLrsServer().hasSelf()) {
nackResponse(ResourceType.CDS, nonce,
return StructOrError.fromError(
"Cluster " + clusterName + ": only support LRS for the same management server");
return null;
}
lrsServerName = "";
}
Expand All @@ -829,9 +864,8 @@ private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String
unpacked = any.unpack(
io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class);
} catch (InvalidProtocolBufferException e) {
nackResponse(ResourceType.CDS, nonce,
"Cluster " + clusterName + ": invalid upstream TLS context: " + e);
return null;
return StructOrError.fromError(
"Cluster " + clusterName + ": malformed UpstreamTlsContext: " + e);
}
upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(unpacked);
}
Expand All @@ -842,9 +876,8 @@ private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String
io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig edsClusterConfig =
cluster.getEdsClusterConfig();
if (!edsClusterConfig.getEdsConfig().hasAds()) {
nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": "
+ "field eds_cluster_config must be set to indicate to use EDS over ADS.");
return null;
return StructOrError.fromError("Cluster " + clusterName
+ ": field eds_cluster_config must be set to indicate to use EDS over ADS.");
}
// If the service_name field is set, that value will be used for the EDS request.
if (!edsClusterConfig.getServiceName().isEmpty()) {
Expand All @@ -853,17 +886,14 @@ private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String
} else {
edsResources.add(clusterName);
}
EdsClusterConfig config = new EdsClusterConfig(lbPolicy, edsServiceName,
lrsServerName, maxConcurrentRequests, upstreamTlsContext);
return new CdsUpdate(clusterName, ClusterType.EDS, config);
return StructOrError.fromStruct(CdsUpdate.forEds(
clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext));
} else if (type.equals(DiscoveryType.LOGICAL_DNS)) {
LogicalDnsClusterConfig config = new LogicalDnsClusterConfig(lbPolicy, lrsServerName,
maxConcurrentRequests, upstreamTlsContext);
return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, config);
return StructOrError.fromStruct(CdsUpdate.forLogicalDns(
clusterName, lrsServerName, maxConcurrentRequests, upstreamTlsContext));
}
nackResponse(ResourceType.CDS, nonce,
return StructOrError.fromError(
"Cluster " + clusterName + ": unsupported built-in discovery type: " + type);
return null;
}

@Override
Expand Down
Loading