Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox & Inbox patterns for the distributed event bus #10008

Merged
merged 40 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6162581
Added outbox parameter and dbcontext abstraction
hikalkan Sep 7, 2021
5f50d33
Add AbpDistributedEventBusOptions to DistributedEventBusBase
hikalkan Sep 7, 2021
d1ea473
save to outbox
hikalkan Sep 8, 2021
eb7765c
Refactored configuration, added CreationTime
hikalkan Sep 8, 2021
3a89cb4
Added Volo.Abp.EventBus.Boxes.csproj
hikalkan Sep 8, 2021
afcd640
Basic implementation is done for the event outbox.
hikalkan Sep 8, 2021
6cb6b17
Update OutboxSender.cs
hikalkan Sep 8, 2021
7396932
Merge branch 'dev' into eventboxes
hikalkan Sep 8, 2021
ae95de6
Introduced Volo.Abp.DistributedLocking package
hikalkan Sep 8, 2021
8271fc9
Implemented distributed locking for the event outbox
hikalkan Sep 8, 2021
79015b0
Added cancellation tokens
hikalkan Sep 8, 2021
9593c87
Save incoming events to inbox
hikalkan Sep 9, 2021
8af7ccd
Implemented initial inbox processing logic
hikalkan Sep 9, 2021
2aa45fe
Check handlerselector for inbox event processing
hikalkan Sep 10, 2021
eb47e50
Apply ABP concepts on state change, not on savechanges.
hikalkan Sep 10, 2021
51ce477
Eliminate duplicate incoming events
hikalkan Sep 10, 2021
4e0e0de
Regularly delete old events
hikalkan Sep 10, 2021
3b7d151
Added initial mongodb boxing services
hikalkan Sep 10, 2021
59dada8
Refactored the example solution. Added mongodb extensions
hikalkan Sep 10, 2021
c8d5257
Fix concurrencystamp & tests
hikalkan Sep 10, 2021
60d97be
Use `AbpAsyncTimer` to replace `AbpTimer`.
maliming Sep 14, 2021
166ab48
Merge branch 'dev' into eventboxes
hikalkan Sep 14, 2021
8114a62
delete old todo note
hikalkan Sep 14, 2021
2db1a1c
Fast stop on application stop.
hikalkan Sep 15, 2021
4af63fe
Rename IRawEventPublisher to ISupportsEventBoxes and refactor methods.
hikalkan Sep 15, 2021
8200298
Remove unnecessary log
hikalkan Sep 15, 2021
8859803
Remove old TODOs
hikalkan Sep 15, 2021
cc14ae6
Define index for the creationtime.
hikalkan Sep 15, 2021
fc70729
Rename local variable: rawPublisher to supportsEventBoxes.
hikalkan Sep 15, 2021
9ecea2d
Update TodoEventHandler.cs
hikalkan Sep 28, 2021
ac5d536
Merge branch 'eventboxes' into liangshiwei/eventboxes
realLiangshiwei Sep 28, 2021
0cf5d24
Complete the Outbox & Inbox Patterns feature
realLiangshiwei Sep 28, 2021
f0f6188
Improve performance
realLiangshiwei Sep 28, 2021
bc4944c
Add ISqlAdapter
realLiangshiwei Sep 29, 2021
b809ee5
Refactor
realLiangshiwei Sep 29, 2021
28da0b8
Improve
realLiangshiwei Sep 29, 2021
3ee9f57
Improve
realLiangshiwei Sep 30, 2021
5291b97
Update OracleDbContextEventInbox.cs
realLiangshiwei Sep 30, 2021
cca31d2
Merge pull request #10159 from abpframework/liangshiwei/eventboxes
hikalkan Sep 30, 2021
6cf2a5b
Merge branch 'dev' into eventboxes
hikalkan Sep 30, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.IdentityModel.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Threading.Tests", "test\Volo.Abp.Threading.Tests\Volo.Abp.Threading.Tests.csproj", "{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Boxes", "src\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj", "{6E289F31-7924-418B-9DAC-62A7CFADF916}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.DistributedLocking", "src\Volo.Abp.DistributedLocking\Volo.Abp.DistributedLocking.csproj", "{9A7EEA08-15BE-476D-8168-53039867038E}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Auditing.Contracts", "src\Volo.Abp.Auditing.Contracts\Volo.Abp.Auditing.Contracts.csproj", "{508B6355-AD28-4E60-8549-266D21DBF2CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Http.Client.Web", "src\Volo.Abp.Http.Client.Web\Volo.Abp.Http.Client.Web.csproj", "{F7407459-8AFA-45E4-83E9-9BB01412CC08}"
Expand Down Expand Up @@ -1157,6 +1160,14 @@ Global
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.Build.0 = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.Build.0 = Release|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Release|Any CPU.Build.0 = Release|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{508B6355-AD28-4E60-8549-266D21DBF2CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1360,6 +1371,8 @@ Global
{90B1866A-EF99-40B9-970E-B898E5AA523F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{40C6740E-BFCA-4D37-8344-3D84E2044BB2} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{6E289F31-7924-418B-9DAC-62A7CFADF916} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{9A7EEA08-15BE-476D-8168-53039867038E} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{508B6355-AD28-4E60-8549-266D21DBF2CF} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{F7407459-8AFA-45E4-83E9-9BB01412CC08} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ public override void OnApplicationInitialization(ApplicationInitializationContex
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.Add(
context.ServiceProvider
.GetRequiredService<IBackgroundJobWorker>()
);
context.AddBackgroundWorker<IBackgroundJobWorker>();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Auditing\Volo.Abp.Auditing.csproj" />
<ProjectReference Include="..\Volo.Abp.Data\Volo.Abp.Data.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
<ProjectReference Include="..\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Volo.Abp.Data;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
Expand All @@ -18,7 +19,7 @@ namespace Volo.Abp.Domain
[DependsOn(
typeof(AbpAuditingModule),
typeof(AbpDataModule),
typeof(AbpEventBusModule),
typeof(AbpEventBusBoxesModule),
typeof(AbpGuidsModule),
typeof(AbpMultiTenancyModule),
typeof(AbpThreadingModule),
Expand Down
3 changes: 3 additions & 0 deletions framework/src/Volo.Abp.DistributedLocking/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
30 changes: 30 additions & 0 deletions framework/src/Volo.Abp.DistributedLocking/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="ConfigureAwait" minOccurs="0" maxOccurs="1">
<xs:complexType>
<xs:attribute name="ContinueOnCapturedContext" type="xs:boolean" />
</xs:complexType>
</xs:element>
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\..\configureawait.props" />
<Import Project="..\..\..\common.props" />

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Volo.Abp.DistributedLocking</AssemblyName>
<PackageId>Volo.Abp.DistributedLocking</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<RootNamespace />
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="DistributedLock.Core" Version="1.0.2" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Volo.Abp.Modularity;

namespace Volo.Abp.DistributedLocking
{
public class AbpDistributedLockingModule : AbpModule
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLInboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventInbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class MySQLOutboxConfigExtensions
{
public static void UseMySQL<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(ISqlRawDbContextEventOutbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.EventBus.Distributed;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}

[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventOutbox<TDbContext> : DbContextEventOutbox<TDbContext> , IOracleDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
public OracleDbContextEventOutbox(IDbContextProvider<TDbContext> dbContextProvider)
: base(dbContextProvider)
{
}

[UnitOfWork]
public override async Task DeleteAsync(Guid id)
{
var dbContext = (IHasEventOutbox) await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.OutgoingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleInboxConfigExtensions
{
public static void UseOracle<TDbContext>(this InboxConfig outboxConfig)
where TDbContext : IHasEventInbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventInbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Volo.Abp.EventBus.Distributed;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public static class OracleOutboxConfigExtensions
{
public static void UseOracle<TDbContext>(this OutboxConfig outboxConfig)
where TDbContext : IHasEventOutbox
{
outboxConfig.ImplementationType = typeof(IOracleDbContextEventOutbox<TDbContext>);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Volo.Abp.Guids;
using Microsoft.Extensions.DependencyInjection;
using Volo.Abp.EntityFrameworkCore.DistributedEvents;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;

namespace Volo.Abp.EntityFrameworkCore.Oracle.Devart
Expand All @@ -17,6 +19,9 @@ public override void ConfigureServices(ServiceConfigurationContext context)
options.DefaultSequentialGuidType = SequentialGuidType.SequentialAsBinary;
}
});

context.Services.AddTransient(typeof(IOracleDbContextEventOutbox<>), typeof(OracleDbContextEventOutbox<>));
context.Services.AddTransient(typeof(IOracleDbContextEventInbox<>), typeof(OracleDbContextEventInbox<>));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventInbox<TDbContext> : IDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public interface IOracleDbContextEventOutbox<TDbContext> : IDbContextEventOutbox<TDbContext>
where TDbContext : IHasEventOutbox
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.Timing;
using Volo.Abp.Uow;

namespace Volo.Abp.EntityFrameworkCore.DistributedEvents
{
public class OracleDbContextEventInbox<TDbContext> : DbContextEventInbox<TDbContext> , IOracleDbContextEventInbox<TDbContext>
where TDbContext : IHasEventInbox
{
public OracleDbContextEventInbox(
IDbContextProvider<TDbContext> dbContextProvider,
IClock clock,
IOptions<AbpEventBusBoxesOptions> eventBusBoxesOptions)
: base(dbContextProvider, clock, eventBusBoxesOptions)
{
}

[UnitOfWork]
public override async Task MarkAsProcessedAsync(Guid id)
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();

var sql = $"UPDATE \"{tableName}\" SET \"Processed\" = '1', \"ProcessedTime\" = TO_DATE('{Clock.Now}', 'yyyy-mm-dd hh24:mi:ss') WHERE \"Id\" = HEXTORAW('{GuidToOracleType(id)}')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

[UnitOfWork]
public override async Task DeleteOldEventsAsync()
{
var dbContext = await DbContextProvider.GetDbContextAsync();
var tableName = dbContext.IncomingEvents.EntityType.GetSchemaQualifiedTableName();
var timeToKeepEvents = Clock.Now - EventBusBoxesOptions.WaitTimeToDeleteProcessedInboxEvents;

var sql = $"DELETE FROM \"{tableName}\" WHERE \"Processed\" = '1' AND \"CreationTime\" < TO_DATE('{timeToKeepEvents}', 'yyyy-mm-dd hh24:mi:ss')";
await dbContext.Database.ExecuteSqlRawAsync(sql);
}

protected virtual string GuidToOracleType(Guid id)
{
return BitConverter.ToString(id.ToByteArray()).Replace("-", "").ToUpper();
}
}
}