Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpclb: implement Channel State API for GRPCLB #3327

Merged
merged 1 commit into from
Aug 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions grpclb/src/main/java/io/grpc/grpclb/GrpclbLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
Expand All @@ -29,6 +30,7 @@
import com.google.common.base.Objects;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -312,7 +314,7 @@ private void handleGrpclbError(Status status) {
logger.log(Level.FINE, "[{0}] Had an error: {1}; roundRobinList={2}",
new Object[] {logId, status, roundRobinList});
if (roundRobinList.isEmpty()) {
maybeUpdatePicker(new ErrorPicker(status));
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(status));
}
}

Expand Down Expand Up @@ -545,6 +547,7 @@ private void cleanUp() {
private void maybeUpdatePicker() {
List<RoundRobinEntry> resultList = new ArrayList<RoundRobinEntry>();
Status error = null;
boolean hasIdle = false;
// TODO(zhangkun83): if roundRobinList contains at least one address, but none of them are
// ready, maybe we should always return BUFFER_PICKER, no matter if there are drop entries or
// not.
Expand All @@ -557,31 +560,33 @@ private void maybeUpdatePicker() {
resultList.add(entry);
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
error = stateInfo.getStatus();
} else if (stateInfo.getState() == IDLE) {
hasIdle = true;
}
} else {
// This is a drop entry.
resultList.add(entry);
}
}
if (resultList.isEmpty()) {
if (error != null) {
if (error != null && !hasIdle) {
logger.log(Level.FINE, "[{0}] No ready Subchannel. Using error: {1}",
new Object[] {logId, error});
maybeUpdatePicker(new ErrorPicker(error));
maybeUpdatePicker(TRANSIENT_FAILURE, new ErrorPicker(error));
} else {
logger.log(Level.FINE, "[{0}] No ready Subchannel and no error", logId);
maybeUpdatePicker(BUFFER_PICKER);
logger.log(Level.FINE, "[{0}] No ready Subchannel and still connecting", logId);
maybeUpdatePicker(CONNECTING, BUFFER_PICKER);
}
} else {
logger.log(Level.FINE, "[{0}] Using list {1}", new Object[] {logId, resultList});
maybeUpdatePicker(new RoundRobinPicker(resultList));
maybeUpdatePicker(READY, new RoundRobinPicker(resultList));
}
}

/**
* Update the given picker to the helper if it's different from the current one.
*/
private void maybeUpdatePicker(SubchannelPicker picker) {
private void maybeUpdatePicker(ConnectivityState state, SubchannelPicker picker) {
// Discard the new picker if we are sure it won't make any difference, in order to save
// re-processing pending streams, and avoid unnecessary resetting of the pointer in
// RoundRobinPicker.
Expand All @@ -596,7 +601,7 @@ private void maybeUpdatePicker(SubchannelPicker picker) {
// No need to skip ErrorPicker. If the current picker is ErrorPicker, there won't be any pending
// stream thus no time is wasted in re-process.
currentPicker = picker;
helper.updatePicker(picker);
helper.updateBalancingState(state, picker);
}

@VisibleForTesting
Expand Down
44 changes: 23 additions & 21 deletions grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -48,6 +49,7 @@
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
Expand Down Expand Up @@ -344,7 +346,8 @@ public void loadReporting() {
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));

helperInOrder.verify(helper, atLeast(1)).updatePicker(pickerCaptor.capture());
helperInOrder.verify(helper, atLeast(1))
.updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.list).containsExactly(
new RoundRobinEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
Expand Down Expand Up @@ -468,7 +471,8 @@ public void loadReporting() {
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));

// No picker created because balancer is still using the results from the last stream
helperInOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class));
helperInOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));

// Make a new pick on that picker. It will not show up on the report of the new stream, because
// that picker is associated with the previous stream.
Expand All @@ -487,7 +491,7 @@ public void loadReporting() {
any(EquivalentAddressGroup.class), any(Attributes.class));
// But the new RoundRobinEntries have a new loadRecorder, thus considered different from
// the previous list, thus a new picker is created
helperInOrder.verify(helper).updatePicker(pickerCaptor.capture());
helperInOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
picker = (RoundRobinPicker) pickerCaptor.getValue();

PickResult pick1p = picker.pickSubchannel(args);
Expand Down Expand Up @@ -611,7 +615,7 @@ public void acquireAndReleaseScheduledExecutor() {
public void nameResolutionFailsThenRecoverToDelegate() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());

Expand All @@ -631,7 +635,7 @@ public void nameResolutionFailsThenRecoverToDelegate() {
public void nameResolutionFailsThenRecoverToGrpclb() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(helper).updatePicker(pickerCaptor.capture());
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());

Expand Down Expand Up @@ -668,7 +672,8 @@ public void delegatingPickFirstThenNameResolutionFails() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
verify(pickFirstBalancer).handleNameResolutionError(error);
verify(helper, never()).updatePicker(any(SubchannelPicker.class));
verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verifyNoMoreInteractions(roundRobinBalancerFactory);
verifyNoMoreInteractions(roundRobinBalancer);
}
Expand Down Expand Up @@ -716,7 +721,7 @@ public void grpclbThenNameResolutionFails() {
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);

inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
ErrorPicker errorPicker = (ErrorPicker) pickerCaptor.getValue();
assertSame(error, errorPicker.result.getStatus());
assertFalse(oobChannel.isShutdown());
Expand Down Expand Up @@ -898,7 +903,8 @@ public void grpclbWorking() {
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
new ServerEntry("127.0.0.1", 2010, "token0002"));
inOrder.verify(helper, never()).updatePicker(any(SubchannelPicker.class));
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends1));

Expand All @@ -920,14 +926,14 @@ public void grpclbWorking() {

// Let subchannels be connected
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue();

assertThat(picker1.list).containsExactly(
new RoundRobinEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));

deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker2.list).containsExactly(
new RoundRobinEntry(subchannel1, balancer.getLoadRecorder(), "token0001"),
Expand All @@ -938,7 +944,7 @@ public void grpclbWorking() {
verify(subchannel1).requestConnection();
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel1, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker3.list).containsExactly(
new RoundRobinEntry(subchannel2, balancer.getLoadRecorder(), "token0002"));
Expand All @@ -951,15 +957,11 @@ public void grpclbWorking() {
deliverSubchannelState(subchannel1, ConnectivityStateInfo.forTransientFailure(error1));
inOrder.verifyNoMoreInteractions();

// If no subchannel is READY, will propagate an error from an arbitrary subchannel (but here
// only subchannel1 has error).
// If no subchannel is READY, some with error and the others are IDLE, will report CONNECTING
verify(subchannel2).requestConnection();
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(IDLE));
verify(subchannel2, times(2)).requestConnection();
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
ErrorPicker picker6 = (ErrorPicker) pickerCaptor.getValue();
assertNull(picker6.result.getSubchannel());
assertSame(error1, picker6.result.getStatus());
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);

// Update backends, with a drop entry
List<ServerEntry> backends2 =
Expand All @@ -983,7 +985,7 @@ public void grpclbWorking() {
Subchannel subchannel3 = mockSubchannels.poll();
verify(subchannel3).requestConnection();
assertEquals(new EquivalentAddressGroup(backends2.get(0).addr), subchannel3.getAddresses());
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker7 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker7.list).containsExactly(
new RoundRobinEntry(DropType.RATE_LIMITING, balancer.getLoadRecorder()),
Expand All @@ -997,7 +999,7 @@ public void grpclbWorking() {
inOrder.verifyNoMoreInteractions();

deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker8 = (RoundRobinPicker) pickerCaptor.getValue();
// subchannel2 is still IDLE, thus not in the active list
assertThat(picker8.list).containsExactly(
Expand All @@ -1007,7 +1009,7 @@ public void grpclbWorking() {
new RoundRobinEntry(DropType.LOAD_BALANCING, balancer.getLoadRecorder())).inOrder();
// subchannel2 becomes READY and makes it into the list
deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(helper).updatePicker(pickerCaptor.capture());
inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture());
RoundRobinPicker picker9 = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker9.list).containsExactly(
new RoundRobinEntry(subchannel3, balancer.getLoadRecorder(), "token0003"),
Expand All @@ -1021,7 +1023,7 @@ public void grpclbWorking() {
lbResponseObserver.onNext(buildLbResponse(Collections.<ServerEntry>emptyList()));
verify(subchannel2).shutdown();
verify(subchannel3).shutdown();
inOrder.verify(helper).updatePicker((GrpclbLoadBalancer.BUFFER_PICKER));
inOrder.verify(helper).updateBalancingState(CONNECTING, GrpclbLoadBalancer.BUFFER_PICKER);

assertFalse(oobChannel.isShutdown());
assertEquals(0, lbRequestObservers.size());
Expand Down