diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 24168249643..2ecbdf48b78 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -68,7 +68,7 @@ final class DelayedClientTransport implements ManagedClientTransport { private Collection pendingStreams = new LinkedHashSet<>(); @GuardedBy("lock") private Collection toCheckCompletionStreams = new LinkedHashSet<>(); - private Runnable pollForStreamTransferComplete = new Runnable() { + private Runnable pollForStreamTransferCompletion = new Runnable() { @Override public void run() { ArrayList savedToCheckCompletionStreams; @@ -291,7 +291,7 @@ final int getPendingStreamsCount() { } @VisibleForTesting - final int getUncommittedStreamCount() { + final int getUncommittedStreamsCount() { synchronized (lock) { return toCheckCompletionStreams.size(); } @@ -373,10 +373,10 @@ public void run() { // transport starting streams and setting in-use state. During the gap the whole channel's // in-use state may be false. However, it shouldn't cause spurious switching to idleness // (which would shutdown the transports and LoadBalancer) because the gap should be shorter - // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). + // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis). syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null && reportTransportTerminated != null) { - syncContext.executeLater(pollForStreamTransferComplete); + syncContext.executeLater(pollForStreamTransferCompletion); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } @@ -419,7 +419,7 @@ public void cancel(Status reason) { if (!hasPendingStreams() && justRemovedAnElement) { syncContext.executeLater(reportTransportNotInUse); if (shutdownStatus != null) { - syncContext.executeLater(pollForStreamTransferComplete); + syncContext.executeLater(pollForStreamTransferCompletion); syncContext.executeLater(reportTransportTerminated); reportTransportTerminated = null; } diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 68464627320..05e285851c7 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -140,8 +140,8 @@ protected boolean isStreamTransferCompleted() { } protected void awaitStreamTransferCompletion() { - // Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or - // shutdown. Not waiting transfer completed may cause pending calls orphaned. + // Wait until accepted RPCs transfer to the real stream so that we can properly cancel or + // shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636. boolean delegationComplete; try { delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 4a2ed84b858..54353edeb83 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() { .thenReturn(PickResult.withSubchannel(mockSubchannel)); assertEquals(3, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(2, delayedTransport.getUncommittedStreamCount()); + assertEquals(2, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); stream2.cancel(Status.CANCELLED); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream, times(2)).start(any(ClientStreamListener.class)); verify(mockRealStream, never()).cancel(any(Status.class));//substituted @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(5, delayedTransport.getPendingStreamsCount()); - assertEquals(3, delayedTransport.getUncommittedStreamCount()); + assertEquals(3, delayedTransport.getUncommittedStreamsCount()); inOrder.verify(picker).pickSubchannel(ff1args); inOrder.verify(picker).pickSubchannel(ff2args); inOrder.verify(picker).pickSubchannel(ff3args); @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() { delayedTransport.reprocess(picker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed) + assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed) verify(transportListener).transportInUse(false); inOrder.verify(picker).pickSubchannel(ff3args); // ff3 inOrder.verify(picker).pickSubchannel(ff4args); // ff4 @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() { inOrder.verifyNoMoreInteractions(); fakeExecutor.runDueTasks(); assertEquals(0, fakeExecutor.numPendingTasks()); - assertEquals(7, delayedTransport.getUncommittedStreamCount()); + assertEquals(7, delayedTransport.getUncommittedStreamsCount()); assertSame(mockRealStream, ff3.getRealStream()); assertSame(mockRealStream2, ff4.getRealStream()); assertSame(mockRealStream2, wfr2.getRealStream()); @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() { ff4.start(streamListener); wfr2.start(streamListener); delayedTransport.reprocess(picker); - assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed) + assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed) // If there is an executor in the CallOptions, it will be used to create the real stream. assertNull(wfr3.getRealStream()); @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() { new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions)); inOrder.verifyNoMoreInteractions(); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(4, delayedTransport.getUncommittedStreamCount()); + assertEquals(4, delayedTransport.getUncommittedStreamsCount()); // wfr5 will stop delayed transport from terminating delayedTransport.shutdown(SHUTDOWN_STATUS); @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() { fakeExecutor.runDueTasks(); assertSame(mockRealStream, wfr5.getRealStream()); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); verify(transportListener).transportTerminated(); assertTrue(Thread.interrupted()); } @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { DelayedStream stream2 = (DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(2, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))) .thenReturn(PickResult.withSubchannel(mockSubchannel)) .thenReturn(PickResult.withNoResult()) @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception { delayedTransport.reprocess(mockPicker); fakeExecutor.runDueTasks(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 doAnswer(new Answer() { @Override @@ -721,7 +721,7 @@ public void run() { }; processThread.start(); assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2 - assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1 + assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1 barrierSignal.await(5, TimeUnit.SECONDS); assertEquals(1, barrier.getNumberWaiting()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); @@ -742,16 +742,16 @@ public void uncommittedStreamReprocess() { assertTrue(stream instanceof DelayedStream); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(((DelayedStream) stream).isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(((DelayedStream) stream).isStreamTransferCompleted()); delayedTransport.reprocess(mockPicker); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdownNow(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); @@ -769,21 +769,21 @@ public void uncommittedStreamShutdown() { DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); delayedTransport.reprocess(mockPicker); assertEquals(0, delayedTransport.getPendingStreamsCount()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); assertFalse(stream.isStreamTransferCompleted()); fakeExecutor.runDueTasks(); assertTrue(stream.isStreamTransferCompleted()); - assertEquals(1, delayedTransport.getUncommittedStreamCount()); + assertEquals(1, delayedTransport.getUncommittedStreamsCount()); delayedTransport.shutdown(SHUTDOWN_STATUS); verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS)); verify(transportListener, never()).transportTerminated(); delayedTransport.reprocess(null); - assertEquals(0, delayedTransport.getUncommittedStreamCount()); + assertEquals(0, delayedTransport.getUncommittedStreamsCount()); assertFalse(Thread.interrupted()); verify(mockRealStream).start(listenerCaptor.capture()); verifyNoMoreInteractions(streamListener);