Skip to content

Commit

Permalink
pubsub: enable streaming pull
Browse files Browse the repository at this point in the history
Moving this to a branch.
If nothing else, we should perf test this before
declaring it release-ready.

This commit re-enables streaming pull.
Unlike previous implementation,
it does not fall back to polling if streaming is unavailable,
since the streaming pull endpoint should be working
by the time this is released.

This commit fixes a bug deadline modification code.
Previously we record an Instant we receive a message,
then call Instant::getNano to get the time in nanoseconds.
This is incorrect since getNano returns the nanosecond
from the beginning of that second, not since an epoch.
The fix is to simply save the time since epoch instead of Instant.

This commit also slightly changes how errors are logged,
so that errors that occur after the service is being shut down
don't spam the console.
  • Loading branch information
pongad committed Jul 3, 2017
1 parent 2a92207 commit 315e58a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ private class AckHandler implements FutureCallback<AckReply> {
private final String ackId;
private final int outstandingBytes;
private final AtomicBoolean acked;
private final Instant receivedTime;
private final long receivedTimeMillis;

AckHandler(String ackId, int outstandingBytes) {
this.ackId = ackId;
this.outstandingBytes = outstandingBytes;
acked = new AtomicBoolean(false);
receivedTime = Instant.ofEpochMilli(clock.millisTime());
receivedTimeMillis = clock.millisTime();
}

@Override
Expand Down Expand Up @@ -207,7 +207,6 @@ public void onSuccess(AckReply reply) {
pendingAcks.add(ackId);
}
// Record the latency rounded to the next closest integer.
long receivedTimeMillis = TimeUnit.NANOSECONDS.toMillis(receivedTime.getNano());
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,7 +52,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;

private final Channel channel;
private final SubscriberStub asyncStub;

private final String subscription;
private final ScheduledExecutorService executor;
Expand All @@ -69,14 +66,14 @@ public StreamingSubscriberConnection(
Duration maxAckExtensionPeriod,
int streamAckDeadlineSeconds,
Distribution ackLatencyDistribution,
Channel channel,
SubscriberStub asyncStub,
FlowController flowController,
ScheduledExecutorService executor,
@Nullable ScheduledExecutorService alarmsExecutor,
ApiClock clock) {
this.subscription = subscription;
this.executor = executor;
this.channel = channel;
this.asyncStub = asyncStub;
this.messageDispatcher =
new MessageDispatcher(
receiver,
Expand All @@ -101,8 +98,8 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
notifyStopped();
requestObserver.onError(Status.CANCELLED.asException());
notifyStopped();
}

private class StreamingPullResponseObserver
Expand Down Expand Up @@ -137,7 +134,6 @@ public void run() {

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Terminated streaming with exception", t);
errorFuture.setException(t);
}

Expand All @@ -154,9 +150,7 @@ private void initialize() {
new StreamingPullResponseObserver(errorFuture);
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(ClientCalls.asyncBidiStreamingCall(
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT),
responseObserver));
(asyncStub.streamingPull(responseObserver));
logger.log(
Level.FINER,
"Initializing stream to subscription {0} with deadline {1}",
Expand All @@ -173,6 +167,9 @@ private void initialize() {
new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
if (!isAlive()) {
return;
}
channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
// The stream was closed. And any case we want to reopen it to continue receiving
// messages.
Expand All @@ -186,6 +183,7 @@ public void onFailure(Throwable cause) {
logger.log(Level.FINE, "pull failure after service no longer running", cause);
return;
}
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
if (StatusUtil.isRetryable(cause)) {
long backoffMillis = channelReconnectBackoff.toMillis();
channelReconnectBackoff = channelReconnectBackoff.plusMillis(backoffMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
Expand Down Expand Up @@ -257,22 +259,25 @@ public void close() {
// same executor, it will deadlock: the thread will be stuck waiting for connections
// to start but cannot start the connections.
// For this reason, we spawn a dedicated thread. Starting subscriber should be rare.
new Thread(new Runnable() {
@Override
public void run() {
try {
startPollingConnections();
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
}
}
}).start();
new Thread(
new Runnable() {
@Override
public void run() {
try {
// startPollingConnections();
startStreamingConnections();
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
}
}
})
.start();
}

@Override
protected void doStop() {
// stopAllStreamingConnections();
stopAllStreamingConnections();
stopAllPollingConnections();
try {
for (AutoCloseable closeable : closeables) {
Expand All @@ -284,6 +289,94 @@ protected void doStop() {
}
}

private void startStreamingConnections() throws IOException {
synchronized (streamingSubscriberConnections) {
Credentials credentials = credentialsProvider.getCredentials();
CallCredentials callCredentials =
credentials == null ? null : MoreCallCredentials.from(credentials);

for (Channel channel : channels) {
SubscriberStub stub = SubscriberGrpc.newStub(channel);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
}
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
cachedSubscriptionNameString,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
streamAckDeadlineSeconds,
ackLatencyDistribution,
stub,
flowController,
executor,
alarmsExecutor,
clock));
}
startConnections(
streamingSubscriberConnections,
new Listener() {
@Override
public void failed(State from, Throwable failure) {
// If a connection failed is because of a fatal error, we should fail the
// whole subscriber.
stopAllStreamingConnections();
try {
notifyFailed(failure);
} catch (IllegalStateException e) {
if (isRunning()) {
throw e;
}
// It could happen that we are shutting down while some channels fail.
}
}
});
}

ackDeadlineUpdater =
executor.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
long ackLatency =
ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
if (ackLatency > 0) {
long ackExpirationPaddingMillis = ackExpirationPadding.toMillis();
int possibleStreamAckDeadlineSeconds =
Math.max(
MIN_ACK_DEADLINE_SECONDS,
Ints.saturatedCast(
Math.max(
ackLatency,
TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis))));
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
logger.log(
Level.FINER,
"Updating stream deadline to {0} seconds.",
streamAckDeadlineSeconds);
for (StreamingSubscriberConnection subscriberConnection :
streamingSubscriberConnections) {
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
}
}
}
}
},
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
TimeUnit.MILLISECONDS);
}

private void stopAllStreamingConnections() {
stopConnections(streamingSubscriberConnections);
if (ackDeadlineUpdater != null) {
ackDeadlineUpdater.cancel(true);
}
}

// Starts polling connections. Blocks until all connections declare themselves running.
private void startPollingConnections() throws IOException {
synchronized (pollingSubscriberConnections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class SubscriberTest {

@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{false}});
return Arrays.asList(new Object[][] {{true}});
}

static class TestReceiver implements MessageReceiver {
Expand Down Expand Up @@ -205,6 +205,9 @@ public void testAckSingleMessage() throws Exception {

@Test
public void testGetSubscriptionOnce() throws Exception {
if (isStreamingTest) {
return;
}
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));

sendMessages(ImmutableList.of("A"));
Expand Down

0 comments on commit 315e58a

Please sign in to comment.