diff --git a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java index c003b47ed6e..a1177ed3372 100644 --- a/core/src/main/java/io/grpc/PickFirstBalancerFactory.java +++ b/core/src/main/java/io/grpc/PickFirstBalancerFactory.java @@ -17,7 +17,9 @@ package io.grpc; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.annotations.VisibleForTesting; import io.grpc.LoadBalancer.PickResult; @@ -70,7 +72,11 @@ public void handleResolvedAddressGroups( EquivalentAddressGroup newEag = flattenEquivalentAddressGroup(servers); if (subchannel == null) { subchannel = helper.createSubchannel(newEag, Attributes.EMPTY); - helper.updatePicker(new Picker(PickResult.withSubchannel(subchannel))); + + // The channel state does not get updated when doing name resolving today, so for the moment + // let LB report CONNECTION and call subchannel.requestConnection() immediately. + helper.updateBalancingState(CONNECTING, new Picker(PickResult.withSubchannel(subchannel))); + subchannel.requestConnection(); } else { helper.updateSubchannelAddresses(subchannel, newEag); } @@ -84,7 +90,7 @@ public void handleNameResolutionError(Status error) { } // NB(lukaszx0) Whether we should propagate the error unconditionally is arguable. It's fine // for time being. - helper.updatePicker(new Picker(PickResult.withError(error))); + helper.updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); } @Override @@ -110,7 +116,7 @@ public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo s throw new IllegalArgumentException("Unsupported state:" + currentState); } - helper.updatePicker(new Picker(pickResult)); + helper.updateBalancingState(currentState, new Picker(pickResult)); } @Override diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7294d60e8b5..bcd681e8eaf 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -119,7 +119,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI // null when channel is in idle mode. Must be assigned from channelExecutor. @Nullable - private LoadBalancer loadBalancer; + private LbHelperImpl lbHelper; // Must be assigned from channelExecutor. null if channel is in idle mode. @Nullable @@ -183,9 +183,9 @@ public void transportInUse(final boolean inUse) { public void transportTerminated() { checkState(shutdown.get(), "Channel must have been shut down"); terminating = true; - if (loadBalancer != null) { - loadBalancer.shutdown(); - loadBalancer = null; + if (lbHelper != null) { + lbHelper.lb.shutdown(); + lbHelper = null; } if (nameResolver != null) { nameResolver.shutdown(); @@ -247,9 +247,12 @@ public void run() { // did not cancel idleModeTimer, both of which are bugs. nameResolver.shutdown(); nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams); - loadBalancer.shutdown(); - loadBalancer = null; + lbHelper.lb.shutdown(); + lbHelper = null; subchannelPicker = null; + if (!channelStateManager.isDisabled()) { + channelStateManager.gotoState(IDLE); + } } } @@ -279,15 +282,14 @@ void exitIdleMode() { // isInUse() == false, in which case we still need to schedule the timer. rescheduleIdleTimer(); } - if (loadBalancer != null) { + if (lbHelper != null) { return; } log.log(Level.FINE, "[{0}] Exiting idle mode", getLogId()); - LbHelperImpl helper = new LbHelperImpl(nameResolver); - helper.lb = loadBalancerFactory.newLoadBalancer(helper); - this.loadBalancer = helper.lb; + lbHelper = new LbHelperImpl(nameResolver); + lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper); - NameResolverListenerImpl listener = new NameResolverListenerImpl(helper); + NameResolverListenerImpl listener = new NameResolverListenerImpl(lbHelper); try { nameResolver.start(listener); } catch (Throwable t) { @@ -679,6 +681,9 @@ public void updateBalancingState( new Runnable() { @Override public void run() { + if (LbHelperImpl.this != lbHelper) { + return; + } subchannelPicker = newPicker; delayedTransport.reprocess(newPicker); // It's not appropriate to report SHUTDOWN state from lb. @@ -767,6 +772,9 @@ public void updatePicker(final SubchannelPicker picker) { runSerialized(new Runnable() { @Override public void run() { + if (LbHelperImpl.this != lbHelper) { + return; + } subchannelPicker = picker; delayedTransport.reprocess(picker); channelStateManager.disable(); diff --git a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java index ba2375dc177..828fa1a9367 100644 --- a/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/PickFirstLoadBalancerTest.java @@ -16,6 +16,10 @@ package io.grpc; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -98,7 +102,8 @@ public void pickAfterResolved() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); verify(mockHelper).createSubchannel(eagCaptor.capture(), attrsCaptor.capture()); - verify(mockHelper).updatePicker(pickerCaptor.capture()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel).requestConnection(); assertEquals(new EquivalentAddressGroup(socketAddresses), eagCaptor.getValue()); assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), @@ -110,11 +115,13 @@ public void pickAfterResolved() throws Exception { @Test public void pickAfterResolvedAndUnchanged() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); + verify(mockSubchannel).requestConnection(); loadBalancer.handleResolvedAddressGroups(servers, affinity); + verifyNoMoreInteractions(mockSubchannel); verify(mockHelper).createSubchannel(any(EquivalentAddressGroup.class), any(Attributes.class)); - verify(mockHelper).updatePicker(isA(Picker.class)); + verify(mockHelper).updateBalancingState(isA(ConnectivityState.class), isA(Picker.class)); // Updating the subchannel addresses is unnecessary, but doesn't hurt anything verify(mockHelper).updateSubchannelAddresses( eq(mockSubchannel), any(EquivalentAddressGroup.class)); @@ -133,7 +140,8 @@ public void pickAfterResolvedAndChanged() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); inOrder.verify(mockHelper).createSubchannel(eagCaptor.capture(), any(Attributes.class)); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel).requestConnection(); assertEquals(socketAddresses, eagCaptor.getValue().getAddresses()); assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); @@ -141,15 +149,13 @@ public void pickAfterResolvedAndChanged() throws Exception { inOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eagCaptor.capture()); assertEquals(newSocketAddresses, eagCaptor.getValue().getAddresses()); - verify(mockSubchannel, never()).shutdown(); - + verifyNoMoreInteractions(mockSubchannel); verifyNoMoreInteractions(mockHelper); } @Test public void stateChangeBeforeResolution() throws Exception { - loadBalancer.handleSubchannelState(mockSubchannel, - ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(READY)); verifyNoMoreInteractions(mockHelper); } @@ -157,7 +163,7 @@ public void stateChangeBeforeResolution() throws Exception { @Test public void pickAfterStateChangeAfterResolution() throws Exception { loadBalancer.handleResolvedAddressGroups(servers, affinity); - verify(mockHelper).updatePicker(pickerCaptor.capture()); + verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel(); reset(mockHelper); @@ -166,19 +172,16 @@ public void pickAfterStateChangeAfterResolution() throws Exception { Status error = Status.UNAVAILABLE.withDescription("boom!"); loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); assertEquals(error, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - loadBalancer.handleSubchannelState(subchannel, - ConnectivityStateInfo.forNonError(ConnectivityState.IDLE)); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); assertEquals(Status.OK, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus()); - loadBalancer.handleSubchannelState(subchannel, - ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); - assertEquals(subchannel, - pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); + loadBalancer.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel()); verifyNoMoreInteractions(mockHelper); } @@ -187,10 +190,11 @@ public void pickAfterStateChangeAfterResolution() throws Exception { public void nameResolutionError() throws Exception { Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); loadBalancer.handleNameResolutionError(error); - verify(mockHelper).updatePicker(pickerCaptor.capture()); + verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); assertEquals(null, pickResult.getSubchannel()); assertEquals(error, pickResult.getStatus()); + verify(mockSubchannel, never()).requestConnection(); verifyNoMoreInteractions(mockHelper); } @@ -199,12 +203,15 @@ public void nameResolutionSuccessAfterError() throws Exception { InOrder inOrder = inOrder(mockHelper); loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError")); - inOrder.verify(mockHelper).updatePicker(any(Picker.class)); + inOrder.verify(mockHelper) + .updateBalancingState(any(ConnectivityState.class), any(Picker.class)); + verify(mockSubchannel, never()).requestConnection(); loadBalancer.handleResolvedAddressGroups(servers, affinity); inOrder.verify(mockHelper).createSubchannel(eq(new EquivalentAddressGroup(socketAddresses)), eq(Attributes.EMPTY)); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + verify(mockSubchannel).requestConnection(); assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs) .getSubchannel()); @@ -223,17 +230,16 @@ public void nameResolutionErrorWithStateChanges() throws Exception { ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); Status error = Status.NOT_FOUND.withDescription("nameResolutionError"); loadBalancer.handleNameResolutionError(error); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); assertEquals(null, pickResult.getSubchannel()); assertEquals(error, pickResult.getStatus()); - loadBalancer.handleSubchannelState(mockSubchannel, - ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(READY)); Status error2 = Status.NOT_FOUND.withDescription("nameResolutionError2"); loadBalancer.handleNameResolutionError(error2); - inOrder.verify(mockHelper).updatePicker(pickerCaptor.capture()); + inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs); assertEquals(null, pickResult.getSubchannel()); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 59dc883f669..1f171b00f06 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -168,12 +168,14 @@ public class ManagedChannelImplTest { private void createChannel( NameResolver.Factory nameResolverFactory, List interceptors) { - createChannel(nameResolverFactory, interceptors, true /* requestConnection */); + createChannel( + nameResolverFactory, interceptors, true /* requestConnection */, + ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE); } private void createChannel( NameResolver.Factory nameResolverFactory, List interceptors, - boolean requestConnection) { + boolean requestConnection, long idleTimeoutMillis) { class Builder extends AbstractManagedChannelImplBuilder { Builder(String target) { super(target); @@ -197,7 +199,7 @@ class Builder extends AbstractManagedChannelImplBuilder { .loadBalancerFactory(mockLoadBalancerFactory) .userAgent(userAgent); builder.executorPool = executorPool; - builder.idleTimeoutMillis = ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE; + builder.idleTimeoutMillis = idleTimeoutMillis; channel = new ManagedChannelImpl( builder, mockTransportFactory, new FakeBackoffPolicyProvider(), oobExecutorPool, timer.getStopwatchSupplier(), interceptors); @@ -205,7 +207,9 @@ builder, mockTransportFactory, new FakeBackoffPolicyProvider(), if (requestConnection) { // Force-exit the initial idle-mode channel.exitIdleMode(); - assertEquals(0, timer.numPendingTasks()); + assertEquals( + idleTimeoutMillis == ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE ? 0 : 1, + timer.numPendingTasks()); ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(null); verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture()); @@ -1165,7 +1169,8 @@ public void getState_loadBalancerSupportsChannelState() { @Test public void getState_withRequestConnect() { createChannel( - new FakeNameResolverFactory(false), NO_INTERCEPTOR, false /* requestConnection */); + new FakeNameResolverFactory(false), NO_INTERCEPTOR, false /* requestConnection */, + ManagedChannelImpl.IDLE_TIMEOUT_MILLIS_DISABLE); assertEquals(ConnectivityState.IDLE, channel.getState(false)); verify(mockLoadBalancerFactory, never()).newLoadBalancer(any(Helper.class)); @@ -1271,6 +1276,45 @@ public void run() { assertFalse(stateChanged.get()); } + @Test + public void stateIsIdleOnIdleTimeout() { + long idleTimeoutMillis = 2000L; + createChannel( + new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/, + idleTimeoutMillis); + assertEquals(ConnectivityState.IDLE, channel.getState(false)); + + helper.updateBalancingState(CONNECTING, mockPicker); + assertEquals(CONNECTING, channel.getState(false)); + + timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); + assertEquals(ConnectivityState.IDLE, channel.getState(false)); + } + + @Test + public void idleTimeoutAndReconnect() { + long idleTimeoutMillis = 2000L; + createChannel( + new FakeNameResolverFactory(true), NO_INTERCEPTOR, true /* request connection*/, + idleTimeoutMillis); + + timer.forwardNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeoutMillis)); + assertEquals(ConnectivityState.IDLE, channel.getState(true /* request connection */)); + + ArgumentCaptor helperCaptor = ArgumentCaptor.forClass(Helper.class); + // Two times of requesting connection will create loadBalancer twice. + verify(mockLoadBalancerFactory, times(2)).newLoadBalancer(helperCaptor.capture()); + Helper helper2 = helperCaptor.getValue(); + + // Updating on the old helper (whose balancer has been shutdown) does not change the channel + // state. + helper.updateBalancingState(CONNECTING, mockPicker); + assertEquals(ConnectivityState.IDLE, channel.getState(false)); + + helper2.updateBalancingState(CONNECTING, mockPicker); + assertEquals(ConnectivityState.CONNECTING, channel.getState(false)); + } + // TODO(zdapeng): replace usages of updatePicker() in some other tests once it's deprecated @Test public void updateBalancingStateDoesUpdatePicker() {