-
Notifications
You must be signed in to change notification settings - Fork 36
/
SkipMessageErrorPolicy.cs
89 lines (79 loc) · 3.31 KB
/
SkipMessageErrorPolicy.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Silverback.Diagnostics;
using Silverback.Messaging.Broker.Behaviors;
using Silverback.Messaging.Messages;
using Silverback.Util;
namespace Silverback.Messaging.Inbound.ErrorHandling
{
/// <summary>
///     An error policy that skips the message whose processing failed.
/// </summary>
public class SkipMessageErrorPolicy : ErrorPolicyBase
{
    /// <inheritdoc cref="ErrorPolicyBase.BuildCore" />
    protected override ErrorPolicyImplementation BuildCore(IServiceProvider serviceProvider)
    {
        // Forward the settings configured on this policy to the implementation, together with
        // a logger resolved from the given service provider.
        return new SkipMessageErrorPolicyImplementation(
            maxFailedAttempts: MaxFailedAttemptsCount,
            excludedExceptions: ExcludedExceptions,
            includedExceptions: IncludedExceptions,
            applyRule: ApplyRule,
            messageToPublishFactory: MessageToPublishFactory,
            serviceProvider: serviceProvider,
            logger: serviceProvider.GetRequiredService<IInboundLogger<SkipMessageErrorPolicy>>());
    }

    /// <summary>
    ///     The implementation returned by <see cref="BuildCore" />: it logs the skipped message and
    ///     rolls back the transaction, triggering a consumer reconnect if the rollback fails.
    /// </summary>
    private sealed class SkipMessageErrorPolicyImplementation : ErrorPolicyImplementation
    {
        // Used to log the skipped message and any rollback failure.
        private readonly IInboundLogger<SkipMessageErrorPolicy> _logger;

        /// <summary>
        ///     Initializes a new instance of the <see cref="SkipMessageErrorPolicyImplementation" />
        ///     class, passing every argument through to the base class and keeping a reference to the
        ///     logger.
        /// </summary>
        public SkipMessageErrorPolicyImplementation(
            int? maxFailedAttempts,
            ICollection<Type> excludedExceptions,
            ICollection<Type> includedExceptions,
            Func<IRawInboundEnvelope, Exception, bool>? applyRule,
            Func<IRawInboundEnvelope, Exception, object?>? messageToPublishFactory,
            IServiceProvider serviceProvider,
            IInboundLogger<SkipMessageErrorPolicy> logger)
            : base(
                maxFailedAttempts,
                excludedExceptions,
                includedExceptions,
                applyRule,
                messageToPublishFactory,
                serviceProvider,
                logger)
        {
            _logger = logger;
        }

        /// <summary>
        ///     Logs that the message is being skipped and rolls back the transaction. If the rollback
        ///     throws, a reconnect of the consumer is triggered instead.
        /// </summary>
        /// <param name="context">
        ///     The pipeline context providing the envelope, the transaction manager and the consumer.
        /// </param>
        /// <param name="exception">
        ///     The exception that caused the processing to fail; it is passed on to the rollback.
        /// </param>
        /// <returns>
        ///     Always <c>true</c>.
        /// </returns>
        protected override async Task<bool> ApplyPolicyAsync(
            ConsumerPipelineContext context,
            Exception exception)
        {
            Check.NotNull(context, nameof(context));
            Check.NotNull(exception, nameof(exception));

            _logger.LogSkipped(context.Envelope);

            bool rolledBack = await TryRollbackAsync(context, exception).ConfigureAwait(false);

            if (!rolledBack)
            {
                // The rollback failed (and has already been logged): fall back to reconnecting the consumer.
                await context.Consumer.TriggerReconnectAsync().ConfigureAwait(false);
            }

            // NOTE(review): presumably signals to the caller that the error has been handled - confirm
            // against the base class contract.
            return true;
        }

        /// <summary>
        ///     Attempts to roll back the transaction, logging instead of propagating any exception
        ///     thrown while doing so.
        /// </summary>
        /// <returns>
        ///     <c>true</c> if the rollback completed without throwing; otherwise <c>false</c>.
        /// </returns>
        [SuppressMessage("", "CA1031", Justification = Justifications.ExceptionLogged)]
        private async Task<bool> TryRollbackAsync(ConsumerPipelineContext context, Exception exception)
        {
            try
            {
                // NOTE(review): the meaning of the second argument (true) is defined by RollbackAsync,
                // which is not visible here - confirm.
                await context.TransactionManager
                    .RollbackAsync(exception, true)
                    .ConfigureAwait(false);
            }
            catch (Exception rollbackException)
            {
                _logger.LogRollbackToSkipFailed(context.Envelope, rollbackException);
                return false;
            }

            return true;
        }
    }
}
}