Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Dec 14, 2020
1 parent 31e9306 commit 596ae1a
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 53 deletions.
Expand Up @@ -16,24 +16,24 @@

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.SequencedMessage;
import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.Future;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public interface BlockingPullSubscriber extends Closeable {

/**
* Returns a {@link Future} that will be completed when there are messages available. Unfinished
* existing {@link Future} returned by onData() will be set with {@link InterruptedException} and
* superseded by new onData() call.
* Returns a {@link ApiFuture} that will be completed when there are messages available.
* Unfinished existing {@link ApiFuture} returned by onData() will be abandoned and superseded by
* new onData() call.
*
* <p>{@link CheckedApiException} will be set to the Future if there is underlying permanent
* error.
*/
Future<Void> onData();
ApiFuture<Void> onData();

/**
* Pull messages if there is any ready to deliver. Any message will only be delivered to one call
Expand Down
Expand Up @@ -16,10 +16,12 @@

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
Expand All @@ -34,7 +36,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {

Expand Down Expand Up @@ -91,20 +92,16 @@ private synchronized void addMessages(Collection<SequencedMessage> new_messages)
}

@Override
public synchronized Future<Void> onData() {
if (notification.isPresent()) {
notification
.get()
.setException(new InterruptedException("Interruped and superseded by newer onData call"));
notification = Optional.empty();
}
public synchronized ApiFuture<Void> onData() {
if (error.isPresent()) {
return ApiFutures.immediateFailedFuture(error.get());
}
if (!messages.isEmpty()) {
return ApiFutures.immediateFuture(null);
}
notification = Optional.of(SettableApiFuture.create());
if (!notification.isPresent()) {
notification = Optional.of(SettableApiFuture.create());
}
return notification.get();
}

Expand All @@ -113,14 +110,22 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
if (error.isPresent()) {
throw error.get();
}
if (!messages.isEmpty()) {
return Optional.of(Objects.requireNonNull(messages.pollFirst()));
if (messages.isEmpty()) {
return Optional.empty();
}
return Optional.empty();
return Optional.of(Objects.requireNonNull(messages.pollFirst()));
}

@Override
public void close() {
synchronized (this) {
if (!error.isPresent()) {
error =
Optional.of(
new CheckedApiException(
"Subscriber client shut down", StatusCode.Code.UNAVAILABLE));
}
}
underlying.stopAsync().awaitTerminated();
}
}
Expand Up @@ -18,7 +18,6 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -129,18 +128,13 @@ public void onDataAfterErrorThrows() {
@Test
public void onDataBeforeErrorThrows() throws Exception {
CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
Future<?> future =
executorService.submit(
() -> {
ExecutionException e =
assertThrows(ExecutionException.class, () -> subscriber.onData().get());
assertThat(expected).isEqualTo(e.getCause());
});
Future<?> future = subscriber.onData();
Thread.sleep(1000);
assertThat(future.isDone()).isFalse();

errorListener.failed(null, expected);
future.get();
ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertThat(expected).isEqualTo(e.getCause());
}

@Test
Expand All @@ -152,14 +146,6 @@ public void onDataSuccess() throws Exception {
future.get();
}

@Test
public void onDataMultiCalls() {
Future<?> future1 = subscriber.onData();
Future<?> future2 = subscriber.onData();
ExecutionException e = assertThrows(ExecutionException.class, () -> future1.get());
assertThat(e.getCause()).isInstanceOf(InterruptedException.class);
}

@Test
public void pullMessage() throws Exception {
SequencedMessage message =
Expand All @@ -177,12 +163,9 @@ public void pullMessageNoMessage() throws Exception {
public void pullMessageWhenError() {
CheckedApiException expected = new CheckedApiException(StatusCode.Code.INTERNAL);
errorListener.failed(null, expected);
try {
subscriber.messageIfAvailable();
fail();
} catch (CheckedApiException e) {
assertThat(expected).isEqualTo(e);
}
CheckedApiException e =
assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
assertThat(expected).isEqualTo(e);
}

@Test
Expand All @@ -193,14 +176,13 @@ public void pullMessagePrioritizeErrorOverExistingMessage() {
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30);
messageConsumer.accept(ImmutableList.of(message));

try {
subscriber.messageIfAvailable();
fail();
} catch (CheckedApiException e) {
assertThat(expected).isEqualTo(e);
}
CheckedApiException e =
assertThrows(CheckedApiException.class, () -> subscriber.messageIfAvailable());
assertThat(expected).isEqualTo(e);
}

// Not guaranteed to fail if subscriber is not thread safe, investigate if this becomes
// flaky.
@Test
public void onlyOneMessageDeliveredWhenMultiCalls() throws Exception {
SequencedMessage message =
Expand Down
Expand Up @@ -19,10 +19,8 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.flogger.GoogleLogger;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
Expand Down Expand Up @@ -66,8 +64,8 @@ public boolean next() {
.offset(currentMsg.offset().value())
.build();
return true;
} catch (InterruptedException | CheckedApiException | ExecutionException e) {
throw new IllegalStateException("Failed to retrieve messages.", e);
} catch (Throwable t) {
throw new IllegalStateException("Failed to retrieve messages.", t);
}
}

Expand Down
Expand Up @@ -19,10 +19,10 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.*;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.*;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
Expand Down Expand Up @@ -71,15 +71,15 @@ public void testPartitionReader() throws Exception {
UnitTestExamples.examplePartition());

// Multiple get w/o next will return same msg.
when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture());
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow1);
assertThat(reader.get()).isEqualTo(expectedRow1);
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);

// Next will advance to next msg.
when(subscriber.onData()).thenReturn(Futures.immediateVoidFuture());
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
assertThat(reader.next()).isTrue();
assertThat(reader.get()).isEqualTo(expectedRow2);
Expand Down

0 comments on commit 596ae1a

Please sign in to comment.