diff --git a/CAP.sln b/CAP.sln index 0b5b3199..a1de23c5 100644 --- a/CAP.sln +++ b/CAP.sln @@ -82,6 +82,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "sr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -196,6 +198,10 @@ Global {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Debug|Any CPU.Build.0 = Debug|Any CPU {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.ActiveCfg = Release|Any CPU {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.Build.0 = Release|Any CPU + {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -228,6 +234,7 @@ Global {23684403-7DA8-489A-8A1E-8056D7683E18} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0} {AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/README.md b/README.md index 437b8910..05a77b9c 100644 --- a/README.md +++ b/README.md @@ -227,7 +227,8 @@ public void ShowTime2(DateTime datetime) } ``` -`ShowTime1` and `ShowTime2` will be called at the same time. +`ShowTime1` and `ShowTime2` will be called one after another because all received messages are processed linear. +You can change that behaviour increasing `ConsumerThreadCount`. BTW, You can specify the default group name in the configuration: diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md index 8125c6b2..a1ee4c2f 100644 --- a/docs/content/user-guide/en/cap/configuration.md +++ b/docs/content/user-guide/en/cap/configuration.md @@ -93,7 +93,7 @@ The interval of the collector processor deletes expired messages. #### ConsumerThreadCount -> Default : 1 +> Default: 1 Number of consumer threads, when this value is greater than 1, the order of message execution cannot be guaranteed. @@ -115,4 +115,10 @@ Failure threshold callback. This action is called when the retry reaches the val > Default: 24*3600 sec (1 days) -The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value. \ No newline at end of file +The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value. + +#### UseDispatchingPerGroup + +> Default: false + +If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value. \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Controllers/HomeController.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Controllers/HomeController.cs new file mode 100644 index 00000000..dacd2c94 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Controllers/HomeController.cs @@ -0,0 +1,35 @@ +using DotNetCore.CAP; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Data.SqlClient; +using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages; +using System; +using System.Threading.Tasks; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Controllers +{ + public class HomeController : Controller + { + private readonly ICapPublisher _capPublisher; + + public HomeController(ICapPublisher capPublisher) + { + _capPublisher = capPublisher; + } + + public async Task Index() + { + await using (var connection = new SqlConnection("Server=(local);Database=CAP-Test;Trusted_Connection=True;")) + { + using var transaction = connection.BeginTransaction(_capPublisher); + // This is where you would do other work that is going to persist data to your database + + var message = TestMessage.Create($"This is message text created at {DateTime.Now:O}."); + + await _capPublisher.PublishAsync(typeof(TestMessage).FullName, message); + transaction.Commit(); + } + + return Content("ok"); + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/TestMessage.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/TestMessage.cs new file mode 100644 index 00000000..95dbfd60 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/TestMessage.cs @@ -0,0 +1,12 @@ +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages +{ + public class TestMessage + { + public static TestMessage Create(string text) => new() + { + Text = text + }; + + public string Text { get; private init; } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/VeryFastProcessingReceiver.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/VeryFastProcessingReceiver.cs new file mode 100644 index 00000000..9262d61d --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/VeryFastProcessingReceiver.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages +{ + [QueueHandlerTopic("fasttopic")] + public class VeryFastProcessingReceiver : QueueHandler + { + private readonly ILogger _logger; + + public VeryFastProcessingReceiver(ILogger logger) + { + _logger = logger; + } + + public async Task Handle(TestMessage value) + { + _logger.LogInformation($"Starting FAST processing handler {DateTime.Now:O}: {value.Text}"); + await Task.Delay(50); + _logger.LogInformation($"Ending FAST processing handler {DateTime.Now:O}: {value.Text}"); + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/XSlowProcessingReceiver.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/XSlowProcessingReceiver.cs new file mode 100644 index 00000000..ab620a1a --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Messages/XSlowProcessingReceiver.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages +{ + [QueueHandlerTopic("slowtopic")] + public class XSlowProcessingReceiver : QueueHandler + { + private readonly ILogger _logger; + + public XSlowProcessingReceiver(ILogger logger) + { + _logger = logger; + } + + public async Task Handle(TestMessage value) + { + _logger.LogInformation($"Starting SLOW processing handler {DateTime.Now:O}: {value.Text}"); + await Task.Delay(10000); + _logger.LogInformation($"Ending SLOW processing handler {DateTime.Now:O}: {value.Text}"); + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs new file mode 100644 index 00000000..2055b481 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs @@ -0,0 +1,58 @@ +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Serilog; +using Serilog.Events; +using System; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup +{ + public class Program + { + public static int Main(string[] args) + { + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .MinimumLevel.Override("Microsoft", LogEventLevel.Warning) + .Enrich.FromLogContext() + .WriteTo.Debug() +#if DEBUG + .WriteTo.Seq("http://localhost:5341") +#endif + .CreateLogger(); + + try + { + Log.Information("Starting host..."); + CreateHostBuilder(args).Build().Run(); + return 0; + } + catch (Exception ex) + { + Log.Fatal(ex.InnerException ?? ex, "Host terminated unexpectedly"); + return 1; + } + finally + { + Log.CloseAndFlush(); + } + } + + public static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureAppConfiguration((context, builder) => + { + builder + .AddJsonFile("appsettings.json") + .AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true); + }) + .UseSerilog((context, configuration) => + { + configuration.ReadFrom.Configuration(context.Configuration); + }, true, true) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.UseStartup(); + }); + } +} diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj new file mode 100644 index 00000000..759ad6d6 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj @@ -0,0 +1,26 @@ + + + net5.0 + latest + OutOfProcess + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs new file mode 100644 index 00000000..3f20c400 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs @@ -0,0 +1,52 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers; +using Serilog; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup +{ + public class Startup + { + // This method gets called by the runtime. Use this method to add services to the container. + // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 + public void ConfigureServices(IServiceCollection services) + { + services.AddLogging(x => x.AddSerilog()); + + services + .AddSingleton() + .AddQueueHandlers(typeof(Startup).Assembly); + + services.AddCap(options => + { + options.UseSqlServer("Server=(local);Database=CAP-Test;Trusted_Connection=True;"); + options.UseRabbitMQ("localhost"); + options.UseDashboard(); + options.GroupNamePrefix = "th"; + options.ConsumerThreadCount = 1; + + options.UseDispatchingPerGroup = true; + }); + + services.AddControllersWithViews(); + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + app.UseDeveloperExceptionPage(); + app.UseSerilogRequestLogging(); + app.UseCapDashboard(); + app.UseRouting(); + app.UseEndpoints(endpoints => + { + endpoints.MapControllerRoute( + name: "default", + pattern: "{controller=Home}/{action=Index}/{id?}"); + }); + } + } +} diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandler.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandler.cs new file mode 100644 index 00000000..82fa3a17 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandler.cs @@ -0,0 +1,4 @@ +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers +{ + public abstract class QueueHandler { } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlerTopicAttribute.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlerTopicAttribute.cs new file mode 100644 index 00000000..fd545bf2 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlerTopicAttribute.cs @@ -0,0 +1,15 @@ +using System; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers +{ + [AttributeUsage(AttributeTargets.Class)] + public class QueueHandlerTopicAttribute : Attribute + { + public string Topic { get; } + + public QueueHandlerTopicAttribute(string topic) + { + Topic = topic; + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlersExtensions.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlersExtensions.cs new file mode 100644 index 00000000..b62a15f7 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/QueueHandlersExtensions.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Linq; +using System.Reflection; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers +{ + internal static class QueueHandlersExtensions + { + private static readonly Type queueHandlerType = typeof(QueueHandler); + + public static IServiceCollection AddQueueHandlers(this IServiceCollection services, params Assembly[] assemblies) + { + assemblies ??= new[] { Assembly.GetEntryAssembly() }; + + foreach (var type in assemblies.Distinct().SelectMany(x => x.GetTypes().Where(FilterHandlers))) + { + services.AddTransient(queueHandlerType, type); + } + + return services; + } + + private static bool FilterHandlers(Type t) + { + var topic = t.GetCustomAttribute(); + + return queueHandlerType.IsAssignableFrom(t) && topic != null && t.IsClass && !t.IsAbstract; + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/TypedConsumerServiceSelector.cs b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/TypedConsumerServiceSelector.cs new file mode 100644 index 00000000..0aff7602 --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/TypedConsumers/TypedConsumerServiceSelector.cs @@ -0,0 +1,87 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; + +namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers +{ + internal class TypedConsumerServiceSelector : ConsumerServiceSelector + { + private readonly CapOptions _capOptions; + + public TypedConsumerServiceSelector(IServiceProvider serviceProvider) : base(serviceProvider) + { + _capOptions = serviceProvider.GetRequiredService>().Value; + } + + protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider) + { + var executorDescriptorList = new List(30); + + using var scoped = provider.CreateScope(); + var consumerServices = scoped.ServiceProvider.GetServices(); + foreach (var service in consumerServices) + { + var typeInfo = service.GetType().GetTypeInfo(); + if (!typeof(QueueHandler).GetTypeInfo().IsAssignableFrom(typeInfo)) + { + continue; + } + + executorDescriptorList.AddRange(GetMyDescription(typeInfo)); + } + + return executorDescriptorList; + } + + private IEnumerable GetMyDescription(TypeInfo typeInfo) + { + var method = typeInfo.DeclaredMethods.FirstOrDefault(x => x.Name == "Handle"); + if (method == null) yield break; + + var topicAttr = typeInfo.GetCustomAttributes(true); + var topicAttributes = topicAttr as IList ?? topicAttr.ToList(); + + if (topicAttributes.Count == 0) yield break; + + foreach (var attr in topicAttributes) + { + var topic = attr.Topic == null + ? _capOptions.DefaultGroupName + "." + _capOptions.Version + : attr.Topic + "." + _capOptions.Version; + + if (!string.IsNullOrEmpty(_capOptions.GroupNamePrefix)) + { + topic = $"{_capOptions.GroupNamePrefix}.{topic}"; + } + + var parameters = method.GetParameters().Select(p => new ParameterDescriptor + { + Name = p.Name, + ParameterType = p.ParameterType, + IsFromCap = p.GetCustomAttributes(typeof(FromCapAttribute)).Any() + }).ToList(); + + var capName = parameters.FirstOrDefault(x => !x.IsFromCap)?.ParameterType.FullName; + if (string.IsNullOrEmpty(capName)) continue; + + yield return new ConsumerExecutorDescriptor + { + Attribute = new CapSubscribeAttribute(capName) + { + Group = topic + }, + Parameters = parameters, + MethodInfo = method, + ImplTypeInfo = typeInfo, + TopicNamePrefix = _capOptions.TopicNamePrefix, + ServiceTypeInfo = typeInfo + }; + } + } + } +} \ No newline at end of file diff --git a/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/appsettings.json b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/appsettings.json new file mode 100644 index 00000000..f673654c --- /dev/null +++ b/samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/appsettings.json @@ -0,0 +1,10 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "AllowedHosts": "*" +} diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs index 8cbf830d..2dd498a4 100644 --- a/src/DotNetCore.CAP/CAP.Builder.cs +++ b/src/DotNetCore.CAP/CAP.Builder.cs @@ -6,6 +6,7 @@ using System.Reflection; using DotNetCore.CAP.Filter; using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Processor; using JetBrains.Annotations; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index b6768c6e..3a2c8cea 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -27,6 +27,7 @@ public CapOptions() Version = "v1"; DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); CollectorCleaningInterval = 300; + UseDispatchingPerGroup = false; } internal IList Extensions { get; } @@ -80,6 +81,12 @@ public CapOptions() /// public int ConsumerThreadCount { get; set; } + /// + /// If true then each message group will have own independent dispatching pipeline. Each pipeline use as many threads as value is. + /// Default is false. + /// + public bool UseDispatchingPerGroup { get; set; } + /// /// The number of producer thread connections. /// Default is 1 diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index d5fcc0d8..393bd458 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -51,9 +51,8 @@ public static CapBuilder AddCap(this IServiceCollection services, Action(); services.TryAddSingleton(); - //Sender and Executors + //Sender services.TryAddSingleton(); - services.TryAddSingleton(); services.TryAddSingleton(); @@ -63,6 +62,17 @@ public static CapBuilder AddCap(this IServiceCollection services, Action(); + } + else + { + services.TryAddSingleton(); + } + foreach (var serviceExtension in options.Extensions) { serviceExtension.AddServices(services); diff --git a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs index 84c521a7..969c8487 100644 --- a/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs @@ -45,7 +45,7 @@ public SubscribeDispatcher( public Task DispatchAsync(MediumMessage message, CancellationToken cancellationToken) { - var selector = _provider.GetService(); + var selector = _provider.GetRequiredService(); if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor)) { var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." + @@ -66,13 +66,13 @@ public async Task DispatchAsync(MediumMessage message, ConsumerEx OperateResult result; do { - var executedResult = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken); - result = executedResult.Item2; + var (shouldRetry, operateResult) = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken); + result = operateResult; if (result == OperateResult.Success) { return result; } - retry = executedResult.Item1; + retry = shouldRetry; } while (retry); return result; @@ -89,7 +89,10 @@ public async Task DispatchAsync(MediumMessage message, ConsumerEx try { - _logger.ConsumerExecuting(descriptor.MethodInfo.Name); + _logger.ConsumerExecuting( + descriptor.ImplTypeInfo.Name, + descriptor.MethodInfo.Name, + descriptor.Attribute.Group ?? _options.DefaultGroupName); var sp = Stopwatch.StartNew(); @@ -99,7 +102,11 @@ public async Task DispatchAsync(MediumMessage message, ConsumerEx await SetSuccessfulState(message); - _logger.ConsumerExecuted(descriptor.MethodInfo.Name, sp.Elapsed.TotalMilliseconds); + _logger.ConsumerExecuted( + descriptor.ImplTypeInfo.Name, + descriptor.MethodInfo.Name, + descriptor.Attribute.Group ?? _options.DefaultGroupName, + sp.Elapsed.TotalMilliseconds); return (false, OperateResult.Success); } diff --git a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs index 235bee1f..19b20e95 100644 --- a/src/DotNetCore.CAP/Internal/LoggerExtensions.cs +++ b/src/DotNetCore.CAP/Internal/LoggerExtensions.cs @@ -45,14 +45,14 @@ public static void MessagePublishException(this ILogger logger, string messageId logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}"); } - public static void ConsumerExecuting(this ILogger logger, string methodName) + public static void ConsumerExecuting(this ILogger logger, string className, string methodName, string group) { - logger.LogInformation($"Executing subscriber method '{methodName}'"); + logger.LogInformation($"Executing subscriber method '{className}.{methodName}' on group '{group}'"); } - public static void ConsumerExecuted(this ILogger logger, string methodName, double milliseconds) + public static void ConsumerExecuted(this ILogger logger, string className, string methodName, string group, double milliseconds) { - logger.LogInformation($"Executed subscriber method '{methodName}' in {milliseconds} ms"); + logger.LogInformation($"Executed subscriber method '{className}.{methodName}' on group '{group}' in {milliseconds} ms"); } public static void ServerStarting(this ILogger logger) diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 0b153600..f88373f8 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -66,6 +66,8 @@ public void Start(CancellationToken stoppingToken) Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount) .Select(_ => Task.Factory.StartNew(() => Processing(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); + + _logger.LogInformation("Starting default Dispatcher"); } public void EnqueueToPublish(MediumMessage message) diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs new file mode 100644 index 00000000..678366f9 --- /dev/null +++ b/src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs @@ -0,0 +1,213 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor +{ + internal class DispatcherPerGroup : IDispatcher + { + private readonly IMessageSender _sender; + private readonly CapOptions _options; + private readonly ISubscribeDispatcher _executor; + private readonly ILogger _logger; + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + + private Channel _publishedChannel; + // private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel; + private ConcurrentDictionary> _receivedChannels; + private CancellationToken _stoppingToken; + + public DispatcherPerGroup( + ILogger logger, + IMessageSender sender, + IOptions options, + ISubscribeDispatcher executor) + { + _logger = logger; + _sender = sender; + _options = options.Value; + _executor = executor; + } + + public void Start(CancellationToken stoppingToken) + { + _stoppingToken = stoppingToken; + _stoppingToken.ThrowIfCancellationRequested(); + _stoppingToken.Register(() => _cts.Cancel()); + + var capacity = _options.ProducerThreadCount * 500; + _publishedChannel = Channel.CreateBounded(new BoundedChannelOptions(capacity > 5000 ? 5000 : capacity) + { + AllowSynchronousContinuations = true, + SingleReader = _options.ProducerThreadCount == 1, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); + + Task.WhenAll(Enumerable.Range(0, _options.ProducerThreadCount) + .Select(_ => Task.Factory.StartNew(() => Sending(stoppingToken), stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); + + _receivedChannels = new ConcurrentDictionary>(_options.ConsumerThreadCount, _options.ConsumerThreadCount * 2); + GetOrCreateReceiverChannel(_options.DefaultGroupName); + + _logger.LogInformation("Starting DispatcherPerGroup"); + } + + public void EnqueueToPublish(MediumMessage message) + { + try + { + if (!_publishedChannel.Writer.TryWrite(message)) + { + while (_publishedChannel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult()) + { + if (_publishedChannel.Writer.TryWrite(message)) + { + return; + } + } + } + } + catch (OperationCanceledException) + { + //Ignore + } + } + + public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor) + { + try + { + var group = descriptor.Attribute.Group ?? _options.DefaultGroupName; + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("Enqueue message for group {ConsumerGroup}", group); + } + + var channel = GetOrCreateReceiverChannel(group); + + if (!channel.Writer.TryWrite((message, descriptor))) + { + while (channel.Writer.WaitToWriteAsync(_cts.Token).AsTask().ConfigureAwait(false).GetAwaiter().GetResult()) + { + if (channel.Writer.TryWrite((message, descriptor))) + { + return; + } + } + } + } + catch (OperationCanceledException) + { + //Ignore + } + } + + private Channel<(MediumMessage, ConsumerExecutorDescriptor)> GetOrCreateReceiverChannel(string key) + { + return _receivedChannels.GetOrAdd(key, group => + { + _logger.LogInformation("Creating receiver channel for group {ConsumerGroup} with thread count {ConsumerThreadCount}", group, _options.ConsumerThreadCount); + + var capacity = _options.ConsumerThreadCount * 300; + var channel = Channel.CreateBounded<(MediumMessage, ConsumerExecutorDescriptor)>(new BoundedChannelOptions(capacity > 3000 ? 3000 : capacity) + { + AllowSynchronousContinuations = true, + SingleReader = _options.ConsumerThreadCount == 1, + SingleWriter = true, + FullMode = BoundedChannelFullMode.Wait + }); + + Task.WhenAll(Enumerable.Range(0, _options.ConsumerThreadCount) + .Select(_ => Task.Factory.StartNew(() => Processing(group, channel, _stoppingToken), _stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); + + return channel; + }); + } + + private async Task Sending(CancellationToken cancellationToken) + { + try + { + while (await _publishedChannel.Reader.WaitToReadAsync(cancellationToken)) + { + while (_publishedChannel.Reader.TryRead(out var message)) + { + try + { + var result = await _sender.SendAsync(message); + if (!result.Succeeded) + { + _logger.MessagePublishException( + message.Origin.GetId(), + result.ToString(), + result.Exception); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"An exception occurred when sending a message to the MQ. Id:{message.DbId}"); + } + } + } + } + catch (OperationCanceledException) + { + // expected + } + } + + private async Task Processing(string group, Channel<(MediumMessage, ConsumerExecutorDescriptor)> channel, CancellationToken cancellationToken) + { + try + { + while (await channel.Reader.WaitToReadAsync(cancellationToken)) + { + while (channel.Reader.TryRead(out var message)) + { + try + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug("Dispatching message for group {ConsumerGroup}", group); + } + + await _executor.DispatchAsync(message.Item1, message.Item2, cancellationToken); + } + catch (OperationCanceledException) + { + //expected + } + catch (Exception e) + { + _logger.LogError(e, $"An exception occurred when invoke subscriber. MessageId:{message.Item1.DbId}"); + } + } + } + } + catch (OperationCanceledException) + { + // expected + } + } + + public void Dispose() + { + if (!_cts.IsCancellationRequested) + _cts.Cancel(); + } + } +} \ No newline at end of file