From ef5ded568e1d4843946f235a82ddfda64f29aa2b Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 4 Nov 2024 00:15:54 +0530 Subject: [PATCH 1/6] fix: Avoid blocking thread in AsyncResultSet --- .../clirr-ignored-differences.xml | 6 +- .../cloud/spanner/AbstractReadContext.java | 14 ++- .../cloud/spanner/AbstractResultSet.java | 4 + .../google/cloud/spanner/AsyncResultSet.java | 21 ++++ .../cloud/spanner/AsyncResultSetImpl.java | 104 ++++++++++++++---- .../cloud/spanner/ForwardingResultSet.java | 5 + .../google/cloud/spanner/GrpcResultSet.java | 5 + .../cloud/spanner/GrpcStreamIterator.java | 25 ++++- .../cloud/spanner/GrpcValueIterator.java | 4 + .../com/google/cloud/spanner/ResultSet.java | 8 ++ .../spanner/ResumableStreamIterator.java | 37 +++++-- .../com/google/cloud/spanner/SessionPool.java | 29 +++++ .../cloud/spanner/AsyncResultSetImplTest.java | 92 +++++++++++++++- .../spanner/ResumableStreamIteratorTest.java | 54 ++++----- 14 files changed, 337 insertions(+), 71 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index ec13415790c..a9d82988314 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -790,5 +790,9 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - + + 7012 + com/google/cloud/spanner/ResultSet + boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index a89090e34d9..3cb7df74c0e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -768,9 +768,12 @@ ResultSet executeQueryInternalWithOptions( rpc.getExecuteQueryRetrySettings(), rpc.getExecuteQueryRetryableCodes()) { @Override - CloseableIterator startStream(@Nullable ByteString resumeToken) { + CloseableIterator startStream( + @Nullable ByteString resumeToken, + AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed); + stream.registerListener(streamListener); if (partitionToken != null) { request.setPartitionToken(partitionToken); } @@ -791,8 +794,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken getTransactionChannelHint(), isRouteToLeader()); session.markUsed(clock.instant()); - call.request(prefetchChunks); stream.setCall(call, request.getTransaction().hasBegin()); + call.request(prefetchChunks); return stream; } @@ -959,9 +962,12 @@ ResultSet readInternalWithOptions( rpc.getReadRetrySettings(), rpc.getReadRetryableCodes()) { @Override - CloseableIterator startStream(@Nullable ByteString resumeToken) { + CloseableIterator startStream( + @Nullable ByteString resumeToken, + AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed); + stream.registerListener(streamListener); TransactionSelector selector = null; if (resumeToken != null) { builder.setResumeToken(resumeToken); @@ -980,8 +986,8 @@ CloseableIterator startStream(@Nullable ByteString resumeToken getTransactionChannelHint(), isRouteToLeader()); session.markUsed(clock.instant()); - call.request(prefetchChunks); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); + call.request(prefetchChunks); return stream; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index fdc0398d5fe..659ac36e7f5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -150,6 +150,10 @@ interface CloseableIterator extends Iterator { void close(@Nullable String message); boolean isWithBeginTransaction(); + + default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + return false; + } } static double valueProtoToFloat64(com.google.protobuf.Value proto) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java index dfedcc4f8be..dbd7f93a3c6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java @@ -18,6 +18,7 @@ import com.google.api.core.ApiFuture; import com.google.common.base.Function; +import com.google.spanner.v1.PartialResultSet; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -223,4 +224,24 @@ interface ReadyCallback { * @param transformer function which will be used to transform the row. It should not return null. */ List toList(Function transformer) throws SpannerException; + + /** + * An interface to register the listener for streaming gRPC request. It will be called when a + * chunk is received from gRPC streaming call. + */ + interface StreamMessageListener { + void onStreamMessage( + PartialResultSet partialResultSet, + int prefetchChunks, + int currentBufferSize, + StreamMessageRequestor streamMessageRequestor); + } + + /** + * An interface to request more messages from the gRPC streaming call. It will be implemented by + * the class which has access to SpannerRpc.StreamingCall object + */ + interface StreamMessageRequestor { + void requestMessages(int numOfMessages); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index fa7cc158c19..0fd3569e211 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -18,7 +18,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.api.core.ListenableFutureToApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.ExecutorProvider; import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet; @@ -29,28 +28,25 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.ResultSetStats; import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; /** Default implementation for {@link AsyncResultSet}. */ -class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet { +class AsyncResultSetImpl extends ForwardingStructReader + implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener { private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName()); /** State of an {@link AsyncResultSetImpl}. */ private enum State { INITIALIZED, + IN_PROGRESS, /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */ SYNC, CONSUMING, @@ -115,12 +111,15 @@ private enum State { private State state = State.INITIALIZED; + /** This variable indicates that produce rows thread is initiated */ + private volatile boolean produceRowsInitiated; + /** * This variable indicates whether all the results from the underlying result set have been read. */ private volatile boolean finished; - private volatile ApiFuture result; + private volatile SettableApiFuture result; /** * This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a @@ -329,12 +328,12 @@ public void run() { private final CallbackRunnable callbackRunnable = new CallbackRunnable(); /** - * {@link ProduceRowsCallable} reads data from the underlying {@link ResultSet}, places these in + * {@link ProduceRowsRunnable} reads data from the underlying {@link ResultSet}, places these in * the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed. */ - private class ProduceRowsCallable implements Callable { + private class ProduceRowsRunnable implements Runnable { @Override - public Void call() throws Exception { + public void run() { boolean stop = false; boolean hasNext = false; try { @@ -393,12 +392,17 @@ public Void call() throws Exception { } // Call the callback if there are still rows in the buffer that need to be processed. while (!stop) { - waitIfPaused(); - startCallbackIfNecessary(); - // Make sure we wait until the callback runner has actually finished. - consumingLatch.await(); - synchronized (monitor) { - stop = cursorReturnedDoneOrException; + try { + waitIfPaused(); + startCallbackIfNecessary(); + // Make sure we wait until the callback runner has actually finished. + consumingLatch.await(); + synchronized (monitor) { + stop = cursorReturnedDoneOrException; + } + } catch (InterruptedException e) { + result.setException(e); + return; } } } finally { @@ -410,14 +414,16 @@ public Void call() throws Exception { } synchronized (monitor) { if (executionException != null) { + result.setException(executionException); throw executionException; } if (state == State.CANCELLED) { + result.setException(CANCELLED_EXCEPTION); throw CANCELLED_EXCEPTION; } } } - return null; + result.set(null); } private void waitIfPaused() throws InterruptedException { @@ -449,6 +455,21 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) { } } + private class InitiateStreamingRunnable implements Runnable { + + @Override + public void run() { + try { + if (!initiateStreaming(AsyncResultSetImpl.this)) { + initiateProduceRows(); + } + } catch (SpannerException e) { + executionException = e; + initiateProduceRows(); + } + } + } + /** Sets the callback for this {@link AsyncResultSet}. */ @Override public ApiFuture setCallback(Executor exec, ReadyCallback cb) { @@ -458,16 +479,24 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { this.state == State.INITIALIZED, "callback may not be set multiple times"); // Start to fetch data and buffer these. - this.result = - new ListenableFutureToApiFuture<>(this.service.submit(new ProduceRowsCallable())); + this.result = SettableApiFuture.create(); + this.state = State.IN_PROGRESS; + this.service.execute(new InitiateStreamingRunnable()); this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec)); this.callback = Preconditions.checkNotNull(cb); - this.state = State.RUNNING; pausedLatch.countDown(); return result; } } + private void initiateProduceRows() { + this.service.execute(new ProduceRowsRunnable()); + if (this.state == State.IN_PROGRESS) { + this.state = State.RUNNING; + } + produceRowsInitiated = true; + } + Future getResult() { return result; } @@ -578,6 +607,11 @@ public ResultSetMetadata getMetadata() { return delegateResultSet.get().getMetadata(); } + @Override + public boolean initiateStreaming(StreamMessageListener streamMessageListener) { + return delegateResultSet.get().initiateStreaming(streamMessageListener); + } + @Override protected void checkValidState() { synchronized (monitor) { @@ -593,4 +627,28 @@ public Struct getCurrentRowAsStruct() { checkValidState(); return currentRow; } + + @Override + public void onStreamMessage( + PartialResultSet partialResultSet, + int prefetchChunks, + int currentBufferSize, + StreamMessageRequestor streamMessageRequestor) { + synchronized (monitor) { + if (produceRowsInitiated) { + return; + } + // if PartialResultSet contains resume token or buffer size is more than configured size or + // we have reached end of stream, we can start the thread + boolean startJobThread = + !partialResultSet.getResumeToken().isEmpty() + || currentBufferSize >= prefetchChunks + || partialResultSet == GrpcStreamIterator.END_OF_STREAM; + if (startJobThread || state != State.IN_PROGRESS) { + initiateProduceRows(); + } else { + streamMessageRequestor.requestMessages(1); + } + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java index babbb310a45..5ed39a92ffc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java @@ -102,4 +102,9 @@ public ResultSetStats getStats() { public ResultSetMetadata getMetadata() { return delegate.get().getMetadata(); } + + @Override + public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + return delegate.get().initiateStreaming(streamMessageListener); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index 23c9dd7c2d3..fcf7ceaa8c2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -123,6 +123,11 @@ public ResultSetMetadata getMetadata() { return metadata; } + @Override + public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + return iterator.initiateStreaming(streamMessageListener); + } + @Override public void close() { synchronized (this) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index af6b5683502..b74bc77318c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -23,6 +23,7 @@ import com.google.common.collect.AbstractIterator; import com.google.common.util.concurrent.Uninterruptibles; import com.google.spanner.v1.PartialResultSet; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -34,13 +35,15 @@ /** Adapts a streaming read/query call into an iterator over partial result sets. */ @VisibleForTesting class GrpcStreamIterator extends AbstractIterator - implements CloseableIterator { + implements CloseableIterator, AsyncResultSet.StreamMessageRequestor { private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); - private static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); + public static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); + private AsyncResultSet.StreamMessageListener streamMessageListener; private final ConsumerImpl consumer; private final BlockingQueue stream; private final Statement statement; + private final int prefetchChunks; private SpannerRpc.StreamingCall call; private volatile boolean withBeginTransaction; @@ -57,15 +60,20 @@ class GrpcStreamIterator extends AbstractIterator GrpcStreamIterator( Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { this.statement = statement; + this.prefetchChunks = prefetchChunks; this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. - this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1); + this.stream = new LinkedBlockingQueue<>((prefetchChunks * 2) + 1); } protected final SpannerRpc.ResultStreamConsumer consumer() { return consumer; } + public void registerListener(AsyncResultSet.StreamMessageListener streamMessageListener) { + this.streamMessageListener = streamMessageListener; + } + public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) { this.call = call; this.withBeginTransaction = withBeginTransaction; @@ -135,6 +143,12 @@ protected final PartialResultSet computeNext() { private void addToStream(PartialResultSet results) { // We assume that nothing from the user will interrupt gRPC event threads. Uninterruptibles.putUninterruptibly(stream, results); + onStreamMessage(results); + } + + @Override + public void requestMessages(int numOfMessages) { + call.request(numOfMessages); } private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer { @@ -182,4 +196,9 @@ public boolean cancelQueryWhenClientIsClosed() { return this.cancelQueryWhenClientIsClosed; } } + + private void onStreamMessage(PartialResultSet partialResultSet) { + Optional.ofNullable(streamMessageListener) + .ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this)); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java index 1a3df8b9123..24c431eec31 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java @@ -127,6 +127,10 @@ ResultSetStats getStats() { return statistics; } + boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + return stream.initiateStreaming(streamMessageListener); + } + Type type() { checkState(type != null, "metadata has not been received"); return type; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java index cd6fa10b996..9a21e8d40db 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java @@ -82,4 +82,12 @@ public interface ResultSet extends AutoCloseable, StructReader { default ResultSetMetadata getMetadata() { throw new UnsupportedOperationException("Method should be overridden"); } + + /** + * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by + * AsyncResultSet to initiate gRPC streaming + */ + default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + return false; + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index 3e82ab7d5ff..b94bdaf8482 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -58,6 +58,7 @@ abstract class ResumableStreamIterator extends AbstractIterator retryableCodes; private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName()); @@ -196,7 +197,8 @@ public void execute(Runnable command) { } } - abstract CloseableIterator startStream(@Nullable ByteString resumeToken); + abstract CloseableIterator startStream( + @Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener); /** * Prepares the iterator for a retry on a different gRPC channel. Returns true if that is @@ -220,23 +222,20 @@ public boolean isWithBeginTransaction() { return stream != null && stream.isWithBeginTransaction(); } + @Override + public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { + this.streamMessageListener = streamMessageListener; + startGrpcStreaming(); + return true; + } + @Override protected PartialResultSet computeNext() { int numAttemptsOnOtherChannel = 0; Context context = Context.current(); while (true) { // Eagerly start stream before consuming any buffered items. - if (stream == null) { - span.addAnnotation( - "Starting/Resuming stream", - "ResumeToken", - resumeToken == null ? "null" : resumeToken.toStringUtf8()); - try (IScope scope = tracer.withSpan(span)) { - // When start a new stream set the Span as current to make the gRPC Span a child of - // this Span. - stream = checkNotNull(startStream(resumeToken)); - } - } + startGrpcStreaming(); // Buffer contains items up to a resume token or has reached capacity: flush. if (!buffer.isEmpty() && (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) { @@ -315,6 +314,20 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) { } } + private void startGrpcStreaming() { + if (stream == null) { + span.addAnnotation( + "Starting/Resuming stream", + "ResumeToken", + resumeToken == null ? "null" : resumeToken.toStringUtf8()); + try (IScope scope = tracer.withSpan(span)) { + // When start a new stream set the Span as current to make the gRPC Span a child of + // this Span. + stream = checkNotNull(startStream(resumeToken, streamMessageListener)); + } + } + } + boolean isRetryable(SpannerException spannerException) { return spannerException.isRetryable() || retryableCodes.contains( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index cf50fa44c77..f98d96eef15 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -288,6 +288,35 @@ public boolean next() throws SpannerException { } } + @Override + public boolean initiateStreaming( + AsyncResultSet.StreamMessageListener streamMessageListener) { + try { + boolean ret = super.initiateStreaming(streamMessageListener); + if (beforeFirst) { + synchronized (lock) { + session.get().markUsed(); + beforeFirst = false; + sessionUsedForQuery = true; + } + } + if (!ret && isSingleUse) { + close(); + } + return ret; + } catch (SessionNotFoundException e) { + throw e; + } catch (SpannerException e) { + synchronized (lock) { + if (!closed && isSingleUse) { + session.get().setLastException(e); + AutoClosingReadContext.this.close(); + } + } + throw e; + } + } + private boolean internalNext() { try { boolean ret = super.next(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 98497fbf140..7dd9e653a81 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -22,8 +22,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import com.google.api.core.ApiFuture; import com.google.api.gax.core.ExecutorProvider; @@ -32,6 +32,9 @@ import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.common.base.Function; import com.google.common.collect.Range; +import com.google.protobuf.ByteString; +import com.google.protobuf.Value; +import com.google.spanner.v1.PartialResultSet; import java.util.List; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CountDownLatch; @@ -48,6 +51,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -388,6 +392,13 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled() try (AsyncResultSetImpl rs = new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + + when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class))) + .thenAnswer( + answer -> { + rs.onStreamMessage(PartialResultSet.newBuilder().build(), 4, 1, null); + return null; + }); callbackResult = rs.setCallback( executor, @@ -498,4 +509,81 @@ public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception rs.getResult().get(10L, TimeUnit.SECONDS); } } + + @Test + public void testOnStreamMessageWhenResumeTokenIsPresent() { + ResultSet delegate = mock(ResultSet.class); + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + AsyncResultSet.StreamMessageRequestor streamMessageRequestor = + Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); + // Marking Streaming as supported + Mockito.when( + delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) + .thenReturn(true); + + rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); + rs.onStreamMessage( + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), + 4, + 1, + streamMessageRequestor); + Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); + + rs.onStreamMessage( + PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(), + 4, + 2, + streamMessageRequestor); + Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); + Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); + } + } + + @Test + public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { + ResultSet delegate = mock(ResultSet.class); + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + AsyncResultSet.StreamMessageRequestor streamMessageRequestor = + Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); + // Marking Streaming as supported + Mockito.when( + delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) + .thenReturn(true); + + rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); + rs.onStreamMessage( + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), + 4, + 4, + streamMessageRequestor); + Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); + Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); + } + } + + @Test + public void testOnStreamMessageWhenAsyncResultIsCancelled() { + ResultSet delegate = mock(ResultSet.class); + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + AsyncResultSet.StreamMessageRequestor streamMessageRequestor = + Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); + // Marking Streaming as supported + Mockito.when( + delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) + .thenReturn(true); + + rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); + rs.cancel(); + rs.onStreamMessage( + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), + 1, + 4, + streamMessageRequestor); + Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); + Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index d126719ebb8..ebe86724678 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -64,7 +64,8 @@ public class ResumableStreamIteratorTest { interface Starter { AbstractResultSet.CloseableIterator startStream( - @Nullable ByteString resumeToken); + @Nullable ByteString resumeToken, + AsyncResultSet.StreamMessageListener streamMessageListener); } interface ResultSetStream { @@ -164,8 +165,9 @@ private void initWithLimit(int maxBufferSize) { SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) { @Override AbstractResultSet.CloseableIterator startStream( - @Nullable ByteString resumeToken) { - return starter.startStream(resumeToken); + @Nullable ByteString resumeToken, + AsyncResultSet.StreamMessageListener streamMessageListener) { + return starter.startStream(resumeToken, null); } }; } @@ -173,7 +175,7 @@ AbstractResultSet.CloseableIterator startStream( @Test public void simple() { ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(null, "a")) .thenReturn(resultSet(null, "b")) @@ -195,7 +197,7 @@ public void closedOTSpan() { setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -218,7 +220,7 @@ public void closedOCSpan() { setInternalState(ResumableStreamIterator.class, this.resumableStreamIterator, "span", span); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -232,14 +234,14 @@ public void closedOCSpan() { @Test public void restart() { ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c")) @@ -251,7 +253,7 @@ public void restart() { @Test public void restartWithHoldBack() { ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -260,7 +262,7 @@ public void restartWithHoldBack() { .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c")) @@ -272,7 +274,7 @@ public void restartWithHoldBack() { @Test public void restartWithHoldBackMidStream() { ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(null, "b")) @@ -281,7 +283,7 @@ public void restartWithHoldBackMidStream() { .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "e")) @@ -304,7 +306,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException { ResumableStreamIterator.class, this.resumableStreamIterator, "backOff", backOff); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenThrow( @@ -312,7 +314,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException { ErrorCode.UNAVAILABLE, "failed by test", Status.UNAVAILABLE.asRuntimeException())); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -324,7 +326,7 @@ public void retryableErrorWithoutRetryInfo() throws IOException { @Test public void nonRetryableError() { ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -343,7 +345,7 @@ public void bufferLimitSimple() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(null, "a")) .thenReturn(resultSet(null, "b")) @@ -356,7 +358,7 @@ public void bufferLimitSimpleWithRestartTokens() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) @@ -369,14 +371,14 @@ public void bufferLimitRestart() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(ByteString.copyFromUtf8("r2"), "b")) .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r2"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r3"), "c")) @@ -390,13 +392,13 @@ public void bufferLimitRestartWithinLimitAtStartOfResults() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(null, "XXXXXX")) .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s2)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(null, "a")) .thenReturn(resultSet(null, "b")) @@ -409,14 +411,14 @@ public void bufferLimitRestartWithinLimitMidResults() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(null, "XXXXXX")) .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r1"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()) .thenReturn(resultSet(null, "b")) @@ -430,7 +432,7 @@ public void bufferLimitMissingTokensUnsafeToRetry() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(null, "b")) @@ -447,7 +449,7 @@ public void bufferLimitMissingTokensSafeToRetry() { initWithLimit(1); ResultSetStream s1 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(null)).thenReturn(new ResultSetIterator(s1)); + Mockito.when(starter.startStream(null, null)).thenReturn(new ResultSetIterator(s1)); Mockito.when(s1.next()) .thenReturn(resultSet(ByteString.copyFromUtf8("r1"), "a")) .thenReturn(resultSet(null, "b")) @@ -455,7 +457,7 @@ public void bufferLimitMissingTokensSafeToRetry() { .thenThrow(new RetryableException(errorCodeParameter, "failed by test")); ResultSetStream s2 = Mockito.mock(ResultSetStream.class); - Mockito.when(starter.startStream(ByteString.copyFromUtf8("r3"))) + Mockito.when(starter.startStream(ByteString.copyFromUtf8("r3"), null)) .thenReturn(new ResultSetIterator(s2)); Mockito.when(s2.next()).thenReturn(resultSet(null, "d")).thenReturn(null); From 66152839cb33f17903227bd7b3ff54ed0354d9b4 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Tue, 5 Nov 2024 13:57:39 +0530 Subject: [PATCH 2/6] Addressed comments --- .../clirr-ignored-differences.xml | 10 ++--- .../google/cloud/spanner/AsyncResultSet.java | 3 +- .../cloud/spanner/AsyncResultSetImpl.java | 41 +++++++++++-------- .../cloud/spanner/ForwardingResultSet.java | 5 ++- .../google/cloud/spanner/GrpcResultSet.java | 3 +- .../cloud/spanner/GrpcStreamIterator.java | 3 +- .../com/google/cloud/spanner/ResultSet.java | 8 ---- .../com/google/cloud/spanner/SessionPool.java | 9 ++-- .../cloud/spanner/StreamingResultSet.java | 27 ++++++++++++ .../google/cloud/spanner/StreamingUtil.java | 27 ++++++++++++ .../cloud/spanner/AsyncResultSetImplTest.java | 26 ++++++------ 11 files changed, 105 insertions(+), 57 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index a9d82988314..4b36353ac53 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -758,7 +758,7 @@ com/google/cloud/spanner/connection/Connection boolean isKeepTransactionAlive() - + 7012 @@ -790,9 +790,5 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - - 7012 - com/google/cloud/spanner/ResultSet - boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener) - - + + \ No newline at end of file diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java index dbd7f93a3c6..aa374b69d3e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java @@ -232,8 +232,7 @@ interface ReadyCallback { interface StreamMessageListener { void onStreamMessage( PartialResultSet partialResultSet, - int prefetchChunks, - int currentBufferSize, + boolean bufferIsFull, StreamMessageRequestor streamMessageRequestor); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 0fd3569e211..d449179260b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -34,7 +34,12 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,7 +51,7 @@ class AsyncResultSetImpl extends ForwardingStructReader /** State of an {@link AsyncResultSetImpl}. */ private enum State { INITIALIZED, - IN_PROGRESS, + STREAMING_INITIALIZED, /** SYNC indicates that the {@link ResultSet} is used in sync pattern. */ SYNC, CONSUMING, @@ -400,7 +405,7 @@ public void run() { synchronized (monitor) { stop = cursorReturnedDoneOrException; } - } catch (InterruptedException e) { + } catch (Throwable e) { result.setException(e); return; } @@ -415,15 +420,13 @@ public void run() { synchronized (monitor) { if (executionException != null) { result.setException(executionException); - throw executionException; - } - if (state == State.CANCELLED) { + } else if (state == State.CANCELLED) { result.setException(CANCELLED_EXCEPTION); - throw CANCELLED_EXCEPTION; + } else { + result.set(null); } } } - result.set(null); } private void waitIfPaused() throws InterruptedException { @@ -460,6 +463,12 @@ private class InitiateStreamingRunnable implements Runnable { @Override public void run() { try { + // This method returns true if the underlying result set is a streaming result set (e.g. a + // GrpcResultSet). + // Those result sets will trigger initiateProduceRows() when the first results are received. + // Non-streaming result sets do not trigger this callback, and for those result sets, we + // need to eagerly + // start the ProduceRowsRunnable. if (!initiateStreaming(AsyncResultSetImpl.this)) { initiateProduceRows(); } @@ -480,7 +489,7 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { // Start to fetch data and buffer these. this.result = SettableApiFuture.create(); - this.state = State.IN_PROGRESS; + this.state = State.STREAMING_INITIALIZED; this.service.execute(new InitiateStreamingRunnable()); this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec)); this.callback = Preconditions.checkNotNull(cb); @@ -491,7 +500,7 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { private void initiateProduceRows() { this.service.execute(new ProduceRowsRunnable()); - if (this.state == State.IN_PROGRESS) { + if (this.state == State.STREAMING_INITIALIZED) { this.state = State.RUNNING; } produceRowsInitiated = true; @@ -607,9 +616,8 @@ public ResultSetMetadata getMetadata() { return delegateResultSet.get().getMetadata(); } - @Override public boolean initiateStreaming(StreamMessageListener streamMessageListener) { - return delegateResultSet.get().initiateStreaming(streamMessageListener); + return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener); } @Override @@ -631,20 +639,19 @@ public Struct getCurrentRowAsStruct() { @Override public void onStreamMessage( PartialResultSet partialResultSet, - int prefetchChunks, - int currentBufferSize, + boolean bufferIsFull, StreamMessageRequestor streamMessageRequestor) { synchronized (monitor) { if (produceRowsInitiated) { return; } - // if PartialResultSet contains resume token or buffer size is more than configured size or + // if PartialResultSet contains resume token or buffer size is ful or // we have reached end of stream, we can start the thread boolean startJobThread = !partialResultSet.getResumeToken().isEmpty() - || currentBufferSize >= prefetchChunks + || bufferIsFull || partialResultSet == GrpcStreamIterator.END_OF_STREAM; - if (startJobThread || state != State.IN_PROGRESS) { + if (startJobThread || state != State.STREAMING_INITIALIZED) { initiateProduceRows(); } else { streamMessageRequestor.requestMessages(1); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java index 5ed39a92ffc..c3c6e5d91c3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java @@ -23,7 +23,8 @@ import com.google.spanner.v1.ResultSetStats; /** Forwarding implementation of ResultSet that forwards all calls to a delegate. */ -public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet { +public class ForwardingResultSet extends ForwardingStructReader + implements ProtobufResultSet, StreamingResultSet { private Supplier delegate; @@ -105,6 +106,6 @@ public ResultSetMetadata getMetadata() { @Override public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { - return delegate.get().initiateStreaming(streamMessageListener); + return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index fcf7ceaa8c2..6af9c9a0e1e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -30,7 +30,8 @@ import javax.annotation.Nullable; @VisibleForTesting -class GrpcResultSet extends AbstractResultSet> implements ProtobufResultSet { +class GrpcResultSet extends AbstractResultSet> + implements ProtobufResultSet, StreamingResultSet { private final GrpcValueIterator iterator; private final Listener listener; private final DecodeMode decodeMode; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index b74bc77318c..eecf172cead 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -199,6 +199,7 @@ public boolean cancelQueryWhenClientIsClosed() { private void onStreamMessage(PartialResultSet partialResultSet) { Optional.ofNullable(streamMessageListener) - .ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this)); + .ifPresent( + sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size(), this)); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java index 9a21e8d40db..cd6fa10b996 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java @@ -82,12 +82,4 @@ public interface ResultSet extends AutoCloseable, StructReader { default ResultSetMetadata getMetadata() { throw new UnsupportedOperationException("Method should be overridden"); } - - /** - * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by - * AsyncResultSet to initiate gRPC streaming - */ - default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { - return false; - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f98d96eef15..6ca04226262 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -292,18 +292,17 @@ public boolean next() throws SpannerException { public boolean initiateStreaming( AsyncResultSet.StreamMessageListener streamMessageListener) { try { - boolean ret = super.initiateStreaming(streamMessageListener); - if (beforeFirst) { + boolean streamInitiated = super.initiateStreaming(streamMessageListener); + if (!streamInitiated) { synchronized (lock) { session.get().markUsed(); - beforeFirst = false; sessionUsedForQuery = true; } } - if (!ret && isSingleUse) { + if (!streamInitiated && isSingleUse) { close(); } - return ret; + return streamInitiated; } catch (SessionNotFoundException e) { throw e; } catch (SpannerException e) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java new file mode 100644 index 00000000000..5b765f2ba77 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +/** Streaming implementation of ResultSet that supports streaming of chunks */ +public interface StreamingResultSet extends ResultSet { + + /** + * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by + * AsyncResultSet to initiate gRPC streaming + */ + boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java new file mode 100644 index 00000000000..a4c7cc29c0b --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +public class StreamingUtil { + public static boolean initiateStreaming( + ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) { + if (resultSet instanceof StreamingResultSet) { + return ((StreamingResultSet) resultSet).initiateStreaming(streamMessageListener); + } + return false; + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 7dd9e653a81..49b7cfd6e8a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -23,7 +23,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; import com.google.api.gax.core.ExecutorProvider; @@ -385,7 +387,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { public void testCallbackIsNotCalledWhilePausedAndCanceled() throws InterruptedException, ExecutionException { Executor executor = Executors.newSingleThreadExecutor(); - ResultSet delegate = mock(ResultSet.class); + StreamingResultSet delegate = mock(StreamingResultSet.class); final AtomicInteger callbackCounter = new AtomicInteger(); ApiFuture callbackResult; @@ -396,7 +398,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled() when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class))) .thenAnswer( answer -> { - rs.onStreamMessage(PartialResultSet.newBuilder().build(), 4, 1, null); + rs.onStreamMessage(PartialResultSet.newBuilder().build(), false, null); return null; }); callbackResult = @@ -512,7 +514,7 @@ public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception @Test public void testOnStreamMessageWhenResumeTokenIsPresent() { - ResultSet delegate = mock(ResultSet.class); + StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { AsyncResultSet.StreamMessageRequestor streamMessageRequestor = @@ -525,15 +527,13 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() { rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); rs.onStreamMessage( PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - 4, - 1, + false, streamMessageRequestor); Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); rs.onStreamMessage( PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(), - 4, - 2, + false, streamMessageRequestor); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); @@ -542,7 +542,7 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() { @Test public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { - ResultSet delegate = mock(ResultSet.class); + StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { AsyncResultSet.StreamMessageRequestor streamMessageRequestor = @@ -555,8 +555,7 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); rs.onStreamMessage( PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - 4, - 4, + true, streamMessageRequestor); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); @@ -565,7 +564,7 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { @Test public void testOnStreamMessageWhenAsyncResultIsCancelled() { - ResultSet delegate = mock(ResultSet.class); + StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { AsyncResultSet.StreamMessageRequestor streamMessageRequestor = @@ -579,8 +578,7 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() { rs.cancel(); rs.onStreamMessage( PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - 1, - 4, + false, streamMessageRequestor); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); From 798d0427e2f39d451df0ef3af3085b731cb66a93 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Thu, 7 Nov 2024 14:27:13 +0530 Subject: [PATCH 3/6] Addressed comments --- .../google/cloud/spanner/AsyncResultSet.java | 13 +-------- .../cloud/spanner/AsyncResultSetImpl.java | 7 +---- .../cloud/spanner/GrpcStreamIterator.java | 10 ++----- .../cloud/spanner/AsyncResultSetImplTest.java | 27 ++++--------------- 4 files changed, 9 insertions(+), 48 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java index aa374b69d3e..2b3225bfc59 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java @@ -230,17 +230,6 @@ interface ReadyCallback { * chunk is received from gRPC streaming call. */ interface StreamMessageListener { - void onStreamMessage( - PartialResultSet partialResultSet, - boolean bufferIsFull, - StreamMessageRequestor streamMessageRequestor); - } - - /** - * An interface to request more messages from the gRPC streaming call. It will be implemented by - * the class which has access to SpannerRpc.StreamingCall object - */ - interface StreamMessageRequestor { - void requestMessages(int numOfMessages); + void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index d449179260b..fe4bfa767a6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -637,10 +637,7 @@ public Struct getCurrentRowAsStruct() { } @Override - public void onStreamMessage( - PartialResultSet partialResultSet, - boolean bufferIsFull, - StreamMessageRequestor streamMessageRequestor) { + public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) { synchronized (monitor) { if (produceRowsInitiated) { return; @@ -653,8 +650,6 @@ public void onStreamMessage( || partialResultSet == GrpcStreamIterator.END_OF_STREAM; if (startJobThread || state != State.STREAMING_INITIALIZED) { initiateProduceRows(); - } else { - streamMessageRequestor.requestMessages(1); } } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index eecf172cead..1e48161daee 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -35,7 +35,7 @@ /** Adapts a streaming read/query call into an iterator over partial result sets. */ @VisibleForTesting class GrpcStreamIterator extends AbstractIterator - implements CloseableIterator, AsyncResultSet.StreamMessageRequestor { + implements CloseableIterator { private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); public static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); private AsyncResultSet.StreamMessageListener streamMessageListener; @@ -146,11 +146,6 @@ private void addToStream(PartialResultSet results) { onStreamMessage(results); } - @Override - public void requestMessages(int numOfMessages) { - call.request(numOfMessages); - } - private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer { private final boolean cancelQueryWhenClientIsClosed; @@ -199,7 +194,6 @@ public boolean cancelQueryWhenClientIsClosed() { private void onStreamMessage(PartialResultSet partialResultSet) { Optional.ofNullable(streamMessageListener) - .ifPresent( - sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size(), this)); + .ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size())); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 49b7cfd6e8a..0ba924ef740 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -398,7 +398,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled() when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class))) .thenAnswer( answer -> { - rs.onStreamMessage(PartialResultSet.newBuilder().build(), false, null); + rs.onStreamMessage(PartialResultSet.newBuilder().build(), false); return null; }); callbackResult = @@ -517,8 +517,6 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() { StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { - AsyncResultSet.StreamMessageRequestor streamMessageRequestor = - Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); // Marking Streaming as supported Mockito.when( delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) @@ -526,17 +524,12 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() { rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); rs.onStreamMessage( - PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - false, - streamMessageRequestor); - Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false); rs.onStreamMessage( PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(), - false, - streamMessageRequestor); + false); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); - Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1)); } } @@ -545,8 +538,6 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { - AsyncResultSet.StreamMessageRequestor streamMessageRequestor = - Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); // Marking Streaming as supported Mockito.when( delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) @@ -554,11 +545,8 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() { rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); rs.onStreamMessage( - PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - true, - streamMessageRequestor); + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), true); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); - Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); } } @@ -567,8 +555,6 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() { StreamingResultSet delegate = mock(StreamingResultSet.class); try (AsyncResultSetImpl rs = new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { - AsyncResultSet.StreamMessageRequestor streamMessageRequestor = - Mockito.mock(AsyncResultSet.StreamMessageRequestor.class); // Marking Streaming as supported Mockito.when( delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class))) @@ -577,11 +563,8 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() { rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE); rs.cancel(); rs.onStreamMessage( - PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), - false, - streamMessageRequestor); + PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false); Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any()); - Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1)); } } } From f202ecc6a0e3f3b2280bef071b184e45bb555b79 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Fri, 8 Nov 2024 21:18:11 +0530 Subject: [PATCH 4/6] Addressed comments --- .../clirr-ignored-differences.xml | 6 ++-- .../cloud/spanner/AbstractResultSet.java | 4 +++ .../cloud/spanner/AsyncResultSetImpl.java | 13 ++++----- .../cloud/spanner/GrpcStreamIterator.java | 8 ++---- .../com/google/cloud/spanner/SessionPool.java | 28 ------------------- .../cloud/spanner/StreamingResultSet.java | 2 +- .../google/cloud/spanner/StreamingUtil.java | 4 +-- 7 files changed, 19 insertions(+), 46 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 4b36353ac53..ec13415790c 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -758,7 +758,7 @@ com/google/cloud/spanner/connection/Connection boolean isKeepTransactionAlive() - + 7012 @@ -790,5 +790,5 @@ com/google/cloud/spanner/connection/Connection boolean isAutoBatchDmlUpdateCountVerification() - - \ No newline at end of file + + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 659ac36e7f5..3dca970f96e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -151,6 +151,10 @@ interface CloseableIterator extends Iterator { boolean isWithBeginTransaction(); + /** + * @param streamMessageListener A class object which implements StreamMessageListener + * @return true if streaming is supported by the iterator, otherwise false + */ default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { return false; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index fe4bfa767a6..62e4aec31e0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -467,13 +467,12 @@ public void run() { // GrpcResultSet). // Those result sets will trigger initiateProduceRows() when the first results are received. // Non-streaming result sets do not trigger this callback, and for those result sets, we - // need to eagerly - // start the ProduceRowsRunnable. + // need to eagerly start the ProduceRowsRunnable. if (!initiateStreaming(AsyncResultSetImpl.this)) { initiateProduceRows(); } - } catch (SpannerException e) { - executionException = e; + } catch (Throwable exception) { + executionException = SpannerExceptionFactory.asSpannerException(exception); initiateProduceRows(); } } @@ -499,11 +498,11 @@ public ApiFuture setCallback(Executor exec, ReadyCallback cb) { } private void initiateProduceRows() { - this.service.execute(new ProduceRowsRunnable()); if (this.state == State.STREAMING_INITIALIZED) { this.state = State.RUNNING; } produceRowsInitiated = true; + this.service.execute(new ProduceRowsRunnable()); } Future getResult() { @@ -642,8 +641,8 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF if (produceRowsInitiated) { return; } - // if PartialResultSet contains resume token or buffer size is ful or - // we have reached end of stream, we can start the thread + // if PartialResultSet contains a resume token or buffer size is full, or + // we have reached the end of the stream, we can start the thread. boolean startJobThread = !partialResultSet.getResumeToken().isEmpty() || bufferIsFull diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index 1e48161daee..5b571003352 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -37,13 +37,12 @@ class GrpcStreamIterator extends AbstractIterator implements CloseableIterator { private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName()); - public static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); + static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build(); private AsyncResultSet.StreamMessageListener streamMessageListener; private final ConsumerImpl consumer; private final BlockingQueue stream; private final Statement statement; - private final int prefetchChunks; private SpannerRpc.StreamingCall call; private volatile boolean withBeginTransaction; @@ -60,10 +59,9 @@ class GrpcStreamIterator extends AbstractIterator GrpcStreamIterator( Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) { this.statement = statement; - this.prefetchChunks = prefetchChunks; this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed); // One extra to allow for END_OF_STREAM message. - this.stream = new LinkedBlockingQueue<>((prefetchChunks * 2) + 1); + this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1); } protected final SpannerRpc.ResultStreamConsumer consumer() { @@ -194,6 +192,6 @@ public boolean cancelQueryWhenClientIsClosed() { private void onStreamMessage(PartialResultSet partialResultSet) { Optional.ofNullable(streamMessageListener) - .ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size())); + .ifPresent(sl -> sl.onStreamMessage(partialResultSet, stream.remainingCapacity() <= 1)); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 6ca04226262..cf50fa44c77 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -288,34 +288,6 @@ public boolean next() throws SpannerException { } } - @Override - public boolean initiateStreaming( - AsyncResultSet.StreamMessageListener streamMessageListener) { - try { - boolean streamInitiated = super.initiateStreaming(streamMessageListener); - if (!streamInitiated) { - synchronized (lock) { - session.get().markUsed(); - sessionUsedForQuery = true; - } - } - if (!streamInitiated && isSingleUse) { - close(); - } - return streamInitiated; - } catch (SessionNotFoundException e) { - throw e; - } catch (SpannerException e) { - synchronized (lock) { - if (!closed && isSingleUse) { - session.get().setLastException(e); - AutoClosingReadContext.this.close(); - } - } - throw e; - } - } - private boolean internalNext() { try { boolean ret = super.next(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java index 5b765f2ba77..cb371ccf769 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java @@ -17,7 +17,7 @@ package com.google.cloud.spanner; /** Streaming implementation of ResultSet that supports streaming of chunks */ -public interface StreamingResultSet extends ResultSet { +interface StreamingResultSet extends ResultSet { /** * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java index a4c7cc29c0b..587d17c3614 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java @@ -16,8 +16,8 @@ package com.google.cloud.spanner; -public class StreamingUtil { - public static boolean initiateStreaming( +class StreamingUtil { + static boolean initiateStreaming( ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) { if (resultSet instanceof StreamingResultSet) { return ((StreamingResultSet) resultSet).initiateStreaming(streamMessageListener); From 44ed7d34cbe88bfdcca06e05b61a2dab9cf531da Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 11 Nov 2024 13:59:49 +0530 Subject: [PATCH 5/6] Addressed comments --- .../java/com/google/cloud/spanner/AsyncResultSetImpl.java | 2 +- .../java/com/google/cloud/spanner/ForwardingResultSet.java | 2 ++ .../main/java/com/google/cloud/spanner/GrpcResultSet.java | 2 ++ .../java/com/google/cloud/spanner/GrpcStreamIterator.java | 5 +++-- .../com/google/cloud/spanner/ResumableStreamIterator.java | 2 ++ .../java/com/google/cloud/spanner/StreamingResultSet.java | 6 +++++- .../main/java/com/google/cloud/spanner/StreamingUtil.java | 5 ++++- 7 files changed, 19 insertions(+), 5 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 62e4aec31e0..1161822cd10 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -615,7 +615,7 @@ public ResultSetMetadata getMetadata() { return delegateResultSet.get().getMetadata(); } - public boolean initiateStreaming(StreamMessageListener streamMessageListener) { + boolean initiateStreaming(StreamMessageListener streamMessageListener) { return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java index c3c6e5d91c3..3c4883e6586 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import com.google.api.core.InternalApi; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -105,6 +106,7 @@ public ResultSetMetadata getMetadata() { } @Override + @InternalApi public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index 6af9c9a0e1e..c2a4ee5a585 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -19,6 +19,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import static com.google.common.base.Preconditions.checkState; +import com.google.api.core.InternalApi; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Value; import com.google.spanner.v1.PartialResultSet; @@ -125,6 +126,7 @@ public ResultSetMetadata getMetadata() { } @Override + @InternalApi public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { return iterator.initiateStreaming(streamMessageListener); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java index 5b571003352..79c02eab58c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java @@ -20,6 +20,7 @@ import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; import com.google.common.util.concurrent.Uninterruptibles; import com.google.spanner.v1.PartialResultSet; @@ -68,8 +69,8 @@ protected final SpannerRpc.ResultStreamConsumer consumer() { return consumer; } - public void registerListener(AsyncResultSet.StreamMessageListener streamMessageListener) { - this.streamMessageListener = streamMessageListener; + void registerListener(AsyncResultSet.StreamMessageListener streamMessageListener) { + this.streamMessageListener = Preconditions.checkNotNull(streamMessageListener); } public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java index b94bdaf8482..39165da2d38 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java @@ -23,6 +23,7 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; +import com.google.api.core.InternalApi; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode.Code; @@ -223,6 +224,7 @@ public boolean isWithBeginTransaction() { } @Override + @InternalApi public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) { this.streamMessageListener = streamMessageListener; startGrpcStreaming(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java index cb371ccf769..47b10d852c6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java @@ -16,12 +16,16 @@ package com.google.cloud.spanner; +import com.google.api.core.InternalApi; + /** Streaming implementation of ResultSet that supports streaming of chunks */ interface StreamingResultSet extends ResultSet { /** * Returns the {@link boolean} for this {@link ResultSet}. This method will be used by - * AsyncResultSet to initiate gRPC streaming + * AsyncResultSet internally to initiate gRPC streaming. This method should not be called by the + * users. */ + @InternalApi boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java index 587d17c3614..54496d39f96 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java @@ -16,7 +16,10 @@ package com.google.cloud.spanner; -class StreamingUtil { +final class StreamingUtil { + + private StreamingUtil() {} + static boolean initiateStreaming( ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) { if (resultSet instanceof StreamingResultSet) { From b5e353637c8bd3e8b924bd0e3221cf7a1a8dc1d7 Mon Sep 17 00:00:00 2001 From: Sakthivel Subramanian Date: Mon, 11 Nov 2024 14:32:27 +0530 Subject: [PATCH 6/6] Addressed comments --- .../com/google/cloud/spanner/AbstractReadContext.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 3cb7df74c0e..cecf462bd25 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -773,7 +773,9 @@ CloseableIterator startStream( AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed); - stream.registerListener(streamListener); + if (streamListener != null) { + stream.registerListener(streamListener); + } if (partitionToken != null) { request.setPartitionToken(partitionToken); } @@ -967,7 +969,9 @@ CloseableIterator startStream( AsyncResultSet.StreamMessageListener streamListener) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed); - stream.registerListener(streamListener); + if (streamListener != null) { + stream.registerListener(streamListener); + } TransactionSelector selector = null; if (resumeToken != null) { builder.setResumeToken(resumeToken);