diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 1478639b30d..56cbc384a93 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -660,7 +660,10 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { if ((newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE)) { nr.refresh(); } - lb.handleSubchannelState(subchannel, newState); + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { + lb.handleSubchannelState(subchannel, newState); + } } @Override @@ -846,7 +849,8 @@ public void onAddresses(final List servers, final Attrib helper.runSerialized(new Runnable() { @Override public void run() { - if (terminated) { + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { return; } try { @@ -871,7 +875,8 @@ public void onError(final Status error) { channelExecutor.executeLater(new Runnable() { @Override public void run() { - if (terminated) { + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { return; } balancer.handleNameResolutionError(error); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 19cc375289b..a9617d418c5 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -443,6 +443,52 @@ private void subtestCallsAndShutdown(boolean shutdownNow, boolean shutdownNowAft verifyNoMoreInteractions(mockTransport); } + @Test + public void noMoreCallbackAfterLoadBalancerShutdown() { + FakeNameResolverFactory nameResolverFactory = new FakeNameResolverFactory(true); + Status resolutionError = Status.UNAVAILABLE.withDescription("Resolution failed"); + createChannel(nameResolverFactory, NO_INTERCEPTOR); + + FakeNameResolverFactory.FakeNameResolver resolver = nameResolverFactory.resolvers.get(0); + verify(mockLoadBalancerFactory).newLoadBalancer(any(Helper.class)); + verify(mockLoadBalancer).handleResolvedAddressGroups( + eq(Arrays.asList(addressGroup)), eq(Attributes.EMPTY)); + + Subchannel subchannel1 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + Subchannel subchannel2 = helper.createSubchannel(addressGroup, Attributes.EMPTY); + subchannel1.requestConnection(); + subchannel2.requestConnection(); + verify(mockTransportFactory, times(2)).newClientTransport( + any(SocketAddress.class), any(String.class), any(String.class)); + MockClientTransportInfo transportInfo1 = transports.poll(); + MockClientTransportInfo transportInfo2 = transports.poll(); + + // LoadBalancer receives all sorts of callbacks + transportInfo1.listener.transportReady(); + verify(mockLoadBalancer, times(2)) + .handleSubchannelState(same(subchannel1), stateInfoCaptor.capture()); + assertSame(CONNECTING, stateInfoCaptor.getAllValues().get(0).getState()); + assertSame(READY, stateInfoCaptor.getAllValues().get(1).getState()); + + verify(mockLoadBalancer) + .handleSubchannelState(same(subchannel2), stateInfoCaptor.capture()); + assertSame(CONNECTING, stateInfoCaptor.getValue().getState()); + + resolver.listener.onError(resolutionError); + verify(mockLoadBalancer).handleNameResolutionError(resolutionError); + + verifyNoMoreInteractions(mockLoadBalancer); + + channel.shutdown(); + verify(mockLoadBalancer).shutdown(); + + // No more callback should be delivered to LoadBalancer after it's shut down + transportInfo2.listener.transportReady(); + resolver.listener.onError(resolutionError); + resolver.resolved(); + verifyNoMoreInteractions(mockLoadBalancer); + } + @Test public void interceptor() throws Exception { final AtomicLong atomic = new AtomicLong();