Skip to content

Commit

Permalink
fix: ensure all publishes complete before shutting down publisher (#76)
Browse files Browse the repository at this point in the history
* Modifying Publish example in README to match other examples given, and
fix issue #6784

* fix: Modifying Publish example in README to match other examples, and
fix Issue #11

* feat: Adding support for DLQs

Adding delivery attempt count to PubsubMessages as a message attribute,
and creating helper function to allow users to get the count without
knowing implementation details.

* Fix formatting

* fix: making changes requested in pull request

* fix: creating fix to not populate delivery attempt attribute when dead
lettering is not enabled

* Adding unit test for case in which a received message has no delivery attempt

* Making MessageWaiter class more generic to also be used for outstanding
ack operations

* Waiting for acks to complete before shutting down a streaming subscriber
connection

* Fixing formatting error

* fix: making sure all publishes complete before shutting down the
publisher
  • Loading branch information
hannahrogers-google committed Jan 31, 2020
1 parent 9bcc433 commit d0ab525
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ public void shutdown() {
currentAlarmFuture.cancel(false);
}
publishAllOutstanding();
messagesWaiter.waitComplete();
backgroundResources.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ public void testSinglePublishByNumBytes() throws Exception {
publisher.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testPublishByShutdown() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setDelayThreshold(Duration.ofSeconds(100))
.setElementCountThreshold(10L)
.build())
.build();

testPublisherServiceImpl.addPublishResponse(
PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2"));

ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "A");
ApiFuture<String> publishFuture2 = sendTestMessage(publisher, "B");

// Note we are not advancing time or reaching the count threshold but messages should
// still get published by call to shutdown

publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);

// Verify the publishes completed
assertTrue(publishFuture1.isDone());
assertTrue(publishFuture2.isDone());
assertEquals("1", publishFuture1.get());
assertEquals("2", publishFuture2.get());
}

@Test
public void testPublishMixedSizeAndDuration() throws Exception {
Publisher publisher =
Expand Down

0 comments on commit d0ab525

Please sign in to comment.