From 596ae1a2ed576f1d72738c1291b198cf3276a8c1 Mon Sep 17 00:00:00 2001 From: Michael Jiang Date: Mon, 14 Dec 2020 16:57:15 -0500 Subject: [PATCH] update --- .../internal/BlockingPullSubscriber.java | 10 ++--- .../internal/BlockingPullSubscriberImpl.java | 29 ++++++++------ .../BlockingPullSubscriberImplTest.java | 40 +++++-------------- .../PslContinuousInputPartitionReader.java | 6 +-- ...PslContinuousInputPartitionReaderTest.java | 6 +-- 5 files changed, 38 insertions(+), 53 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java index dea02a581..7e03360d4 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java @@ -16,24 +16,24 @@ package com.google.cloud.pubsublite.internal; +import com.google.api.core.ApiFuture; import com.google.cloud.pubsublite.SequencedMessage; import java.io.Closeable; import java.util.Optional; -import java.util.concurrent.Future; import javax.annotation.concurrent.ThreadSafe; @ThreadSafe public interface BlockingPullSubscriber extends Closeable { /** - * Returns a {@link Future} that will be completed when there are messages available. Unfinished - * existing {@link Future} returned by onData() will be set with {@link InterruptedException} and - * superseded by new onData() call. + * Returns a {@link ApiFuture} that will be completed when there are messages available. + * Unfinished existing {@link ApiFuture} returned by onData() will be abandoned and superseded by + * new onData() call. * *

{@link CheckedApiException} will be set to the Future if there is underlying permanent * error. */ - Future onData(); + ApiFuture onData(); /** * Pull messages if there is any ready to deliver. Any message will only be delivered to one call diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java index 10b332742..88ca71760 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java @@ -16,10 +16,12 @@ package com.google.cloud.pubsublite.internal; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.internal.wire.Subscriber; @@ -34,7 +36,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; public class BlockingPullSubscriberImpl implements BlockingPullSubscriber { @@ -91,20 +92,16 @@ private synchronized void addMessages(Collection new_messages) } @Override - public synchronized Future onData() { - if (notification.isPresent()) { - notification - .get() - .setException(new InterruptedException("Interruped and superseded by newer onData call")); - notification = Optional.empty(); - } + public synchronized ApiFuture onData() { if (error.isPresent()) { return ApiFutures.immediateFailedFuture(error.get()); } if (!messages.isEmpty()) { return ApiFutures.immediateFuture(null); } - notification = Optional.of(SettableApiFuture.create()); + if (!notification.isPresent()) { + notification = Optional.of(SettableApiFuture.create()); + } return notification.get(); } @@ -113,14 +110,22 @@ public synchronized Optional messageIfAvailable() throws Check if (error.isPresent()) { throw error.get(); } - if (!messages.isEmpty()) { - return Optional.of(Objects.requireNonNull(messages.pollFirst())); + if (messages.isEmpty()) { + return Optional.empty(); } - return Optional.empty(); + return Optional.of(Objects.requireNonNull(messages.pollFirst())); } @Override public void close() { + synchronized (this) { + if (!error.isPresent()) { + error = + Optional.of( + new CheckedApiException( + "Subscriber client shut down", StatusCode.Code.UNAVAILABLE)); + } + } underlying.stopAsync().awaitTerminated(); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java index 363b58ee4..3801e251f 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImplTest.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.inOrder; @@ -129,18 +128,13 @@ public void onDataAfterErrorThrows() { @Test public void onDataBeforeErrorThrows() throws Exception { CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL); - Future future = - executorService.submit( - () -> { - ExecutionException e = - assertThrows(ExecutionException.class, () -> subscriber.onData().get()); - assertThat(expected).isEqualTo(e.getCause()); - }); + Future future = subscriber.onData(); Thread.sleep(1000); assertThat(future.isDone()).isFalse(); errorListener.failed(null, expected); - future.get(); + ExecutionException e = assertThrows(ExecutionException.class, future::get); + assertThat(expected).isEqualTo(e.getCause()); } @Test @@ -152,14 +146,6 @@ public void onDataSuccess() throws Exception { future.get(); } - @Test - public void onDataMultiCalls() { - Future future1 = subscriber.onData(); - Future future2 = subscriber.onData(); - ExecutionException e = assertThrows(ExecutionException.class, () -> future1.get()); - assertThat(e.getCause()).isInstanceOf(InterruptedException.class); - } - @Test public void pullMessage() throws Exception { SequencedMessage message = @@ -177,12 +163,9 @@ public void pullMessageNoMessage() throws Exception { public void pullMessageWhenError() { CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL); errorListener.failed(null, expected); - try { - subscriber.messageIfAvailable(); - fail(); - } catch (CheckedApiException e) { - assertThat(expected).isEqualTo(e); - } + CheckedApiException e = + assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable()); + assertThat(expected).isEqualTo(e); } @Test @@ -193,14 +176,13 @@ public void pullMessagePrioritizeErrorOverExistingMessage() { SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30); messageConsumer.accept(ImmutableList.of(message)); - try { - subscriber.messageIfAvailable(); - fail(); - } catch (CheckedApiException e) { - assertThat(expected).isEqualTo(e); - } + CheckedApiException e = + assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable()); + assertThat(expected).isEqualTo(e); } + // Not guaranteed to fail if subscriber is not thread safe, investigate if this becomes + // flaky. @Test public void onlyOneMessageDeliveredWhenMultiCalls() throws Exception { SequencedMessage message = diff --git a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java index 8baa0d907..420bf5e95 100644 --- a/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java +++ b/pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReader.java @@ -19,10 +19,8 @@ import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; -import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.common.flogger.GoogleLogger; import java.util.Optional; -import java.util.concurrent.ExecutionException; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader; import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset; @@ -66,8 +64,8 @@ public boolean next() { .offset(currentMsg.offset().value()) .build(); return true; - } catch (InterruptedException | CheckedApiException | ExecutionException e) { - throw new IllegalStateException("Failed to retrieve messages.", e); + } catch (Throwable t) { + throw new IllegalStateException("Failed to retrieve messages.", t); } } diff --git a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java index aa9f5db09..ad3527a03 100644 --- a/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java +++ b/pubsublite-spark-sql-streaming/src/test/java/com/google/cloud/pubsublite/spark/PslContinuousInputPartitionReaderTest.java @@ -19,10 +19,10 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.*; +import com.google.api.core.ApiFutures; import com.google.cloud.pubsublite.*; import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl; import com.google.cloud.pubsublite.internal.testing.UnitTestExamples; -import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import java.util.Optional; @@ -71,7 +71,7 @@ public void testPartitionReader() throws Exception { UnitTestExamples.examplePartition()); // Multiple get w/o next will return same msg. - when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture()); + when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1)); assertThat(reader.next()).isTrue(); assertThat(reader.get()).isEqualTo(expectedRow1); @@ -79,7 +79,7 @@ public void testPartitionReader() throws Exception { assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L); // Next will advance to next msg. - when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture()); + when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null)); when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2)); assertThat(reader.next()).isTrue(); assertThat(reader.get()).isEqualTo(expectedRow2);