-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added background worker hosted service
- Loading branch information
1 parent
8e6f3e2
commit c280adc
Showing
30 changed files
with
1,461 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
using System; | ||
using System.Collections.Concurrent; | ||
|
||
namespace FlightRecorder.BusinessLogic.Logic | ||
{ | ||
public class BackgroundQueue<T> : IBackgroundQueue<T> where T : class | ||
{ | ||
private readonly ConcurrentQueue<T> _queue = new(); | ||
|
||
/// <summary> | ||
/// Add a new item to the concurrent queue | ||
/// </summary> | ||
/// <param name="item"></param> | ||
/// <exception cref="ArgumentNullException"></exception> | ||
public void Enqueue(T item) | ||
{ | ||
if (item != null) | ||
{ | ||
_queue.Enqueue(item); | ||
} | ||
else | ||
{ | ||
throw new ArgumentNullException(nameof(item)); | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// De-queue an item | ||
/// </summary> | ||
/// <returns></returns> | ||
public T Dequeue() | ||
{ | ||
var successful = _queue.TryDequeue(out T item); | ||
return successful ? item : null; | ||
} | ||
} | ||
} |
84 changes: 84 additions & 0 deletions
84
src/FlightRecorder.BusinessLogic/Logic/BackgroundQueueProcessor.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
using FlightRecorder.BusinessLogic.Factory; | ||
using FlightRecorder.Entities.DataExchange; | ||
using Microsoft.Extensions.DependencyInjection; | ||
using Microsoft.Extensions.Hosting; | ||
using System; | ||
using System.Diagnostics.CodeAnalysis; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
namespace FlightRecorder.BusinessLogic.Logic | ||
{ | ||
[ExcludeFromCodeCoverage] | ||
public abstract class BackgroundQueueProcessor<T> : BackgroundService where T : BackgroundWorkItem | ||
{ | ||
private const int ProcessingLoopDelay = 500; | ||
|
||
private readonly IBackgroundQueue<T> _queue; | ||
|
||
protected IServiceScopeFactory ServiceScopeFactory { get; private set; } | ||
|
||
protected BackgroundQueueProcessor(IBackgroundQueue<T> queue, IServiceScopeFactory serviceScopeFactory) | ||
{ | ||
_queue = queue; | ||
ServiceScopeFactory = serviceScopeFactory; | ||
} | ||
|
||
/// <summary> | ||
/// Method called to process the work placed into the queue | ||
/// </summary> | ||
/// <param name="token"></param> | ||
/// <returns></returns> | ||
protected override async Task ExecuteAsync(CancellationToken token) | ||
Check warning on line 32 in src/FlightRecorder.BusinessLogic/Logic/BackgroundQueueProcessor.cs
|
||
{ | ||
while (token.IsCancellationRequested) | ||
{ | ||
// Wait, so there's not a busy loop, then get the next work item from the queue | ||
await Task.Delay(ProcessingLoopDelay, token); | ||
var item = _queue.Dequeue(); | ||
|
||
// Item may be null if there's nothing in the queue or there's a de-queuing error, so | ||
// check it's valid | ||
if (item != null) | ||
{ | ||
// Create a dependency resolution scope | ||
using (var scope = ServiceScopeFactory.CreateScope()) | ||
{ | ||
// Get a scoped instance of the flight recorder factory | ||
var factory = scope.ServiceProvider.GetService<FlightRecorderFactory>(); | ||
|
||
// Create the job status record | ||
var status = await factory.JobStatuses.AddAsync(item.JobName, null); | ||
|
||
try | ||
{ | ||
// Process the work item and, if successful, complete the job status record with | ||
// no error | ||
await ProcessWorkItem(item, factory); | ||
await factory.JobStatuses.UpdateAsync(status.Id, null); | ||
} | ||
catch (Exception ex) | ||
{ | ||
// Got an error during processing, so complete the job status record with the | ||
// exception details | ||
await factory.JobStatuses.UpdateAsync(status.Id, ex.ToString()); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// <summary> | ||
/// Process the specified work item from the queue | ||
/// </summary> | ||
/// <param name="item"></param> | ||
#pragma warning disable CS1998 | ||
protected virtual async Task ProcessWorkItem(T item, FlightRecorderFactory factory) | ||
{ | ||
// Ideally, this would be an abstract method with no body but that's not possible with | ||
// async methods so it's declared with an empty body in the expectation that the child | ||
// classes will override it | ||
} | ||
#pragma warning restore CS1998 | ||
} | ||
} |
98 changes: 98 additions & 0 deletions
98
src/FlightRecorder.BusinessLogic/Logic/JobStatusManager.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
using FlightRecorder.BusinessLogic.Extensions; | ||
using FlightRecorder.Data; | ||
using FlightRecorder.Entities.Db; | ||
using FlightRecorder.Entities.Interfaces; | ||
using Microsoft.EntityFrameworkCore; | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Linq.Expressions; | ||
using System.Threading.Tasks; | ||
|
||
namespace FlightRecorder.BusinessLogic.Logic | ||
{ | ||
internal class JobStatusManager : IJobStatusManager | ||
{ | ||
private readonly FlightRecorderDbContext _context; | ||
|
||
internal JobStatusManager(FlightRecorderDbContext context) | ||
{ | ||
_context = context; | ||
} | ||
|
||
/// <summary> | ||
/// Get the first job status matching the specified criteria | ||
/// </summary> | ||
/// <param name="predicate"></param> | ||
/// <returns></returns> | ||
public async Task<JobStatus> GetAsync(Expression<Func<JobStatus, bool>> predicate) | ||
{ | ||
List<JobStatus> statuses = await ListAsync(predicate, 1, 1).ToListAsync(); | ||
return statuses.FirstOrDefault(); | ||
} | ||
|
||
/// <summary> | ||
/// Return all entities matching the specified criteria | ||
/// </summary> | ||
/// <param name="predicate"></param> | ||
/// <param name="pageNumber"></param> | ||
/// <param name="pageSize"></param> | ||
/// <returns></returns> | ||
public IAsyncEnumerable<JobStatus> ListAsync(Expression<Func<JobStatus, bool>> predicate, int pageNumber, int pageSize) | ||
{ | ||
IAsyncEnumerable<JobStatus> results; | ||
if (predicate == null) | ||
{ | ||
results = _context.JobStatuses | ||
.Skip((pageNumber - 1) * pageSize) | ||
.Take(pageSize) | ||
.AsAsyncEnumerable(); | ||
} | ||
else | ||
{ | ||
results = _context.JobStatuses.Where(predicate).AsAsyncEnumerable(); | ||
} | ||
|
||
return results; | ||
} | ||
|
||
/// <summary> | ||
/// Create a new job status | ||
/// </summary> | ||
/// <param name="name"></param> | ||
/// <returns></returns> | ||
public async Task<JobStatus> AddAsync(string name, string parameters) | ||
{ | ||
JobStatus status = new JobStatus | ||
{ | ||
Name = name.CleanString(), | ||
Parameters = parameters.CleanString(), | ||
Start = DateTime.Now | ||
}; | ||
|
||
await _context.JobStatuses.AddAsync(status); | ||
await _context.SaveChangesAsync(); | ||
|
||
return status; | ||
} | ||
|
||
/// <summary> | ||
/// Update a job status, setting the end timestamp and error result | ||
/// </summary> | ||
/// <param name="id"></param> | ||
/// <param name="error"></param> | ||
/// <returns></returns> | ||
public async Task<JobStatus> UpdateAsync(long id, string error) | ||
{ | ||
JobStatus status = await GetAsync(x => x.Id == id); | ||
if (status != null) | ||
{ | ||
status.End = DateTime.Now; | ||
status.Error = error.CleanString(); | ||
await _context.SaveChangesAsync(); | ||
} | ||
|
||
return status; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.