From 5a12cd29601253423c5738be5471a036fd0334be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 31 Jul 2023 20:23:52 +0200 Subject: [PATCH] fix: apply stream wait timeout (#2544) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: apply stream wait timeout Use the streamWaitTimeout that has been set on the call context when polling from the gRPC stream. This prevents the stream from blocking forever if for some reason the stream is no longer delivering data, and also no error is propagated to the client. The default stream wait timeout that is set for all call contexts is 30 mins. This value can be overridden by configuring a custom call context for a specific query. Fixes #2494 * test: add a wait time to the mock server to ensure that a timeout occurs * chore: add clirr ignore * docs: add test + comment for zero timeout * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- .../clirr-ignored-differences.xml | 7 +++ .../cloud/spanner/AbstractResultSet.java | 34 +++++++++-- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 56 ++++++++--------- .../cloud/spanner/spi/v1/SpannerRpc.java | 4 ++ .../cloud/spanner/DatabaseClientImplTest.java | 60 +++++++++++++++++++ .../cloud/spanner/GrpcResultSetTest.java | 17 ++++++ .../cloud/spanner/ReadFormatTestRunner.java | 7 +++ .../google/cloud/spanner/SessionImplTest.java | 7 +++ 8 files changed, 159 insertions(+), 33 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 74d6d823ae..1319b511fd 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -359,4 +359,11 @@ com/google/cloud/spanner/connection/Connection boolean isDelayTransactionStartUntilFirstWrite() + + + + 7012 + 
com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall + com.google.api.gax.rpc.ApiCallContext getCallContext() + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 4d9ec1cda0..37024bd267 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -25,6 +25,7 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; @@ -74,6 +75,7 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.threeten.bp.Duration; /** Implementation of {@link ResultSet}. */ abstract class AbstractResultSet extends AbstractStructReader implements ResultSet { @@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator private SpannerRpc.StreamingCall call; private volatile boolean withBeginTransaction; + private TimeUnit streamWaitTimeoutUnit; + private long streamWaitTimeoutValue; private SpannerException error; @VisibleForTesting @@ -965,6 +969,22 @@ protected final SpannerRpc.ResultStreamConsumer consumer() { public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) { this.call = call; this.withBeginTransaction = withBeginTransaction; + ApiCallContext callContext = call.getCallContext(); + Duration streamWaitTimeout = callContext == null ? null : callContext.getStreamWaitTimeout(); + if (streamWaitTimeout != null) { + // Determine the timeout unit to use. 
This truncates the timeout to whole seconds if the timeout + // value is at least 1 second, which is still finer than the precision that would normally be + // used by the stream watchdog (which uses a precision of 10 seconds by default). + if (streamWaitTimeout.getSeconds() > 0L) { + streamWaitTimeoutValue = streamWaitTimeout.getSeconds(); + streamWaitTimeoutUnit = TimeUnit.SECONDS; + } else if (streamWaitTimeout.getNano() > 0) { + streamWaitTimeoutValue = streamWaitTimeout.getNano(); + streamWaitTimeoutUnit = TimeUnit.NANOSECONDS; + } + // Note that if the stream-wait-timeout is zero, we won't set a timeout at all. + // That is consistent with ApiCallContext#withStreamWaitTimeout(Duration.ZERO). + } } @Override @@ -983,11 +1003,15 @@ public boolean isWithBeginTransaction() { protected final PartialResultSet computeNext() { PartialResultSet next; try { - // TODO: Ideally honor io.grpc.Context while blocking here. In practice, - // cancellation/deadline results in an error being delivered to "stream", which - // should mean that we do not block significantly longer afterwards, but it would - // be more robust to use poll() with a timeout. - next = stream.take(); + if (streamWaitTimeoutUnit != null) { + next = stream.poll(streamWaitTimeoutValue, streamWaitTimeoutUnit); + if (next == null) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.DEADLINE_EXCEEDED, "stream wait timeout"); + } + } else { + next = stream.take(); + } } catch (InterruptedException e) { // Treat interrupt as a request to cancel the read. 
throw SpannerExceptionFactory.propagateInterrupt(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 5395409782..1afea7676d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -1596,20 +1596,7 @@ public StreamingCall read( options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader); SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.streamingReadCallable().call(request, responseObserver, context); - final StreamController controller = responseObserver.getController(); - return new StreamingCall() { - @Override - public void request(int numMessage) { - controller.request(numMessage); - } - - // TODO(hzyi): streamController currently does not support cancel with message. Add - // this in gax and update this method later - @Override - public void cancel(String message) { - controller.cancel(); - } - }; + return new GrpcStreamingCall(context, responseObserver.getController()); } @Override @@ -1673,22 +1660,10 @@ public StreamingCall executeQuery( request, SpannerGrpc.getExecuteStreamingSqlMethod(), routeToLeader); + SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context); - final StreamController controller = responseObserver.getController(); - return new StreamingCall() { - @Override - public void request(int numMessage) { - controller.request(numMessage); - } - - // TODO(hzyi): streamController currently does not support cancel with message. 
Add - // this in gax and update this method later - @Override - public void cancel(String message) { - controller.cancel(); - } - }; + return new GrpcStreamingCall(context, responseObserver.getController()); } @Override @@ -1957,6 +1932,31 @@ public boolean isClosed() { return rpcIsClosed; } + private static final class GrpcStreamingCall implements StreamingCall { + private final ApiCallContext callContext; + private final StreamController controller; + + GrpcStreamingCall(ApiCallContext callContext, StreamController controller) { + this.callContext = callContext; + this.controller = controller; + } + + @Override + public ApiCallContext getCallContext() { + return callContext; + } + + @Override + public void request(int numMessages) { + controller.request(numMessages); + } + + @Override + public void cancel(@Nullable String message) { + controller.cancel(); + } + } + /** * A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to * the {@link ResultStreamConsumer}. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index 27adf89a23..62c34a58a1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -20,6 +20,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ServerStream; import com.google.cloud.ServiceRpc; import com.google.cloud.spanner.BackupId; @@ -150,6 +151,9 @@ interface ResultStreamConsumer { /** Handle for cancellation of a streaming read or query call. */ interface StreamingCall { + /** Returns the {@link ApiCallContext} that is used for this streaming call. 
*/ + ApiCallContext getCallContext(); + /** * Requests more messages from the stream. We disable the auto flow control mechanism in grpc, * so we need to request messages ourself. This gives us more control over how much buffer we diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 0b88edc7f6..53bb30dba7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -38,6 +38,7 @@ import com.google.api.core.ApiFutures; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.ByteArray; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; @@ -51,6 +52,7 @@ import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; +import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; import com.google.cloud.spanner.Type.Code; import com.google.cloud.spanner.connection.RandomResultSetGenerator; @@ -77,6 +79,7 @@ import com.google.spanner.v1.TypeAnnotationCode; import com.google.spanner.v1.TypeCode; import io.grpc.Context; +import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -2963,6 +2966,63 @@ public void testStatementWithBytesArrayParameter() { } } + @Test + public void testStreamWaitTimeout() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Add a wait time to the mock server. 
Note that the test won't actually wait 100ms, as it uses + // a 1ns timeout. + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0)); + // Create a custom call configuration that uses a 1 nanosecond stream timeout value. This will + // always time out, as a call to the mock server will always take more than 1 nanosecond. + CallContextConfigurator configurator = + new CallContextConfigurator() { + @Override + public <ReqT, RespT> ApiCallContext configure( + ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) { + return context.withStreamWaitTimeout(Duration.ofNanos(1L)); + } + }; + Context context = + Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator); + context.run( + () -> { + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { + SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + assertTrue( + exception.getMessage(), exception.getMessage().contains("stream wait timeout")); + } + }); + } + + @Test + public void testZeroStreamWaitTimeout() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + // Create a custom call configuration that sets the stream timeout to zero. + // This should disable the timeout. + CallContextConfigurator configurator = + new CallContextConfigurator() { + @Override + public <ReqT, RespT> ApiCallContext configure( + ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) { + return context.withStreamWaitTimeout(Duration.ZERO); + } + }; + Context context = + Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator); + context.run( + () -> { + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { + // A zero timeout should not cause a timeout, and instead be ignored. 
+ assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + } + }); + } + static void assertAsString(String expected, ResultSet resultSet, int col) { assertEquals(expected, resultSet.getValue(col).getAsString()); assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index e35ecf9ad9..82300a9309 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; @@ -50,6 +52,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; /** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. 
*/ @RunWith(JUnit4.class) @@ -58,6 +61,7 @@ public class GrpcResultSetTest { private AbstractResultSet.GrpcResultSet resultSet; private SpannerRpc.ResultStreamConsumer consumer; private AbstractResultSet.GrpcStreamIterator stream; + private final Duration streamWaitTimeout = Duration.ofNanos(1L); private static class NoOpListener implements AbstractResultSet.Listener { @Override @@ -78,6 +82,11 @@ public void setUp() { stream = new AbstractResultSet.GrpcStreamIterator(10); stream.setCall( new SpannerRpc.StreamingCall() { + @Override + public ApiCallContext getCallContext() { + return GrpcCallContext.createDefault().withStreamWaitTimeout(streamWaitTimeout); + } + @Override public void cancel(@Nullable String message) {} @@ -93,6 +102,14 @@ public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) { return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener()); } + @Test + public void testStreamTimeout() { + // We don't add any results to the stream. That means that it will time out after 1ns. 
+ SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + assertTrue(exception.getMessage(), exception.getMessage().contains("stream wait timeout")); + } + @Test public void metadata() { Type rowType = Type.struct(Type.StructField.of("f", Type.string())); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index d8d39e6931..af558d14dd 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -18,6 +18,8 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.ByteArray; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.io.Resources; @@ -115,6 +117,11 @@ private void run() throws Exception { stream = new AbstractResultSet.GrpcStreamIterator(10); stream.setCall( new SpannerRpc.StreamingCall() { + @Override + public ApiCallContext getCallContext() { + return GrpcCallContext.createDefault(); + } + @Override public void cancel(@Nullable String message) {} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 90e9a684d9..aa3fe53046 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -27,7 +27,9 @@ import com.google.api.core.ApiFutures; import com.google.api.core.NanoClock; +import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.RetrySettings; +import 
com.google.api.gax.rpc.ApiCallContext; import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; @@ -407,6 +409,11 @@ public void singleUseReadOnlyTransactionReturnsEmptyTransactionMetadata() { } private static class NoOpStreamingCall implements SpannerRpc.StreamingCall { + @Override + public ApiCallContext getCallContext() { + return GrpcCallContext.createDefault(); + } + @Override public void cancel(@Nullable String message) {}