Skip to content

Commit

Permalink
grpclb: implement Channel State API for GRPCLB
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Aug 9, 2017
1 parent 8585cd5 commit 55ff196
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 29 deletions.
21 changes: 13 additions & 8 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
Expand All @@ -29,6 +30,7 @@
import com.google.common.base.Objects;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -312,7 +314,7 @@ private void handleGrpclbError(Status status) {
logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}",
new Object[] {logId, status, roundRobinList});
if (roundRobinList.isEmpty()) {
maybeUpdatePicker(new ErrorPicker(status));
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(status));
}
}

Expand Down Expand Up @@ -545,6 +547,7 @@ private void cleanUp() {
private void maybeUpdatePicker() {
List<RoundRobinEntry> resultList = new ArrayList<RoundRobinEntry>();
Status error = null;
boolean hasIdle = false;
// TODO(zhangkun83): if roundRobinList contains at least one address, but none of them are
// ready, maybe we should always return BUFFER_PICKER, no matter if there are drop entries or
// not.
Expand All @@ -557,31 +560,33 @@ private void maybeUpdatePicker() {
resultList.add(entry);
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
error = stateInfo.getStatus();
} else if (stateInfo.getState() == IDLE) {
hasIdle = true;
}
} else {
// This is a drop entry.
resultList.add(entry);
}
}
if (resultList.isEmpty()) {
if (error != null) {
if (error != null && !hasIdle) {
logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
new Object[] {logId, error});
maybeUpdatePicker(new ErrorPicker(error));
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(error));
} else {
logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId);
maybeUpdatePicker(BUFFER_PICKER);
logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId);
maybeUpdatePicker(CONNECTING, BUFFER_PICKER);
}
} else {
logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList});
maybeUpdatePicker(new RoundRobinPicker(resultList));
maybeUpdatePicker(READY, new RoundRobinPicker(resultList));
}
}

/**
* Update the given picker to the helper if it's different from the current one.
*/
private void maybeUpdatePicker(SubchannelPicker picker) {
private void maybeUpdatePicker(ConnectivityState state, SubchannelPicker picker) {
// Discard the new picker if we are sure it won't make any difference, in order to save
// re-processing pending streams, and avoid unnecessary resetting of the pointer in
// RoundRobinPicker.
Expand All @@ -596,7 +601,7 @@ private void maybeUpdatePicker(SubchannelPicker picker) {
// No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
// stream thus no time is wasted in re-process.
currentPicker = picker;
helper.updatePicker(picker);
helper.updateBalancingState(state, picker);
}

@VisibleForTesting
Expand Down
44 changes: 23 additions & 21 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -48,6 +49,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -344,7 +346,8 @@ public void loadReporting() {
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));

helperInOrder.verify(helper, atLeast(1)).updatePicker(pickerCaptor.capture());
helperInOrder.verify(helper, atLeast(1))
.updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.list).containsExactly(
new RoundRobinEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
Expand Down Expand Up @@ -468,7 +471,8 @@ public void loadReporting() {
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));

// No picker created because balancer is still using the results from the last stream
helperInOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class));
helperInOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

// Make a new pick on that picker. It will not show up on the report of the new stream, because
// that picker is associated with the previous stream.
Expand All @@ -487,7 +491,7 @@ public void loadReporting() {
any(EquivalentAddressGroup.class), any(Attributes.class));
// But the new RoundRobinEntries have a new loadRecorder, thus considered different from
// the previous list, thus a new picker is created
helperInOrder.verify(helper).updatePicker(pickerCaptor.capture());
helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
picker = (RoundRobinPicker) pickerCaptor.getValue();

PickResult pick1p = picker.pickSubchannel(args);
Expand Down Expand Up @@ -611,7 +615,7 @@ public void acquireAndReleaseScheduledExecutor() {
public void nameResolutionFailsThenRecoverToDelegate() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());

Expand All @@ -631,7 +635,7 @@ public void nameResolutionFailsThenRecoverToDelegate() {
public void nameResolutionFailsThenRecoverToGrpclb() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());

Expand Down Expand Up @@ -668,7 +672,8 @@ public void delegatingPickFirstThenNameResolutionFails() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(pickFirstBalancer).handleNameResolutionError(error);
verify(helper, never()).updatePicker(any(SubchannelPicker.class));
verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
Expand Down Expand Up @@ -716,7 +721,7 @@ public void grpclbThenNameResolutionFails() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);

inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());
assertFalse(oobChannel.isShutdown());
Expand Down Expand Up @@ -898,7 +903,8 @@ public void grpclbWorking() {
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));

Expand All @@ -920,14 +926,14 @@ public void grpclbWorking() {

// Let subchannels be connected
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();

assertThat(picker1.list).containsExactly(
new RoundRobinEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));

deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.list).containsExactly(
new RoundRobinEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
Expand All @@ -938,7 +944,7 @@ public void grpclbWorking() {
verify(subchannel1).requestConnection();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel1, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.list).containsExactly(
new RoundRobinEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
Expand All @@ -951,15 +957,11 @@ public void grpclbWorking() {
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
inOrder.verifyNoMoreInteractions();

// If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here
// only subchannel1 has error).
// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
verify(subchannel2).requestConnection();
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel2, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker picker6 = (ErrorPicker) pickerCaptor.getValue();
assertNull(picker6.result.getSubchannel());
assertSame(error1, picker6.result.getStatus());
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);

// Update backends, with a drop entry
List<ServerEntry> backends2 =
Expand All @@ -983,7 +985,7 @@ public void grpclbWorking() {
Subchannel subchannel3 = mockSubchannels.poll();
verify(subchannel3).requestConnection();
assertEquals(new EquivalentAddressGroup(backends2.get(0).addr), subchannel3.getAddresses());
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker7.list).containsExactly(
new RoundRobinEntry(DropType.RATE_LIMITING, balancer.getLoadRecorder()),
Expand All @@ -997,7 +999,7 @@ public void grpclbWorking() {
inOrder.verifyNoMoreInteractions();

deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
// subchannel2 is still IDLE, thus not in the active list
assertThat(picker8.list).containsExactly(
Expand All @@ -1007,7 +1009,7 @@ public void grpclbWorking() {
new RoundRobinEntry(DropType.LOAD_BALANCING, balancer.getLoadRecorder())).inOrder();
// subchannel2 becomes READY and makes it into the list
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker9.list).containsExactly(
new RoundRobinEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
Expand All @@ -1021,7 +1023,7 @@ public void grpclbWorking() {
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
verify(subchannel2).shutdown();
verify(subchannel3).shutdown();
inOrder.verify(helper).updatePicker((GrpclbLoadBalancer.BUFFER_PICKER));
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);

assertFalse(oobChannel.isShutdown());
assertEquals(0, lbRequestObservers.size());
Expand Down

0 comments on commit 55ff196

Please sign in to comment.