From 8b081ddba3aafa873245761ef3c3c90389e45115 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 20 Feb 2020 10:43:17 -0800 Subject: [PATCH 1/2] grpclb handles empty address from LB --- .../main/java/io/grpc/grpclb/GrpclbState.java | 28 ++++- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 118 ++++++++++++++++++ 2 files changed, 141 insertions(+), 5 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index e767aac1551..0fd92808659 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -138,6 +138,7 @@ static enum Mode { @Nullable private ManagedChannel lbCommChannel; + private boolean lbSentEmptyBackends = false; @Nullable private LbStream lbStream; @@ -423,6 +424,17 @@ private void useRoundRobinLists( subchannels = Collections.unmodifiableMap(newSubchannelMap); break; case PICK_FIRST: + checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels); + Subchannel subchannel; + if (newBackendAddrList.isEmpty()) { + if (subchannels.size() == 1) { + cancelFallbackTimer(); + subchannel = subchannels.values().iterator().next(); + subchannel.shutdown(); + subchannels = Collections.emptyMap(); + } + break; + } List eagList = new ArrayList<>(); // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on @@ -438,13 +450,12 @@ private void useRoundRobinLists( } eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); } - Subchannel subchannel; if (subchannels.isEmpty()) { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). + // why this is not calling the doAnswer? subchannel = helper.createSubchannel(eagList, createSubchannelAttrs()); } else { - checkState(subchannels.size() == 1, "Unexpected Subchannel count: %s", subchannels); subchannel = subchannels.values().iterator().next(); subchannel.updateAddresses(eagList); } @@ -629,6 +640,7 @@ private void handleResponse(LoadBalanceResponse response) { } // Stop using fallback backends as soon as a new server list is received from the balancer. usingFallbackBackends = false; + lbSentEmptyBackends = serverList.getServersList().isEmpty(); cancelFallbackTimer(); useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder); maybeUpdatePicker(); @@ -729,9 +741,15 @@ private void maybeUpdatePicker() { break; case PICK_FIRST: if (backendList.isEmpty()) { - pickList = Collections.singletonList(BUFFER_ENTRY); - // Have not received server addresses - state = CONNECTING; + if (lbSentEmptyBackends) { + pickList = + Collections.singletonList(new ErrorEntry(Status.UNAVAILABLE)); + state = TRANSIENT_FAILURE; + } else { + pickList = Collections.singletonList(BUFFER_ENTRY); + // Have not received server addresses + state = CONNECTING; + } } else { checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); BackendEntry onlyEntry = backendList.get(0); diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 6d9abc70c43..53b9d0093ed 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1849,6 +1849,124 @@ public void grpclbWorking_pickFirstMode() throws Exception { .returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class)); } + @SuppressWarnings({"unchecked", "deprecation"}) + @Test + public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { + InOrder inOrder = inOrder(helper); + + List grpclbBalancerList = createResolvedBalancerAddresses(1); + deliverResolvedAddresses( + Collections.emptyList(), + grpclbBalancerList, + Attributes.EMPTY, + GrpclbConfig.create(Mode.PICK_FIRST)); + + assertEquals(1, fakeOobChannels.size()); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + + // Simulate receiving LB response + List backends1 = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("127.0.0.1", 2010, "token0002")); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildInitialResponse()); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to + // the new createSubchannel(). + inOrder.verify(helper).createSubchannel( + eq(Arrays.asList( + new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), + new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))), + any(Attributes.class)); + + // Initially IDLE + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); + + // Only one subchannel is created + assertThat(mockSubchannels).hasSize(1); + Subchannel subchannel = mockSubchannels.poll(); + assertThat(picker0.dropList).containsExactly(null, null); + assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); + + // PICK_FIRST doesn't eagerly connect + verify(subchannel, never()).requestConnection(); + + // CONNECTING + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker1.dropList).containsExactly(null, null); + assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); + + // TRANSIENT_FAILURE + Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker2.dropList).containsExactly(null, null); + assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); + + // READY + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker3.dropList).containsExactly(null, null); + assertThat(picker3.pickList).containsExactly( + new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + + // Empty addresses from LB + lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); + + // new addresses will be updated to the existing subchannel + // createSubchannel() has ever been called only once + inOrder.verify(helper, never()).createSubchannel(any(List.class), any(Attributes.class)); + assertThat(mockSubchannels).isEmpty(); + verify(subchannel).shutdown(); + + inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(errorPicker.pickList).containsExactly(new ErrorEntry(Status.UNAVAILABLE)); + + lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); + + // Test recover from new LB response with addresses + // New server list with drops + List backends2 = Arrays.asList( + new ServerEntry("127.0.0.1", 2000, "token0001"), + new ServerEntry("token0003"), // drop + new ServerEntry("127.0.0.1", 2020, "token0004")); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + lbResponseObserver.onNext(buildLbResponse(backends2)); + + // new addresses will be updated to the existing subchannel + inOrder.verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class)); + inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + subchannel = mockSubchannels.poll(); + + // Subchannel became READY + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker4.pickList).containsExactly( + new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + } + @Test public void shutdownWithoutSubchannel_roundRobin() throws Exception { subtestShutdownWithoutSubchannel("round_robin"); From c70467fb15a895c73360472563913777e6bb5954 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Thu, 20 Feb 2020 11:58:34 -0800 Subject: [PATCH 2/2] remove debug comment for myself...=p --- grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java | 1 - 1 file changed, 1 deletion(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 0fd92808659..d940fc8832c 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -453,7 +453,6 @@ private void useRoundRobinLists( if (subchannels.isEmpty()) { // TODO(zhangkun83): remove the deprecation suppression on this method once migrated to // the new createSubchannel(). - // why this is not calling the doAnswer? subchannel = helper.createSubchannel(eagList, createSubchannelAttrs()); } else { subchannel = subchannels.values().iterator().next();