feat: Improve Pub/Sub streaming pull retries
- Don't back off before restarting a streaming pull if the failure is an expected server disconnect
- Handle auth-related failures separately from other unavailable/internal errors
  - We may want functionality like this across the board eventually, but this is a starting point.
jskeet committed May 30, 2024
1 parent b41a7cf commit 8537ed4
Showing 1 changed file with 93 additions and 33 deletions.
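
As a rough illustration of the behaviour described in the commit message (a minimal sketch, not the shipped code: the type and method names here are invented for clarity, while the 45-second "success" threshold and the limit on auth-related failures mirror the values in the diff below):

using System;

// Sketch of the retry decision for a failed streaming pull.
internal enum PullRetryDecision
{
    DoNotRetry,          // Propagate the failure to the caller.
    RetryWithoutBackoff, // Expected server disconnect: restart immediately.
    RetryWithBackoff     // Transient failure: restart after an exponential backoff.
}

internal static class PullRetrySketch
{
    // A stream that ran at least this long before being dropped is deemed to have succeeded.
    private static readonly TimeSpan s_successThreshold = TimeSpan.FromSeconds(45);
    // Auth-related failures are only retried a small number of times.
    private const int MaxAuthFailures = 4;

    internal static PullRetryDecision Classify(
        TimeSpan streamDuration,   // How long the pull stream ran before failing.
        bool serverDisconnect,     // The failure was UNAVAILABLE, i.e. a server-side disconnect.
        bool recoverable,          // The RpcException is generally considered recoverable.
        bool authFailure,          // The failure was caused by fetching credentials.
        int authFailureCount)      // Auth failures observed so far, including this one.
    {
        // A long-running stream dropped by the server is "expected": restart with no backoff.
        if (serverDisconnect && streamDuration >= s_successThreshold)
        {
            return PullRetryDecision.RetryWithoutBackoff;
        }
        // Non-recoverable errors, and repeated auth failures, are not retried.
        if (!recoverable || (authFailure && authFailureCount > MaxAuthFailures))
        {
            return PullRetryDecision.DoNotRetry;
        }
        return PullRetryDecision.RetryWithBackoff;
    }
}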
@@ -14,6 +14,7 @@

using Google.Api.Gax;
using Google.Api.Gax.Grpc;
using Google.Apis.Auth.OAuth2.Responses;
using Google.Cloud.PubSub.V1.Tasks;
using Grpc.Core;
using Microsoft.Extensions.Logging;
@@ -109,8 +110,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null)
internal RetryInfo WithBackoff(TimeSpan? backoff) => new RetryInfo(FirstTimeOfFailureInUtc, backoff);
}


private static readonly RetrySettings s_pullBackoff = RetrySettings.FromExponentialBackoff(
private static readonly RetrySettings s_defaultPullRetryTiming = RetrySettings.FromExponentialBackoff(
maxAttempts: int.MaxValue,
initialBackoff: TimeSpan.FromSeconds(0.5),
maxBackoff: TimeSpan.FromSeconds(30),
@@ -209,7 +209,7 @@ internal SingleChannel(SubscriberClientImpl subscriber,
_eventReceiptModAckForExactlyOnceDelivery = new AsyncAutoResetEvent(subscriber._taskHelper);
_continuationQueue = new AsyncSingleRecvQueue<TaskNextAction>(subscriber._taskHelper);
_logger = subscriber.Logger;
_retryState = new RetryState(clock, s_pullBackoff, TimeSpan.FromSeconds(0.5));
_retryState = new RetryState(clock, _logger, s_defaultPullRetryTiming, TimeSpan.FromSeconds(0.5));
}

internal async Task StartAsync()
@@ -249,6 +249,7 @@ internal async Task StartAsync()
next.Action();
}
}
_logger?.LogDebug("Subscriber task completed.");
// Stop waiting for data to push.
_pushStopCts.Cancel();
}
@@ -308,7 +309,7 @@ private void StopStreamingPull()
// If backoff is non-zero delay before opening streaming-pull.
private void StartStreamingPull()
{
if (_retryState.Backoff is TimeSpan backoff)
if (_retryState.Backoff is TimeSpan backoff && backoff.Ticks != 0)
{
// Delay, then start the streaming-pull.
_logger?.LogDebug("Client {index} delaying for {seconds}s before streaming pull call.", _clientIndex, (int) backoff.TotalSeconds);
@@ -325,6 +326,7 @@ private void StartStreamingPull()
// Backoff delay (if present) has already been done; no need to delay here.
private void HandleStartStreamingPullWithoutBackoff()
{
_retryState.OnStartAttempt();
_pull = _client.StreamingPull(CallSettings.FromCancellationToken(_softStopCts.Token));
// Cancellation not needed in this WriteAsync call. The StreamingPull() cancellation
// (above) will cause this call to cancel if _softStopCts is cancelled.
@@ -343,7 +345,7 @@ private void HandleStartStreamingPullWithoutBackoff()
/// </summary>
private void RestartPullOrThrow(Exception e)
{
if (_retryState.ShouldRetry(e))
if (_retryState.RecordFailureAndCheckForRetry(e))
{
RestartPullAfterRetriableFailure(e);
}
@@ -356,14 +358,12 @@ private void RestartPullOrThrow(Exception e)
}

/// <summary>
/// Restarts the streaming pull after an exception which is expected to already be retriable.
/// Restarts the streaming pull after an exception which has been confirmed to be retriable.
/// (This isn't rechecked.)
/// </summary>
private void RestartPullAfterRetriableFailure(Exception e)
{
_logger?.LogDebug(e, "Recoverable error in streaming pull for client {index}; will retry.", _clientIndex);
// Update the retry state, increasing the backoff etc.
_retryState.OnFailure(e);
StopStreamingPull();
StartStreamingPull();
}
@@ -423,7 +423,7 @@ private void HandlePullMessageData(Task<bool> moveNextTask)
_exactlyOnceDeliveryEnabled = current.SubscriptionProperties?.ExactlyOnceDeliveryEnabled ?? false;
_messageOrderingEnabled = current.SubscriptionProperties?.MessageOrderingEnabled ?? false;
}
catch (Exception e) when (_retryState.ShouldRetry(e))
catch (Exception e) when (_retryState.RecordFailureAndCheckForRetry(e))
{
RestartPullAfterRetriableFailure(e);
return;
@@ -1109,61 +1109,121 @@ private void UpdateReceiptModAckStatus(IEnumerable<string> ids, bool? status)

/// <summary>
/// Maintains the state of retry operations.
/// Currently this is only used for pull operations, but it could be used for publishing later.
/// Currently this only uses the existing "IsRecoverable" exception check; in the future we're
/// likely to expose the rest of this state for more nuanced decisions.
/// Currently this is only used for pull operations, and has hard-coded pull-specific behavior,
/// but it could be used for publishing later once we have clearer requirements.
/// </summary>
private class RetryState
{
/// <summary>
/// If a streaming pull only reports an error after this threshold, then we assume it was actually
/// successful, so we don't need to back off or repeat anything.
/// </summary>
private static readonly TimeSpan s_streamingPullSuccessThreshold = TimeSpan.FromSeconds(45);
private const string GrpcCoreAuthExceptionPrefix = "Getting metadata from plugin failed with error: Exception occurred in metadata credentials plugin. ";
private const int MaxAuthExceptionsBeforeFailing = 4;

private const int ExceptionLimit = 100;

private readonly IClock _clock;
// Only used for timing.
private readonly RetrySettings _retrySettings;
private readonly ILogger _logger;
private readonly RetrySettings _backoffTiming;
private readonly TimeSpan _disconnectBackoff;

private DateTime _currentStartTimestamp;
private readonly List<RpcException> _exceptions;
private DateTime? _firstExceptionTimestamp;
public TimeSpan? Backoff { get; private set; }
internal TimeSpan? Backoff { get; private set; }

public RetryState(IClock clock, RetrySettings retrySettings, TimeSpan disconnectBackoff)
internal RetryState(IClock clock, ILogger logger, RetrySettings backoffTiming, TimeSpan disconnectBackoff)
{
_clock = clock;
_exceptions = new List<RpcException>();
_firstExceptionTimestamp = null;
_retrySettings = retrySettings;
_logger = logger;
_backoffTiming = backoffTiming;
_disconnectBackoff = disconnectBackoff;
_exceptions = new();
_firstExceptionTimestamp = null;
Backoff = null;
}

/// <summary>
/// Checks whether the given exception should be retried, without updating any state.
/// </summary>
public bool ShouldRetry(Exception exception) =>
exception?.As<RpcException>()?.IsRecoverable() ?? false;

/// <summary>
/// Records the given exception in the retry state, updating the backoff accordingly.
/// Checks whether the given exception should be retried, updating the state including the next backoff.
/// </summary>
public void OnFailure(Exception exception)
internal bool RecordFailureAndCheckForRetry(Exception exception)
{
_firstExceptionTimestamp ??= _clock.GetCurrentDateTimeUtc();
if (exception?.As<RpcException>() is RpcException rpcException && _exceptions.Count < 100)
var now = _clock.GetCurrentDateTimeUtc();
_firstExceptionTimestamp ??= now;
// This won't be used if we decide not to retry anyway.
Backoff = _backoffTiming.NextBackoff(Backoff ?? TimeSpan.Zero);

// Don't retry any non-RpcExceptions
if (exception.As<RpcException>() is not RpcException rpcEx)
{
return false;
}
if (_exceptions.Count < ExceptionLimit)
{
_exceptions.Add(rpcException);
_exceptions.Add(rpcEx);
}
Backoff = _retrySettings.NextBackoff(Backoff ?? TimeSpan.Zero);

var code = rpcEx.StatusCode;

// If the server has just dropped the stream, but it's after a sufficiently long time, we can
// deem that to be successful and retry with no backoff.
if (code == StatusCode.Unavailable && (now - _currentStartTimestamp) >= s_streamingPullSuccessThreshold)
{
_logger?.LogDebug("Pull stream terminated with no messages, but after success assumption threshold time. Retrying with no backoff.");
OnSuccess();
return true;
}

// If the exception isn't generally recoverable, don't retry.
if (!rpcEx.IsRecoverable())
{
return false;
}

// If the exception was a failure due to auth, and we've seen some before, don't retry.
if (IsAuthException(rpcEx))
{
var count = _exceptions.Count(IsAuthException);
bool retry = count <= MaxAuthExceptionsBeforeFailing;
if (!retry)
{
_logger?.LogWarning("Failing pull request due to auth-based failures.");
}
return retry;
}

// We can potentially add more subtle checks here.

// Looks like we can retry.
return true;

// Indicates if an exception was due to a problem getting credentials, rather than a problem
// with the call itself. We only retry these a limited number of times.
static bool IsAuthException(RpcException ex) =>
// Grpc.Net.Client behavior
ex.StatusCode == StatusCode.Internal && ex.InnerException is TokenResponseException ||
// Grpc.Core behavior
ex.StatusCode == StatusCode.Unavailable && (ex.Status.Detail?.StartsWith(GrpcCoreAuthExceptionPrefix, StringComparison.Ordinal) ?? false);
}

/// <summary>
/// Records the time of the start of an operation.
/// </summary>
internal void OnStartAttempt() => _currentStartTimestamp = _clock.GetCurrentDateTimeUtc();

/// <summary>
/// Records that an operation succeeded.
/// </summary>
public void OnSuccess()
internal void OnSuccess()
{
_firstExceptionTimestamp = null;
_exceptions.Clear();
Backoff = null;
}

public void OnServerDisconnect()
internal void OnServerDisconnect()
{
// Ignore previous exceptions.
OnSuccess();
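
For reference, the auth-failure detection added in this commit reads as follows when lifted out of RetryState into a standalone helper. The logic and the Grpc.Core message prefix are taken verbatim from the diff above; the class name and its visibility are illustrative only:

using System;
using Google.Apis.Auth.OAuth2.Responses;
using Grpc.Core;

internal static class AuthFailureDetectionSketch
{
    // Message prefix used by Grpc.Core when the metadata credentials plugin throws.
    private const string GrpcCoreAuthExceptionPrefix =
        "Getting metadata from plugin failed with error: Exception occurred in metadata credentials plugin. ";

    // True if the RpcException was caused by a failure to obtain credentials,
    // rather than by the streaming pull call itself.
    internal static bool IsAuthException(RpcException ex) =>
        // Grpc.Net.Client surfaces token acquisition failures as Internal, wrapping a TokenResponseException.
        (ex.StatusCode == StatusCode.Internal && ex.InnerException is TokenResponseException) ||
        // Grpc.Core surfaces them as Unavailable with a well-known detail prefix.
        (ex.StatusCode == StatusCode.Unavailable &&
            (ex.Status.Detail?.StartsWith(GrpcCoreAuthExceptionPrefix, StringComparison.Ordinal) ?? false));
}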
