diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index e8eb336a3e80..8f3dc054ab0e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -173,13 +173,13 @@ private class AckHandler implements FutureCallback { private final String ackId; private final int outstandingBytes; private final AtomicBoolean acked; - private final Instant receivedTime; + private final long receivedTimeMillis; AckHandler(String ackId, int outstandingBytes) { this.ackId = ackId; this.outstandingBytes = outstandingBytes; acked = new AtomicBoolean(false); - receivedTime = Instant.ofEpochMilli(clock.millisTime()); + receivedTimeMillis = clock.millisTime(); } @Override @@ -207,7 +207,6 @@ public void onSuccess(AckReply reply) { pendingAcks.add(ackId); } // Record the latency rounded to the next closest integer. - long receivedTimeMillis = TimeUnit.NANOSECONDS.toMillis(receivedTime.getNano()); ackLatencyDistribution.record( Ints.saturatedCast( (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 7822bc470ac3..53a2b672a8ce 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -29,12 +29,9 @@ import com.google.common.util.concurrent.SettableFuture; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; -import com.google.pubsub.v1.SubscriberGrpc; -import io.grpc.CallOptions; -import io.grpc.Channel; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientCalls; import io.grpc.stub.ClientResponseObserver; import java.util.ArrayList; import java.util.List; @@ -55,7 +52,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF; - private final Channel channel; + private final SubscriberStub asyncStub; private final String subscription; private final ScheduledExecutorService executor; @@ -69,14 +66,14 @@ public StreamingSubscriberConnection( Duration maxAckExtensionPeriod, int streamAckDeadlineSeconds, Distribution ackLatencyDistribution, - Channel channel, + SubscriberStub asyncStub, FlowController flowController, ScheduledExecutorService executor, @Nullable ScheduledExecutorService alarmsExecutor, ApiClock clock) { this.subscription = subscription; this.executor = executor; - this.channel = channel; + this.asyncStub = asyncStub; this.messageDispatcher = new MessageDispatcher( receiver, @@ -101,8 +98,8 @@ protected void doStart() { @Override protected void doStop() { messageDispatcher.stop(); - notifyStopped(); requestObserver.onError(Status.CANCELLED.asException()); + notifyStopped(); } private class StreamingPullResponseObserver @@ -137,7 +134,6 @@ public void run() { @Override public void onError(Throwable t) { - logger.log(Level.WARNING, "Terminated streaming with exception", t); errorFuture.setException(t); } @@ -154,9 +150,7 @@ private void initialize() { new StreamingPullResponseObserver(errorFuture); final ClientCallStreamObserver requestObserver = (ClientCallStreamObserver) - (ClientCalls.asyncBidiStreamingCall( - channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT), - responseObserver)); + (asyncStub.streamingPull(responseObserver)); logger.log( Level.FINER, "Initializing stream to subscription {0} with deadline {1}", @@ -173,6 +167,9 @@ private void initialize() { new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { + if (!isAlive()) { + return; + } channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF; // The stream was closed. And any case we want to reopen it to continue receiving // messages. @@ -186,6 +183,7 @@ public void onFailure(Throwable cause) { logger.log(Level.FINE, "pull failure after service no longer running", cause); return; } + logger.log(Level.WARNING, "Terminated streaming with exception", cause); if (StatusUtil.isRetryable(cause)) { long backoffMillis = channelReconnectBackoff.toMillis(); channelReconnectBackoff = channelReconnectBackoff.plusMillis(backoffMillis); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index f5e4d161ecdb..ae8b240bc4e7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -38,9 +38,11 @@ import com.google.pubsub.v1.GetSubscriptionRequest; import com.google.pubsub.v1.SubscriberGrpc; import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub; +import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; import io.grpc.CallCredentials; +import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.auth.MoreCallCredentials; import java.io.IOException; @@ -257,22 +259,25 @@ public void close() { // same executor, it will deadlock: the thread will be stuck waiting for connections // to start but cannot start the connections. // For this reason, we spawn a dedicated thread. Starting subscriber should be rare. - new Thread(new Runnable() { - @Override - public void run() { - try { - startPollingConnections(); - notifyStarted(); - } catch (Throwable t) { - notifyFailed(t); - } - } - }).start(); + new Thread( + new Runnable() { + @Override + public void run() { + try { + // startPollingConnections(); + startStreamingConnections(); + notifyStarted(); + } catch (Throwable t) { + notifyFailed(t); + } + } + }) + .start(); } @Override protected void doStop() { - // stopAllStreamingConnections(); + stopAllStreamingConnections(); stopAllPollingConnections(); try { for (AutoCloseable closeable : closeables) { @@ -284,6 +289,94 @@ protected void doStop() { } } + private void startStreamingConnections() throws IOException { + synchronized (streamingSubscriberConnections) { + Credentials credentials = credentialsProvider.getCredentials(); + CallCredentials callCredentials = + credentials == null ? null : MoreCallCredentials.from(credentials); + + for (Channel channel : channels) { + SubscriberStub stub = SubscriberGrpc.newStub(channel); + if (callCredentials != null) { + stub = stub.withCallCredentials(callCredentials); + } + streamingSubscriberConnections.add( + new StreamingSubscriberConnection( + cachedSubscriptionNameString, + receiver, + ackExpirationPadding, + maxAckExtensionPeriod, + streamAckDeadlineSeconds, + ackLatencyDistribution, + stub, + flowController, + executor, + alarmsExecutor, + clock)); + } + startConnections( + streamingSubscriberConnections, + new Listener() { + @Override + public void failed(State from, Throwable failure) { + // If a connection failed is because of a fatal error, we should fail the + // whole subscriber. + stopAllStreamingConnections(); + try { + notifyFailed(failure); + } catch (IllegalStateException e) { + if (isRunning()) { + throw e; + } + // It could happen that we are shutting down while some channels fail. + } + } + }); + } + + ackDeadlineUpdater = + executor.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + // It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API. + long ackLatency = + ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + if (ackLatency > 0) { + long ackExpirationPaddingMillis = ackExpirationPadding.toMillis(); + int possibleStreamAckDeadlineSeconds = + Math.max( + MIN_ACK_DEADLINE_SECONDS, + Ints.saturatedCast( + Math.max( + ackLatency, + TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis)))); + if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) { + streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds; + logger.log( + Level.FINER, + "Updating stream deadline to {0} seconds.", + streamAckDeadlineSeconds); + for (StreamingSubscriberConnection subscriberConnection : + streamingSubscriberConnections) { + subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds); + } + } + } + } + }, + ACK_DEADLINE_UPDATE_PERIOD.toMillis(), + ACK_DEADLINE_UPDATE_PERIOD.toMillis(), + TimeUnit.MILLISECONDS); + } + + private void stopAllStreamingConnections() { + stopConnections(streamingSubscriberConnections); + if (ackDeadlineUpdater != null) { + ackDeadlineUpdater.cancel(true); + } + } + // Starts polling connections. Blocks until all connections declare themselves running. private void startPollingConnections() throws IOException { synchronized (pollingSubscriberConnections) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 56af21b98ba2..38c29232496b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -80,7 +80,7 @@ public class SubscriberTest { @Parameters public static Collection data() { - return Arrays.asList(new Object[][] {{false}}); + return Arrays.asList(new Object[][] {{true}}); } static class TestReceiver implements MessageReceiver { @@ -205,6 +205,9 @@ public void testAckSingleMessage() throws Exception { @Test public void testGetSubscriptionOnce() throws Exception { + if (isStreamingTest) { + return; + } Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver)); sendMessages(ImmutableList.of("A"));