-
Notifications
You must be signed in to change notification settings - Fork 4.5k
/
ProcessMessageEventArgs.cs
288 lines (255 loc) · 13.4 KB
/
ProcessMessageEventArgs.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// The <see cref="ProcessMessageEventArgs"/> contain event args that are specific
/// to the <see cref="ServiceBusReceivedMessage"/> that is being processed.
/// </summary>
public class ProcessMessageEventArgs : EventArgs
{
/// <summary>
/// The received message to be processed.
/// </summary>
public ServiceBusReceivedMessage Message { get; }
/// <summary>
/// The processor's <see cref="System.Threading.CancellationToken"/> instance which will be
/// cancelled when <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </summary>
public CancellationToken CancellationToken { get; }
/// <summary>
/// An event that is raised when the message lock is lost. This event is only raised for the scope of the Process Message handler,
/// and only for the message that is delivered to the handler - it is not raised for any additional messages received via the <see cref="ProcessorReceiveActions"/>.
/// Once the handler returns, the event will not be raised. There are two cases in which this event can be raised:
/// <list type="number">
/// <item>
/// <description>When the message lock has expired based on the <see cref="ServiceBusReceivedMessage.LockedUntil"/> property.</description>
/// </item>
/// <item>
/// <description>When a non-transient exception occurs while attempting to renew the message lock.</description>
/// </item>
/// </list>
/// </summary>
public event Func<MessageLockLostEventArgs, Task> MessageLockLostAsync;
/// <summary>
/// The source that backs <see cref="MessageLockCancellationToken"/>. It is created in the constructor,
/// scheduled for cancellation through <c>CancelAfterLockExpired</c>, and disposed by
/// <see cref="CancelMessageLockRenewalAsync"/>.
/// </summary>
internal CancellationTokenSource MessageLockLostCancellationSource { get; }
/// <summary>
/// The <see cref="System.Threading.CancellationToken"/> instance is cancelled when the lock renewal failed to
/// renew the lock or the <see cref="ServiceBusProcessorOptions.MaxAutoLockRenewalDuration"/> has elapsed.
/// </summary>
/// <remarks>The cancellation token is triggered by comparing <see cref="ServiceBusReceivedMessage.LockedUntil"/>
/// against <see cref="DateTimeOffset.UtcNow"/> and might be subject to clock drift.</remarks>
internal CancellationToken MessageLockCancellationToken { get; }
/// <summary>
/// The exception that is handed to the <see cref="MessageLockLostEventArgs"/> when the lock lost handler is raised.
/// </summary>
/// <remarks>NOTE(review): nothing in this file assigns this property; presumably the lock renewal code sets it
/// when renewal fails - confirm against the callers.</remarks>
internal Exception LockLostException { get; set; }
/// <summary>
/// The path of the Service Bus entity that the message was received from.
/// </summary>
public string EntityPath => _receiver.EntityPath;
/// <summary>
/// The identifier of the processor that raised this event.
/// </summary>
/// <remarks>This is only assigned by the constructor overloads that accept an identifier; it is <c>null</c> otherwise.</remarks>
public string Identifier { get; }
/// <summary>
/// The fully qualified Service Bus namespace that the message was received from.
/// </summary>
public string FullyQualifiedNamespace => _receiver.FullyQualifiedNamespace;
/// <summary>
/// The <c>Messages</c> collection exposed by the <see cref="ProcessorReceiveActions"/> associated with these event args.
/// </summary>
/// <remarks>NOTE(review): presumably the messages tracked for this handler invocation, with the byte value unused - confirm
/// against <see cref="ProcessorReceiveActions"/>.</remarks>
internal ConcurrentDictionary<ServiceBusReceivedMessage, byte> Messages => _receiveActions.Messages;
// The receiver that settlement and lock renewal calls are forwarded to. Taken from the receiver manager,
// or supplied directly through the public constructors.
private readonly ServiceBusReceiver _receiver;
// The receive actions instance returned by GetReceiveActions; created once in the constructor.
private readonly ProcessorReceiveActions _receiveActions;
/// <summary>
/// Creates a <see cref="ProcessMessageEventArgs"/> from a message and a receiver, without a receiver manager
/// and without a processor identifier.
/// </summary>
///
/// <param name="message">The message to be processed.</param>
/// <param name="receiver">The receiver that settlement and lock renewal calls are forwarded to.</param>
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/>, which is cancelled
/// when <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </param>
[EditorBrowsable(EditorBrowsableState.Never)]
public ProcessMessageEventArgs(
    ServiceBusReceivedMessage message,
    ServiceBusReceiver receiver,
    CancellationToken cancellationToken)
    : this(message: message, manager: null, cancellationToken: cancellationToken)
{
    // No manager is available on this path, so the receiver has to be assigned explicitly.
    _receiver = receiver;
}
/// <summary>
/// Creates a <see cref="ProcessMessageEventArgs"/> from a message, a receiver and a processor identifier,
/// without a receiver manager.
/// </summary>
///
/// <param name="message">The message to be processed.</param>
/// <param name="receiver">The receiver that settlement and lock renewal calls are forwarded to.</param>
/// <param name="identifier">The identifier of the processor.</param>
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/>, which is cancelled
/// when <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </param>
public ProcessMessageEventArgs(
    ServiceBusReceivedMessage message,
    ServiceBusReceiver receiver,
    string identifier,
    CancellationToken cancellationToken)
    : this(message: message, manager: null, identifier: identifier, cancellationToken: cancellationToken)
{
    // No manager is available on this path, so the receiver has to be assigned explicitly.
    _receiver = receiver;
}
/// <summary>
/// Creates a <see cref="ProcessMessageEventArgs"/> bound to a receiver manager. This is the constructor that
/// every other overload ultimately chains to.
/// </summary>
///
/// <param name="message">The message to be processed.</param>
/// <param name="manager">The receiver manager for these event args; may be <c>null</c>.</param>
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/>, which is cancelled
/// when <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </param>
internal ProcessMessageEventArgs(
    ServiceBusReceivedMessage message,
    ReceiverManager manager,
    CancellationToken cancellationToken)
{
    Message = message;
    CancellationToken = cancellationToken;

    // The manager is null when customers create the instance through a public constructor for testing purposes.
    _receiver = manager?.Receiver;

    // Set up the lock-lost token and schedule its cancellation based on the message.
    var lockLostSource = new CancellationTokenSource();
    MessageLockLostCancellationSource = lockLostSource;
    MessageLockCancellationToken = lockLostSource.Token;
    lockLostSource.CancelAfterLockExpired(message);

    // Without a manager, auto renewal is treated as disabled.
    bool autoRenew = manager?.ShouldAutoRenewMessageLock() ?? false;

    // Created last because it receives this instance, whose other members are assigned above.
    _receiveActions = new ProcessorReceiveActions(this, manager, autoRenew);
}
/// <summary>
/// Creates a <see cref="ProcessMessageEventArgs"/> bound to a receiver manager and records the identifier
/// of the processor.
/// </summary>
///
/// <param name="message">The message to be processed.</param>
/// <param name="manager">The receiver manager for these event args.</param>
/// <param name="identifier">The identifier of the processor.</param>
/// <param name="cancellationToken">The processor's <see cref="System.Threading.CancellationToken"/>, which is cancelled
/// when <see cref="ServiceBusProcessor.StopProcessingAsync"/> is called.
/// </param>
internal ProcessMessageEventArgs(
    ServiceBusReceivedMessage message,
    ReceiverManager manager,
    string identifier,
    CancellationToken cancellationToken)
    : this(message: message, manager: manager, cancellationToken: cancellationToken)
{
    // Everything except the identifier is initialized by the chained constructor.
    Identifier = identifier;
}
/// <summary>
/// Invokes the message lock lost event handler after a message lock is lost.
/// This method can be overridden to raise an event manually for testing purposes.
/// </summary>
/// <param name="args">The event args containing information related to the lock lost event.</param>
/// <returns>
/// A task that completes once every subscribed handler has completed, or an already completed task
/// when there are no subscribers.
/// </returns>
protected internal virtual Task OnMessageLockLostAsync(MessageLockLostEventArgs args)
{
    // Take a snapshot so that a concurrent unsubscribe cannot null the delegate between the check and the call.
    Func<MessageLockLostEventArgs, Task> handlers = MessageLockLostAsync;
    if (handlers == null)
    {
        return Task.CompletedTask;
    }

    Delegate[] subscribers = handlers.GetInvocationList();
    if (subscribers.Length == 1)
    {
        // Common case: a single subscriber, no extra allocations needed.
        return handlers(args) ?? Task.CompletedTask;
    }

    // Invoking a multicast delegate returns only the last subscriber's task, so the tasks of the other
    // subscribers would be dropped. Call each subscriber individually, in subscription order, and
    // combine the results so that all of them are observed.
    var pending = new Task[subscribers.Length];
    for (int index = 0; index < subscribers.Length; index++)
    {
        var subscriber = (Func<MessageLockLostEventArgs, Task>)subscribers[index];
        pending[index] = subscriber(args) ?? Task.CompletedTask;
    }

    return Task.WhenAll(pending);
}
/// <inheritdoc cref="ServiceBusReceiver.AbandonMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
public virtual async Task AbandonMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify = default,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement to the underlying receiver.
    var abandonOperation = _receiver.AbandonMessageAsync(message, propertiesToModify, cancellationToken);
    await abandonOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task CompleteMessageAsync(
    ServiceBusReceivedMessage message,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement to the underlying receiver.
    var completeOperation = _receiver.CompleteMessageAsync(message, cancellationToken);
    await completeOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, string, string, CancellationToken)"/>
public virtual async Task DeadLetterMessageAsync(
    ServiceBusReceivedMessage message,
    string deadLetterReason,
    string deadLetterErrorDescription = default,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement, with the reason and description, to the underlying receiver.
    var deadLetterOperation = _receiver.DeadLetterMessageAsync(message, deadLetterReason, deadLetterErrorDescription, cancellationToken);
    await deadLetterOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
public virtual async Task DeadLetterMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify = default,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement, with the property modifications, to the underlying receiver.
    var deadLetterOperation = _receiver.DeadLetterMessageAsync(message, propertiesToModify, cancellationToken);
    await deadLetterOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.DeadLetterMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, string, string, CancellationToken)"/>
public virtual async Task DeadLetterMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify,
    string deadLetterReason,
    string deadLetterErrorDescription = default,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement to the underlying receiver. The reason and description stay named
    // so that the same receiver overload is selected.
    var deadLetterOperation = _receiver.DeadLetterMessageAsync(
        message,
        propertiesToModify,
        deadLetterReason: deadLetterReason,
        deadLetterErrorDescription: deadLetterErrorDescription,
        cancellationToken);
    await deadLetterOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
public virtual async Task DeferMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify = default,
    CancellationToken cancellationToken = default)
{
    // Forward the settlement to the underlying receiver.
    var deferOperation = _receiver.DeferMessageAsync(message, propertiesToModify, cancellationToken);
    await deferOperation.ConfigureAwait(false);

    // Only reached when the receiver call did not throw.
    message.IsSettled = true;
}
/// <inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewMessageLockAsync(
    ServiceBusReceivedMessage message,
    CancellationToken cancellationToken = default)
{
    var renewOperation = _receiver.RenewMessageLockAsync(message, cancellationToken);
    await renewOperation.ConfigureAwait(false);

    // The LockedUntil-based cancellation token exists only for the trigger message, so the
    // expiry is rescheduled only when that message was the one renewed.
    bool renewedTriggerMessage = message == Message;
    if (renewedTriggerMessage)
    {
        MessageLockLostCancellationSource.CancelAfterLockExpired(Message);
    }
}
/// <summary>
/// Returns the <see cref="ProcessorReceiveActions"/> instance for these event args, which enables receiving
/// additional messages within the scope of the current event.
/// </summary>
/// <returns>The receive actions instance that was created together with these event args.</returns>
public virtual ProcessorReceiveActions GetReceiveActions()
{
    return _receiveActions;
}
// Forwards the end of the execution scope to the associated receive actions.
internal void EndExecutionScope()
{
    _receiveActions.EndExecutionScope();
}
// Cancels message lock renewal through the receive actions and then disposes the lock-lost
// cancellation source. The source is disposed even when the cancellation call throws.
internal async Task CancelMessageLockRenewalAsync()
{
    // The using statement disposes the source on the way out, on both the normal and the exceptional path.
    using (MessageLockLostCancellationSource)
    {
        await _receiveActions.CancelMessageLockRenewalAsync().ConfigureAwait(false);
    }
}
// Registers a callback on the message lock cancellation token that raises the lock lost handler.
// Returns the registration so that the caller can dispose it.
internal CancellationTokenRegistration RegisterMessageLockLostHandler()
{
    // The event args are built when the callback runs, so LockLostException is read at that point.
    // The task returned by the handler is not awaited by the callback.
    Action raiseLockLost = () => OnMessageLockLostAsync(new MessageLockLostEventArgs(Message, LockLostException));

    return MessageLockCancellationToken.Register(raiseLockLost);
}
}
}