Skip to content
Permalink
Browse files
feat: wrap non-retryable RPCs in retry machinery (#1328)
* feat: wrap non-retryable RPCs in retry machinery

* address feedback

* add contextual retry settings to timeout tests

* add tracing test for non-retryable callable

* add server streaming callable tests
  • Loading branch information
noahdietz committed Apr 5, 2021
1 parent 751ccf3 commit 51c40abd408ab0637f3b65cf5697a4ee85a544a4
@@ -74,9 +74,12 @@ public class TimeoutTest {
private static final int DEADLINE_IN_MINUTES = 10;
private static final int DEADLINE_IN_SECONDS = 20;
private static final ImmutableSet<StatusCode.Code> emptyRetryCodes = ImmutableSet.of();
private static final ImmutableSet<StatusCode.Code> retryUnknownCode =
ImmutableSet.of(StatusCode.Code.UNKNOWN);
private static final Duration totalTimeout = Duration.ofDays(DEADLINE_IN_DAYS);
private static final Duration maxRpcTimeout = Duration.ofMinutes(DEADLINE_IN_MINUTES);
private static final Duration initialRpcTimeout = Duration.ofSeconds(DEADLINE_IN_SECONDS);
private static final GrpcCallContext defaultCallContext = GrpcCallContext.createDefault();

@Rule public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Mock private Marshaller<String> stringMarshaller;
@@ -97,7 +100,8 @@ public void testNonRetryUnarySettings() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -108,6 +112,46 @@ public void testNonRetryUnarySettings() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

/**
 * Checks that retry settings supplied through the call context are honored by a unary callable
 * that was built with non-retryable settings (a single attempt and no retryable codes).
 */
@Test
public void testNonRetryUnarySettingsContextWithRetry() {
  RetrySettings retrySettings =
      RetrySettings.newBuilder()
          .setTotalTimeout(totalTimeout)
          .setInitialRetryDelay(Duration.ZERO)
          .setRetryDelayMultiplier(1.0)
          .setMaxRetryDelay(Duration.ZERO)
          .setMaxAttempts(1)
          .setJittered(true)
          .setInitialRpcTimeout(initialRpcTimeout)
          .setRpcTimeoutMultiplier(1.0)
          .setMaxRpcTimeout(maxRpcTimeout)
          .build();
  Duration newTimeout = Duration.ofSeconds(5);
  RetrySettings contextRetrySettings =
      retrySettings
          .toBuilder()
          .setInitialRpcTimeout(newTimeout)
          .setMaxRpcTimeout(newTimeout)
          .setMaxAttempts(3)
          .build();
  GrpcCallContext retryingContext =
      defaultCallContext
          .withRetrySettings(contextRetrySettings)
          .withRetryableCodes(retryUnknownCode);
  CallOptions callOptionsUsed =
      setupUnaryCallable(retrySettings, emptyRetryCodes, retryingContext);

  // Use the full length of the timeout in seconds. getSeconds() is the whole duration, whereas
  // toSecondsPart() is only the seconds-within-the-minute component and would produce wrong
  // bounds for any timeout of a minute or longer.
  long timeoutSeconds = newTimeout.getSeconds();

  // Verify that the gRPC channel used the CallOptions with the initial timeout of ~5 seconds.
  // This indicates that the context retry settings were used on a callable that was instantiated
  // with non-retryable settings.
  assertThat(callOptionsUsed.getDeadline()).isNotNull();
  assertThat(callOptionsUsed.getDeadline())
      .isGreaterThan(Deadline.after(timeoutSeconds - 1, TimeUnit.SECONDS));
  assertThat(callOptionsUsed.getDeadline())
      .isLessThan(Deadline.after(timeoutSeconds, TimeUnit.SECONDS));
  assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryUnarySettingsWithoutInitialRpcTimeout() {
RetrySettings retrySettings =
@@ -121,7 +165,8 @@ public void testNonRetryUnarySettingsWithoutInitialRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -145,7 +190,8 @@ public void testNonRetryUnarySettingsWithoutIndividualRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setRpcTimeoutMultiplier(1.0)
.build();
CallOptions callOptionsUsed = setupUnaryCallable(retrySettings);
CallOptions callOptionsUsed =
setupUnaryCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -170,7 +216,8 @@ public void testNonRetryServerStreamingSettings() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -181,6 +228,41 @@ public void testNonRetryServerStreamingSettings() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

/**
 * Checks that retry settings supplied through the call context are honored by a server-streaming
 * callable that was built with non-retryable settings (a single attempt and no retryable codes).
 */
@Test
public void testNonRetryServerStreamingSettingsContextWithRetry() {
  RetrySettings retrySettings =
      RetrySettings.newBuilder()
          .setTotalTimeout(totalTimeout)
          .setInitialRetryDelay(Duration.ZERO)
          .setRetryDelayMultiplier(1.0)
          .setMaxRetryDelay(Duration.ZERO)
          .setMaxAttempts(1)
          .setJittered(true)
          .setInitialRpcTimeout(initialRpcTimeout)
          .setRpcTimeoutMultiplier(1.0)
          .setMaxRpcTimeout(maxRpcTimeout)
          .build();
  Duration newTimeout = Duration.ofSeconds(5);
  // Only the total timeout and attempt count are overridden in the context settings.
  RetrySettings contextRetrySettings =
      retrySettings.toBuilder().setTotalTimeout(newTimeout).setMaxAttempts(3).build();
  GrpcCallContext retryingContext =
      defaultCallContext
          .withRetrySettings(contextRetrySettings)
          .withRetryableCodes(retryUnknownCode);
  CallOptions callOptionsUsed =
      setupServerStreamingCallable(retrySettings, emptyRetryCodes, retryingContext);

  // Use the full length of the timeout in seconds. getSeconds() is the whole duration, whereas
  // toSecondsPart() is only the seconds-within-the-minute component and would produce wrong
  // bounds for any timeout of a minute or longer.
  long timeoutSeconds = newTimeout.getSeconds();

  // Verify that the gRPC channel used the CallOptions with the total timeout of ~5 seconds.
  // This indicates that the context retry settings were used on a callable that was instantiated
  // with non-retryable settings.
  assertThat(callOptionsUsed.getDeadline()).isNotNull();
  assertThat(callOptionsUsed.getDeadline())
      .isGreaterThan(Deadline.after(timeoutSeconds - 1, TimeUnit.SECONDS));
  assertThat(callOptionsUsed.getDeadline())
      .isLessThan(Deadline.after(timeoutSeconds, TimeUnit.SECONDS));
  assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

@Test
public void testNonRetryServerStreamingSettingsWithoutInitialRpcTimeout() {
RetrySettings retrySettings =
@@ -194,7 +276,8 @@ public void testNonRetryServerStreamingSettingsWithoutInitialRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setMaxRpcTimeout(maxRpcTimeout)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -218,7 +301,8 @@ public void testNonRetryServerStreamingSettingsWithoutIndividualRpcTimeout() {
.setRpcTimeoutMultiplier(1.0)
.setRpcTimeoutMultiplier(1.0)
.build();
CallOptions callOptionsUsed = setupServerStreamingCallable(retrySettings);
CallOptions callOptionsUsed =
setupServerStreamingCallable(retrySettings, emptyRetryCodes, defaultCallContext);

// Verify that the gRPC channel used the CallOptions with our custom timeout of ~2 Days.
assertThat(callOptionsUsed.getDeadline()).isNotNull();
@@ -229,7 +313,10 @@ public void testNonRetryServerStreamingSettingsWithoutIndividualRpcTimeout() {
assertThat(callOptionsUsed.getAuthority()).isEqualTo(CALL_OPTIONS_AUTHORITY);
}

private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
private CallOptions setupUnaryCallable(
RetrySettings retrySettings,
ImmutableSet<StatusCode.Code> retryableCodes,
GrpcCallContext callContext) {
MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setSchemaDescriptor("yaml")
@@ -248,8 +335,8 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
// Clobber the "authority" property with an identifier that allows us to trace
// the use of this CallOptions variable.
CallOptions spyCallOptions = CallOptions.DEFAULT.withAuthority("RETRYING_TEST");
GrpcCallContext grpcCallContext =
GrpcCallContext.createDefault().withChannel(managedChannel).withCallOptions(spyCallOptions);
GrpcCallContext context =
callContext.withChannel(managedChannel).withCallOptions(spyCallOptions);

ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
ArgumentCaptor.forClass(CallOptions.class);
@@ -266,16 +353,16 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(paramsExtractor)
.build();
UnaryCallSettings<String, String> nonRetriedCallSettings =
UnaryCallSettings<String, String> unaryCallSettings =
UnaryCallSettings.<String, String>newUnaryCallSettingsBuilder()
.setRetrySettings(retrySettings)
.setRetryableCodes(emptyRetryCodes)
.setRetryableCodes(retryableCodes)
.build();
UnaryCallable<String, String> callable =
GrpcCallableFactory.createUnaryCallable(
grpcCallSettings,
nonRetriedCallSettings,
ClientContext.newBuilder().setDefaultCallContext(grpcCallContext).build());
unaryCallSettings,
ClientContext.newBuilder().setDefaultCallContext(context).build());

try {
ApiFuture<String> future = callable.futureCall("Is your refrigerator running?");
@@ -287,7 +374,10 @@ private CallOptions setupUnaryCallable(RetrySettings retrySettings) {
return callOptionsArgumentCaptor.getValue();
}

private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
private CallOptions setupServerStreamingCallable(
RetrySettings retrySettings,
ImmutableSet<StatusCode.Code> retryableCodes,
GrpcCallContext callContext) {
MethodDescriptor<String, String> methodDescriptor =
MethodDescriptor.<String, String>newBuilder()
.setSchemaDescriptor("yaml")
@@ -306,8 +396,8 @@ private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
// Clobber the "authority" property with an identifier that allows us to trace
// the use of this CallOptions variable.
CallOptions spyCallOptions = CallOptions.DEFAULT.withAuthority("RETRYING_TEST");
GrpcCallContext grpcCallContext =
GrpcCallContext.createDefault().withChannel(managedChannel).withCallOptions(spyCallOptions);
GrpcCallContext context =
callContext.withChannel(managedChannel).withCallOptions(spyCallOptions);

ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
ArgumentCaptor.forClass(CallOptions.class);
@@ -324,16 +414,16 @@ private CallOptions setupServerStreamingCallable(RetrySettings retrySettings) {
.setMethodDescriptor(methodDescriptor)
.setParamsExtractor(paramsExtractor)
.build();
ServerStreamingCallSettings<String, String> nonRetriedCallSettings =
ServerStreamingCallSettings<String, String> serverStreamingCallSettings =
ServerStreamingCallSettings.<String, String>newBuilder()
.setRetrySettings(retrySettings)
.setRetryableCodes(emptyRetryCodes)
.setRetryableCodes(retryableCodes)
.build();
ServerStreamingCallable<String, String> callable =
GrpcCallableFactory.createServerStreamingCallable(
grpcCallSettings,
nonRetriedCallSettings,
ClientContext.newBuilder().setDefaultCallContext(grpcCallContext).build());
serverStreamingCallSettings,
ClientContext.newBuilder().setDefaultCallContext(context).build());

try {
ServerStream<String> stream = callable.call("Is your refrigerator running?");
@@ -56,19 +56,21 @@ public static <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> retrying(
UnaryCallSettings<?, ?> callSettings,
ClientContext clientContext) {

if (areRetriesDisabled(callSettings.getRetryableCodes(), callSettings.getRetrySettings())) {
UnaryCallSettings<?, ?> settings = callSettings;

if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
return innerCallable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withTimeout(callSettings.getRetrySettings().getTotalTimeout()));
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

RetryAlgorithm<ResponseT> retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm<ResponseT>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));
ScheduledRetryingExecutor<ResponseT> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());
return new RetryingCallable<>(
@@ -81,25 +83,26 @@ public static <RequestT, ResponseT> ServerStreamingCallable<RequestT, ResponseT>
ServerStreamingCallSettings<RequestT, ResponseT> callSettings,
ClientContext clientContext) {

if (areRetriesDisabled(callSettings.getRetryableCodes(), callSettings.getRetrySettings())) {
ServerStreamingCallSettings<RequestT, ResponseT> settings = callSettings;
if (areRetriesDisabled(settings.getRetryableCodes(), settings.getRetrySettings())) {
// When retries are disabled, the total timeout can be treated as the rpc timeout.
return innerCallable.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withTimeout(callSettings.getRetrySettings().getTotalTimeout()));
settings =
settings
.toBuilder()
.setSimpleTimeoutNoRetries(settings.getRetrySettings().getTotalTimeout())
.build();
}

StreamingRetryAlgorithm<Void> retryAlgorithm =
new StreamingRetryAlgorithm<>(
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
callSettings.getRetrySettings(), clientContext.getClock()));
new ExponentialRetryAlgorithm(settings.getRetrySettings(), clientContext.getClock()));

ScheduledRetryingExecutor<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new RetryingServerStreamingCallable<>(
innerCallable, retryingExecutor, callSettings.getResumptionStrategy());
innerCallable, retryingExecutor, settings.getResumptionStrategy());
}

@BetaApi("The surface for streaming is not stable yet and may change in the future.")

0 comments on commit 51c40ab

Please sign in to comment.