Skip to content

Commit

Permalink
Hangfire job names (#4428)
Browse files Browse the repository at this point in the history
* Update hangfire to 1.8.5

* Set job names for hangfire

* Fix potential null reference in QueryRecurringJobs
  • Loading branch information
johnwc committed Nov 9, 2023
1 parent 5948f3f commit 173ae2a
Show file tree
Hide file tree
Showing 17 changed files with 129 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Elsa.Activities.Temporal.Common.Bookmarks;
using Elsa.Activities.Temporal.Common.Messages;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.Models;
using Elsa.Services;
using Microsoft.Extensions.Logging;
using Rebus.Handlers;
Expand All @@ -13,12 +14,14 @@ public class ScheduleTriggerConsumer : IHandleMessages<ScheduleTemporalTrigger>
{
private readonly IBookmarkSerializer _bookmarkSerializer;
private readonly IWorkflowDefinitionScheduler _workflowDefinitionScheduler;
private readonly IWorkflowRegistry _workflowRegistry;
private readonly ILogger _logger;

public ScheduleTriggerConsumer(IBookmarkSerializer bookmarkSerializer, IWorkflowDefinitionScheduler workflowDefinitionScheduler, ILogger<ScheduleTriggerConsumer> logger)
public ScheduleTriggerConsumer(IBookmarkSerializer bookmarkSerializer, IWorkflowDefinitionScheduler workflowDefinitionScheduler, IWorkflowRegistry workflowRegistry, ILogger<ScheduleTriggerConsumer> logger)
{
_bookmarkSerializer = bookmarkSerializer;
_workflowDefinitionScheduler = workflowDefinitionScheduler;
_workflowRegistry = workflowRegistry;
_logger = logger;
}

Expand All @@ -29,18 +32,21 @@ public async Task Handle(ScheduleTemporalTrigger message)
var bookmarkType = Type.GetType(bookmarkTypeName)!;
var model = _bookmarkSerializer.Deserialize(trigger.Model, bookmarkType);

var workflowBlueprint = await _workflowRegistry.FindAsync(trigger.WorkflowDefinitionId, VersionOptions.Published, trigger.TenantId);
var jobName = workflowBlueprint?.GetJobName(trigger.ActivityId) ?? trigger.WorkflowDefinitionId;

_logger.LogDebug("Scheduling trigger {@Trigger}", model);

switch (model)
{
case TimerBookmark timerBookmark:
await _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId, trigger.ActivityId, timerBookmark.ExecuteAt, timerBookmark.Interval);
await _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId, trigger.ActivityId, timerBookmark.ExecuteAt, timerBookmark.Interval);
break;
case StartAtBookmark startAtBookmark:
await _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId, trigger.ActivityId, startAtBookmark.ExecuteAt, null);
await _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId, trigger.ActivityId, startAtBookmark.ExecuteAt, null);
break;
case CronBookmark cronBookmark:
await _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId!, trigger.ActivityId, cronBookmark.CronExpression);
await _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId!, trigger.ActivityId, cronBookmark.CronExpression);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System;
using System.Threading;
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Common.Bookmarks;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.Events;
using Elsa.Models;
using Elsa.Services;
using MediatR;
using Microsoft.Extensions.Logging;
Expand All @@ -18,17 +19,20 @@ public class ScheduleTimers : INotificationHandler<TriggerIndexingFinished>, INo
private readonly IWorkflowDefinitionScheduler _workflowDefinitionScheduler;
private readonly IWorkflowInstanceScheduler _workflowInstanceScheduler;
private readonly IBookmarkSerializer _bookmarkSerializer;
private readonly IWorkflowRegistry _workflowRegistry;
private readonly ILogger<ScheduleTimers> _logger;

public ScheduleTimers(
IWorkflowDefinitionScheduler workflowDefinitionScheduler,
IWorkflowInstanceScheduler workflowInstanceScheduler,
IBookmarkSerializer bookmarkSerializer,
IBookmarkSerializer bookmarkSerializer,
IWorkflowRegistry workflowRegistry,
ILogger<ScheduleTimers> logger)
{
_workflowDefinitionScheduler = workflowDefinitionScheduler;
_workflowInstanceScheduler = workflowInstanceScheduler;
_bookmarkSerializer = bookmarkSerializer;
_workflowRegistry = workflowRegistry;
_logger = logger;
}

Expand All @@ -40,22 +44,30 @@ public async Task Handle(TriggerIndexingFinished notification, CancellationToken

await _workflowDefinitionScheduler.UnscheduleAsync(notification.WorkflowDefinitionId, cancellationToken);

var workflowBlueprint = await _workflowRegistry.FindAsync(notification.WorkflowDefinitionId, VersionOptions.Published, null, cancellationToken);

foreach (var trigger in startAtTriggers)
{
var jobName = workflowBlueprint?.GetJobName(trigger.ActivityId) ?? trigger.WorkflowDefinitionId;

var bookmark = _bookmarkSerializer.Deserialize<StartAtBookmark>(trigger.Model);
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.ExecuteAt, null, cancellationToken));
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.ExecuteAt, null, cancellationToken));
}

foreach (var trigger in timerTriggers)
{
var jobName = workflowBlueprint?.GetJobName(trigger.ActivityId) ?? trigger.WorkflowDefinitionId;

var bookmark = _bookmarkSerializer.Deserialize<TimerBookmark>(trigger.Model);
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.ExecuteAt, bookmark.Interval, cancellationToken));
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.ExecuteAt, bookmark.Interval, cancellationToken));
}

foreach (var trigger in cronTriggers)
{
var jobName = workflowBlueprint?.GetJobName(trigger.ActivityId) ?? trigger.WorkflowDefinitionId;

var bookmark = _bookmarkSerializer.Deserialize<CronBookmark>(trigger.Model);
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.CronExpression, cancellationToken));
await Try(() => _workflowDefinitionScheduler.ScheduleAsync(jobName, trigger.WorkflowDefinitionId, trigger.ActivityId, bookmark.CronExpression, cancellationToken));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace Elsa.Activities.Temporal.Common.Services
{
public interface IWorkflowDefinitionScheduler
{
Task ScheduleAsync(string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default);
Task ScheduleAsync(string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default);
Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default);
Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default);
Task UnscheduleAsync(string workflowDefinitionId, string activityId, CancellationToken cancellationToken = default);
Task UnscheduleAsync(string workflowDefinitionId, CancellationToken cancellationToken = default);
Task UnscheduleAllAsync(CancellationToken cancellationToken = default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.4" />
<PackageReference Include="Hangfire.AspNetCore" Version="1.8.5" />
<PackageReference Include="ncrontab" Version="3.3.1" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Hangfire.Models;
using Elsa.Services;
using Hangfire;

namespace Elsa.Activities.Temporal.Hangfire.Jobs
{
public class RunHangfireWorkflowDefinitionJob
{
private readonly IWorkflowDefinitionDispatcher _workflowDefinitionDispatcher;
public RunHangfireWorkflowDefinitionJob(IWorkflowDefinitionDispatcher workflowDefinitionDispatcher) => _workflowDefinitionDispatcher = workflowDefinitionDispatcher;
public async Task ExecuteAsync(RunHangfireWorkflowDefinitionJobModel data) => await _workflowDefinitionDispatcher.DispatchAsync(new ExecuteWorkflowDefinitionRequest(data.WorkflowDefinitionId, data.ActivityId, IgnoreAlreadyRunningAndSingleton: true));
public RunHangfireWorkflowDefinitionJob(IWorkflowDefinitionDispatcher workflowDefinitionDispatcher)
=> _workflowDefinitionDispatcher = workflowDefinitionDispatcher;

[JobDisplayName("{0}")]
public async Task ExecuteAsync(string jobName, RunHangfireWorkflowDefinitionJobModel data)
=> await _workflowDefinitionDispatcher.DispatchAsync(new ExecuteWorkflowDefinitionRequest(data.WorkflowDefinitionId, data.ActivityId, IgnoreAlreadyRunningAndSingleton: true));
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -6,9 +7,12 @@
using Elsa.Activities.Temporal.Hangfire.Extensions;
using Elsa.Activities.Temporal.Hangfire.Jobs;
using Elsa.Activities.Temporal.Hangfire.Models;
using Elsa.Models;
using Elsa.Services;
using Hangfire;
using Hangfire.Common;
using Hangfire.Storage;
using Microsoft.Extensions.DependencyInjection;
using NodaTime;

namespace Elsa.Activities.Temporal.Hangfire.Services
Expand All @@ -17,6 +21,7 @@ public class HangfireWorkflowDefinitionScheduler : IWorkflowDefinitionScheduler
{
private readonly IBackgroundJobClient _backgroundJobClient;
private readonly IRecurringJobManager _recurringJobManager;
private readonly IServiceProvider _serviceProvider;
private readonly JobStorage _jobStorage;

public HangfireWorkflowDefinitionScheduler(IBackgroundJobClient backgroundJobClient, IRecurringJobManager recurringJobManager, JobStorage jobStorage)
Expand All @@ -26,23 +31,23 @@ public HangfireWorkflowDefinitionScheduler(IBackgroundJobClient backgroundJobCli
_jobStorage = jobStorage;
}

public Task ScheduleAsync(string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default)
public Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default)
{
var cronExpression = interval?.ToCronExpression();
var data = CreateData(workflowDefinitionId, activityId, cronExpression);

ScheduleJob(data, startAt);
ScheduleJob(jobName, data, startAt);

if (cronExpression != null)
ScheduleRecurringJob(data, cronExpression);
ScheduleRecurringJob(jobName, data, cronExpression);

return Task.CompletedTask;
}

public Task ScheduleAsync(string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default)
public Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default)
{
var data = CreateData(workflowDefinitionId, activityId, cronExpression);
ScheduleRecurringJob(data, cronExpression);
ScheduleRecurringJob(jobName, data, cronExpression);
return Task.CompletedTask;
}

Expand All @@ -68,12 +73,15 @@ public Task UnscheduleAllAsync(CancellationToken cancellationToken = default)
return Task.CompletedTask;
}

private void ScheduleJob(RunHangfireWorkflowDefinitionJobModel data, Instant instant) => _backgroundJobClient.Schedule<RunHangfireWorkflowDefinitionJob>(job => job.ExecuteAsync(data), instant.ToDateTimeOffset());
private void ScheduleJob(string jobName, RunHangfireWorkflowDefinitionJobModel data, Instant instant)
{
_backgroundJobClient.Schedule<RunHangfireWorkflowDefinitionJob>(job => job.ExecuteAsync(jobName, data), instant.ToDateTimeOffset());
}

private void ScheduleRecurringJob(RunHangfireWorkflowDefinitionJobModel data, string cronExpression)
private void ScheduleRecurringJob(string jobName, RunHangfireWorkflowDefinitionJobModel data, string cronExpression)
{
var identity = data.GetIdentity();
_recurringJobManager.AddOrUpdate<RunHangfireWorkflowDefinitionJob>(identity, job => job.ExecuteAsync(data), cronExpression);
_recurringJobManager.AddOrUpdate<RunHangfireWorkflowDefinitionJob>(identity, job => job.ExecuteAsync(jobName, data), cronExpression);
}

private void DeleteRecurringJob(RunHangfireWorkflowDefinitionJobModel data)
Expand Down Expand Up @@ -149,7 +157,7 @@ private void DeleteAllScheduledJobs()
DeleteScheduledJobs(jobIds);
}

private static RunHangfireWorkflowDefinitionJobModel GetJobModel(Job job) => (RunHangfireWorkflowDefinitionJobModel)job.Args[0];
private static RunHangfireWorkflowDefinitionJobModel GetJobModel(Job job) => (RunHangfireWorkflowDefinitionJobModel)job.Args[1];

private static RunHangfireWorkflowDefinitionJobModel CreateData(string workflowDefinitionId, string activityId, string? cronExpression = default) => new(workflowDefinitionId, activityId, cronExpression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ private void DeleteAllRecurringJobs()
foreach (var job in recurringJobs)
_recurringJobManager.RemoveIfExists(job.Id);
}

private IEnumerable<RecurringJobDto> QueryRecurringJobs()
{
using var connection = _jobStorage.GetConnection();
return connection.GetRecurringJobs().Where(x => x.Job.Type == typeof(RunHangfireWorkflowInstanceJob));
return connection.GetRecurringJobs().Where(x => x.Job?.Type == typeof(RunHangfireWorkflowInstanceJob));
}

private void DeleteScheduledJob(RunHangfireWorkflowInstanceJobModel data)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using Elsa.Activities.Temporal.Common.Services;
using Elsa.Activities.Temporal.Quartz.Jobs;
Expand Down Expand Up @@ -28,7 +28,7 @@ public QuartzWorkflowDefinitionScheduler(QuartzSchedulerProvider schedulerProvid
_logger = logger;
}

public async Task ScheduleAsync(string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default)
public async Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, Instant startAt, Duration? interval, CancellationToken cancellationToken = default)
{
var triggerBuilder = CreateTrigger(workflowDefinitionId, activityId).StartAt(startAt.ToDateTimeOffset());

Expand All @@ -39,7 +39,7 @@ public async Task ScheduleAsync(string workflowDefinitionId, string activityId,
await ScheduleJob(trigger, cancellationToken);
}

public async Task ScheduleAsync(string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default)
public async Task ScheduleAsync(string jobName, string workflowDefinitionId, string activityId, string cronExpression, CancellationToken cancellationToken = default)
{
var trigger = CreateTrigger(workflowDefinitionId, activityId).WithCronSchedule(cronExpression).Build();
await ScheduleJob(trigger, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Collections.Generic;
using System.Collections.Generic;
using System.Linq;
using Elsa.Models;
using Elsa.Services.Models;
Expand All @@ -9,6 +9,19 @@ public static class CompositeActivityBlueprintExtensions
{
public static IEnumerable<IActivityBlueprint> GetEndActivities(this ICompositeActivityBlueprint workflowBlueprint) => workflowBlueprint.Activities.Where(x => !workflowBlueprint.GetOutboundConnections(x.Id).Any());
public static IActivityBlueprint? GetActivity(this ICompositeActivityBlueprint workflowBlueprint, string id) => workflowBlueprint.Activities.FirstOrDefault(x => x.Id == id);
public static string? GetJobName(this ICompositeActivityBlueprint workflowBlueprint, string? activityId = null)
{
if (string.IsNullOrWhiteSpace(activityId) == false)
{
var activity = workflowBlueprint?.GetActivity(activityId);
if (activity is not null)
return $"{workflowBlueprint?.DisplayName ?? workflowBlueprint?.Name ?? workflowBlueprint?.Id}->{activity?.DisplayName ?? activity?.Name ?? activity?.Id}";
}
if (workflowBlueprint is not null)
return $"{workflowBlueprint?.DisplayName ?? workflowBlueprint?.Name ?? workflowBlueprint?.Id}";
return null;
}

public static IEnumerable<IActivityBlueprint> GetActivities(this ICompositeActivityBlueprint workflowBlueprint, IEnumerable<string> ids) => workflowBlueprint.Activities.Where(x => ids.Contains(x.Id));

public static IEnumerable<IActivityBlueprint> GetBlockingActivities(this ICompositeActivityBlueprint workflowBlueprint, WorkflowInstance workflowInstance) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@

<ItemGroup>
<PackageReference Include="Hangfire.InMemory" Version="0.5.1" />
<PackageReference Include="Hangfire.SqlServer" Version="1.8.4" />
<PackageReference Include="Hangfire.SqlServer" Version="1.8.5" />
<PackageReference Include="Quartz.Serialization.Json" Version="3.5.0" />
<PackageReference Include="Hangfire.SQLite" Version="1.4.2" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Hangfire.SqlServer" Version="1.8.4" />
<PackageReference Include="Hangfire.SqlServer" Version="1.8.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="System.Text.Encodings.Web" Version="7.0.0" />
</ItemGroup>
Expand Down

0 comments on commit 173ae2a

Please sign in to comment.