Skip to content

Commit

Permalink
fix: Fix BufferingPullSubscriber to not seek after sending flow contr…
Browse files Browse the repository at this point in the history
…ol tokens. (#253)

Seeking after sending tokens will discard any flow control tokens sent to the server. Also, update BufferingPullSubscriber to accept arbitrary initial seek requests.
  • Loading branch information
dpcollins-google committed Sep 24, 2020
1 parent eabe900 commit 0e20d80
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@

import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.StatusException;
Expand All @@ -42,6 +41,15 @@ public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage>

public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings)
throws StatusException {
this(
factory,
settings,
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build());
}

public BufferingPullSubscriber(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
throws StatusException {
underlying = factory.New(messages::addAll);
underlying.addListener(
new Listener() {
Expand All @@ -52,29 +60,18 @@ public void failed(State state, Throwable throwable) {
},
MoreExecutors.directExecutor());
underlying.startAsync().awaitRunning();
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
.setAllowedBytes(settings.bytesOutstanding())
.build());
}

public BufferingPullSubscriber(
SubscriberFactory factory, FlowControlSettings settings, Offset initialLocation)
throws StatusException {
this(factory, settings);
try {
underlying
.seek(
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialLocation.value()))
.build())
.get();
underlying.seek(initialSeek).get();
} catch (InterruptedException e) {
throw ExtractStatus.toCanonical(e);
} catch (ExecutionException e) {
throw ExtractStatus.toCanonical(e.getCause());
}
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
.setAllowedBytes(settings.bytesOutstanding())
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class BufferingPullSubscriberTest {
private final SubscriberFactory underlyingFactory = mock(SubscriberFactory.class);
private final Subscriber underlying = mock(Subscriber.class);
private final Offset initialOffset = Offset.of(5);
private final SeekRequest initialSeek =
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
.build();
private final FlowControlSettings flowControlSettings =
((Supplier<FlowControlSettings>)
() -> {
Expand Down Expand Up @@ -102,15 +106,15 @@ public void setUp() throws Exception {
.when(underlying)
.addListener(any(), any());

subscriber = new BufferingPullSubscriber(underlyingFactory, flowControlSettings, initialOffset);
subscriber = new BufferingPullSubscriber(underlyingFactory, flowControlSettings, initialSeek);

InOrder inOrder = inOrder(underlyingFactory, underlying);
inOrder.verify(underlyingFactory).New(any());
inOrder.verify(underlying).addListener(any(), any());
inOrder.verify(underlying).startAsync();
inOrder.verify(underlying).awaitRunning();
inOrder.verify(underlying).allowFlow(flow);
inOrder.verify(underlying).seek(seek);
inOrder.verify(underlying).allowFlow(flow);

assertThat(messageConsumer).isNotNull();
assertThat(errorListener).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -87,7 +89,9 @@ public UnboundedReader<SequencedMessage> createReader(
new BufferingPullSubscriber(
subscriberFactories.get(partition),
subscriberOptions.flowControlSettings(),
checkpointed);
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(checkpointed.value()))
.build());
} else {
state.subscriber =
new BufferingPullSubscriber(
Expand Down

0 comments on commit 0e20d80

Please sign in to comment.