Skip to content

Commit

Permalink
xds: Disallow duplicate addresses in the RingHashLB. (#9776)
Browse files Browse the repository at this point in the history
* xds: Disallow duplicate addresses in the RingHashLB.
Removed test that was previously checking for specific expected behavior with duplicate addresses.
  • Loading branch information
larry-safran committed Jan 3, 2023
1 parent 3c5c2be commit 51ee3eb
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 40 deletions.
34 changes: 34 additions & 0 deletions xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.base.MoreObjects;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.primitives.UnsignedInteger;
import io.grpc.Attributes;
Expand All @@ -38,14 +40,17 @@
import io.grpc.SynchronizationContext;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -173,6 +178,13 @@ private boolean validateAddrList(List<EquivalentAddressGroup> addrList) {
return false;
}

String dupAddrString = validateNoDuplicateAddresses(addrList);
if (dupAddrString != null) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS "
+ "resolution was successful, but there were duplicate addresses: " + dupAddrString));
return false;
}

long totalWeight = 0;
for (EquivalentAddressGroup eag : addrList) {
Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT);
Expand Down Expand Up @@ -207,6 +219,28 @@ private boolean validateAddrList(List<EquivalentAddressGroup> addrList) {
return true;
}

/**
 * Checks that no socket address appears more than once across all of the given address groups.
 *
 * @param addrList the address groups to scan; every address of every group is considered
 * @return {@code null} when all addresses are distinct; otherwise a description listing each
 *     repeated address together with its total number of occurrences, with entries separated
 *     by {@code "; "} and sorted so that the message is stable from run to run
 */
@Nullable
private String validateNoDuplicateAddresses(List<EquivalentAddressGroup> addrList) {
  Set<SocketAddress> addresses = new HashSet<>();
  Multiset<String> dups = HashMultiset.create();
  for (EquivalentAddressGroup eag : addrList) {
    for (SocketAddress address : eag.getAddresses()) {
      // Set.add returns false when the address was already seen, i.e. this is a repeat.
      if (!addresses.add(address)) {
        dups.add(address.toString());
      }
    }
  }

  if (dups.isEmpty()) {
    return null;
  }

  // HashMultiset makes no promise about iteration order, so the formatted entries are sorted
  // to keep the resulting error description deterministic.
  return dups.entrySet().stream()
      .map((dup) ->
          // dups only counts the repeats; the +1 accounts for the first occurrence.
          String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1))
      .sorted()
      .collect(Collectors.joining("; "));
}

private static List<RingEntry> buildRing(
Map<EquivalentAddressGroup, Long> serverWeights, long totalWeight, double scale) {
List<RingEntry> ring = new ArrayList<>();
Expand Down
70 changes: 30 additions & 40 deletions xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() {
// "[FakeSocketAddress-server2]_0"

long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0");
PickSubchannelArgs args = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash));
PickSubchannelArgs args = getDefaultPickSubchannelArgs(rpcHash);

// Bring down server0 to force trying server2.
deliverSubchannelState(
Expand Down Expand Up @@ -592,6 +590,12 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() {
assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(2));
}

/**
 * Builds pick arguments that use the descriptor from {@code TestMethodDescriptors.voidMethod()},
 * a fresh empty {@link Metadata}, and default call options carrying {@code rpcHash} under
 * {@code XdsNameResolver.RPC_HASH_KEY}.
 */
private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) {
  CallOptions hashedCallOptions =
      CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash);
  Metadata emptyHeaders = new Metadata();
  return new PickSubchannelArgsImpl(
      TestMethodDescriptors.voidMethod(), emptyHeaders, hashedCallOptions);
}

@Test
public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() {
// Map each server address to exactly one ring entry.
Expand Down Expand Up @@ -1039,43 +1043,6 @@ public void hostSelectionProportionalToWeights() {
assertThat(ratio12).isWithin(0.03).of((double) 10 / 100);
}

/**
 * Feeds the balancer an address list in which the servers are repeated 1, 10 and 100 times and
 * checks that the share of picks each server receives follows those repetition counts.
 */
@Test
public void hostSelectionProportionalToRepeatedAddressCount() {
  RingHashConfig config = new RingHashConfig(10000, 100000);
  // Repetition counts per server are 1:10:100.
  List<EquivalentAddressGroup> servers = createRepeatedServerAddrs(1, 10, 100);
  ResolvedAddresses resolvedAddresses =
      ResolvedAddresses.newBuilder()
          .setAddresses(servers)
          .setLoadBalancingPolicyConfig(config)
          .build();
  boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(resolvedAddresses);
  assertThat(addressesAccepted).isTrue();
  verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class));

  // Move every subchannel to READY and start its pick counter at zero.
  Map<EquivalentAddressGroup, Integer> pickCounts = new HashMap<>();
  for (Subchannel readySubchannel : subchannels.values()) {
    deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
    pickCounts.put(readySubchannel.getAddresses(), 0);
  }
  verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture());
  SubchannelPicker picker = pickerCaptor.getValue();

  // Issue 10000 picks, each with a different request hash, and tally the chosen server.
  for (int request = 0; request < 10000; request++) {
    long hash = hashFunc.hashInt(request);
    CallOptions callOptions =
        CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hash);
    PickSubchannelArgs args =
        new PickSubchannelArgsImpl(
            TestMethodDescriptors.voidMethod(), new Metadata(), callOptions);
    EquivalentAddressGroup addr = picker.pickSubchannel(args).getSubchannel().getAddresses();
    int picksSoFar = pickCounts.get(addr);
    pickCounts.put(addr, picksSoFar + 1);
  }

  // Actual distribution: server0 = 104, server1 = 808, server2 = 9088
  // NOTE(review): index 11 is presumably the first server2 entry (after 1 + 10 earlier
  // entries); confirm against createRepeatedServerAddrs.
  int server0Picks = pickCounts.get(servers.get(0));
  int server1Picks = pickCounts.get(servers.get(1));
  int server2Picks = pickCounts.get(servers.get(11));
  double ratio01 = (double) server0Picks / server1Picks;
  double ratio12 = (double) server1Picks / server2Picks;
  assertThat(ratio01).isWithin(0.03).of((double) 1 / 10);
  assertThat(ratio12).isWithin(0.03).of((double) 10 / 100);
}

@Test
public void nameResolutionErrorWithNoActiveSubchannels() {
Status error = Status.UNAVAILABLE.withDescription("not reachable");
Expand Down Expand Up @@ -1112,6 +1079,29 @@ public void nameResolutionErrorWithActiveSubchannels() {
verifyNoMoreInteractions(helper);
}

/**
 * Verifies that an address list containing repeated addresses is rejected: the balancer must
 * report {@code TRANSIENT_FAILURE}, and the published picker must fail RPCs with an
 * {@code UNAVAILABLE} status whose description lists every repeated address and its count.
 */
@Test
public void duplicateAddresses() {
  RingHashConfig config = new RingHashConfig(10, 100);
  // Per the expected description below, server1 shows up twice and server2 three times.
  List<EquivalentAddressGroup> servers = createRepeatedServerAddrs(1, 2, 3);
  boolean addressesAccepted = loadBalancer.acceptResolvedAddresses(
      ResolvedAddresses.newBuilder()
          .setAddresses(servers).setLoadBalancingPolicyConfig(config).build());
  assertThat(addressesAccepted).isFalse();
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

  // Reuse the shared helper instead of rebuilding the pick arguments inline.
  PickSubchannelArgs args = getDefaultPickSubchannelArgs(hashFunc.hashVoid());
  PickResult result = pickerCaptor.getValue().pickSubchannel(args);
  assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC
  assertThat(result.getStatus().getCode())
      .isEqualTo(Code.UNAVAILABLE); // with the status reported for the rejected address list
  assertThat(result.getStatus().getDescription()).isEqualTo(
      "Ring hash lb error: EDS resolution was successful, but there were duplicate "
          + "addresses: Address: FakeSocketAddress-server1, count: 2; "
          + "Address: FakeSocketAddress-server2, count: 3");
}

/**
 * Fetches the listener stored for {@code subchannel} in {@code subchannelStateListeners} and
 * hands it {@code state} via {@code onSubchannelState}.
 *
 * <p>NOTE(review): presumably a listener has been recorded for every subchannel created during
 * the test; confirm where {@code subchannelStateListeners} is populated.
 */
private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo state) {
subchannelStateListeners.get(subchannel).onSubchannelState(state);
}
Expand Down

0 comments on commit 51ee3eb

Please sign in to comment.