-
Notifications
You must be signed in to change notification settings - Fork 4.5k
/
ProcessorReceiveActions.cs
181 lines (166 loc) · 8.81 KB
/
ProcessorReceiveActions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Azure.Messaging.ServiceBus
{
/// <summary>
/// A set of receive-related actions that can be performed from the processor message callback.
/// </summary>
#pragma warning disable CA1001 // Does not need to be disposable as cancellationTokenSource will always be disposed
public class ProcessorReceiveActions
#pragma warning restore CA1001
{
    // Set by EndExecutionScope; once true, ValidateCallbackInScope rejects further receive/peek calls.
    // Always accessed through Volatile.Read/Volatile.Write.
    private bool _callbackCompleted;

    private readonly ReceiverManager _manager;
    private readonly ServiceBusReceiver _receiver;

    // Only created when lock auto-renewal is enabled; shared by every renewal task started by this instance.
    private readonly CancellationTokenSource _lockRenewalCancellationSource;

    // Renewal tasks started by this instance (used as a set; the byte value is unused).
    private readonly ConcurrentDictionary<Task, byte> _renewalTasks = new();

    private readonly bool _autoRenew;
    private readonly ProcessMessageEventArgs _processMessageEventArgs;

    /// <summary>
    /// The messages tracked by this instance: the message carried by the event args supplied at
    /// construction, plus any messages returned by the receive methods. Used as a set; the byte value is unused.
    /// </summary>
    internal ConcurrentDictionary<ServiceBusReceivedMessage, byte> Messages { get; } = new();

    /// <summary>
    /// For mocking.
    /// </summary>
    protected ProcessorReceiveActions()
    {
    }

    /// <summary>
    /// Initializes a new instance bound to the event args of the callback being invoked.
    /// </summary>
    ///
    /// <param name="args">Either a <see cref="ProcessMessageEventArgs"/> or a <see cref="ProcessSessionMessageEventArgs"/>; any other type fails the cast below.</param>
    /// <param name="manager">The receiver manager that owns the underlying receiver. May be null.</param>
    /// <param name="autoRenewMessageLocks">Whether message locks should be renewed automatically. Only honored for <see cref="ProcessMessageEventArgs"/>.</param>
    internal ProcessorReceiveActions(EventArgs args, ReceiverManager manager, bool autoRenewMessageLocks)
    {
        _manager = manager;

        // manager would be null in scenarios where customers are using the public constructor of the event args for testing purposes.
        _receiver = manager?.Receiver;

        if (args is ProcessMessageEventArgs processMessageEventArgs)
        {
            _autoRenew = autoRenewMessageLocks;
            _processMessageEventArgs = processMessageEventArgs;
            Messages[processMessageEventArgs.Message] = default;

            if (_autoRenew)
            {
                // NOTE(review): this dereferences _manager, so it assumes callers never pass
                // autoRenewMessageLocks == true together with a null manager - confirm against callers.
                _lockRenewalCancellationSource = new CancellationTokenSource();
                _renewalTasks[_manager.RenewMessageLockAsync(_processMessageEventArgs, processMessageEventArgs.Message, _lockRenewalCancellationSource)] = default;
            }
        }
        else
        {
            // Auto-renewal is never enabled on this path; only the message is tracked.
            Messages[((ProcessSessionMessageEventArgs)args).Message] = default;
        }
    }

    /// <summary>
    /// Receives a list of <see cref="ServiceBusReceivedMessage"/> from the entity using <see cref="ServiceBusReceiveMode"/> mode
    /// configured in <see cref="ServiceBusProcessorOptions.ReceiveMode"/>, which defaults to PeekLock mode.
    /// This method doesn't guarantee to return exact `maxMessages` messages, even if there are `maxMessages` messages available in the queue or topic.
    /// Messages received using this method are subject to the behavior defined in the <see cref="ServiceBusProcessorOptions.AutoCompleteMessages"/>
    /// and <see cref="ServiceBusProcessorOptions.MaxAutoLockRenewalDuration"/> properties.
    /// </summary>
    ///
    /// <param name="maxMessages">The maximum number of messages that will be received.</param>
    /// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait for the first message before returning an empty list if no messages are available.
    /// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.</param>
    /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
    ///
    /// <returns>List of messages received. Returns an empty list if no message is found.</returns>
    /// <exception cref="InvalidOperationException">The event handler that was given this instance has already returned.</exception>
    public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
        int maxMessages,
        TimeSpan? maxWaitTime = default,
        CancellationToken cancellationToken = default)
    {
        ValidateCallbackInScope();

        IReadOnlyList<ServiceBusReceivedMessage> messages =
            await _receiver.ReceiveMessagesAsync(maxMessages, maxWaitTime, cancellationToken).ConfigureAwait(false);

        return TrackMessagesAsReceived(messages);
    }

    /// <summary>
    /// Receives a list of deferred <see cref="ServiceBusReceivedMessage"/> identified by <paramref name="sequenceNumbers"/>.
    /// Messages received using this method are subject to the behavior defined in the <see cref="ServiceBusProcessorOptions.AutoCompleteMessages"/>
    /// and <see cref="ServiceBusProcessorOptions.MaxAutoLockRenewalDuration"/> properties.
    /// </summary>
    ///
    /// <param name="sequenceNumbers">An <see cref="IEnumerable{T}"/> containing the sequence numbers to receive.</param>
    /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
    ///
    /// <returns>Messages identified by sequence number are returned.
    /// Throws if the messages have not been deferred.</returns>
    /// <seealso cref="ProcessMessageEventArgs.DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/>
    /// <exception cref="ServiceBusException">
    /// The specified sequence number does not correspond to a message that has been deferred.
    /// The <see cref="ServiceBusException.Reason" /> will be set to <see cref="ServiceBusFailureReason.MessageNotFound"/> in this case.
    /// </exception>
    /// <exception cref="InvalidOperationException">The event handler that was given this instance has already returned.</exception>
    public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync(
        IEnumerable<long> sequenceNumbers,
        CancellationToken cancellationToken = default)
    {
        ValidateCallbackInScope();

        IReadOnlyList<ServiceBusReceivedMessage> messages =
            await _receiver.ReceiveDeferredMessagesAsync(sequenceNumbers, cancellationToken).ConfigureAwait(false);

        return TrackMessagesAsReceived(messages);
    }

    /// <inheritdoc cref="ServiceBusReceiver.PeekMessagesAsync"/>
    public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
        int maxMessages,
        long? fromSequenceNumber = default,
        CancellationToken cancellationToken = default)
    {
        ValidateCallbackInScope();

        // Peeked messages are not locked so we don't need to track them for lock renewal or autocompletion, as these options do not apply.
        return await _receiver.PeekMessagesAsync(
            maxMessages: maxMessages,
            fromSequenceNumber: fromSequenceNumber,
            cancellationToken: cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Adds each received message to <see cref="Messages"/> and, when auto-renewal is enabled,
    /// starts a lock renewal task for it.
    /// </summary>
    ///
    /// <param name="messages">The messages that were just received.</param>
    ///
    /// <returns>The same <paramref name="messages"/> instance, unmodified.</returns>
    private IReadOnlyList<ServiceBusReceivedMessage> TrackMessagesAsReceived(IReadOnlyList<ServiceBusReceivedMessage> messages)
    {
        foreach (ServiceBusReceivedMessage message in messages)
        {
            Messages[message] = default;

            if (_autoRenew)
            {
                // Currently only the trigger message supports cancellation token for LockedUntil.
                _renewalTasks[_manager.RenewMessageLockAsync(_processMessageEventArgs, message, _lockRenewalCancellationSource)] = default;
            }
        }

        return messages;
    }

    /// <summary>
    /// Marks the callback as completed so that subsequent receive and peek calls on this instance throw.
    /// </summary>
    internal void EndExecutionScope()
    {
        // Paired with the Volatile.Read in ValidateCallbackInScope.
        Volatile.Write(ref _callbackCompleted, true);
    }

    /// <summary>
    /// Cancels the lock renewal tasks started by this instance, waits for them to finish,
    /// and disposes the cancellation source. Does nothing when auto-renewal was not enabled.
    /// </summary>
    internal async Task CancelMessageLockRenewalAsync()
    {
        if (_lockRenewalCancellationSource is null)
        {
            return;
        }

        try
        {
            _lockRenewalCancellationSource.Cancel();
            await Task.WhenAll(_renewalTasks.Keys).ConfigureAwait(false);
        }
        catch (TaskCanceledException)
        {
            // Nothing to do here. These exceptions are expected.
        }
        finally
        {
            // Dispose only after the renewal tasks have been awaited: they were handed this source,
            // and a disposed source must not be used. The finally also guarantees disposal on failure.
            _lockRenewalCancellationSource.Dispose();
        }
    }

    /// <summary>
    /// Throws if the event handler that was given this instance has already returned.
    /// </summary>
    ///
    /// <exception cref="InvalidOperationException">The callback has been marked as completed via <see cref="EndExecutionScope"/>.</exception>
    private void ValidateCallbackInScope()
    {
        if (!Volatile.Read(ref _callbackCompleted))
        {
            return;
        }

        // The message names the event args/handler pair that matches the kind of manager in use.
        if (_manager is SessionReceiverManager)
        {
            throw new InvalidOperationException(
                "Messages cannot be received using the 'ProcessSessionMessageEventArgs' after the 'ProcessSessionMessageAsync' event handler has returned.");
        }

        throw new InvalidOperationException(
            "Messages cannot be received using the 'ProcessMessageEventArgs' after the 'ProcessMessageAsync' event handler has returned.");
    }
}
}