Skip to content
Permalink
Browse files
fix: retain context timeouts in ServerStreamingAttemptCallable (#1155)
  • Loading branch information
noahdietz committed Jul 24, 2020
1 parent 20bb200 commit 461ff846ca551c2242bf6c60e61234997d0ba58e
@@ -181,11 +181,12 @@ public void cancel() {
}
isStarted = true;

// Propagate the totalTimeout as the overall stream deadline.
// Propagate the totalTimeout as the overall stream deadline, so long as the user
// has not provided a timeout via the ApiCallContext. If they have, retain it.
Duration totalTimeout =
outerRetryingFuture.getAttemptSettings().getGlobalSettings().getTotalTimeout();

if (totalTimeout != null && context != null) {
if (totalTimeout != null && context != null && context.getTimeout() == null) {
context = context.withTimeout(totalTimeout);
}

@@ -217,7 +218,10 @@ public Void call() {

ApiCallContext attemptContext = context;

if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()) {
// Set the streamWaitTimeout to the attempt RPC Timeout, only if the context
// does not already have a timeout set by a user via withStreamWaitTimeout.
if (!outerRetryingFuture.getAttemptSettings().getRpcTimeout().isZero()
&& attemptContext.getStreamWaitTimeout() == null) {
attemptContext =
attemptContext.withStreamWaitTimeout(
outerRetryingFuture.getAttemptSettings().getRpcTimeout());
@@ -41,6 +41,7 @@
import com.google.api.gax.rpc.testing.FakeCallContext;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCall;
import com.google.api.gax.rpc.testing.MockStreamingApi.MockServerStreamingCallable;
import com.google.api.gax.tracing.NoopApiTracer;
import com.google.common.collect.Queues;
import com.google.common.truth.Truth;
import java.util.concurrent.BlockingDeque;
@@ -51,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
import org.threeten.bp.Duration;

@RunWith(JUnit4.class)
@@ -59,29 +61,107 @@ public class ServerStreamingAttemptCallableTest {
private AccumulatingObserver observer;
private FakeRetryingFuture fakeRetryingFuture;
private StreamResumptionStrategy<String, String> resumptionStrategy;
private static Duration totalTimeout = Duration.ofHours(1);
private FakeCallContext mockedCallContext;

@Before
public void setUp() {
innerCallable = new MockServerStreamingCallable<>();
observer = new AccumulatingObserver(true);
resumptionStrategy = new MyStreamResumptionStrategy();
mockedCallContext = Mockito.mock(FakeCallContext.class);
}

private ServerStreamingAttemptCallable<String, String> createCallable() {
return createCallable(FakeCallContext.createDefault());
}

private ServerStreamingAttemptCallable<String, String> createCallable(ApiCallContext context) {
ServerStreamingAttemptCallable<String, String> callable =
new ServerStreamingAttemptCallable<>(
innerCallable,
resumptionStrategy,
"request",
FakeCallContext.createDefault(),
observer);
innerCallable, resumptionStrategy, "request", context, observer);

fakeRetryingFuture = new FakeRetryingFuture(callable);
callable.setExternalFuture(fakeRetryingFuture);

return callable;
}

@Test
public void testUserProvidedContextTimeout() {
// Mock up the ApiCallContext as if the user provided a timeout and streamWaitTimeout.
Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer();
Mockito.doReturn(Duration.ofHours(5)).when(mockedCallContext).getTimeout();
Mockito.doReturn(Duration.ofHours(5)).when(mockedCallContext).getStreamWaitTimeout();

ServerStreamingAttemptCallable<String, String> callable = createCallable(mockedCallContext);
callable.start();

// Ensure that the callable did not overwrite the user provided timeouts
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.never()).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.never())
.withStreamWaitTimeout(Mockito.any(Duration.class));

// Should notify outer observer
Truth.assertThat(observer.controller).isNotNull();

// Should configure the inner controller correctly.
MockServerStreamingCall<String, String> call = innerCallable.popLastCall();
Truth.assertThat(call.getController().isAutoFlowControlEnabled()).isTrue();
Truth.assertThat(call.getRequest()).isEqualTo("request");

// Send a response in auto flow mode.
call.getController().getObserver().onResponse("response1");
call.getController().getObserver().onResponse("response2");
call.getController().getObserver().onComplete();

// Make sure the responses are received
Truth.assertThat(observer.responses).containsExactly("response1", "response2").inOrder();
fakeRetryingFuture.assertSuccess();
}

@Test
public void testNoUserProvidedContextTimeout() {
// Mock up the ApiCallContext as if the user did not provide custom timeouts.
Mockito.doReturn(NoopApiTracer.getInstance()).when(mockedCallContext).getTracer();
Mockito.doReturn(null).when(mockedCallContext).getTimeout();
Mockito.doReturn(null).when(mockedCallContext).getStreamWaitTimeout();
Mockito.doReturn(mockedCallContext).when(mockedCallContext).withTimeout(totalTimeout);
Mockito.doReturn(mockedCallContext)
.when(mockedCallContext)
.withStreamWaitTimeout(Mockito.any(Duration.class));

ServerStreamingAttemptCallable<String, String> callable = createCallable(mockedCallContext);
callable.start();

// Ensure that the callable configured the timeouts via the Settings in the
// absence of user-defined timeouts.
Mockito.verify(mockedCallContext, Mockito.times(1)).getTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1)).withTimeout(totalTimeout);
Mockito.verify(mockedCallContext, Mockito.times(1)).getStreamWaitTimeout();
Mockito.verify(mockedCallContext, Mockito.times(1))
.withStreamWaitTimeout(Mockito.any(Duration.class));

// Should notify outer observer
Truth.assertThat(observer.controller).isNotNull();

// Should configure the inner controller correctly.
MockServerStreamingCall<String, String> call = innerCallable.popLastCall();
Truth.assertThat(call.getController().isAutoFlowControlEnabled()).isTrue();
Truth.assertThat(call.getRequest()).isEqualTo("request");

// Send a response in auto flow mode.
call.getController().getObserver().onResponse("response1");
call.getController().getObserver().onResponse("response2");
call.getController().getObserver().onComplete();

// Make sure the responses are received
Truth.assertThat(observer.responses).containsExactly("response1", "response2").inOrder();
fakeRetryingFuture.assertSuccess();
}

@Test
public void testNoErrorsAutoFlow() {
ServerStreamingAttemptCallable<String, String> callable = createCallable();
@@ -396,8 +476,7 @@ private static class FakeRetryingFuture extends AbstractApiFuture<Void>
this.attemptCallable = attemptCallable;
attemptSettings =
TimedAttemptSettings.newBuilder()
.setGlobalSettings(
RetrySettings.newBuilder().setTotalTimeout(Duration.ofHours(1)).build())
.setGlobalSettings(RetrySettings.newBuilder().setTotalTimeout(totalTimeout).build())
.setFirstAttemptStartTimeNanos(0)
.setAttemptCount(0)
.setOverallAttemptCount(0)

0 comments on commit 461ff84

Please sign in to comment.