Skip to content
Permalink
Browse files

Fixed configuration options of FailedThresholdCallback could not be i…

…nvoke when the value less then three. (#161)
  • Loading branch information...
yang-xiaodong committed Jul 17, 2018
1 parent 2e491a6 commit 7089b1ccded6851045c3ebf5d9fb0a6f1235111f
Showing with 79 additions and 37 deletions.
  1. +45 −25 src/DotNetCore.CAP/IPublishMessageSender.Base.cs
  2. +34 −12 src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
@@ -48,18 +48,19 @@ public async Task<OperateResult> SendAsync(CapPublishedMessage message)
OperateResult result;
do
{
result = await SendWithoutRetryAsync(message);
var executedResult = await SendWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage message)
private async Task<(bool, OperateResult)> SendWithoutRetryAsync(CapPublishedMessage message)
{
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
@@ -80,52 +81,71 @@ private async Task<OperateResult> SendWithoutRetryAsync(CapPublishedMessage mess

TracingAfter(operationId, message.Name, sendValues, startTime, stopwatch.Elapsed);

return OperateResult.Success;
return (false, OperateResult.Success);
}
else
{
TracingError(operationId, message, result, startTime, stopwatch.Elapsed);

await SetFailedState(message, result.Exception);

return OperateResult.Failed(result.Exception);
var needRetry = await SetFailedState(message, result.Exception);
return (needRetry, OperateResult.Failed(result.Exception));
}
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
var retries = ++message.Retries;
if (retries >= retryBehavior.RetryCount)
{
return false;
}

_logger.SenderRetrying(message.Id, retries);

var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}

private Task SetSuccessfulState(CapPublishedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);
return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapPublishedMessage message, Exception ex)
private async Task<bool> SetFailedState(CapPublishedMessage message, Exception ex)
{
AddErrorReasonToContent(message, ex);
return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private static void AddErrorReasonToContent(CapPublishedMessage message, Exception exception)
{
message.Content = Helper.AddExceptionProperty(message.Content, exception);
}

private bool UpdateMessageForRetry(CapPublishedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.SenderAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.SenderRetrying(message.Id, retries);

return true;
}

private (Guid, TracingHeaders) TracingBefore(string topic, string values)
{
Guid operationId = Guid.NewGuid();
@@ -55,18 +55,24 @@ public async Task<OperateResult> ExecuteAsync(CapReceivedMessage message)
OperateResult result;
do
{
result = await ExecuteWithoutRetryAsync(message);
var executedResult = await ExecuteWithoutRetryAsync(message);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
return result;
}
retry = UpdateMessageForRetry(message);
retry = executedResult.Item1;
} while (retry);

return result;
}

private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage message)
/// <summary>
/// Execute message consumption once.
/// </summary>
/// <param name="message">the message rececived of <see cref="CapReceivedMessage"/></param>
/// <returns>Item1 is need still restry, Item2 is executed result.</returns>
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(CapReceivedMessage message)
{
if (message == null)
{
@@ -85,26 +91,23 @@ private async Task<OperateResult> ExecuteWithoutRetryAsync(CapReceivedMessage me

_logger.ConsumerExecuted(sp.Elapsed.TotalSeconds);

return OperateResult.Success;
return (false, OperateResult.Success);
}
catch (Exception ex)
{
_logger.LogError(ex, $"An exception occurred while executing the subscription method. Topic:{message.Name}, Id:{message.Id}");

await SetFailedState(message, ex);

return OperateResult.Failed(ex);
return (await SetFailedState(message, ex), OperateResult.Failed(ex));
}
}

private Task SetSuccessfulState(CapReceivedMessage message)
{
var succeededState = new SucceededState(_options.SucceedMessageExpiredAfter);

return _stateChanger.ChangeStateAsync(message, succeededState, _connection);
}

private Task SetFailedState(CapReceivedMessage message, Exception ex)
private async Task<bool> SetFailedState(CapReceivedMessage message, Exception ex)
{
if (ex is SubscriberNotFoundException)
{
@@ -113,22 +116,41 @@ private Task SetFailedState(CapReceivedMessage message, Exception ex)

AddErrorReasonToContent(message, ex);

return _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);
var needRetry = UpdateMessageForRetry(message);

await _stateChanger.ChangeStateAsync(message, new FailedState(), _connection);

return needRetry;
}

private bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;

var retries = ++message.Retries;
message.ExpiresAt = message.Added.AddSeconds(retryBehavior.RetryIn(retries));

var retryCount = Math.Min(_options.FailedRetryCount, retryBehavior.RetryCount);
if (retries >= retryCount)
{
if (retries == _options.FailedRetryCount)
{
try
{
_options.FailedThresholdCallback?.Invoke(MessageType.Subscribe, message.Name, message.Content);

_logger.ConsumerExecutedAfterThreshold(message.Id, _options.FailedRetryCount);
}
catch (Exception ex)
{
_logger.ExecutedThresholdCallbackFailed(ex);
}
}
return false;
}

_logger.ConsumerExecutionRetrying(message.Id, retries);
var due = message.Added.AddSeconds(retryBehavior.RetryIn(retries));
message.ExpiresAt = due;

return true;
}

0 comments on commit 7089b1c

Please sign in to comment.
You can’t perform that action at this time.