diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 928e8e8bf82..436eca8ec5d 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -26,6 +26,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.base.MoreObjects; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import com.google.common.collect.Sets; import com.google.common.primitives.UnsignedInteger; import io.grpc.Attributes; @@ -38,14 +40,17 @@ import io.grpc.SynchronizationContext; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; +import java.net.SocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -173,6 +178,13 @@ private boolean validateAddrList(List addrList) { return false; } + String dupAddrString = validateNoDuplicateAddresses(addrList); + if (dupAddrString != null) { + handleNameResolutionError(Status.UNAVAILABLE.withDescription("Ring hash lb error: EDS " + + "resolution was successful, but there were duplicate addresses: " + dupAddrString)); + return false; + } + long totalWeight = 0; for (EquivalentAddressGroup eag : addrList) { Long weight = eag.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT); @@ -207,6 +219,28 @@ private boolean validateAddrList(List addrList) { return true; } + @Nullable + private String validateNoDuplicateAddresses(List addrList) { + Set addresses = new HashSet<>(); + Multiset dups = HashMultiset.create(); + for (EquivalentAddressGroup eag : addrList) { + for (SocketAddress address : eag.getAddresses()) { + if (!addresses.add(address)) { + dups.add(address.toString()); + } + } + } + + if (!dups.isEmpty()) { + return dups.entrySet().stream() + .map((dup) -> + String.format("Address: %s, count: %d", dup.getElement(), dup.getCount() + 1)) + .collect(Collectors.joining("; ")); + } + + return null; + } + private static List buildRing( Map serverWeights, long totalWeight, double scale) { List ring = new ArrayList<>(); diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index a4435625b0d..64651eb6f3d 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -552,9 +552,7 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { // "[FakeSocketAddress-server2]_0" long rpcHash = hashFunc.hashAsciiString("[FakeSocketAddress-server0]_0"); - PickSubchannelArgs args = new PickSubchannelArgsImpl( - TestMethodDescriptors.voidMethod(), new Metadata(), - CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + PickSubchannelArgs args = getDefaultPickSubchannelArgs(rpcHash); // Bring down server0 to force trying server2. deliverSubchannelState( @@ -592,6 +590,12 @@ public void skipFailingHosts_pickNextNonFailingHostInFirstTwoHosts() { assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(2)); } + private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) { + return new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash)); + } + @Test public void skipFailingHosts_firstTwoHostsFailed_pickNextFirstReady() { // Map each server address to exactly one ring entry. @@ -1039,43 +1043,6 @@ public void hostSelectionProportionalToWeights() { assertThat(ratio12).isWithin(0.03).of((double) 10 / 100); } - @Test - public void hostSelectionProportionalToRepeatedAddressCount() { - RingHashConfig config = new RingHashConfig(10000, 100000); - List servers = createRepeatedServerAddrs(1, 10, 100); // 1:10:100 - boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( - ResolvedAddresses.newBuilder() - .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); - assertThat(addressesAccepted).isTrue(); - verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class)); - verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); - - // Bring all subchannels to READY. - Map pickCounts = new HashMap<>(); - for (Subchannel subchannel : subchannels.values()) { - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - pickCounts.put(subchannel.getAddresses(), 0); - } - verify(helper, times(3)).updateBalancingState(eq(READY), pickerCaptor.capture()); - SubchannelPicker picker = pickerCaptor.getValue(); - - for (int i = 0; i < 10000; i++) { - long hash = hashFunc.hashInt(i); - PickSubchannelArgs args = new PickSubchannelArgsImpl( - TestMethodDescriptors.voidMethod(), new Metadata(), - CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hash)); - Subchannel pickedSubchannel = picker.pickSubchannel(args).getSubchannel(); - EquivalentAddressGroup addr = pickedSubchannel.getAddresses(); - pickCounts.put(addr, pickCounts.get(addr) + 1); - } - - // Actual distribution: server0 = 104, server1 = 808, server2 = 9088 - double ratio01 = (double) pickCounts.get(servers.get(0)) / pickCounts.get(servers.get(1)); - double ratio12 = (double) pickCounts.get(servers.get(1)) / pickCounts.get(servers.get(11)); - assertThat(ratio01).isWithin(0.03).of((double) 1 / 10); - assertThat(ratio12).isWithin(0.03).of((double) 10 / 100); - } - @Test public void nameResolutionErrorWithNoActiveSubchannels() { Status error = Status.UNAVAILABLE.withDescription("not reachable"); @@ -1112,6 +1079,29 @@ public void nameResolutionErrorWithActiveSubchannels() { verifyNoMoreInteractions(helper); } + @Test + public void duplicateAddresses() { + RingHashConfig config = new RingHashConfig(10, 100); + List servers = createRepeatedServerAddrs(1, 2, 3); + boolean addressesAccepted = loadBalancer.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(servers).setLoadBalancingPolicyConfig(config).build()); + assertThat(addressesAccepted).isFalse(); + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + + PickSubchannelArgs args = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), + CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, hashFunc.hashVoid())); + PickResult result = pickerCaptor.getValue().pickSubchannel(args); + assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC + assertThat(result.getStatus().getCode()) + .isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash + assertThat(result.getStatus().getDescription()).isEqualTo( + "Ring hash lb error: EDS resolution was successful, but there were duplicate " + + "addresses: Address: FakeSocketAddress-server1, count: 2; " + + "Address: FakeSocketAddress-server2, count: 3"); + } + private void deliverSubchannelState(Subchannel subchannel, ConnectivityStateInfo state) { subchannelStateListeners.get(subchannel).onSubchannelState(state); }