Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang committed Dec 10, 2020
1 parent ac99972 commit 1fe3bce
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 26 deletions.
10 changes: 5 additions & 5 deletions core/src/main/java/io/grpc/internal/DelayedClientTransport.java
Expand Up @@ -68,7 +68,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
@GuardedBy("lock")
private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>();
private Runnable pollForStreamTransferComplete = new Runnable() {
private Runnable pollForStreamTransferCompletion = new Runnable() {
@Override
public void run() {
ArrayList<PendingStream> savedToCheckCompletionStreams;
Expand Down Expand Up @@ -291,7 +291,7 @@ final int getPendingStreamsCount() {
}

@VisibleForTesting
final int getUncommittedStreamCount() {
final int getUncommittedStreamsCount() {
synchronized (lock) {
return toCheckCompletionStreams.size();
}
Expand Down Expand Up @@ -373,10 +373,10 @@ public void run() {
// transport starting streams and setting in-use state. During the gap the whole channel's
// in-use state may be false. However, it shouldn't cause spurious switching to idleness
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis).
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(pollForStreamTransferComplete);
syncContext.executeLater(pollForStreamTransferCompletion);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand Down Expand Up @@ -419,7 +419,7 @@ public void cancel(Status reason) {
if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) {
syncContext.executeLater(pollForStreamTransferComplete);
syncContext.executeLater(pollForStreamTransferCompletion);
syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/io/grpc/internal/DelayedStream.java
Expand Up @@ -140,8 +140,8 @@ protected boolean isStreamTransferCompleted() {
}

protected void awaitStreamTransferCompletion() {
// Wait until accepted RPCs transfer to the real stream and so that we can properly cancel or
// shutdown. Not waiting transfer completed may cause pending calls orphaned.
// Wait until accepted RPCs transfer to the real stream so that we can properly cancel or
// shutdown. Not waiting for transfer completion may cause pending calls orphaned. #636.
boolean delegationComplete;
try {
delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS);
Expand Down
38 changes: 19 additions & 19 deletions core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
Expand Up @@ -258,18 +258,18 @@ public void cancelStreamShutdownLastPending() {
.thenReturn(PickResult.withSubchannel(mockSubchannel));

assertEquals(3, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
delayedTransport.reprocess(mockPicker);
fakeExecutor.runDueTasks();
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(2, delayedTransport.getUncommittedStreamCount());
assertEquals(2, delayedTransport.getUncommittedStreamsCount());

delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener, never()).transportTerminated();
stream2.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
assertFalse(Thread.interrupted());
verify(mockRealStream, times(2)).start(any(ClientStreamListener.class));
verify(mockRealStream, never()).cancel(any(Status.class));//substituted
Expand Down Expand Up @@ -431,7 +431,7 @@ public void cancelStreamShutdownLastPending() {
delayedTransport.reprocess(picker);

assertEquals(5, delayedTransport.getPendingStreamsCount());
assertEquals(3, delayedTransport.getUncommittedStreamCount());
assertEquals(3, delayedTransport.getUncommittedStreamsCount());
inOrder.verify(picker).pickSubchannel(ff1args);
inOrder.verify(picker).pickSubchannel(ff2args);
inOrder.verify(picker).pickSubchannel(ff3args);
Expand Down Expand Up @@ -478,7 +478,7 @@ public void cancelStreamShutdownLastPending() {

delayedTransport.reprocess(picker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(7, delayedTransport.getUncommittedStreamCount());//+5(new) - 1(completed)
assertEquals(7, delayedTransport.getUncommittedStreamsCount());//+5(new) - 1(completed)
verify(transportListener).transportInUse(false);
inOrder.verify(picker).pickSubchannel(ff3args); // ff3
inOrder.verify(picker).pickSubchannel(ff4args); // ff4
Expand All @@ -488,7 +488,7 @@ public void cancelStreamShutdownLastPending() {
inOrder.verifyNoMoreInteractions();
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
assertEquals(7, delayedTransport.getUncommittedStreamCount());
assertEquals(7, delayedTransport.getUncommittedStreamsCount());
assertSame(mockRealStream, ff3.getRealStream());
assertSame(mockRealStream2, ff4.getRealStream());
assertSame(mockRealStream2, wfr2.getRealStream());
Expand All @@ -498,7 +498,7 @@ public void cancelStreamShutdownLastPending() {
ff4.start(streamListener);
wfr2.start(streamListener);
delayedTransport.reprocess(picker);
assertEquals(4, delayedTransport.getUncommittedStreamCount());//+0(new) - 3(completed)
assertEquals(4, delayedTransport.getUncommittedStreamsCount());//+0(new) - 3(completed)

// If there is an executor in the CallOptions, it will be used to create the real stream.
assertNull(wfr3.getRealStream());
Expand All @@ -513,7 +513,7 @@ public void cancelStreamShutdownLastPending() {
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
inOrder.verifyNoMoreInteractions();
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(4, delayedTransport.getUncommittedStreamCount());
assertEquals(4, delayedTransport.getUncommittedStreamsCount());

// wfr5 will stop delayed transport from terminating
delayedTransport.shutdown(SHUTDOWN_STATUS);
Expand All @@ -531,7 +531,7 @@ public void cancelStreamShutdownLastPending() {
fakeExecutor.runDueTasks();
assertSame(mockRealStream, wfr5.getRealStream());
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
verify(transportListener).transportTerminated();
assertTrue(Thread.interrupted());
}
Expand Down Expand Up @@ -686,7 +686,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception {
DelayedStream stream2 =
(DelayedStream)delayedTransport.newStream(method2, headers2, callOptions2);
assertEquals(2, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(mockSubchannel))
.thenReturn(PickResult.withNoResult())
Expand All @@ -695,7 +695,7 @@ public void pendingStreamReprocessRacesShutdown() throws Exception {
delayedTransport.reprocess(mockPicker);
fakeExecutor.runDueTasks();
assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2
assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1
assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1

doAnswer(new Answer<PickResult>() {
@Override
Expand All @@ -721,7 +721,7 @@ public void run() {
};
processThread.start();
assertEquals(1, delayedTransport.getPendingStreamsCount());//stream2
assertEquals(1, delayedTransport.getUncommittedStreamCount());//stream1
assertEquals(1, delayedTransport.getUncommittedStreamsCount());//stream1
barrierSignal.await(5, TimeUnit.SECONDS);
assertEquals(1, barrier.getNumberWaiting());
delayedTransport.shutdownNow(SHUTDOWN_STATUS);
Expand All @@ -742,16 +742,16 @@ public void uncommittedStreamReprocess() {
assertTrue(stream instanceof DelayedStream);
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getUncommittedStreamCount());
assertEquals(1, delayedTransport.getUncommittedStreamsCount());
assertFalse(((DelayedStream) stream).isStreamTransferCompleted());
fakeExecutor.runDueTasks();
assertTrue(((DelayedStream) stream).isStreamTransferCompleted());

delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());

delayedTransport.shutdownNow(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
Expand All @@ -769,21 +769,21 @@ public void uncommittedStreamShutdown() {
DelayedStream stream = (DelayedStream) delayedTransport.newStream(method, headers, callOptions);
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
delayedTransport.reprocess(mockPicker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
assertEquals(1, delayedTransport.getUncommittedStreamCount());
assertEquals(1, delayedTransport.getUncommittedStreamsCount());
assertFalse(stream.isStreamTransferCompleted());
fakeExecutor.runDueTasks();
assertTrue(stream.isStreamTransferCompleted());
assertEquals(1, delayedTransport.getUncommittedStreamCount());
assertEquals(1, delayedTransport.getUncommittedStreamsCount());

delayedTransport.shutdown(SHUTDOWN_STATUS);
verify(transportListener).transportShutdown(same(SHUTDOWN_STATUS));
verify(transportListener, never()).transportTerminated();
delayedTransport.reprocess(null);

assertEquals(0, delayedTransport.getUncommittedStreamCount());
assertEquals(0, delayedTransport.getUncommittedStreamsCount());
assertFalse(Thread.interrupted());
verify(mockRealStream).start(listenerCaptor.capture());
verifyNoMoreInteractions(streamListener);
Expand Down

0 comments on commit 1fe3bce

Please sign in to comment.