Skip to content

Commit

Permalink
xds: use defaults for unspecified ring_hash_lb_config values (#8237)
Browse files Browse the repository at this point in the history
Sets ring_hash LB config to its default values (min_ring_size = 1024 and max_ring_size = 8M) if not given by the control plane. This applies to both parsing RingHashLbConfig from xDS proto and parsing RingHashConfig from the JSON config (currently not used). If the values are given by the control plane, they are validated such that min_ring_size is not less than max_ring_size and do not exceed the 8M limit.
  • Loading branch information
voidzcy committed Jun 7, 2021
1 parent 29618a6 commit fa4b980
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 51 deletions.
44 changes: 31 additions & 13 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ final class ClientXdsClient extends AbstractXdsClient {
@VisibleForTesting
static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE = 1024L;
@VisibleForTesting
static final long DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final long MAX_RING_HASH_LB_POLICY_RING_SIZE = 8 * 1024 * 1024L;
@VisibleForTesting
static final String AGGREGATE_CLUSTER_TYPE_NAME = "envoy.clusters.aggregate";
@VisibleForTesting
static final String HASH_POLICY_FILTER_STATE_KEY = "io.grpc.channel_id";
Expand Down Expand Up @@ -790,7 +796,7 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
// Process Cluster into CdsUpdate.
CdsUpdate cdsUpdate;
try {
cdsUpdate = processCluster(cluster, retainedEdsResources);
cdsUpdate = parseCluster(cluster, retainedEdsResources);
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
Expand Down Expand Up @@ -818,7 +824,8 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
}
}

private static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEdsResources)
@VisibleForTesting
static CdsUpdate parseCluster(Cluster cluster, Set<String> retainedEdsResources)
throws ResourceInvalidException {
StructOrError<CdsUpdate.Builder> structOrError;
switch (cluster.getClusterDiscoveryTypeCase()) {
Expand All @@ -830,26 +837,36 @@ private static CdsUpdate processCluster(Cluster cluster, Set<String> retainedEds
break;
case CLUSTERDISCOVERYTYPE_NOT_SET:
default:
throw new ResourceInvalidException("Unspecified cluster discovery type");
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unspecified cluster discovery type");
}
if (structOrError.getErrorDetail() != null) {
throw new ResourceInvalidException(structOrError.getErrorDetail());
}

CdsUpdate.Builder updateBuilder = structOrError.getStruct();

if (cluster.getLbPolicy() == LbPolicy.RING_HASH) {
RingHashLbConfig lbConfig = cluster.getRingHashLbConfig();
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) {
long minRingSize =
lbConfig.hasMinimumRingSize()
? lbConfig.getMinimumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE;
long maxRingSize =
lbConfig.hasMaximumRingSize()
? lbConfig.getMaximumRingSize().getValue()
: DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE;
if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH
|| minRingSize > maxRingSize
|| maxRingSize > MAX_RING_HASH_LB_POLICY_RING_SIZE) {
throw new ResourceInvalidException(
"Unsupported ring hash function: " + lbConfig.getHashFunction());
"Cluster " + cluster.getName() + ": invalid ring_hash_lb_config: " + lbConfig);
}
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH,
lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue());
updateBuilder.ringHashLbPolicy(minRingSize, maxRingSize);
} else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) {
updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN);
updateBuilder.roundRobinLbPolicy();
} else {
throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy());
throw new ResourceInvalidException(
"Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy());
}

return updateBuilder.build();
Expand Down Expand Up @@ -1610,14 +1627,15 @@ private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) {
}
}

private static final class ResourceInvalidException extends Exception {
@VisibleForTesting
static final class ResourceInvalidException extends Exception {
private static final long serialVersionUID = 0L;

public ResourceInvalidException(String message) {
private ResourceInvalidException(String message) {
super(message, null, false, false);
}

public ResourceInvalidException(String message, Throwable cause) {
private ResourceInvalidException(String message, Throwable cause) {
super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
}
}
Expand Down
23 changes: 19 additions & 4 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancerProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
Expand All @@ -32,6 +33,17 @@
@Internal
public final class RingHashLoadBalancerProvider extends LoadBalancerProvider {

// Same as ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MIN_RING_SIZE
@VisibleForTesting
static final long DEFAULT_MIN_RING_SIZE = 1024L;
// Same as ClientXdsClient.DEFAULT_RING_HASH_LB_POLICY_MAX_RING_SIZE
@VisibleForTesting
static final long DEFAULT_MAX_RING_SIZE = 8 * 1024 * 1024L;
// Maximum number of ring entries allowed. Setting this too large can result in slow
// ring construction and OOM error.
// Same as ClientXdsClient.MAX_RING_HASH_LB_POLICY_RING_SIZE
static final long MAX_RING_SIZE = 8 * 1024 * 1024L;

private static final boolean enableRingHash =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_RING_HASH"));

Expand Down Expand Up @@ -59,11 +71,14 @@ public String getPolicyName() {
public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalancingPolicyConfig) {
Long minRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "minRingSize");
Long maxRingSize = JsonUtil.getNumberAsLong(rawLoadBalancingPolicyConfig, "maxRingSize");
if (minRingSize == null || maxRingSize == null) {
return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription(
"Missing 'mingRingSize'/'maxRingSize'"));
if (minRingSize == null) {
minRingSize = DEFAULT_MIN_RING_SIZE;
}
if (maxRingSize == null) {
maxRingSize = DEFAULT_MAX_RING_SIZE;
}
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize) {
if (minRingSize <= 0 || maxRingSize <= 0 || minRingSize > maxRingSize
|| maxRingSize > MAX_RING_SIZE) {
return ConfigOrError.fromError(Status.INVALID_ARGUMENT.withDescription(
"Invalid 'mingRingSize'/'maxRingSize'"));
}
Expand Down
15 changes: 10 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,21 @@ abstract static class Builder {
// Private, use one of the static factory methods instead.
protected abstract Builder clusterType(ClusterType clusterType);

abstract Builder lbPolicy(LbPolicy lbPolicy);
// Private, use roundRobinLbPolicy() or ringHashLbPolicy(long, long).
protected abstract Builder lbPolicy(LbPolicy lbPolicy);

Builder lbPolicy(LbPolicy lbPolicy, long minRingSize, long maxRingSize) {
return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize);
Builder roundRobinLbPolicy() {
return this.lbPolicy(LbPolicy.ROUND_ROBIN);
}

// Private, use lbPolicy(LbPolicy, long, long).
Builder ringHashLbPolicy(long minRingSize, long maxRingSize) {
return this.lbPolicy(LbPolicy.RING_HASH).minRingSize(minRingSize).maxRingSize(maxRingSize);
}

// Private, use ringHashLbPolicy(long, long).
protected abstract Builder minRingSize(long minRingSize);

// Private, use lbPolicy(.LbPolicy, long, long)
// Private, use ringHashLbPolicy(long, long).
protected abstract Builder maxRingSize(long maxRingSize);

// Private, use CdsUpdate.forEds() instead.
Expand Down
51 changes: 25 additions & 26 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
import io.grpc.xds.RingHashLoadBalancer.RingHashConfig;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy;
import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -148,7 +147,7 @@ public void tearDown() {
public void discoverTopLevelEdsCluster() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -165,7 +164,7 @@ public void discoverTopLevelEdsCluster() {
public void discoverTopLevelLogicalDnsCluster() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -192,7 +191,7 @@ public void nonAggregateCluster_resourceNotExist_returnErrorPicker() {
public void nonAggregateCluster_resourceUpdate() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -202,7 +201,7 @@ public void nonAggregateCluster_resourceUpdate() {
100L, upstreamTlsContext);

update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
childLbConfig = (ClusterResolverConfig) childBalancer.config;
instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms);
Expand All @@ -214,7 +213,7 @@ public void nonAggregateCluster_resourceUpdate() {
public void nonAggregateCluster_resourceRevoked() {
CdsUpdate update =
CdsUpdate.forLogicalDns(CLUSTER, DNS_HOST_NAME, null, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -240,7 +239,7 @@ public void discoverAggregateCluster() {
// CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.RING_HASH, 100L, 1000L).build();
.ringHashLbPolicy(100L, 1000L).build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
assertThat(childBalancers).isEmpty();
Expand All @@ -249,24 +248,24 @@ public void discoverAggregateCluster() {
// cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)]
CdsUpdate update1 =
CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
assertThat(xdsClient.watchers.keySet()).containsExactly(
CLUSTER, cluster1, cluster2, cluster3, cluster4);
assertThat(childBalancers).isEmpty();
CdsUpdate update3 =
CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
assertThat(childBalancers).isEmpty();
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, null, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(childBalancers).isEmpty();
CdsUpdate update4 =
CdsUpdate.forEds(cluster4, null, LRS_SERVER_NAME, 300L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster4, update4);
assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Expand All @@ -293,7 +292,7 @@ public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() {
// CLUSTER (aggr.) -> [cluster1 (EDS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
xdsClient.deliverResourceNotExist(cluster1);
Expand All @@ -311,16 +310,16 @@ public void aggregateCluster_descendantClustersRevoked() {
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 =
CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand Down Expand Up @@ -358,16 +357,16 @@ public void aggregateCluster_rootClusterRevoked() {
// CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2);
CdsUpdate update1 =
CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
CdsUpdate update2 =
CdsUpdate.forLogicalDns(cluster2, DNS_HOST_NAME, LRS_SERVER_NAME, 100L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand Down Expand Up @@ -395,28 +394,28 @@ public void aggregateCluster_intermediateClusterChanges() {
// CLUSTER (aggr.) -> [cluster1]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);

// CLUSTER (aggr.) -> [cluster2 (aggr.)]
String cluster2 = "cluster-02.googleapis.com";
update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2);

// cluster2 (aggr.) -> [cluster3 (EDS)]
String cluster3 = "cluster-03.googleapis.com";
CdsUpdate update2 =
CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster2, update2);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3);
CdsUpdate update3 =
CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster3, update3);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config;
Expand All @@ -443,7 +442,7 @@ public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicke
// CLUSTER (aggr.) -> [cluster1]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1);
Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM");
Expand All @@ -460,11 +459,11 @@ public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildL
// CLUSTER (aggr.) -> [cluster1 (logical DNS)]
CdsUpdate update =
CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1))
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
CdsUpdate update1 =
CdsUpdate.forLogicalDns(cluster1, DNS_HOST_NAME, LRS_SERVER_NAME, 200L, null)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(cluster1, update1);
FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers);
ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config;
Expand All @@ -489,7 +488,7 @@ public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErr
public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() {
CdsUpdate update =
CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext)
.lbPolicy(LbPolicy.ROUND_ROBIN).build();
.roundRobinLbPolicy().build();
xdsClient.deliverCdsUpdate(CLUSTER, update);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.shutdown).isFalse();
Expand Down

0 comments on commit fa4b980

Please sign in to comment.