diff --git a/core/src/main/java/io/grpc/internal/BackoffPolicy.java b/core/src/main/java/io/grpc/internal/BackoffPolicy.java index e3626b90fc9..21541db2759 100644 --- a/core/src/main/java/io/grpc/internal/BackoffPolicy.java +++ b/core/src/main/java/io/grpc/internal/BackoffPolicy.java @@ -25,7 +25,7 @@ interface Provider { } /** - * @return The number of milliseconds to wait. + * @return The number of nanoseconds to wait. */ long nextBackoffNanos(); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7bfce8807a8..e549c11ed13 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -639,6 +639,13 @@ private class LbHelperImpl extends LoadBalancer.Helper { this.nr = checkNotNull(nr, "NameResolver"); } + // Must be called from channelExecutor + private void handleInternalSubchannelState(ConnectivityStateInfo newState) { + if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { + nr.refresh(); + } + } + @Override public AbstractSubchannel createSubchannel( EquivalentAddressGroup addressGroup, Attributes attrs) { @@ -660,9 +667,7 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { - nr.refresh(); - } + handleInternalSubchannelState(newState); // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { lb.handleSubchannelState(subchannel, newState); @@ -757,6 +762,7 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); oobChannel.handleSubchannelStateChange(newState); } }, diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index a993a4083eb..10747a13c47 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -18,11 +18,13 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -223,4 +225,9 @@ void handleSubchannelTerminated() { executorPool.returnObject(executor); terminatedLatch.countDown(); } + + @VisibleForTesting + Subchannel getSubchannel() { + return subchannelImpl; + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 947adbd90a8..672e24ef263 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -124,6 +124,7 @@ public class ManagedChannelImplTest { .build(); private static final Attributes.Key SUBCHANNEL_ATTR_KEY = Attributes.Key.of("subchannel-attr-key"); + private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 1; private static int unterminatedChannels; private final String serviceName = "fake.example.com"; private final String authority = serviceName; @@ -1131,6 +1132,49 @@ public void oobChannelsNoConnectionShutdownNow() { anyString(), any(ProxyParameters.class)); } + @Test + public void refreshNameResolutionWhenSubchannelConnectionFailed() { + subtestRefreshNameResolutionWhenConnectionFailed(false); + } + + @Test + public void refreshNameResolutionWhenOobChannelConnectionFailed() { + subtestRefreshNameResolutionWhenConnectionFailed(true); + } + + private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); + + if (isOobChannel) { + OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority"); + oobChannel.getSubchannel().requestConnection(); + } else { + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel.requestConnection(); + } + + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + + // Transport closed when connecting + assertEquals(0, resolver.refreshCalled); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(1, resolver.refreshCalled); + + timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); + transportInfo = transports.poll(); + assertNotNull(transportInfo); + + transportInfo.listener.transportReady(); + + // Transport closed when ready + assertEquals(1, resolver.refreshCalled); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(2, resolver.refreshCalled); + } + @Test public void uriPattern() { assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches()); @@ -1570,7 +1614,7 @@ public BackoffPolicy get() { return new BackoffPolicy() { @Override public long nextBackoffNanos() { - return 1; + return RECONNECT_BACKOFF_INTERVAL_NANOS; } }; } @@ -1621,6 +1665,7 @@ void allResolved() { private class FakeNameResolver extends NameResolver { Listener listener; boolean shutdown; + int refreshCalled; @Override public String getServiceAuthority() { return expectedUri.getAuthority(); @@ -1633,6 +1678,10 @@ private class FakeNameResolver extends NameResolver { } } + @Override public void refresh() { + refreshCalled++; + } + void resolved() { listener.onAddresses(servers, Attributes.EMPTY); }