Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
Expand All @@ -98,6 +99,7 @@ public StreamingSubscriberConnection(
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
boolean useLegacyFlowControl,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -119,6 +121,7 @@ public StreamingSubscriberConnection(
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
this.useLegacyFlowControl = useLegacyFlowControl;
}

@Override
Expand Down Expand Up @@ -217,9 +220,13 @@ private void initialize() {
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(
valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
.setMaxOutstandingBytes(
valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac

private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
// The ExecutorProvider used to generate executors for processing messages.
Expand All @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
subscriptionName = builder.subscriptionName;

maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
Expand Down Expand Up @@ -336,6 +338,7 @@ private void startStreamingConnections() {
subStub,
i,
flowControlSettings,
useLegacyFlowControl,
flowController,
executor,
alarmsExecutor,
Expand Down Expand Up @@ -420,6 +423,7 @@ public static final class Builder {
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
Expand Down Expand Up @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Disables enforcing flow control settings at the Cloud PubSub server and uses the less
* accurate method of only enforcing flow control at the client side.
*/
public Builder setUseLegacyFlowControl(boolean value) {
this.useLegacyFlowControl = value;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down