Skip to content

Commit

Permalink
Refactor quartz job scheduler into async background service
Browse files Browse the repository at this point in the history
When a system has many suspended workflows, it takes a long time for Quartz to schedule each trigger, blocking application startup.
Moving this logic to a hosted service allows this process to happen in the background, not blocking startup
  • Loading branch information
sfmskywalker committed Jul 9, 2021
1 parent d7d8c5e commit 8e97f7a
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System;
using Elsa.Activities.Temporal.Common.Bookmarks;
using Elsa.Activities.Temporal.Common.Handlers;
using Elsa.Activities.Temporal.Common.HostedServices;
using Elsa.Activities.Temporal.Common.Options;
using Elsa.Activities.Temporal.Common.StartupTasks;
using Elsa.HostedServices;
using Elsa.Runtime;
using Microsoft.Extensions.DependencyInjection;

Expand Down Expand Up @@ -31,7 +32,7 @@ public static ElsaOptionsBuilder AddCommonTemporalActivities(this ElsaOptionsBui

options.Services
.AddNotificationHandlers(typeof(RemoveScheduledTriggers))
.AddStartupTask<StartJobs>()
.AddHostedService<ScopedBackgroundService<StartJobs>>()
.AddBookmarkProvider<TimerBookmarkProvider>()
.AddBookmarkProvider<CronBookmarkProvider>()
.AddBookmarkProvider<StartAtBookmarkProvider>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,70 +2,98 @@
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Common.Bookmarks;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.HostedServices;
using Elsa.Services;
using Elsa.Services.Bookmarks;
using Microsoft.Extensions.Logging;
using Open.Linq.AsyncExtensions;

namespace Elsa.Activities.Temporal.Common.StartupTasks
namespace Elsa.Activities.Temporal.Common.HostedServices
{
/// <summary>
/// Starts jobs based on workflow instances blocked on a Timer, Cron or StartAt activity.
/// </summary>
public class StartJobs : IStartupTask
public class StartJobs : IScopedBackgroundService
{
// TODO: Figure out how to start jobs across multiple tenants / how to get a list of all tenants.
private const string? TenantId = default;

private readonly IBookmarkFinder _bookmarkFinder;
private readonly IWorkflowInstanceScheduler _workflowScheduler;
private readonly IDistributedLockProvider _distributedLockProvider;
private readonly ILogger<StartJobs> _logger;

public StartJobs(IBookmarkFinder bookmarkFinder, IWorkflowInstanceScheduler workflowScheduler)
public StartJobs(IBookmarkFinder bookmarkFinder, IWorkflowInstanceScheduler workflowScheduler, IDistributedLockProvider distributedLockProvider, ILogger<StartJobs> logger)
{
_bookmarkFinder = bookmarkFinder;
_workflowScheduler = workflowScheduler;
_distributedLockProvider = distributedLockProvider;
_logger = logger;
}

public int Order => 2000;

public async Task ExecuteAsync(CancellationToken cancellationToken)
public async Task ExecuteAsync(CancellationToken stoppingToken)
{
await ScheduleTimerEventWorkflowsAsync(cancellationToken);
await ScheduleCronEventWorkflowsAsync(cancellationToken);
await ScheduleStartAtWorkflowsAsync(cancellationToken);
await using var handle = await _distributedLockProvider.AcquireLockAsync(nameof(StartJobs), null, stoppingToken);

if (handle == null)
return;

await ScheduleTimerEventWorkflowsAsync(stoppingToken);
await ScheduleCronEventWorkflowsAsync(stoppingToken);
await ScheduleStartAtWorkflowsAsync(stoppingToken);
}

private async Task ScheduleStartAtWorkflowsAsync(CancellationToken cancellationToken)
{
// Schedule workflow instances that are blocked on a start-at.
var bookmarkResults = await _bookmarkFinder.FindBookmarksAsync<StartAt>(tenantId: TenantId, cancellationToken: cancellationToken);
var bookmarkResults = await _bookmarkFinder.FindBookmarksAsync<StartAt>(tenantId: TenantId, cancellationToken: cancellationToken).ToList();

_logger.LogDebug("Found {BookmarkResultCount} bookmarks for StartAt", bookmarkResults.Count);
var index = 0;

foreach (var result in bookmarkResults)
{
var bookmark = (StartAtBookmark) result.Bookmark;
await _workflowScheduler.ScheduleAsync(result.WorkflowInstanceId!, result.ActivityId, bookmark.ExecuteAt, null, cancellationToken);

index++;
_logger.LogDebug("Scheduled {CurrentBookmarkIndex} of {BookmarkResultCount}", index, bookmarkResults.Count);
}
}

private async Task ScheduleTimerEventWorkflowsAsync(CancellationToken cancellationToken)
{
// Schedule workflow instances that are blocked on a timer.
var bookmarkResults = await _bookmarkFinder.FindBookmarksAsync<Timer>(tenantId: TenantId, cancellationToken: cancellationToken);
var bookmarkResults = await _bookmarkFinder.FindBookmarksAsync<Timer>(tenantId: TenantId, cancellationToken: cancellationToken).ToList();

_logger.LogDebug("Found {BookmarkResultCount} bookmarks for Timer", bookmarkResults.Count);
var index = 0;

foreach (var result in bookmarkResults)
{
var bookmark = (TimerBookmark) result.Bookmark;
await _workflowScheduler.ScheduleAsync(result.WorkflowInstanceId!, result.ActivityId, bookmark.ExecuteAt, null, cancellationToken);

index++;
_logger.LogDebug("Scheduled {CurrentBookmarkIndex} of {BookmarkResultCount}", index, bookmarkResults.Count);
}
}

private async Task ScheduleCronEventWorkflowsAsync(CancellationToken cancellationToken)
{
// Schedule workflow instances blocked on a cron event.
var cronEventTriggers = await _bookmarkFinder.FindBookmarksAsync<Cron>(tenantId: TenantId, cancellationToken: cancellationToken);
var bookmarkResults = await _bookmarkFinder.FindBookmarksAsync<Cron>(tenantId: TenantId, cancellationToken: cancellationToken).ToList();

_logger.LogDebug("Found {BookmarkResultCount} bookmarks for StartAt", bookmarkResults.Count);
var index = 0;

foreach (var result in cronEventTriggers)
foreach (var result in bookmarkResults)
{
var trigger = (CronBookmark) result.Bookmark;
await _workflowScheduler.ScheduleAsync(result.WorkflowInstanceId!, result.ActivityId, trigger.ExecuteAt!.Value, null, cancellationToken);

index++;
_logger.LogDebug("Scheduled {CurrentBookmarkIndex} of {BookmarkResultCount}", index, bookmarkResults.Count);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.Activities.Temporal.Quartz.Jobs;
using Elsa.Services;
using Microsoft.Extensions.Logging;
using NodaTime;
using Quartz;
Expand All @@ -13,12 +14,15 @@ public class QuartzWorkflowDefinitionScheduler : IWorkflowDefinitionScheduler
{
private static readonly string RunWorkflowJobKey = nameof(RunQuartzWorkflowDefinitionJob);
private readonly QuartzSchedulerProvider _schedulerProvider;
private readonly IDistributedLockProvider _distributedLockProvider;
private readonly ElsaOptions _elsaOptions;
private readonly ILogger _logger;
private readonly SemaphoreSlim _semaphore = new(1);

public QuartzWorkflowDefinitionScheduler(QuartzSchedulerProvider schedulerProvider, ILogger<QuartzWorkflowDefinitionScheduler> logger)
public QuartzWorkflowDefinitionScheduler(QuartzSchedulerProvider schedulerProvider, IDistributedLockProvider distributedLockProvider, ElsaOptions elsaOptions, ILogger<QuartzWorkflowDefinitionScheduler> logger)
{
_schedulerProvider = schedulerProvider;
_distributedLockProvider = distributedLockProvider;
_elsaOptions = elsaOptions;
_logger = logger;
}

Expand Down Expand Up @@ -69,7 +73,11 @@ public async Task UnscheduleAllAsync(CancellationToken cancellationToken)
private async Task ScheduleJob(ITrigger trigger, CancellationToken cancellationToken)
{
var scheduler = await _schedulerProvider.GetSchedulerAsync(cancellationToken);
await _semaphore.WaitAsync(cancellationToken);
var sharedResource = $"{nameof(QuartzWorkflowInstanceScheduler)}:{trigger.Key}";
await using var handle = await _distributedLockProvider.AcquireLockAsync(sharedResource, _elsaOptions.DistributedLockTimeout, cancellationToken);

if (handle == null)
return;

try
{
Expand All @@ -83,10 +91,6 @@ private async Task ScheduleJob(ITrigger trigger, CancellationToken cancellationT
{
_logger.LogWarning(e, "Failed to schedule trigger {TriggerKey}", trigger.Key.ToString());
}
finally
{
_semaphore.Release();
}
}

private TriggerBuilder CreateTrigger(string workflowDefinitionId, string activityId) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.Activities.Temporal.Quartz.Jobs;
using Medallion.Threading;
using Microsoft.Extensions.Logging;
using NodaTime;
using Quartz;
using Quartz.Impl.Matchers;
using IDistributedLockProvider = Elsa.Services.IDistributedLockProvider;

namespace Elsa.Activities.Temporal.Quartz.Services
{
public class QuartzWorkflowInstanceScheduler : IWorkflowInstanceScheduler
{
private static readonly string RunWorkflowJobKey = nameof(RunQuartzWorkflowInstanceJob);
private readonly QuartzSchedulerProvider _schedulerProvider;
private readonly IDistributedLockProvider _distributedLockProvider;
private readonly ElsaOptions _elsaOptions;
private readonly ILogger _logger;
private readonly SemaphoreSlim _semaphore = new(1);

public QuartzWorkflowInstanceScheduler(QuartzSchedulerProvider schedulerProvider, ILogger<QuartzWorkflowInstanceScheduler> logger)
public QuartzWorkflowInstanceScheduler(QuartzSchedulerProvider schedulerProvider, IDistributedLockProvider distributedLockProvider, ElsaOptions elsaOptions, ILogger<QuartzWorkflowInstanceScheduler> logger)
{
_schedulerProvider = schedulerProvider;
_distributedLockProvider = distributedLockProvider;
_elsaOptions = elsaOptions;
_logger = logger;
}

Expand Down Expand Up @@ -68,8 +73,12 @@ public async Task UnscheduleAllAsync(CancellationToken cancellationToken)

private async Task ScheduleJob(ITrigger trigger, CancellationToken cancellationToken)
{
var scheduler = await _schedulerProvider. GetSchedulerAsync(cancellationToken);
await _semaphore.WaitAsync(cancellationToken);
var scheduler = await _schedulerProvider.GetSchedulerAsync(cancellationToken);
var sharedResource = $"{nameof(QuartzWorkflowInstanceScheduler)}:{trigger.Key}";
await using var handle = await _distributedLockProvider.AcquireLockAsync(sharedResource, _elsaOptions.DistributedLockTimeout, cancellationToken);

if (handle == null)
return;

try
{
Expand All @@ -84,10 +93,6 @@ private async Task ScheduleJob(ITrigger trigger, CancellationToken cancellationT
{
_logger.LogWarning(e, "Failed to schedule trigger {TriggerKey}", trigger.Key.ToString());
}
finally
{
_semaphore.Release();
}
}

private TriggerBuilder CreateTrigger(string workflowInstanceId, string activityId) =>
Expand Down
10 changes: 10 additions & 0 deletions src/core/Elsa.Core/HostedServices/IScopedBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.HostedServices
{
public interface IScopedBackgroundService
{
Task ExecuteAsync(CancellationToken cancellationToken);
}
}
24 changes: 24 additions & 0 deletions src/core/Elsa.Core/HostedServices/ScopedBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Elsa.HostedServices
{
/// <summary>
/// Executed the specified worker within a scoped-lifetime scope.
/// </summary>
public class ScopedBackgroundService<TWorker> : BackgroundService where TWorker:IScopedBackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;

public ScopedBackgroundService(IServiceScopeFactory scopeFactory) => _scopeFactory = scopeFactory;

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var scope = _scopeFactory.CreateScope();
var worker = (IScopedBackgroundService)ActivatorUtilities.GetServiceOrCreateInstance<TWorker>(scope.ServiceProvider);
await worker.ExecuteAsync(stoppingToken);
}
}
}

0 comments on commit 8e97f7a

Please sign in to comment.