Skip to content

Commit

Permalink
fix: ordering keys publishing of last batch (#9)
Browse files Browse the repository at this point in the history
* google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java

* Ensure that if a batch is started and the timeout completes before the currently outstanding message has finished publishing with an ordering key that the last batch does in fact get published.

* add back in unit test
  • Loading branch information
kamalaboulhosn committed Nov 26, 2019
1 parent ec9cbce commit 02c3771
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,25 @@ private void publishAllWithoutInflight() {
}
}

/**
* Publish any outstanding batches if non-empty and there are no other batches in flight for
* orderingKey. This method sends buffered messages, but does not wait for the send operations to
* complete. To wait for messages to send, call {@code get} on the futures returned from {@code
* publish}.
*/
private void publishAllWithoutInflightForKey(final String orderingKey) {
messagesBatchLock.lock();
try {
MessagesBatch batch = messagesBatches.get(orderingKey);
if (batch != null && !sequentialExecutor.hasTasksInflight(orderingKey)) {
publishOutstandingBatch(batch.popOutstandingBatch());
messagesBatches.remove(orderingKey);
}
} finally {
messagesBatchLock.unlock();
}
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
return publisherStub
.publishCallable()
Expand All @@ -397,6 +416,11 @@ public void onSuccess(PublishResponse result) {
result.getMessageIdsCount(), outstandingBatch.size())));
} else {
outstandingBatch.onSuccess(result.getMessageIdsList());
if (!activeAlarm.get()
&& outstandingBatch.orderingKey != null
&& !outstandingBatch.orderingKey.isEmpty()) {
publishAllWithoutInflightForKey(outstandingBatch.orderingKey);
}
}
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ public void run() {
// Step 5.1: on success
@Override
public void onSuccess(T msg) {
future.set(msg);
callNextTaskAsync(key);
future.set(msg);
}

// Step 5.2: on failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.threeten.bp.Duration;

/**
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
Expand All @@ -36,6 +39,8 @@ class FakePublisherServiceImpl extends PublisherImplBase {
private final LinkedBlockingQueue<Response> publishResponses = new LinkedBlockingQueue<>();
private final AtomicInteger nextMessageId = new AtomicInteger(1);
private boolean autoPublishResponse;
private ScheduledExecutorService executor = null;
private Duration responseDelay = Duration.ZERO;

/** Class used to save the state of a possible response. */
private static class Response {
Expand Down Expand Up @@ -74,7 +79,8 @@ public String toString() {
}

@Override
public void publish(PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
public void publish(
PublishRequest request, final StreamObserver<PublishResponse> responseObserver) {
requests.add(request);
Response response;
try {
Expand All @@ -90,6 +96,23 @@ public void publish(PublishRequest request, StreamObserver<PublishResponse> resp
} catch (InterruptedException e) {
throw new IllegalArgumentException(e);
}
if (responseDelay == Duration.ZERO) {
sendResponse(response, responseObserver);
} else {
final Response responseToSend = response;
executor.schedule(
new Runnable() {
@Override
public void run() {
sendResponse(responseToSend, responseObserver);
}
},
responseDelay.toMillis(),
TimeUnit.MILLISECONDS);
}
}

private void sendResponse(Response response, StreamObserver<PublishResponse> responseObserver) {
if (response.isError()) {
responseObserver.onError(response.getError());
} else {
Expand All @@ -107,6 +130,18 @@ public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishRespon
return this;
}

/** Set an executor to use to delay publish responses. */
public FakePublisherServiceImpl setExecutor(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}

/** Set an amount of time by which to delay publish responses. */
public FakePublisherServiceImpl setPublishResponseDelay(Duration responseDelay) {
this.responseDelay = responseDelay;
return this;
}

public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) {
publishResponses.add(new Response(publishResponse));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
.setEnableMessageOrdering(true)
.build();
testPublisherServiceImpl.setAutoPublishResponse(true);
testPublisherServiceImpl.setExecutor(fakeExecutor);
testPublisherServiceImpl.setPublishResponseDelay(Duration.ofSeconds(300));

// Publish two messages with ordering key, "OrderA", and other two messages with "OrderB".
ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA");
Expand All @@ -325,10 +327,23 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception {
// The timeout expires.
fakeExecutor.advanceTime(Duration.ofSeconds(100));

// Publish one more message on "OrderA" while publishes are outstanding.
testPublisherServiceImpl.setPublishResponseDelay(Duration.ZERO);
ApiFuture<String> publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA");

// The second timeout expires.
fakeExecutor.advanceTime(Duration.ofSeconds(100));

// Publishing completes on the first four messages.
fakeExecutor.advanceTime(Duration.ofSeconds(200));

// Verify that they were delivered in order per ordering key.
assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get()));
assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get()));

// Verify that they were delivered in order per ordering key.
assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get()));

// Verify that every message within the same batch has the same ordering key.
List<PublishRequest> requests = testPublisherServiceImpl.getCapturedRequests();
for (PublishRequest request : requests) {
Expand Down

0 comments on commit 02c3771

Please sign in to comment.