Skip to content

Commit

Permalink
WIP: Sets up backgroundtask infra #79
Browse files Browse the repository at this point in the history
  • Loading branch information
Zack Schwartz committed Apr 29, 2023
1 parent f2a55ac commit 6d5cf14
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Raytha.Application.Common.Interfaces;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using MediatR;
using Raytha.Application.Common.Models;
using CSharpVitamins;
using Raytha.Application.Common.Interfaces;

namespace Raytha.Application.ContentItems.Commands;

public class BeginExportContentItemsToCsv
{
public record Command : LoggableRequest<CommandResponseDto<ShortGuid>>
{

}

public class Handler : IRequestHandler<Command, CommandResponseDto<ShortGuid>>
{
private readonly IRaythaDbContext _db;
private readonly IBackgroundTaskQueue _taskQueue;
public Handler(IRaythaDbContext db, IBackgroundTaskQueue taskQueue)
{
_db = db;
_taskQueue = taskQueue;
}
public async Task<CommandResponseDto<ShortGuid>> Handle(Command request, CancellationToken cancellationToken)
{
var backgroundJobId = Guid.NewGuid();

await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItem);

await _db.SaveChangesAsync(cancellationToken);

return new CommandResponseDto<ShortGuid>(backgroundJobId);
}

private async ValueTask BuildWorkItem(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item

int delayLoop = 0;
var guid = Guid.NewGuid().ToString();

while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
var contentItem = _db.ContentItems.First();
Thread.Sleep(10000);
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}

delayLoop++;
}
}
}
}
41 changes: 41 additions & 0 deletions src/Raytha.Infrastructure/BackgroundTasks/BackgroundTaskQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Raytha.Application.Common.Interfaces;
using System.Threading.Channels;

namespace Raytha.Infrastructure.BackgroundTasks;
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

public BackgroundTaskQueue()
{
// Capacity should be set based on the expected application load and
// number of concurrent threads accessing the queue.
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
// which completes only when space became available. This leads to backpressure,
// in case too many publishers/calls start accumulating.
var options = new BoundedChannelOptions(20)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}

public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}

await _queue.Writer.WriteAsync(workItem);
}

public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}
}
55 changes: 55 additions & 0 deletions src/Raytha.Infrastructure/BackgroundTasks/QueuedHostedService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Raytha.Application.Common.Interfaces;

namespace Raytha.Infrastructure.BackgroundTasks;

public class QueuedHostedService : BackgroundService
{
private readonly ILogger<QueuedHostedService> _logger;

public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger)
{
TaskQueue = taskQueue;
_logger = logger;
}

public IBackgroundTaskQueue TaskQueue { get; }

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"Queued Hosted Service is running.{Environment.NewLine}" +
$"{Environment.NewLine}Tap W to add a work item to the " +
$"background queue.{Environment.NewLine}");

await BackgroundProcessing(stoppingToken);
}

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem =
await TaskQueue.DequeueAsync(stoppingToken);

try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}

public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service is stopping.");

await base.StopAsync(stoppingToken);
}
}
4 changes: 4 additions & 0 deletions src/Raytha.Infrastructure/ConfigureServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Raytha.Infrastructure.JsonQueryEngine;
using Raytha.Infrastructure.FileStorage;
using Raytha.Application.Common.Utils;
using Raytha.Infrastructure.BackgroundTasks;

namespace Microsoft.Extensions.DependencyInjection;

Expand Down Expand Up @@ -51,6 +52,9 @@ public static IServiceCollection AddInfrastructureServices(this IServiceCollecti
throw new NotImplementedException($"Unsupported file storage provider: {fileStorageProvider}");
}

services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();

return services;
}
}
8 changes: 8 additions & 0 deletions src/Raytha.Web/Areas/Admin/Controllers/DashboardController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.AspNetCore.Mvc.Filters;
using Microsoft.Extensions.Configuration;
using Raytha.Application.Common.Security;
using Raytha.Application.ContentItems.Commands;
using Raytha.Application.Dashboard.Queries;
using Raytha.Domain.Entities;
using Raytha.Web.Areas.Admin.Views.Dashboard;
Expand Down Expand Up @@ -37,6 +38,13 @@ public async Task<IActionResult> Index()
return View(viewModel);
}

[Route($"{RAYTHA_ROUTE_PREFIX}/testbackgroundjob", Name = "testbackgroundjob")]
public async Task<IActionResult> TestBackgroundJob()
{
var response = await Mediator.Send(new BeginExportContentItemsToCsv.Command());
return Content("Ok");
}

public override void OnActionExecuted(ActionExecutedContext context)
{
base.OnActionExecuted(context);
Expand Down

0 comments on commit 6d5cf14

Please sign in to comment.