Skip to content
Permalink
Browse files
fix: stop invoking callback after pausing and cancelling result set (#…
…1192)

An AsyncResultSet that was PAUSED and then CANCELLED, would continue to invoke the
callback until the callback would call tryNext(). If the callback never called tryNext(),
the spinning would continue until the entire result set had been consumed.

Fixes #1191
  • Loading branch information
olavloite committed May 19, 2021
1 parent 9935066 commit 78e678448782d5d16ba43ec7c10ab85b89059d88
@@ -252,6 +252,13 @@ public void run() {
if (cursorReturnedDoneOrException) {
break;
}
if (state == State.CANCELLED) {
// The callback should always get at least one chance to catch the CANCELLED
// exception. It is however possible that the callback does not call tryNext(), and
// instead directly returns PAUSE or DONE. In those cases, the callback runner should
// also stop, even though the callback has not seen the CANCELLED state.
cursorReturnedDoneOrException = true;
}
}
CallbackResponse response;
try {
@@ -16,7 +16,9 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
@@ -371,6 +373,33 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
}
}

@Test
public void testCallbackIsNotCalledWhilePausedAndCanceled()
throws InterruptedException, ExecutionException {
Executor executor = Executors.newSingleThreadExecutor();
ResultSet delegate = mock(ResultSet.class);

final AtomicInteger callbackCounter = new AtomicInteger();
ApiFuture<Void> callbackResult;

try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
callbackResult =
rs.setCallback(
executor,
resultSet -> {
callbackCounter.getAndIncrement();
return CallbackResponse.PAUSE;
});

rs.cancel();

SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
assertEquals(1, callbackCounter.get());
}
}

@Test
public void cancel() throws InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();

0 comments on commit 78e6784

Please sign in to comment.