diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 1c812e14500..c8bf77076c3 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -95,8 +95,20 @@ final class GrpclbState { static final Status NO_AVAILABLE_BACKENDS_STATUS = Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends"); @VisibleForTesting - static final Status NO_FALLBACK_BACKENDS_FOUND_STATUS = + static final Status BALANCER_TIMEOUT_STATUS = + Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer"); + @VisibleForTesting + static final Status BALANCER_REQUESTED_FALLBACK_STATUS = + Status.UNAVAILABLE.withDescription("Fallback requested by balancer"); + @VisibleForTesting + static final Status NO_FALLBACK_BACKENDS_STATUS = Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found"); + // This error status should never be propagated to RPC failures, as "no backend or balancer + // addresses found" should be directly handled as a name resolution error. So in cases of no + // balancer address, fallback should never fail. + private static final Status NO_LB_ADDRESS_PROVIDED_STATUS = + Status.UNAVAILABLE.withDescription("No balancer address found"); + @VisibleForTesting static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @@ -137,6 +149,10 @@ enum Mode { private ScheduledHandle fallbackTimer; private List fallbackBackendList = Collections.emptyList(); private boolean usingFallbackBackends; + // Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no + // fallback addresses found). + @Nullable + private Status fallbackReason; // True if the current balancer has returned a serverlist. Will be reset to false when lost // connection to a balancer. private boolean balancerWorking; @@ -239,7 +255,7 @@ void handleAddresses( // No balancer address: close existing balancer connection and enter fallback mode // immediately. shutdownLbComm(); - syncContext.execute(new FallbackModeTask()); + syncContext.execute(new FallbackModeTask(NO_LB_ADDRESS_PROVIDED_STATUS)); } else { startLbComm(newLbAddressGroups); // Avoid creating a new RPC just because the addresses were updated, as it can cause a @@ -253,7 +269,8 @@ void handleAddresses( // Start the fallback timer if it's never started if (fallbackTimer == null) { fallbackTimer = syncContext.schedule( - new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService); + new FallbackModeTask(BALANCER_TIMEOUT_STATUS), FALLBACK_TIMEOUT_MS, + TimeUnit.MILLISECONDS, timerService); } } fallbackBackendList = newBackendServers; @@ -275,16 +292,21 @@ void requestConnection() { } private void maybeUseFallbackBackends() { - if (balancerWorking) { - return; - } - if (usingFallbackBackends) { + if (balancerWorking || usingFallbackBackends) { return; } + // Balancer RPC should have either been broken or timed out. + checkState(fallbackReason != null, "no reason to fallback"); for (Subchannel subchannel : subchannels.values()) { - if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { + ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { return; } + // If we do have balancer-provided backends, use one of its error in the error message if + // fail to fallback. + if (stateInfo.getState() == TRANSIENT_FAILURE) { + fallbackReason = stateInfo.getStatus(); + } } // Fallback conditions met useFallbackBackends(); @@ -401,8 +423,10 @@ void shutdown() { void propagateError(Status status) { logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status); if (backendList.isEmpty()) { + Status error = + Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription()); maybeUpdatePicker( - TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(status)))); + TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error)))); } } @@ -528,8 +552,17 @@ public void onSubchannelState(ConnectivityStateInfo newState) { @VisibleForTesting class FallbackModeTask implements Runnable { + private final Status reason; + + private FallbackModeTask(Status reason) { + this.reason = reason; + } + @Override public void run() { + // Timer should have been cancelled if entered fallback early. + checkState(!usingFallbackBackends, "already in fallback"); + fallbackReason = reason; maybeUseFallbackBackends(); maybeUpdatePicker(); } @@ -658,7 +691,9 @@ private void handleResponse(LoadBalanceResponse response) { } if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) { + // Force entering fallback requested by balancer. cancelFallbackTimer(); + fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS; useFallbackBackends(); maybeUpdatePicker(); return; @@ -690,7 +725,7 @@ private void handleResponse(LoadBalanceResponse response) { } catch (UnknownHostException e) { propagateError( Status.UNAVAILABLE - .withDescription("Host for server not found: " + server) + .withDescription("Invalid backend address: " + server) .withCause(e)); continue; } @@ -701,8 +736,9 @@ private void handleResponse(LoadBalanceResponse response) { newBackendAddrList.add(new BackendAddressGroup(eag, token)); } } - // Stop using fallback backends as soon as a new server list is received from the balancer. + // Exit fallback as soon as a new server list is received from the balancer. usingFallbackBackends = false; + fallbackReason = null; cancelFallbackTimer(); updateServerList(newDropList, newBackendAddrList, loadRecorder); maybeUpdatePicker(); @@ -717,6 +753,8 @@ private void handleStreamClosed(Status error) { cleanUp(); propagateError(error); balancerWorking = false; + fallbackReason = error; + cancelFallbackTimer(); maybeUseFallbackBackends(); maybeUpdatePicker(); @@ -773,15 +811,19 @@ private void maybeUpdatePicker() { List pickList; ConnectivityState state; if (backendList.isEmpty()) { - if (balancerWorking) { - pickList = - Collections.singletonList( - new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); + // Note balancer (is working) may enforce using fallback backends, and that fallback may + // fail. So we should check if currently in fallback first. + if (usingFallbackBackends) { + Status error = + NO_FALLBACK_BACKENDS_STATUS + .withCause(fallbackReason.getCause()) + .augmentDescription(fallbackReason.getDescription()); + pickList = Collections.singletonList(new ErrorEntry(error)); state = TRANSIENT_FAILURE; - } else if (usingFallbackBackends) { + } else if (balancerWorking) { pickList = Collections.singletonList( - new ErrorEntry(NO_FALLBACK_BACKENDS_FOUND_STATUS)); + new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); state = TRANSIENT_FAILURE; } else { // still waiting for balancer pickList = Collections.singletonList(BUFFER_ENTRY); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index edd422a8b76..4fc3d122347 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -792,14 +792,16 @@ public void nameResolutionFailsThenRecover() { "INFO: [grpclb-] Created", "DEBUG: [grpclb-] Error: " + error, "INFO: [grpclb-] Update balancing state to TRANSIENT_FAILURE: picks=" - + "[Status{code=NOT_FOUND, description=www.google.com not found, cause=null}]," + + "[Status{code=UNAVAILABLE, description=www.google.com not found, cause=null}]," + " drops=[]") .inOrder(); logs.clear(); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); + PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); // Recover with a subsequent success List grpclbBalancerList = createResolvedBalancerAddresses(1); @@ -832,7 +834,9 @@ public void grpclbThenNameResolutionFails() { inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList).containsExactly(new ErrorEntry(error)); + PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()).isEqualTo(error.getDescription()); assertFalse(oobChannel.isShutdown()); // Simulate receiving LB response @@ -1284,11 +1288,16 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { for (Subchannel subchannel : mockSubchannels) { verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); } + + // RPC error status includes message of balancer RPC timeout inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - assertThat(picker.pickList) - .containsExactly(new ErrorEntry(GrpclbState.NO_FALLBACK_BACKENDS_FOUND_STATUS)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()) + .isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(GrpclbState.BALANCER_TIMEOUT_STATUS.getDescription()); } //////////////////////////////////////////////////////////////// @@ -1396,6 +1405,9 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); lbResponseObserver.onError(streamError.asException()); + // Fallback time has been short-circuited + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Fall back to the backends from resolver fallbackTestVerifyUseOfFallbackBackendLists( inOrder, Arrays.asList(backendList.get(0), backendList.get(1))); @@ -1408,6 +1420,24 @@ public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { eq(LoadBalanceRequest.newBuilder().setInitialRequest( InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); + + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + // Still in fallback logic, except that the backend list is empty + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes error of balancer stream + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()).contains(streamError.getDescription()); } @Test @@ -1434,6 +1464,24 @@ public void grpclbFallback_noBalancerAddress() { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); verify(helper, never()) .createOobChannel(ArgumentMatchers.anyList(), anyString()); + logs.clear(); + + /////////////////////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr or balancer addr + /////////////////////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), + Collections.emptyList()); + assertThat(logs).containsExactly( + "DEBUG: [grpclb-] Error: Status{code=UNAVAILABLE, " + + "description=No backend or balancer addresses found, cause=null}"); + + // Keep using existing fallback addresses without interruption + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool, never()) + .returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + verify(helper, never()) + .updateBalancingState(eq(TRANSIENT_FAILURE), any(SubchannelPicker.class)); } @Test @@ -1531,6 +1579,7 @@ private void subtestGrpclbFallbackConnectionLost( } assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // No subchannel to fallback backends should have been created if no fallback happened if (!(balancerBroken && allSubchannelsBroken)) { verify(subchannelPool, never()).takeOrCreateSubchannel( eq(backendList.get(0)), any(Attributes.class)); @@ -1539,6 +1588,72 @@ private void subtestGrpclbFallbackConnectionLost( } } + @Test + public void grpclbFallback_allLost_failToFallback() { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, mockLbService, subchannelPool); + + // Create balancer and (empty) backend addresses + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + inOrder.verify(helper).createOobChannel(eq(xattr(grpclbBalancerList)), + eq(lbAuthority(0) + NO_USE_AUTHORITY_SUFFIX)); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + fakeOobChannels.poll(); + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if these methods have been run. + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + + inOrder.verifyNoMoreInteractions(); + + // Balancer returns a server list + List serverList = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(serverList)); + + List subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, serverList); + + // Break connections + lbResponseObserver.onError(Status.UNAVAILABLE.asException()); + // A new stream to LB is created + inOrder.verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + assertEquals(1, lbRequestObservers.size()); + + // Break all subchannel connections + Status error = Status.UNAUTHENTICATED.withDescription("Permission denied"); + for (Subchannel subchannel : subchannels) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); + } + + // Recycle all subchannels + for (Subchannel subchannel : subchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes errors of subchannels to balancer-provided backends + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()).contains(error.getDescription()); + } + private List fallbackTestVerifyUseOfFallbackBackendLists( InOrder inOrder, List addrs) { return fallbackTestVerifyUseOfBackendLists(inOrder, addrs, null); @@ -1958,6 +2073,7 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { assertThat(mockSubchannels).isEmpty(); verify(subchannel).shutdown(); + // RPC error status includes message of no backends provided by balancer inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(errorPicker.pickList) @@ -2445,7 +2561,7 @@ public void grpclbWorking_lbSendsFallbackMessage() { new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) .inOrder(); - // enter fallback mode + // Balancer forces entering fallback mode lbResponseObserver.onNext(buildLbFallbackResponse()); // existing subchannels must be returned immediately to gracefully shutdown. @@ -2460,6 +2576,25 @@ public void grpclbWorking_lbSendsFallbackMessage() { assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); + ////////////////////////////////////////////////////////////////////// + // Name resolver sends new resolution results without any backend addr + ////////////////////////////////////////////////////////////////////// + deliverResolvedAddresses(Collections.emptyList(), grpclbBalancerList); + + // Still in fallback logic, except that the backend list is empty + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + + // RPC error status includes message of fallback requested by balancer + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); + assertThat(result.getStatus().getDescription()) + .startsWith(GrpclbState.NO_FALLBACK_BACKENDS_STATUS.getDescription()); + assertThat(result.getStatus().getDescription()) + .contains(GrpclbState.BALANCER_REQUESTED_FALLBACK_STATUS.getDescription()); + // exit fall back by providing two new backends ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002");