Skip to content

Commit

Permalink
Merge pull request #10008 from abpframework/eventboxes
Browse files Browse the repository at this point in the history
Outbox & Inbox patterns for the distributed event bus
  • Loading branch information
hikalkan committed Sep 30, 2021
2 parents b6d67b9 + 6cf2a5b commit 1ef5656
Show file tree
Hide file tree
Showing 139 changed files with 3,384 additions and 264 deletions.
13 changes: 13 additions & 0 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.IdentityModel.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Threading.Tests", "test\Volo.Abp.Threading.Tests\Volo.Abp.Threading.Tests.csproj", "{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Boxes", "src\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj", "{6E289F31-7924-418B-9DAC-62A7CFADF916}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.DistributedLocking", "src\Volo.Abp.DistributedLocking\Volo.Abp.DistributedLocking.csproj", "{9A7EEA08-15BE-476D-8168-53039867038E}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Auditing.Contracts", "src\Volo.Abp.Auditing.Contracts\Volo.Abp.Auditing.Contracts.csproj", "{508B6355-AD28-4E60-8549-266D21DBF2CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Http.Client.Web", "src\Volo.Abp.Http.Client.Web\Volo.Abp.Http.Client.Web.csproj", "{F7407459-8AFA-45E4-83E9-9BB01412CC08}"
Expand Down Expand Up @@ -1157,6 +1160,14 @@ Global
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.Build.0 = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.Build.0 = Release|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Release|Any CPU.Build.0 = Release|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1360,6 +1371,8 @@ Global
{90B1866A-EF99-40B9-970E-B898E5AA523F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{40C6740E-BFCA-4D37-8344-3D84E2044BB2} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{6E289F31-7924-418B-9DAC-62A7CFADF916} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{9A7EEA08-15BE-476D-8168-53039867038E} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{508B6355-AD28-4E60-8549-266D21DBF2CF} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{F7407459-8AFA-45E4-83E9-9BB01412CC08} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ public override void OnApplicationInitialization(ApplicationInitializationContex
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.Add(
context.ServiceProvider
.GetRequiredService<IBackgroundJobWorker>()
);
context.AddBackgroundWorker<IBackgroundJobWorker>();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Auditing\Volo.Abp.Auditing.csproj" />
<ProjectReference Include="..\Volo.Abp.Data\Volo.Abp.Data.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
<ProjectReference Include="..\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Volo.Abp.Data;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
Expand All @@ -18,7 +19,7 @@ namespace Volo.Abp.Domain
[DependsOn(
typeof(AbpAuditingModule),
typeof(AbpDataModule),
typeof(AbpEventBusModule),
typeof(AbpEventBusBoxesModule),
typeof(AbpGuidsModule),
typeof(AbpMultiTenancyModule),
typeof(AbpThreadingModule),
Expand Down
3 changes: 3 additions & 0 deletions framework/src/Volo.Abp.DistributedLocking/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
30 changes: 30 additions & 0 deletions framework/src/Volo.Abp.DistributedLocking/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Volo.Abp.DistributedLocking</AssemblyName>
<PackageId>Volo.Abp.DistributedLocking</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="DistributedLock.Core" Version="1.0.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Volo.Abp.Modularity;

namespace Volo.Abp.DistributedLocking
{
public class AbpDistributedLockingModule : AbpModule
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLInboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLOutboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}

[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}

[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;

namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
Expand All @@ -17,6 +19,9 @@ public override void ConfigureServices(ServiceConfigurationContext context)
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});

context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}

[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}

0 comments on commit 1ef5656

Please sign in to comment.