/
MoveMessageErrorPolicy.cs
166 lines (139 loc) · 6.65 KB
/
MoveMessageErrorPolicy.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Silverback.Diagnostics;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Broker.Behaviors;
using Silverback.Messaging.Messages;
using Silverback.Messaging.Outbound.Enrichers;
using Silverback.Util;
namespace Silverback.Messaging.Inbound.ErrorHandling
{
/// <summary>
/// This policy moves the message that failed to be processed to the configured endpoint.
/// </summary>
/// <remarks>
/// This policy can be used also to move the message at the end of the current topic to retry it later on.
/// The number of retries can be limited using <see cref="RetryableErrorPolicyBase.MaxFailedAttempts" />.
/// </remarks>
public class MoveMessageErrorPolicy : RetryableErrorPolicyBase
{
private Action<IOutboundEnvelope, Exception>? _transformationAction;
/// <summary>
/// Initializes a new instance of the <see cref="MoveMessageErrorPolicy" /> class.
/// </summary>
/// <param name="endpoint">
/// The endpoint to move the message to.
/// </param>
public MoveMessageErrorPolicy(IProducerEndpoint endpoint)
{
Check.NotNull(endpoint, nameof(endpoint));
endpoint.Validate();
Endpoint = endpoint;
}
internal IProducerEndpoint Endpoint { get; }
/// <summary>
/// Defines an <see cref="Action{T}" /> to be called to modify (or completely rewrite) the message being
/// moved.
/// </summary>
/// <param name="transformationAction">
/// The <see cref="Action{T}" /> to be called to modify the message. This function can be used to modify
/// or replace the message body and its headers.
/// </param>
/// <returns>
/// The <see cref="MoveMessageErrorPolicy" /> so that additional calls can be chained.
/// </returns>
public MoveMessageErrorPolicy Transform(Action<IOutboundEnvelope, Exception> transformationAction)
{
_transformationAction = transformationAction;
return this;
}
/// <inheritdoc cref="ErrorPolicyBase.BuildCore" />
protected override ErrorPolicyImplementation BuildCore(IServiceProvider serviceProvider) =>
new MoveMessageErrorPolicyImplementation(
Endpoint,
_transformationAction,
MaxFailedAttemptsCount,
ExcludedExceptions,
IncludedExceptions,
ApplyRule,
MessageToPublishFactory,
serviceProvider
.GetRequiredService<IBrokerOutboundMessageEnrichersFactory>(),
serviceProvider,
serviceProvider
.GetRequiredService<IInboundLogger<MoveMessageErrorPolicy>>());
private sealed class MoveMessageErrorPolicyImplementation : ErrorPolicyImplementation
{
private readonly IProducerEndpoint _endpoint;
private readonly Action<IOutboundEnvelope, Exception>? _transformationAction;
private readonly IInboundLogger<MoveMessageErrorPolicy> _logger;
private readonly IBrokerOutboundMessageEnrichersFactory _enricherFactory;
private readonly IProducer _producer;
public MoveMessageErrorPolicyImplementation(
IProducerEndpoint endpoint,
Action<IOutboundEnvelope, Exception>? transformationAction,
int? maxFailedAttempts,
ICollection<Type> excludedExceptions,
ICollection<Type> includedExceptions,
Func<IRawInboundEnvelope, Exception, bool>? applyRule,
Func<IRawInboundEnvelope, Exception, object?>? messageToPublishFactory,
IBrokerOutboundMessageEnrichersFactory enricherFactory,
IServiceProvider serviceProvider,
IInboundLogger<MoveMessageErrorPolicy> logger)
: base(
maxFailedAttempts,
excludedExceptions,
includedExceptions,
applyRule,
messageToPublishFactory,
serviceProvider,
logger)
{
_endpoint = Check.NotNull(endpoint, nameof(endpoint));
_transformationAction = transformationAction;
_enricherFactory = enricherFactory;
_logger = logger;
_producer = serviceProvider.GetRequiredService<IBrokerCollection>().GetProducer(endpoint);
}
public override bool CanHandle(ConsumerPipelineContext context, Exception exception)
{
Check.NotNull(context, nameof(context));
if (context.Sequence != null)
{
_logger.LogCannotMoveSequences(context.Envelope, context.Sequence);
return false;
}
return base.CanHandle(context, exception);
}
protected override async Task<bool> ApplyPolicyAsync(
ConsumerPipelineContext context,
Exception exception)
{
Check.NotNull(context, nameof(context));
Check.NotNull(exception, nameof(exception));
_logger.LogMoved(context.Envelope, _endpoint);
await PublishToNewEndpointAsync(context.Envelope, exception).ConfigureAwait(false);
await context.TransactionManager.RollbackAsync(exception, true).ConfigureAwait(false);
return true;
}
private async Task PublishToNewEndpointAsync(IRawInboundEnvelope envelope, Exception exception)
{
var outboundEnvelope =
envelope is IInboundEnvelope deserializedEnvelope
? new OutboundEnvelope(
deserializedEnvelope.Message,
deserializedEnvelope.Headers,
_endpoint)
: new OutboundEnvelope(envelope.RawMessage, envelope.Headers, _endpoint);
var enricher = _enricherFactory.GetMovePolicyEnricher(envelope.Endpoint);
enricher.Enrich(envelope, outboundEnvelope, exception);
_transformationAction?.Invoke(outboundEnvelope, exception);
await _producer.ProduceAsync(outboundEnvelope).ConfigureAwait(false);
}
}
}
}