Skip to content

Commit

Permalink
feat: Set initial location when connecting subscribe streams (#664)
Browse files Browse the repository at this point in the history
- Sets the InitialSubscribeRequest.initial_location field when reconnecting subscribe streams.
- Completes the handling of admin/out of band seeks.
- Removes the initial seek from all subscriber wrapper implementations.
  • Loading branch information
tmdiep committed Jun 8, 2021
1 parent 8f4d176 commit 65ced46
Show file tree
Hide file tree
Showing 16 changed files with 208 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient;
Expand Down Expand Up @@ -244,7 +246,9 @@ Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiExceptio
SubscriberBuilder.newBuilder()
.setPartition(partition)
.setSubscriptionPath(subscriptionPath())
.setServiceClient(newSubscriberServiceClient(partition));
.setServiceClient(newSubscriberServiceClient(partition))
.setInitialLocation(
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build());

Committer wireCommitter =
CommitterSettings.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void onSuccess(Void result) {

@VisibleForTesting
boolean onSubscriberReset() throws CheckedApiException {
// TODO: handle reset.
return false;
ackSetTracker.waitUntilCommitted();
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {

Expand All @@ -49,8 +47,7 @@ public class BlockingPullSubscriberImpl implements BlockingPullSubscriber {
@GuardedBy("this")
private Optional<SettableApiFuture<Void>> notification = Optional.empty();

public BlockingPullSubscriberImpl(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
public BlockingPullSubscriberImpl(SubscriberFactory factory, FlowControlSettings settings)
throws CheckedApiException {
underlying = factory.newSubscriber(this::addMessages);
underlying.addListener(
Expand All @@ -62,11 +59,6 @@ public void failed(State state, Throwable throwable) {
},
MoreExecutors.directExecutor());
underlying.startAsync().awaitRunning();
try {
underlying.seek(initialSeek).get();
} catch (InterruptedException | ExecutionException e) {
throw ExtractStatus.toCanonical(e);
}
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -35,7 +33,6 @@
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage> {
private final Subscriber underlying;
Expand All @@ -51,15 +48,6 @@ public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage>

public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings)
throws CheckedApiException {
this(
factory,
settings,
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build());
}

public BufferingPullSubscriber(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
throws CheckedApiException {
underlying = factory.newSubscriber(this::addMessages);
underlying.addListener(
new Listener() {
Expand All @@ -70,11 +58,6 @@ public void failed(State state, Throwable throwable) {
},
MoreExecutors.directExecutor());
underlying.startAsync().awaitRunning();
try {
underlying.seek(initialSeek).get();
} catch (InterruptedException | ExecutionException e) {
throw ExtractStatus.toCanonical(e);
}
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(settings.messagesOutstanding())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
public interface Subscriber extends ApiService {
// Seek the subscriber using the given SeekRequest. Requires that no seeks are outstanding.
// Returns the seeked-to offset.
//
// Flow control tokens are reset when the seek response is received from the server and should be
// refilled after the future completes.
ApiFuture<Offset> seek(SeekRequest request);
// Whether or not a seek is in flight for this subscriber. If a seek is in flight, any further
// seek requests will result in a permanent error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.collect.ImmutableList;
import java.util.function.Consumer;
Expand All @@ -37,6 +38,8 @@ public abstract class SubscriberBuilder {

abstract SubscriberServiceClient serviceClient();

abstract SeekRequest initialLocation();

// Optional parameters.
abstract SubscriberResetHandler resetHandler();

Expand All @@ -57,6 +60,8 @@ public abstract Builder setMessageConsumer(

public abstract Builder setServiceClient(SubscriberServiceClient serviceClient);

public abstract Builder setInitialLocation(SeekRequest initialLocation);

// Optional parameters.
public abstract Builder setResetHandler(SubscriberResetHandler resetHandler);

Expand All @@ -75,6 +80,7 @@ public Subscriber build() throws ApiException {
new SubscriberImpl(
autoBuilt.serviceClient(),
initialSubscribeRequest,
autoBuilt.initialLocation(),
autoBuilt.messageConsumer(),
autoBuilt.resetHandler()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.cloud.pubsublite.proto.SubscribeResponse;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -54,7 +54,7 @@ public class SubscriberImpl extends ProxyService

private final SubscriberResetHandler resetHandler;

private final SubscribeRequest initialRequest;
private final InitialSubscribeRequest baseInitialRequest;

private final CloseableMonitor monitor = new CloseableMonitor();

Expand All @@ -73,7 +73,7 @@ public class SubscriberImpl extends ProxyService
private Optional<InFlightSeek> inFlightSeek = Optional.empty();

@GuardedBy("monitor.monitor")
private boolean internalSeekInFlight = false;
private SeekRequest initialLocation;

@GuardedBy("monitor.monitor")
private boolean shutdown = false;
Expand All @@ -92,28 +92,32 @@ private static class InFlightSeek {
SubscriberImpl(
StreamFactory<SubscribeRequest, SubscribeResponse> streamFactory,
ConnectedSubscriberFactory factory,
InitialSubscribeRequest initialRequest,
InitialSubscribeRequest baseInitialRequest,
SeekRequest initialLocation,
Consumer<ImmutableList<SequencedMessage>> messageConsumer,
SubscriberResetHandler resetHandler)
throws ApiException {
this.messageConsumer = messageConsumer;
this.resetHandler = resetHandler;
this.initialRequest = SubscribeRequest.newBuilder().setInitial(initialRequest).build();
this.baseInitialRequest = baseInitialRequest;
this.initialLocation = initialLocation;
this.connection =
new RetryingConnectionImpl<>(streamFactory, factory, this, this.initialRequest);
new RetryingConnectionImpl<>(streamFactory, factory, this, getInitialRequest());
addServices(this.connection);
}

public SubscriberImpl(
SubscriberServiceClient client,
InitialSubscribeRequest initialRequest,
InitialSubscribeRequest baseInitialRequest,
SeekRequest initialLocation,
Consumer<ImmutableList<SequencedMessage>> messageConsumer,
SubscriberResetHandler resetHandler)
throws ApiException {
this(
stream -> client.subscribeCallable().splitCall(stream),
new ConnectedSubscriberImpl.Factory(),
initialRequest,
baseInitialRequest,
initialLocation,
messageConsumer,
resetHandler);
addServices(backgroundResourceAsApiService(client));
Expand Down Expand Up @@ -157,24 +161,18 @@ protected void stop() {

@Override
public ApiFuture<Offset> seek(SeekRequest request) {
try (CloseableMonitor.Hold h =
monitor.enterWhenUninterruptibly(
new Monitor.Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
return !internalSeekInFlight || shutdown;
}
})) {
try (CloseableMonitor.Hold h = monitor.enter()) {
checkArgument(
Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
checkState(!shutdown, "Seeked after the stream shut down.");
checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
SettableApiFuture<Offset> future = SettableApiFuture.create();
inFlightSeek = Optional.of(new InFlightSeek(request, future));
flowControlBatcher.onClientSeek();
connection.modifyConnection(
connectedSubscriber ->
connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
// Note: next offset and flow control tokens should be reset upon seek response. Pre-seek
// messages may still be received until the server receives the seek request.
return future;
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down Expand Up @@ -202,6 +200,18 @@ public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiExcepti
}
}

private SubscribeRequest getInitialRequest() {
try (CloseableMonitor.Hold h = monitor.enter()) {
return SubscribeRequest.newBuilder()
.setInitial(
baseInitialRequest
.toBuilder()
.setInitialLocation(
nextOffsetTracker.requestForRestart().orElse(initialLocation)))
.build();
}
}

public void reset() {
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
Expand All @@ -211,6 +221,8 @@ public void reset() {
new CheckedApiException("Aborted due to out of band seek.", Code.ABORTED)));
inFlightSeek = Optional.empty();
nextOffsetTracker.reset();
initialLocation =
SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
}
}

Expand All @@ -230,25 +242,21 @@ public void triggerReinitialize(CheckedApiException streamError) {

try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
connection.reinitialize(initialRequest);
connection.reinitialize(getInitialRequest());
connection.modifyConnection(
connectedSubscriber -> {
checkArgument(monitor.monitor.isOccupiedByCurrentThread());
checkArgument(connectedSubscriber.isPresent());
if (inFlightSeek.isPresent()) {
connectedSubscriber.get().seek(inFlightSeek.get().seekRequest);
} else {
nextOffsetTracker
// Flow control tokens should be cleared after the seek response is received, thus
// they are not sent after the subscribe stream is reconnected when there is an
// in-flight seek.
flowControlBatcher
.requestForRestart()
.ifPresent(
request -> {
internalSeekInFlight = true;
connectedSubscriber.get().seek(request);
});
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
}
flowControlBatcher
.requestForRestart()
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
});
} catch (CheckedApiException e) {
onPermanentError(e);
Expand Down Expand Up @@ -282,12 +290,9 @@ private void onMessageResponse(ImmutableList<SequencedMessage> messages)
private void onSeekResponse(Offset seekOffset) throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
if (shutdown) return;
if (internalSeekInFlight) {
internalSeekInFlight = false;
return;
}
checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
nextOffsetTracker.onClientSeek(seekOffset);
flowControlBatcher.onClientSeek();
inFlightSeek.get().seekFuture.set(seekOffset);
inFlightSeek = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ public void singleMessageNackHandlerFailedFuture() throws CheckedApiException {
}

@Test
public void onSubscriberResetNotHandled() throws CheckedApiException {
assertThat(subscriber.onSubscriberReset()).isFalse();
public void onSubscriberResetWaitsForAckSetTracker() throws CheckedApiException {
assertThat(subscriber.onSubscriberReset()).isTrue();
verify(ackSetTracker).waitUntilCommitted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Message;
Expand All @@ -34,9 +33,7 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.util.Timestamps;
import java.util.ArrayList;
Expand All @@ -53,11 +50,6 @@
public class BlockingPullSubscriberImplTest {
private final SubscriberFactory underlyingFactory = mock(SubscriberFactory.class);
private final Subscriber underlying = mock(Subscriber.class);
private final Offset initialOffset = Offset.of(5);
private final SeekRequest initialSeek =
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
.build();
private final FlowControlSettings flowControlSettings =
FlowControlSettings.builder().setBytesOutstanding(10).setMessagesOutstanding(20).build();
// Initialized in setUp.
Expand All @@ -69,11 +61,6 @@ public class BlockingPullSubscriberImplTest {
@Before
public void setUp() throws Exception {
when(underlying.startAsync()).thenReturn(underlying);
SeekRequest seek =
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()).build())
.build();
when(underlying.seek(seek)).thenReturn(ApiFutures.immediateFuture(initialOffset));
FlowControlRequest flow =
FlowControlRequest.newBuilder()
.setAllowedBytes(flowControlSettings.bytesOutstanding())
Expand All @@ -94,15 +81,13 @@ public void setUp() throws Exception {
.when(underlying)
.addListener(any(), any());

subscriber =
new BlockingPullSubscriberImpl(underlyingFactory, flowControlSettings, initialSeek);
subscriber = new BlockingPullSubscriberImpl(underlyingFactory, flowControlSettings);

InOrder inOrder = inOrder(underlyingFactory, underlying);
inOrder.verify(underlyingFactory).newSubscriber(any());
inOrder.verify(underlying).addListener(any(), any());
inOrder.verify(underlying).startAsync();
inOrder.verify(underlying).awaitRunning();
inOrder.verify(underlying).seek(seek);
inOrder.verify(underlying).allowFlow(flow);

assertThat(messageConsumer).isNotNull();
Expand Down
Loading

0 comments on commit 65ced46

Please sign in to comment.