Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions core/src/main/java/io/grpc/PickFirstBalancerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.grpc;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.LoadBalancer.PickResult;
Expand Down Expand Up @@ -70,7 +72,11 @@ public void handleResolvedAddressGroups(
EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers);
if (subchannel == null) {
subchannel = helper.createSubchannel(newEag, Attributes.EMPTY);
helper.updatePicker(new Picker(PickResult.withSubchannel(subchannel)));

// The channel state does not get updated when doing name resolving today, so for the moment
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel)));
subchannel.requestConnection();
} else {
helper.updateSubchannelAddresses(subchannel, newEag);
}
Expand All @@ -84,7 +90,7 @@ public void handleNameResolutionError(Status error) {
}
// NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine
// for time being.
helper.updatePicker(new Picker(PickResult.withError(error)));
helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

@Override
Expand All @@ -110,7 +116,7 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo s
throw new IllegalArgumentException("Unsupported state:" + currentState);
}

helper.updatePicker(new Picker(pickResult));
helper.updateBalancingState(currentState, new Picker(pickResult));
}

@Override
Expand Down
30 changes: 19 additions & 11 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI

// null when channel is in idle mode. Must be assigned from channelExecutor.
@Nullable
private LoadBalancer loadBalancer;
private LbHelperImpl lbHelper;

// Must be assigned from channelExecutor. null if channel is in idle mode.
@Nullable
Expand Down Expand Up @@ -183,9 +183,9 @@ public void transportInUse(final boolean inUse) {
public void transportTerminated() {
checkState(shutdown.get(), "Channel must have been shut down");
terminating = true;
if (loadBalancer != null) {
loadBalancer.shutdown();
loadBalancer = null;
if (lbHelper != null) {
lbHelper.lb.shutdown();
lbHelper = null;
}
if (nameResolver != null) {
nameResolver.shutdown();
Expand Down Expand Up @@ -247,9 +247,12 @@ public void run() {
// did not cancel idleModeTimer, both of which are bugs.
nameResolver.shutdown();
nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
loadBalancer.shutdown();
loadBalancer = null;
lbHelper.lb.shutdown();
lbHelper = null;
subchannelPicker = null;
if (!channelStateManager.isDisabled()) {
channelStateManager.gotoState(IDLE);
}
}
}

Expand Down Expand Up @@ -279,15 +282,14 @@ void exitIdleMode() {
// isInUse() == false, in which case we still need to schedule the timer.
rescheduleIdleTimer();
}
if (loadBalancer != null) {
if (lbHelper != null) {
return;
}
log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId());
LbHelperImpl helper = new LbHelperImpl(nameResolver);
helper.lb = loadBalancerFactory.newLoadBalancer(helper);
this.loadBalancer = helper.lb;
lbHelper = new LbHelperImpl(nameResolver);
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);

NameResolverListenerImpl listener = new NameResolverListenerImpl(helper);
NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper);
try {
nameResolver.start(listener);
} catch (Throwable t) {
Expand Down Expand Up @@ -679,6 +681,9 @@ public void updateBalancingState(
new Runnable() {
@Override
public void run() {
if (LbHelperImpl.this != lbHelper) {
return;
}
subchannelPicker = newPicker;
delayedTransport.reprocess(newPicker);
// It's not appropriate to report SHUTDOWN state from lb.
Expand Down Expand Up @@ -767,6 +772,9 @@ public void updatePicker(final SubchannelPicker picker) {
runSerialized(new Runnable() {
@Override
public void run() {
if (LbHelperImpl.this != lbHelper) {
return;
}
subchannelPicker = picker;
delayedTransport.reprocess(picker);
channelStateManager.disable();
Expand Down
54 changes: 30 additions & 24 deletions core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package io.grpc;

import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
Expand Down Expand Up @@ -98,7 +102,8 @@ public void pickAfterResolved() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity);

verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture());
verify(mockHelper).updatePicker(pickerCaptor.capture());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();

assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue());
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
Expand All @@ -110,11 +115,13 @@ public void pickAfterResolved() throws Exception {
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verifyNoMoreInteractions(mockSubchannel);

verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class),
any(Attributes.class));
verify(mockHelper).updatePicker(isA(Picker.class));
verify(mockHelper).updateBalancingState(isA(ConnectivityState.class), isA(Picker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockHelper).updateSubchannelAddresses(
eq(mockSubchannel), any(EquivalentAddressGroup.class));
Expand All @@ -133,31 +140,30 @@ public void pickAfterResolvedAndChanged() throws Exception {

loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
assertEquals(socketAddresses, eagCaptor.getValue().getAddresses());
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

loadBalancer.handleResolvedAddressGroups(newServers, affinity);
inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eagCaptor.capture());
assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses());

verify(mockSubchannel, never()).shutdown();

verifyNoMoreInteractions(mockSubchannel);
verifyNoMoreInteractions(mockHelper);
}

@Test
public void stateChangeBeforeResolution() throws Exception {
loadBalancer.handleSubchannelState(mockSubchannel,
ConnectivityStateInfo.forNonError(ConnectivityState.READY));
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(READY));

verifyNoMoreInteractions(mockHelper);
}

@Test
public void pickAfterStateChangeAfterResolution() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockHelper).updatePicker(pickerCaptor.capture());
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
reset(mockHelper);

Expand All @@ -166,19 +172,16 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
Status error = Status.UNAVAILABLE.withDescription("boom!");
loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forTransientFailure(error));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(ConnectivityState.IDLE));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());

loadBalancer.handleSubchannelState(subchannel,
ConnectivityStateInfo.forNonError(ConnectivityState.READY));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
assertEquals(subchannel,
pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

verifyNoMoreInteractions(mockHelper);
}
Expand All @@ -187,10 +190,11 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
public void nameResolutionError() throws Exception {
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
loadBalancer.handleNameResolutionError(error);
verify(mockHelper).updatePicker(pickerCaptor.capture());
verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(null, pickResult.getSubchannel());
assertEquals(error, pickResult.getStatus());
verify(mockSubchannel, never()).requestConnection();
verifyNoMoreInteractions(mockHelper);
}

Expand All @@ -199,12 +203,15 @@ public void nameResolutionSuccessAfterError() throws Exception {
InOrder inOrder = inOrder(mockHelper);

loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
inOrder.verify(mockHelper).updatePicker(any(Picker.class));
inOrder.verify(mockHelper)
.updateBalancingState(any(ConnectivityState.class), any(Picker.class));
verify(mockSubchannel, never()).requestConnection();

loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)),
eq(Attributes.EMPTY));
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();

assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel());
Expand All @@ -223,17 +230,16 @@ public void nameResolutionErrorWithStateChanges() throws Exception {
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
loadBalancer.handleNameResolutionError(error);
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(null, pickResult.getSubchannel());
assertEquals(error, pickResult.getStatus());

loadBalancer.handleSubchannelState(mockSubchannel,
ConnectivityStateInfo.forNonError(ConnectivityState.READY));
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(READY));
Status error2 = Status.NOT_FOUND.withDescription("nameResolutionError2");
loadBalancer.handleNameResolutionError(error2);
inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture());
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(null, pickResult.getSubchannel());
Expand Down
54 changes: 49 additions & 5 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,14 @@ public class ManagedChannelImplTest {

private void createChannel(
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors) {
createChannel(nameResolverFactory, interceptors, true /* requestConnection */);
createChannel(
nameResolverFactory, interceptors, true /* requestConnection */,
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE);
}

private void createChannel(
NameResolver.Factory nameResolverFactory, List<ClientInterceptor> interceptors,
boolean requestConnection) {
boolean requestConnection, long idleTimeoutMillis) {
class Builder extends AbstractManagedChannelImplBuilder<Builder> {
Builder(String target) {
super(target);
Expand All @@ -197,15 +199,17 @@ class Builder extends AbstractManagedChannelImplBuilder<Builder> {
.loadBalancerFactory(mockLoadBalancerFactory)
.userAgent(userAgent);
builder.executorPool = executorPool;
builder.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE;
builder.idleTimeoutMillis = idleTimeoutMillis;
channel = new ManagedChannelImpl(
builder, mockTransportFactory, new FakeBackoffPolicyProvider(),
oobExecutorPool, timer.getStopwatchSupplier(), interceptors);

if (requestConnection) {
// Force-exit the initial idle-mode
channel.exitIdleMode();
assertEquals(0, timer.numPendingTasks());
assertEquals(
idleTimeoutMillis == ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE ? 0 : 1,
timer.numPendingTasks());

ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(null);
verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
Expand Down Expand Up @@ -1165,7 +1169,8 @@ public void getState_loadBalancerSupportsChannelState() {
@Test
public void getState_withRequestConnect() {
createChannel(
new FakeNameResolverFactory(false), NO_INTERCEPTOR, false /* requestConnection */);
new FakeNameResolverFactory(false), NO_INTERCEPTOR, false /* requestConnection */,
ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE);

assertEquals(ConnectivityState.IDLE, channel.getState(false));
verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class));
Expand Down Expand Up @@ -1271,6 +1276,45 @@ public void run() {
assertFalse(stateChanged.get());
}

@Test
public void stateIsIdleOnIdleTimeout() {
long idleTimeoutMillis = 2000L;
createChannel(
new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/,
idleTimeoutMillis);
assertEquals(ConnectivityState.IDLE, channel.getState(false));

helper.updateBalancingState(CONNECTING, mockPicker);
assertEquals(CONNECTING, channel.getState(false));

timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
assertEquals(ConnectivityState.IDLE, channel.getState(false));
}

@Test
public void idleTimeoutAndReconnect() {
long idleTimeoutMillis = 2000L;
createChannel(
new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/,
idleTimeoutMillis);

timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis));
assertEquals(ConnectivityState.IDLE, channel.getState(true /* request connection */));

ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
// Two times of requesting connection will create loadBalancer twice.
verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture());
Helper helper2 = helperCaptor.getValue();

// Updating on the old helper (whose balancer has been shutdown) does not change the channel
// state.
helper.updateBalancingState(CONNECTING, mockPicker);
assertEquals(ConnectivityState.IDLE, channel.getState(false));

helper2.updateBalancingState(CONNECTING, mockPicker);
assertEquals(ConnectivityState.CONNECTING, channel.getState(false));
}

// TODO(zdapeng): replace usages of updatePicker() in some other tests once it's deprecated
@Test
public void updateBalancingStateDoesUpdatePicker() {
Expand Down