Skip to content

Commit

Permalink
xds: Support localities in multiple priorities
Browse files Browse the repository at this point in the history
Additional logic to support for the same locality appearing under
multiple priorities.
  • Loading branch information
temawi committed Nov 8, 2022
1 parent a82ea0c commit e4b7195
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 61 deletions.
20 changes: 5 additions & 15 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ private void handleEndpointResourceUpdate() {
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, PriorityChildConfig> priorityChildConfigs = new HashMap<>();
List<String> priorities = new ArrayList<>(); // totally ordered priority list
Map<Locality, Integer> localityWeights = new HashMap<>();

Status endpointNotFound = Status.OK;
for (String cluster : clusters) {
Expand All @@ -229,7 +228,6 @@ private void handleEndpointResourceUpdate() {
addresses.addAll(state.result.addresses);
priorityChildConfigs.putAll(state.result.priorityChildConfigs);
priorities.addAll(state.result.priorities);
localityWeights.putAll(state.result.localityWeights);
} else {
endpointNotFound = state.status;
}
Expand Down Expand Up @@ -260,9 +258,6 @@ private void handleEndpointResourceUpdate() {
resolvedAddresses.toBuilder()
.setLoadBalancingPolicyConfig(childConfig)
.setAddresses(Collections.unmodifiableList(addresses))
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS,
Collections.unmodifiableMap(localityWeights)).build())
.build());
}

Expand Down Expand Up @@ -396,7 +391,6 @@ public void run() {
}
Map<Locality, LocalityLbEndpoints> localityLbEndpoints =
update.localityLbEndpointsMap;
Map<Locality, Integer> localityWeights = new HashMap<>();
List<DropOverload> dropOverloads = update.dropPolicies;
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
Expand All @@ -415,6 +409,8 @@ public void run() {
Attributes attr =
endpoint.eag().getAttributes().toBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality)
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT,
localityLbInfo.localityWeight())
.set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight)
.build();
EquivalentAddressGroup eag = new EquivalentAddressGroup(
Expand All @@ -429,7 +425,6 @@ public void run() {
"Discard locality {0} with 0 healthy endpoints", locality);
continue;
}
localityWeights.put(locality, localityLbInfo.localityWeight());
if (!prioritizedLocalityWeights.containsKey(priorityName)) {
prioritizedLocalityWeights.put(priorityName, new HashMap<Locality, Integer>());
}
Expand All @@ -450,7 +445,7 @@ public void run() {
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
sortedPriorityNames, localityWeights);
sortedPriorityNames);
handleEndpointResourceUpdate();
}
}
Expand Down Expand Up @@ -690,23 +685,18 @@ private static class ClusterResolutionResult {
private final Map<String, PriorityChildConfig> priorityChildConfigs;
// List of priority names ordered in descending priorities.
private final List<String> priorities;
// Most recent view on how localities in the cluster should be wighted. Only set for EDS
// clusters that support the concept.
private final Map<Locality, Integer> localityWeights;

ClusterResolutionResult(List<EquivalentAddressGroup> addresses, String priority,
PriorityChildConfig config) {
this(addresses, Collections.singletonMap(priority, config),
Collections.singletonList(priority), Collections.emptyMap());
Collections.singletonList(priority));
}

ClusterResolutionResult(List<EquivalentAddressGroup> addresses,
Map<String, PriorityChildConfig> configs, List<String> priorities,
Map<Locality, Integer> localityWeights) {
Map<String, PriorityChildConfig> configs, List<String> priorities) {
this.addresses = addresses;
this.priorityChildConfigs = configs;
this.priorities = priorities;
this.localityWeights = localityWeights;
}
}

Expand Down
5 changes: 2 additions & 3 deletions xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import java.util.Map;

/**
* Internal attributes used for xDS implementation. Do not use.
Expand Down Expand Up @@ -58,8 +57,8 @@ public final class InternalXdsAttributes {
* Map from localities to their weights.
*/
@NameResolver.ResolutionResultAttr
static final Attributes.Key<Map<Locality, Integer>> ATTR_LOCALITY_WEIGHTS =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeights");
static final Attributes.Key<Integer> ATTR_LOCALITY_WEIGHT =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityWeight");

/**
* Name of the cluster that provides this EquivalentAddressGroup.
Expand Down
41 changes: 30 additions & 11 deletions xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME;

import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerRegistry;
Expand Down Expand Up @@ -68,15 +70,32 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// to produce the weighted target LB config.
WrrLocalityConfig wrrLocalityConfig
= (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
Map<Locality, Integer> localityWeights = resolvedAddresses.getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);

// Not having locality weights is a misconfiguration, and we have to return with an error.
if (localityWeights == null) {
Status unavailable =
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weights provided");
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable));
return false;

// A map of locality weights is built up from the locality weight attributes in each address.
Map<Locality, Integer> localityWeights = new HashMap<>();
for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) {
Attributes eagAttrs = eag.getAttributes();
Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY);
Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT);

if (locality == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality provided")));
return false;
}
if (localityWeight == null) {
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(
Status.UNAVAILABLE.withDescription("wrr_locality error: no locality weight provided")));
return false;
}

if (!localityWeights.containsKey(locality)) {
localityWeights.put(locality, localityWeight);
} else if (!localityWeights.get(locality).equals(localityWeight)) {
logger.log(XdsLogLevel.WARNING,
"Locality {0} has both weights {1} and {2}, using weight {1}", locality,
localityWeights.get(locality), localityWeight);
}
}

// Weighted target LB expects a WeightedPolicySelection for each locality as it will create a
Expand All @@ -88,13 +107,13 @@ public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
wrrLocalityConfig.childPolicy));
}

// Remove the locality weights attribute now that we have consumed it. This is done simply for
// Remove the locality weight attribute now that we have consumed it. This is done simply for
// ease of debugging for the unsupported (and unlikely) scenario where WrrLocalityConfig has
// another wrr_locality as the child policy. The missing locality weight attribute would make
// the child wrr_locality fail early.
resolvedAddresses = resolvedAddresses.toBuilder()
.setAttributes(resolvedAddresses.getAttributes().toBuilder()
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS).build()).build();
.discard(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT).build()).build();

switchLb.switchTo(lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME));
switchLb.handleResolvedAddresses(
Expand Down
34 changes: 22 additions & 12 deletions xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() {
"least_request_experimental");

assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}

@Test
Expand Down Expand Up @@ -410,8 +410,8 @@ public void edsClustersWithOutlierDetection() {
"least_request_experimental");

assertThat(
childBalancer.attributes.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).containsEntry(
locality1, 100);
childBalancer.addresses.get(0).getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT)).isEqualTo(100);
}


Expand Down Expand Up @@ -507,11 +507,20 @@ public void onlyEdsClusters_receivedEndpoints() {
assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo(
"round_robin");

Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights).containsEntry(locality1, 70);
assertThat(localityWeights).containsEntry(locality2, 10);
assertThat(localityWeights).containsEntry(locality3, 20);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality1) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(70);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality2) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(10);
}
if (eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY) == locality3) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT))
.isEqualTo(20);
}
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -682,14 +691,15 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() {
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)),
10 /* localityWeight */, 1 /* priority */);
String priority = CLUSTER1 + "[priority1]";
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));

FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
Map<Locality, Integer> localityWeights = childBalancer.attributes.get(
InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS);
assertThat(localityWeights.keySet()).containsExactly(locality2);
for (EquivalentAddressGroup eag : childBalancer.addresses) {
assertThat(eag.getAttributes().get(InternalXdsAttributes.ATTR_LOCALITY)).isEqualTo(locality2);
}
}

@Test
Expand Down
87 changes: 67 additions & 20 deletions xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.testing.EqualsTester;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
Expand All @@ -44,7 +43,9 @@
import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -84,8 +85,6 @@ public class WrrLocalityLoadBalancerTest {
@Captor
private ArgumentCaptor<SubchannelPicker> errorPickerCaptor;

private final EquivalentAddressGroup eag = new EquivalentAddressGroup(mockSocketAddress);

private WrrLocalityLoadBalancer loadBalancer;
private LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry();

Expand Down Expand Up @@ -124,8 +123,10 @@ public void handleResolvedAddresses() {
// The child config is delivered wrapped in the wrr_locality config and the locality weights
// in a ResolvedAddresses attribute.
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
Map<Locality, Integer> localityWeights = ImmutableMap.of(localityOne, 1, localityTwo, 2);
deliverAddresses(wlConfig, localityWeights);
deliverAddresses(wlConfig,
ImmutableList.of(
makeAddress("addr1", localityOne, 1),
makeAddress("addr2", localityTwo, 2)));

// Assert that the child policy and the locality weights were correctly mapped to a
// WeightedTargetConfig.
Expand All @@ -148,7 +149,8 @@ public void handleResolvedAddresses_noLocalityWeights() {
// The child config is delivered wrapped in the wrr_locality config and the locality weights
// in a ResolvedAddresses attribute.
WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
deliverAddresses(wlConfig, null);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null)));

// With no locality weights, we should get a TRANSIENT_FAILURE.
verify(mockHelper).getAuthority();
Expand All @@ -170,8 +172,8 @@ public void handleNameResolutionError_noChildLb() {
@Test
public void handleNameResolutionError_withChildLb() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableMap.of(
Locality.create("region", "zone", "subzone"), 1));
ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));
loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);

verify(mockHelper, never()).updateBalancingState(isA(ConnectivityState.class),
Expand All @@ -185,21 +187,22 @@ public void localityWeightAttributeNotPropagated() {
PolicySelection childPolicy = new PolicySelection(mockChildProvider, null);

WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy);
Map<Locality, Integer> localityWeights = ImmutableMap.of(locality, 1);
deliverAddresses(wlConfig, localityWeights);
deliverAddresses(wlConfig, ImmutableList.of(
makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1)));

// Assert that the child policy and the locality weights were correctly mapped to a
// WeightedTargetConfig.
verify(mockWeightedTargetLb).handleResolvedAddresses(resolvedAddressesCaptor.capture());
assertThat(resolvedAddressesCaptor.getValue().getAttributes()
.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();

//assertThat(resolvedAddressesCaptor.getValue().getAttributes()
// .get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS)).isNull();
}

@Test
public void shutdown() {
deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)),
ImmutableMap.of(
Locality.create("region", "zone", "subzone"), 1));
ImmutableList.of(
makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1)));
loadBalancer.shutdown();

verify(mockWeightedTargetLb).shutdown();
Expand All @@ -218,11 +221,55 @@ public void configEquality() {
.testEquals();
}

private void deliverAddresses(WrrLocalityConfig config, Map<Locality, Integer> localityWeights) {
private void deliverAddresses(WrrLocalityConfig config, List<EquivalentAddressGroup> addresses) {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(ImmutableList.of(eag)).setAttributes(
Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build())
.setLoadBalancingPolicyConfig(config).build());
ResolvedAddresses.newBuilder().setAddresses(addresses).setLoadBalancingPolicyConfig(config)
.build());
}

/**
* Create a locality-labeled address.
*/
private static EquivalentAddressGroup makeAddress(final String name, Locality locality,
Integer localityWeight) {
class FakeSocketAddress extends SocketAddress {
private final String name;

private FakeSocketAddress(String name) {
this.name = name;
}

@Override
public int hashCode() {
return Objects.hash(name);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof FakeSocketAddress)) {
return false;
}
FakeSocketAddress that = (FakeSocketAddress) o;
return Objects.equals(name, that.name);
}

@Override
public String toString() {
return name;
}
}

Attributes.Builder attrBuilder = Attributes.newBuilder()
.set(InternalXdsAttributes.ATTR_LOCALITY, locality);
if (localityWeight != null) {
attrBuilder.set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityWeight);
}

EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name),
attrBuilder.build());
return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString()));
}
}

0 comments on commit e4b7195

Please sign in to comment.