Skip to content

Commit

Permalink
feat: Enable server side flow control by default with the option to t…
Browse files Browse the repository at this point in the history
…urn it off (#426)

* Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server. If FlowControlSettings.maxOutstandingElementCount > 0 or FlowControlSettings.maxOutstandingRequestBytes > 0, flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and Subscriber.Builder.setUseLegacyFlowControl() method is provided for users who would like to opt-out of this feature in case they encouter issues with server side flow control.

* feat: Enable server side flow control by default with the option to turn it off

This change enables sending flow control settings automatically to the server.
If FlowControlSettings.maxOutstandingElementCount > 0 or FlowControlSettings.maxOutstandingRequestBytes > 0,
flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and Subscriber.Builder.setUseLegacyFlowControl() method
is provided for users who would like to opt-out of this feature in case they encounter
issues with server side flow control.

* Update google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Co-authored-by: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com>

* Update google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Co-authored-by: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com>

* Update StreamingSubscriberConnection.java

Co-authored-by: yoshi-code-bot <70984784+yoshi-code-bot@users.noreply.github.com>
  • Loading branch information
fayssalmartanigcp and yoshi-code-bot committed Nov 10, 2020
1 parent 771bb17 commit 14ac8d7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
Expand All @@ -98,6 +99,7 @@ public StreamingSubscriberConnection(
SubscriberStub stub,
int channelAffinity,
FlowControlSettings flowControlSettings,
boolean useLegacyFlowControl,
FlowController flowController,
ScheduledExecutorService executor,
ScheduledExecutorService systemExecutor,
Expand All @@ -119,6 +121,7 @@ public StreamingSubscriberConnection(
systemExecutor,
clock);
this.flowControlSettings = flowControlSettings;
this.useLegacyFlowControl = useLegacyFlowControl;
}

@Override
Expand Down Expand Up @@ -217,9 +220,13 @@ private void initialize() {
.setStreamAckDeadlineSeconds(60)
.setClientId(clientId)
.setMaxOutstandingMessages(
valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingElementCount()))
.setMaxOutstandingBytes(
valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
this.useLegacyFlowControl
? 0
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac

private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final boolean useLegacyFlowControl;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
// The ExecutorProvider used to generate executors for processing messages.
Expand All @@ -126,6 +127,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private Subscriber(Builder builder) {
receiver = builder.receiver;
flowControlSettings = builder.flowControlSettings;
useLegacyFlowControl = builder.useLegacyFlowControl;
subscriptionName = builder.subscriptionName;

maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
Expand Down Expand Up @@ -336,6 +338,7 @@ private void startStreamingConnections() {
subStub,
i,
flowControlSettings,
useLegacyFlowControl,
flowController,
executor,
alarmsExecutor,
Expand Down Expand Up @@ -420,6 +423,7 @@ public static final class Builder {
private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
Expand Down Expand Up @@ -504,6 +508,15 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
return this;
}

/**
* Disables enforcing flow control settings at the Cloud PubSub server and uses the less
* accurate method of only enforcing flow control at the client side.
*/
public Builder setUseLegacyFlowControl(boolean value) {
this.useLegacyFlowControl = value;
return this;
}

/**
* Set the maximum period a message ack deadline will be extended. Defaults to one hour.
*
Expand Down

0 comments on commit 14ac8d7

Please sign in to comment.