From 0aa5ae1eb1a4b1ac057ad8c2a821970e6504acc1 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 18 Oct 2017 13:39:03 -0700 Subject: [PATCH 1/2] core: refresh name resolution when an OOB connection is closed. This is required by an internal use case. We have already been doing so for Subchannels. --- .../java/io/grpc/internal/BackoffPolicy.java | 2 +- .../io/grpc/internal/ManagedChannelImpl.java | 12 +++-- .../java/io/grpc/internal/OobChannel.java | 7 +++ .../grpc/internal/ManagedChannelImplTest.java | 51 ++++++++++++++++++- 4 files changed, 67 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/BackoffPolicy.java b/core/src/main/java/io/grpc/internal/BackoffPolicy.java index e3626b90fc9..21541db2759 100644 --- a/core/src/main/java/io/grpc/internal/BackoffPolicy.java +++ b/core/src/main/java/io/grpc/internal/BackoffPolicy.java @@ -25,7 +25,7 @@ interface Provider { } /** - * @return The number of milliseconds to wait. + * @return The number of nanoseconds to wait. */ long nextBackoffNanos(); } diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 7bfce8807a8..2b77be06a7d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -639,6 +639,13 @@ private class LbHelperImpl extends LoadBalancer.Helper { this.nr = checkNotNull(nr, "NameResolver"); } + // Must be called from channelExecutor + private void handleInternalSubchannelState(ConnectivityStateInfo newState) { + if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { + nr.refresh(); + } + } + @Override public AbstractSubchannel createSubchannel( EquivalentAddressGroup addressGroup, Attributes attrs) { @@ -660,9 +667,7 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { - nr.refresh(); - } + handleInternalSubchannelState(newState); // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { lb.handleSubchannelState(subchannel, newState); @@ -757,6 +762,7 @@ void onTerminated(InternalSubchannel is) { @Override void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); oobChannel.handleSubchannelStateChange(newState); } }, diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index a993a4083eb..10747a13c47 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -18,11 +18,13 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; @@ -223,4 +225,9 @@ void handleSubchannelTerminated() { executorPool.returnObject(executor); terminatedLatch.countDown(); } + + @VisibleForTesting + Subchannel getSubchannel() { + return subchannelImpl; + } } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 947adbd90a8..672e24ef263 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -124,6 +124,7 @@ public class ManagedChannelImplTest { .build(); private static final Attributes.Key SUBCHANNEL_ATTR_KEY = Attributes.Key.of("subchannel-attr-key"); + private static final long RECONNECT_BACKOFF_INTERVAL_NANOS = 1; private static int unterminatedChannels; private final String serviceName = "fake.example.com"; private final String authority = serviceName; @@ -1131,6 +1132,49 @@ public void oobChannelsNoConnectionShutdownNow() { anyString(), any(ProxyParameters.class)); } + @Test + public void refreshNameResolutionWhenSubchannelConnectionFailed() { + subtestRefreshNameResolutionWhenConnectionFailed(false); + } + + @Test + public void refreshNameResolutionWhenOobChannelConnectionFailed() { + subtestRefreshNameResolutionWhenConnectionFailed(true); + } + + private void subtestRefreshNameResolutionWhenConnectionFailed(boolean isOobChannel) { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); + + if (isOobChannel) { + OobChannel oobChannel = (OobChannel) helper.createOobChannel(addressGroup, "oobAuthority"); + oobChannel.getSubchannel().requestConnection(); + } else { + Subchannel subchannel = helper.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel.requestConnection(); + } + + MockClientTransportInfo transportInfo = transports.poll(); + assertNotNull(transportInfo); + + // Transport closed when connecting + assertEquals(0, resolver.refreshCalled); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(1, resolver.refreshCalled); + + timer.forwardNanos(RECONNECT_BACKOFF_INTERVAL_NANOS); + transportInfo = transports.poll(); + assertNotNull(transportInfo); + + transportInfo.listener.transportReady(); + + // Transport closed when ready + assertEquals(1, resolver.refreshCalled); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + assertEquals(2, resolver.refreshCalled); + } + @Test public void uriPattern() { assertTrue(ManagedChannelImpl.URI_PATTERN.matcher("a:/").matches()); @@ -1570,7 +1614,7 @@ public BackoffPolicy get() { return new BackoffPolicy() { @Override public long nextBackoffNanos() { - return 1; + return RECONNECT_BACKOFF_INTERVAL_NANOS; } }; } @@ -1621,6 +1665,7 @@ void allResolved() { private class FakeNameResolver extends NameResolver { Listener listener; boolean shutdown; + int refreshCalled; @Override public String getServiceAuthority() { return expectedUri.getAuthority(); @@ -1633,6 +1678,10 @@ private class FakeNameResolver extends NameResolver { } } + @Override public void refresh() { + refreshCalled++; + } + void resolved() { listener.onAddresses(servers, Attributes.EMPTY); } From 2c4e3269ae4357d20ce19981b8187c77d28b1763 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 18 Oct 2017 14:17:53 -0700 Subject: [PATCH 2/2] Remove double parens --- core/src/main/java/io/grpc/internal/ManagedChannelImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 2b77be06a7d..e549c11ed13 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -641,7 +641,7 @@ private class LbHelperImpl extends LoadBalancer.Helper { // Must be called from channelExecutor private void handleInternalSubchannelState(ConnectivityStateInfo newState) { - if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { + if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) { nr.refresh(); } }