From bbc7e89343056f4a3d2d828cf2f74ec328a52139 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Thu, 11 Mar 2021 17:16:18 -0800 Subject: [PATCH 1/2] Turn LB state into TRANSIENT_FAILURE if an empty list of backend addresses is given. It applies to both the address list given by the balancer and fallback list given by the resolver whenever it is being used. --- .../main/java/io/grpc/grpclb/GrpclbState.java | 106 +++++++++--------- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 33 +++--- 2 files changed, 74 insertions(+), 65 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index a241453f866..4060d3f86fa 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -95,6 +95,9 @@ final class GrpclbState { @VisibleForTesting static final Status NO_AVAILABLE_BACKENDS_STATUS = Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends"); + @VisibleForTesting + static final Status NO_FALLBACK_BACKENDS_FOUND_STATUS = + Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found"); @VisibleForTesting static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @@ -143,7 +146,6 @@ enum Mode { @Nullable private ManagedChannel lbCommChannel; - private boolean lbSentEmptyBackends = false; @Nullable private LbStream lbStream; @@ -288,7 +290,7 @@ private void maybeUseFallbackBackends() { } /** - * Populate the round-robin lists with the fallback backends. + * Populate backend servers to be used from the fallback backends. */ private void useFallbackBackends() { usingFallbackBackends = true; @@ -300,7 +302,7 @@ private void useFallbackBackends() { newDropList.add(null); newBackendAddrList.add(new BackendAddressGroup(eag, null)); } - useRoundRobinLists(newDropList, newBackendAddrList, null); + updateServerList(newDropList, newBackendAddrList, null); } private void shutdownLbComm() { @@ -421,9 +423,9 @@ GrpclbClientLoadRecorder getLoadRecorder() { } /** - * Populate the round-robin lists with the given values. + * Populate backend servers to be used based on the given list of addresses. */ - private void useRoundRobinLists( + private void updateServerList( List newDropList, List newBackendAddrList, @Nullable GrpclbClientLoadRecorder loadRecorder) { logger.log( @@ -473,7 +475,6 @@ private void useRoundRobinLists( final Subchannel subchannel; if (newBackendAddrList.isEmpty()) { if (subchannels.size() == 1) { - cancelFallbackTimer(); subchannel = subchannels.values().iterator().next(); subchannel.shutdown(); subchannels = Collections.emptyMap(); @@ -705,9 +706,8 @@ private void handleResponse(LoadBalanceResponse response) { } // Stop using fallback backends as soon as a new server list is received from the balancer. usingFallbackBackends = false; - lbSentEmptyBackends = serverList.getServersList().isEmpty(); cancelFallbackTimer(); - useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); + updateServerList(newDropList, newBackendAddrList, loadRecorder); maybeUpdatePicker(); } @@ -775,48 +775,52 @@ private void cleanUp() { private void maybeUpdatePicker() { List pickList; ConnectivityState state; - switch (config.getMode()) { - case ROUND_ROBIN: - pickList = new ArrayList<>(backendList.size()); - Status error = null; - boolean hasIdle = false; - for (BackendEntry entry : backendList) { - Subchannel subchannel = entry.subchannel; - Attributes attrs = subchannel.getAttributes(); - ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); - if (stateInfo.getState() == READY) { - pickList.add(entry); - } else if (stateInfo.getState() == TRANSIENT_FAILURE) { - error = stateInfo.getStatus(); - } else if (stateInfo.getState() == IDLE) { - hasIdle = true; - } - } - if (pickList.isEmpty()) { - if (error != null && !hasIdle) { - pickList.add(new ErrorEntry(error)); - state = TRANSIENT_FAILURE; - } else { - pickList.add(BUFFER_ENTRY); - state = CONNECTING; + if (backendList.isEmpty()) { + if (balancerWorking) { + pickList = + Collections.singletonList( + new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); + state = TRANSIENT_FAILURE; + } else if (usingFallbackBackends) { + pickList = + Collections.singletonList( + new ErrorEntry(NO_FALLBACK_BACKENDS_FOUND_STATUS)); + state = TRANSIENT_FAILURE; + } else { // still waiting for LoadBalancer + pickList = Collections.singletonList(BUFFER_ENTRY); + state = CONNECTING; + } + } else { + switch (config.getMode()) { + case ROUND_ROBIN: + pickList = new ArrayList<>(backendList.size()); + Status error = null; + boolean hasPending = false; + for (BackendEntry entry : backendList) { + Subchannel subchannel = entry.subchannel; + Attributes attrs = subchannel.getAttributes(); + ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { + pickList.add(entry); + } else if (stateInfo.getState() == TRANSIENT_FAILURE) { + error = stateInfo.getStatus(); + } else { + hasPending = true; + } } - } else { - state = READY; - } - break; - case PICK_FIRST: - if (backendList.isEmpty()) { - if (lbSentEmptyBackends) { - pickList = - Collections.singletonList( - new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS)); - state = TRANSIENT_FAILURE; + if (pickList.isEmpty()) { + if (hasPending) { + pickList.add(BUFFER_ENTRY); + state = CONNECTING; + } else { + pickList.add(new ErrorEntry(error)); + state = TRANSIENT_FAILURE; + } } else { - pickList = Collections.singletonList(BUFFER_ENTRY); - // Have not received server addresses - state = CONNECTING; + state = READY; } - } else { + break; + case PICK_FIRST: checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); BackendEntry onlyEntry = backendList.get(0); ConnectivityStateInfo stateInfo = @@ -837,10 +841,10 @@ private void maybeUpdatePicker() { pickList = Collections.singletonList( new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); } - } - break; - default: - throw new AssertionError("Missing case for " + config.getMode()); + break; + default: + throw new AssertionError("Missing case for " + config.getMode()); + } } maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index c7ed8b1efdc..2d6949ef47a 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1133,10 +1133,11 @@ public void grpclbWorking() { .returnSubchannel(same(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); verify(subchannelPool) .returnSubchannel(same(subchannel3), eq(ConnectivityStateInfo.forNonError(READY))); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker picker10 = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker10.dropList).isEmpty(); - assertThat(picker10.pickList).containsExactly(BUFFER_ENTRY); + assertThat(picker10.pickList) + .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); assertFalse(oobChannel.isShutdown()); assertEquals(0, lbRequestObservers.size()); @@ -1248,20 +1249,18 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { fakeClock.forwardTime(1, TimeUnit.MILLISECONDS); assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - List fallbackList = - Arrays.asList(backendList.get(0), backendList.get(1)); assertThat(logs).containsExactly( - "INFO: [grpclb-] Using fallback backends", - "INFO: [grpclb-]" - + " Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " - + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", - "INFO: [grpclb-] " - + "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], " - + "drops=[null, null]") + "INFO: [grpclb-] Using fallback backends", + "INFO: [grpclb-]" + + " Using RR list=[[[FakeSocketAddress-fake-address-0]/{}], " + + "[[FakeSocketAddress-fake-address-1]/{}]], drop=[null, null]", + "INFO: [grpclb-] " + + "Update balancing state to CONNECTING: picks=[BUFFER_ENTRY], " + + "drops=[null, null]") .inOrder(); // Fall back to the backends from resolver - fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackList); + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, backendList); assertFalse(oobChannel.isShutdown()); verify(lbRequestObserver, never()).onCompleted(); @@ -1284,8 +1283,14 @@ private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { if (timerExpires) { // Still in fallback logic, except that the backend list is empty - fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, Collections.emptyList()); + for (Subchannel subchannel : mockSubchannels) { + verify(subchannelPool).returnSubchannel(eq(subchannel), any(ConnectivityStateInfo.class)); + } + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + assertThat(picker.pickList) + .containsExactly(new ErrorEntry(GrpclbState.NO_FALLBACK_BACKENDS_FOUND_STATUS)); } //////////////////////////////////////////////////////////////// From 677dc929ddf1a225c8724a28892dae94b7484cce Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 12 Mar 2021 16:35:11 -0800 Subject: [PATCH 2/2] Impove style. --- .../main/java/io/grpc/grpclb/GrpclbState.java | 104 +++++++++--------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 36d72391fe6..1c812e14500 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -787,61 +787,63 @@ private void maybeUpdatePicker() { pickList = Collections.singletonList(BUFFER_ENTRY); state = CONNECTING; } - } else { - switch (config.getMode()) { - case ROUND_ROBIN: - pickList = new ArrayList<>(backendList.size()); - Status error = null; - boolean hasPending = false; - for (BackendEntry entry : backendList) { - Subchannel subchannel = entry.subchannel; - Attributes attrs = subchannel.getAttributes(); - ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); - if (stateInfo.getState() == READY) { - pickList.add(entry); - } else if (stateInfo.getState() == TRANSIENT_FAILURE) { - error = stateInfo.getStatus(); - } else { - hasPending = true; - } - } - if (pickList.isEmpty()) { - if (hasPending) { - pickList.add(BUFFER_ENTRY); - state = CONNECTING; - } else { - pickList.add(new ErrorEntry(error)); - state = TRANSIENT_FAILURE; - } + maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); + return; + } + switch (config.getMode()) { + case ROUND_ROBIN: + pickList = new ArrayList<>(backendList.size()); + Status error = null; + boolean hasPending = false; + for (BackendEntry entry : backendList) { + Subchannel subchannel = entry.subchannel; + Attributes attrs = subchannel.getAttributes(); + ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { + pickList.add(entry); + } else if (stateInfo.getState() == TRANSIENT_FAILURE) { + error = stateInfo.getStatus(); } else { - state = READY; + hasPending = true; } - break; - case PICK_FIRST: - checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); - BackendEntry onlyEntry = backendList.get(0); - ConnectivityStateInfo stateInfo = - onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); - state = stateInfo.getState(); - switch (state) { - case READY: - pickList = Collections.singletonList(onlyEntry); - break; - case TRANSIENT_FAILURE: - pickList = - Collections.singletonList(new ErrorEntry(stateInfo.getStatus())); - break; - case CONNECTING: - pickList = Collections.singletonList(BUFFER_ENTRY); - break; - default: - pickList = Collections.singletonList( - new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); + } + if (pickList.isEmpty()) { + if (hasPending) { + pickList.add(BUFFER_ENTRY); + state = CONNECTING; + } else { + pickList.add(new ErrorEntry(error)); + state = TRANSIENT_FAILURE; } - break; - default: - throw new AssertionError("Missing case for " + config.getMode()); + } else { + state = READY; + } + break; + case PICK_FIRST: { + checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); + BackendEntry onlyEntry = backendList.get(0); + ConnectivityStateInfo stateInfo = + onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); + state = stateInfo.getState(); + switch (state) { + case READY: + pickList = Collections.singletonList(onlyEntry); + break; + case TRANSIENT_FAILURE: + pickList = + Collections.singletonList(new ErrorEntry(stateInfo.getStatus())); + break; + case CONNECTING: + pickList = Collections.singletonList(BUFFER_ENTRY); + break; + default: + pickList = Collections.singletonList( + new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); + } + break; } + default: + throw new AssertionError("Missing case for " + config.getMode()); } maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList)); }