-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
MessageRetryConfigurationExtensions.cs
163 lines (140 loc) · 7.63 KB
/
MessageRetryConfigurationExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
namespace MassTransit
{
using System;
using System.Threading;
using Configuration;
using Middleware;
public static class MessageRetryConfigurationExtensions
{
/// <summary>
/// For all configured messages type (handlers, consumers, and sagas), configures message retry using the retry configuration specified.
/// Retry is configured once for each message type, and is added prior to the consumer factory or saga repository in the pipeline.
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
public static void UseMessageRetry(this IConsumePipeConfigurator configurator, Action<IRetryConfigurator> configure)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
if (configure == null)
throw new ArgumentNullException(nameof(configure));
var observer = new MessageRetryConfigurationObserver(configurator, CancellationToken.None, configure);
}
/// <summary>
/// For all configured messages type (handlers, consumers, and sagas), configures message retry using the retry configuration specified.
/// Retry is configured once for each message type, and is added prior to the consumer factory or saga repository in the pipeline.
/// </summary>
/// <param name="configurator"></param>
/// <param name="connector">
/// The bus factory configurator, to connect the observer, to cancel retries if the bus is stopped
/// </param>
/// <param name="configure"></param>
public static void UseMessageRetry(this IConsumePipeConfigurator configurator, IBusFactoryConfigurator connector,
Action<IRetryConfigurator> configure)
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
if (configure == null)
throw new ArgumentNullException(nameof(configure));
var retryObserver = new RetryBusObserver();
connector.ConnectBusObserver(retryObserver);
var observer = new MessageRetryConfigurationObserver(configurator, retryObserver.Stopping, configure);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
public static void UseMessageRetry<TConsumer>(this IConsumerConfigurator<TConsumer> configurator, Action<IRetryConfigurator> configure)
where TConsumer : class
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var observer = new MessageRetryConsumerConfigurationObserver<TConsumer>(configurator, CancellationToken.None, configure);
configurator.ConnectConsumerConfigurationObserver(observer);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="busFactoryConfigurator">
/// The bus factory configurator, to connect the observer, to cancel retries if the bus is stopped
/// </param>
/// <param name="configure"></param>
public static void UseMessageRetry<TConsumer>(this IConsumerConfigurator<TConsumer> configurator, IBusFactoryConfigurator busFactoryConfigurator,
Action<IRetryConfigurator> configure)
where TConsumer : class
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var retryObserver = new RetryBusObserver();
busFactoryConfigurator.ConnectBusObserver(retryObserver);
var observer = new MessageRetryConsumerConfigurationObserver<TConsumer>(configurator, retryObserver.Stopping, configure);
configurator.ConnectConsumerConfigurationObserver(observer);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
public static void UseMessageRetry<TSaga>(this ISagaConfigurator<TSaga> configurator, Action<IRetryConfigurator> configure)
where TSaga : class, ISaga
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var observer = new MessageRetrySagaConfigurationObserver<TSaga>(configurator, CancellationToken.None, configure);
configurator.ConnectSagaConfigurationObserver(observer);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="busFactoryConfigurator">
/// The bus factory configurator, to connect the observer, to cancel retries if the bus is stopped
/// </param>
/// <param name="configure"></param>
public static void UseMessageRetry<TSaga>(this ISagaConfigurator<TSaga> configurator, IBusFactoryConfigurator busFactoryConfigurator,
Action<IRetryConfigurator> configure)
where TSaga : class, ISaga
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var retryObserver = new RetryBusObserver();
busFactoryConfigurator.ConnectBusObserver(retryObserver);
var observer = new MessageRetrySagaConfigurationObserver<TSaga>(configurator, retryObserver.Stopping, configure);
configurator.ConnectSagaConfigurationObserver(observer);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="configure"></param>
public static void UseMessageRetry<TMessage>(this IHandlerConfigurator<TMessage> configurator, Action<IRetryConfigurator> configure)
where TMessage : class
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var observer = new MessageRetryHandlerConfigurationObserver(CancellationToken.None, configure);
configurator.ConnectHandlerConfigurationObserver(observer);
}
/// <summary>
/// Configures the message retry for the consumer consumer, regardless of message type.
/// </summary>
/// <param name="configurator"></param>
/// <param name="busFactoryConfigurator">
/// The bus factory configurator, to connect the observer, to cancel retries if the bus is stopped
/// </param>
/// <param name="configure"></param>
public static void UseMessageRetry<TMessage>(this IHandlerConfigurator<TMessage> configurator, IBusFactoryConfigurator busFactoryConfigurator,
Action<IRetryConfigurator> configure)
where TMessage : class
{
if (configurator == null)
throw new ArgumentNullException(nameof(configurator));
var retryObserver = new RetryBusObserver();
busFactoryConfigurator.ConnectBusObserver(retryObserver);
var observer = new MessageRetryHandlerConfigurationObserver(retryObserver.Stopping, configure);
configurator.ConnectHandlerConfigurationObserver(observer);
}
}
}