Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,28 @@ await Task.WhenAll(Enumerable.Range(0, _options.SubscriberParallelExecuteThreadC
await foreach (var nextMessage in _schedulerQueue.GetConsumingEnumerable(_tasksCts.Token))
{
_tasksCts.Token.ThrowIfCancellationRequested();
try

if (_enableParallelSend && nextMessage.Retries == 0)
{
var result = await _sender.SendAsync(nextMessage).ConfigureAwait(false);
if (!result.Succeeded)
{
_logger.LogError("Delay message sending failed. MessageId: {MessageId} ", nextMessage.DbId);
}
if (!_publishedChannel.Writer.TryWrite(nextMessage))
while (await _publishedChannel.Writer.WaitToWriteAsync(_tasksCts!.Token).ConfigureAwait(false))
if (_publishedChannel.Writer.TryWrite(nextMessage))
break;
}
catch (Exception ex)
else
{
_logger.LogError(ex, "Error sending scheduled message. MessageId: {MessageId}", nextMessage.DbId);
try
{
var result = await _sender.SendAsync(nextMessage).ConfigureAwait(false);
if (!result.Succeeded)
{
_logger.LogError("Delay message sending failed. MessageId: {MessageId} ", nextMessage.DbId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending scheduled message. MessageId: {MessageId}", nextMessage.DbId);
}
}
}

Expand Down
86 changes: 78 additions & 8 deletions test/DotNetCore.CAP.Test/DispatcherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace DotNetCore.CAP.Test;

public class DispatcherTests
public class DispatcherTests
{
private readonly ILogger<Dispatcher> _logger;
private readonly ISubscribeExecutor _executor;
Expand All @@ -41,22 +41,22 @@ public async Task EnqueueToPublish_ShouldInvokeSend_WhenParallelSendDisabled()
SubscriberParallelExecuteThreadCount = 2,
SubscriberParallelExecuteBufferFactor = 2
});

var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);

using var cts = new CancellationTokenSource();
var messageId = "testId";

// Act
await dispatcher.Start(cts.Token);
await dispatcher.EnqueueToPublish(CreateTestMessage(messageId));
await cts.CancelAsync();

// Assert
sender.Count.Should().Be(1);
sender.ReceivedMessages.First().DbId.Should().Be(messageId);
}

[Fact]
public async Task EnqueueToPublish_ShouldBeThreadSafe_WhenParallelSendDisabled()
{
Expand Down Expand Up @@ -150,18 +150,88 @@ public async Task EnqueueToScheduler_ShouldSendMessagesInCorrectOrder_WhenEarlie
// Act
await dispatcher.Start(cts.Token);
var dateTime = DateTime.Now;

await dispatcher.EnqueueToScheduler(messages[0], dateTime.AddSeconds(1));
await dispatcher.EnqueueToScheduler(messages[1], dateTime.AddMilliseconds(200));
await dispatcher.EnqueueToScheduler(messages[2], dateTime.AddMilliseconds(100));

await Task.Delay(1200, CancellationToken.None);
await cts.CancelAsync();


// Assert
sender.ReceivedMessages.Select(m => m.DbId).Should().Equal(["3", "2", "1"]);
}

[Fact]
public async Task EnqueueToScheduler_ShouldBeThreadSafe_WhenDelayLessThenMinuteAndParallelSendEnabled()
{
// Arrange
var sender = new TestThreadSafeMessageSender();
var options = Options.Create(new CapOptions
{
EnableSubscriberParallelExecute = false,
EnablePublishParallelSend = true,
SubscriberParallelExecuteThreadCount = 2,
SubscriberParallelExecuteBufferFactor = 2
});
var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);

using var cts = new CancellationTokenSource();
var messages = Enumerable.Range(1, 10000)
.Select(i => CreateTestMessage(i.ToString()))
.ToArray();

// Act
await dispatcher.Start(cts.Token);
var dateTime = DateTime.Now.AddMilliseconds(50);
await Parallel.ForEachAsync(messages, CancellationToken.None,
async (m, ct) => { await dispatcher.EnqueueToScheduler(m, dateTime); });

await Task.Delay(3000, CancellationToken.None);

await cts.CancelAsync();

// Assert
sender.Count.Should().Be(10000);

var receivedMessages = sender.ReceivedMessages.Select(m => m.DbId).Order().ToList();
var expected = messages.Select(m => m.DbId).Order().ToList();
expected.Should().Equal(receivedMessages);
}

[Fact]
public async Task EnqueueToScheduler_ShouldSendMessagesInCorrectOrder_WhenParallelSendEnabled()
{
// Arrange
var sender = new TestThreadSafeMessageSender();
var options = Options.Create(new CapOptions
{
EnableSubscriberParallelExecute = true,
EnablePublishParallelSend = true,
SubscriberParallelExecuteThreadCount = 2,
SubscriberParallelExecuteBufferFactor = 2,
});
var dispatcher = new Dispatcher(_logger, sender, options, _executor, _storage);

using var cts = new CancellationTokenSource();
var messages = Enumerable.Range(1, 3)
.Select(i => CreateTestMessage(i.ToString()))
.ToArray();

// Act
await dispatcher.Start(cts.Token);
var dateTime = DateTime.Now;

await dispatcher.EnqueueToScheduler(messages[0], dateTime.AddSeconds(1));
await dispatcher.EnqueueToScheduler(messages[1], dateTime.AddMilliseconds(200));
await dispatcher.EnqueueToScheduler(messages[2], dateTime.AddMilliseconds(100));

await Task.Delay(1200, CancellationToken.None);
await cts.CancelAsync();

// Assert
sender.ReceivedMessages.Select(m => m.DbId).Should().Equal(["3", "2", "1"]);
}


private MediumMessage CreateTestMessage(string id = "1")
{
Expand Down
11 changes: 6 additions & 5 deletions test/DotNetCore.CAP.Test/Helpers/TestThreadSafeMessageSender.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
Expand All @@ -8,17 +9,17 @@ namespace DotNetCore.CAP.Test.Helpers;

public class TestThreadSafeMessageSender : IMessageSender
{
private readonly List<MediumMessage> _messagesInOrder = new();
private readonly ConcurrentQueue<MediumMessage> _messagesInOrder = [];

public Task<OperateResult> SendAsync(MediumMessage message)
{
{
lock (_messagesInOrder)
{
_messagesInOrder.Add(message);
_messagesInOrder.Enqueue(message);
}
return Task.FromResult(OperateResult.Success);
}

public int Count => _messagesInOrder.Count;
public List<MediumMessage> ReceivedMessages => _messagesInOrder.ToList();
}