Skip to content

Commit

Permalink
feat: Send streaming pull flow control settings to server (#267)
Browse files Browse the repository at this point in the history
* feat: Add flow control support to publisher

* make suggested fixes

* chore: Remove note that ordering keys requires enablements.

* feat: Add support for server-side flow control

* Revert "chore: Remove note that ordering keys requires enablements."

This reverts commit 9c113c3.

* fix: Fix import order
  • Loading branch information
kamalaboulhosn committed Jun 24, 2020
1 parent ddd3283 commit 9c750c8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
Expand Down Expand Up @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final ScheduledExecutorService systemExecutor;
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
private final Waiter ackOperationsWaiter = new Waiter();
Expand All @@ -93,6 +96,7 @@ public StreamingSubscriberConnection(
Distribution ackLatencyDistribution,
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -112,6 +116,7 @@ public StreamingSubscriberConnection(
executor,
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
}

@Override
Expand Down Expand Up @@ -209,6 +214,8 @@ private void initialize() {
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount())
.setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes())
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ private void startStreamingConnections() {
ackLatencyDistribution,
subStub,
i,
flowControlSettings,
flowController,
executor,
alarmsExecutor,
Expand Down

0 comments on commit 9c750c8

Please sign in to comment.