Skip to content

Commit

Permalink
feat: Implement the IExecutionHistoryStore
Browse files Browse the repository at this point in the history
  • Loading branch information
JadynWong committed Jun 15, 2023
1 parent c6eb225 commit 45667f5
Show file tree
Hide file tree
Showing 17 changed files with 2,737 additions and 3 deletions.
Expand Up @@ -13,6 +13,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="SilkierQuartz.Plugins.RecentHistory" Version="5.0.356" />
<PackageReference Include="Volo.Abp.Emailing" Version="7.2.2" />
<PackageReference Include="Volo.Abp.Identity.Domain" Version="7.2.2" />
<PackageReference Include="Volo.Abp.PermissionManagement.Domain.Identity" Version="7.2.2" />
Expand Down
109 changes: 109 additions & 0 deletions src/Abp.SilkierQuartzDemo.Domain/Quartz/AbpExecutionHistoryPlugin.cs
@@ -0,0 +1,109 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Impl.Matchers;
using Quartz.Plugins.RecentHistory;
using Quartz.Spi;
using Volo.Abp;

namespace Abp.SilkierQuartzDemo.Quartz;

public class AbpExecutionHistoryPlugin : ISchedulerPlugin, IJobListener
{
private IScheduler _scheduler = null!;
private IExecutionHistoryStore _store = null!;

public Type StoreType { get; set; } = null!;

public string Name { get; protected set; } = string.Empty;

public Task Initialize(string pluginName, IScheduler scheduler, CancellationToken cancellationToken = default)
{
Name = pluginName;
_scheduler = scheduler;
_scheduler.ListenerManager.AddJobListener(this, EverythingMatcher<JobKey>.AllJobs());

return Task.FromResult(0);
}

public async Task Start(CancellationToken cancellationToken = default)
{
_store = _scheduler.Context.GetExecutionHistoryStore();

if (_store == null)
{
throw new AbpException(nameof(StoreType) + " is not set.");
}

_store.SchedulerName = _scheduler.SchedulerName;

if (_store is AbpExecutionHistoryStore abpStore)
await abpStore.InitializeSummaryAsync();

await _store.Purge();
}

public Task Shutdown(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}

public async Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
var entry = new ExecutionHistoryEntry()
{
FireInstanceId = context.FireInstanceId,
SchedulerInstanceId = context.Scheduler.SchedulerInstanceId,
SchedulerName = context.Scheduler.SchedulerName,
ActualFireTimeUtc = context.FireTimeUtc.UtcDateTime,
ScheduledFireTimeUtc = context.ScheduledFireTimeUtc?.UtcDateTime,
Recovering = context.Recovering,
Job = context.JobDetail.Key.ToString(),
Trigger = context.Trigger.Key.ToString(),
};
await _store.Save(entry);
}

public async Task JobWasExecuted(IJobExecutionContext context, JobExecutionException? jobException, CancellationToken cancellationToken = default)
{
var entry = await _store.Get(context.FireInstanceId);
if (entry != null)
{
entry.FinishedTimeUtc = DateTime.UtcNow;
entry.ExceptionMessage = jobException?.GetBaseException()?.ToString();
}
else
{
entry = new ExecutionHistoryEntry()
{
FireInstanceId = context.FireInstanceId,
SchedulerInstanceId = context.Scheduler.SchedulerInstanceId,
SchedulerName = context.Scheduler.SchedulerName,
ActualFireTimeUtc = context.FireTimeUtc.UtcDateTime,
ScheduledFireTimeUtc = context.ScheduledFireTimeUtc?.UtcDateTime,
Recovering = context.Recovering,
Job = context.JobDetail.Key.ToString(),
Trigger = context.Trigger.Key.ToString(),
FinishedTimeUtc = DateTime.UtcNow,
ExceptionMessage = jobException?.GetBaseException()?.ToString()
};
}
await _store.Save(entry);

if (jobException == null)
await _store.IncrementTotalJobsExecuted();
else
await _store.IncrementTotalJobsFailed();
}

public async Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default)
{
var entry = await _store.Get(context.FireInstanceId);
if (entry != null)
{
entry.Vetoed = true;
await _store.Save(entry);
}
}
}
153 changes: 153 additions & 0 deletions src/Abp.SilkierQuartzDemo.Domain/Quartz/AbpExecutionHistoryStore.cs
@@ -0,0 +1,153 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Quartz.Plugins.RecentHistory;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Guids;
using Volo.Abp.Uow;

namespace Abp.SilkierQuartzDemo.Quartz;

public class AbpExecutionHistoryStore : IExecutionHistoryStore, ISingletonDependency
{
public string SchedulerName { get; set; } = null!;

protected IServiceScopeFactory ServiceScopeFactory = null!;

public ILogger<AbpExecutionHistoryStore> Logger { get; set; }

public AbpExecutionHistoryStore()
{
Logger = NullLogger<AbpExecutionHistoryStore>.Instance;
}

public AbpExecutionHistoryStore(
IServiceScopeFactory serviceScopeFactory) : this()
{
ServiceScopeFactory = serviceScopeFactory;
}

public async Task<ExecutionHistoryEntry?> Get(string fireInstanceId)
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

var quartzJobHistory = await repository.FindByFireInstanceIdAsync(fireInstanceId);
if (quartzJobHistory == null)
{
return null;
}
return quartzJobHistory.ToEntry();
}

public async Task Purge()
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

await repository.PurgeAsync();
}

public async Task Save(ExecutionHistoryEntry entry)
{
using var scope = ServiceScopeFactory.CreateScope();
var guidGenerator = scope.ServiceProvider.GetRequiredService<IGuidGenerator>();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

var quartzJobHistory = await repository.FindByFireInstanceIdAsync(entry.FireInstanceId);
if (quartzJobHistory == null)
{
quartzJobHistory = entry.ToEntity(new QuartzExecutionHistory(guidGenerator.Create(), entry.FireInstanceId));
await repository.InsertAsync(quartzJobHistory);
}
else
{
quartzJobHistory = entry.ToEntity(quartzJobHistory);
await repository.UpdateAsync(quartzJobHistory);
}
}

public async Task<IEnumerable<ExecutionHistoryEntry>> FilterLastOfEveryJob(int limitPerJob)
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

var quartzJobHistories = await repository.GetLastOfEveryJobAsync(SchedulerName, limitPerJob);

return quartzJobHistories.Select(x => x.ToEntry());
}

public async Task<IEnumerable<ExecutionHistoryEntry>> FilterLastOfEveryTrigger(int limitPerTrigger)
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

var quartzJobHistories = await repository.GetLastOfEveryTriggerAsync(SchedulerName, limitPerTrigger);

return quartzJobHistories.Select(x => x.ToEntry());
}

public async Task<IEnumerable<ExecutionHistoryEntry>> FilterLast(int limit)
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzExecutionHistoryRepository>();

var quartzJobHistories = await repository.GetLastAsync(SchedulerName, limit);

return quartzJobHistories.Select(x => x.ToEntry());
}

public async Task<int> GetTotalJobsExecuted()
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzJobSummaryRepository>();

return await repository.GetTotalJobsExecutedAsync(SchedulerName);
}

public async Task<int> GetTotalJobsFailed()
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzJobSummaryRepository>();

return await repository.GetTotalJobsFailedAsync(SchedulerName);
}

public async Task IncrementTotalJobsExecuted()
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzJobSummaryRepository>();

await repository.IncrementTotalJobsExecutedAsync(SchedulerName);
}

public async Task IncrementTotalJobsFailed()
{
using var scope = ServiceScopeFactory.CreateScope();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzJobSummaryRepository>();

await repository.IncrementTotalJobsFailedAsync(SchedulerName);
}

public virtual async Task InitializeSummaryAsync()
{
using var scope = ServiceScopeFactory.CreateScope();
var unitOfWorkManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();
var repository = scope.ServiceProvider.GetRequiredService<IQuartzJobSummaryRepository>();
var guidGenerator = scope.ServiceProvider.GetRequiredService<IGuidGenerator>();

using var uow = unitOfWorkManager.Begin(true);

var quartzJobSummary = await repository.FindBySchedulerNameAsync(SchedulerName);
if (quartzJobSummary == null)
{
quartzJobSummary = new QuartzJobSummary(guidGenerator.Create(), SchedulerName);
await repository.InsertAsync(quartzJobSummary);
}

await uow.CompleteAsync();
}
}
@@ -0,0 +1,28 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Volo.Abp.BackgroundWorkers;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Threading;

namespace Abp.SilkierQuartzDemo.Quartz;

public class ExecutionHistoryCleanBackgroundWorker : AsyncPeriodicBackgroundWorkerBase, ITransientDependency
{
public ExecutionHistoryCleanBackgroundWorker(AbpAsyncTimer timer, IServiceScopeFactory serviceScopeFactory)
: base(timer, serviceScopeFactory)
{
Timer.Period = 1000 * 60; //1 minutes
}

protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext workerContext)
{
Logger.LogDebug("ExecutionHistoryCleanBackgroundWorker executing");

await LazyServiceProvider
.LazyGetRequiredService<AbpExecutionHistoryStore>()
.Purge();

Logger.LogDebug("ExecutionHistoryCleanBackgroundWorker executed");
}
}
@@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using System.Threading;
using Volo.Abp.Domain.Repositories;
using System.Collections.Generic;

namespace Abp.SilkierQuartzDemo.Quartz;

public interface IQuartzExecutionHistoryRepository : IBasicRepository<QuartzExecutionHistory, Guid>
{
Task<QuartzExecutionHistory?> FindByFireInstanceIdAsync(string fireInstanceId, CancellationToken cancellationToken = default);

Task<List<QuartzExecutionHistory>> GetLastOfEveryJobAsync(string schedulerName, int limitPerJob, CancellationToken cancellationToken = default);

Task<List<QuartzExecutionHistory>> GetLastOfEveryTriggerAsync(
string schedulerName,
int limitPerTrigger,
int skipPerTrigger = 0,
CancellationToken cancellationToken = default);

Task<List<QuartzExecutionHistory>> GetLastAsync(string schedulerName, int limit, CancellationToken cancellationToken = default);

Task PurgeAsync(CancellationToken cancellationToken = default);
}
@@ -0,0 +1,19 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Domain.Repositories;

namespace Abp.SilkierQuartzDemo.Quartz;

public interface IQuartzJobSummaryRepository : IBasicRepository<QuartzJobSummary, Guid>
{
Task<QuartzJobSummary?> FindBySchedulerNameAsync(string schedulerName, CancellationToken cancellationToken = default);

Task<int> GetTotalJobsExecutedAsync(string schedulerName, CancellationToken cancellationToken = default);

Task<int> GetTotalJobsFailedAsync(string schedulerName, CancellationToken cancellationToken = default);

Task IncrementTotalJobsExecutedAsync(string schedulerName, CancellationToken cancellationToken = default);

Task IncrementTotalJobsFailedAsync(string schedulerName, CancellationToken cancellationToken = default);
}
39 changes: 39 additions & 0 deletions src/Abp.SilkierQuartzDemo.Domain/Quartz/QuartzExecutionHistory.cs
@@ -0,0 +1,39 @@
using System;
using Volo.Abp.Domain.Entities;

namespace Abp.SilkierQuartzDemo.Quartz;

public class QuartzExecutionHistory : BasicAggregateRoot<Guid>
{
public string FireInstanceId { get; protected set; } = null!;

public string SchedulerInstanceId { get; set; } = null!;

public string SchedulerName { get; set; } = null!;

public string? Job { get; set; }

public string? Trigger { get; set; }

public DateTime? ScheduledFireTimeUtc { get; set; }

public DateTime ActualFireTimeUtc { get; set; }

public bool Recovering { get; set; }

public bool Vetoed { get; set; }

public DateTime? FinishedTimeUtc { get; set; }

public string? ExceptionMessage { get; set; }

protected QuartzExecutionHistory()
{

}

public QuartzExecutionHistory(Guid id, string fireInstanceId) : base(id)
{
FireInstanceId = fireInstanceId;
}
}

0 comments on commit 45667f5

Please sign in to comment.