Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Message Queue Integration #2264

Closed
54 changes: 54 additions & 0 deletions Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abp.ZeroCore.Tests", "test\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Abp.Castle.Log4Net.Tests", "test\Abp.Castle.Log4Net.Tests\Abp.Castle.Log4Net.Tests.csproj", "{F6F5F70B-2217-403F-AB87-6DFA9AD6692F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Abp.MqMessages", "src\Abp.MqMessages\Abp.MqMessages.csproj", "{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Abp.MqMessages.Rebus", "src\Abp.MqMessages.Rebus\Abp.MqMessages.Rebus.csproj", "{51D0B81A-F65A-4449-9B73-CF946E695CC5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Abp.MqMessages.RebusRabbitMqPublisher", "src\Abp.MqMessages.RebusRabbitMqPublisher\Abp.MqMessages.RebusRabbitMqPublisher.csproj", "{D0A2DA46-E5DE-4508-A181-3A2325A64872}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Abp.MqMessages.RebusRabbitMqConsumer", "src\Abp.MqMessages.RebusRabbitMqConsumer\Abp.MqMessages.RebusRabbitMqConsumer.csproj", "{6FF899A2-3C25-4263-A8F6-74EB9534A6DC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RebusRabbitMqPublisher", "test\mq-messages-sample\Sample.RebusRabbitMqPublisher\Sample.RebusRabbitMqPublisher.csproj", "{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RebusRabbitMqConsumer", "test\mq-messages-sample\Sample.RebusRabbitMqConsumer\Sample.RebusRabbitMqConsumer.csproj", "{0F410BAA-F444-4F53-A754-F3D1FA862398}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.MqMessages", "test\mq-messages-sample\Sample.MqMessages\Sample.MqMessages.csproj", "{A8333DD8-8A68-4C72-B587-FBC44901B4C7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -483,6 +497,34 @@ Global
{F6F5F70B-2217-403F-AB87-6DFA9AD6692F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F6F5F70B-2217-403F-AB87-6DFA9AD6692F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F6F5F70B-2217-403F-AB87-6DFA9AD6692F}.Release|Any CPU.Build.0 = Release|Any CPU
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7}.Release|Any CPU.Build.0 = Release|Any CPU
{51D0B81A-F65A-4449-9B73-CF946E695CC5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{51D0B81A-F65A-4449-9B73-CF946E695CC5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{51D0B81A-F65A-4449-9B73-CF946E695CC5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{51D0B81A-F65A-4449-9B73-CF946E695CC5}.Release|Any CPU.Build.0 = Release|Any CPU
{D0A2DA46-E5DE-4508-A181-3A2325A64872}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D0A2DA46-E5DE-4508-A181-3A2325A64872}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D0A2DA46-E5DE-4508-A181-3A2325A64872}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D0A2DA46-E5DE-4508-A181-3A2325A64872}.Release|Any CPU.Build.0 = Release|Any CPU
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC}.Release|Any CPU.Build.0 = Release|Any CPU
{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C}.Release|Any CPU.Build.0 = Release|Any CPU
{0F410BAA-F444-4F53-A754-F3D1FA862398}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0F410BAA-F444-4F53-A754-F3D1FA862398}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0F410BAA-F444-4F53-A754-F3D1FA862398}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0F410BAA-F444-4F53-A754-F3D1FA862398}.Release|Any CPU.Build.0 = Release|Any CPU
{A8333DD8-8A68-4C72-B587-FBC44901B4C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A8333DD8-8A68-4C72-B587-FBC44901B4C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A8333DD8-8A68-4C72-B587-FBC44901B4C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A8333DD8-8A68-4C72-B587-FBC44901B4C7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -565,8 +607,20 @@ Global
{0AD3082B-68F2-4FE2-B53E-8BE6BEA2EB74} = {1E6B9E8D-D5C1-4AD7-89F9-7179FEFD7C01}
{0CA8B27D-292C-4D57-AD88-EB0073C68F31} = {1E6B9E8D-D5C1-4AD7-89F9-7179FEFD7C01}
{F6F5F70B-2217-403F-AB87-6DFA9AD6692F} = {1E6B9E8D-D5C1-4AD7-89F9-7179FEFD7C01}
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{51D0B81A-F65A-4449-9B73-CF946E695CC5} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{D0A2DA46-E5DE-4508-A181-3A2325A64872} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {5A1CB38E-F77D-4A40-B3A9-9A70C3F3BC6D}
{65D9A698-3448-4DFA-B35D-9FBB84DAB4B7} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{51D0B81A-F65A-4449-9B73-CF946E695CC5} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{D0A2DA46-E5DE-4508-A181-3A2325A64872} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{27D7303F-C5BF-49A2-B674-A04170420D25} = {1E6B9E8D-D5C1-4AD7-89F9-7179FEFD7C01}
{6FF899A2-3C25-4263-A8F6-74EB9534A6DC} = {DFF0464B-5402-4DD6-86F5-2AEC1163B232}
{A8333DD8-8A68-4C72-B587-FBC44901B4C7} = {27D7303F-C5BF-49A2-B674-A04170420D25}
{F2FBB1A0-3D70-4CC4-BE7F-EDCB1B5E604C} = {27D7303F-C5BF-49A2-B674-A04170420D25}
{0F410BAA-F444-4F53-A754-F3D1FA862398} = {27D7303F-C5BF-49A2-B674-A04170420D25}
EndGlobalSection
EndGlobal
23 changes: 23 additions & 0 deletions src/Abp.MqMessages.Rebus/Abp.MqMessages.Rebus.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<RootNamespace>Abp</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Abp\Abp.csproj" />
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
<PackageReference Include="System.Data.Common" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
<PackageReference Include="System.Xml.XPath.XmlDocument" Version="4.3.0" />
<PackageReference Include="System.ComponentModel.Annotations" Version="4.4.0" />
<PackageReference Include="System.Runtime.Serialization.Formatters" Version="4.3.0" />
<PackageReference Include="System.Security.Claims" Version="4.3.0" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.0.0" />
<PackageReference Include="Nito.AsyncEx.Context" Version="1.1.0" />
<PackageReference Include="Nito.AsyncEx.Coordination" Version="1.0.2" />
<PackageReference Include="Rebus" Version="4.1.0-b4" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Reflection;
using System.Threading.Tasks;
using Abp.Json;
using Abp.Runtime.Session;
using Abp.Threading;
using Castle.Core.Logging;
using Rebus.Bus;

namespace Abp.MqMessages.Publishers
{
public class RebusRabbitMqPublisher : IMqMessagePublisher
{
private readonly IBus _bus;

public ILogger Logger { get; set; }

public IAbpSession AbpSession { get; set; }

public RebusRabbitMqPublisher(IBus bus)
{
_bus = bus;
Logger = NullLogger.Instance;
AbpSession = NullAbpSession.Instance;
}

public void Publish(object mqMessages)
{
TryFillSessionInfo(mqMessages);

Logger.Debug(mqMessages.GetType().FullName + ":" + mqMessages.ToJsonString());

AsyncHelper.RunSync(() => _bus.Publish(mqMessages));
}

private void TryFillSessionInfo(object mqMessages)
{
if (AbpSession.UserId.HasValue)
{
var operatorUserIdProperty = mqMessages.GetType().GetProperty("UserId");
if (operatorUserIdProperty != null && (operatorUserIdProperty.PropertyType == typeof(long?)))
{
operatorUserIdProperty.SetValue(mqMessages, AbpSession.UserId);
}
}

if (AbpSession.TenantId.HasValue)
{
var tenantIdProperty = mqMessages.GetType().GetProperty("TenantId");
if (tenantIdProperty != null && (tenantIdProperty.PropertyType == typeof(int?)))
{
tenantIdProperty.SetValue(mqMessages, AbpSession.TenantId);
}
}
}

public async Task PublishAsync(object mqMessages)
{
TryFillSessionInfo(mqMessages);

Logger.Debug(mqMessages.GetType().FullName + ":" + mqMessages.ToJsonString());

await _bus.Publish(mqMessages);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace>Abp</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Abp.MqMessages.Rebus\Abp.MqMessages.Rebus.csproj" />
<ProjectReference Include="..\Abp\Abp.csproj" />
<PackageReference Include="Rebus" Version="4.1.0-b4" />
<PackageReference Include="Rebus.CastleWindsor" Version="4.0.0" />
<PackageReference Include="Rebus.RabbitMq" Version="4.0.0" />

</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Abp.MqMessages.Consumers;

namespace Abp.Configuration.Startup
{
public static class RebusRabbitMqConsumerConfigurationExtensions
{
public static IRebusRabbitMqConsumerModuleConfig UseRebusRabbitMqConsumer(this IModuleConfigurations configurations)
{
return configurations.AbpConfiguration.GetOrCreate("Modules.Abp.RebusRabbitMqConsumer", () => configurations.AbpConfiguration.IocManager.Resolve<IRebusRabbitMqConsumerModuleConfig>());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Reflection;
using Rebus.Config;
using Rebus.Serialization;

namespace Abp.MqMessages.Consumers
{
public interface IRebusRabbitMqConsumerModuleConfig
{
bool Enabled { get; }

string ConnectionString { get; }

string QueueName { get; }

int MaxParallelism { get; }

int NumberOfWorkers { get; }

bool MessageAuditingEnabled { get; }

string MessageAuditingQueueName { get; }

Assembly[] AssemblysIncludeRebusMqMessageHandlers { get; }

Action<RebusLoggingConfigurer> LoggingConfigurer { get; }

Action<OptionsConfigurer> OptionsConfigurer { get; }

Action<StandardConfigurer<ISerializer>> SerializerConfigurer { get; }

IRebusRabbitMqConsumerModuleConfig Enable(bool enabled);

IRebusRabbitMqConsumerModuleConfig ConnectTo(string connectString);

IRebusRabbitMqConsumerModuleConfig UseQueue(string queueName);

IRebusRabbitMqConsumerModuleConfig SetMaxParallelism(int maxParallelism);

IRebusRabbitMqConsumerModuleConfig SetNumberOfWorkers(int numberOfWorkers);

IRebusRabbitMqConsumerModuleConfig EnableMessageAuditing(string messageAuditingQueueName);

IRebusRabbitMqConsumerModuleConfig RegisterHandlerInAssemblys(params Assembly[] assemblys);

IRebusRabbitMqConsumerModuleConfig UseLogging(Action<RebusLoggingConfigurer> loggingConfigurer);

IRebusRabbitMqConsumerModuleConfig UseOptions(Action<OptionsConfigurer> optionsConfigurer);

IRebusRabbitMqConsumerModuleConfig UseSerializer(Action<StandardConfigurer<ISerializer>> serializerConfigurer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Abp.Modules;
using Abp.MqMessages.Publishers;
using Rebus.Auditing.Messages;
using Rebus.Bus;
using Rebus.CastleWindsor;
using Rebus.Config;
using Rebus.Handlers;

namespace Abp.MqMessages.Consumers
{
public class RebusRabbitMqConsumerModule : AbpModule
{
private IBus _bus;

public override void PreInitialize()
{
IocManager.Register<IRebusRabbitMqConsumerModuleConfig, RebusRabbitMqConsumerModuleConfig>();
IocManager.Register<IMqMessagePublisher, RebusRabbitMqPublisher>();
}

public override void Initialize()
{
IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
}

public override void PostInitialize()
{
var moduleConfig = IocManager.Resolve<IRebusRabbitMqConsumerModuleConfig>();

if (moduleConfig.Enabled)
{
var rebusConfig = Configure.With(new CastleWindsorContainerAdapter(IocManager.IocContainer));

if (moduleConfig.LoggingConfigurer != null)
{
rebusConfig.Logging(moduleConfig.LoggingConfigurer);
}

rebusConfig.Serialization(moduleConfig.SerializerConfigurer);

if (moduleConfig.OptionsConfigurer != null)
{
rebusConfig.Options(moduleConfig.OptionsConfigurer);
}

rebusConfig.Options(c =>
{
c.SetMaxParallelism(moduleConfig.MaxParallelism);
c.SetNumberOfWorkers(moduleConfig.NumberOfWorkers);
});

if (moduleConfig.MessageAuditingEnabled)
{
rebusConfig.Options(o => o.EnableMessageAuditing(moduleConfig.MessageAuditingQueueName));
}

var mqMessageTypes = new List<Type>();
//Register handlers first!
foreach (var assembly in moduleConfig.AssemblysIncludeRebusMqMessageHandlers)
{
IocManager.IocContainer.AutoRegisterHandlersFromAssembly(assembly);

mqMessageTypes.AddRange(assembly.GetTypes()
.Where(t => t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>)))
.SelectMany(t => t.GetInterfaces())
.Distinct()
.SelectMany(t => t.GetGenericArguments())
.Distinct());
}

_bus = rebusConfig.Transport(c => c.UseRabbitMq(moduleConfig.ConnectionString, moduleConfig.QueueName))
.Start();

//Subscribe messages
mqMessageTypes = mqMessageTypes.Distinct().ToList();

foreach (var mqMessageType in mqMessageTypes)
{
_bus.Subscribe(mqMessageType);
}
}
}

public override void Shutdown()
{
if (_bus != null)
{
_bus.Dispose();
}
}
}
}
Loading