diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java
index dea02a581..7e03360d4 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java
@@ -16,24 +16,24 @@
package com.google.cloud.pubsublite.internal;
+import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.SequencedMessage;
import java.io.Closeable;
import java.util.Optional;
-import java.util.concurrent.Future;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface BlockingPullSubscriber extends Closeable {
/**
- * Returns a {@link Future} that will be completed when there are messages available. Unfinished
- * existing {@link Future} returned by onData() will be set with {@link InterruptedException} and
- * superseded by new onData() call.
+ * Returns a {@link ApiFuture} that will be completed when there are messages available.
+ * Unfinished existing {@link ApiFuture} returned by onData() will be abandoned and superseded by
+ * new onData() call.
*
*
{@link CheckedApiException} will be set to the Future if there is underlying permanent
* error.
*/
- Future onData();
+ ApiFuture onData();
/**
* Pull messages if there is any ready to deliver. Any message will only be delivered to one call
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java
index 10b332742..88ca71760 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java
@@ -16,10 +16,12 @@
package com.google.cloud.pubsublite.internal;
+import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
@@ -34,7 +36,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {
@@ -91,20 +92,16 @@ private synchronized void addMessages(Collection new_messages)
}
@Override
- public synchronized Future onData() {
- if (notification.isPresent()) {
- notification
- .get()
- .setException(new InterruptedException("Interruped and superseded by newer onData call"));
- notification = Optional.empty();
- }
+ public synchronized ApiFuture onData() {
if (error.isPresent()) {
return ApiFutures.immediateFailedFuture(error.get());
}
if (!messages.isEmpty()) {
return ApiFutures.immediateFuture(null);
}
- notification = Optional.of(SettableApiFuture.create());
+ if (!notification.isPresent()) {
+ notification = Optional.of(SettableApiFuture.create());
+ }
return notification.get();
}
@@ -113,14 +110,22 @@ public synchronized Optional messageIfAvailable() throws Check
if (error.isPresent()) {
throw error.get();
}
- if (!messages.isEmpty()) {
- return Optional.of(Objects.requireNonNull(messages.pollFirst()));
+ if (messages.isEmpty()) {
+ return Optional.empty();
}
- return Optional.empty();
+ return Optional.of(Objects.requireNonNull(messages.pollFirst()));
}
@Override
public void close() {
+ synchronized (this) {
+ if (!error.isPresent()) {
+ error =
+ Optional.of(
+ new CheckedApiException(
+ "Subscriber client shut down", StatusCode.Code.UNAVAILABLE));
+ }
+ }
underlying.stopAsync().awaitTerminated();
}
}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java
index 363b58ee4..3801e251f 100644
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java
@@ -18,7 +18,6 @@
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
@@ -129,18 +128,13 @@ public void onDataAfterErrorThrows() {
@Test
public void onDataBeforeErrorThrows() throws Exception {
CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
- Future> future =
- executorService.submit(
- () -> {
- ExecutionException e =
- assertThrows(ExecutionException.class, () -> subscriber.onData().get());
- assertThat(expected).isEqualTo(e.getCause());
- });
+ Future> future = subscriber.onData();
Thread.sleep(1000);
assertThat(future.isDone()).isFalse();
errorListener.failed(null, expected);
- future.get();
+ ExecutionException e = assertThrows(ExecutionException.class, future::get);
+ assertThat(expected).isEqualTo(e.getCause());
}
@Test
@@ -152,14 +146,6 @@ public void onDataSuccess() throws Exception {
future.get();
}
- @Test
- public void onDataMultiCalls() {
- Future> future1 = subscriber.onData();
- Future> future2 = subscriber.onData();
- ExecutionException e = assertThrows(ExecutionException.class, () -> future1.get());
- assertThat(e.getCause()).isInstanceOf(InterruptedException.class);
- }
-
@Test
public void pullMessage() throws Exception {
SequencedMessage message =
@@ -177,12 +163,9 @@ public void pullMessageNoMessage() throws Exception {
public void pullMessageWhenError() {
CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
errorListener.failed(null, expected);
- try {
- subscriber.messageIfAvailable();
- fail();
- } catch (CheckedApiException e) {
- assertThat(expected).isEqualTo(e);
- }
+ CheckedApiException e =
+ assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
+ assertThat(expected).isEqualTo(e);
}
@Test
@@ -193,14 +176,13 @@ public void pullMessagePrioritizeErrorOverExistingMessage() {
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30);
messageConsumer.accept(ImmutableList.of(message));
- try {
- subscriber.messageIfAvailable();
- fail();
- } catch (CheckedApiException e) {
- assertThat(expected).isEqualTo(e);
- }
+ CheckedApiException e =
+ assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
+ assertThat(expected).isEqualTo(e);
}
+ // Not guaranteed to fail if subscriber is not thread safe, investigate if this becomes
+ // flaky.
@Test
public void onlyOneMessageDeliveredWhenMultiCalls() throws Exception {
SequencedMessage message =
diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
index 8baa0d907..420bf5e95 100644
--- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
+++ b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java
@@ -19,10 +19,8 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.flogger.GoogleLogger;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
@@ -66,8 +64,8 @@ public boolean next() {
.offset(currentMsg.offset().value())
.build();
return true;
- } catch (InterruptedException | CheckedApiException | ExecutionException e) {
- throw new IllegalStateException("Failed to retrieve messages.", e);
+ } catch (Throwable t) {
+ throw new IllegalStateException("Failed to retrieve messages.", t);
}
}
diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java
index aa9f5db09..ad3527a03 100644
--- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java
+++ b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java
@@ -19,10 +19,10 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.*;
+import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
-import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
@@ -71,7 +71,7 @@ public void testPartitionReader() throws Exception {
UnitTestExamples.examplePartition());
// Multiple get w/o next will return same msg.
- when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture());
+ when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow1);
@@ -79,7 +79,7 @@ public void testPartitionReader() throws Exception {
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);
// Next will advance to next msg.
- when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture());
+ when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow2);