Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ static enum Mode {

@Nullable
private ManagedChannel lbCommChannel;
private boolean lbSentEmptyBackends = false;

@Nullable
private LbStream lbStream;
Expand Down Expand Up @@ -423,6 +424,17 @@ private void useRoundRobinLists(
subchannels = Collections.unmodifiableMap(newSubchannelMap);
break;
case PICK_FIRST:
checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels);
Subchannel subchannel;
if (newBackendAddrList.isEmpty()) {
if (subchannels.size() == 1) {
cancelFallbackTimer();
subchannel = subchannels.values().iterator().next();
subchannel.shutdown();
subchannels = Collections.emptyMap();
}
break;
}
List<EquivalentAddressGroup> eagList = new ArrayList<>();
// Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to
// attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on
Expand All @@ -438,13 +450,11 @@ private void useRoundRobinLists(
}
eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
}
Subchannel subchannel;
if (subchannels.isEmpty()) {
// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
subchannel = helper.createSubchannel(eagList, createSubchannelAttrs());
} else {
checkState(subchannels.size() == 1, "Unexpected Subchannel count: %s", subchannels);
subchannel = subchannels.values().iterator().next();
subchannel.updateAddresses(eagList);
}
Expand Down Expand Up @@ -629,6 +639,7 @@ private void handleResponse(LoadBalanceResponse response) {
}
// Stop using fallback backends as soon as a new server list is received from the balancer.
usingFallbackBackends = false;
lbSentEmptyBackends = serverList.getServersList().isEmpty();
cancelFallbackTimer();
useRoundRobinLists(newDropList, newBackendAddrList, loadRecorder);
maybeUpdatePicker();
Expand Down Expand Up @@ -729,9 +740,15 @@ private void maybeUpdatePicker() {
break;
case PICK_FIRST:
if (backendList.isEmpty()) {
pickList = Collections.singletonList(BUFFER_ENTRY);
// Have not received server addresses
state = CONNECTING;
if (lbSentEmptyBackends) {
pickList =
Collections.<RoundRobinEntry>singletonList(new ErrorEntry(Status.UNAVAILABLE));
state = TRANSIENT_FAILURE;
} else {
pickList = Collections.singletonList(BUFFER_ENTRY);
// Have not received server addresses
state = CONNECTING;
}
} else {
checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList);
BackendEntry onlyEntry = backendList.get(0);
Expand Down
118 changes: 118 additions & 0 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,124 @@ public void grpclbWorking_pickFirstMode() throws Exception {
.returnSubchannel(any(Subchannel.class), any(ConnectivityStateInfo.class));
}

@SuppressWarnings({"unchecked", "deprecation"})
@Test
public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception {
InOrder inOrder = inOrder(helper);

List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(
Collections.<EquivalentAddressGroup>emptyList(),
grpclbBalancerList,
Attributes.EMPTY,
GrpclbConfig.create(Mode.PICK_FIRST));

assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));

// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));

// TODO(zhangkun83): remove the deprecation suppression on this method once migrated to
// the new createSubchannel().
inOrder.verify(helper).createSubchannel(
eq(Arrays.asList(
new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")),
new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002")))),
any(Attributes.class));

// Initially IDLE
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue();

// Only one subchannel is created
assertThat(mockSubchannels).hasSize(1);
Subchannel subchannel = mockSubchannels.poll();
assertThat(picker0.dropList).containsExactly(null, null);
assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext));

// PICK_FIRST doesn't eagerly connect
verify(subchannel, never()).requestConnection();

// CONNECTING
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));

inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker1.dropList).containsExactly(null, null);
assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY);

// TRANSIENT_FAILURE
Status error = Status.UNAVAILABLE.withDescription("Simulated connection error");
deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.dropList).containsExactly(null, null);
assertThat(picker2.pickList).containsExactly(new ErrorEntry(error));

// READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.dropList).containsExactly(null, null);
assertThat(picker3.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));

inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

// Empty addresses from LB
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));

// new addresses will be updated to the existing subchannel
// createSubchannel() has ever been called only once
inOrder.verify(helper, never()).createSubchannel(any(List.class), any(Attributes.class));
assertThat(mockSubchannels).isEmpty();
verify(subchannel).shutdown();

inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(errorPicker.pickList).containsExactly(new ErrorEntry(Status.UNAVAILABLE));

lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));

// Test recover from new LB response with addresses
// New server list with drops
List<ServerEntry> backends2 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("token0003"), // drop
new ServerEntry("127.0.0.1", 2020, "token0004"));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildLbResponse(backends2));

// new addresses will be updated to the existing subchannel
inOrder.verify(helper, times(1)).createSubchannel(any(List.class), any(Attributes.class));
inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
subchannel = mockSubchannels.poll();

// Subchannel became READY
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING));
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker4.pickList).containsExactly(
new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder())));
}

@Test
public void shutdownWithoutSubchannel_roundRobin() throws Exception {
subtestShutdownWithoutSubchannel("round_robin");
Expand Down