diff --git a/.github/workflows/sonar.yaml b/.github/workflows/sonar.yaml index 597087fe62..84bb86234e 100644 --- a/.github/workflows/sonar.yaml +++ b/.github/workflows/sonar.yaml @@ -79,5 +79,4 @@ jobs: -Dsonar.projectKey=googleapis_gapic-generator-java_unit_tests \ -Dsonar.organization=googleapis \ -Dsonar.host.url=https://sonarcloud.io \ - -Dsonar.projectName=java_showcase_unit_tests - + -Dsonar.projectName=java_showcase_unit_tests \ No newline at end of file diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java index dbb3cb6259..dfe3258465 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonCallOptions.java @@ -33,6 +33,7 @@ import com.google.auth.Credentials; import com.google.auto.value.AutoValue; import com.google.protobuf.TypeRegistry; +import java.time.Duration; import javax.annotation.Nullable; import org.threeten.bp.Instant; @@ -42,6 +43,9 @@ public abstract class HttpJsonCallOptions { public static final HttpJsonCallOptions DEFAULT = newBuilder().build(); + @Nullable + public abstract Duration getTimeout(); + @Nullable public abstract Instant getDeadline(); @@ -69,6 +73,13 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) { builder.setDeadline(newDeadline); } + if (inputOptions.getTimeout() != null) { + Duration newTimeout = java.time.Duration.ofMillis(inputOptions.getTimeout().toMillis()); + if (newTimeout != null) { + builder.setTimeout(newTimeout); + } + } + Credentials newCredentials = inputOptions.getCredentials(); if (newCredentials != null) { builder.setCredentials(newCredentials); @@ -84,6 +95,8 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) { @AutoValue.Builder public abstract static class Builder { + public abstract Builder setTimeout(java.time.Duration value); + public abstract Builder setDeadline(Instant value); public abstract Builder setCredentials(Credentials value); diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java index 9f52712b2e..e99e130739 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java @@ -33,15 +33,19 @@ import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType; import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener; import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult; +import com.google.api.gax.rpc.StatusCode; import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -88,6 +92,7 @@ final class HttpJsonClientCallImpl private final ApiMethodDescriptor methodDescriptor; private final HttpTransport httpTransport; private final Executor executor; + private final ScheduledExecutorService deadlineCancellationExecutor; // // Request-specific data (provided by client code) before we get a response. @@ -114,19 +119,21 @@ final class HttpJsonClientCallImpl private ProtoMessageJsonStreamIterator responseStreamIterator; @GuardedBy("lock") - private boolean closed; + private volatile boolean closed; HttpJsonClientCallImpl( ApiMethodDescriptor methodDescriptor, String endpoint, HttpJsonCallOptions callOptions, HttpTransport httpTransport, - Executor executor) { + Executor executor, + ScheduledExecutorService deadlineCancellationExecutor) { this.methodDescriptor = methodDescriptor; this.endpoint = endpoint; this.callOptions = callOptions; this.httpTransport = httpTransport; this.executor = executor; + this.deadlineCancellationExecutor = deadlineCancellationExecutor; this.closed = false; } @@ -161,6 +168,38 @@ public void start(Listener responseListener, HttpJsonMetadata request this.listener = responseListener; this.requestHeaders = requestHeaders; } + + // Use the timeout duration value instead of calculating the future Instant + // Only schedule the deadline if the RPC timeout has been set in the RetrySettings + Duration timeout = callOptions.getTimeout(); + if (timeout != null) { + // The future timeout value is guaranteed to not be a negative value as the + // RetryAlgorithm will not retry + long timeoutMs = timeout.toMillis(); + this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS); + } + } + + // Notify the FutureListener that the there is a timeout exception from this RPC + // call (DEADLINE_EXCEEDED). For retrying RPCs, this code is returned for every attempt + // that exceeds the timeout. The RetryAlgorithm will check both the timing and code to + // ensure another attempt is made. + private void timeout() { + // There is a race between the deadline scheduler and response being returned from + // the server. The deadline scheduler has priority as it will clear out the pending + // notifications queue and add the DEADLINE_EXCEEDED event once it is able to obtain + // the lock. + synchronized (lock) { + close( + StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), + "Deadline exceeded", + new HttpJsonStatusRuntimeException( + StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), "Deadline exceeded", null), + true); + } + + // trigger delivery loop if not already running + deliver(); } @Override @@ -260,9 +299,10 @@ private void deliver() { throw new InterruptedException("Message delivery has been interrupted"); } - // All listeners must be called under delivery loop (but outside the lock) to ensure that no - // two notifications come simultaneously from two different threads and that we do not go - // indefinitely deep in the stack if delivery logic is called recursively via listeners. + // All listeners must be called under delivery loop (but outside the lock) to ensure that + // no two notifications come simultaneously from two different threads and that we do not + // go indefinitely deep in the stack if delivery logic is called recursively via + // listeners. notifyListeners(); // The synchronized block around message reading and cancellation notification processing @@ -302,7 +342,7 @@ private void deliver() { inDelivery = false; break; } else { - // We still have some stuff in notiticationTasksQueue so continue the loop, most + // We still have some stuff in notificationTasksQueue so continue the loop, most // likely we will finally terminate on the next cycle. continue; } @@ -319,8 +359,8 @@ private void deliver() { // can do in such an unlikely situation (otherwise we would stay forever in the delivery // loop). synchronized (lock) { - // Close the call immediately marking it cancelled. If already closed close() will have no - // effect. + // Close the call immediately marking it cancelled. If already closed, close() will have + // no effect. close(ex.getStatusCode(), ex.getMessage(), ex, true); } } @@ -352,7 +392,7 @@ private boolean consumeMessageFromStream() throws IOException { boolean allMessagesConsumed; Reader responseReader; if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) { - // Lazily initialize responseStreamIterator in case if it is a server steraming response + // Lazily initialize responseStreamIterator in case if it is a server streaming response if (responseStreamIterator == null) { responseStreamIterator = new ProtoMessageJsonStreamIterator( @@ -384,7 +424,7 @@ private boolean consumeMessageFromStream() throws IOException { @GuardedBy("lock") private void close( - int statusCode, String message, Throwable cause, boolean terminateImmediatelly) { + int statusCode, String message, Throwable cause, boolean terminateImmediately) { try { if (closed) { return; @@ -399,12 +439,12 @@ private void close( requestRunnable = null; } - HttpJsonMetadata.Builder meatadaBuilder = HttpJsonMetadata.newBuilder(); + HttpJsonMetadata.Builder metadataBuilder = HttpJsonMetadata.newBuilder(); if (runnableResult != null && runnableResult.getTrailers() != null) { - meatadaBuilder = runnableResult.getTrailers().toBuilder(); + metadataBuilder = runnableResult.getTrailers().toBuilder(); } - meatadaBuilder.setException(cause); - meatadaBuilder.setStatusMessage(message); + metadataBuilder.setException(cause); + metadataBuilder.setStatusMessage(message); if (responseStreamIterator != null) { responseStreamIterator.close(); } @@ -415,7 +455,7 @@ private void close( // onClose() suppresses all other pending notifications. // there should be no place in the code which inserts something in this queue before checking // the `closed` flag under the lock and refusing to insert anything if `closed == true`. - if (terminateImmediatelly) { + if (terminateImmediately) { // This usually means we are cancelling the call before processing the response in full. // It may happen if a user explicitly cancels the call or in response to an unexpected // exception either from server or a call listener execution. @@ -423,11 +463,11 @@ private void close( } pendingNotifications.offer( - new OnCloseNotificationTask<>(listener, statusCode, meatadaBuilder.build())); + new OnCloseNotificationTask<>(listener, statusCode, metadataBuilder.build())); } catch (Throwable e) { // suppress stream closing exceptions in favor of the actual call closing cause. This method - // should not throw, otherwise we may stuck in an infinite loop of exception processing. + // should not throw, otherwise we may be stuck in an infinite loop of exception processing. } } diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java index 1b6ab41d7f..c0b9719574 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCalls.java @@ -34,8 +34,7 @@ import com.google.api.gax.rpc.ApiCallContext; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nonnull; -import org.threeten.bp.Instant; +import org.threeten.bp.Duration; /** * {@code HttpJsonClientCalls} creates a new {@code HttpJsonClientCAll} from the given call context. @@ -50,12 +49,25 @@ public static HttpJsonClientCall newC HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault().nullToSelf(context); - // Try to convert the timeout into a deadline and use it if it occurs before the actual deadline + // Use the context's timeout instead of calculating a future deadline with the System clock. + // The timeout value is calculated from TimedAttemptSettings which accounts for the + // TotalTimeout value set in the RetrySettings. if (httpJsonContext.getTimeout() != null) { - @Nonnull Instant newDeadline = Instant.now().plus(httpJsonContext.getTimeout()); HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions(); - if (callOptions.getDeadline() == null || newDeadline.isBefore(callOptions.getDeadline())) { - callOptions = callOptions.toBuilder().setDeadline(newDeadline).build(); + // HttpJsonChannel expects the HttpJsonCallOptions and we store the timeout duration + // inside the HttpJsonCallOptions + // Note: There is manual conversion between threetenbp's Duration and java.util.Duration + // This is temporary here as we plan to migrate to java.util.Duration + if (callOptions.getTimeout() == null + || httpJsonContext + .getTimeout() + .compareTo(Duration.ofMillis(callOptions.getTimeout().toMillis())) + < 0) { + callOptions = + callOptions + .toBuilder() + .setTimeout(java.time.Duration.ofMillis(httpJsonContext.getTimeout().toMillis())) + .build(); httpJsonContext = httpJsonContext.withCallOptions(callOptions); } } diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java index cc0ca4c20d..b5597099d2 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpRequestRunnable.java @@ -52,12 +52,11 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Map.Entry; import javax.annotation.Nullable; -import org.threeten.bp.Duration; -import org.threeten.bp.Instant; /** A runnable object that creates and executes an HTTP request. */ class HttpRequestRunnable implements Runnable { @@ -100,24 +99,22 @@ void cancel() { @Override public void run() { - HttpResponse httpResponse = null; RunnableResult.Builder result = RunnableResult.builder(); HttpJsonMetadata.Builder trailers = HttpJsonMetadata.newBuilder(); - HttpRequest httpRequest = null; + HttpResponse httpResponse = null; try { // Check if already cancelled before even creating a request if (cancelled) { return; } - httpRequest = createHttpRequest(); + HttpRequest httpRequest = createHttpRequest(); // Check if already cancelled before sending the request; if (cancelled) { return; } - httpResponse = httpRequest.execute(); - // Check if already cancelled before sending the request; + // Check if already cancelled before trying to construct and read the response if (cancelled) { httpResponse.disconnect(); return; @@ -145,6 +142,9 @@ public void run() { } trailers.setException(e); } finally { + // If cancelled, `close()` in HttpJsonClientCallImpl has already been invoked + // and returned a DEADLINE_EXCEEDED error back so there is no need to set + // a result back. if (!cancelled) { resultListener.setResult(result.setTrailers(trailers.build()).build()); } @@ -191,16 +191,6 @@ HttpRequest createHttpRequest() throws IOException { HttpRequest httpRequest = buildRequest(requestFactory, url, jsonHttpContent); - Instant deadline = httpJsonCallOptions.getDeadline(); - if (deadline != null) { - long readTimeout = Duration.between(Instant.now(), deadline).toMillis(); - if (httpRequest.getReadTimeout() > 0 - && httpRequest.getReadTimeout() < readTimeout - && readTimeout < Integer.MAX_VALUE) { - httpRequest.setReadTimeout((int) readTimeout); - } - } - for (Map.Entry entry : headers.getHeaders().entrySet()) { HttpHeadersUtils.setHeader( httpRequest.getHeaders(), entry.getKey(), (String) entry.getValue()); @@ -243,9 +233,35 @@ private HttpRequest buildRequest( HttpHeadersUtils.setHeader( httpRequest.getHeaders(), "X-HTTP-Method-Override", originalHttpMethod); } + + Duration timeout = httpJsonCallOptions.getTimeout(); + if (timeout != null) { + long timeoutMs = timeout.toMillis(); + + // Read timeout is the timeout between reading two data packets and not total timeout + // HttpJsonClientCallsImpl implements a deadlineCancellationExecutor to cancel the + // RPC when it exceeds the RPC timeout + if (shouldUpdateTimeout(httpRequest.getReadTimeout(), timeoutMs)) { + httpRequest.setReadTimeout((int) timeoutMs); + } + + // Connect timeout is the time allowed for establishing the connection. + // This is updated to match the RPC timeout as we do not want a shorter + // connect timeout to preemptively throw a ConnectExcepetion before + // we've reached the RPC timeout + if (shouldUpdateTimeout(httpRequest.getConnectTimeout(), timeoutMs)) { + httpRequest.setConnectTimeout((int) timeoutMs); + } + } return httpRequest; } + private boolean shouldUpdateTimeout(int currentTimeoutMs, long newTimeoutMs) { + return currentTimeoutMs > 0 + && currentTimeoutMs < newTimeoutMs + && newTimeoutMs < Integer.MAX_VALUE; + } + // This will be frequently executed, so avoiding using regexps if not necessary. private String normalizeEndpoint(String rawEndpoint) { String normalized = rawEndpoint; diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java index 6fb4200d30..e6fe38d5ef 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/ManagedHttpJsonChannel.java @@ -38,6 +38,8 @@ import java.io.IOException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -51,6 +53,7 @@ public class ManagedHttpJsonChannel implements HttpJsonChannel, BackgroundResour private final Executor executor; private final String endpoint; private final HttpTransport httpTransport; + private final ScheduledExecutorService deadlineScheduledExecutorService; private boolean isTransportShutdown; @@ -63,6 +66,7 @@ private ManagedHttpJsonChannel( this.executor = executor; this.endpoint = endpoint; this.httpTransport = httpTransport == null ? new NetHttpTransport() : httpTransport; + this.deadlineScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); } @Override @@ -70,7 +74,12 @@ public HttpJsonClientCall newCall( ApiMethodDescriptor methodDescriptor, HttpJsonCallOptions callOptions) { return new HttpJsonClientCallImpl<>( - methodDescriptor, endpoint, callOptions, httpTransport, executor); + methodDescriptor, + endpoint, + callOptions, + httpTransport, + executor, + deadlineScheduledExecutorService); } @Override @@ -79,6 +88,7 @@ public synchronized void shutdown() { return; } try { + deadlineScheduledExecutorService.shutdown(); httpTransport.shutdown(); isTransportShutdown = true; } catch (IOException e) { diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java index 2e9d5cb082..fa666dc69c 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectCallableTest.java @@ -167,7 +167,10 @@ public void testSuccessfulMultipleResponsesForUnaryCall() HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); Field request = createTestMessage(2); Field expectedResponse = createTestMessage(2); @@ -199,7 +202,10 @@ public void testErrorMultipleResponsesForUnaryCall() HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); Field request = createTestMessage(2); Field expectedResponse = createTestMessage(2); @@ -228,7 +234,10 @@ public void testErrorUnaryResponse() throws InterruptedException { HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); ApiException exception = ApiExceptionFactory.createException( @@ -257,7 +266,10 @@ public void testErrorNullContentSuccessfulResponse() throws InterruptedException HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); MOCK_SERVICE.addNullResponse(); @@ -283,7 +295,10 @@ public void testErrorNullContentFailedResponse() throws InterruptedException { HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); MOCK_SERVICE.addNullResponse(400); try { @@ -306,7 +321,10 @@ public void testErrorNon2xxOr4xxResponse() throws InterruptedException { HttpJsonDirectCallable callable = new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); - HttpJsonCallContext callContext = HttpJsonCallContext.createDefault().withChannel(channel); + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault() + .withChannel(channel) + .withTimeout(Duration.ofSeconds(30)); ApiException exception = ApiExceptionFactory.createException( @@ -323,6 +341,34 @@ public void testErrorNon2xxOr4xxResponse() throws InterruptedException { } } + /** + * Expectation is that an RPC that exceeds the Timeout value set will receive a DEADLINE_EXCEEDED + * response back. In this test, the call has a timeout value that is smaller than the time it + * takes for the mock service to return a response. + * + * @throws InterruptedException + */ + @Test + public void testDeadlineExceededResponse() throws InterruptedException { + HttpJsonDirectCallable callable = + new HttpJsonDirectCallable<>(FAKE_METHOD_DESCRIPTOR); + + HttpJsonCallContext callContext = + HttpJsonCallContext.createDefault().withChannel(channel).withTimeout(Duration.ofSeconds(3)); + + Field response = createTestMessage(10); + MOCK_SERVICE.addResponse(response, java.time.Duration.ofSeconds(5)); + + try { + callable.futureCall(createTestMessage(10), callContext).get(); + Assert.fail("No exception raised"); + } catch (ExecutionException e) { + HttpJsonStatusRuntimeException respExp = (HttpJsonStatusRuntimeException) e.getCause(); + assertThat(respExp.getStatusCode()).isEqualTo(504); + assertThat(respExp.getMessage()).isEqualTo("Deadline exceeded"); + } + } + private Field createTestMessage(int number) { return Field.newBuilder() // "echo" service .setName("john/imTheBestField") diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java index 4152aa0991..8781ad00d4 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonDirectServerStreamingCallableTest.java @@ -34,6 +34,7 @@ import com.google.api.gax.httpjson.testing.MockHttpService; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.ServerStreamingCallSettings; @@ -65,6 +66,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; @RunWith(JUnit4.class) public class HttpJsonDirectServerStreamingCallableTest { @@ -114,6 +116,7 @@ public class HttpJsonDirectServerStreamingCallableTest { private static final Money DEFAULTER_RESPONSE = Money.newBuilder().setCurrencyCode("UAH").setUnits(255).build(); + private ManagedHttpJsonChannel channel; private ClientContext clientContext; private ServerStreamingCallSettings streamingCallSettings; private ServerStreamingCallable streamingCallable; @@ -132,7 +135,7 @@ public static void destroy() { @Before public void setUp() { - ManagedHttpJsonChannel channel = + channel = new ManagedHttpJsonInterceptorChannel( ManagedHttpJsonChannel.newBuilder() .setEndpoint("google.com:443") @@ -144,7 +147,9 @@ public void setUp() { clientContext = ClientContext.newBuilder() .setTransportChannel(HttpJsonTransportChannel.create(channel)) - .setDefaultCallContext(HttpJsonCallContext.of(channel, HttpJsonCallOptions.DEFAULT)) + .setDefaultCallContext( + HttpJsonCallContext.of(channel, HttpJsonCallOptions.DEFAULT) + .withTimeout(Duration.ofSeconds(3))) .build(); streamingCallSettings = ServerStreamingCallSettings.newBuilder().build(); streamingCallable = @@ -156,6 +161,7 @@ public void setUp() { @After public void tearDown() { + channel.shutdown(); MOCK_SERVICE.reset(); } @@ -326,6 +332,26 @@ public void testBlockingServerStreaming() { Truth.assertThat(responseData).containsExactly(expected); } + // This test ensures that the server-side streaming does not exceed the timeout value + @Test + public void testDeadlineExceededServerStreaming() throws InterruptedException { + MOCK_SERVICE.addResponse( + new Money[] {DEFAULT_RESPONSE, DEFAULTER_RESPONSE}, java.time.Duration.ofSeconds(5)); + Color request = Color.newBuilder().setRed(0.5f).build(); + CountDownLatch latch = new CountDownLatch(1); + MoneyObserver moneyObserver = new MoneyObserver(false, latch); + + streamingCallable.call(request, moneyObserver); + + moneyObserver.controller.request(2); + // Set the latch's await time to above the context's timeout value to ensure that + // the latch has been released. + Truth.assertThat(latch.await(5000, TimeUnit.MILLISECONDS)).isTrue(); + + Truth.assertThat(moneyObserver.error).isInstanceOf(DeadlineExceededException.class); + Truth.assertThat(moneyObserver.error).hasMessageThat().isEqualTo("Deadline exceeded"); + } + static class MoneyObserver extends StateCheckingResponseObserver { private final boolean autoFlowControl; private final CountDownLatch latch; diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java index 88767c1d4c..b6d4b0943f 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpRequestRunnableTest.java @@ -243,4 +243,56 @@ public void testUnicodeValuesInBody() throws IOException { Truth.assertThat(result).isEqualTo(bodyRequestMessage); } } + + @Test + public void testUpdateRunnableTimeout_shouldNotUpdate() throws IOException { + ApiMethodDescriptor methodDescriptor = + ApiMethodDescriptor.newBuilder() + .setFullMethodName("house.cat.get") + .setHttpMethod("POST") + .setRequestFormatter(requestFormatter) + .setResponseParser(responseParser) + .build(); + + HttpRequestRunnable httpRequestRunnable = + new HttpRequestRunnable<>( + requestMessage, + methodDescriptor, + "www.googleapis.com/animals/v1/projects", + HttpJsonCallOptions.newBuilder().setTimeout(java.time.Duration.ofMillis(5000L)).build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); + + HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); + Truth.assertThat(httpRequest.getReadTimeout()).isEqualTo(20000L); + Truth.assertThat(httpRequest.getConnectTimeout()).isEqualTo(20000L); + } + + @Test + public void testUpdateRunnableTimeout_shouldUpdate() throws IOException { + ApiMethodDescriptor methodDescriptor = + ApiMethodDescriptor.newBuilder() + .setFullMethodName("house.cat.get") + .setHttpMethod("POST") + .setRequestFormatter(requestFormatter) + .setResponseParser(responseParser) + .build(); + + HttpRequestRunnable httpRequestRunnable = + new HttpRequestRunnable<>( + requestMessage, + methodDescriptor, + "www.googleapis.com/animals/v1/projects", + HttpJsonCallOptions.newBuilder() + .setTimeout(java.time.Duration.ofMillis(30000L)) + .build(), + new MockHttpTransport(), + HttpJsonMetadata.newBuilder().build(), + (result) -> {}); + + HttpRequest httpRequest = httpRequestRunnable.createHttpRequest(); + Truth.assertThat(httpRequest.getReadTimeout()).isEqualTo(30000L); + Truth.assertThat(httpRequest.getConnectTimeout()).isEqualTo(30000L); + } } diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java index b2ebee14dc..86cc78fd7d 100644 --- a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/testing/MockHttpService.java @@ -43,6 +43,7 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.Multimap; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -87,6 +88,11 @@ public synchronized void addResponse(Object response) { responseHandlers.add(new MessageResponseFactory(endpoint, serviceMethodDescriptors, response)); } + public synchronized void addResponse(Object response, Duration delay) { + responseHandlers.add( + new MessageResponseFactory(endpoint, serviceMethodDescriptors, response, delay)); + } + /** Add an expected null response (empty HTTP response body) with a custom status code. */ public synchronized void addNullResponse(int statusCode) { responseHandlers.add( @@ -182,16 +188,36 @@ private static class MessageResponseFactory implements HttpResponseFactory { private final List serviceMethodDescriptors; private final Object response; private final String endpoint; + private final Duration delay; public MessageResponseFactory( String endpoint, List serviceMethodDescriptors, Object response) { + this(endpoint, serviceMethodDescriptors, response, Duration.ofNanos(0)); + } + + public MessageResponseFactory( + String endpoint, + List serviceMethodDescriptors, + Object response, + Duration delay) { this.endpoint = endpoint; this.serviceMethodDescriptors = ImmutableList.copyOf(serviceMethodDescriptors); this.response = response; + this.delay = delay; } @Override public MockLowLevelHttpResponse getHttpResponse(String httpMethod, String fullTargetUrl) { + // We use Thread.sleep to mimic a long server response. Most tests should not + // require a sleep and can return a response immediately. + try { + long delayMs = delay.toMillis(); + if (delayMs > 0) { + Thread.sleep(delayMs); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } MockLowLevelHttpResponse httpResponse = new MockLowLevelHttpResponse(); String relativePath = getRelativePath(fullTargetUrl); diff --git a/gax-java/gax/src/main/java/com/google/api/gax/longrunning/OperationTimedPollAlgorithm.java b/gax-java/gax/src/main/java/com/google/api/gax/longrunning/OperationTimedPollAlgorithm.java index ddc49f3c0a..ec7e842e3d 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/longrunning/OperationTimedPollAlgorithm.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/longrunning/OperationTimedPollAlgorithm.java @@ -30,6 +30,7 @@ package com.google.api.gax.longrunning; import com.google.api.core.ApiClock; +import com.google.api.core.InternalApi; import com.google.api.core.NanoClock; import com.google.api.gax.retrying.ExponentialRetryAlgorithm; import com.google.api.gax.retrying.RetrySettings; @@ -78,4 +79,18 @@ public boolean shouldRetry(TimedAttemptSettings nextAttemptSettings) } throw new CancellationException(); } + + // Note: if the potential time spent is exactly equal to the totalTimeout, + // the attempt will still be allowed. This might not be desired, but if we + // enforce it, it could have potentially negative side effects on LRO polling. + // Specifically, if a polling retry attempt is denied, the LRO is canceled, and + // if a polling retry attempt is denied because its delay would *reach* the + // totalTimeout, the LRO would be canceled prematurely. The problem here is that + // totalTimeout doubles as the polling threshold and also the time limit for an + // operation to finish. + @InternalApi + @Override + protected boolean shouldRPCTerminate(long timeLeftMs) { + return timeLeftMs < 0; + } } diff --git a/gax-java/gax/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java b/gax-java/gax/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java index 26beb8f0bb..b035246746 100644 --- a/gax-java/gax/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java +++ b/gax-java/gax/src/main/java/com/google/api/gax/retrying/ExponentialRetryAlgorithm.java @@ -32,6 +32,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.core.ApiClock; +import com.google.api.core.InternalApi; import java.util.concurrent.ThreadLocalRandom; import org.threeten.bp.Duration; @@ -150,7 +151,7 @@ public TimedAttemptSettings createNextAttempt(TimedAttemptSettings previousSetti // If timeLeft at this point is < 0, the shouldRetry logic will prevent // the attempt from being made as it would exceed the totalTimeout. A negative RPC timeout - // will result in a deadline in the past, which should will always fail prior to making a + // will result in a deadline in the past, which will always fail prior to making a // network call. newRpcTimeout = Math.min(newRpcTimeout, timeLeft.toMillis()); } @@ -197,10 +198,10 @@ public boolean shouldRetry(TimedAttemptSettings nextAttemptSettings) { RetrySettings globalSettings = nextAttemptSettings.getGlobalSettings(); int maxAttempts = globalSettings.getMaxAttempts(); - long totalTimeout = globalSettings.getTotalTimeout().toNanos(); + Duration totalTimeout = globalSettings.getTotalTimeout(); // If total timeout and maxAttempts is not set then do not attempt retry. - if (totalTimeout == 0 && maxAttempts == 0) { + if (totalTimeout.isZero() && maxAttempts == 0) { return false; } @@ -209,17 +210,13 @@ public boolean shouldRetry(TimedAttemptSettings nextAttemptSettings) { - nextAttemptSettings.getFirstAttemptStartTimeNanos() + nextAttemptSettings.getRandomizedRetryDelay().toNanos(); + Duration timeLeft = totalTimeout.minus(Duration.ofNanos(totalTimeSpentNanos)); + // Convert time spent to milliseconds to standardize the units being used for + // retries. Otherwise, we would be using nanoseconds to determine if retries + // should be attempted and milliseconds for retry delays and rpc timeouts + long timeLeftMs = timeLeft.toMillis(); // If totalTimeout limit is defined, check that it hasn't been crossed. - // - // Note: if the potential time spent is exactly equal to the totalTimeout, - // the attempt will still be allowed. This might not be desired, but if we - // enforce it, it could have potentially negative side effects on LRO polling. - // Specifically, if a polling retry attempt is denied, the LRO is canceled, and - // if a polling retry attempt is denied because its delay would *reach* the - // totalTimeout, the LRO would be canceled prematurely. The problem here is that - // totalTimeout doubles as the polling threshold and also the time limit for an - // operation to finish. - if (totalTimeout > 0 && totalTimeSpentNanos > totalTimeout) { + if (!totalTimeout.isZero() && shouldRPCTerminate(timeLeftMs)) { return false; } @@ -232,6 +229,15 @@ public boolean shouldRetry(TimedAttemptSettings nextAttemptSettings) { return true; } + // For non-LRO RPCs, do not attempt to retry if the totalTime spend is over + // the totalTimeout (timeout is in the past) or at the totalTimeout (timeout + // will occur immediately). For any other value, the deadlineScheduler will + // terminate in the future (even if the timeout is small). + @InternalApi + protected boolean shouldRPCTerminate(long timeLeftMs) { + return timeLeftMs <= 0; + } + /** * Returns {@code true} if another attempt should be made, or {@code false} otherwise. * diff --git a/gax-java/gax/src/test/java/com/google/api/gax/retrying/ExponentialRetryAlgorithmTest.java b/gax-java/gax/src/test/java/com/google/api/gax/retrying/ExponentialRetryAlgorithmTest.java index b81c8c95bd..d0c1ee3ed9 100644 --- a/gax-java/gax/src/test/java/com/google/api/gax/retrying/ExponentialRetryAlgorithmTest.java +++ b/gax-java/gax/src/test/java/com/google/api/gax/retrying/ExponentialRetryAlgorithmTest.java @@ -172,15 +172,26 @@ public void testShouldRetryFalseOnMaxAttempts() { assertFalse(algorithm.shouldRetry(attempt)); } + // First attempt runs at 0ms + // Second attempt runs at 60ms if shouldRetry is true + // - RPC timeout is 2ms and Time Left is 140ms (shouldRetry == true) + // Third attempt runs at 60ms if shouldRetry is true + // - RPC timeout is 4ms and Time Left is 120ms (shouldRetry == true) + // Fourth attempt runs at 60ms if shouldRetry is true + // - RPC timeout is 8ms and Time Left is 20ms (shouldRetry == true) + // Fifth attempt runs at 60ms if shouldRetry is true + // - RPC timeout is 8ms and Time Left is -40ms (shouldRetry == false) @Test public void testShouldRetryFalseOnMaxTimeout() { + // Simulate each attempt with 60ms of clock time. + // "attempt" = RPC Timeout + createNextAttempt() and shouldRetry() TimedAttemptSettings attempt = algorithm.createFirstAttempt(); - for (int i = 0; i < 4; i++) { + clock.incrementNanoTime(Duration.ofMillis(60L).toNanos()); + for (int i = 0; i < 3; i++) { assertTrue(algorithm.shouldRetry(attempt)); attempt = algorithm.createNextAttempt(attempt); clock.incrementNanoTime(Duration.ofMillis(60L).toNanos()); } - assertFalse(algorithm.shouldRetry(attempt)); } } diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITLongRunningOperation.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITLongRunningOperation.java new file mode 100644 index 0000000000..09dae05afc --- /dev/null +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITLongRunningOperation.java @@ -0,0 +1,190 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.showcase.v1beta1.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.retrying.RetrySettings; +import com.google.protobuf.Timestamp; +import com.google.showcase.v1beta1.EchoClient; +import com.google.showcase.v1beta1.WaitMetadata; +import com.google.showcase.v1beta1.WaitRequest; +import com.google.showcase.v1beta1.WaitResponse; +import com.google.showcase.v1beta1.it.util.TestClientInitializer; +import java.util.concurrent.CancellationException; +import org.junit.Test; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; +import org.threeten.bp.temporal.ChronoUnit; + +/** + * For this test, we test a combination of various LRO RetrySettings and try to ensure that the + * calls are polling correctly. Each test attempts to test the number of attempts done in each call. + */ +public class ITLongRunningOperation { + + @Test + public void testGRPC_LROSuccessfulResponse_doesNotExceedTotalTimeout() throws Exception { + RetrySettings initialUnaryRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(3000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(3000L)) + .setTotalTimeout(Duration.ofMillis(3000L)) + .build(); + RetrySettings pollingRetrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(3000L)) + .setRetryDelayMultiplier(2.0) + .setMaxRetryDelay(Duration.ofMillis(5000L)) + .setInitialRpcTimeout(Duration.ZERO) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ZERO) + .setTotalTimeout(Duration.ofMillis(10000L)) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomWaitSettings( + initialUnaryRetrySettings, pollingRetrySettings)) { + long epochSecondsInFuture = Instant.now().plus(5, ChronoUnit.SECONDS).getEpochSecond(); + WaitRequest waitRequest = + WaitRequest.newBuilder() + .setSuccess(WaitResponse.newBuilder().setContent("gRPCWaitContent_5sDelay_noRetry")) + .setEndTime(Timestamp.newBuilder().setSeconds(epochSecondsInFuture).build()) + .build(); + OperationFuture operationFuture = + grpcClient.waitOperationCallable().futureCall(waitRequest); + WaitResponse waitResponse = operationFuture.get(); + assertThat(waitResponse.getContent()).isEqualTo("gRPCWaitContent_5sDelay_noRetry"); + int attemptCount = operationFuture.getPollingFuture().getAttemptSettings().getAttemptCount(); + assertThat(attemptCount).isAtLeast(2); + } + } + + @Test + public void testHttpJson_LROSuccessfulResponse_doesNotExceedTotalTimeout() throws Exception { + RetrySettings initialUnaryRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(3000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(3000L)) + .setTotalTimeout(Duration.ofMillis(3000L)) + .build(); + RetrySettings pollingRetrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(3000L)) + .setRetryDelayMultiplier(2.0) + .setMaxRetryDelay(Duration.ofMillis(5000L)) + .setInitialRpcTimeout(Duration.ZERO) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ZERO) + .setTotalTimeout(Duration.ofMillis(10000L)) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomWaitSettings( + initialUnaryRetrySettings, pollingRetrySettings)) { + long epochSecondsInFuture = Instant.now().plus(5, ChronoUnit.SECONDS).getEpochSecond(); + WaitRequest waitRequest = + WaitRequest.newBuilder() + .setSuccess( + WaitResponse.newBuilder().setContent("httpjsonWaitContent_5sDelay_noRetry")) + .setEndTime(Timestamp.newBuilder().setSeconds(epochSecondsInFuture).build()) + .build(); + OperationFuture operationFuture = + httpJsonClient.waitOperationCallable().futureCall(waitRequest); + WaitResponse waitResponse = operationFuture.get(); + assertThat(waitResponse.getContent()).isEqualTo("httpjsonWaitContent_5sDelay_noRetry"); + int attemptCount = operationFuture.getPollingFuture().getAttemptSettings().getAttemptCount(); + assertThat(attemptCount).isAtLeast(2); + } + } + + @Test + public void testGRPC_LROUnsuccessfulResponse_exceedsTotalTimeout_throwsDeadlineExceededException() + throws Exception { + RetrySettings initialUnaryRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(2000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(2000L)) + .setTotalTimeout(Duration.ofMillis(2000L)) + .build(); + RetrySettings pollingRetrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(1000L)) + .setRetryDelayMultiplier(2.0) + .setMaxRetryDelay(Duration.ofMillis(3000L)) + .setInitialRpcTimeout(Duration.ZERO) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ZERO) + .setTotalTimeout(Duration.ofMillis(5000L)) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomWaitSettings( + initialUnaryRetrySettings, pollingRetrySettings)) { + long epochSecondsInFuture = Instant.now().plus(6, ChronoUnit.SECONDS).getEpochSecond(); + WaitRequest waitRequest = + WaitRequest.newBuilder() + .setSuccess(WaitResponse.newBuilder().setContent("gRPCWaitContent_6sDelay")) + .setEndTime(Timestamp.newBuilder().setSeconds(epochSecondsInFuture).build()) + .build(); + OperationFuture operationFuture = + grpcClient.waitOperationCallable().futureCall(waitRequest); + assertThrows(CancellationException.class, operationFuture::get); + int attemptCount = operationFuture.getPollingFuture().getAttemptSettings().getAttemptCount(); + assertThat(attemptCount).isGreaterThan(1); + } + } + + @Test + public void + testHttpJson_LROUnsuccessfulResponse_exceedsTotalTimeout_throwsDeadlineExceededException() + throws Exception { + RetrySettings initialUnaryRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(2000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(2000L)) + .setTotalTimeout(Duration.ofMillis(2000L)) + .build(); + RetrySettings pollingRetrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(1000L)) + .setRetryDelayMultiplier(2.0) + .setMaxRetryDelay(Duration.ofMillis(3000L)) + .setInitialRpcTimeout(Duration.ZERO) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ZERO) + .setTotalTimeout(Duration.ofMillis(5000L)) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomWaitSettings( + initialUnaryRetrySettings, pollingRetrySettings)) { + long epochSecondsInFuture = Instant.now().plus(6, ChronoUnit.SECONDS).getEpochSecond(); + WaitRequest waitRequest = + WaitRequest.newBuilder() + .setSuccess(WaitResponse.newBuilder().setContent("httpjsonWaitContent_6sDelay")) + .setEndTime(Timestamp.newBuilder().setSeconds(epochSecondsInFuture).build()) + .build(); + OperationFuture operationFuture = + httpJsonClient.waitOperationCallable().futureCall(waitRequest); + assertThrows(CancellationException.class, operationFuture::get); + int attemptCount = operationFuture.getPollingFuture().getAttemptSettings().getAttemptCount(); + assertThat(attemptCount).isGreaterThan(1); + } + } +} diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITUnaryDeadline.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITUnaryDeadline.java new file mode 100644 index 0000000000..1f7e99a372 --- /dev/null +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITUnaryDeadline.java @@ -0,0 +1,330 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.showcase.v1beta1.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.DeadlineExceededException; +import com.google.api.gax.rpc.StatusCode; +import com.google.common.collect.ImmutableSet; +import com.google.showcase.v1beta1.BlockRequest; +import com.google.showcase.v1beta1.BlockResponse; +import com.google.showcase.v1beta1.EchoClient; +import com.google.showcase.v1beta1.it.util.TestClientInitializer; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.threeten.bp.Duration; + +/** + * For this test, we test a combination of various RetrySettings to try and ensure that the timeouts + * set by the customer are cancelled when timeouts have exceeded their limits + * + *

Each test attempts to get the number of attempts done in each call. The attemptCount is + * incremented by 1 as the first attempt is zero indexed. + */ +public class ITUnaryDeadline { + + @Test + public void testGRPC_unarySuccessfulResponse_doesNotExceedTotalTimeout() throws Exception { + RetrySettings defaultNoRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(5000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(5000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + // Explicitly set retries as disabled (maxAttempts == 1) + .setMaxAttempts(1) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomBlockSettings( + defaultNoRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_3sDelay_noRetry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(3).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) grpcClient.blockCallable().futureCall(blockRequest); + BlockResponse blockResponse = retryingFuture.get(10, TimeUnit.SECONDS); + assertThat(blockResponse.getContent()).isEqualTo("gRPCBlockContent_3sDelay_noRetry"); + // Guarantee that this only runs once + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(1); + } + } + + @Test + public void testHttpJson_unarySuccessfulResponse_doesNotExceedTotalTimeout() throws Exception { + RetrySettings defaultNoRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(5000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(5000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + // Explicitly set retries as disabled (maxAttempts == 1) + .setMaxAttempts(1) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings( + defaultNoRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess( + BlockResponse.newBuilder().setContent("httpjsonBlockContent_3sDelay_noRetry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(3).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) httpJsonClient.blockCallable().futureCall(blockRequest); + BlockResponse blockResponse = retryingFuture.get(10, TimeUnit.SECONDS); + assertThat(blockResponse.getContent()).isEqualTo("httpjsonBlockContent_3sDelay_noRetry"); + // Guarantee that this only runs once + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(1); + } + } + + // Retry is configured by setting the initial RPC timeout (1.5s) to be less than + // the RPC delay (2s). The next RPC timeout (3s) will wait long enough for the delay. + @Test + public void testGRPC_unarySuccessfulResponse_exceedsRPCDeadlineButWithinTotalTimeout() + throws Exception { + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(1500L)) + .setRpcTimeoutMultiplier(2.0) + .setMaxRpcTimeout(Duration.ofMillis(3000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_2sDelay_Retry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) grpcClient.blockCallable().futureCall(blockRequest); + BlockResponse blockResponse = retryingFuture.get(10, TimeUnit.SECONDS); + assertThat(blockResponse.getContent()).isEqualTo("gRPCBlockContent_2sDelay_Retry"); + // Guarantee that this only runs twice + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(2); + } + } + + // Retry is configured by setting the initial RPC timeout (1.5s) to be less than + // the RPC delay (2s). The next RPC timeout (3s) will wait long enough for the delay. + @Test + public void testHttpJson_unarySuccessfulResponse_exceedsRPCDeadlineButWithinTotalTimeout() + throws Exception { + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(1500L)) + .setRpcTimeoutMultiplier(2.0) + .setMaxRpcTimeout(Duration.ofMillis(3000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess( + BlockResponse.newBuilder().setContent("httpjsonBlockContent_2sDelay_Retry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) httpJsonClient.blockCallable().futureCall(blockRequest); + BlockResponse blockResponse = retryingFuture.get(10, TimeUnit.SECONDS); + assertThat(blockResponse.getContent()).isEqualTo("httpjsonBlockContent_2sDelay_Retry"); + // Guarantee that this only runs twice + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(2); + } + } + + // Request is set to block for 6 seconds to allow the RPC to timeout. If retries are + // disabled, the RPC timeout is set to be the totalTimeout (5s). + @Test + public void + testGRPC_unaryUnsuccessfulResponse_exceedsRPCTimeoutAndTotalTimeout_throwsDeadlineExceededException() + throws Exception { + RetrySettings defaultNoRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(5000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(5000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + // Explicitly set retries as disabled (maxAttempts == 1) + .setMaxAttempts(1) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomBlockSettings( + defaultNoRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_6sDelay_noRetry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(6).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) grpcClient.blockCallable().futureCall(blockRequest); + ExecutionException exception = + assertThrows(ExecutionException.class, () -> retryingFuture.get(10, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(DeadlineExceededException.class); + DeadlineExceededException deadlineExceededException = + (DeadlineExceededException) exception.getCause(); + assertThat(deadlineExceededException.getStatusCode().getCode()) + .isEqualTo(StatusCode.Code.DEADLINE_EXCEEDED); + // We can guarantee that this only runs once + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(1); + } + } + + // Request is set to block for 6 seconds to allow the RPC to timeout. If retries are + // disabled, the RPC timeout is set to be the totalTimeout (5s). + @Test + public void + testHttpJson_unaryUnsuccessfulResponse_exceedsRPCTimeoutAndTotalTimeout_throwsDeadlineExceededException() + throws Exception { + RetrySettings defaultNoRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(5000L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(5000L)) + .setTotalTimeout(Duration.ofMillis(5000L)) + // Explicitly set retries as disabled (maxAttempts == 1) + .setMaxAttempts(1) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings( + defaultNoRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess( + BlockResponse.newBuilder().setContent("httpjsonBlockContent_6sDelay_noRetry")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(6).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) httpJsonClient.blockCallable().futureCall(blockRequest); + ExecutionException exception = + assertThrows(ExecutionException.class, () -> retryingFuture.get(10, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(DeadlineExceededException.class); + DeadlineExceededException deadlineExceededException = + (DeadlineExceededException) exception.getCause(); + assertThat(deadlineExceededException.getStatusCode().getCode()) + .isEqualTo(StatusCode.Code.DEADLINE_EXCEEDED); + // We can guarantee that this only runs once + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isEqualTo(1); + } + } + + // The purpose of this test is to ensure that the deadlineScheduleExecutor is able + // to properly cancel the HttpRequest for each retry attempt. This test attempts to + // make a call every 100ms for 1 second. If the requestRunnable blocks until we + // receive a response from the server (200ms) regardless of it was cancelled, then + // we would expect at most 50 responses. + @Test + public void + testGRPC_unaryCallableRetry_deadlineExecutorTimesOutRequest_throwsDeadlineExceededException() + throws Exception { + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(100L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(100L)) + .setTotalTimeout(Duration.ofMillis(1000L)) + .build(); + try (EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess( + BlockResponse.newBuilder().setContent("gRPCBlockContent_200msDelay_Retry")) + // Set the timeout to be longer than the RPC timeout + .setResponseDelay( + com.google.protobuf.Duration.newBuilder().setNanos(200000000).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) grpcClient.blockCallable().futureCall(blockRequest); + ExecutionException exception = + assertThrows(ExecutionException.class, () -> retryingFuture.get(10, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(DeadlineExceededException.class); + DeadlineExceededException deadlineExceededException = + (DeadlineExceededException) exception.getCause(); + assertThat(deadlineExceededException.getStatusCode().getCode()) + .isEqualTo(StatusCode.Code.DEADLINE_EXCEEDED); + // We cannot guarantee the number of attempts. The RetrySettings should be configured + // such that there is no delay between the attempts, but the execution takes time + // to run. Theoretically this should run exactly 100 times. + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isGreaterThan(5); + assertThat(attemptCount).isAtMost(10); + } + } + + // The purpose of this test is to ensure that the deadlineScheduleExecutor is able + // to properly cancel the HttpRequest for each retry attempt. This test attempts to + // make a call every 100ms for 1 second. If the requestRunnable blocks until we + // receive a response from the server (200ms) regardless of it was cancelled, then + // we would expect at most 50 responses. + @Test + public void + testHttpJson_unaryCallableRetry_deadlineExecutorTimesOutRequest_throwsDeadlineExceededException() + throws Exception { + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(100L)) + .setRpcTimeoutMultiplier(1.0) + .setMaxRpcTimeout(Duration.ofMillis(100L)) + .setTotalTimeout(Duration.ofMillis(10000L)) + .build(); + try (EchoClient httpJsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED))) { + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess( + BlockResponse.newBuilder().setContent("httpjsonBlockContent_200msDelay_Retry")) + // Set the timeout to be longer than the RPC timeout + .setResponseDelay( + com.google.protobuf.Duration.newBuilder().setNanos(200000000).build()) + .build(); + RetryingFuture retryingFuture = + (RetryingFuture) httpJsonClient.blockCallable().futureCall(blockRequest); + ExecutionException exception = + assertThrows(ExecutionException.class, () -> retryingFuture.get(15, TimeUnit.SECONDS)); + assertThat(exception.getCause()).isInstanceOf(DeadlineExceededException.class); + DeadlineExceededException deadlineExceededException = + (DeadlineExceededException) exception.getCause(); + assertThat(deadlineExceededException.getStatusCode().getCode()) + .isEqualTo(StatusCode.Code.DEADLINE_EXCEEDED); + // We cannot guarantee the number of attempts. The RetrySettings should be configured + // such that there is no delay between the attempts, but the execution takes time + // to run. Theoretically this should run exactly 100 times. + int attemptCount = retryingFuture.getAttemptSettings().getAttemptCount() + 1; + assertThat(attemptCount).isGreaterThan(80); + assertThat(attemptCount).isAtMost(100); + } + } +} diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/util/TestClientInitializer.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/util/TestClientInitializer.java index bcd8862236..4c1b7b00d2 100644 --- a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/util/TestClientInitializer.java +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/util/TestClientInitializer.java @@ -18,11 +18,19 @@ import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.longrunning.OperationSnapshot; +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.UnaryCallSettings; import com.google.showcase.v1beta1.EchoClient; import com.google.showcase.v1beta1.EchoSettings; import com.google.showcase.v1beta1.IdentityClient; import com.google.showcase.v1beta1.IdentitySettings; +import com.google.showcase.v1beta1.WaitRequest; +import com.google.showcase.v1beta1.stub.EchoStubSettings; import io.grpc.ManagedChannelBuilder; +import java.util.Set; public class TestClientInitializer { @@ -52,6 +60,98 @@ public static EchoClient createHttpJsonEchoClient() throws Exception { return EchoClient.create(httpJsonEchoSettings); } + public static EchoClient createGrpcEchoClientCustomBlockSettings( + RetrySettings retrySettings, Set retryableCodes) throws Exception { + EchoStubSettings.Builder grpcEchoSettingsBuilder = EchoStubSettings.newBuilder(); + grpcEchoSettingsBuilder + .blockSettings() + .setRetrySettings(retrySettings) + .setRetryableCodes(retryableCodes); + EchoSettings grpcEchoSettings = EchoSettings.create(grpcEchoSettingsBuilder.build()); + grpcEchoSettings = + grpcEchoSettings + .toBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + EchoSettings.defaultGrpcTransportProviderBuilder() + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build()) + .build(); + return EchoClient.create(grpcEchoSettings); + } + + public static EchoClient createHttpJsonEchoClientCustomBlockSettings( + RetrySettings retrySettings, Set retryableCodes) throws Exception { + EchoStubSettings.Builder httpJsonEchoSettingsBuilder = EchoStubSettings.newHttpJsonBuilder(); + httpJsonEchoSettingsBuilder + .blockSettings() + .setRetrySettings(retrySettings) + .setRetryableCodes(retryableCodes); + EchoSettings httpJsonEchoSettings = EchoSettings.create(httpJsonEchoSettingsBuilder.build()); + httpJsonEchoSettings = + httpJsonEchoSettings + .toBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + EchoSettings.defaultHttpJsonTransportProviderBuilder() + .setHttpTransport( + new NetHttpTransport.Builder().doNotValidateCertificate().build()) + .setEndpoint("http://localhost:7469") + .build()) + .build(); + return EchoClient.create(httpJsonEchoSettings); + } + + public static EchoClient createGrpcEchoClientCustomWaitSettings( + RetrySettings initialUnaryRetrySettings, RetrySettings pollingRetrySettings) + throws Exception { + EchoStubSettings.Builder grpcEchoSettingsBuilder = EchoStubSettings.newBuilder(); + grpcEchoSettingsBuilder + .waitOperationSettings() + .setInitialCallSettings( + UnaryCallSettings.newUnaryCallSettingsBuilder() + .setRetrySettings(initialUnaryRetrySettings) + .build()) + .setPollingAlgorithm(OperationTimedPollAlgorithm.create(pollingRetrySettings)); + EchoSettings grpcEchoSettings = EchoSettings.create(grpcEchoSettingsBuilder.build()); + grpcEchoSettings = + grpcEchoSettings + .toBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + EchoSettings.defaultGrpcTransportProviderBuilder() + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build()) + .build(); + return EchoClient.create(grpcEchoSettings); + } + + public static EchoClient createHttpJsonEchoClientCustomWaitSettings( + RetrySettings initialUnaryRetrySettings, RetrySettings pollingRetrySettings) + throws Exception { + EchoStubSettings.Builder httpJsonEchoSettingsBuilder = EchoStubSettings.newHttpJsonBuilder(); + httpJsonEchoSettingsBuilder + .waitOperationSettings() + .setInitialCallSettings( + UnaryCallSettings.newUnaryCallSettingsBuilder() + .setRetrySettings(initialUnaryRetrySettings) + .build()) + .setPollingAlgorithm(OperationTimedPollAlgorithm.create(pollingRetrySettings)); + EchoSettings httpJsonEchoSettings = EchoSettings.create(httpJsonEchoSettingsBuilder.build()); + httpJsonEchoSettings = + httpJsonEchoSettings + .toBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider( + EchoSettings.defaultHttpJsonTransportProviderBuilder() + .setHttpTransport( + new NetHttpTransport.Builder().doNotValidateCertificate().build()) + .setEndpoint("http://localhost:7469") + .build()) + .build(); + return EchoClient.create(httpJsonEchoSettings); + } + public static IdentityClient createGrpcIdentityClient() throws Exception { IdentitySettings grpcIdentitySettings = IdentitySettings.newHttpJsonBuilder() diff --git a/showcase/pom.xml b/showcase/pom.xml index 7107ac6b97..e4b79db518 100644 --- a/showcase/pom.xml +++ b/showcase/pom.xml @@ -100,7 +100,5 @@ - -