Skip to content
Permalink
Browse files
feat: support retry settings and retryable codes in call context (#1238)
* feat: support retry settings and retryable codes in call context

Allows applications to supply retry settings and retryabe codes for individual RPCs
through ApiCallContext.

Fixes #1197

* chore: fix formatting and license headers

* feat: use context retry settings for streaming calls

* fix: process review comments

* fix: missed one Thread.sleep

* fix: fix style

* fix: address review comments

* refactor: remove ContextAwareResultRetryAlgorithm

* refactor: remove ContextAwareTimedRetryAlgorithm

* fix: make package-private + add ignored

* revert: revert to always using global settings for jittered

* test: add tests for NoopRetryingContext

* chore: cleanup 'this' usage

* refactor: merge context aware classes with base classes

* chore: cleanup and remove unnecessary methods

* test: add safety margin as the retry settings are now jittered

* docs: add javadoc

* chore: address review comments

* fix: address review comments

* fix: add tests + fix potential NPE

* fix: remove unused method + add tests

* test: add additional test calls

* fix: deprecation
  • Loading branch information
olavloite committed Mar 18, 2021
1 parent 86d5c72 commit 7f7aa252ce96413cb09e01cc2e76672b167b1baf
Showing with 2,093 additions and 254 deletions.
  1. +137 −43 gax-grpc/src/main/java/com/google/api/gax/grpc/GrpcCallContext.java
  2. +29 −9 gax-grpc/src/test/java/com/google/api/gax/grpc/GrpcCallContextTest.java
  3. +133 −17 gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallContext.java
  4. +48 −9 gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonCallContextTest.java
  5. +35 −13 gax/src/main/java/com/google/api/gax/retrying/BasicResultRetryAlgorithm.java
  6. +6 −4 gax/src/main/java/com/google/api/gax/retrying/BasicRetryingFuture.java
  7. +77 −13 gax/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java
  8. +12 −0 gax/src/main/java/com/google/api/gax/retrying/NoopRetryingContext.java
  9. +11 −23 gax/src/main/java/com/google/api/gax/retrying/ResultRetryAlgorithm.java
  10. +81 −0 gax/src/main/java/com/google/api/gax/retrying/ResultRetryAlgorithmWithContext.java
  11. +158 −19 gax/src/main/java/com/google/api/gax/retrying/RetryAlgorithm.java
  12. +17 −0 gax/src/main/java/com/google/api/gax/retrying/RetryingContext.java
  13. +71 −16 gax/src/main/java/com/google/api/gax/retrying/StreamingRetryAlgorithm.java
  14. +15 −23 gax/src/main/java/com/google/api/gax/retrying/TimedRetryAlgorithm.java
  15. +86 −0 gax/src/main/java/com/google/api/gax/retrying/TimedRetryAlgorithmWithContext.java
  16. +53 −0 gax/src/main/java/com/google/api/gax/rpc/ApiCallContext.java
  17. +25 −2 gax/src/main/java/com/google/api/gax/rpc/ApiResultRetryAlgorithm.java
  18. +64 −13 gax/src/test/java/com/google/api/gax/retrying/AbstractRetryingExecutorTest.java
  19. +53 −1 gax/src/test/java/com/google/api/gax/retrying/BasicRetryingFutureTest.java
  20. +48 −6 gax/src/test/java/com/google/api/gax/retrying/ExponentialRetryAlgorithmTest.java
  21. +27 −7 gax/src/test/java/com/google/api/gax/retrying/FailingCallable.java
  22. +60 −0 gax/src/test/java/com/google/api/gax/retrying/NoopRetryingContextTest.java
  23. +205 −0 gax/src/test/java/com/google/api/gax/retrying/RetryAlgorithmTest.java
  24. +12 −9 gax/src/test/java/com/google/api/gax/retrying/ScheduledRetryingExecutorTest.java
  25. +112 −0 gax/src/test/java/com/google/api/gax/rpc/ApiResultRetryAlgorithmTest.java
  26. +156 −0 gax/src/test/java/com/google/api/gax/rpc/RetryingTest.java
  27. +23 −6 gax/src/test/java/com/google/api/gax/rpc/StreamingCallableTest.java
  28. +228 −0 gax/src/test/java/com/google/api/gax/rpc/StreamingRetryAlgorithmTest.java
  29. +31 −11 gax/src/test/java/com/google/api/gax/rpc/UnaryCallableTest.java
  30. +80 −10 gax/src/test/java/com/google/api/gax/rpc/testing/FakeCallContext.java
@@ -30,14 +30,17 @@
package com.google.api.gax.grpc;

import com.google.api.core.BetaApi;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.internal.Headers;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.NoopApiTracer;
import com.google.auth.Credentials;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.CallOptions.Key;
@@ -48,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
@@ -70,18 +74,36 @@ public final class GrpcCallContext implements ApiCallContext {
@Nullable private final Duration streamWaitTimeout;
@Nullable private final Duration streamIdleTimeout;
@Nullable private final Integer channelAffinity;
@Nullable private final RetrySettings retrySettings;
@Nullable private final ImmutableSet<StatusCode.Code> retryableCodes;
private final ImmutableMap<String, List<String>> extraHeaders;

/** Returns an empty instance with a null channel and default {@link CallOptions}. */
public static GrpcCallContext createDefault() {
return new GrpcCallContext(
null, CallOptions.DEFAULT, null, null, null, null, ImmutableMap.<String, List<String>>of());
null,
CallOptions.DEFAULT,
null,
null,
null,
null,
ImmutableMap.<String, List<String>>of(),
null,
null);
}

/** Returns an instance with the given channel and {@link CallOptions}. */
public static GrpcCallContext of(Channel channel, CallOptions callOptions) {
return new GrpcCallContext(
channel, callOptions, null, null, null, null, ImmutableMap.<String, List<String>>of());
channel,
callOptions,
null,
null,
null,
null,
ImmutableMap.<String, List<String>>of(),
null,
null);
}

private GrpcCallContext(
@@ -91,14 +113,18 @@ private GrpcCallContext(
@Nullable Duration streamWaitTimeout,
@Nullable Duration streamIdleTimeout,
@Nullable Integer channelAffinity,
ImmutableMap<String, List<String>> extraHeaders) {
ImmutableMap<String, List<String>> extraHeaders,
@Nullable RetrySettings retrySettings,
@Nullable Set<StatusCode.Code> retryableCodes) {
this.channel = channel;
this.callOptions = Preconditions.checkNotNull(callOptions);
this.timeout = timeout;
this.streamWaitTimeout = streamWaitTimeout;
this.streamIdleTimeout = streamIdleTimeout;
this.channelAffinity = channelAffinity;
this.extraHeaders = Preconditions.checkNotNull(extraHeaders);
this.retrySettings = retrySettings;
this.retryableCodes = retryableCodes == null ? null : ImmutableSet.copyOf(retryableCodes);
}

/**
@@ -160,7 +186,9 @@ public GrpcCallContext withTimeout(@Nullable Duration timeout) {
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Nullable
@@ -177,13 +205,15 @@ public GrpcCallContext withStreamWaitTimeout(@Nullable Duration streamWaitTimeou
}

return new GrpcCallContext(
channel,
callOptions,
timeout,
this.channel,
this.callOptions,
this.timeout,
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Override
@@ -194,25 +224,29 @@ public GrpcCallContext withStreamIdleTimeout(@Nullable Duration streamIdleTimeou
}

return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@BetaApi("The surface for channel affinity is not stable yet and may change in the future.")
public GrpcCallContext withChannelAffinity(@Nullable Integer affinity) {
return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
streamIdleTimeout,
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
affinity,
extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

@BetaApi("The surface for extra headers is not stable yet and may change in the future.")
@@ -222,13 +256,53 @@ public GrpcCallContext withExtraHeaders(Map<String, List<String>> extraHeaders)
ImmutableMap<String, List<String>> newExtraHeaders =
Headers.mergeHeaders(this.extraHeaders, extraHeaders);
return new GrpcCallContext(
channel,
callOptions,
timeout,
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
newExtraHeaders);
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
newExtraHeaders,
this.retrySettings,
this.retryableCodes);
}

@Override
public RetrySettings getRetrySettings() {
return this.retrySettings;
}

@Override
public GrpcCallContext withRetrySettings(RetrySettings retrySettings) {
return new GrpcCallContext(
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
retrySettings,
this.retryableCodes);
}

@Override
public Set<StatusCode.Code> getRetryableCodes() {
return this.retryableCodes;
}

@Override
public GrpcCallContext withRetryableCodes(Set<StatusCode.Code> retryableCodes) {
return new GrpcCallContext(
this.channel,
this.callOptions,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders,
this.retrySettings,
retryableCodes);
}

@Override
@@ -283,8 +357,18 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
newChannelAffinity = this.channelAffinity;
}

RetrySettings newRetrySettings = grpcCallContext.retrySettings;
if (newRetrySettings == null) {
newRetrySettings = this.retrySettings;
}

Set<StatusCode.Code> newRetryableCodes = grpcCallContext.retryableCodes;
if (newRetryableCodes == null) {
newRetryableCodes = this.retryableCodes;
}

ImmutableMap<String, List<String>> newExtraHeaders =
Headers.mergeHeaders(extraHeaders, grpcCallContext.extraHeaders);
Headers.mergeHeaders(this.extraHeaders, grpcCallContext.extraHeaders);

CallOptions newCallOptions =
grpcCallContext
@@ -303,7 +387,9 @@ public ApiCallContext merge(ApiCallContext inputCallContext) {
newStreamWaitTimeout,
newStreamIdleTimeout,
newChannelAffinity,
newExtraHeaders);
newExtraHeaders,
newRetrySettings,
newRetryableCodes);
}

/** The {@link Channel} set on this context. */
@@ -357,23 +443,27 @@ public GrpcCallContext withChannel(Channel newChannel) {
return new GrpcCallContext(
newChannel,
this.callOptions,
timeout,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

/** Returns a new instance with the call options set to the given call options. */
public GrpcCallContext withCallOptions(CallOptions newCallOptions) {
return new GrpcCallContext(
this.channel,
newCallOptions,
timeout,
this.timeout,
this.streamWaitTimeout,
this.streamIdleTimeout,
this.channelAffinity,
this.extraHeaders);
this.extraHeaders,
this.retrySettings,
this.retryableCodes);
}

public GrpcCallContext withRequestParamsDynamicHeaderOption(String requestParams) {
@@ -410,7 +500,9 @@ public int hashCode() {
streamWaitTimeout,
streamIdleTimeout,
channelAffinity,
extraHeaders);
extraHeaders,
retrySettings,
retryableCodes);
}

@Override
@@ -423,13 +515,15 @@ public boolean equals(Object o) {
}

GrpcCallContext that = (GrpcCallContext) o;
return Objects.equals(channel, that.channel)
&& Objects.equals(callOptions, that.callOptions)
&& Objects.equals(timeout, that.timeout)
&& Objects.equals(streamWaitTimeout, that.streamWaitTimeout)
&& Objects.equals(streamIdleTimeout, that.streamIdleTimeout)
&& Objects.equals(channelAffinity, that.channelAffinity)
&& Objects.equals(extraHeaders, that.extraHeaders);
return Objects.equals(this.channel, that.channel)
&& Objects.equals(this.callOptions, that.callOptions)
&& Objects.equals(this.timeout, that.timeout)
&& Objects.equals(this.streamWaitTimeout, that.streamWaitTimeout)
&& Objects.equals(this.streamIdleTimeout, that.streamIdleTimeout)
&& Objects.equals(this.channelAffinity, that.channelAffinity)
&& Objects.equals(this.extraHeaders, that.extraHeaders)
&& Objects.equals(this.retrySettings, that.retrySettings)
&& Objects.equals(this.retryableCodes, that.retryableCodes);
}

Metadata getMetadata() {

0 comments on commit 7f7aa25

Please sign in to comment.