diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java index 2ef6a19012a..3b6b1eeeeab 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java @@ -138,12 +138,7 @@ public void handleResolvedAddressGroups( delegate.handleResolvedAddressGroups(newBackendServers, attributes); break; case GRPCLB: - if (newLbAddressGroups.isEmpty()) { - grpclbState.propagateError(Status.UNAVAILABLE.withDescription( - "NameResolver returned no LB address while asking for GRPCLB")); - } else { - grpclbState.updateAddresses(newLbAddressGroups, newBackendServers); - } + grpclbState.handleAddresses(newLbAddressGroups, newBackendServers); break; default: // Do nothing diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 301f65985d3..a82d29c177c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -101,12 +101,14 @@ public String toString() { private static final Attributes.Key> STATE_INFO = Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); - // Once set, never go back to null. + // Reset to null when timer expires or cancelled. @Nullable - private ScheduledFuture fallbackTimer; + private FallbackModeTask fallbackTimer; private List fallbackBackendList = Collections.emptyList(); - private boolean fallbackTimerExpired; - private boolean receivedServerListFromBalancer; + private boolean usingFallbackBackends; + // True if the current balancer has returned a serverlist. Will be reset to false when lost + // connection to a balancer. + private boolean balancerWorking; @Nullable private ManagedChannel lbCommChannel; @@ -141,14 +143,21 @@ void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState subchannel.requestConnection(); } subchannel.getAttributes().get(STATE_INFO).set(newState); + maybeStartFallbackTimer(); maybeUpdatePicker(); } /** - * Set the new addresses of the balancer and backends, and create connection if not yet connected. + * Handle new addresses of the balancer and backends from the resolver, and create connection if + * not yet connected. */ - void updateAddresses( + void handleAddresses( List newLbAddressGroups, List newBackendServers) { + if (newLbAddressGroups.isEmpty()) { + propagateError(Status.UNAVAILABLE.withDescription( + "NameResolver returned no LB address while asking for GRPCLB")); + return; + } LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups); startLbComm(newLbAddressGroup); // Avoid creating a new RPC just because the addresses were updated, as it can cause a @@ -158,30 +167,59 @@ void updateAddresses( if (lbStream == null) { startLbRpc(); } - // If we don't receive server list from the balancer within the timeout, we round-robin on - // the backend list from the resolver (aka fallback), until the balancer returns a server list. fallbackBackendList = newBackendServers; - if (fallbackTimer == null) { - fallbackTimer = - timerService.schedule(new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } else { - maybeUseFallbackBackends(); + maybeStartFallbackTimer(); + if (usingFallbackBackends) { + // Populate the new fallback backends to round-robin list. + useFallbackBackends(); } + maybeUpdatePicker(); } - private void maybeUseFallbackBackends() { - // Only use fallback backends after fallback timer expired and before receiving server list from - // the balancer. - if (receivedServerListFromBalancer || !fallbackTimerExpired) { + /** + * Start the fallback timer if it's not already started and all connections are lost. + */ + private void maybeStartFallbackTimer() { + if (fallbackTimer != null) { return; } + if (fallbackBackendList.isEmpty()) { + return; + } + if (balancerWorking) { + return; + } + if (usingFallbackBackends) { + return; + } + int numReadySubchannels = 0; + for (Subchannel subchannel : subchannels.values()) { + if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { + numReadySubchannels++; + } + } + if (numReadySubchannels > 0) { + return; + } + logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId}); + fallbackTimer = new FallbackModeTask(); + fallbackTimer.schedule(); + } + + /** + * Populate the round-robin lists with the fallback backends. + */ + private void useFallbackBackends() { + usingFallbackBackends = true; + logger.log(Level.INFO, "[{0}] Using fallback: {1}", new Object[] {logId, fallbackBackendList}); + List newDropList = new ArrayList(); List newBackendAddrList = new ArrayList(); for (EquivalentAddressGroup eag : fallbackBackendList) { newDropList.add(null); newBackendAddrList.add(new BackendAddressGroup(eag, null)); } - updateRoundRobinLists(newDropList, newBackendAddrList, null); + useRoundRobinLists(newDropList, newBackendAddrList, null); } private void shutdownLbComm() { @@ -231,12 +269,20 @@ private void startLbRpc() { } } + private void cancelFallbackTimer() { + if (fallbackTimer != null) { + fallbackTimer.cancel(); + fallbackTimer = null; + } + } + void shutdown() { shutdownLbComm(); for (Subchannel subchannel : subchannels.values()) { subchannel.shutdown(); } subchannels = Collections.emptyMap(); + cancelFallbackTimer(); } void propagateError(Status status) { @@ -257,7 +303,10 @@ GrpclbClientLoadRecorder getLoadRecorder() { return lbStream.loadRecorder; } - private void updateRoundRobinLists( + /** + * Populate the round-robin lists with the given values. + */ + private void useRoundRobinLists( List newDropList, List newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder) { HashMap newSubchannelMap = @@ -301,21 +350,39 @@ private void updateRoundRobinLists( subchannels = Collections.unmodifiableMap(newSubchannelMap); dropList = Collections.unmodifiableList(newDropList); backendList = Collections.unmodifiableList(newBackendList); - maybeUpdatePicker(); } @VisibleForTesting class FallbackModeTask implements Runnable { + private ScheduledFuture scheduledFuture; + // If the scheduledFuture is cancelled after the task has made it into the ChannelExecutor, the + // task will be started anyway. Use this boolean to signal that the task should not be run. + private boolean cancelled; + @Override public void run() { helper.runSerialized(new Runnable() { @Override public void run() { - fallbackTimerExpired = true; - maybeUseFallbackBackends(); + if (!cancelled) { + checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); + fallbackTimer = null; + useFallbackBackends(); + maybeUpdatePicker(); + } } }); } + + void cancel() { + scheduledFuture.cancel(false); + cancelled = true; + } + + void schedule() { + checkState(scheduledFuture == null, "FallbackModeTask already scheduled"); + scheduledFuture = timerService.schedule(this, FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } } @VisibleForTesting @@ -443,10 +510,7 @@ private void handleResponse(LoadBalanceResponse response) { return; } - receivedServerListFromBalancer = true; - if (fallbackTimer != null) { - fallbackTimer.cancel(false); - } + balancerWorking = true; // TODO(zhangkun83): handle delegate from initialResponse ServerList serverList = response.getServerList(); List newDropList = new ArrayList(); @@ -470,16 +534,23 @@ private void handleResponse(LoadBalanceResponse response) { newBackendAddrList.add(new BackendAddressGroup(eag, token)); } } - updateRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); + // Stop using fallback backends as soon as a new server list is received from the balancer. + usingFallbackBackends = false; + cancelFallbackTimer(); + useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); + maybeUpdatePicker(); } - private void handleStreamClosed(Status status) { + private void handleStreamClosed(Status error) { + checkArgument(!error.isOk(), "unexpected OK status"); if (closed) { return; } closed = true; cleanUp(); - propagateError(status); + propagateError(error); + balancerWorking = false; + maybeStartFallbackTimer(); startLbRpc(); } @@ -705,7 +776,7 @@ public boolean equals(Object other) { @VisibleForTesting static final class ErrorEntry implements RoundRobinEntry { - private final PickResult result; + final PickResult result; ErrorEntry(Status status) { result = PickResult.withError(status); @@ -728,6 +799,13 @@ public boolean equals(Object other) { } return Objects.equal(result, ((ErrorEntry) other).result); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("result", result) + .toString(); + } } @VisibleForTesting diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index fc3beab6774..01958748e2d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; @@ -119,6 +120,7 @@ public boolean shouldAccept(Runnable command) { @Mock private Helper helper; + private SubchannelPicker currentPicker; private LoadBalancerGrpc.LoadBalancerImplBase mockLbService; @Captor private ArgumentCaptor> lbResponseObserverCaptor; @@ -220,6 +222,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(helper).runSerialized(any(Runnable.class)); + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + currentPicker = (SubchannelPicker) invocation.getArguments()[1]; + return null; + } + }).when(helper).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); when(helper.getAuthority()).thenReturn(SERVICE_AUTHORITY); when(timerServicePool.getObject()).thenReturn(fakeClock.getScheduledExecutorService()); balancer = new GrpclbLoadBalancer( @@ -247,6 +257,8 @@ public void run() { for (Subchannel subchannel: subchannelTracker) { verify(subchannel).shutdown(); } + // No timer should linger after shutdown + assertEquals(0, fakeClock.numPendingTasks()); } finally { if (fakeLbServer != null) { fakeLbServer.shutdownNow(); @@ -358,6 +370,10 @@ public void loadReporting() { Attributes grpclbResolutionAttrs = Attributes.newBuilder() .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + // No backend address from resolver. Fallback timer is not started. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); @@ -1018,8 +1034,8 @@ public void grpclbWorking() { InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); - // Timer for fallback mode is registered - assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // No backend address from resolver. Fallback timer is not started. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); // Simulate receiving LB response List backends1 = Arrays.asList( @@ -1034,8 +1050,6 @@ public void grpclbWorking() { eq(new EquivalentAddressGroup(backends1.get(0).addr)), any(Attributes.class)); inOrder.verify(helper).createSubchannel( eq(new EquivalentAddressGroup(backends1.get(1).addr)), any(Attributes.class)); - // Timer for fallback mode is cancelled as soon as the balancer returns a server list - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); assertEquals(2, mockSubchannels.size()); Subchannel subchannel1 = mockSubchannels.poll(); Subchannel subchannel2 = mockSubchannels.poll(); @@ -1180,11 +1194,22 @@ public void grpclbWorking() { verify(lbRequestObserver, never()).onError(any(Throwable.class)); // Load reporting was not requested, thus never scheduled - assertEquals(0, fakeClock.numPendingTasks()); + assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER)); + } + + @Test + public void grpclbFallback_initialTimeout_serverListReceivedBeforeTimerExpires() { + subtestGrpclbFallbackInitialTimeout(false); } @Test - public void grpclbFallbackToBackendsFromResolver() { + public void grpclbFallback_initialTimeout_timerExpires() { + subtestGrpclbFallbackInitialTimeout(true); + } + + // Fallback within the period of the initial timeout, where the server list is not received from + // the balancer. + private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { long loadReportIntervalMillis = 1983; InOrder helperInOrder = inOrder(helper); @@ -1211,29 +1236,29 @@ public void grpclbFallbackToBackendsFromResolver() { eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); - // Receiving the initial response won't reset the fallback timer. Only reciving the server list - // does. lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); // We don't care if runSerialized() has been run. helperInOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); helperInOrder.verifyNoMoreInteractions(); - //////////////////////////// - // Fallback timer expires - //////////////////////////// assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - - // Fall back to the backends from resolver - fallbackTestVerifyUseOfFallbackBackendLists( - helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); - assertNull(balancer.getDelegate()); - assertFalse(oobChannel.isShutdown()); - verify(lbRequestObserver, never()).onCompleted(); + ////////////////////////////////// + // Fallback timer expires (or not) + ////////////////////////////////// + if (timerExpires) { + fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Fall back to the backends from resolver + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + assertNull(balancer.getDelegate()); + assertFalse(oobChannel.isShutdown()); + verify(lbRequestObserver, never()).onCompleted(); + } //////////////////////////////////////////////////////// // Name resolver sends new list without any backend addr @@ -1250,9 +1275,11 @@ public void grpclbFallbackToBackendsFromResolver() { resolutionList.get(0).getAddresses().get(0), resolutionList.get(1).getAddresses().get(0))))); - // Still in fallback logic, except that the backend list is empty - fallbackTestVerifyUseOfFallbackBackendLists( - helperInOrder, helper, Collections.emptyList()); + if (timerExpires) { + // Still in fallback logic, except that the backend list is empty + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Collections.emptyList()); + } ////////////////////////////////////////////////// // Name resolver sends new list with backend addrs @@ -1266,18 +1293,32 @@ public void grpclbFallbackToBackendsFromResolver() { same(oobChannel), addrsEq(resolutionList.get(0))); - // New backend addresses are used for fallback - fallbackTestVerifyUseOfFallbackBackendLists( - helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); + if (timerExpires) { + // New backend addresses are used for fallback + fallbackTestVerifyUseOfFallbackBackendLists( + helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); + } /////////////////////// // Break the LB stream /////////////////////// - lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); + lbResponseObserver.onError(streamError.asException()); - // The error will NOT propagate to picker because fallback list is in use. - helperInOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + if (timerExpires) { + // The error will NOT propagate to picker because fallback list is in use. + helperInOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + } else { + // Not in fallback mode. The error will be propagated. + verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); + Status status = errorEntry.result.getStatus(); + assertThat(status.getCode()).isEqualTo(streamError.getCode()); + assertThat(status.getDescription()).contains(streamError.getDescription()); + } // A new stream is created verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1298,7 +1339,7 @@ public void grpclbFallbackToBackendsFromResolver() { lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(serverList)); - // Fallback mode ends + // Balancer-provided server list now in effect fallbackTestVerifyUseOfBalancerBackendLists(helperInOrder, helper, serverList); /////////////////////////////////////////////////////////////// @@ -1311,16 +1352,141 @@ public void grpclbFallbackToBackendsFromResolver() { helperInOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - // Fallback mode is one-shot only. + // No fallback timeout timer scheduled. + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } + + @Test + public void grpclbFallback_balancerLost_timerExpires() { + subtestGrpclbFallbackConnectionLost(true, false, true); + } + + @Test + public void grpclbFallback_subchannelsLost_timerExpires() { + subtestGrpclbFallbackConnectionLost(false, true, true); + } + + @Test + public void grpclbFallback_allLost_timerExpires() { + subtestGrpclbFallbackConnectionLost(true, true, true); + } + + @Test + public void grpclbFallback_allLost_ResumeBeforeTimerExpires() { + subtestGrpclbFallbackConnectionLost(true, true, false); + } + + // Fallback outside of the initial timeout, where all connections are lost. + private void subtestGrpclbFallbackConnectionLost( + boolean balancerBroken, boolean allSubchannelsBroken, boolean timerExpires) { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, mockLbService); + + // Create a resolution list with a mixture of balancer and backend addresses + List resolutionList = + createResolvedServerAddresses(false, true, false); + Attributes resolutionAttrs = Attributes.newBuilder() + .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); + deliverResolvedAddresses(resolutionList, resolutionAttrs); + + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); + inOrder.verify(helper).createOobChannel( + addrsEq(resolutionList.get(1)), eq(lbAuthority(0))); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if runSerialized() has been run. + inOrder.verify(helper, atLeast(0)).runSerialized(any(Runnable.class)); + inOrder.verifyNoMoreInteractions(); + + // Balancer returns a server list + List serverList = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(serverList)); + + // No fallback timer scheduled assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + List subchannels = + fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList); + + // Break connections + if (balancerBroken) { + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + // A new stream to LB is created + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + } + if (allSubchannelsBroken) { + for (Subchannel subchannel : subchannels) { + // A READY subchannel transits to IDLE when receiving a go-away + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + } + } + + if (balancerBroken && allSubchannelsBroken) { + // Fallback timer is scheduled if all connections are lost. + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + if (timerExpires) { + fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + // Going into fallback + subchannels = fallbackTestVerifyUseOfFallbackBackendLists( + inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + // When in fallback mode, fallback timer should not be scheduled when all backend + // connections are lost + for (Subchannel subchannel : subchannels) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + } + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } else { + fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); + } + + // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer + List serverList2 = Arrays.asList( + new ServerEntry("127.0.0.1", 2001, "token0003"), + new ServerEntry("127.0.0.1", 2011, "token0004")); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(serverList2)); + + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + if (timerExpires) { + fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2); + } + } else { + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + } + + if (!(balancerBroken && allSubchannelsBroken && timerExpires)) { + verify(helper, never()).createSubchannel(eq(resolutionList.get(0)), any(Attributes.class)); + verify(helper, never()).createSubchannel(eq(resolutionList.get(2)), any(Attributes.class)); + } } - private void fallbackTestVerifyUseOfFallbackBackendLists( + private List fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, Helper helper, List addrs) { - fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null); + return fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, null); } - private void fallbackTestVerifyUseOfBalancerBackendLists( + private List fallbackTestVerifyUseOfBalancerBackendLists( InOrder inOrder, Helper helper, List servers) { ArrayList addrs = new ArrayList(); ArrayList tokens = new ArrayList(); @@ -1328,10 +1494,10 @@ private void fallbackTestVerifyUseOfBalancerBackendLists( addrs.add(new EquivalentAddressGroup(server.addr)); tokens.add(server.token); } - fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens); + return fallbackTestVerifyUseOfBackendLists(inOrder, helper, addrs, tokens); } - private void fallbackTestVerifyUseOfBackendLists( + private List fallbackTestVerifyUseOfBackendLists( InOrder inOrder, Helper helper, List addrs, @Nullable List tokens) { if (tokens != null) { @@ -1340,8 +1506,7 @@ private void fallbackTestVerifyUseOfBackendLists( for (EquivalentAddressGroup addr : addrs) { inOrder.verify(helper).createSubchannel(addrsEq(addr), any(Attributes.class)); } - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + RoundRobinPicker picker = (RoundRobinPicker) currentPicker; assertThat(picker.dropList).containsExactlyElementsIn(Collections.nCopies(addrs.size(), null)); assertThat(picker.pickList).containsExactly(GrpclbState.BUFFER_ENTRY); assertEquals(addrs.size(), mockSubchannels.size()); @@ -1374,6 +1539,7 @@ private void fallbackTestVerifyUseOfBackendLists( inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); } + return subchannels; } @Test