Skip to content

Commit

Permalink
fix: shutdown grpc stubs properly when a subscriber is stopped (#74)
Browse files Browse the repository at this point in the history
* Modifying Publish example in README to match other examples given, and
fix issue #6784

* fix: Modifying Publish example in README to match other examples, and
fix Issue #11

* feat: Adding support for DLQs

Adding delivery attempt count to PubsubMessages as a message attribute,
and creating helper function to allow users to get the count without
knowing implementation details.

* Fix formatting

* fix: making changes requested in pull request

* fix: creating fix to not populate delivery attempt attribute when dead
lettering is not enabled

* Adding unit test for case in which a received message has no delivery attempt

* Making MessageWaiter class more generic to also be used for outstanding
ack operations

* Waiting for acks to complete before shutting down a streaming subscriber
connection

* Fixing formatting error
  • Loading branch information
hannahrogers-google committed Jan 31, 2020
1 parent 052ed86 commit 9bcc433
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class MessageDispatcher {
private final AckProcessor ackProcessor;

private final FlowController flowController;
private final MessageWaiter messagesWaiter;
private final Waiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -145,7 +145,7 @@ private void forget() {
return;
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
messagesWaiter.incrementPendingCount(-1);
}

@Override
Expand Down Expand Up @@ -205,7 +205,7 @@ void sendAckOperations(
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
this.ackLatencyDistribution = ackLatencyDistribution;
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
messagesWaiter = new Waiter();
this.clock = clock;
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void run() {
}

void stop() {
messagesWaiter.waitNoMessages();
messagesWaiter.waitComplete();
jobLock.lock();
try {
if (backgroundJob != null) {
Expand Down Expand Up @@ -331,9 +331,9 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
}

private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingMessages(batch.size());
messagesWaiter.incrementPendingCount(batch.size());
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented MessageWaiter, so
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class Publisher {

private final AtomicBoolean shutdown;
private final BackgroundResource backgroundResources;
private final MessageWaiter messagesWaiter;
private final Waiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

Expand Down Expand Up @@ -173,7 +173,7 @@ private Publisher(Builder builder) throws IOException {
backgroundResourceList.add(publisherStub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new MessageWaiter();
messagesWaiter = new Waiter();
}

/** Topic which the publisher publishes to. */
Expand Down Expand Up @@ -249,7 +249,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
messagesBatchLock.unlock();
}

messagesWaiter.incrementPendingMessages(1);
messagesWaiter.incrementPendingCount(1);

// For messages without ordering keys, it is okay to send batches without holding
// messagesBatchLock.
Expand Down Expand Up @@ -423,7 +423,7 @@ public void onSuccess(PublishResponse result) {
}
}
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
}
}

Expand All @@ -432,7 +432,7 @@ public void onFailure(Throwable t) {
try {
outstandingBatch.onFailure(t);
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
private final Waiter ackOperationsWaiter = new Waiter();

private final Lock lock = new ReentrantLock();
private ClientStream<StreamingPullRequest> clientStream;
Expand Down Expand Up @@ -116,6 +117,7 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
ackOperationsWaiter.waitComplete();

lock.lock();
try {
Expand Down Expand Up @@ -273,16 +275,18 @@ public void sendAckOperations(
new ApiFutureCallback<Empty>() {
@Override
public void onSuccess(Empty empty) {
// noop
ackOperationsWaiter.incrementPendingCount(-1);
}

@Override
public void onFailure(Throwable t) {
ackOperationsWaiter.incrementPendingCount(-1);
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};

int pendingOperations = 0;
for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
ApiFuture<Empty> future =
Expand All @@ -294,6 +298,7 @@ public void onFailure(Throwable t) {
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build());
ApiFutures.addCallback(future, loggingCallback, directExecutor());
pendingOperations++;
}
}

Expand All @@ -306,6 +311,9 @@ public void onFailure(Throwable t) {
.addAllAckIds(idChunk)
.build());
ApiFutures.addCallback(future, loggingCallback, directExecutor());
pendingOperations++;
}

ackOperationsWaiter.incrementPendingCount(pendingOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ public void run() {
// stop connection is no-op if connections haven't been started.
stopAllStreamingConnections();
shutdownBackgroundResources();
subStub.shutdownNow();
notifyStopped();
} catch (Exception e) {
notifyFailed(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@

import com.google.api.core.InternalApi;

/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
class MessageWaiter {
private int pendingMessages;
/**
* A barrier kind of object that helps keep track of pending actions and synchronously wait until
* all have completed.
*/
class Waiter {
private int pendingCount;

MessageWaiter() {
pendingMessages = 0;
Waiter() {
pendingCount = 0;
}

public synchronized void incrementPendingMessages(int messages) {
this.pendingMessages += messages;
if (pendingMessages == 0) {
public synchronized void incrementPendingCount(int delta) {
this.pendingCount += delta;
if (pendingCount == 0) {
notifyAll();
}
}

public synchronized void waitNoMessages() {
public synchronized void waitComplete() {
boolean interrupted = false;
try {
while (pendingMessages > 0) {
while (pendingCount > 0) {
try {
wait();
} catch (InterruptedException e) {
Expand All @@ -52,7 +55,7 @@ public synchronized void waitNoMessages() {
}

@InternalApi
public int pendingMessages() {
return pendingMessages;
public int pendingCount() {
return pendingCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MessageWaiter}. */
/** Tests for {@link Waiter}. */
@RunWith(JUnit4.class)
public class MessageWaiterTest {
public class WaiterTest {

@Test
public void test() throws Exception {
final MessageWaiter waiter = new MessageWaiter();
waiter.incrementPendingMessages(1);
final Waiter waiter = new Waiter();
waiter.incrementPendingCount(1);

final Thread mainThread = Thread.currentThread();
Thread t =
Expand All @@ -40,14 +40,14 @@ public void run() {
while (mainThread.getState() != Thread.State.WAITING) {
Thread.yield();
}
waiter.incrementPendingMessages(-1);
waiter.incrementPendingCount(-1);
}
});
t.start();

waiter.waitNoMessages();
waiter.waitComplete();
t.join();

assertEquals(0, waiter.pendingMessages());
assertEquals(0, waiter.pendingCount());
}
}

0 comments on commit 9bcc433

Please sign in to comment.