Skip to content
Permalink
Browse files
feat: add randomly generated UUID to outgoing initial streaming pull …
…requests (#77)

* google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java

* Ensure that if a batch is started and the timeout completes before the currently outstanding message has finished publishing with an ordering key that the last batch does in fact get published.

* add back in unit test

* feat: add randomly generated UUID to outgoing initial streaming pull requests for better ordering keys affinity
  • Loading branch information
kamalaboulhosn committed Jan 31, 2020
1 parent d0ab525 commit 08e77d428aa50bb53ed7d5b922e76c2da18ed6d1
Showing with 9 additions and 0 deletions.
  1. +9 −0 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java
@@ -45,6 +45,7 @@
import com.google.pubsub.v1.StreamingPullResponse;
import io.grpc.Status;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -77,6 +78,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final Lock lock = new ReentrantLock();
private ClientStream<StreamingPullRequest> clientStream;

/**
* The same clientId is used across all streaming pull connections that are created. This is
* intentional, as it indicates to the server that any guarantees made for a stream that
* disconnected will be made for the stream that is created to replace it.
*/
private final String clientId = UUID.randomUUID().toString();

public StreamingSubscriberConnection(
String subscription,
MessageReceiver receiver,
@@ -200,6 +208,7 @@ private void initialize() {
StreamingPullRequest.newBuilder()
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.build());

/**

0 comments on commit 08e77d4

Please sign in to comment.