Skip to content

Commit

Permalink
feat: Enable server side flow control by default with the option to t…
Browse files Browse the repository at this point in the history
…urn it off

This change enables sending flow control settings automatically to the server. If FlowControlSettings.MaxOutstandingElementCount > 0 or FlowControlSettings.MaxOutstandingByteCount > 0, flow control will be enforced at the server side (in addition to the client side).

This behavior is enabled by default and users who would like to opt-out of this feature --in case they encounter issues with server side flow control-- can set Settings.UseLegacyFlowControl=True in SubscriberClient.CreateAsync().
  • Loading branch information
fayssalmartanigcp authored and jskeet committed Nov 10, 2020
1 parent c3c9448 commit 0ce91bb
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ internal Settings(Settings other)
/// </summary>
public FlowControlSettings FlowControlSettings { get; set; }

/// <summary>
/// If set to true, disables enforcing flow control settings at the Cloud PubSub server
/// and uses the less accurate method of only enforcing flow control at the client side.
/// </summary>
public bool UseLegacyFlowControl { get; set; } = false;

/// <summary>
/// The lease time before which a message must either be ACKed
/// or have its lease extended. This is truncated to the nearest second.
Expand Down Expand Up @@ -381,6 +387,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
_scheduler = settings.Scheduler ?? SystemScheduler.Instance;
_taskHelper = GaxPreconditions.CheckNotNull(taskHelper, nameof(taskHelper));
_flowControlSettings = settings.FlowControlSettings ?? DefaultFlowControlSettings;
_useLegacyFlowControl = settings.UseLegacyFlowControl;
_maxAckExtendQueue = (int)Math.Min(_flowControlSettings.MaxOutstandingElementCount ?? long.MaxValue, 20_000);
}

Expand All @@ -394,6 +401,7 @@ internal SubscriberClientImpl(SubscriptionName subscriptionName, IEnumerable<Sub
private readonly IScheduler _scheduler;
private readonly TaskHelper _taskHelper;
private readonly FlowControlSettings _flowControlSettings;
private readonly bool _useLegacyFlowControl;

private TaskCompletionSource<int> _mainTcs;
private CancellationTokenSource _globalSoftStopCts; // soft-stop is guarenteed to occur before hard-stop.
Expand Down Expand Up @@ -424,7 +432,7 @@ public override Task StartAsync(Func<PubsubMessage, CancellationToken, Task<Repl
// Start all subscribers
var subscriberTasks = _clients.Select(client =>
{
var singleChannel = new SingleChannel(this, client, handlerAsync, flow, registerTask);
var singleChannel = new SingleChannel(this, client, handlerAsync, flow, _useLegacyFlowControl, registerTask);
return _taskHelper.Run(() => singleChannel.StartAsync());
}).ToArray();
// Set up finish task; code that executes when this subscriber is being shutdown (for whatever reason).
Expand Down Expand Up @@ -821,7 +829,7 @@ internal TimedId(long time, string id)

internal SingleChannel(SubscriberClientImpl subscriber,
SubscriberServiceApiClient client, Func<PubsubMessage, CancellationToken, Task<Reply>> handlerAsync,
Flow flow,
Flow flow, bool useLegacyFlowControl,
Action<Task> registerTaskFn)
{
_registerTaskFn = registerTaskFn;
Expand All @@ -841,6 +849,7 @@ internal TimedId(long time, string id)
_maxAckExtendSendCount = Math.Max(10, subscriber._maxAckExtendQueue / 4);
_maxConcurrentPush = 3; // Fairly arbitrary.
_flow = flow;
_useLegacyFlowControl = useLegacyFlowControl;
_eventPush = new AsyncAutoResetEvent(subscriber._taskHelper);
_continuationQueue = new AsyncSingleRecvQueue<TaskNextAction>(subscriber._taskHelper);
}
Expand All @@ -864,6 +873,7 @@ internal TimedId(long time, string id)
private readonly int _maxConcurrentPush; // Mamimum number (slightly soft) of concurrent ack/nack/extend push RPCs.

private readonly Flow _flow;
private readonly bool _useLegacyFlowControl;
private readonly AsyncAutoResetEvent _eventPush;
private readonly AsyncSingleRecvQueue<TaskNextAction> _continuationQueue;
private readonly RequeueableQueue<TimedId> _extendQueue = new RequeueableQueue<TimedId>();
Expand Down Expand Up @@ -992,8 +1002,8 @@ private void HandleStartStreamingPullWithoutBackoff()
{
SubscriptionAsSubscriptionName = _subscriptionName,
StreamAckDeadlineSeconds = _modifyDeadlineSeconds,
MaxOutstandingMessages = _flow.MaxOutstandingElementCount,
MaxOutstandingBytes = _flow.MaxOutstandingByteCount
MaxOutstandingMessages = _useLegacyFlowControl ? 0 : _flow.MaxOutstandingElementCount,
MaxOutstandingBytes = _useLegacyFlowControl ? 0 : _flow.MaxOutstandingByteCount
});
Add(initTask, Next(true, () => HandlePullMoveNext(initTask)));
}
Expand Down

0 comments on commit 0ce91bb

Please sign in to comment.