Skip to content

Commit

Permalink
Merge pull request #10325 from abpframework/background-jobs-distribut…
Browse files Browse the repository at this point in the history
…ed-lock

Background jobs distributed lock
  • Loading branch information
maliming committed Oct 15, 2021
2 parents 2985d5a + 960619d commit a21fd41
Show file tree
Hide file tree
Showing 40 changed files with 429 additions and 125 deletions.
21 changes: 14 additions & 7 deletions framework/Volo.Abp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.IdentityModel.Test
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Threading.Tests", "test\Volo.Abp.Threading.Tests\Volo.Abp.Threading.Tests.csproj", "{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.EventBus.Boxes", "src\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj", "{6E289F31-7924-418B-9DAC-62A7CFADF916}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.DistributedLocking", "src\Volo.Abp.DistributedLocking\Volo.Abp.DistributedLocking.csproj", "{9A7EEA08-15BE-476D-8168-53039867038E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Auditing.Contracts", "src\Volo.Abp.Auditing.Contracts\Volo.Abp.Auditing.Contracts.csproj", "{508B6355-AD28-4E60-8549-266D21DBF2CF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.Http.Client.Web", "src\Volo.Abp.Http.Client.Web\Volo.Abp.Http.Client.Web.csproj", "{F7407459-8AFA-45E4-83E9-9BB01412CC08}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.DistributedLocking.Abstractions", "src\Volo.Abp.DistributedLocking.Abstractions\Volo.Abp.DistributedLocking.Abstractions.csproj", "{CA805B77-D50C-431F-B3CB-1111C9C6E807}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.DistributedLocking.Abstractions.Tests", "test\Volo.Abp.DistributedLocking.Abstractions.Tests\Volo.Abp.DistributedLocking.Abstractions.Tests.csproj", "{C4F54FB5-C828-414D-BA03-E8E7A10C784D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Volo.Abp.BackgroundWorkers.Hangfire", "src\Volo.Abp.BackgroundWorkers.Hangfire\Volo.Abp.BackgroundWorkers.Hangfire.csproj", "{E5FCE710-C5A3-4F94-B9C9-BD1E99252BFB}"
EndProject
Global
Expand Down Expand Up @@ -1175,10 +1177,6 @@ Global
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B}.Release|Any CPU.Build.0 = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E289F31-7924-418B-9DAC-62A7CFADF916}.Release|Any CPU.Build.0 = Release|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9A7EEA08-15BE-476D-8168-53039867038E}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand All @@ -1191,6 +1189,14 @@ Global
{F7407459-8AFA-45E4-83E9-9BB01412CC08}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7407459-8AFA-45E4-83E9-9BB01412CC08}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7407459-8AFA-45E4-83E9-9BB01412CC08}.Release|Any CPU.Build.0 = Release|Any CPU
{CA805B77-D50C-431F-B3CB-1111C9C6E807}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA805B77-D50C-431F-B3CB-1111C9C6E807}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA805B77-D50C-431F-B3CB-1111C9C6E807}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA805B77-D50C-431F-B3CB-1111C9C6E807}.Release|Any CPU.Build.0 = Release|Any CPU
{C4F54FB5-C828-414D-BA03-E8E7A10C784D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C4F54FB5-C828-414D-BA03-E8E7A10C784D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C4F54FB5-C828-414D-BA03-E8E7A10C784D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C4F54FB5-C828-414D-BA03-E8E7A10C784D}.Release|Any CPU.Build.0 = Release|Any CPU
{E5FCE710-C5A3-4F94-B9C9-BD1E99252BFB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E5FCE710-C5A3-4F94-B9C9-BD1E99252BFB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E5FCE710-C5A3-4F94-B9C9-BD1E99252BFB}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -1392,10 +1398,11 @@ Global
{90B1866A-EF99-40B9-970E-B898E5AA523F} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{40C6740E-BFCA-4D37-8344-3D84E2044BB2} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{7B2FCAD6-86E6-49C8-ADBE-A61B4F4B101B} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{6E289F31-7924-418B-9DAC-62A7CFADF916} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{9A7EEA08-15BE-476D-8168-53039867038E} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{508B6355-AD28-4E60-8549-266D21DBF2CF} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{F7407459-8AFA-45E4-83E9-9BB01412CC08} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{CA805B77-D50C-431F-B3CB-1111C9C6E807} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
{C4F54FB5-C828-414D-BA03-E8E7A10C784D} = {447C8A77-E5F0-4538-8687-7383196D04EA}
{E5FCE710-C5A3-4F94-B9C9-BD1E99252BFB} = {5DF0E140-0513-4D0D-BE2E-3D4D85CD70E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.BackgroundJobs.Abstractions\Volo.Abp.BackgroundJobs.Abstractions.csproj" />
<ProjectReference Include="..\Volo.Abp.BackgroundWorkers\Volo.Abp.BackgroundWorkers.csproj" />
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" />
<ProjectReference Include="..\Volo.Abp.DistributedLocking.Abstractions\Volo.Abp.DistributedLocking.Abstractions.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.Timing\Volo.Abp.Timing.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DistributedLocking;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
using Volo.Abp.Timing;
Expand All @@ -11,7 +12,8 @@ namespace Volo.Abp.BackgroundJobs
typeof(AbpBackgroundJobsAbstractionsModule),
typeof(AbpBackgroundWorkersModule),
typeof(AbpTimingModule),
typeof(AbpGuidsModule)
typeof(AbpGuidsModule),
typeof(AbpDistributedLockingAbstractionsModule)
)]
public class AbpBackgroundJobsModule : AbpModule
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,84 +5,108 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DistributedLocking;
using Volo.Abp.Threading;
using Volo.Abp.Timing;

namespace Volo.Abp.BackgroundJobs
{
public class BackgroundJobWorker : AsyncPeriodicBackgroundWorkerBase, IBackgroundJobWorker
{
protected const string DistributedLockName = "AbpBackgroundJobWorker";

protected AbpBackgroundJobOptions JobOptions { get; }

protected AbpBackgroundJobWorkerOptions WorkerOptions { get; }

protected IAbpDistributedLock DistributedLock { get; }

public BackgroundJobWorker(
AbpAsyncTimer timer,
IOptions<AbpBackgroundJobOptions> jobOptions,
IOptions<AbpBackgroundJobWorkerOptions> workerOptions,
IServiceScopeFactory serviceScopeFactory)
IServiceScopeFactory serviceScopeFactory,
IAbpDistributedLock distributedLock)
: base(
timer,
serviceScopeFactory)
{
DistributedLock = distributedLock;
WorkerOptions = workerOptions.Value;
JobOptions = jobOptions.Value;
Timer.Period = WorkerOptions.JobPollPeriod;
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);

if (!waitingJobs.Any())
await using (var handler = await DistributedLock.TryAcquireAsync(DistributedLockName, cancellationToken: StoppingToken))
{
return;
}

var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();

foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;

try
if (handler != null)
{
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(workerContext.ServiceProvider, jobConfiguration.JobType, jobArgs);
var store = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

try
{
await jobExecuter.ExecuteAsync(context);
var waitingJobs = await store.GetWaitingJobsAsync(WorkerOptions.MaxJobFetchCount);

await store.DeleteAsync(jobInfo.Id);
if (!waitingJobs.Any())
{
return;
}
catch (BackgroundJobExecutionException)

var jobExecuter = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobExecuter>();
var clock = workerContext.ServiceProvider.GetRequiredService<IClock>();
var serializer = workerContext.ServiceProvider.GetRequiredService<IBackgroundJobSerializer>();

foreach (var jobInfo in waitingJobs)
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;

if (nextTryTime.HasValue)
try
{
jobInfo.NextTryTime = nextTryTime.Value;
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
var context = new JobExecutionContext(
workerContext.ServiceProvider,
jobConfiguration.JobType,
jobArgs);

try
{
await jobExecuter.ExecuteAsync(context);

await store.DeleteAsync(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
var nextTryTime = CalculateNextTryTime(jobInfo, clock);

if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
jobInfo.IsAbandoned = true;
}

await TryUpdateAsync(store, jobInfo);
}
}
else
catch (Exception ex)
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
await TryUpdateAsync(store, jobInfo);
}

await TryUpdateAsync(store, jobInfo);
}
}
catch (Exception ex)
else
{
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
await TryUpdateAsync(store, jobInfo);
try
{
await Task.Delay(WorkerOptions.JobPollPeriod * 12, StoppingToken);
}
catch (TaskCanceledException) { }
}
}
}
Expand All @@ -101,7 +125,8 @@ protected virtual async Task TryUpdateAsync(IBackgroundJobStore store, Backgroun

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration *
(Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1));
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);

Expand All @@ -113,4 +138,4 @@ protected virtual async Task TryUpdateAsync(IBackgroundJobStore store, Backgroun
return nextTryDate;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ public abstract class BackgroundWorkerBase : IBackgroundWorker
protected ILoggerFactory LoggerFactory => LazyServiceProvider.LazyGetRequiredService<ILoggerFactory>();

protected ILogger Logger => LazyServiceProvider.LazyGetService<ILogger>(provider => LoggerFactory?.CreateLogger(GetType().FullName) ?? NullLogger.Instance);

protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }

public BackgroundWorkerBase()
{
StoppingTokenSource = new CancellationTokenSource();
StoppingToken = StoppingTokenSource.Token;
}

public virtual Task StartAsync(CancellationToken cancellationToken = default)
{
Expand All @@ -31,6 +40,8 @@ public virtual Task StartAsync(CancellationToken cancellationToken = default)
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Stopped background worker: " + ToString());
StoppingTokenSource.Cancel();
StoppingTokenSource.Dispose();
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<ProjectReference Include="..\Volo.Abp.Auditing\Volo.Abp.Auditing.csproj" />
<ProjectReference Include="..\Volo.Abp.Data\Volo.Abp.Data.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus.Boxes\Volo.Abp.EventBus.Boxes.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.ExceptionHandling\Volo.Abp.ExceptionHandling.csproj" />
<ProjectReference Include="..\Volo.Abp.Guids\Volo.Abp.Guids.csproj" />
<ProjectReference Include="..\Volo.Abp.MultiTenancy\Volo.Abp.MultiTenancy.csproj" />
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using Volo.Abp.Data;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus;
using Volo.Abp.EventBus.Boxes;
using Volo.Abp.ExceptionHandling;
using Volo.Abp.Guids;
using Volo.Abp.Modularity;
Expand All @@ -19,7 +18,7 @@ namespace Volo.Abp.Domain
[DependsOn(
typeof(AbpAuditingModule),
typeof(AbpDataModule),
typeof(AbpEventBusBoxesModule),
typeof(AbpEventBusModule),
typeof(AbpGuidsModule),
typeof(AbpMultiTenancyModule),
typeof(AbpThreadingModule),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<ConfigureAwait ContinueOnCapturedContext="false" />
</Weavers>
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Volo.Abp.EventBus.Boxes</AssemblyName>
<PackageId>Volo.Abp.EventBus.Boxes</PackageId>
<AssemblyName>Volo.Abp.DistributedLocking.Abstractions</AssemblyName>
<PackageId>Volo.Abp.DistributedLocking.Abstractions</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>
<GenerateAssemblyConfigurationAttribute>false</GenerateAssemblyConfigurationAttribute>
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
Expand All @@ -15,9 +15,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Volo.Abp.BackgroundWorkers\Volo.Abp.BackgroundWorkers.csproj" />
<ProjectReference Include="..\Volo.Abp.DistributedLocking\Volo.Abp.DistributedLocking.csproj" />
<ProjectReference Include="..\Volo.Abp.EventBus\Volo.Abp.EventBus.csproj" />
<ProjectReference Include="..\Volo.Abp.Core\Volo.Abp.Core.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using Volo.Abp.Modularity;

namespace Volo.Abp.DistributedLocking
{
public class AbpDistributedLockingAbstractionsModule : AbpModule
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace Volo.Abp.DistributedLocking
{
public interface IAbpDistributedLock
{
/// <summary>
/// Tries to acquire a named lock.
/// Returns a disposable object to release the lock.
/// It is suggested to use this method within a using block.
/// Returns null if the lock could not be handled.
/// </summary>
/// <param name="name">The name of the lock</param>
/// <param name="timeout">Timeout value</param>
/// <param name="cancellationToken">Cancellation token</param>
[ItemCanBeNull]
Task<IAbpDistributedLockHandle> TryAcquireAsync(
[NotNull] string name,
TimeSpan timeout = default,
CancellationToken cancellationToken = default
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System;

namespace Volo.Abp.DistributedLocking
{
public interface IAbpDistributedLockHandle : IAsyncDisposable
{

}
}

0 comments on commit a21fd41

Please sign in to comment.