Skip to content

Commit

Permalink
Merge 315e58a into 2a92207
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad committed Jul 3, 2017
2 parents 2a92207 + 315e58a commit c5f1f2a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ private class AckHandler implements FutureCallback<AckReply> {
private final String ackId;
private final int outstandingBytes;
private final AtomicBoolean acked;
private final Instant receivedTime;
private final long receivedTimeMillis;

AckHandler(String ackId, int outstandingBytes) {
this.ackId = ackId;
this.outstandingBytes = outstandingBytes;
acked = new AtomicBoolean(false);
receivedTime = Instant.ofEpochMilli(clock.millisTime());
receivedTimeMillis = clock.millisTime();
}

@Override
Expand Down Expand Up @@ -207,7 +207,6 @@ public void onSuccess(AckReply reply) {
pendingAcks.add(ackId);
}
// Record the latency rounded to the next closest integer.
long receivedTimeMillis = TimeUnit.NANOSECONDS.toMillis(receivedTime.getNano());
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,9 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -55,7 +52,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;

private final Channel channel;
private final SubscriberStub asyncStub;

private final String subscription;
private final ScheduledExecutorService executor;
Expand All @@ -69,14 +66,14 @@ public StreamingSubscriberConnection(
Duration maxAckExtensionPeriod,
int streamAckDeadlineSeconds,
Distribution ackLatencyDistribution,
Channel channel,
SubscriberStub asyncStub,
FlowController flowController,
ScheduledExecutorService executor,
@Nullable ScheduledExecutorService alarmsExecutor,
ApiClock clock) {
this.subscription = subscription;
this.executor = executor;
this.channel = channel;
this.asyncStub = asyncStub;
this.messageDispatcher =
new MessageDispatcher(
receiver,
Expand All @@ -101,8 +98,8 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
notifyStopped();
requestObserver.onError(Status.CANCELLED.asException());
notifyStopped();
}

private class StreamingPullResponseObserver
Expand Down Expand Up @@ -137,7 +134,6 @@ public void run() {

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Terminated streaming with exception", t);
errorFuture.setException(t);
}

Expand All @@ -154,9 +150,7 @@ private void initialize() {
new StreamingPullResponseObserver(errorFuture);
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(ClientCalls.asyncBidiStreamingCall(
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT),
responseObserver));
(asyncStub.streamingPull(responseObserver));
logger.log(
Level.FINER,
"Initializing stream to subscription {0} with deadline {1}",
Expand All @@ -173,6 +167,9 @@ private void initialize() {
new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
if (!isAlive()) {
return;
}
channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
// The stream was closed. And any case we want to reopen it to continue receiving
// messages.
Expand All @@ -186,6 +183,7 @@ public void onFailure(Throwable cause) {
logger.log(Level.FINE, "pull failure after service no longer running", cause);
return;
}
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
if (StatusUtil.isRetryable(cause)) {
long backoffMillis = channelReconnectBackoff.toMillis();
channelReconnectBackoff = channelReconnectBackoff.plusMillis(backoffMillis);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import com.google.pubsub.v1.GetSubscriptionRequest;
import com.google.pubsub.v1.SubscriberGrpc;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
Expand Down Expand Up @@ -257,22 +259,25 @@ public void close() {
// same executor, it will deadlock: the thread will be stuck waiting for connections
// to start but cannot start the connections.
// For this reason, we spawn a dedicated thread. Starting subscriber should be rare.
new Thread(new Runnable() {
@Override
public void run() {
try {
startPollingConnections();
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
}
}
}).start();
new Thread(
new Runnable() {
@Override
public void run() {
try {
// startPollingConnections();
startStreamingConnections();
notifyStarted();
} catch (Throwable t) {
notifyFailed(t);
}
}
})
.start();
}

@Override
protected void doStop() {
// stopAllStreamingConnections();
stopAllStreamingConnections();
stopAllPollingConnections();
try {
for (AutoCloseable closeable : closeables) {
Expand All @@ -284,6 +289,94 @@ protected void doStop() {
}
}

private void startStreamingConnections() throws IOException {
synchronized (streamingSubscriberConnections) {
Credentials credentials = credentialsProvider.getCredentials();
CallCredentials callCredentials =
credentials == null ? null : MoreCallCredentials.from(credentials);

for (Channel channel : channels) {
SubscriberStub stub = SubscriberGrpc.newStub(channel);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
}
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
cachedSubscriptionNameString,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
streamAckDeadlineSeconds,
ackLatencyDistribution,
stub,
flowController,
executor,
alarmsExecutor,
clock));
}
startConnections(
streamingSubscriberConnections,
new Listener() {
@Override
public void failed(State from, Throwable failure) {
// If a connection failed is because of a fatal error, we should fail the
// whole subscriber.
stopAllStreamingConnections();
try {
notifyFailed(failure);
} catch (IllegalStateException e) {
if (isRunning()) {
throw e;
}
// It could happen that we are shutting down while some channels fail.
}
}
});
}

ackDeadlineUpdater =
executor.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
// It is guaranteed this will be <= MAX_ACK_DEADLINE_SECONDS, the max of the API.
long ackLatency =
ackLatencyDistribution.getNthPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);
if (ackLatency > 0) {
long ackExpirationPaddingMillis = ackExpirationPadding.toMillis();
int possibleStreamAckDeadlineSeconds =
Math.max(
MIN_ACK_DEADLINE_SECONDS,
Ints.saturatedCast(
Math.max(
ackLatency,
TimeUnit.MILLISECONDS.toSeconds(ackExpirationPaddingMillis))));
if (streamAckDeadlineSeconds != possibleStreamAckDeadlineSeconds) {
streamAckDeadlineSeconds = possibleStreamAckDeadlineSeconds;
logger.log(
Level.FINER,
"Updating stream deadline to {0} seconds.",
streamAckDeadlineSeconds);
for (StreamingSubscriberConnection subscriberConnection :
streamingSubscriberConnections) {
subscriberConnection.updateStreamAckDeadline(streamAckDeadlineSeconds);
}
}
}
}
},
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
ACK_DEADLINE_UPDATE_PERIOD.toMillis(),
TimeUnit.MILLISECONDS);
}

private void stopAllStreamingConnections() {
stopConnections(streamingSubscriberConnections);
if (ackDeadlineUpdater != null) {
ackDeadlineUpdater.cancel(true);
}
}

// Starts polling connections. Blocks until all connections declare themselves running.
private void startPollingConnections() throws IOException {
synchronized (pollingSubscriberConnections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class SubscriberTest {

@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{false}});
return Arrays.asList(new Object[][] {{true}});
}

static class TestReceiver implements MessageReceiver {
Expand Down Expand Up @@ -205,6 +205,9 @@ public void testAckSingleMessage() throws Exception {

@Test
public void testGetSubscriptionOnce() throws Exception {
if (isStreamingTest) {
return;
}
Subscriber subscriber = startSubscriber(getTestSubscriberBuilder(testReceiver));

sendMessages(ImmutableList.of("A"));
Expand Down

0 comments on commit c5f1f2a

Please sign in to comment.