From 92c5291bea2a9f65087fd45214e298915ccdebd6 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 1 Feb 2021 00:32:08 -0800 Subject: [PATCH 1/5] Support XdsClient parsing endpoint load balancing policy other than round_robin. --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 42 ++- .../java/io/grpc/xds/ClientXdsClient.java | 69 +++-- xds/src/main/java/io/grpc/xds/XdsClient.java | 247 +++++++----------- .../io/grpc/xds/CdsLoadBalancer2Test.java | 16 +- .../io/grpc/xds/ClientXdsClientTestBase.java | 213 ++++++++------- .../io/grpc/xds/ClientXdsClientV2Test.java | 64 +++-- .../io/grpc/xds/ClientXdsClientV3Test.java | 64 +++-- 7 files changed, 375 insertions(+), 340 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 62f2b49249d..ae23528211b 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -34,11 +34,7 @@ import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayDeque; @@ -153,17 +149,16 @@ private void handleClusterDiscovered() { } if (clusterState.isLeaf) { DiscoveryMechanism instance; - if (clusterState.result instanceof EdsClusterConfig) { - EdsClusterConfig clusterConfig = (EdsClusterConfig) clusterState.result; - instance = DiscoveryMechanism.forEds(clusterState.name, clusterConfig.edsServiceName, - clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, - clusterConfig.upstreamTlsContext); + if (clusterState.result.clusterType == ClusterType.EDS) { + instance = DiscoveryMechanism.forEds( + clusterState.name, clusterState.result.edsServiceName, + clusterState.result.lrsServerName, clusterState.result.maxConcurrentRequests, + clusterState.result.upstreamTlsContext); } else { // logical DNS - LogicalDnsClusterConfig clusterConfig = - (LogicalDnsClusterConfig) clusterState.result; - instance = DiscoveryMechanism.forLogicalDns(clusterState.name, - clusterConfig.lrsServerName, clusterConfig.maxConcurrentRequests, - clusterConfig.upstreamTlsContext); + instance = DiscoveryMechanism.forLogicalDns( + clusterState.name, clusterState.result.lrsServerName, + clusterState.result.maxConcurrentRequests, + clusterState.result.upstreamTlsContext); } instances.add(instance); } else { @@ -212,7 +207,7 @@ private final class ClusterState implements CdsResourceWatcher { @Nullable private Map childClusterStates; @Nullable - private ClusterConfig result; + private CdsUpdate result; // Following fields are effectively final. private boolean isLeaf; private boolean discovered; @@ -281,15 +276,15 @@ public void run() { if (shutdown) { return; } + logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); discovered = true; - result = update.clusterConfig; + result = update; if (update.clusterType == ClusterType.AGGREGATE) { isLeaf = false; - AggregateClusterConfig clusterConfig = (AggregateClusterConfig) update.clusterConfig; - logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}", update.clusterName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); + logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", + update.clusterName, update.prioritizedClusterNames); Map newChildStates = new LinkedHashMap<>(); - for (String cluster : clusterConfig.prioritizedClusterNames) { + for (String cluster : update.prioritizedClusterNames) { if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { ClusterState childState = new ClusterState(cluster); childState.start(); @@ -306,16 +301,11 @@ public void run() { childClusterStates = newChildStates; } else if (update.clusterType == ClusterType.EDS) { isLeaf = true; - EdsClusterConfig clusterConfig = (EdsClusterConfig) update.clusterConfig; logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName, clusterConfig.edsServiceName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); + update.clusterName, update.edsServiceName); } else { // logical DNS isLeaf = true; - LogicalDnsClusterConfig clusterConfig = - (LogicalDnsClusterConfig) update.clusterConfig; logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName); - logger.log(XdsLogLevel.DEBUG, "Cluster config: {0}", clusterConfig); } handleClusterDiscovered(); } diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 9344e2ec90e..b48538f5926 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -21,6 +21,7 @@ import static io.grpc.xds.EnvoyProtoData.TRANSPORT_SOCKET_NAME_TLS; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CaseFormat; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.protobuf.Any; @@ -31,6 +32,7 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.CustomClusterType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; import io.envoyproxy.envoy.config.core.v3.HttpProtocolOptions; import io.envoyproxy.envoy.config.core.v3.RoutingPriority; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -53,10 +55,7 @@ import io.grpc.xds.EnvoyProtoData.StructOrError; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.LoadStatsManager.LoadStatsStore; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; +import io.grpc.xds.XdsClient.CdsUpdate.HashFunction; import io.grpc.xds.XdsLogger.XdsLogLevel; import java.util.ArrayList; import java.util.Collection; @@ -317,20 +316,13 @@ protected void handleCdsResponse(String versionInfo, List resources, String if (!cdsResourceSubscribers.containsKey(clusterName)) { continue; } - // The lb_policy field must be set to ROUND_ROBIN. - if (!cluster.getLbPolicy().equals(LbPolicy.ROUND_ROBIN)) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": unsupported Lb policy: " + cluster.getLbPolicy()); - return; - } - String lbPolicy = "round_robin"; CdsUpdate update = null; switch (cluster.getClusterDiscoveryTypeCase()) { case TYPE: - update = parseNonAggregateCluster(cluster, nonce, lbPolicy, edsResources); + update = parseNonAggregateCluster(cluster, nonce, edsResources); break; case CLUSTER_TYPE: - update = parseAggregateCluster(cluster, nonce, lbPolicy); + update = parseAggregateCluster(cluster, nonce); break; case CLUSTERDISCOVERYTYPE_NOT_SET: default: @@ -364,8 +356,23 @@ protected void handleCdsResponse(String versionInfo, List resources, String * Parses CDS resource for an aggregate cluster into {@link io.grpc.xds.XdsClient.CdsUpdate}. * Returns {@code null} and nack the response with the given nonce if the resource is invalid. */ - private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lbPolicy) { + private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce) { String clusterName = cluster.getName(); + String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( + CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); + long minRingSize = -1; + long maxRingSize = -1; + HashFunction hashFunction = null; + if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + minRingSize = lbConfig.getMinimumRingSize().getValue(); + maxRingSize = lbConfig.getMaximumRingSize().getValue(); + if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { + hashFunction = HashFunction.XX_HASH; + } else if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.MURMUR_HASH_2) { + hashFunction = HashFunction.MURMUR_HASH_2; + } + } CustomClusterType customType = cluster.getClusterType(); String typeName = customType.getName(); if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) { @@ -387,9 +394,8 @@ private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lb "Cluster " + clusterName + ": invalid cluster config: " + e); return null; } - AggregateClusterConfig config = - new AggregateClusterConfig(lbPolicy, clusterConfig.getClustersList()); - return new CdsUpdate(clusterName, ClusterType.AGGREGATE, config); + return CdsUpdate.forAggregate(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, + clusterConfig.getClustersList()); } /** @@ -397,9 +403,24 @@ private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce, String lb * io.grpc.xds.XdsClient.CdsUpdate}. Returns {@code null} and nack the response with the given * nonce if the resource is invalid. */ - private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String lbPolicy, - Set edsResources) { + private CdsUpdate parseNonAggregateCluster( + Cluster cluster, String nonce, Set edsResources) { String clusterName = cluster.getName(); + String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( + CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); + long minRingSize = -1; + long maxRingSize = -1; + HashFunction hashFunction = null; + if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + minRingSize = lbConfig.getMinimumRingSize().getValue(); + maxRingSize = lbConfig.getMaximumRingSize().getValue(); + if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { + hashFunction = HashFunction.XX_HASH; + } else if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.MURMUR_HASH_2) { + hashFunction = HashFunction.MURMUR_HASH_2; + } + } String lrsServerName = null; Long maxConcurrentRequests = null; UpstreamTlsContext upstreamTlsContext = null; @@ -457,13 +478,11 @@ private CdsUpdate parseNonAggregateCluster(Cluster cluster, String nonce, String } else { edsResources.add(clusterName); } - EdsClusterConfig config = new EdsClusterConfig(lbPolicy, edsServiceName, - lrsServerName, maxConcurrentRequests, upstreamTlsContext); - return new CdsUpdate(clusterName, ClusterType.EDS, config); + return CdsUpdate.forEds(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, + edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext); } else if (type.equals(DiscoveryType.LOGICAL_DNS)) { - LogicalDnsClusterConfig config = new LogicalDnsClusterConfig(lbPolicy, lrsServerName, - maxConcurrentRequests, upstreamTlsContext); - return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, config); + return CdsUpdate.forLogicalDns(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, + lrsServerName, maxConcurrentRequests, upstreamTlsContext); } nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 809558b088b..3c6258d9558 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -157,20 +157,89 @@ public boolean equals(Object o) { } } + /** xDS resource update for cluster-level configuration. */ static final class CdsUpdate implements ResourceUpdate { final String clusterName; final ClusterType clusterType; - final ClusterConfig clusterConfig; + // Endpoint-level load balancing policy. + final String lbPolicy; + // Only valid if lbPolicy is "ring_hash". + final long minRingSize; + // Only valid if lbPolicy is "ring_hash". + final long maxRingSize; + // Only valid if lbPolicy is "ring_hash". + @Nullable + final HashFunction hashFunction; + // Alternative resource name to be used in EDS requests. + /// Only valid for EDS cluster. + @Nullable + final String edsServiceName; + // Load report server name for reporting loads via LRS. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + final String lrsServerName; + // Max number of concurrent requests can be sent to this cluster. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + final Long maxConcurrentRequests; + // TLS context used to connect to connect to this cluster. + // Only valid for EDS or LOGICAL_DNS cluster. + @Nullable + final UpstreamTlsContext upstreamTlsContext; + // List of underlying clusters making of this aggregate cluster. + // Only valid for AGGREGATE cluster. + @Nullable + final List prioritizedClusterNames; + + static CdsUpdate forAggregate(String clusterName, String lbPolicy, long minRingSize, + long maxRingSize, @Nullable HashFunction hashFunction, + List prioritizedClusterNames) { + return new CdsUpdate(clusterName, ClusterType.AGGREGATE, lbPolicy, minRingSize, maxRingSize, + hashFunction, null, null, null, null, + checkNotNull(prioritizedClusterNames, "prioritizedClusterNames")); + } + + static CdsUpdate forEds(String clusterName, String lbPolicy, long minRingSize, + long maxRingSize, @Nullable HashFunction hashFunction, @Nullable String edsServiceName, + @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, + @Nullable UpstreamTlsContext upstreamTlsContext) { + return new CdsUpdate(clusterName, ClusterType.EDS, lbPolicy, minRingSize, maxRingSize, + hashFunction, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext, + null); + } - CdsUpdate(String clusterName, ClusterType clusterType, ClusterConfig clusterConfig) { + static CdsUpdate forLogicalDns(String clusterName, String lbPolicy, long minRingSize, + long maxRingSize, @Nullable HashFunction hashFunction, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { + return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, lbPolicy, minRingSize, + maxRingSize, hashFunction, null, lrsServerName, maxConcurrentRequests, + upstreamTlsContext, null); + } + + CdsUpdate(String clusterName, ClusterType clusterType, @Nullable String lbPolicy, + long minRingSize, long maxRingSize, @Nullable HashFunction hashFunction, + @Nullable String edsServiceName, @Nullable String lrsServerName, + @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, + @Nullable List prioritizedClusterNames) { this.clusterName = checkNotNull(clusterName, "clusterName"); this.clusterType = checkNotNull(clusterType, "clusterType"); - this.clusterConfig = checkNotNull(clusterConfig, "clusterConfig"); + this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy"); + this.minRingSize = minRingSize; + this.maxRingSize = maxRingSize; + this.hashFunction = hashFunction; + this.edsServiceName = edsServiceName; + this.lrsServerName = lrsServerName; + this.maxConcurrentRequests = maxConcurrentRequests; + this.upstreamTlsContext = upstreamTlsContext; + this.prioritizedClusterNames = prioritizedClusterNames != null + ? Collections.unmodifiableList(new ArrayList<>(prioritizedClusterNames)) : null; } @Override public int hashCode() { - return Objects.hash(clusterName, clusterType, clusterConfig); + return Objects.hash(clusterName, clusterType, lbPolicy, minRingSize, maxRingSize, + hashFunction, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext, + prioritizedClusterNames); } @Override @@ -184,7 +253,15 @@ public boolean equals(Object o) { CdsUpdate that = (CdsUpdate) o; return Objects.equals(clusterName, that.clusterName) && Objects.equals(clusterType, that.clusterType) - && Objects.equals(clusterConfig, that.clusterConfig); + && Objects.equals(lbPolicy, that.lbPolicy) + && minRingSize == that.minRingSize + && maxRingSize == that.maxRingSize + && Objects.equals(hashFunction, that.hashFunction) + && Objects.equals(edsServiceName, that.edsServiceName) + && Objects.equals(lrsServerName, that.lrsServerName) + && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) + && Objects.equals(upstreamTlsContext, that.upstreamTlsContext) + && Objects.equals(prioritizedClusterNames, that.prioritizedClusterNames); } @Override @@ -192,7 +269,15 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("clusterName", clusterName) .add("clusterType", clusterType) - .add("clusterConfig", clusterConfig) + .add("lbPolicy", lbPolicy) + .add("minRingSize", minRingSize) + .add("maxRingSize", maxRingSize) + .add("hashFunction", hashFunction) + .add("edsServiceName", edsServiceName) + .add("lrsServerName", lrsServerName) + .add("maxConcurrentRequests", maxConcurrentRequests) + // Exclude upstreamTlsContext as its string representation is cumbersome. + .add("prioritizedClusterNames", prioritizedClusterNames) .toString(); } @@ -200,154 +285,8 @@ enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE } - abstract static class ClusterConfig { - // Endpoint level load balancing policy. - final String lbPolicy; - - private ClusterConfig(String lbPolicy) { - this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy"); - } - } - - static final class AggregateClusterConfig extends ClusterConfig { - // List of underlying clusters making of this aggregate cluster. - final List prioritizedClusterNames; - - AggregateClusterConfig(String lbPolicy, List prioritizedClusterNames) { - super(lbPolicy); - this.prioritizedClusterNames = - Collections.unmodifiableList(new ArrayList<>(prioritizedClusterNames)); - } - - @Override - public int hashCode() { - return Objects.hash(lbPolicy, prioritizedClusterNames); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - AggregateClusterConfig that = (AggregateClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(prioritizedClusterNames, that.prioritizedClusterNames); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("prioritizedClusterNames", prioritizedClusterNames) - .toString(); - } - } - - private abstract static class NonAggregateClusterConfig extends ClusterConfig { - // Load report server name for reporting loads via LRS. - @Nullable - final String lrsServerName; - // Max number of concurrent requests can be sent to this cluster. - // FIXME(chengyuanzhang): protobuf uint32 is int in Java, so this field can be Integer. - @Nullable - final Long maxConcurrentRequests; - // TLS context used to connect to connect to this cluster. - @Nullable - final UpstreamTlsContext upstreamTlsContext; - - private NonAggregateClusterConfig(String lbPolicy, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy); - this.lrsServerName = lrsServerName; - this.maxConcurrentRequests = maxConcurrentRequests; - this.upstreamTlsContext = upstreamTlsContext; - } - } - - static final class EdsClusterConfig extends NonAggregateClusterConfig { - // Alternative resource name to be used in EDS requests. - @Nullable - final String edsServiceName; - - EdsClusterConfig(String lbPolicy, @Nullable String edsServiceName, - @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - this.edsServiceName = edsServiceName; - } - - @Override - public int hashCode() { - return Objects.hash(lbPolicy, edsServiceName, lrsServerName, maxConcurrentRequests, - upstreamTlsContext); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - EdsClusterConfig that = (EdsClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(edsServiceName, that.edsServiceName) - && Objects.equals(lrsServerName, that.lrsServerName) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("edsServiceName", edsServiceName) - .add("lrsServerName", lrsServerName) - .add("maxConcurrentRequests", maxConcurrentRequests) - // Exclude upstreamTlsContext as its string representation is cumbersome. - .toString(); - } - } - - static final class LogicalDnsClusterConfig extends NonAggregateClusterConfig { - LogicalDnsClusterConfig(String lbPolicy, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - super(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - } - - @Override - public int hashCode() { - return Objects.hash(lbPolicy, lrsServerName, maxConcurrentRequests, upstreamTlsContext); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - LogicalDnsClusterConfig that = (LogicalDnsClusterConfig) o; - return Objects.equals(lbPolicy, that.lbPolicy) - && Objects.equals(lrsServerName, that.lrsServerName) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(upstreamTlsContext, that.upstreamTlsContext); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("lbPolicy", lbPolicy) - .add("lrsServerName", lrsServerName) - .add("maxConcurrentRequests", maxConcurrentRequests) - // Exclude upstreamTlsContext as its string representation is cumbersome. - .toString(); - } + enum HashFunction { + XX_HASH, MURMUR_HASH_2 } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 8957a4e5df8..4a9806be05b 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -47,10 +47,6 @@ import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import java.util.ArrayList; import java.util.Arrays; @@ -548,9 +544,8 @@ private void deliverEdsCluster(String clusterName, @Nullable String edsServiceNa @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - EdsClusterConfig clusterConfig = new EdsClusterConfig("round_robin", edsServiceName, - lrsServerName, maxConcurrentRequests, tlsContext); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.EDS, clusterConfig); + CdsUpdate update = CdsUpdate.forEds(clusterName, "round_robin", -1, -1, null, + edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext); watchers.get(clusterName).onChanged(update); } } @@ -558,17 +553,16 @@ private void deliverEdsCluster(String clusterName, @Nullable String edsServiceNa private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - LogicalDnsClusterConfig clusterConfig = new LogicalDnsClusterConfig("round_robin", + CdsUpdate update = CdsUpdate.forLogicalDns(clusterName, "round_robin", -1, -1, null, lrsServerName, maxConcurrentRequests, tlsContext); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, clusterConfig); watchers.get(clusterName).onChanged(update); } } private void deliverAggregateCluster(String clusterName, List clusters) { if (watchers.containsKey(clusterName)) { - AggregateClusterConfig clusterConfig = new AggregateClusterConfig("round_robin", clusters); - CdsUpdate update = new CdsUpdate(clusterName, ClusterType.AGGREGATE, clusterConfig); + CdsUpdate update = CdsUpdate.forAggregate(clusterName, "round_robin", -1, -1, null, + clusters); watchers.get(clusterName).onChanged(update); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index c47494f73e5..5d17fef37e3 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -49,13 +49,10 @@ import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; import io.grpc.xds.EnvoyProtoData.Node; -import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; -import io.grpc.xds.XdsClient.CdsUpdate.AggregateClusterConfig; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; -import io.grpc.xds.XdsClient.CdsUpdate.EdsClusterConfig; -import io.grpc.xds.XdsClient.CdsUpdate.LogicalDnsClusterConfig; +import io.grpc.xds.XdsClient.CdsUpdate.HashFunction; import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.LdsResourceWatcher; @@ -598,8 +595,10 @@ public void cdsResourceNotFound() { startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, false, null, null)), - Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); + Any.pack(mf.buildEdsCluster("cluster-bar.googleapis.com", null, "round_robin", null, + false, null, null)), + Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, + false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -617,7 +616,7 @@ public void cdsResourceFound() { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -627,12 +626,39 @@ public void cdsResourceFound() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + } + + @Test + public void cdsResourceFound_ringHashLbPolicy() { + DiscoveryRpcCall call = + startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); + Message ringHashConfig = mf.buildRingHashLbConfig("xx_hash", 10L, 100L); + List clusters = ImmutableList.of( + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "ring_hash", ringHashConfig, false, null, + null))); + call.sendResponse("0", clusters, ResourceType.CDS, "0000"); + + // Client sent an ACK CDS request. + call.verifyRequest(NODE, "0", Collections.singletonList(CDS_RESOURCE), ResourceType.CDS, + "0000"); + verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("ring_hash"); + assertThat(cdsUpdate.hashFunction).isEqualTo(HashFunction.XX_HASH); + assertThat(cdsUpdate.minRingSize).isEqualTo(10L); + assertThat(cdsUpdate.maxRingSize).isEqualTo(100L); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -643,7 +669,7 @@ public void cdsResponseWithAggregateCluster() { List candidates = Arrays.asList( "cluster1.googleapis.com", "cluster2.googleapis.com", "cluster3.googleapis.com"); List clusters = ImmutableList.of( - Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, candidates))); + Any.pack(mf.buildAggregateCluster(CDS_RESOURCE, "round_robin", null, candidates))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -653,10 +679,8 @@ public void cdsResponseWithAggregateCluster() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.AGGREGATE); - AggregateClusterConfig clusterConfig = (AggregateClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.prioritizedClusterNames) - .containsExactlyElementsIn(candidates).inOrder(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.prioritizedClusterNames).containsExactlyElementsIn(candidates).inOrder(); } @Test @@ -664,7 +688,7 @@ public void cdsResponseWithCircuitBreakers() { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, mf.buildCircuitBreakers(50, 200)))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); @@ -675,12 +699,11 @@ public void cdsResponseWithCircuitBreakers() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isEqualTo(200L); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isEqualTo(200L); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); } /** @@ -693,10 +716,13 @@ public void cdsResponseWithUpstreamTlsContext() { // Management server sends back CDS response with UpstreamTlsContext. List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", false, null, null)), - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", true, + Any.pack(mf.buildLogicalDnsCluster("cluster-bar.googleapis.com", "round_robin", null, + false, null, null)), + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, "eds-cluster-foo.googleapis.com", "round_robin", + null, true, mf.buildUpstreamTlsContext("secret1", "unix:/var/uds2"), null)), - Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, false, null, null))); + Any.pack(mf.buildEdsCluster("cluster-baz.googleapis.com", null, "round_robin", null, false, + null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sent an ACK CDS request. @@ -704,10 +730,8 @@ public void cdsResponseWithUpstreamTlsContext() { "0000"); verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - UpstreamTlsContext upstreamTlsContext = - ((EdsClusterConfig) cdsUpdate.clusterConfig).upstreamTlsContext; - SdsSecretConfig validationContextSdsSecretConfig = upstreamTlsContext.getCommonTlsContext() - .getValidationContextSdsSecretConfig(); + SdsSecretConfig validationContextSdsSecretConfig = + cdsUpdate.upstreamTlsContext.getCommonTlsContext().getValidationContextSdsSecretConfig(); assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); assertThat( Iterables.getOnlyElement( @@ -725,7 +749,7 @@ public void cachedCdsResource_data() { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -738,12 +762,11 @@ public void cachedCdsResource_data() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); call.verifyNoMoreRequest(); } @@ -764,7 +787,7 @@ public void cdsResourceUpdated() { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null))); + Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -774,15 +797,15 @@ public void cdsResourceUpdated() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; - assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(dnsConfig.lrsServerName).isNull(); - assertThat(dnsConfig.maxConcurrentRequests).isNull(); - assertThat(dnsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); String edsService = "eds-service-bar.googleapis.com"; clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, true, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, edsService, "round_robin", null, true, null, + null))); call.sendResponse("1", clusters, ResourceType.CDS, "0001"); // Client sends an ACK CDS request. @@ -792,12 +815,11 @@ public void cdsResourceUpdated() { cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); } @Test @@ -805,7 +827,7 @@ public void cdsResourceDeleted() { DiscoveryRpcCall call = startResourceWatcher(ResourceType.CDS, CDS_RESOURCE, cdsResourceWatcher); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); // Client sends an ACK CDS request. @@ -815,12 +837,11 @@ public void cdsResourceDeleted() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); - assertThat(clusterConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(clusterConfig.lrsServerName).isNull(); - assertThat(clusterConfig.maxConcurrentRequests).isNull(); - assertThat(clusterConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); call.sendResponse("1", Collections.emptyList(), ResourceType.CDS, "0001"); @@ -848,38 +869,36 @@ public void multipleCdsWatchers() { String edsService = "eds-service-bar.googleapis.com"; List clusters = ImmutableList.of( - Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, false, null, null)), - Any.pack(mf.buildEdsCluster(cdsResource, edsService, true, null, null))); + Any.pack(mf.buildLogicalDnsCluster(CDS_RESOURCE, "round_robin", null, false, null, null)), + Any.pack(mf.buildEdsCluster(cdsResource, edsService, "round_robin", null, true, null, + null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - LogicalDnsClusterConfig dnsConfig = (LogicalDnsClusterConfig) cdsUpdate.clusterConfig; - assertThat(dnsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(dnsConfig.lrsServerName).isNull(); - assertThat(dnsConfig.maxConcurrentRequests).isNull(); - assertThat(dnsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); verify(watcher1).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - EdsClusterConfig edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); verify(watcher2).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - edsConfig = (EdsClusterConfig) cdsUpdate.clusterConfig; - assertThat(edsConfig.edsServiceName).isEqualTo(edsService); - assertThat(edsConfig.lbPolicy).isEqualTo("round_robin"); - assertThat(edsConfig.lrsServerName).isEqualTo(""); - assertThat(edsConfig.maxConcurrentRequests).isNull(); - assertThat(edsConfig.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests).isNull(); + assertThat(cdsUpdate.upstreamTlsContext).isNull(); } @Test @@ -1095,17 +1114,18 @@ public void edsResourceDeletedByCds() { xdsClient.watchEdsResource(EDS_RESOURCE, edsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); List clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, false, null, null))); + Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null, null)), + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, EDS_RESOURCE, "round_robin", null, false, null, + null))); call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture()); - EdsClusterConfig clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isEqualTo(null); - assertThat(clusterConfig.lrsServerName).isEqualTo(""); + CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.edsServiceName).isEqualTo(null); + assertThat(cdsUpdate.lrsServerName).isEqualTo(""); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); - clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isEqualTo(EDS_RESOURCE); - assertThat(clusterConfig.lrsServerName).isNull(); + cdsUpdate = cdsUpdateCaptor.getValue(); + assertThat(cdsUpdate.edsServiceName).isEqualTo(EDS_RESOURCE); + assertThat(cdsUpdate.lrsServerName).isNull(); List clusterLoadAssignments = ImmutableList.of( @@ -1135,12 +1155,12 @@ public void edsResourceDeletedByCds() { assertThat(edsUpdateCaptor.getValue().clusterName).isEqualTo(EDS_RESOURCE); clusters = ImmutableList.of( - Any.pack(mf.buildEdsCluster(resource, null, true, null, null)), // no change - Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, false, null, null))); + Any.pack(mf.buildEdsCluster(resource, null, "round_robin", null, true, null, + null)), // no change + Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("1", clusters, ResourceType.CDS, "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); - clusterConfig = (EdsClusterConfig) cdsUpdateCaptor.getValue().clusterConfig; - assertThat(clusterConfig.edsServiceName).isNull(); + assertThat(cdsUpdateCaptor.getValue().edsServiceName).isNull(); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); verifyNoMoreInteractions(cdsWatcher, edsWatcher); } @@ -1532,13 +1552,18 @@ protected abstract Message buildVirtualHost( protected abstract List buildOpaqueRoutes(int num); protected abstract Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers); + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers); - protected abstract Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + protected abstract Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers); - protected abstract Message buildAggregateCluster(String clusterName, List clusters); + protected abstract Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters); + + protected abstract Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize); protected abstract Message buildUpstreamTlsContext(String secretName, String targetUri); diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java index 341bac7a720..424af04ccc9 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV2Test.java @@ -27,12 +27,15 @@ import com.google.protobuf.Any; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.api.v2.Cluster; import io.envoyproxy.envoy.api.v2.Cluster.CustomClusterType; import io.envoyproxy.envoy.api.v2.Cluster.DiscoveryType; import io.envoyproxy.envoy.api.v2.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.api.v2.Cluster.LbPolicy; +import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.api.v2.Cluster.RingHashLbConfig.HashFunction; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy; import io.envoyproxy.envoy.api.v2.ClusterLoadAssignment.Policy.DropOverload; @@ -384,10 +387,10 @@ protected List buildOpaqueRoutes(int num) { @Override protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.EDS); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); edsClusterConfigBuilder.setEdsConfig( @@ -400,34 +403,49 @@ protected Message buildEdsCluster(String clusterName, @Nullable String edsServic } @Override - protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.LOGICAL_DNS); return builder.build(); } @Override - protected Message buildAggregateCluster(String clusterName, List clusters) { + protected Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters) { ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); CustomClusterType type = CustomClusterType.newBuilder() .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setTypedConfig(Any.pack(clusterConfig)) .build(); - return Cluster.newBuilder() - .setName(clusterName) - .setLbPolicy(LbPolicy.ROUND_ROBIN) - .setClusterType(type) - .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } + return builder.build(); } - private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, + private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { Cluster.Builder builder = Cluster.newBuilder(); builder.setName(clusterName); - builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } if (enableLrs) { builder.setLrsServer( ConfigSource.newBuilder() @@ -445,6 +463,22 @@ private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs return builder; } + @Override + protected Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize) { + RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder(); + if (hashFunction.equals("xx_hash")) { + builder.setHashFunction(HashFunction.XX_HASH); + } else if (hashFunction.equals("murmur_hash_2")) { + builder.setHashFunction(HashFunction.MURMUR_HASH_2); + } else { + throw new AssertionError("Invalid hash function"); + } + builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build()); + builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build()); + return builder.build(); + } + @Override protected Message buildUpstreamTlsContext(String secretName, String targetUri) { GrpcService grpcService = diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java index dca11b39846..66cf8be5fb8 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientV3Test.java @@ -27,6 +27,7 @@ import com.google.protobuf.Any; import com.google.protobuf.Message; import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; @@ -35,6 +36,8 @@ import io.envoyproxy.envoy.config.cluster.v3.Cluster.DiscoveryType; import io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; import io.envoyproxy.envoy.config.core.v3.Address; import io.envoyproxy.envoy.config.core.v3.AggregatedConfigSource; import io.envoyproxy.envoy.config.core.v3.ApiConfigSource; @@ -382,10 +385,10 @@ protected List buildOpaqueRoutes(int num) { @Override protected Message buildEdsCluster(String clusterName, @Nullable String edsServiceName, - boolean enableLrs, @Nullable Message upstreamTlsContext, - @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + String lbPolicy, @Nullable Message ringHashLbConfig, boolean enableLrs, + @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.EDS); EdsClusterConfig.Builder edsClusterConfigBuilder = EdsClusterConfig.newBuilder(); edsClusterConfigBuilder.setEdsConfig( @@ -398,34 +401,49 @@ protected Message buildEdsCluster(String clusterName, @Nullable String edsServic } @Override - protected Message buildLogicalDnsCluster(String clusterName, boolean enableLrs, + protected Message buildLogicalDnsCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { - Cluster.Builder builder = - initClusterBuilder(clusterName, enableLrs, upstreamTlsContext, circuitBreakers); + Cluster.Builder builder = initClusterBuilder(clusterName, lbPolicy, ringHashLbConfig, + enableLrs, upstreamTlsContext, circuitBreakers); builder.setType(DiscoveryType.LOGICAL_DNS); return builder.build(); } @Override - protected Message buildAggregateCluster(String clusterName, List clusters) { + protected Message buildAggregateCluster(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, List clusters) { ClusterConfig clusterConfig = ClusterConfig.newBuilder().addAllClusters(clusters).build(); CustomClusterType type = CustomClusterType.newBuilder() .setName(ClientXdsClient.AGGREGATE_CLUSTER_TYPE_NAME) .setTypedConfig(Any.pack(clusterConfig)) .build(); - return Cluster.newBuilder() - .setName(clusterName) - .setLbPolicy(LbPolicy.ROUND_ROBIN) - .setClusterType(type) - .build(); + Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } + return builder.build(); } - private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs, + private Cluster.Builder initClusterBuilder(String clusterName, String lbPolicy, + @Nullable Message ringHashLbConfig, boolean enableLrs, @Nullable Message upstreamTlsContext, @Nullable Message circuitBreakers) { Cluster.Builder builder = Cluster.newBuilder(); builder.setName(clusterName); - builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + if (lbPolicy.equals("round_robin")) { + builder.setLbPolicy(LbPolicy.ROUND_ROBIN); + } else if (lbPolicy.equals("ring_hash")) { + builder.setLbPolicy(LbPolicy.RING_HASH); + builder.setRingHashLbConfig((RingHashLbConfig) ringHashLbConfig); + } else { + throw new AssertionError("Invalid LB policy"); + } if (enableLrs) { builder.setLrsServer( ConfigSource.newBuilder() @@ -443,6 +461,22 @@ private Cluster.Builder initClusterBuilder(String clusterName, boolean enableLrs return builder; } + @Override + protected Message buildRingHashLbConfig(String hashFunction, long minRingSize, + long maxRingSize) { + RingHashLbConfig.Builder builder = RingHashLbConfig.newBuilder(); + if (hashFunction.equals("xx_hash")) { + builder.setHashFunction(HashFunction.XX_HASH); + } else if (hashFunction.equals("murmur_hash_2")) { + builder.setHashFunction(HashFunction.MURMUR_HASH_2); + } else { + throw new AssertionError("Invalid hash function"); + } + builder.setMinimumRingSize(UInt64Value.newBuilder().setValue(minRingSize).build()); + builder.setMaximumRingSize(UInt64Value.newBuilder().setValue(maxRingSize).build()); + return builder.build(); + } + @Override protected Message buildUpstreamTlsContext(String secretName, String targetUri) { GrpcService grpcService = From b5dfed2ef5c11718e9f4b541020e9493bb83ed3d Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Mon, 1 Feb 2021 15:14:31 -0800 Subject: [PATCH 2/5] Add per-route hashing policy configuration. --- .../main/java/io/grpc/xds/EnvoyProtoData.java | 139 +++++++++++++++++- .../java/io/grpc/xds/EnvoyProtoDataTest.java | 3 +- .../java/io/grpc/xds/XdsNameResolverTest.java | 54 ++++--- 3 files changed, 172 insertions(+), 24 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java index c1b089ce8f9..4ba58a670b0 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyProtoData.java @@ -1440,6 +1440,8 @@ private static int getRatePerMillion(FractionalPercent percent) { static final class RouteAction { @Nullable private final Long timeoutNano; + // List of hash policies to use for ring hash load balancing. + private final List hashPolicies; // Exactly one of the following fields is non-null. @Nullable private final String cluster; @@ -1447,9 +1449,11 @@ static final class RouteAction { private final List weightedClusters; @VisibleForTesting - RouteAction(@Nullable Long timeoutNano, @Nullable String cluster, - @Nullable List weightedClusters) { + RouteAction(@Nullable Long timeoutNano, List hashPolicies, + @Nullable String cluster, @Nullable List weightedClusters) { this.timeoutNano = timeoutNano; + this.hashPolicies = Collections.unmodifiableList(new ArrayList<>( + checkNotNull(hashPolicies, "hashPolicies"))); this.cluster = cluster; this.weightedClusters = weightedClusters; } @@ -1459,6 +1463,10 @@ Long getTimeoutNano() { return timeoutNano; } + List getHashPolicies() { + return hashPolicies; + } + @Nullable String getCluster() { return cluster; @@ -1479,18 +1487,20 @@ public boolean equals(Object o) { } RouteAction that = (RouteAction) o; return Objects.equals(timeoutNano, that.timeoutNano) + && Objects.equals(hashPolicies, that.hashPolicies) && Objects.equals(cluster, that.cluster) && Objects.equals(weightedClusters, that.weightedClusters); } @Override public int hashCode() { - return Objects.hash(timeoutNano, cluster, weightedClusters); + return Objects.hash(timeoutNano, hashPolicies, cluster, weightedClusters); } @Override public String toString() { - ToStringHelper toStringHelper = MoreObjects.toStringHelper(this); + ToStringHelper toStringHelper = MoreObjects.toStringHelper(this) + .add("hashPolicies", hashPolicies); if (timeoutNano != null) { toStringHelper.add("timeout", timeoutNano + "ns"); } @@ -1549,7 +1559,126 @@ static StructOrError fromEnvoyProtoRouteAction( timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration()); } } - return StructOrError.fromStruct(new RouteAction(timeoutNano, cluster, weightedClusters)); + List hashPolicies = new ArrayList<>(); + for (io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy config + : proto.getHashPolicyList()) { + HashPolicy policy = null; + boolean terminal = config.getTerminal(); + switch (config.getPolicySpecifierCase()) { + case HEADER: + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header headerCfg = + config.getHeader(); + Pattern regEx = null; + String regExSubstitute = null; + if (headerCfg.hasRegexRewrite() && headerCfg.getRegexRewrite().hasPattern() + && headerCfg.getRegexRewrite().getPattern().hasGoogleRe2()) { + regEx = Pattern.compile(headerCfg.getRegexRewrite().getPattern().getRegex()); + regExSubstitute = headerCfg.getRegexRewrite().getSubstitution(); + } + policy = HashPolicy.forHeader( + terminal, headerCfg.getHeaderName(), regEx, regExSubstitute); + break; + case CONNECTION_PROPERTIES: + if (config.getConnectionProperties().getSourceIp()) { + policy = HashPolicy.forSourceIp(terminal); + } + break; + case FILTER_STATE: + if (config.getFilterState().getKey().equals("io.grpc.channel_id")) { + policy = HashPolicy.forChannelId(terminal); + } + break; + default: + // Ignore + } + hashPolicies.add(policy); + } + return StructOrError.fromStruct(new RouteAction( + timeoutNano, hashPolicies, cluster, weightedClusters)); + } + } + + // Configuration for the route's hashing policy if the upstream cluster uses a hashing load + // balancer. + static final class HashPolicy { + // The specifier that indicates the component of the request to be hashed on. + private final Type type; + // The flag that short-circuits the hash computing. + private final boolean terminal; + // The name of the request header that will be used to obtain the hash key. + // Only valid if type is HEADER. + @Nullable + private final String headerName; + // The regular expression used to find portions to be replaced in the header value. + // Only valid if type is HEADER. + @Nullable + private final Pattern regEx; + // The string that should be substituted into matching portions of the header value. + // Only valid if type is HEADER. + @Nullable + private final String regExSubstitution; + + private HashPolicy(Type type, boolean terminal, @Nullable String headerName, + @Nullable Pattern regEx, @Nullable String regExSubstitution) { + this.type = checkNotNull(type, "type"); + this.terminal = terminal; + this.headerName = headerName; + this.regEx = regEx; + this.regExSubstitution = regExSubstitution; + } + + static HashPolicy forHeader(boolean terminal, String headerName, @Nullable Pattern regEx, + @Nullable String regExSubstitution) { + return new HashPolicy(Type.HEADER, terminal, checkNotNull(headerName, "headerName"), + regEx, regExSubstitution); + } + + static HashPolicy forSourceIp(boolean terminal) { + return new HashPolicy(Type.SOURCE_IP, terminal, null, null, null); + } + + static HashPolicy forChannelId(boolean terminal) { + return new HashPolicy(Type.CHANNEL_ID, terminal, null, null, null); + } + + @Override + public int hashCode() { + return Objects.hash(type, terminal, headerName, regEx, regExSubstitution); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + HashPolicy that = (HashPolicy) o; + return type == that.type + && terminal == that.terminal + && Objects.equals(headerName, that.headerName) + && Objects.equals( + regEx == null ? null : regEx.pattern(), + that.regEx == null ? null : that.regEx.pattern()) + && Objects.equals(regExSubstitution, that.regExSubstitution); + } + + @Override + public String toString() { + ToStringHelper toStringHelper = MoreObjects.toStringHelper(this) + .add("type", type) + .add("terminal", terminal) + .add("headerName", headerName) + .add("regExSubstitution", regExSubstitution); + if (regEx != null) { + toStringHelper.add("regEx", regEx.pattern()); + } + return toStringHelper.toString(); + } + + enum Type { + HEADER, SOURCE_IP, CHANNEL_ID } } diff --git a/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java b/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java index 34a2055d1b2..6d1aaa9c46a 100644 --- a/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java +++ b/xds/src/test/java/io/grpc/xds/EnvoyProtoDataTest.java @@ -39,6 +39,7 @@ import io.grpc.xds.EnvoyProtoData.ClusterStats.DroppedRequests; import io.grpc.xds.EnvoyProtoData.ClusterWeight; import io.grpc.xds.EnvoyProtoData.EndpointLoadMetricStats; +import io.grpc.xds.EnvoyProtoData.HashPolicy; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Route; @@ -213,7 +214,7 @@ public void convertRoute() { new Route( new RouteMatch(PathMatcher.fromPath("/service/method", false), Collections.emptyList(), null), - new RouteAction(null, "cluster-foo", null))); + new RouteAction(null, Collections.emptyList(), "cluster-foo", null))); io.envoyproxy.envoy.config.route.v3.Route unsupportedProto = io.envoyproxy.envoy.config.route.v3.Route.newBuilder() diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 219a4aee5f4..45cca9e06ee 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -50,6 +50,7 @@ import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.EnvoyProtoData.ClusterWeight; +import io.grpc.xds.EnvoyProtoData.HashPolicy; import io.grpc.xds.EnvoyProtoData.Route; import io.grpc.xds.EnvoyProtoData.RouteAction; import io.grpc.xds.EnvoyProtoData.VirtualHost; @@ -233,9 +234,11 @@ public void resolving_matchingVirtualHostNotFoundInRdsResource() { private List buildUnmatchedVirtualHosts() { Route route1 = new Route(RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)); Route route2 = new Route(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster1, null)); return Arrays.asList( new VirtualHost("virtualhost-foo", Collections.singletonList("hello.googleapis.com"), Collections.singletonList(route1)), @@ -248,7 +251,8 @@ public void resolved_noTimeout() { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = new Route(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(null, cluster1, null)); // per-route timeout unset + new RouteAction(null, Collections.emptyList(), cluster1, + null)); // per-route timeout unset VirtualHost virtualHost = new VirtualHost("does not matter", Collections.singletonList(AUTHORITY), Collections.singletonList(route)); xdsClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(virtualHost)); @@ -263,7 +267,8 @@ public void resolved_fallbackToHttpMaxStreamDurationAsTimeout() { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); Route route = new Route(RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(null, cluster1, null)); // per-route timeout unset + new RouteAction(null, Collections.emptyList(), cluster1, + null)); // per-route timeout unset VirtualHost virtualHost = new VirtualHost("does not matter", Collections.singletonList(AUTHORITY), Collections.singletonList(route)); xdsClient.deliverLdsUpdate(AUTHORITY, TimeUnit.SECONDS.toNanos(5L), @@ -309,10 +314,12 @@ public void resolved_resourceUpdateAfterCallStarted() { Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), Collections.emptyList(), + "another-cluster", null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); // Updated service config still contains cluster1 while it is removed resource. New calls no @@ -344,10 +351,12 @@ public void resolved_resourceUpdatedBeforeCallStarted() { Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), Collections.emptyList(), + "another-cluster", null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); // Two consecutive service config updates: one for removing clcuster1, // one for adding "another=cluster". verify(mockListener, times(2)).onResult(resolutionResultCaptor.capture()); @@ -375,10 +384,12 @@ public void resolved_raceBetweenCallAndRepeatedResourceUpdate() { Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(20L), "another-cluster", null)), + new RouteAction(TimeUnit.SECONDS.toNanos(20L), Collections.emptyList(), + "another-cluster", null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); @@ -391,10 +402,12 @@ public void resolved_raceBetweenCallAndRepeatedResourceUpdate() { Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), "another-cluster", null)), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + "another-cluster", null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); verifyNoMoreInteractions(mockListener); // no cluster added/deleted assertCallSelectResult(call1, configSelector, "another-cluster", 15.0); } @@ -409,16 +422,19 @@ public void resolved_raceBetweenClusterReleasedAndResourceUpdateAddBackAgain() { Collections.singletonList( new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); xdsClient.deliverLdsUpdate( AUTHORITY, Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster1, null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); testCall.deliverErrorStatus(); verifyNoMoreInteractions(mockListener); } @@ -435,7 +451,7 @@ public void resolved_simpleCallSucceeds_routeToWeightedCluster() { new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), new RouteAction( - TimeUnit.SECONDS.toNanos(20L), null, + TimeUnit.SECONDS.toNanos(20L), Collections.emptyList(), null, Arrays.asList( new ClusterWeight(cluster1, 20, null), new ClusterWeight(cluster2, 80, null)))))); @@ -495,10 +511,12 @@ private InternalConfigSelector resolveToClusters() { Arrays.asList( new Route( RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster1, null)), + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster1, null)), new Route( RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()), - new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null)))); + new RouteAction(TimeUnit.SECONDS.toNanos(15L), Collections.emptyList(), + cluster2, null)))); verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); assertThat(result.getAddresses()).isEmpty(); From 70e656492af5a5cd11d4e5610b0b0fdb33236795 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 9 Feb 2021 11:31:44 -0800 Subject: [PATCH 3/5] Add tests for parsing RouteAction. --- .../java/io/grpc/xds/ClientXdsClient.java | 3 +- .../io/grpc/xds/ClientXdsClientDataTest.java | 53 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index fb7eae972ae..18ff5f16e5f 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -96,7 +96,8 @@ final class ClientXdsClient extends AbstractXdsClient { @VisibleForTesting static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate"; private static final String HTTP_FAULT_FILTER_NAME = "envoy.fault"; - private static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id"; + @VisibleForTesting + static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id"; private static final String TYPE_URL_HTTP_CONNECTION_MANAGER_V2 = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2" + ".HttpConnectionManager"; diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index ec663b94aa0..834d911f4d1 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -29,10 +29,16 @@ import io.envoyproxy.envoy.config.route.v3.DirectResponseAction; import io.envoyproxy.envoy.config.route.v3.FilterAction; import io.envoyproxy.envoy.config.route.v3.RedirectAction; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.ConnectionProperties; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.FilterState; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.Header; +import io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.QueryParameter; import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration; import io.envoyproxy.envoy.config.route.v3.WeightedCluster; import io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort.HeaderAbort; +import io.envoyproxy.envoy.type.matcher.v3.RegexMatchAndSubstitute; import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher; +import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher.GoogleRE2; import io.envoyproxy.envoy.type.v3.FractionalPercent; import io.envoyproxy.envoy.type.v3.FractionalPercent.DenominatorType; import io.envoyproxy.envoy.type.v3.Int64Range; @@ -51,6 +57,7 @@ import io.grpc.xds.VirtualHost.Route.RouteMatch; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -392,6 +399,52 @@ public void parseRouteAction_withTimeoutUnset() { assertThat(struct.getStruct().timeoutNano()).isNull(); } + @Test + public void parseRouteAction_withHashPolicies() { + io.envoyproxy.envoy.config.route.v3.RouteAction proto = + io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder() + .setCluster("cluster-foo") + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setHeader( + Header.newBuilder() + .setHeaderName("user-agent") + .setRegexRewrite( + RegexMatchAndSubstitute.newBuilder() + .setPattern( + RegexMatcher.newBuilder() + .setGoogleRe2(GoogleRE2.getDefaultInstance()) + .setRegex("grpc.*")) + .setSubstitution("gRPC")))) + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setConnectionProperties(ConnectionProperties.newBuilder().setSourceIp(true)) + .setTerminal(true)) + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setFilterState( + FilterState.newBuilder() + .setKey(ClientXdsClient.HASH_POLICY_FILTER_STATE_KEY))) + .addHashPolicy( + io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() + .setQueryParameter(QueryParameter.newBuilder().setName("param"))) + .build(); + StructOrError struct = ClientXdsClient.parseRouteAction(proto); + List policies = struct.getStruct().hashPolicies(); + assertThat(policies).hasSize(3); + assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER); + assertThat(policies.get(0).headerName()).isEqualTo("user-agent"); + assertThat(policies.get(0).isTerminal()).isFalse(); + assertThat(policies.get(0).regEx().pattern()).isEqualTo("grpc.*"); + assertThat(policies.get(0).regExSubstitution()).isEqualTo("gRPC"); + + assertThat(policies.get(1).type()).isEqualTo(HashPolicy.Type.SOURCE_IP); + assertThat(policies.get(1).isTerminal()).isTrue(); + + assertThat(policies.get(2).type()).isEqualTo(HashPolicy.Type.CHANNEL_ID); + assertThat(policies.get(2).isTerminal()).isFalse(); + } + @Test public void parseClusterWeight() { io.envoyproxy.envoy.config.route.v3.WeightedCluster.ClusterWeight proto = From 22cdc116a8b57f4a8066c64bf023dce108585ab5 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 11 Feb 2021 12:49:09 -0800 Subject: [PATCH 4/5] Allow only xx_hash hash function and refactor cluster config parsing code. --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 28 +-- .../java/io/grpc/xds/ClientXdsClient.java | 110 ++++------ xds/src/main/java/io/grpc/xds/XdsClient.java | 199 ++++++++++-------- .../io/grpc/xds/CdsLoadBalancer2Test.java | 14 +- .../io/grpc/xds/ClientXdsClientTestBase.java | 162 +++++++------- 5 files changed, 256 insertions(+), 257 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index ae23528211b..4ba0a32030e 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -149,16 +149,16 @@ private void handleClusterDiscovered() { } if (clusterState.isLeaf) { DiscoveryMechanism instance; - if (clusterState.result.clusterType == ClusterType.EDS) { + if (clusterState.result.clusterType() == ClusterType.EDS) { instance = DiscoveryMechanism.forEds( - clusterState.name, clusterState.result.edsServiceName, - clusterState.result.lrsServerName, clusterState.result.maxConcurrentRequests, - clusterState.result.upstreamTlsContext); + clusterState.name, clusterState.result.edsServiceName(), + clusterState.result.lrsServerName(), clusterState.result.maxConcurrentRequests(), + clusterState.result.upstreamTlsContext()); } else { // logical DNS instance = DiscoveryMechanism.forLogicalDns( - clusterState.name, clusterState.result.lrsServerName, - clusterState.result.maxConcurrentRequests, - clusterState.result.upstreamTlsContext); + clusterState.name, clusterState.result.lrsServerName(), + clusterState.result.maxConcurrentRequests(), + clusterState.result.upstreamTlsContext()); } instances.add(instance); } else { @@ -178,7 +178,7 @@ private void handleClusterDiscovered() { helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); return; } - String endpointPickingPolicy = root.result.lbPolicy; + String endpointPickingPolicy = root.result.lbPolicy(); LoadBalancerProvider localityPickingLbProvider = lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded LoadBalancerProvider endpointPickingLbProvider = @@ -279,12 +279,12 @@ public void run() { logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update); discovered = true; result = update; - if (update.clusterType == ClusterType.AGGREGATE) { + if (update.clusterType() == ClusterType.AGGREGATE) { isLeaf = false; logger.log(XdsLogLevel.INFO, "Aggregate cluster {0}, underlying clusters: {1}", - update.clusterName, update.prioritizedClusterNames); + update.clusterName(), update.prioritizedClusterNames()); Map newChildStates = new LinkedHashMap<>(); - for (String cluster : update.prioritizedClusterNames) { + for (String cluster : update.prioritizedClusterNames()) { if (childClusterStates == null || !childClusterStates.containsKey(cluster)) { ClusterState childState = new ClusterState(cluster); childState.start(); @@ -299,13 +299,13 @@ public void run() { } } childClusterStates = newChildStates; - } else if (update.clusterType == ClusterType.EDS) { + } else if (update.clusterType() == ClusterType.EDS) { isLeaf = true; logger.log(XdsLogLevel.INFO, "EDS cluster {0}, edsServiceName: {1}", - update.clusterName, update.edsServiceName); + update.clusterName(), update.edsServiceName()); } else { // logical DNS isLeaf = true; - logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName); + logger.log(XdsLogLevel.INFO, "Logical DNS cluster {0}", update.clusterName()); } handleClusterDiscovered(); } diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 18ff5f16e5f..2aa619776aa 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -752,23 +752,44 @@ protected void handleCdsResponse(String versionInfo, List resources, String if (!cdsResourceSubscribers.containsKey(clusterName)) { continue; } - CdsUpdate update = null; + StructOrError structOrError; switch (cluster.getClusterDiscoveryTypeCase()) { case TYPE: - update = parseNonAggregateCluster(cluster, nonce, edsResources); + structOrError = parseNonAggregateCluster(cluster, edsResources); break; case CLUSTER_TYPE: - update = parseAggregateCluster(cluster, nonce); + structOrError = parseAggregateCluster(cluster); break; case CLUSTERDISCOVERYTYPE_NOT_SET: default: nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": cluster discovery type unspecified"); + return; } - if (update == null) { + if (structOrError.getErrorDetail() != null) { + nackResponse(ResourceType.CDS, nonce, structOrError.errorDetail); return; } - cdsUpdates.put(clusterName, update); + CdsUpdate.Builder updateBuilder = structOrError.getStruct(); + String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( + CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); + if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { + HashFunction hashFunction; + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { + hashFunction = HashFunction.XX_HASH; + } else { + nackResponse(ResourceType.CDS, nonce, + "Cluster " + clusterName + ": unsupported ring hash function: " + + lbConfig.getHashFunction()); + return; + } + updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(), + lbConfig.getMaximumRingSize().getValue(), hashFunction); + } else { + updateBuilder.lbPolicy(lbPolicy); + } + cdsUpdates.put(clusterName, updateBuilder.build()); } ackResponse(ResourceType.CDS, versionInfo, nonce); @@ -788,33 +809,13 @@ protected void handleCdsResponse(String versionInfo, List resources, String } } - /** - * Parses CDS resource for an aggregate cluster into {@link io.grpc.xds.XdsClient.CdsUpdate}. - * Returns {@code null} and nack the response with the given nonce if the resource is invalid. - */ - private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce) { + private static StructOrError parseAggregateCluster(Cluster cluster) { String clusterName = cluster.getName(); - String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( - CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); - long minRingSize = -1; - long maxRingSize = -1; - HashFunction hashFunction = null; - if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { - RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); - minRingSize = lbConfig.getMinimumRingSize().getValue(); - maxRingSize = lbConfig.getMaximumRingSize().getValue(); - if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { - hashFunction = HashFunction.XX_HASH; - } else if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.MURMUR_HASH_2) { - hashFunction = HashFunction.MURMUR_HASH_2; - } - } CustomClusterType customType = cluster.getClusterType(); String typeName = customType.getName(); if (!typeName.equals(AGGREGATE_CLUSTER_TYPE_NAME)) { - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": unsupported custom cluster type: " + typeName); - return null; } io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig clusterConfig; Any unpackedClusterConfig = customType.getTypedConfig(); @@ -826,45 +827,23 @@ private CdsUpdate parseAggregateCluster(Cluster cluster, String nonce) { clusterConfig = unpackedClusterConfig.unpack( io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig.class); } catch (InvalidProtocolBufferException e) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": invalid cluster config: " + e); + StructOrError.fromError("Cluster " + clusterName + ": malformed ClusterConfig: " + e); return null; } - return CdsUpdate.forAggregate(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, - clusterConfig.getClustersList()); + return StructOrError.fromStruct(CdsUpdate.forAggregate( + clusterName, clusterConfig.getClustersList())); } - /** - * Parses CDS resource for a non-aggregate cluster (EDS or Logical DNS) into {@link - * io.grpc.xds.XdsClient.CdsUpdate}. Returns {@code null} and nack the response with the given - * nonce if the resource is invalid. - */ - private CdsUpdate parseNonAggregateCluster( - Cluster cluster, String nonce, Set edsResources) { + private static StructOrError parseNonAggregateCluster( + Cluster cluster, Set edsResources) { String clusterName = cluster.getName(); - String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( - CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); - long minRingSize = -1; - long maxRingSize = -1; - HashFunction hashFunction = null; - if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { - RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); - minRingSize = lbConfig.getMinimumRingSize().getValue(); - maxRingSize = lbConfig.getMaximumRingSize().getValue(); - if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.XX_HASH) { - hashFunction = HashFunction.XX_HASH; - } else if (lbConfig.getHashFunction() == RingHashLbConfig.HashFunction.MURMUR_HASH_2) { - hashFunction = HashFunction.MURMUR_HASH_2; - } - } String lrsServerName = null; Long maxConcurrentRequests = null; UpstreamTlsContext upstreamTlsContext = null; if (cluster.hasLrsServer()) { if (!cluster.getLrsServer().hasSelf()) { - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": only support LRS for the same management server"); - return null; } lrsServerName = ""; } @@ -890,9 +869,8 @@ private CdsUpdate parseNonAggregateCluster( unpacked = any.unpack( io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext.class); } catch (InvalidProtocolBufferException e) { - nackResponse(ResourceType.CDS, nonce, - "Cluster " + clusterName + ": invalid upstream TLS context: " + e); - return null; + return StructOrError.fromError( + "Cluster " + clusterName + ": malformed UpstreamTlsContext: " + e); } upstreamTlsContext = UpstreamTlsContext.fromEnvoyProtoUpstreamTlsContext(unpacked); } @@ -903,9 +881,8 @@ private CdsUpdate parseNonAggregateCluster( io.envoyproxy.envoy.config.cluster.v3.Cluster.EdsClusterConfig edsClusterConfig = cluster.getEdsClusterConfig(); if (!edsClusterConfig.getEdsConfig().hasAds()) { - nackResponse(ResourceType.CDS, nonce, "Cluster " + clusterName + ": " - + "field eds_cluster_config must be set to indicate to use EDS over ADS."); - return null; + return StructOrError.fromError("Cluster " + clusterName + + ": field eds_cluster_config must be set to indicate to use EDS over ADS."); } // If the service_name field is set, that value will be used for the EDS request. if (!edsClusterConfig.getServiceName().isEmpty()) { @@ -914,15 +891,14 @@ private CdsUpdate parseNonAggregateCluster( } else { edsResources.add(clusterName); } - return CdsUpdate.forEds(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, - edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext); + return StructOrError.fromStruct(CdsUpdate.forEds( + clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext)); } else if (type.equals(DiscoveryType.LOGICAL_DNS)) { - return CdsUpdate.forLogicalDns(clusterName, lbPolicy, minRingSize, maxRingSize, hashFunction, - lrsServerName, maxConcurrentRequests, upstreamTlsContext); + return StructOrError.fromStruct(CdsUpdate.forLogicalDns( + clusterName, lrsServerName, maxConcurrentRequests, upstreamTlsContext)); } - nackResponse(ResourceType.CDS, nonce, + return StructOrError.fromError( "Cluster " + clusterName + ": unsupported built-in discovery type: " + type); - return null; } @Override diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index e379c78e718..4018770dec9 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -19,8 +19,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.auto.value.AutoValue; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; +import com.google.common.collect.ImmutableList; import io.grpc.Status; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.Endpoints.LocalityLbEndpoints; @@ -156,135 +158,154 @@ public boolean equals(Object o) { } /** xDS resource update for cluster-level configuration. */ - static final class CdsUpdate implements ResourceUpdate { - final String clusterName; - final ClusterType clusterType; + @AutoValue + abstract static class CdsUpdate implements ResourceUpdate { + abstract String clusterName(); + + abstract ClusterType clusterType(); + // Endpoint-level load balancing policy. - final String lbPolicy; + abstract String lbPolicy(); + // Only valid if lbPolicy is "ring_hash". - final long minRingSize; + abstract long minRingSize(); + // Only valid if lbPolicy is "ring_hash". - final long maxRingSize; + abstract long maxRingSize(); + // Only valid if lbPolicy is "ring_hash". @Nullable - final HashFunction hashFunction; + abstract HashFunction hashFunction(); + // Alternative resource name to be used in EDS requests. /// Only valid for EDS cluster. @Nullable - final String edsServiceName; + abstract String edsServiceName(); + // Load report server name for reporting loads via LRS. // Only valid for EDS or LOGICAL_DNS cluster. @Nullable - final String lrsServerName; + abstract String lrsServerName(); + // Max number of concurrent requests can be sent to this cluster. // Only valid for EDS or LOGICAL_DNS cluster. @Nullable - final Long maxConcurrentRequests; + abstract Long maxConcurrentRequests(); + // TLS context used to connect to connect to this cluster. // Only valid for EDS or LOGICAL_DNS cluster. @Nullable - final UpstreamTlsContext upstreamTlsContext; + abstract UpstreamTlsContext upstreamTlsContext(); + // List of underlying clusters making of this aggregate cluster. // Only valid for AGGREGATE cluster. @Nullable - final List prioritizedClusterNames; - - static CdsUpdate forAggregate(String clusterName, String lbPolicy, long minRingSize, - long maxRingSize, @Nullable HashFunction hashFunction, - List prioritizedClusterNames) { - return new CdsUpdate(clusterName, ClusterType.AGGREGATE, lbPolicy, minRingSize, maxRingSize, - hashFunction, null, null, null, null, - checkNotNull(prioritizedClusterNames, "prioritizedClusterNames")); + abstract ImmutableList prioritizedClusterNames(); + + static Builder forAggregate(String clusterName, List prioritizedClusterNames) { + checkNotNull(prioritizedClusterNames, "prioritizedClusterNames"); + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.AGGREGATE) + .minRingSize(0) + .maxRingSize(0) + .prioritizedClusterNames(ImmutableList.copyOf(prioritizedClusterNames)); } - static CdsUpdate forEds(String clusterName, String lbPolicy, long minRingSize, - long maxRingSize, @Nullable HashFunction hashFunction, @Nullable String edsServiceName, + static Builder forEds(String clusterName, @Nullable String edsServiceName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - return new CdsUpdate(clusterName, ClusterType.EDS, lbPolicy, minRingSize, maxRingSize, - hashFunction, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext, - null); + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.EDS) + .minRingSize(0) + .maxRingSize(0) + .edsServiceName(edsServiceName) + .lrsServerName(lrsServerName) + .maxConcurrentRequests(maxConcurrentRequests) + .upstreamTlsContext(upstreamTlsContext); } - static CdsUpdate forLogicalDns(String clusterName, String lbPolicy, long minRingSize, - long maxRingSize, @Nullable HashFunction hashFunction, @Nullable String lrsServerName, + static Builder forLogicalDns(String clusterName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - return new CdsUpdate(clusterName, ClusterType.LOGICAL_DNS, lbPolicy, minRingSize, - maxRingSize, hashFunction, null, lrsServerName, maxConcurrentRequests, - upstreamTlsContext, null); + return new AutoValue_XdsClient_CdsUpdate.Builder() + .clusterName(clusterName) + .clusterType(ClusterType.LOGICAL_DNS) + .minRingSize(0) + .maxRingSize(0) + .lrsServerName(lrsServerName) + .maxConcurrentRequests(maxConcurrentRequests) + .upstreamTlsContext(upstreamTlsContext); } - CdsUpdate(String clusterName, ClusterType clusterType, @Nullable String lbPolicy, - long minRingSize, long maxRingSize, @Nullable HashFunction hashFunction, - @Nullable String edsServiceName, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, - @Nullable List prioritizedClusterNames) { - this.clusterName = checkNotNull(clusterName, "clusterName"); - this.clusterType = checkNotNull(clusterType, "clusterType"); - this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy"); - this.minRingSize = minRingSize; - this.maxRingSize = maxRingSize; - this.hashFunction = hashFunction; - this.edsServiceName = edsServiceName; - this.lrsServerName = lrsServerName; - this.maxConcurrentRequests = maxConcurrentRequests; - this.upstreamTlsContext = upstreamTlsContext; - this.prioritizedClusterNames = prioritizedClusterNames != null - ? Collections.unmodifiableList(new ArrayList<>(prioritizedClusterNames)) : null; - } - - @Override - public int hashCode() { - return Objects.hash(clusterName, clusterType, lbPolicy, minRingSize, maxRingSize, - hashFunction, edsServiceName, lrsServerName, maxConcurrentRequests, upstreamTlsContext, - prioritizedClusterNames); + enum ClusterType { + EDS, LOGICAL_DNS, AGGREGATE } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - CdsUpdate that = (CdsUpdate) o; - return Objects.equals(clusterName, that.clusterName) - && Objects.equals(clusterType, that.clusterType) - && Objects.equals(lbPolicy, that.lbPolicy) - && minRingSize == that.minRingSize - && maxRingSize == that.maxRingSize - && Objects.equals(hashFunction, that.hashFunction) - && Objects.equals(edsServiceName, that.edsServiceName) - && Objects.equals(lrsServerName, that.lrsServerName) - && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(upstreamTlsContext, that.upstreamTlsContext) - && Objects.equals(prioritizedClusterNames, that.prioritizedClusterNames); + enum HashFunction { + XX_HASH } + // FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed. @Override - public String toString() { + public final String toString() { return MoreObjects.toStringHelper(this) - .add("clusterName", clusterName) - .add("clusterType", clusterType) - .add("lbPolicy", lbPolicy) - .add("minRingSize", minRingSize) - .add("maxRingSize", maxRingSize) - .add("hashFunction", hashFunction) - .add("edsServiceName", edsServiceName) - .add("lrsServerName", lrsServerName) - .add("maxConcurrentRequests", maxConcurrentRequests) + .add("clusterName", clusterName()) + .add("clusterType", clusterType()) + .add("lbPolicy", lbPolicy()) + .add("minRingSize", minRingSize()) + .add("maxRingSize", maxRingSize()) + .add("hashFunction", hashFunction()) + .add("edsServiceName", edsServiceName()) + .add("lrsServerName", lrsServerName()) + .add("maxConcurrentRequests", maxConcurrentRequests()) // Exclude upstreamTlsContext as its string representation is cumbersome. - .add("prioritizedClusterNames", prioritizedClusterNames) + .add("prioritizedClusterNames", prioritizedClusterNames()) .toString(); } - enum ClusterType { - EDS, LOGICAL_DNS, AGGREGATE - } + @AutoValue.Builder + abstract static class Builder { + // Private do not use. + protected abstract Builder clusterName(String clusterName); - enum HashFunction { - XX_HASH, MURMUR_HASH_2 + // Private do not use. + protected abstract Builder clusterType(ClusterType clusterType); + + // Private do not use. + protected abstract Builder lbPolicy(String lbPolicy); + + Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize, + HashFunction hashFunction) { + return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize) + .hashFunction(checkNotNull(hashFunction, "hashFunction")); + } + + // Private do not use. + protected abstract Builder minRingSize(long minRingSize); + + // Private do not use. + protected abstract Builder maxRingSize(long maxRingSize); + + // Private do not use. + protected abstract Builder hashFunction(HashFunction hashFunction); + + // Private do not use. + protected abstract Builder edsServiceName(String edsServiceName); + + // Private do not use. + protected abstract Builder lrsServerName(String lrsServerName); + + // Private do not use. + protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests); + + // Private do not use. + protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext); + + // Private do not use. + protected abstract Builder prioritizedClusterNames(List prioritizedClusterNames); + + abstract CdsUpdate build(); } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 4a9806be05b..b95117acbff 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -544,8 +544,9 @@ private void deliverEdsCluster(String clusterName, @Nullable String edsServiceNa @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forEds(clusterName, "round_robin", -1, -1, null, - edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext); + CdsUpdate update = CdsUpdate.forEds( + clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } @@ -553,16 +554,17 @@ private void deliverEdsCluster(String clusterName, @Nullable String edsServiceNa private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forLogicalDns(clusterName, "round_robin", -1, -1, null, - lrsServerName, maxConcurrentRequests, tlsContext); + CdsUpdate update = CdsUpdate.forLogicalDns( + clusterName, lrsServerName, maxConcurrentRequests, tlsContext) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } private void deliverAggregateCluster(String clusterName, List clusters) { if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forAggregate(clusterName, "round_robin", -1, -1, null, - clusters); + CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters) + .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index ff7f8ffd39e..5c88d366e55 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -623,13 +623,13 @@ public void cdsResourceFound() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isNull(); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -648,16 +648,16 @@ public void cdsResourceFound_ringHashLbPolicy() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isNull(); - assertThat(cdsUpdate.lbPolicy).isEqualTo("ring_hash"); - assertThat(cdsUpdate.hashFunction).isEqualTo(HashFunction.XX_HASH); - assertThat(cdsUpdate.minRingSize).isEqualTo(10L); - assertThat(cdsUpdate.maxRingSize).isEqualTo(100L); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash"); + assertThat(cdsUpdate.hashFunction()).isEqualTo(HashFunction.XX_HASH); + assertThat(cdsUpdate.minRingSize()).isEqualTo(10L); + assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); } @@ -676,10 +676,10 @@ public void cdsResponseWithAggregateCluster() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.AGGREGATE); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.prioritizedClusterNames).containsExactlyElementsIn(candidates).inOrder(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); } @Test @@ -696,13 +696,13 @@ public void cdsResponseWithCircuitBreakers() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isNull(); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isEqualTo(200L); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } /** @@ -730,7 +730,7 @@ public void cdsResponseWithUpstreamTlsContext() { verify(cdsResourceWatcher, times(1)).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); SdsSecretConfig validationContextSdsSecretConfig = - cdsUpdate.upstreamTlsContext.getCommonTlsContext().getValidationContextSdsSecretConfig(); + cdsUpdate.upstreamTlsContext().getCommonTlsContext().getValidationContextSdsSecretConfig(); assertThat(validationContextSdsSecretConfig.getName()).isEqualTo("secret1"); assertThat( Iterables.getOnlyElement( @@ -759,13 +759,13 @@ public void cachedCdsResource_data() { xdsClient.watchCdsResource(CDS_RESOURCE, watcher); verify(watcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isNull(); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); call.verifyNoMoreRequest(); } @@ -794,12 +794,12 @@ public void cdsResourceUpdated() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); String edsService = "eds-service-bar.googleapis.com"; clusters = ImmutableList.of( @@ -812,13 +812,13 @@ public void cdsResourceUpdated() { "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isEqualTo(""); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } @Test @@ -834,13 +834,13 @@ public void cdsResourceDeleted() { "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isNull(); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isNull(); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); call.sendResponse("1", Collections.emptyList(), ResourceType.CDS, "0001"); @@ -874,30 +874,30 @@ public void multipleCdsWatchers() { call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(CDS_RESOURCE); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.LOGICAL_DNS); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isNull(); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isNull(); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); verify(watcher1).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isEqualTo(""); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); verify(watcher2).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.clusterName).isEqualTo(cdsResource); - assertThat(cdsUpdate.clusterType).isEqualTo(ClusterType.EDS); - assertThat(cdsUpdate.edsServiceName).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy).isEqualTo("round_robin"); - assertThat(cdsUpdate.lrsServerName).isEqualTo(""); - assertThat(cdsUpdate.maxConcurrentRequests).isNull(); - assertThat(cdsUpdate.upstreamTlsContext).isNull(); + assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResource); + assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); + assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); + assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); + assertThat(cdsUpdate.upstreamTlsContext()).isNull(); } @Test @@ -1119,12 +1119,12 @@ public void edsResourceDeletedByCds() { call.sendResponse("0", clusters, ResourceType.CDS, "0000"); verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture()); CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.edsServiceName).isEqualTo(null); - assertThat(cdsUpdate.lrsServerName).isEqualTo(""); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(null); + assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture()); cdsUpdate = cdsUpdateCaptor.getValue(); - assertThat(cdsUpdate.edsServiceName).isEqualTo(EDS_RESOURCE); - assertThat(cdsUpdate.lrsServerName).isNull(); + assertThat(cdsUpdate.edsServiceName()).isEqualTo(EDS_RESOURCE); + assertThat(cdsUpdate.lrsServerName()).isNull(); List clusterLoadAssignments = ImmutableList.of( @@ -1159,7 +1159,7 @@ public void edsResourceDeletedByCds() { Any.pack(mf.buildEdsCluster(CDS_RESOURCE, null, "round_robin", null, false, null, null))); call.sendResponse("1", clusters, ResourceType.CDS, "0001"); verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture()); - assertThat(cdsUpdateCaptor.getValue().edsServiceName).isNull(); + assertThat(cdsUpdateCaptor.getValue().edsServiceName()).isNull(); verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE); verifyNoMoreInteractions(cdsWatcher, edsWatcher); } From 5a7109e7925044fa9c43594333ebe544c884ab9f Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 11 Feb 2021 14:40:06 -0800 Subject: [PATCH 5/5] Drop support for source ip hashing. --- xds/src/main/java/io/grpc/xds/ClientXdsClient.java | 5 ----- xds/src/main/java/io/grpc/xds/VirtualHost.java | 6 +----- .../java/io/grpc/xds/ClientXdsClientDataTest.java | 14 ++++++-------- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 2aa619776aa..5ba10c329ef 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -489,11 +489,6 @@ static StructOrError parseRouteAction( policy = HashPolicy.forHeader( terminal, headerCfg.getHeaderName(), regEx, regExSubstitute); break; - case CONNECTION_PROPERTIES: - if (config.getConnectionProperties().getSourceIp()) { - policy = HashPolicy.forSourceIp(terminal); - } - break; case FILTER_STATE: if (config.getFilterState().getKey().equals(HASH_POLICY_FILTER_STATE_KEY)) { policy = HashPolicy.forChannelId(terminal); diff --git a/xds/src/main/java/io/grpc/xds/VirtualHost.java b/xds/src/main/java/io/grpc/xds/VirtualHost.java index 663b41077d9..30e08d5e23c 100644 --- a/xds/src/main/java/io/grpc/xds/VirtualHost.java +++ b/xds/src/main/java/io/grpc/xds/VirtualHost.java @@ -168,10 +168,6 @@ static HashPolicy forHeader(boolean isTerminal, String headerName, return HashPolicy.create(Type.HEADER, isTerminal, headerName, regEx, regExSubstitution); } - static HashPolicy forSourceIp(boolean isTerminal) { - return HashPolicy.create(Type.SOURCE_IP, isTerminal, null, null, null); - } - static HashPolicy forChannelId(boolean isTerminal) { return HashPolicy.create(Type.CHANNEL_ID, isTerminal, null, null, null); } @@ -183,7 +179,7 @@ private static HashPolicy create(Type type, boolean isTerminal, @Nullable String } enum Type { - HEADER, SOURCE_IP, CHANNEL_ID + HEADER, CHANNEL_ID } } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 834d911f4d1..988a00350e5 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -419,7 +419,7 @@ public void parseRouteAction_withHashPolicies() { .addHashPolicy( io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() .setConnectionProperties(ConnectionProperties.newBuilder().setSourceIp(true)) - .setTerminal(true)) + .setTerminal(true)) // unsupported .addHashPolicy( io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() .setFilterState( @@ -427,22 +427,20 @@ public void parseRouteAction_withHashPolicies() { .setKey(ClientXdsClient.HASH_POLICY_FILTER_STATE_KEY))) .addHashPolicy( io.envoyproxy.envoy.config.route.v3.RouteAction.HashPolicy.newBuilder() - .setQueryParameter(QueryParameter.newBuilder().setName("param"))) + .setQueryParameter( + QueryParameter.newBuilder().setName("param"))) // unsupported .build(); StructOrError struct = ClientXdsClient.parseRouteAction(proto); List policies = struct.getStruct().hashPolicies(); - assertThat(policies).hasSize(3); + assertThat(policies).hasSize(2); assertThat(policies.get(0).type()).isEqualTo(HashPolicy.Type.HEADER); assertThat(policies.get(0).headerName()).isEqualTo("user-agent"); assertThat(policies.get(0).isTerminal()).isFalse(); assertThat(policies.get(0).regEx().pattern()).isEqualTo("grpc.*"); assertThat(policies.get(0).regExSubstitution()).isEqualTo("gRPC"); - assertThat(policies.get(1).type()).isEqualTo(HashPolicy.Type.SOURCE_IP); - assertThat(policies.get(1).isTerminal()).isTrue(); - - assertThat(policies.get(2).type()).isEqualTo(HashPolicy.Type.CHANNEL_ID); - assertThat(policies.get(2).isTerminal()).isFalse(); + assertThat(policies.get(1).type()).isEqualTo(HashPolicy.Type.CHANNEL_ID); + assertThat(policies.get(1).isTerminal()).isFalse(); } @Test