From c8a6483eb1613afbc1c1dbb1236c97094233c3a7 Mon Sep 17 00:00:00 2001 From: nickhill Date: Sat, 30 Jun 2018 17:50:08 -0700 Subject: [PATCH 1/8] Remove redundant SubchannelPicker refreshes in RoundRobinLoadBalancer - Ensure active subchannel list and round-robin index is only regenerated/refreshed when it changes - Make it so that Subchannels exist in subchannels map iff their state != SHUTDOWN - Add EmptyPicker class since logic for this case is disjoint from the non-empty case --- .../util/RoundRobinLoadBalancerFactory.java | 110 ++++++++++++------ .../grpc/util/RoundRobinLoadBalancerTest.java | 60 +++++----- 2 files changed, 105 insertions(+), 65 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index a8b03315fed..8de490a7867 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -42,6 +42,7 @@ import io.grpc.internal.ServiceConfigUtil; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -110,6 +111,8 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Helper helper; private final Map subchannels = new HashMap(); + // true when map contains at least one Subchannel in READY state + private boolean ready = false; @Nullable private StickinessState stickinessState; @@ -126,6 +129,7 @@ public void handleResolvedAddressGroups( Set addedAddrs = setsDifference(latestAddrs, currentAddrs); Set removedAddrs = setsDifference(currentAddrs, latestAddrs); + boolean update = !ready; Map serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); if (serviceConfig != null) { @@ -140,6 +144,7 @@ public void handleResolvedAddressGroups( } else if (stickinessState == null || !stickinessState.key.name().equals(stickinessMetadataKey)) { stickinessState = new StickinessState(stickinessMetadataKey); + update = true; } } } @@ -173,10 +178,13 @@ public void handleResolvedAddressGroups( // Shutdown subchannels for removed addresses. for (EquivalentAddressGroup addressGroup : removedAddrs) { Subchannel subchannel = subchannels.remove(addressGroup); - subchannel.shutdown(); + update = update || isReady(subchannel); // no need to update if channel was already excluded + shutdownSubchannel(subchannel); } - updateBalancingState(getAggregatedState(), getAggregatedError()); + if (update) { + updateBalancingState(null, null); + } } @Override @@ -186,23 +194,40 @@ public void handleNameResolutionError(Status error) { @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { + Ref stateInfoRef = getSubchannelStateInfoRef(subchannel); + ConnectivityState stateBefore = stateInfoRef.value.getState(); + if (stateBefore == SHUTDOWN) { + return; + } + ConnectivityState newState = stateInfo.getState(); + if (newState == SHUTDOWN && stickinessState != null) { stickinessState.remove(subchannel); } if (subchannels.get(subchannel.getAddresses()) != subchannel) { return; } - if (stateInfo.getState() == IDLE) { + if (newState == IDLE) { subchannel.requestConnection(); } - getSubchannelStateInfoRef(subchannel).value = stateInfo; - updateBalancingState(getAggregatedState(), getAggregatedError()); + stateInfoRef.value = stateInfo; + if (!ready || (newState == READY ^ stateBefore == READY)) { + updateBalancingState(null, null); + } + } + + private void shutdownSubchannel(Subchannel subchannel) { + subchannel.shutdown(); + getSubchannelStateInfoRef(subchannel).value = + ConnectivityStateInfo.forNonError(SHUTDOWN); + if (stickinessState != null) { + stickinessState.remove(subchannel); + } } @Override public void shutdown() { for (Subchannel subchannel : getSubchannels()) { - subchannel.shutdown(); + shutdownSubchannel(subchannel); } } @@ -211,7 +236,13 @@ public void shutdown() { */ private void updateBalancingState(ConnectivityState state, Status error) { List activeList = filterNonFailingSubchannels(getSubchannels()); - helper.updateBalancingState(state, new Picker(activeList, error, stickinessState)); + ready = !activeList.isEmpty(); + if (state == null) { + state = ready ? READY : getAggregatedState(); + } + SubchannelPicker picker = ready ? new Picker(activeList, stickinessState) : + new EmptyPicker(error != null ? error : getAggregatedError()); + helper.updateBalancingState(state, picker); } /** @@ -219,13 +250,16 @@ private void updateBalancingState(ConnectivityState state, Status error) { */ private static List filterNonFailingSubchannels( Collection subchannels) { - List readySubchannels = new ArrayList(subchannels.size()); + List readySubchannels = null; for (Subchannel subchannel : subchannels) { - if (getSubchannelStateInfoRef(subchannel).value.getState() == READY) { + if (isReady(subchannel)) { + if (readySubchannels == null) { + readySubchannels = new ArrayList(subchannels.size()); + } readySubchannels.add(subchannel); } } - return readySubchannels; + return readySubchannels != null ? readySubchannels : Collections.emptyList(); } /** @@ -286,6 +320,11 @@ private static Ref getSubchannelStateInfoRef( Subchannel subchannel) { return checkNotNull(subchannel.getAttributes().get(STATE_INFO), "STATE_INFO"); } + + // package-private to avoid synthetic access + static boolean isReady(Subchannel subchannel) { + return getSubchannelStateInfoRef(subchannel).value.getState() == READY; + } private static Set setsDifference(Set a, Set b) { Set aCopy = new HashSet(a); @@ -324,7 +363,7 @@ private static final class StickinessState { */ @Nonnull Subchannel maybeRegister( - String stickinessValue, @Nonnull Subchannel subchannel, List rrList) { + String stickinessValue, @Nonnull Subchannel subchannel) { final Ref newSubchannelRef = subchannel.getAttributes().get(STICKY_REF); while (true) { Ref existingSubchannelRef = @@ -336,7 +375,7 @@ Subchannel maybeRegister( } else { // existing entry Subchannel existingSubchannel = existingSubchannelRef.value; - if (existingSubchannel != null && rrList.contains(existingSubchannel)) { + if (existingSubchannel != null && isReady(existingSubchannel)) { return existingSubchannel; } } @@ -382,44 +421,32 @@ static final class Picker extends SubchannelPicker { private static final AtomicIntegerFieldUpdater indexUpdater = AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index"); - @Nullable - private final Status status; private final List list; @Nullable private final RoundRobinLoadBalancer.StickinessState stickinessState; @SuppressWarnings("unused") private volatile int index = -1; // start off at -1 so the address on first use is 0. - Picker( - List list, @Nullable Status status, + Picker(List list, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { this.list = list; - this.status = status; this.stickinessState = stickinessState; } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - if (list.size() > 0) { - Subchannel subchannel = null; - if (stickinessState != null) { - String stickinessValue = args.getHeaders().get(stickinessState.key); - if (stickinessValue != null) { - subchannel = stickinessState.getSubchannel(stickinessValue); - if (subchannel == null || !list.contains(subchannel)) { - subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel(), list); - } + Subchannel subchannel = null; + if (stickinessState != null) { + String stickinessValue = args.getHeaders().get(stickinessState.key); + if (stickinessValue != null) { + subchannel = stickinessState.getSubchannel(stickinessValue); + if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) { + subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel()); } } - - return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel()); - } - - if (status != null) { - return PickResult.withError(status); } - return PickResult.withNoResult(); + return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel()); } private Subchannel nextSubchannel() { @@ -441,10 +468,19 @@ private Subchannel nextSubchannel() { List getList() { return list; } + } - @VisibleForTesting - Status getStatus() { - return status; + static final class EmptyPicker extends SubchannelPicker { + @Nullable + private final Status status; + + EmptyPicker(@Nullable Status status) { + this.status = status; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return status != null ? PickResult.withError(status) : PickResult.withNoResult(); } } } diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 257cdef9e53..6e8c094aa7b 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -51,10 +51,12 @@ import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.Status; import io.grpc.internal.GrpcAttributes; +import io.grpc.util.RoundRobinLoadBalancerFactory.EmptyPicker; import io.grpc.util.RoundRobinLoadBalancerFactory.Picker; import io.grpc.util.RoundRobinLoadBalancerFactory.Ref; import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer; @@ -87,7 +89,7 @@ public class RoundRobinLoadBalancerTest { private Attributes affinity = Attributes.newBuilder().set(MAJOR_KEY, "I got the keys").build(); @Captor - private ArgumentCaptor pickerCaptor; + private ArgumentCaptor pickerCaptor; @Captor private ArgumentCaptor stateCaptor; @Captor @@ -151,7 +153,7 @@ public void pickAfterResolved() throws Exception { assertEquals(CONNECTING, stateCaptor.getAllValues().get(0)); assertEquals(READY, stateCaptor.getAllValues().get(1)); - assertThat(pickerCaptor.getValue().getList()).containsExactly(readySubchannel); + assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel); verifyNoMoreInteractions(mockHelper); } @@ -195,9 +197,8 @@ public Subchannel answer(InvocationOnMock invocation) throws Throwable { InOrder inOrder = inOrder(mockHelper); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); - assertNull(picker.getStatus()); - assertThat(picker.getList()).containsExactly(removedSubchannel, oldSubchannel); + SubchannelPicker picker = pickerCaptor.getValue(); + assertThat(getList(picker)).containsExactly(removedSubchannel, oldSubchannel); verify(removedSubchannel, times(1)).requestConnection(); verify(oldSubchannel, times(1)).requestConnection(); @@ -227,8 +228,7 @@ public Subchannel answer(InvocationOnMock invocation) throws Throwable { inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); picker = pickerCaptor.getValue(); - assertNull(picker.getStatus()); - assertThat(picker.getList()).containsExactly(oldSubchannel, newSubchannel); + assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel); verifyNoMoreInteractions(mockHelper); } @@ -241,13 +241,13 @@ public void pickAfterStateChange() throws Exception { Ref subchannelStateInfo = subchannel.getAttributes().get( STATE_INFO); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(Picker.class)); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class)); assertThat(subchannelStateInfo.value).isEqualTo(ConnectivityStateInfo.forNonError(IDLE)); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); + assertThat(pickerCaptor.getValue()).isInstanceOf(Picker.class); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(READY)); @@ -257,12 +257,12 @@ public void pickAfterStateChange() throws Exception { assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forTransientFailure(error)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); + assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertNull(pickerCaptor.getValue().getStatus()); + assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(IDLE)); @@ -279,7 +279,7 @@ public void pickerRoundRobin() throws Exception { Subchannel subchannel2 = mock(Subchannel.class); Picker picker = new Picker(Collections.unmodifiableList(Lists.newArrayList( - subchannel, subchannel1, subchannel2)), null /* status */, null /* stickinessState */); + subchannel, subchannel1, subchannel2)), null /* stickinessState */); assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2); @@ -291,8 +291,7 @@ public void pickerRoundRobin() throws Exception { @Test public void pickerEmptyList() throws Exception { - Picker picker = - new Picker(Lists.newArrayList(), Status.UNKNOWN, null /* stickinessState */); + SubchannelPicker picker = new EmptyPicker(Status.UNKNOWN); assertEquals(null, picker.pickSubchannel(mockArgs).getSubchannel()); assertEquals(Status.UNKNOWN, @@ -358,23 +357,23 @@ public void subchannelStateIsolation() throws Exception { verify(mockHelper, times(6)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); Iterator stateIterator = stateCaptor.getAllValues().iterator(); - Iterator pickers = pickerCaptor.getAllValues().iterator(); + Iterator pickers = pickerCaptor.getAllValues().iterator(); // The picker is incrementally updated as subchannels become READY assertEquals(CONNECTING, stateIterator.next()); - assertThat(pickers.next().getList()).isEmpty(); + assertThat(pickers.next()).isInstanceOf(EmptyPicker.class); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1); + assertThat(getList(pickers.next())).containsExactly(sc1); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc2); + assertThat(getList(pickers.next())).containsExactly(sc1, sc2); assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc2, sc3); + assertThat(getList(pickers.next())).containsExactly(sc1, sc2, sc3); // The IDLE subchannel is dropped from the picker, but a reconnection is requested assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1, sc3); + assertThat(getList(pickers.next())).containsExactly(sc1, sc3); verify(sc2, times(2)).requestConnection(); // The failing subchannel is dropped from the picker, with no requested reconnect assertEquals(READY, stateIterator.next()); - assertThat(pickers.next().getList()).containsExactly(sc1); + assertThat(getList(pickers.next())).containsExactly(sc1); verify(sc3, times(1)).requestConnection(); assertThat(stateIterator.hasNext()).isFalse(); assertThat(pickers.hasNext()).isFalse(); @@ -388,7 +387,7 @@ public void noStickinessEnabled_withStickyHeader() { } verify(mockHelper, times(4)) .updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); @@ -419,7 +418,7 @@ public void stickinessEnabled_withoutStickyHeader() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); doReturn(new Metadata()).when(mockArgs).getHeaders(); @@ -449,7 +448,7 @@ public void stickinessEnabled_withStickyHeader() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); @@ -479,7 +478,7 @@ public void stickinessEnabled_withDifferentStickyHeaders() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue1 = new Metadata(); @@ -521,7 +520,7 @@ public void stickiness_goToTransientFailure_pick_backToReady() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); @@ -571,7 +570,7 @@ public void stickiness_goToTransientFailure_backToReady_pick() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue1 = new Metadata(); @@ -625,7 +624,7 @@ public void stickiness_oneSubchannelShutdown() { } verify(mockHelper, times(4)) .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); - Picker picker = pickerCaptor.getValue(); + SubchannelPicker picker = pickerCaptor.getValue(); Key stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER); Metadata headerWithStickinessValue = new Metadata(); @@ -681,6 +680,11 @@ public void stickiness_resolveTwice_metadataKeyUnChanged() { assertSame(stickinessMap1, stickinessMap2); } + + private static List getList(SubchannelPicker picker) { + return picker instanceof Picker ? ((Picker) picker).getList() : + Collections.emptyList(); + } private static class FakeSocketAddress extends SocketAddress { final String name; From 1c9c0f6732928ea6d463fc50a58bc295bb7eefc1 Mon Sep 17 00:00:00 2001 From: nickhill Date: Fri, 13 Jul 2018 15:51:43 -0700 Subject: [PATCH 2/8] remove explicit initialization of boolean ready field per @carl-mastrangelo's review comment --- .../main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index 8de490a7867..173c5c92b27 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -112,7 +112,7 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Map subchannels = new HashMap(); // true when map contains at least one Subchannel in READY state - private boolean ready = false; + private boolean ready; @Nullable private StickinessState stickinessState; From a693c302e7ddb5bdd8f3a2fdebadc9b15fd96328 Mon Sep 17 00:00:00 2001 From: nickhill Date: Sun, 26 Aug 2018 08:14:36 -0700 Subject: [PATCH 3/8] minor restructuring to make logic clearer; more explanatory comments --- .../util/RoundRobinLoadBalancerFactory.java | 65 ++++++++++--------- .../grpc/util/RoundRobinLoadBalancerTest.java | 11 ++-- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index db8da9423b9..60932cafefd 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -42,7 +42,6 @@ import io.grpc.internal.ServiceConfigUtil; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -113,8 +112,7 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Map subchannels = new HashMap(); private final Random random; - // true when map contains at least one Subchannel in READY state - private boolean ready; + private SubchannelPicker currentPicker = new EmptyPicker(null); @Nullable private StickinessState stickinessState; @@ -132,7 +130,7 @@ public void handleResolvedAddressGroups( Set addedAddrs = setsDifference(latestAddrs, currentAddrs); Set removedAddrs = setsDifference(currentAddrs, latestAddrs); - boolean update = !ready; + boolean stickinessStateChanged = false; Map serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); if (serviceConfig != null) { @@ -147,7 +145,7 @@ public void handleResolvedAddressGroups( } else if (stickinessState == null || !stickinessState.key.name().equals(stickinessMetadataKey)) { stickinessState = new StickinessState(stickinessMetadataKey); - update = true; + stickinessStateChanged = true; } } } @@ -177,22 +175,31 @@ public void handleResolvedAddressGroups( subchannels.put(addressGroup, subchannel); subchannel.requestConnection(); } + + // Note that the active list will never change if there are only subchannel additions, + // since they are added in IDLE (non-READY) state. + boolean activeListChanged = false; // Shutdown subchannels for removed addresses. for (EquivalentAddressGroup addressGroup : removedAddrs) { Subchannel subchannel = subchannels.remove(addressGroup); - update = update || isReady(subchannel); // no need to update if channel was already excluded + // Active list only changes if we remove a subchannel in READY state + activeListChanged = activeListChanged || isReady(subchannel); shutdownSubchannel(subchannel); } - if (update) { - updateBalancingState(null, null); + // Only refresh an active picker if either the active list or stickiness state changed. + if (!(currentPicker instanceof ReadyPicker) || activeListChanged || stickinessStateChanged) { + updateBalancingState(); } } @Override public void handleNameResolutionError(Status error) { - updateBalancingState(TRANSIENT_FAILURE, error); + if (!(currentPicker instanceof ReadyPicker)) { + currentPicker = new EmptyPicker(error); + } + helper.updateBalancingState(TRANSIENT_FAILURE, currentPicker); } @Override @@ -200,6 +207,8 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo s Ref stateInfoRef = getSubchannelStateInfoRef(subchannel); ConnectivityState stateBefore = stateInfoRef.value.getState(); if (stateBefore == SHUTDOWN) { + // This is the case the shutdown was triggered by a name resolver removal, the channel + // shutdown state change logic was already triggered in handleResolvedAddressGroups(). return; } ConnectivityState newState = stateInfo.getState(); @@ -213,8 +222,10 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo s subchannel.requestConnection(); } stateInfoRef.value = stateInfo; - if (!ready || (newState == READY ^ stateBefore == READY)) { - updateBalancingState(null, null); + // The active list only changes if this channel is moving between READY and non-READY. + // Otherwise, we don't need to refresh the picker if it's already active. + if (!(currentPicker instanceof ReadyPicker) || (newState == READY ^ stateBefore == READY)) { + updateBalancingState(); } } @@ -237,21 +248,18 @@ public void shutdown() { /** * Updates picker with the list of active subchannels (state == READY). */ - private void updateBalancingState(ConnectivityState state, Status error) { + private void updateBalancingState() { List activeList = filterNonFailingSubchannels(getSubchannels()); - ready = !activeList.isEmpty(); - SubchannelPicker picker; - if (ready) { - state = state != null ? state : READY; + if (activeList.isEmpty()) { + currentPicker = new EmptyPicker(getAggregatedError()); + helper.updateBalancingState(getAggregatedState(), currentPicker); + } else { // initialize the Picker to a random start index to ensure that a high frequency of Picker // churn does not skew subchannel selection. int startIndex = random.nextInt(activeList.size()); - picker = new Picker(activeList, startIndex, stickinessState); - } else { - state = state != null ? state : getAggregatedState(); - picker = new EmptyPicker(error != null ? error : getAggregatedError()); + currentPicker = new ReadyPicker(activeList, startIndex, stickinessState); + helper.updateBalancingState(READY, currentPicker); } - helper.updateBalancingState(state, picker); } /** @@ -259,16 +267,13 @@ private void updateBalancingState(ConnectivityState state, Status error) { */ private static List filterNonFailingSubchannels( Collection subchannels) { - List readySubchannels = null; + List readySubchannels = new ArrayList(subchannels.size()); for (Subchannel subchannel : subchannels) { if (isReady(subchannel)) { - if (readySubchannels == null) { - readySubchannels = new ArrayList(subchannels.size()); - } readySubchannels.add(subchannel); } } - return readySubchannels != null ? readySubchannels : Collections.emptyList(); + return readySubchannels; } /** @@ -426,9 +431,9 @@ Subchannel getSubchannel(String stickinessValue) { } @VisibleForTesting - static final class Picker extends SubchannelPicker { - private static final AtomicIntegerFieldUpdater indexUpdater = - AtomicIntegerFieldUpdater.newUpdater(Picker.class, "index"); + static final class ReadyPicker extends SubchannelPicker { + private static final AtomicIntegerFieldUpdater indexUpdater = + AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index"); private final List list; @Nullable @@ -436,7 +441,7 @@ static final class Picker extends SubchannelPicker { @SuppressWarnings("unused") private volatile int index; - Picker(List list, int startIndex, + ReadyPicker(List list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { this.list = list; this.stickinessState = stickinessState; diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index a330e752a24..8b5ae7699ad 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -57,7 +57,7 @@ import io.grpc.Status; import io.grpc.internal.GrpcAttributes; import io.grpc.util.RoundRobinLoadBalancerFactory.EmptyPicker; -import io.grpc.util.RoundRobinLoadBalancerFactory.Picker; +import io.grpc.util.RoundRobinLoadBalancerFactory.ReadyPicker; import io.grpc.util.RoundRobinLoadBalancerFactory.Ref; import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer; import java.net.SocketAddress; @@ -247,7 +247,7 @@ public void pickAfterStateChange() throws Exception { loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); - assertThat(pickerCaptor.getValue()).isInstanceOf(Picker.class); + assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(READY)); @@ -282,8 +282,9 @@ public void pickerRoundRobin() throws Exception { Subchannel subchannel1 = mock(Subchannel.class); Subchannel subchannel2 = mock(Subchannel.class); - Picker picker = new Picker(Collections.unmodifiableList(Lists.newArrayList( - subchannel, subchannel1, subchannel2)), 0 /* startIndex */, null /* stickinessState */); + ReadyPicker picker = new ReadyPicker(Collections.unmodifiableList( + Lists.newArrayList(subchannel, subchannel1, subchannel2)), + 0 /* startIndex */, null /* stickinessState */); assertThat(picker.getList()).containsExactly(subchannel, subchannel1, subchannel2); @@ -689,7 +690,7 @@ public void stickiness_resolveTwice_metadataKeyUnChanged() { } private static List getList(SubchannelPicker picker) { - return picker instanceof Picker ? ((Picker) picker).getList() : + return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() : Collections.emptyList(); } From 3cbb05d1a6b93f8fb6bd2fbcf9b5735834844bab Mon Sep 17 00:00:00 2001 From: nickhill Date: Sun, 26 Aug 2018 12:12:55 -0700 Subject: [PATCH 4/8] move some checks inside updateBalancingState method for clarity --- .../util/RoundRobinLoadBalancerFactory.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index 60932cafefd..00ddfbd01ae 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -176,7 +176,7 @@ public void handleResolvedAddressGroups( subchannel.requestConnection(); } - // Note that the active list will never change if there are only subchannel additions, + // Note that the active list is not changed by subchannel additions, // since they are added in IDLE (non-READY) state. boolean activeListChanged = false; @@ -188,10 +188,7 @@ public void handleResolvedAddressGroups( shutdownSubchannel(subchannel); } - // Only refresh an active picker if either the active list or stickiness state changed. - if (!(currentPicker instanceof ReadyPicker) || activeListChanged || stickinessStateChanged) { - updateBalancingState(); - } + updateBalancingState(activeListChanged || stickinessStateChanged); } @Override @@ -222,11 +219,8 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo s subchannel.requestConnection(); } stateInfoRef.value = stateInfo; - // The active list only changes if this channel is moving between READY and non-READY. - // Otherwise, we don't need to refresh the picker if it's already active. - if (!(currentPicker instanceof ReadyPicker) || (newState == READY ^ stateBefore == READY)) { - updateBalancingState(); - } + // The active list only changes if this channel is moving between READY and non-READY + updateBalancingState(newState == READY ^ stateBefore == READY); } private void shutdownSubchannel(Subchannel subchannel) { @@ -248,9 +242,14 @@ public void shutdown() { /** * Updates picker with the list of active subchannels (state == READY). */ - private void updateBalancingState() { + private void updateBalancingState(boolean activeListOrStickinessStateChanged) { + if (!activeListOrStickinessStateChanged && currentPicker instanceof ReadyPicker) { + // no refresh needed if there's an active picker with no change to its list + return; + } List activeList = filterNonFailingSubchannels(getSubchannels()); if (activeList.isEmpty()) { + // empty picker returns error or no result currentPicker = new EmptyPicker(getAggregatedError()); helper.updateBalancingState(getAggregatedState(), currentPicker); } else { From 7d9bde03c85ed57296ea6a1bdaa81615078462c2 Mon Sep 17 00:00:00 2001 From: nickhill Date: Thu, 30 Aug 2018 16:04:36 -0700 Subject: [PATCH 5/8] store current state and picker in RRLB, only update when new one is diff --- .../util/RoundRobinLoadBalancerFactory.java | 72 +++++++++++-------- .../grpc/util/RoundRobinLoadBalancerTest.java | 2 - 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index 00ddfbd01ae..eb444848a79 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -24,6 +24,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; + import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; @@ -112,6 +114,8 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Map subchannels = new HashMap(); private final Random random; + + private ConnectivityState currentState; private SubchannelPicker currentPicker = new EmptyPicker(null); @Nullable @@ -130,7 +134,6 @@ public void handleResolvedAddressGroups( Set addedAddrs = setsDifference(latestAddrs, currentAddrs); Set removedAddrs = setsDifference(currentAddrs, latestAddrs); - boolean stickinessStateChanged = false; Map serviceConfig = attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); if (serviceConfig != null) { @@ -145,7 +148,6 @@ public void handleResolvedAddressGroups( } else if (stickinessState == null || !stickinessState.key.name().equals(stickinessMetadataKey)) { stickinessState = new StickinessState(stickinessMetadataKey); - stickinessStateChanged = true; } } } @@ -175,52 +177,42 @@ public void handleResolvedAddressGroups( subchannels.put(addressGroup, subchannel); subchannel.requestConnection(); } - - // Note that the active list is not changed by subchannel additions, - // since they are added in IDLE (non-READY) state. - boolean activeListChanged = false; // Shutdown subchannels for removed addresses. for (EquivalentAddressGroup addressGroup : removedAddrs) { Subchannel subchannel = subchannels.remove(addressGroup); - // Active list only changes if we remove a subchannel in READY state - activeListChanged = activeListChanged || isReady(subchannel); shutdownSubchannel(subchannel); } - updateBalancingState(activeListChanged || stickinessStateChanged); + updateBalancingState(); } @Override public void handleNameResolutionError(Status error) { - if (!(currentPicker instanceof ReadyPicker)) { - currentPicker = new EmptyPicker(error); - } - helper.updateBalancingState(TRANSIENT_FAILURE, currentPicker); + // ready pickers aren't affected by status changes + updateBalancingState(TRANSIENT_FAILURE, + currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error)); } @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { Ref stateInfoRef = getSubchannelStateInfoRef(subchannel); - ConnectivityState stateBefore = stateInfoRef.value.getState(); - if (stateBefore == SHUTDOWN) { + if (stateInfoRef.value.getState() == SHUTDOWN) { // This is the case the shutdown was triggered by a name resolver removal, the channel // shutdown state change logic was already triggered in handleResolvedAddressGroups(). return; } - ConnectivityState newState = stateInfo.getState(); - if (newState == SHUTDOWN && stickinessState != null) { + if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { stickinessState.remove(subchannel); } if (subchannels.get(subchannel.getAddresses()) != subchannel) { return; } - if (newState == IDLE) { + if (stateInfo.getState() == IDLE) { subchannel.requestConnection(); } stateInfoRef.value = stateInfo; - // The active list only changes if this channel is moving between READY and non-READY - updateBalancingState(newState == READY ^ stateBefore == READY); + updateBalancingState(); } private void shutdownSubchannel(Subchannel subchannel) { @@ -242,23 +234,45 @@ public void shutdown() { /** * Updates picker with the list of active subchannels (state == READY). */ - private void updateBalancingState(boolean activeListOrStickinessStateChanged) { - if (!activeListOrStickinessStateChanged && currentPicker instanceof ReadyPicker) { - // no refresh needed if there's an active picker with no change to its list - return; - } + private void updateBalancingState() { List activeList = filterNonFailingSubchannels(getSubchannels()); if (activeList.isEmpty()) { // empty picker returns error or no result - currentPicker = new EmptyPicker(getAggregatedError()); - helper.updateBalancingState(getAggregatedState(), currentPicker); + updateBalancingState(getAggregatedState(), new EmptyPicker(getAggregatedError())); } else { // initialize the Picker to a random start index to ensure that a high frequency of Picker // churn does not skew subchannel selection. int startIndex = random.nextInt(activeList.size()); - currentPicker = new ReadyPicker(activeList, startIndex, stickinessState); - helper.updateBalancingState(READY, currentPicker); + updateBalancingState(READY, new ReadyPicker(activeList, startIndex, stickinessState)); + } + } + + private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) { + if (state != currentState || !areEquivalentPickers(picker, currentPicker)) { + helper.updateBalancingState(state, picker); + currentState = state; + currentPicker = picker; + } + } + + private static boolean areEquivalentPickers(SubchannelPicker p1, SubchannelPicker p2) { + if (p1 == p2) { + return true; + } + if (p1.getClass() != p2.getClass()) { + return false; + } + if (p1 instanceof EmptyPicker) { + return Objects.equal(((EmptyPicker)p1).status, ((EmptyPicker)p2).status); + } + if (p1 instanceof ReadyPicker) { + ReadyPicker rp1 = (ReadyPicker) p1; + ReadyPicker rp2 = (ReadyPicker) p2; + // the lists cannot contain duplicate subchannels + return rp1.stickinessState == rp2.stickinessState && rp1.list.size() == rp2.list.size() + && new HashSet(rp1.list).containsAll(rp2.list); } + return false; } /** diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 8b5ae7699ad..133f32b9a27 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -261,8 +261,6 @@ public void pickAfterStateChange() throws Exception { loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); - inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class); assertThat(subchannelStateInfo.value).isEqualTo( ConnectivityStateInfo.forNonError(IDLE)); From b1a793597b4c454f7bf4f8e727bb73cbb4a44d44 Mon Sep 17 00:00:00 2001 From: nickhill Date: Sat, 1 Sep 2018 13:01:36 -0700 Subject: [PATCH 6/8] some more simplification/refactoring; improve test coverage - remove now redundant check in handleSubchannelState - collapse getAggregatedState() and getAggregatedError() into handleBalancingState() - have both pickers extend new RoundRobinPicker, move areEquivalentPickers() logic into RoundRobinPicker.isEquivalentTo() - extend unit tests to cover some additional cases --- .../util/RoundRobinLoadBalancerFactory.java | 130 +++++++----------- .../grpc/util/RoundRobinLoadBalancerTest.java | 27 ++++ 2 files changed, 78 insertions(+), 79 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index eb444848a79..19141edbd86 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -44,12 +44,10 @@ import io.grpc.internal.ServiceConfigUtil; import java.util.ArrayList; import java.util.Collection; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -116,7 +114,7 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Random random; private ConnectivityState currentState; - private SubchannelPicker currentPicker = new EmptyPicker(null); + private RoundRobinPicker currentPicker = new EmptyPicker(/*status=*/ null); @Nullable private StickinessState stickinessState; @@ -196,22 +194,16 @@ public void handleNameResolutionError(Status error) { @Override public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { - Ref stateInfoRef = getSubchannelStateInfoRef(subchannel); - if (stateInfoRef.value.getState() == SHUTDOWN) { - // This is the case the shutdown was triggered by a name resolver removal, the channel - // shutdown state change logic was already triggered in handleResolvedAddressGroups(). + if (subchannels.get(subchannel.getAddresses()) != subchannel) { return; } if (stateInfo.getState() == SHUTDOWN && stickinessState != null) { stickinessState.remove(subchannel); } - if (subchannels.get(subchannel.getAddresses()) != subchannel) { - return; - } if (stateInfo.getState() == IDLE) { subchannel.requestConnection(); } - stateInfoRef.value = stateInfo; + getSubchannelStateInfoRef(subchannel).value = stateInfo; updateBalancingState(); } @@ -237,8 +229,25 @@ public void shutdown() { private void updateBalancingState() { List activeList = filterNonFailingSubchannels(getSubchannels()); if (activeList.isEmpty()) { - // empty picker returns error or no result - updateBalancingState(getAggregatedState(), new EmptyPicker(getAggregatedError())); + // No READY subchannels, determine aggregate state and error status + boolean isConnecting = false; + Status aggStatus = null; + for (Subchannel subchannel : getSubchannels()) { + ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value; + // This subchannel IDLE is not because of channel IDLE_TIMEOUT, + // in which case LB is already shutdown. + // RRLB will request connection immediately on subchannel IDLE. + if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { + isConnecting = true; + } + if (aggStatus == null || !aggStatus.isOk()) { + aggStatus = stateInfo.getStatus(); + } + } + updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, + // If all subchannels are TRANSIENT_FAILURE, return the Status associated with + // an arbitrary subchannel, otherwise return null. + new EmptyPicker(aggStatus != null && !aggStatus.isOk() ? aggStatus : null)); } else { // initialize the Picker to a random start index to ensure that a high frequency of Picker // churn does not skew subchannel selection. @@ -247,34 +256,14 @@ private void updateBalancingState() { } } - private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) { - if (state != currentState || !areEquivalentPickers(picker, currentPicker)) { + private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) { + if (state != currentState || !picker.isEquivalentTo(currentPicker)) { helper.updateBalancingState(state, picker); currentState = state; currentPicker = picker; } } - private static boolean areEquivalentPickers(SubchannelPicker p1, SubchannelPicker p2) { - if (p1 == p2) { - return true; - } - if (p1.getClass() != p2.getClass()) { - return false; - } - if (p1 instanceof EmptyPicker) { - return Objects.equal(((EmptyPicker)p1).status, ((EmptyPicker)p2).status); - } - if (p1 instanceof ReadyPicker) { - ReadyPicker rp1 = (ReadyPicker) p1; - ReadyPicker rp2 = (ReadyPicker) p2; - // the lists cannot contain duplicate subchannels - return rp1.stickinessState == rp2.stickinessState && rp1.list.size() == rp2.list.size() - && new HashSet(rp1.list).containsAll(rp2.list); - } - return false; - } - /** * Filters out non-ready subchannels. */ @@ -301,43 +290,6 @@ private static Set stripAttrs(List states = EnumSet.noneOf(ConnectivityState.class); - for (Subchannel subchannel : getSubchannels()) { - states.add(getSubchannelStateInfoRef(subchannel).value.getState()); - } - if (states.contains(READY)) { - return READY; - } - if (states.contains(CONNECTING)) { - return CONNECTING; - } - if (states.contains(IDLE)) { - // This subchannel IDLE is not because of channel IDLE_TIMEOUT, in which case LB is already - // shutdown. - // RRLB will request connection immediately on subchannel IDLE. - return CONNECTING; - } - return TRANSIENT_FAILURE; - } - @VisibleForTesting Collection getSubchannels() { return subchannels.values(); @@ -442,13 +394,18 @@ Subchannel getSubchannel(String stickinessValue) { } } } + + // Only subclasses are ReadyPicker or EmptyPicker + private abstract static class RoundRobinPicker extends SubchannelPicker { + abstract boolean isEquivalentTo(RoundRobinPicker picker); + } @VisibleForTesting - static final class ReadyPicker extends SubchannelPicker { + static final class ReadyPicker extends RoundRobinPicker { private static final AtomicIntegerFieldUpdater indexUpdater = AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index"); - private final List list; + private final List list; // non-empty @Nullable private final RoundRobinLoadBalancer.StickinessState stickinessState; @SuppressWarnings("unused") @@ -456,6 +413,7 @@ static final class ReadyPicker extends SubchannelPicker { ReadyPicker(List list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { + assert !list.isEmpty(); this.list = list; this.stickinessState = stickinessState; this.index = startIndex - 1; @@ -479,10 +437,6 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { private Subchannel nextSubchannel() { int size = list.size(); - if (size == 0) { - throw new NoSuchElementException(); - } - int i = indexUpdater.incrementAndGet(this); if (i >= size) { int oldi = i; @@ -496,9 +450,22 @@ private Subchannel nextSubchannel() { List getList() { return list; } + + @Override + boolean isEquivalentTo(RoundRobinPicker picker) { + if (!(picker instanceof ReadyPicker)) { + return false; + } + ReadyPicker other = (ReadyPicker) picker; + // the lists cannot contain duplicate subchannels + return other == this || (stickinessState == other.stickinessState + && list.size() == other.list.size() + && new HashSet(list).containsAll(other.list)); + } } - static final class EmptyPicker extends SubchannelPicker { + @VisibleForTesting + static final class EmptyPicker extends RoundRobinPicker { @Nullable private final Status status; @@ -510,5 +477,10 @@ static final class EmptyPicker extends SubchannelPicker { public PickResult pickSubchannel(PickSubchannelArgs args) { return status != null ? PickResult.withError(status) : PickResult.withNoResult(); } + + @Override + boolean isEquivalentTo(RoundRobinPicker picker) { + return picker instanceof EmptyPicker && Objects.equal(status, ((EmptyPicker) picker).status); + } } } diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 133f32b9a27..5a4d3d6077a 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -49,6 +49,7 @@ import io.grpc.EquivalentAddressGroup; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; @@ -61,6 +62,7 @@ import io.grpc.util.RoundRobinLoadBalancerFactory.Ref; import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -230,6 +232,13 @@ public Subchannel answer(InvocationOnMock invocation) throws Throwable { picker = pickerCaptor.getValue(); assertThat(getList(picker)).containsExactly(oldSubchannel, newSubchannel); + // test going from non-empty to empty + loadBalancer.handleResolvedAddressGroups(Collections.emptyList(), + affinity); + + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + assertEquals(PickResult.withNoResult(), pickerCaptor.getValue().pickSubchannel(mockArgs)); + verifyNoMoreInteractions(mockHelper); } @@ -642,6 +651,7 @@ public void stickiness_oneSubchannelShutdown() { Subchannel sc1 = picker.pickSubchannel(mockArgs).getSubchannel(); + // shutdown channel directly loadBalancer .handleSubchannelState(sc1, ConnectivityStateInfo.forNonError(ConnectivityState.SHUTDOWN)); @@ -651,6 +661,23 @@ public void stickiness_oneSubchannelShutdown() { picker.pickSubchannel(mockArgs).getSubchannel()); assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1); verify(mockArgs, atLeast(2)).getHeaders(); + + Subchannel sc2 = picker.pickSubchannel(mockArgs).getSubchannel(); + + assertEquals(sc2, loadBalancer.getStickinessMapForTest().get("my-sticky-value").value); + + // shutdown channel via name resolver change + List newServers = new ArrayList(servers); + newServers.remove(sc2.getAddresses()); + + loadBalancer.handleResolvedAddressGroups(newServers, attributes); + + assertNull(loadBalancer.getStickinessMapForTest().get("my-sticky-value").value); + + assertEquals(nextSubchannel(sc2, allSubchannels), + picker.pickSubchannel(mockArgs).getSubchannel()); + assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1); + verify(mockArgs, atLeast(2)).getHeaders(); } @Test From b902bc26e9c9c8d5105648535374acf76b0acd51 Mon Sep 17 00:00:00 2001 From: nickhill Date: Thu, 6 Sep 2018 11:21:02 +0100 Subject: [PATCH 7/8] Address latest review comments from @zhangkun83 - Use explicit check for non-empty list instead of assert - Change EmptyPicker.status to be non-nullable - Further test coverage improvement including explicit picker comparison tests --- .../util/RoundRobinLoadBalancerFactory.java | 30 ++++++++----- .../grpc/util/RoundRobinLoadBalancerTest.java | 45 +++++++++++++++++++ 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index 19141edbd86..88ad904040b 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import io.grpc.Attributes; import io.grpc.ConnectivityState; @@ -114,7 +115,7 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Random random; private ConnectivityState currentState; - private RoundRobinPicker currentPicker = new EmptyPicker(/*status=*/ null); + private RoundRobinPicker currentPicker = new EmptyPicker(Status.OK); @Nullable private StickinessState stickinessState; @@ -223,15 +224,18 @@ public void shutdown() { } } + private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready"); + /** * Updates picker with the list of active subchannels (state == READY). */ + @SuppressWarnings("ReferenceEquality") private void updateBalancingState() { List activeList = filterNonFailingSubchannels(getSubchannels()); if (activeList.isEmpty()) { // No READY subchannels, determine aggregate state and error status boolean isConnecting = false; - Status aggStatus = null; + Status aggStatus = EMPTY_OK; for (Subchannel subchannel : getSubchannels()) { ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value; // This subchannel IDLE is not because of channel IDLE_TIMEOUT, @@ -240,14 +244,14 @@ private void updateBalancingState() { if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) { isConnecting = true; } - if (aggStatus == null || !aggStatus.isOk()) { + if (aggStatus == EMPTY_OK || !aggStatus.isOk()) { aggStatus = stateInfo.getStatus(); } } updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, // If all subchannels are TRANSIENT_FAILURE, return the Status associated with - // an arbitrary subchannel, otherwise return null. - new EmptyPicker(aggStatus != null && !aggStatus.isOk() ? aggStatus : null)); + // an arbitrary subchannel, otherwise return OK. + new EmptyPicker(aggStatus)); } else { // initialize the Picker to a random start index to ensure that a high frequency of Picker // churn does not skew subchannel selection. @@ -322,7 +326,8 @@ Map> getStickinessMapForTest() { * Holds stickiness related states: The stickiness key, a registry mapping stickiness values to * the associated Subchannel Ref, and a map from Subchannel to Subchannel Ref. */ - private static final class StickinessState { + @VisibleForTesting + static final class StickinessState { static final int MAX_ENTRIES = 1000; final Key key; @@ -413,7 +418,7 @@ static final class ReadyPicker extends RoundRobinPicker { ReadyPicker(List list, int startIndex, @Nullable RoundRobinLoadBalancer.StickinessState stickinessState) { - assert !list.isEmpty(); + Preconditions.checkArgument(!list.isEmpty(), "empty list"); this.list = list; this.stickinessState = stickinessState; this.index = startIndex - 1; @@ -466,21 +471,22 @@ boolean isEquivalentTo(RoundRobinPicker picker) { @VisibleForTesting static final class EmptyPicker extends RoundRobinPicker { - @Nullable + private final Status status; - EmptyPicker(@Nullable Status status) { - this.status = status; + EmptyPicker(@Nonnull Status status) { + this.status = Preconditions.checkNotNull(status, "status"); } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { - return status != null ? PickResult.withError(status) : PickResult.withNoResult(); + return status.isOk() ? PickResult.withNoResult() : PickResult.withError(status); } @Override boolean isEquivalentTo(RoundRobinPicker picker) { - return picker instanceof EmptyPicker && Objects.equal(status, ((EmptyPicker) picker).status); + return picker instanceof EmptyPicker && (Objects.equal(status, ((EmptyPicker) picker).status) + || (status.isOk() && ((EmptyPicker) picker).status.isOk())); } } } diff --git a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 5a4d3d6077a..aea42c98fb6 100644 --- a/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -20,13 +20,16 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.STATE_INFO; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; @@ -61,8 +64,10 @@ import io.grpc.util.RoundRobinLoadBalancerFactory.ReadyPicker; import io.grpc.util.RoundRobinLoadBalancerFactory.Ref; import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer; +import io.grpc.util.RoundRobinLoadBalancerFactory.RoundRobinLoadBalancer.StickinessState; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -221,6 +226,9 @@ public Subchannel answer(InvocationOnMock invocation) throws Throwable { verify(newSubchannel, times(1)).requestConnection(); verify(removedSubchannel, times(1)).shutdown(); + + loadBalancer.handleSubchannelState(removedSubchannel, + ConnectivityStateInfo.forNonError(SHUTDOWN)); assertThat(loadBalancer.getSubchannels()).containsExactly(oldSubchannel, newSubchannel); @@ -672,6 +680,10 @@ public void stickiness_oneSubchannelShutdown() { loadBalancer.handleResolvedAddressGroups(newServers, attributes); + verify(sc2, times(1)).shutdown(); + + loadBalancer.handleSubchannelState(sc2, ConnectivityStateInfo.forNonError(SHUTDOWN)); + assertNull(loadBalancer.getStickinessMapForTest().get("my-sticky-value").value); assertEquals(nextSubchannel(sc2, allSubchannels), @@ -714,6 +726,39 @@ public void stickiness_resolveTwice_metadataKeyUnChanged() { assertSame(stickinessMap1, stickinessMap2); } + @Test(expected = IllegalArgumentException.class) + public void readyPicker_emptyList() { + // ready picker list must be non-empty + new ReadyPicker(Collections.emptyList(), 0, null); + } + + @Test + public void internalPickerComparisons() { + EmptyPicker emptyOk1 = new EmptyPicker(Status.OK); + EmptyPicker emptyOk2 = new EmptyPicker(Status.OK.withDescription("different OK")); + EmptyPicker emptyErr = new EmptyPicker(Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯")); + + Iterator subchannelIterator = subchannels.values().iterator(); + Subchannel sc1 = subchannelIterator.next(); + Subchannel sc2 = subchannelIterator.next(); + StickinessState stickinessState = new StickinessState("stick-key"); + ReadyPicker ready1 = new ReadyPicker(Arrays.asList(sc1, sc2), 0, null); + ReadyPicker ready2 = new ReadyPicker(Arrays.asList(sc1), 0, null); + ReadyPicker ready3 = new ReadyPicker(Arrays.asList(sc2, sc1), 1, null); + ReadyPicker ready4 = new ReadyPicker(Arrays.asList(sc1, sc2), 1, stickinessState); + ReadyPicker ready5 = new ReadyPicker(Arrays.asList(sc2, sc1), 0, stickinessState); + + assertTrue(emptyOk1.isEquivalentTo(emptyOk2)); + assertFalse(emptyOk1.isEquivalentTo(emptyErr)); + assertFalse(ready1.isEquivalentTo(ready2)); + assertTrue(ready1.isEquivalentTo(ready3)); + assertFalse(ready3.isEquivalentTo(ready4)); + assertTrue(ready4.isEquivalentTo(ready5)); + assertFalse(emptyOk1.isEquivalentTo(ready1)); + assertFalse(ready1.isEquivalentTo(emptyOk1)); + } + + private static List getList(SubchannelPicker picker) { return picker instanceof ReadyPicker ? ((ReadyPicker) picker).getList() : Collections.emptyList(); From 3df0d18e38772d2e096e76ff5728db3649d9527e Mon Sep 17 00:00:00 2001 From: nickhill Date: Sat, 8 Sep 2018 11:32:49 +0100 Subject: [PATCH 8/8] use EMPTY_OK instead of Status.OK for initial empty picker --- .../main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java index 88ad904040b..2e6e5e0530f 100644 --- a/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/util/RoundRobinLoadBalancerFactory.java @@ -115,7 +115,7 @@ static final class RoundRobinLoadBalancer extends LoadBalancer { private final Random random; private ConnectivityState currentState; - private RoundRobinPicker currentPicker = new EmptyPicker(Status.OK); + private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK); @Nullable private StickinessState stickinessState;