Skip to content

Commit

Permalink
feat: Add nextOffset method to BufferingPullSubscriber (#272)
Browse files Browse the repository at this point in the history
* feat: Add nextOffset method to BufferingPullSubscriber

* chore: add newline to EOF

* chore: Update PullSubscriber docs.
  • Loading branch information
dpcollins-google committed Oct 5, 2020
1 parent d72bea8 commit 5c0e7cc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,37 @@

import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

public class BufferingPullSubscriber implements PullSubscriber<SequencedMessage> {
private final Subscriber underlying;
private final AtomicReference<StatusException> error = new AtomicReference<>();
private final LinkedBlockingQueue<SequencedMessage> messages = new LinkedBlockingQueue<>();

@GuardedBy("this")
private Optional<StatusException> error = Optional.empty();

@GuardedBy("this")
private Deque<SequencedMessage> messages = new ArrayDeque<>();

@GuardedBy("this")
private Optional<Offset> lastDelivered = Optional.empty();

public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings settings)
throws StatusException {
Expand All @@ -50,12 +61,12 @@ public BufferingPullSubscriber(SubscriberFactory factory, FlowControlSettings se
public BufferingPullSubscriber(
SubscriberFactory factory, FlowControlSettings settings, SeekRequest initialSeek)
throws StatusException {
underlying = factory.New(messages::addAll);
underlying = factory.New(this::addMessages);
underlying.addListener(
new Listener() {
@Override
public void failed(State state, Throwable throwable) {
error.set(ExtractStatus.toCanonical(throwable));
fail(ExtractStatus.toCanonical(throwable));
}
},
MoreExecutors.directExecutor());
Expand All @@ -74,21 +85,37 @@ public void failed(State state, Throwable throwable) {
.build());
}

private synchronized void fail(StatusException e) {
error = Optional.of(e);
}

private synchronized void addMessages(Collection<SequencedMessage> new_messages) {
messages.addAll(new_messages);
}

@Override
public List<SequencedMessage> pull() throws StatusException {
@Nullable StatusException maybeError = error.get();
if (maybeError != null) {
throw maybeError;
public synchronized List<SequencedMessage> pull() throws StatusException {
if (error.isPresent()) {
throw error.get();
}
if (messages.isEmpty()) {
return ImmutableList.of();
}
ArrayList<SequencedMessage> collection = new ArrayList<>();
messages.drainTo(collection);
Deque<SequencedMessage> collection = messages;
messages = new ArrayDeque<>();
long bytes = collection.stream().mapToLong(SequencedMessage::byteSize).sum();
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedBytes(bytes)
.setAllowedMessages(collection.size())
.build());
return collection;
lastDelivered = Optional.of(Iterables.getLast(collection).offset());
return ImmutableList.copyOf(collection);
}

@Override
public synchronized Optional<Offset> nextOffset() {
return lastDelivered.map(offset -> Offset.of(offset.value() + 1));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@

package com.google.cloud.pubsublite.internal;

import com.google.cloud.pubsublite.Offset;
import io.grpc.StatusException;
import java.util.List;
import java.util.Optional;

// A PullSubscriber exposes a "pull" mechanism for retrieving messages.
public interface PullSubscriber<T> extends AutoCloseable {
// Pull currently available messages from this subscriber. Does not block.
List<T> pull() throws StatusException;

// The next offset expected to be returned by this PullSubscriber, or empty if unknown.
// Subsequent messages are guaranteed to have offsets of at least this value.
Optional<Offset> nextOffset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ public void multipleBatchesAggregatedReturnsTokens() throws StatusException {
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(11), 20);
SequencedMessage message3 =
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30);
assertThat(subscriber.nextOffset()).isEmpty();
messageConsumer.accept(ImmutableList.of(message1, message2));
messageConsumer.accept(ImmutableList.of(message3));
assertThat(subscriber.pull()).containsExactly(message1, message2, message3);
assertThat(subscriber.nextOffset()).hasValue(Offset.of(13));
assertThat(subscriber.pull()).isEmpty();

FlowControlRequest flowControlRequest =
Expand Down

0 comments on commit 5c0e7cc

Please sign in to comment.