Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CAP.sln
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.SqlServer.DispatcherPerGroup", "samples\Sample.RabbitMQ.SqlServer.DispatcherPerGroup\Sample.RabbitMQ.SqlServer.DispatcherPerGroup.csproj", "{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -196,6 +198,10 @@ Global
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.Build.0 = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -228,6 +234,7 @@ Global
{23684403-7DA8-489A-8A1E-8056D7683E18} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{DCDF58E8-F823-4F04-9F8C-E8076DC16A68} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public void ShowTime2(DateTime datetime)
}

```
`ShowTime1` and `ShowTime2` will be called at the same time.
`ShowTime1` and `ShowTime2` will be called one after another because all received messages are processed linear.
You can change that behaviour increasing `ConsumerThreadCount`.

BTW, You can specify the default group name in the configuration:

Expand Down
10 changes: 8 additions & 2 deletions docs/content/user-guide/en/cap/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ The interval of the collector processor deletes expired messages.

#### ConsumerThreadCount

> Default : 1
> Default: 1

Number of consumer threads, when this value is greater than 1, the order of message execution cannot be guaranteed.

Expand All @@ -115,4 +115,10 @@ Failure threshold callback. This action is called when the retry reaches the val

> Default: 24*3600 sec (1 days)

The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.
The expiration time (in seconds) of the success message. When the message is sent or consumed successfully, it will be removed from database storage when the time reaches `SucceedMessageExpiredAfter` seconds. You can set the expiration time by specifying this value.

#### UseDispatchingPerGroup

> Default: false

If `true` then all consumers within the same group pushes received messages to own dispatching pipeline channel. Each channel has set thread count to `ConsumerThreadCount` value.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Data.SqlClient;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages;
using System;
using System.Threading.Tasks;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Controllers
{
public class HomeController : Controller
{
private readonly ICapPublisher _capPublisher;

public HomeController(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}

public async Task<IActionResult> Index()
{
await using (var connection = new SqlConnection("Server=(local);Database=CAP-Test;Trusted_Connection=True;"))
{
using var transaction = connection.BeginTransaction(_capPublisher);
// This is where you would do other work that is going to persist data to your database

var message = TestMessage.Create($"This is message text created at {DateTime.Now:O}.");

await _capPublisher.PublishAsync(typeof(TestMessage).FullName, message);
transaction.Commit();
}

return Content("ok");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
public class TestMessage
{
public static TestMessage Create(string text) => new()
{
Text = text
};

public string Text { get; private init; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
[QueueHandlerTopic("fasttopic")]
public class VeryFastProcessingReceiver : QueueHandler
{
private readonly ILogger<VeryFastProcessingReceiver> _logger;

public VeryFastProcessingReceiver(ILogger<VeryFastProcessingReceiver> logger)
{
_logger = logger;
}

public async Task Handle(TestMessage value)
{
_logger.LogInformation($"Starting FAST processing handler {DateTime.Now:O}: {value.Text}");
await Task.Delay(50);
_logger.LogInformation($"Ending FAST processing handler {DateTime.Now:O}: {value.Text}");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.Messages
{
[QueueHandlerTopic("slowtopic")]
public class XSlowProcessingReceiver : QueueHandler
{
private readonly ILogger<XSlowProcessingReceiver> _logger;

public XSlowProcessingReceiver(ILogger<XSlowProcessingReceiver> logger)
{
_logger = logger;
}

public async Task Handle(TestMessage value)
{
_logger.LogInformation($"Starting SLOW processing handler {DateTime.Now:O}: {value.Text}");
await Task.Delay(10000);
_logger.LogInformation($"Ending SLOW processing handler {DateTime.Now:O}: {value.Text}");
}
}
}
58 changes: 58 additions & 0 deletions samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
using System;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup
{
public class Program
{
public static int Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
.MinimumLevel.Override("Microsoft", LogEventLevel.Warning)
.Enrich.FromLogContext()
.WriteTo.Debug()
#if DEBUG
.WriteTo.Seq("http://localhost:5341")
#endif
.CreateLogger();

try
{
Log.Information("Starting host...");
CreateHostBuilder(args).Build().Run();
return 0;
}
catch (Exception ex)
{
Log.Fatal(ex.InnerException ?? ex, "Host terminated unexpectedly");
return 1;
}
finally
{
Log.CloseAndFlush();
}
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration((context, builder) =>
{
builder
.AddJsonFile("appsettings.json")
.AddJsonFile($"appsettings.{context.HostingEnvironment.EnvironmentName}.json", true);
})
.UseSerilog((context, configuration) =>
{
configuration.ReadFrom.Configuration(context.Configuration);
}, true, true)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<LangVersion>latest</LangVersion>
<AspNetCoreHostingModel>OutOfProcess</AspNetCoreHostingModel>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.4" />

<PackageReference Include="Serilog.AspNetCore" Version="4.1.0" />
<PackageReference Include="Serilog.Sinks.Seq" Version="5.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.RabbitMQ\DotNetCore.CAP.RabbitMQ.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.SqlServer\DotNetCore.CAP.SqlServer.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>
52 changes: 52 additions & 0 deletions samples/Sample.RabbitMQ.SqlServer.DispatcherPerGroup/Startup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers;
using Serilog;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddLogging(x => x.AddSerilog());

services
.AddSingleton<IConsumerServiceSelector, TypedConsumerServiceSelector>()
.AddQueueHandlers(typeof(Startup).Assembly);

services.AddCap(options =>
{
options.UseSqlServer("Server=(local);Database=CAP-Test;Trusted_Connection=True;");
options.UseRabbitMQ("localhost");
options.UseDashboard();
options.GroupNamePrefix = "th";
options.ConsumerThreadCount = 1;

options.UseDispatchingPerGroup = true;
});

services.AddControllersWithViews();
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseDeveloperExceptionPage();
app.UseSerilogRequestLogging();
app.UseCapDashboard();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllerRoute(
name: "default",
pattern: "{controller=Home}/{action=Index}/{id?}");
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
public abstract class QueueHandler { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
[AttributeUsage(AttributeTargets.Class)]
public class QueueHandlerTopicAttribute : Attribute
{
public string Topic { get; }

public QueueHandlerTopicAttribute(string topic)
{
Topic = topic;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Linq;
using System.Reflection;

namespace Sample.RabbitMQ.SqlServer.DispatcherPerGroup.TypedConsumers
{
internal static class QueueHandlersExtensions
{
private static readonly Type queueHandlerType = typeof(QueueHandler);

public static IServiceCollection AddQueueHandlers(this IServiceCollection services, params Assembly[] assemblies)
{
assemblies ??= new[] { Assembly.GetEntryAssembly() };

foreach (var type in assemblies.Distinct().SelectMany(x => x.GetTypes().Where(FilterHandlers)))
{
services.AddTransient(queueHandlerType, type);
}

return services;
}

private static bool FilterHandlers(Type t)
{
var topic = t.GetCustomAttribute<QueueHandlerTopicAttribute>();

return queueHandlerType.IsAssignableFrom(t) && topic != null && t.IsClass && !t.IsAbstract;
}
}
}
Loading