Skip to content

Commit

Permalink
Related to #5114 - changed delivery service to no longer wait when me…
Browse files Browse the repository at this point in the history
…ssages were processed, ensuring rapido delivery.
  • Loading branch information
phatboyg committed Apr 21, 2024
1 parent 552701a commit dbcfc4e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
36 changes: 24 additions & 12 deletions src/MassTransit.Abstractions/Util/RequestRateAlgorithm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ public void Dispose()
/// </summary>
/// <param name="requestCallback"></param>
/// <param name="cancellationToken"></param>
public async Task Run(RequestCallback requestCallback, CancellationToken cancellationToken = default)
public async Task<int> Run(RequestCallback requestCallback, CancellationToken cancellationToken = default)
{
var requestCount = _requestCount;

var tasks = new List<Task>(requestCount);
var tasks = new List<Task<int>>(requestCount);

try
{
Expand All @@ -149,16 +149,20 @@ public async Task Run(RequestCallback requestCallback, CancellationToken cancell
throw;
}

await Task.WhenAll(tasks).ConfigureAwait(false);
var counts = await Task.WhenAll(tasks).ConfigureAwait(false);

return counts.Sum();
}

async Task RunRequest(RequestCallback requestCallback, CancellationToken cancellationToken = default)
async Task<int> RunRequest(RequestCallback requestCallback, CancellationToken cancellationToken = default)
{
using var activeRequest = await BeginRequest(cancellationToken).ConfigureAwait(false);

var count = await requestCallback(activeRequest.ResultLimit, activeRequest.CancellationToken).ConfigureAwait(false);

await activeRequest.Complete(count, CancellationToken.None).ConfigureAwait(false);

return count;
}

/// <summary>
Expand All @@ -168,11 +172,11 @@ async Task RunRequest(RequestCallback requestCallback, CancellationToken cancell
/// <param name="resultCallback"></param>
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
public async Task Run<T>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, CancellationToken cancellationToken = default)
public async Task<int> Run<T>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, CancellationToken cancellationToken = default)
{
var requestCount = _requestCount;

var tasks = new List<Task>(requestCount);
var tasks = new List<Task<int>>(requestCount);

try
{
Expand All @@ -185,10 +189,12 @@ public async Task Run<T>(RequestCallback<T> requestCallback, ResultCallback<T> r
throw;
}

await Task.WhenAll(tasks).ConfigureAwait(false);
var counts = await Task.WhenAll(tasks).ConfigureAwait(false);

return counts.Sum();
}

async Task RunRequest<T>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, CancellationToken cancellationToken = default)
async Task<int> RunRequest<T>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, CancellationToken cancellationToken = default)
{
using var activeRequest = await BeginRequest(cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -225,6 +231,8 @@ async Task RunResultCallback()
}

await activeRequest.Complete(count, CancellationToken.None).ConfigureAwait(false);

return count;
}

/// <summary>
Expand All @@ -237,7 +245,7 @@ async Task RunResultCallback()
/// <param name="cancellationToken"></param>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TKey"></typeparam>
public async Task Run<T, TKey>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, GroupCallback<T, TKey> groupCallback,
public async Task<int> Run<T, TKey>(RequestCallback<T> requestCallback, ResultCallback<T> resultCallback, GroupCallback<T, TKey> groupCallback,
OrderCallback<T> orderCallback, CancellationToken cancellationToken = default)
{
var requestCount = _requestCount;
Expand All @@ -259,7 +267,7 @@ async Task RunResultCallback()

List<IGrouping<TKey, T>> resultSets = groupCallback(results.SelectMany(x => x)).ToList();

var resultTasks = new List<Task>(ResultLimit);
var resultTasks = new List<Task<int>>(ResultLimit);

try
{
Expand All @@ -272,7 +280,9 @@ async Task RunResultCallback()
throw;
}

await Task.WhenAll(resultTasks).ConfigureAwait(false);
var counts = await Task.WhenAll(resultTasks).ConfigureAwait(false);

return counts.Sum();
}

async Task<IReadOnlyList<T>> RunRequest<T>(RequestCallback<T> requestCallback, CancellationToken cancellationToken = default)
Expand All @@ -286,7 +296,7 @@ async Task<IReadOnlyList<T>> RunRequest<T>(RequestCallback<T> requestCallback, C
return results;
}

async Task RunResultSet<TKey, T>(IGrouping<TKey, T> results, ResultCallback<T> resultCallback, OrderCallback<T> orderCallback,
async Task<int> RunResultSet<TKey, T>(IGrouping<TKey, T> results, ResultCallback<T> resultCallback, OrderCallback<T> orderCallback,
CancellationToken cancellationToken = default)
{
var count = 0;
Expand Down Expand Up @@ -318,6 +328,8 @@ async Task RunResultCallback()
if (count == 0)
throw;
}

return count;
}

public async Task<ActiveRequest> BeginRequest(CancellationToken cancellationToken = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await _notification.WaitForDelivery(stoppingToken).ConfigureAwait(false);

await _busControl.WaitForHealthStatus(BusHealthStatus.Healthy, stoppingToken).ConfigureAwait(false);

await algorithm.Run(DeliverOutbox, stoppingToken).ConfigureAwait(false);
var count = await algorithm.Run(DeliverOutbox, stoppingToken).ConfigureAwait(false);
if (count > 0)
continue;

await _notification.WaitForDelivery(stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await _notification.WaitForDelivery(stoppingToken).ConfigureAwait(false);

await _busControl.WaitForHealthStatus(BusHealthStatus.Healthy, stoppingToken).ConfigureAwait(false);

await algorithm.Run(GetOutboxes, DeliverOutbox, stoppingToken).ConfigureAwait(false);
var count = await algorithm.Run(GetOutboxes, DeliverOutbox, stoppingToken).ConfigureAwait(false);
if (count > 0)
continue;

await _notification.WaitForDelivery(stoppingToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
Expand Down

0 comments on commit dbcfc4e

Please sign in to comment.