Skip to content

Commit c06fcde

Browse files
Link cancellation tokens to disposal for prompt shutdown in MessageBus and Queue
- Add GetLinkedDisposableCancellationTokenSource to MessageBusBase so publish operations cancel when the bus is disposed mid-flight - Use linked tokens in QueueBase.DequeueAsync, GetDeadletterItemsAsync, and StartWorkingAsync for EnsureQueueCreatedAsync calls - Fix subscriber Action lambda to use its token parameter instead of a closure capture, preventing a subtle token mismatch bug - Add DisposedCancellationToken check in SendMessageToSubscribersAsync to avoid dispatching to subscribers after the bus is disposed - Pass cancellationToken to FolderFileStorage lock acquisitions so callers can cancel while waiting for the lock - Remove unused DelayedMessage nested class from MessageBusBase - Minor cleanups: TryGetValue, GetValueOrDefault, guard clause inversion Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 57a3303 commit c06fcde

5 files changed

Lines changed: 36 additions & 30 deletions

File tree

src/Foundatio/Messaging/InMemoryMessageBus.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
34
using System.Threading;
45
using System.Threading.Tasks;
56
using Foundatio.Utility;
@@ -23,12 +24,12 @@ public InMemoryMessageBus(Builder<InMemoryMessageBusOptionsBuilder, InMemoryMess
2324

2425
public long GetMessagesSent(Type messageType)
2526
{
26-
return _messageCounts.TryGetValue(GetMappedMessageType(messageType), out long count) ? count : 0;
27+
return _messageCounts.GetValueOrDefault(GetMappedMessageType(messageType), 0);
2728
}
2829

2930
public long GetMessagesSent<T>()
3031
{
31-
return _messageCounts.TryGetValue(GetMappedMessageType(typeof(T)), out long count) ? count : 0;
32+
return _messageCounts.GetValueOrDefault(GetMappedMessageType(typeof(T)), 0);
3233
}
3334

3435
public void ResetMessagesSent()

src/Foundatio/Messaging/MessageBusBase.cs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ public MessageBusBase(TOptions options)
4949
/// </summary>
5050
protected CancellationToken DisposedCancellationToken => _disposedCancellationTokenSource.Token;
5151

52+
/// <summary>
53+
/// Creates a linked cancellation token source that combines the provided token with the disposal token.
54+
/// This allows operations to be cancelled by either the caller or when this instance is disposed.
55+
/// </summary>
56+
/// <param name="cancellationToken">The caller's cancellation token to link.</param>
57+
/// <returns>A new <see cref="CancellationTokenSource"/> that should be disposed by the caller.</returns>
58+
protected CancellationTokenSource GetLinkedDisposableCancellationTokenSource(CancellationToken cancellationToken)
59+
{
60+
return CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, DisposedCancellationToken);
61+
}
62+
5263
ILogger IHaveLogger.Logger => _logger;
5364
ILoggerFactory IHaveLoggerFactory.LoggerFactory => _loggerFactory;
5465
TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider;
@@ -73,10 +84,11 @@ public async Task PublishAsync(Type messageType, object message, MessageOptions
7384
options.Properties.Add("TraceState", Activity.Current.TraceStateString);
7485
}
7586

87+
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
7688
try
7789
{
78-
await EnsureTopicCreatedAsync(cancellationToken).AnyContext();
79-
await PublishImplAsync(GetMappedMessageType(messageType), message, options, cancellationToken).AnyContext();
90+
await EnsureTopicCreatedAsync(linkedCancellationTokenSource.Token).AnyContext();
91+
await PublishImplAsync(GetMappedMessageType(messageType), message, options, linkedCancellationTokenSource.Token).AnyContext();
8092
}
8193
catch (Exception ex) when (ex is not OperationCanceledException and not MessageBusException)
8294
{
@@ -105,8 +117,8 @@ protected virtual Type GetMappedMessageType(string messageType)
105117

106118
return _knownMessageTypesCache.GetOrAdd(messageType, type =>
107119
{
108-
if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.ContainsKey(type))
109-
return _options.MessageTypeMappings[type];
120+
if (_options.MessageTypeMappings != null && _options.MessageTypeMappings.TryGetValue(type, out Type typeMapping))
121+
return typeMapping;
110122

111123
try
112124
{
@@ -126,7 +138,6 @@ protected virtual Type GetMappedMessageType(string messageType)
126138
catch (Exception ex)
127139
{
128140
_logger.LogError(ex, "Error getting message body type: {MessageType}", type);
129-
130141
return null;
131142
}
132143
}
@@ -145,7 +156,7 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
145156
Action = (message, token) =>
146157
{
147158
if (message is T typedMessage)
148-
return handler(typedMessage, cancellationToken);
159+
return handler(typedMessage, token);
149160

150161
_logger.LogTrace("Unable to call subscriber action: {MessageType} cannot be safely casted to {SubscriberType}", message.GetType(), typeof(T));
151162
return Task.CompletedTask;
@@ -157,11 +168,11 @@ protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> ha
157168
cancellationToken.Register(() =>
158169
{
159170
_subscribers.TryRemove(subscriber.Id, out _);
160-
if (_subscribers.Count == 0)
161-
{
162-
_logger.LogDebug("Removing topic subscription for {MessageBusId}: No subscribers", MessageBusId);
163-
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();
164-
}
171+
if (_subscribers.Count > 0)
172+
return;
173+
174+
_logger.LogDebug("Removing topic subscription for {MessageBusId}: No subscribers", MessageBusId);
175+
RemoveTopicSubscriptionAsync().GetAwaiter().GetResult();
165176
});
166177
}
167178

@@ -265,7 +276,7 @@ protected async Task SendMessageToSubscribersAsync(IMessage message)
265276

266277
return Task.Run(async () =>
267278
{
268-
if (subscriber.CancellationToken.IsCancellationRequested)
279+
if (DisposedCancellationToken.IsCancellationRequested || subscriber.CancellationToken.IsCancellationRequested)
269280
{
270281
_logger.LogTrace("The cancelled subscriber action will not be called: {SubscriberId}", subscriber.Id);
271282
return;
@@ -422,14 +433,6 @@ public virtual void Dispose()
422433
_disposedCancellationTokenSource.Dispose();
423434
}
424435

425-
[DebuggerDisplay("MessageType: {MessageType} SendTime: {SendTime} Message: {Message}")]
426-
protected class DelayedMessage
427-
{
428-
public DateTime SendTime { get; set; }
429-
public Type MessageType { get; set; }
430-
public object Message { get; set; }
431-
}
432-
433436
[DebuggerDisplay("Id: {Id} Type: {Type} CancellationToken: {CancellationToken}")]
434437
protected class Subscriber
435438
{

src/Foundatio/Queues/QueueBase.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ public async Task<string> EnqueueAsync(T data, QueueEntryOptions options = null)
139139
protected abstract Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken linkedCancellationToken);
140140
public async Task<IQueueEntry<T>> DequeueAsync(CancellationToken cancellationToken)
141141
{
142-
await EnsureQueueCreatedAsync(DisposedCancellationToken).AnyContext();
142+
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
143+
await EnsureQueueCreatedAsync(linkedCancellationTokenSource.Token).AnyContext();
143144

144145
LastDequeueActivity = _timeProvider.GetUtcNow();
145-
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
146146
return await DequeueImplAsync(linkedCancellationTokenSource.Token).AnyContext();
147147
}
148148

@@ -161,8 +161,9 @@ public virtual async Task<IQueueEntry<T>> DequeueAsync(TimeSpan? timeout = null)
161161
protected abstract Task<IEnumerable<T>> GetDeadletterItemsImplAsync(CancellationToken cancellationToken);
162162
public async Task<IEnumerable<T>> GetDeadletterItemsAsync(CancellationToken cancellationToken = default)
163163
{
164-
await EnsureQueueCreatedAsync(DisposedCancellationToken).AnyContext();
165-
return await GetDeadletterItemsImplAsync(cancellationToken).AnyContext();
164+
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
165+
await EnsureQueueCreatedAsync(linkedCancellationTokenSource.Token).AnyContext();
166+
return await GetDeadletterItemsImplAsync(linkedCancellationTokenSource.Token).AnyContext();
166167
}
167168

168169
protected abstract Task<QueueStats> GetQueueStatsImplAsync();
@@ -191,7 +192,8 @@ public async Task DeleteQueueAsync()
191192
protected abstract void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken);
192193
public async Task StartWorkingAsync(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete = false, CancellationToken cancellationToken = default)
193194
{
194-
await EnsureQueueCreatedAsync(DisposedCancellationToken).AnyContext();
195+
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
196+
await EnsureQueueCreatedAsync(linkedCancellationTokenSource.Token).AnyContext();
195197
StartWorkingImpl(handler, autoComplete, cancellationToken);
196198
}
197199

src/Foundatio/Storage/FolderFileStorage.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public async Task<bool> RenameFileAsync(string path, string newPath, Cancellatio
151151

152152
try
153153
{
154-
using (await _lock.LockAsync().AnyContext())
154+
using (await _lock.LockAsync(cancellationToken).AnyContext())
155155
{
156156
string directory = Path.GetDirectoryName(normalizedNewPath);
157157
if (directory != null)
@@ -198,7 +198,7 @@ public async Task<bool> CopyFileAsync(string path, string targetPath, Cancellati
198198

199199
try
200200
{
201-
using (await _lock.LockAsync().AnyContext())
201+
using (await _lock.LockAsync(cancellationToken).AnyContext())
202202
{
203203
string directory = Path.GetDirectoryName(normalizedTargetPath);
204204
if (directory != null)

src/Foundatio/Utility/FoundatioDiagnostics.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,4 @@ public static Activity AddException(this Activity activity, Exception exception,
6969
return activity.AddEvent(new ActivityEvent(ExceptionEventName, timestamp, exceptionTags));
7070
}
7171
}
72-
#endif
72+
#endif

0 commit comments

Comments
 (0)