
Implement QueueBackgroundWorkItem #805

Closed
davidfowl opened this issue Nov 22, 2017 · 47 comments

@davidfowl
Member

Gives user code the ability to schedule a background task that's aware of shutdown semantics. Here's the original blog post about that feature in System.Web.

https://blogs.msdn.microsoft.com/webdev/2014/06/04/queuebackgroundworkitem-to-reliably-schedule-and-run-background-processes-in-asp-net/

@Tratcher wrote a prototype of it here dotnet/AspNetCore.Docs#3352 (comment).

A few initial thoughts on the version that should exist here:

  • It shouldn't be tied to anything Web specific, this is a generic host feature.
  • We should think about the fact that people will want to use services in these background operations. That might mean that we need to provide a way to activate a "Task" so it's not just a func, e.g. QueueBackgroundWorkItem(arg1, arg2), etc.
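To make "aware of shutdown semantics" concrete, here is a rough, host-agnostic sketch (not a proposed API). `BackgroundWorkTracker` and its `StopAsync(TimeSpan)` grace-period parameter are hypothetical names. Work items run on the thread pool, receive a token that is cancelled when shutdown starts, and shutdown waits up to a grace period for in-flight items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, host-agnostic tracker: nothing here depends on ASP.NET.
public sealed class BackgroundWorkTracker
{
    private readonly CancellationTokenSource _stopping = new CancellationTokenSource();
    private readonly ConcurrentDictionary<Task, byte> _pending = new ConcurrentDictionary<Task, byte>();

    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));
        if (_stopping.IsCancellationRequested)
            throw new InvalidOperationException("Shutdown has already started.");

        // Run on the thread pool; items queued this way run concurrently.
        Task task = Task.Run(() => workItem(_stopping.Token));
        _pending.TryAdd(task, 0);
        // Registered after TryAdd, so the removal always happens after the add,
        // even if the item has already finished.
        task.ContinueWith(t => _pending.TryRemove(t, out _), TaskScheduler.Default);
    }

    // Signals shutdown, then waits up to gracePeriod for in-flight items.
    // Returns true if every item finished (successfully or not) within the grace period.
    public async Task<bool> StopAsync(TimeSpan gracePeriod)
    {
        _stopping.Cancel();
        Task all = Task.WhenAll(_pending.Keys);
        Task first = await Task.WhenAny(all, Task.Delay(gracePeriod));
        return first == all;
    }
}
```

Items that ignore the token simply run past the grace period; the host stops waiting for them but cannot force them to stop.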

/cc @glennc

@muratg

muratg commented Jan 19, 2018

Tentatively putting this in 2.2.

@NickCraver
Member

Can we please consider this in 2.2? We're working on migrating now, and this is the primary way some of our polling engines for monitoring applications work with System.Web today. If this isn't landing, what is the recommended alternative? Is there one in-box?

@Tratcher
Member

Have you tried the prototype? dotnet/AspNetCore.Docs#3352 (comment)

@NickCraver
Member

@Tratcher I haven't yet, thanks! I'll see about moving one of our major apps onto ASP.NET Core and giving it a go as soon as time allows me to complete it, and post feedback here. It's used a lot, so it should give us some good data. I'll report back!

@muratg

muratg commented Jun 15, 2018

@DamianEdwards @glennc for their thoughts.

@vegardlarsen

vegardlarsen commented Jun 25, 2018

I have a suggestion for how to get services properly injected into the background tasks. My suggestion adds a bit of implementation complexity, but still allows the same easy usage as the current prototype. It is based directly on the work @Tratcher did, and motivated by the need to get proper dependencies into the background worker (as @davidfowl mentioned).

The core of my suggestion is to split the background task into two separate parts: the work order (the request, which is basically the parameters) and the worker (which gets dependency injected). The work order can be serializable, which means it can be put into an external queuing service (this does not apply to the Func<CancellationToken, Task> example).

using System.Threading;
using System.Threading.Tasks;

public interface IBackgroundWorkOrder { }

public interface IBackgroundWorkOrder<TWorkOrder, TWorker> : IBackgroundWorkOrder
    where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
    where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>
{
}

public interface IBackgroundWorker { }

public interface IBackgroundWorker<TWorkOrder, TWorker> : IBackgroundWorker
    where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
    where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>
{
    Task DoWork(TWorkOrder order, CancellationToken cancellationToken);
}

public interface IBackgroundTaskQueue
{
    void Queue<TWorkOrder, TWorker>(IBackgroundWorkOrder<TWorkOrder, TWorker> order)
        where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
        where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>;

    Task<IBackgroundWorkOrder> DequeueAsync(CancellationToken cancellationToken);
}

These interfaces:

  • Do go a bit crazy on the generics, but in every case I've come up with, the generic arguments are inferred automatically when following the basic pattern.
  • Let us pull the worker Type out of the WorkOrder instance using reflection.
  • Let the Worker receive a properly typed WorkOrder when doing the work.
  • DequeueAsync returns the marker interface IBackgroundWorkOrder; reflection then tells us which worker and work order types to use.
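To make the reflection step concrete, here is a standalone sketch that repeats the interfaces above and pulls `TWorker` out of a work order instance, the same lookup `QueuedHostedService` performs further down. `SampleOrder`, `SampleWorker` and `WorkOrderReflection.GetWorkerType` are hypothetical names used only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IBackgroundWorkOrder { }

public interface IBackgroundWorkOrder<TWorkOrder, TWorker> : IBackgroundWorkOrder
    where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
    where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>
{
}

public interface IBackgroundWorker { }

public interface IBackgroundWorker<TWorkOrder, TWorker> : IBackgroundWorker
    where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
    where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>
{
    Task DoWork(TWorkOrder order, CancellationToken cancellationToken);
}

// Hypothetical order/worker pair, only for this example.
public class SampleOrder : IBackgroundWorkOrder<SampleOrder, SampleWorker> { }

public class SampleWorker : IBackgroundWorker<SampleOrder, SampleWorker>
{
    public Task DoWork(SampleOrder order, CancellationToken cancellationToken) => Task.CompletedTask;
}

public static class WorkOrderReflection
{
    // Returns TWorker from the IBackgroundWorkOrder<TWorkOrder, TWorker> the order implements.
    public static Type GetWorkerType(IBackgroundWorkOrder order) =>
        order.GetType()
            .GetInterfaces()
            .First(t => t.IsConstructedGenericType
                        && t.GetGenericTypeDefinition() == typeof(IBackgroundWorkOrder<,>))
            .GetGenericArguments()[1];
}
```

Note that the compiler never sees `SampleWorker` at the call site: the type only comes back at runtime, which is why the hosted service has to invoke `DoWork` through reflection.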

Using these interfaces we can implement the same functionality for async functions:

public static class BackgroundWorkItem
{
    public static void QueueBackgroundWorkItem(
        this IBackgroundTaskQueue queue,
        Func<CancellationToken, Task> method)
    {
        queue.Queue(new WorkOrder(method));
    }

    public class WorkOrder : IBackgroundWorkOrder<WorkOrder, Worker>
    {
        public WorkOrder(Func<CancellationToken, Task> method)
        {
            this.Method = method;
        }

        public Func<CancellationToken, Task> Method { get; }
    }

    public class Worker : IBackgroundWorker<WorkOrder, Worker>
    {
        public async Task DoWork(WorkOrder order, CancellationToken cancellationToken)
        {
            await order.Method.Invoke(cancellationToken);
        }
    }
}

The extension method QueueBackgroundWorkItem matches the signature provided earlier, so usage should be identical to before.

The actual implementations:

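using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class BackgroundTaskQueue : IBackgroundTaskQueue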
{
    private readonly ConcurrentQueue<IBackgroundWorkOrder> _workOrders =
        new ConcurrentQueue<IBackgroundWorkOrder>();

    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void Queue<TWorkOrder, TWorker>(IBackgroundWorkOrder<TWorkOrder, TWorker> order)
        where TWorker : IBackgroundWorker<TWorkOrder, TWorker>
        where TWorkOrder : IBackgroundWorkOrder<TWorkOrder, TWorker>
    {
        if (order == null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        this._workOrders.Enqueue(order);
        this._signal.Release();
    }

    public async Task<IBackgroundWorkOrder> DequeueAsync(CancellationToken cancellationToken)
    {
        await this._signal.WaitAsync(cancellationToken);
        this._workOrders.TryDequeue(out var workItem);

        return workItem;
    }
}

public class QueuedHostedService : IHostedService
{
    private readonly IServiceProvider _services;

    private readonly CancellationTokenSource _shutdown = new CancellationTokenSource();
    private readonly ILogger _logger;
    private Task _backgroundTask;
    
    public QueuedHostedService(
        IServiceProvider services,
        IBackgroundTaskQueue taskQueue,
        ILoggerFactory loggerFactory)
    {
        this._services = services;
        this.TaskQueue = taskQueue;
        this._logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        this._logger.LogInformation("Queued Hosted Service is starting.");

        this._backgroundTask = Task.Run(this.BackgroundProcessing);

        return Task.CompletedTask;
    }

    private async Task BackgroundProcessing()
    {
        while (!this._shutdown.IsCancellationRequested)
        {
            IBackgroundWorkOrder workOrder;
            try
            {
                workOrder = await this.TaskQueue.DequeueAsync(this._shutdown.Token);
            }
            catch (OperationCanceledException)
            {
                // StopAsync cancelled _shutdown while we were waiting for work.
                break;
            }

            try
            {
                // One DI scope, and therefore one worker instance, per work order.
                using (var scope = this._services.CreateScope())
                {
                    // TWorker is the second type argument of IBackgroundWorkOrder<TWorkOrder, TWorker>.
                    var workerType = workOrder
                        .GetType()
                        .GetInterfaces()
                        .First(t => t.IsConstructedGenericType && t.GetGenericTypeDefinition() == typeof(IBackgroundWorkOrder<,>))
                        .GetGenericArguments()
                        .Last();

                    var worker = scope.ServiceProvider
                        .GetRequiredService(workerType);

                    var task = (Task)workerType
                        .GetMethod("DoWork")
                        .Invoke(worker, new object[] { workOrder, this._shutdown.Token });
                    await task;
                }
            }
            catch (Exception ex)
            {
                this._logger.LogError(ex,
                    "Error occurred executing {WorkOrder}.", workOrder.GetType().Name);
            }
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        this._logger.LogInformation("Queued Hosted Service is stopping.");

        this._shutdown.Cancel();

        return Task.WhenAny(
            this._backgroundTask,
            Task.Delay(Timeout.Infinite, cancellationToken));
    }
}

In summary:

  • easier code-sharing of background work
  • proper dependency injection
  • allows using external message bus for the queue for many types of background work (excluding Func<Task>, not implemented)
  • implementation adds complexity, but usage stays as simple as with the original prototype

The only real downside I see to this approach is that the worker classes have to be registered with the DI container.
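One way to soften that downside (a sketch, not part of the proposal) is to discover worker types by reflection at startup and register each one. The interface is re-declared here without its generic constraints to keep the example short, and `ReputationWorker` and `WorkerDiscovery` are hypothetical names; in an app you would loop over the result and call `services.AddScoped(type)`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// Simplified re-declaration of the worker interface (generic constraints omitted for brevity).
public interface IBackgroundWorker<TWorkOrder, TWorker>
{
    Task DoWork(TWorkOrder order, CancellationToken cancellationToken);
}

// Hypothetical worker used only to exercise the scan.
public class ReputationWorker : IBackgroundWorker<string, ReputationWorker>
{
    public Task DoWork(string order, CancellationToken cancellationToken) => Task.CompletedTask;
}

public static class WorkerDiscovery
{
    // Finds every concrete class in the assembly that implements IBackgroundWorker<,>.
    public static IReadOnlyList<Type> FindWorkerTypes(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .Where(t => t.GetInterfaces().Any(i =>
                i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IBackgroundWorker<,>)))
            .ToList();
}
```

The tradeoff is the usual one for assembly scanning: less registration boilerplate, at the cost of registrations that are no longer visible in `ConfigureServices`.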

@rekosko

rekosko commented Aug 2, 2018

Hi, I'm currently having some serious issues with properly registering those worker classes... Could you provide an example?

@vegardlarsen

@rekosko You should only need to register the worker class as itself:

services.AddScoped<BackgroundWorkItem.Worker>();

@rekosko

rekosko commented Aug 3, 2018

First of all, thank you for your reply. I'm using Autofac, so I've registered it with containerBuilder.RegisterType<BackgroundWorkItem.Worker>().InstancePerLifetimeScope(); and that worked. I no longer get an exception about Worker not being registered, but now another issue has appeared.

The app throws System.ObjectDisposedException when I try to invoke my service method, which queries the db. This didn't happen before, when I was manually creating a scope and resolving the correct service implementation.

Any ideas why it doesn't work? I'm not sure whether this code calls .GetRequiredService correctly.

@Lissi4kin

I have the same problem with Autofac:

System.ObjectDisposedException: Cannot access a disposed object. A common cause of this error is disposing a context that was resolved from dependency injection and then later trying to use the same context instance elsewhere in your application. This may occur if you are calling Dispose() on the context, or wrapping the context in a using statement. If you are using dependency injection, you should let the dependency injection container take care of disposing context instances.

@vegardlarsen

vegardlarsen commented Aug 5, 2018

@rekosko @Lissi4kin I think your problem is that you are trying to use your DbContext from a QueueBackgroundWorkItem task?

What is happening is that you are queuing something to run after the HTTP request ends. When the request ends, its scope is disposed, and with it go all the services resolved in that scope (e.g. your DbContext). So when your background task finally executes, the services it uses are already gone.

If the above explains your situation, this is exactly why I proposed this change: you need to implement your own WorkOrder/Worker pair and get proper dependency injection there.
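The failure mode can be reproduced without ASP.NET. In this sketch `FakeDbContext` is a hypothetical stand-in for a scoped `DbContext`: the "request" captures it in a lambda, the `using` block plays the role of the request scope and disposes it, and the queued work only runs afterwards.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a scoped service such as a DbContext.
public sealed class FakeDbContext : IDisposable
{
    private bool _disposed;

    public int Query()
    {
        if (_disposed) throw new ObjectDisposedException(nameof(FakeDbContext));
        return 42;
    }

    public void Dispose() => _disposed = true;
}

public static class ScopeDemo
{
    // Simulates a request that queues background work capturing a scoped service.
    public static Func<Task<int>> HandleRequest()
    {
        Func<Task<int>> queued;
        using (var context = new FakeDbContext()) // request scope begins
        {
            // The lambda captures 'context' but does not run yet.
            queued = async () =>
            {
                await Task.Yield();
                return context.Query();
            };
        } // request scope ends: context is disposed here
        return queued;
    }
}
```

Running the returned delegate later throws `ObjectDisposedException`, which is the same sequence of events as the stack traces above.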

So, say you want to send an event that recalculates some expensive property for a user after a change has happened. This is how it would be done:

public static class UpdateUserReputationBackgroundWork
{
    public static void QueueUpdateUserReputation(this IBackgroundTaskQueue queue, string userId)
    {
        queue.Queue(new Order(userId));
    }

    public class Order : IBackgroundWorkOrder<Order, Worker>
    {
        // This is just the data to pass to the background thread. 
        // No services in here, just a simple data object.
        public Order(string userId)
        {
            this.UserId = userId;
        }

        public string UserId { get; }
    }

    public class Worker : IBackgroundWorker<Order, Worker>
    {
        private readonly DatabaseContext _context;

        public Worker(DatabaseContext context)
        {
            // This is where you put your dependencies, services, etc.
            this._context = context;
        }

        public async Task DoWork(Order order, CancellationToken cancellationToken)
        {
            // This query is not meant to be representative of good code at all, 
            // but just an example of the work to be done, and how we use 
            // dependencies that were injected into the worker.
            var user = await this._context.Users
                .Include(u => u.Posts)
                .SingleAsync(u => u.Id == order.UserId, cancellationToken);
            user.Reputation = user.Posts.Sum(p => p.Reputation);
            await this._context.SaveChangesAsync(cancellationToken);
        }
    }
}

You would register this worker as you did the other worker, and then call backgroundTaskQueue.QueueUpdateUserReputation("vegardlarsen");

You would use this pattern for any background task that needs a service registered per scope. Services registered for the container's lifetime (singletons) should be fine to use with QueueBackgroundWorkItem.

Does that help?

@Lissi4kin

@vegardlarsen yes, it's helped me. Thank you!

@aspnet-hello aspnet-hello transferred this issue from aspnet/Hosting Dec 18, 2018
@aspnet-hello aspnet-hello added this to the 3.0.0 milestone Dec 18, 2018
@Eilon Eilon removed the Extensions label Dec 18, 2018
@muratg

muratg commented Jan 8, 2019

@davidfowl Are you able to take this for 3.0?

@mixandmatch025

Is the plan only to support a single background task at a given moment?

@muratg

muratg commented Jan 22, 2019

ping @davidfowl

@davidfowl
Member Author

No. If we do this, the only point is to give background tasks a grace period to finish executing when the host is shutting down. Maybe we'd also support a model where we activate a class per invocation of the background task, so that it is easy to inject scoped services.
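The "activate a class per invocation" model could look roughly like this. It is a sketch under assumptions: a plain `Func<TWorker>` factory stands in for creating a DI scope and resolving `TWorker` from it, and `ActivatingQueue`, `IWorkItem` and `CountingWorker` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical contract for a class-based work item.
public interface IWorkItem
{
    Task ExecuteAsync(CancellationToken cancellationToken);
}

public sealed class ActivatingQueue
{
    private readonly CancellationToken _stopping;

    public ActivatingQueue(CancellationToken stopping) => _stopping = stopping;

    // Creates a fresh TWorker for every invocation and disposes it afterwards,
    // the way a per-item DI scope would dispose its scoped services.
    public Task QueueBackgroundWorkItem<TWorker>(Func<TWorker> factory)
        where TWorker : IWorkItem
    {
        return Task.Run(async () =>
        {
            TWorker worker = factory();
            try
            {
                await worker.ExecuteAsync(_stopping);
            }
            finally
            {
                (worker as IDisposable)?.Dispose();
            }
        });
    }
}

// Hypothetical worker that counts how many instances were created and disposed.
public sealed class CountingWorker : IWorkItem, IDisposable
{
    public static int Created;
    public static int Disposed;

    public CountingWorker() => Interlocked.Increment(ref Created);

    public Task ExecuteAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public void Dispose() => Interlocked.Increment(ref Disposed);
}
```

Because each invocation gets its own instance, constructor-injected scoped services never outlive the item that uses them.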

@glennc maybe we lump this into the background worker work?

@mixandmatch025

@davidfowl not sure if I understood. If I queue multiple tasks, they'll run concurrently?

@davidfowl
Member Author

@mixandmatch025 yes.

@omatrot

omatrot commented Jan 24, 2019

@vegardlarsen I'm trying to use your code. How do you inject the backgroundTaskQueue into a SignalR hub, for instance? Registering the Worker does not seem to do the trick. I get this error:

System.InvalidOperationException: Unable to resolve service for type 'IBackgroundTaskQueue' while attempting to activate 'WebAdmin.SignalR.ServerHub'.

Thanks in advance.

@vegardlarsen

@omatrot Here is how I register all of the things I use this for currently:

services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
services.AddScoped<CustomerIoEventBackgroundWork.Worker>();
services.AddScoped<CustomerIoBroadcastBackgroundWork.Worker>();

@sravanthim

sravanthim commented Feb 26, 2019

@vegardlarsen I am trying to implement a queue to hold API calls when the backend DB is down and process these queued API calls when the DB becomes available again. I am trying to implement your solution using Worker and WorkOrder and am running into a "Cannot access a disposed object" exception. However, the solution you gave on Aug 5, 2018 will not work for me: I am trying to add the task to the queue, and this task is coming from a microservice's controller method (like an HTTP POST). Can you explain that answer a bit more, please? I was wondering if I should create a Worker/WorkOrder pair for each HTTP POST call that I need to queue. Thank you.

@vegardlarsen

@sravanthim The problem has to do with your IoC container's scopes. A scope can be described as the "context something is being done in". So when you are working in a controller action, you could say that the scope is the HTTP request, and that any services scoped to that request will be disposed when the scope (i.e. the HTTP request) ends. This also means that a service used from the wrong scope may already have been disposed by the time you try to use it.

For the background work implementation, we create a separate scope for each individual order being done, and we create a new worker for each order that comes through. It is therefore important that the worker implementation only accepts its dependencies through constructor parameters. It will then have its dependencies created by the IoC container with the correct scope.

To figure out your specific problem, though, you will have to find out which of your objects is being accessed after it was disposed (it should be in the stack trace of the exception), and figure out whether that object was instantiated correctly.

PS! Never pass services in the work orders. Work orders should only contain "simple" values (e.g. ints, strings, arrays, etc), never large objects that can carry state.

@hempels

hempels commented Jun 4, 2019

As written, each registered QueuedHostedService will run a single task at a time, queuing up work to be completed as soon as the previous task completes. That's a typical pattern for a task queue. Converting it to a concurrent task queue would introduce complexity. It could be done but at that point I would suggest it's likely no longer a good fit for asp.net and should instead be moved to a service bus, etc.
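For a sense of how much code concurrency adds, here is a sketch that runs a batch of queued delegates with at most `maxConcurrency` in flight, using a `SemaphoreSlim` as the throttle. `BoundedRunner` and `RunAllAsync` are hypothetical names, and it processes a finite batch rather than listening forever to stay short; a long-running consumer would also need to prune completed tasks from `running`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedRunner
{
    // Starts each work item as soon as one of maxConcurrency slots is free,
    // then waits for all started items to finish.
    public static async Task RunAllAsync(
        IEnumerable<Func<CancellationToken, Task>> workItems,
        int maxConcurrency,
        CancellationToken cancellationToken)
    {
        // Not disposed on purpose: running items may still Release() after an early exit.
        var slots = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var running = new List<Task>();

        foreach (var workItem in workItems)
        {
            await slots.WaitAsync(cancellationToken);
            running.Add(RunOneAsync(workItem, slots, cancellationToken));
        }

        await Task.WhenAll(running);
    }

    private static async Task RunOneAsync(
        Func<CancellationToken, Task> workItem,
        SemaphoreSlim slots,
        CancellationToken cancellationToken)
    {
        try
        {
            await Task.Run(() => workItem(cancellationToken));
        }
        finally
        {
            slots.Release(); // free the slot even if the item throws
        }
    }
}
```

The throttling itself is small; the harder parts are the ones a service bus gives you for free, such as retries, ordering, and durability.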

@kakins

kakins commented Jul 10, 2019

> As written, each registered QueuedHostedService will run a single task at a time, queuing up work to be completed as soon as the previous task completes. That's a typical pattern for a task queue. Converting it to a concurrent task queue would introduce complexity. It could be done but at that point I would suggest it's likely no longer a good fit for asp.net and should instead be moved to a service bus, etc.

As @LinusCenterstrom mentioned, does this seem to contradict the response from @davidfowl on this topic?

@hempels

hempels commented Jul 10, 2019

> Does this seem to contradict the response from @davidfowl on this topic?

Two different conversations. @davidfowl was describing what MS would build if they add support for background tasks into the framework. I was describing how the solution contributed by @vegardlarsen works as written. They are very different approaches.

@kjkrum

kjkrum commented Jul 25, 2019

I took an entirely different approach. Instead of explicitly maintaining a task queue, I let controllers schedule tasks in whatever manner is appropriate for the task, and register them with a singleton that will block graceful shutdown until all tasks complete.

    public class TaskRegistry : IDisposable
    {
        public CancellationToken ApplicationStopping { get; }
        private readonly CountdownEvent counter = new CountdownEvent(1);

        public TaskRegistry(IApplicationLifetime app)
        {
            ApplicationStopping = app.ApplicationStopping;
            ApplicationStopping.Register(() =>
            {
                counter.Signal();
                counter.Wait();
            });
        }

        public void Register(Task task)
        {
            if (task.Status == TaskStatus.Created)
            {
                throw new InvalidOperationException();
            }
            counter.AddCount();
            task.ContinueWith(t => counter.Signal());
        }

        public void Dispose()
        {
            counter.Dispose();
        }
    }

With this injected into controllers as a property named Tasks, they simply call:

   Tasks.Register(SomeAsyncMethod(Tasks.ApplicationStopping));
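A dependency-free variant of the same idea, handy for experimenting outside a host: it takes a plain `CancellationToken` in place of `IApplicationLifetime.ApplicationStopping`, and uses `CountdownEvent.TryAddCount` so that registering a task after shutdown has drained the registry returns `false` instead of throwing (`AddCount` throws `InvalidOperationException` once the count has reached zero). The `StoppingTaskRegistry` and `TryRegister` names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class StoppingTaskRegistry : IDisposable
{
    // Starts at 1 so the count can only reach zero after shutdown signals it.
    private readonly CountdownEvent _counter = new CountdownEvent(1);

    public CancellationToken Stopping { get; }

    public StoppingTaskRegistry(CancellationToken stopping)
    {
        Stopping = stopping;
        // Runs synchronously inside Cancel(): drop the initial count, then block
        // until every registered task has signalled completion.
        stopping.Register(() =>
        {
            _counter.Signal();
            _counter.Wait();
        });
    }

    // Returns false if shutdown has already drained the registry.
    public bool TryRegister(Task task)
    {
        if (task.Status == TaskStatus.Created)
            throw new InvalidOperationException("Register only tasks that have been started.");
        if (!_counter.TryAddCount())
            return false;
        task.ContinueWith(_ => _counter.Signal(), TaskScheduler.Default);
        return true;
    }

    public void Dispose() => _counter.Dispose();
}
```

As in the original, the blocking `Wait()` inside the stopping callback is what holds up graceful shutdown; the host's own shutdown timeout still applies on top of it.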

@LFCYNWA2019

LFCYNWA2019 commented Aug 24, 2019

Where should I put the code backgroundTaskQueue.QueueUpdateUserReputation("vegardlarsen"); if I'm using a Windows service on .NET Core 2.1? Also, can you post an example of how to add an item to the queue as well? Thank you!

@vegardlarsen

@LFCYNWA2019 You just need to get hold of an IBackgroundTaskQueue instance to call it. You can do that by adding it as a constructor parameter on, e.g., a controller. Adding an item to the queue is easy if you look at the implementation of QueueUpdateUserReputation above.

@bidianqing

@davidfowl Any update to this?

@davidfowl
Member Author

No, there's lots of great code in this thread that does the job.

@IGx89

IGx89 commented Nov 27, 2019

@davidfowl The docs have a link to this issue labeled "tentatively scheduled to be built-in for ASP.NET Core". I'm assuming because of that and the fact that this issue is still open, that's still the plan?

@ssg

ssg commented Feb 9, 2020

Perhaps closing this issue is in order?

@omatrot

omatrot commented Mar 31, 2020

> @davidfowl not sure if I understood. If I queue multiple tasks, they'll run concurrently?

> @mixandmatch025 yes.

@davidfowl This is not the behavior I'm experiencing. I had a bug in one of the queued tasks that trapped it in a loop, consuming CPU and running forever. All other pending tasks were patiently waiting in the queue.

I'm building an app that will be used by multiple consumers concurrently. One task should not prevent the others from running. I'd like to introduce a worker per group of consumers, and also be able to have a task time out and be cancelled automatically.

@davidfowl
Member Author

Not sure what you mean. I was referring to the API that exists in .NET Framework, not the code on this thread. Maybe that's where our wires got crossed.

Tasks queued to the thread pool execute in parallel.

@omatrot

omatrot commented Mar 31, 2020

@davidfowl I was referring to the code on this thread.

@davidfowl
Member Author

I wasn’t 😬

@omatrot

omatrot commented Mar 31, 2020

@vegardlarsen I'm using your code and so far it was ok. Then, because of a bug, I had a queued task running forever trapped in a loop. The side effect was that other queued tasks were not processed at all.

I have 2 questions:

  1. Can I make a running task cancel automatically after a timeout?
  2. Can I have several queues processed in parallel?

Thanks in advance.

@vegardlarsen

vegardlarsen commented Mar 31, 2020

@omatrot

  1. You could modify the BackgroundTaskQueue to support timeouts, by attaching a timeout when you add an item to the internal queue and only allowing it to be dequeued if the timeout hasn't passed. Or you could do something like await Task.WhenAny(task, Task.Delay(10000)); instead of await task; so that the loop stops waiting for the result (the task itself keeps running). Or you could set up something better with CancellationTokens.

  2. Regarding multiple queues in parallel: I think it could be as simple as adding multiple instances of the IHostedService, basically multiple calls to services.AddHostedService<QueuedHostedService>();. Not sure whether the runtime will prevent you from doing that, though.
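Building on point 1, a per-item timeout can be sketched with a linked `CancellationTokenSource` and `CancelAfter`. `TimeoutRunner.RunWithTimeoutAsync` is a hypothetical helper that the hosted service would call in place of `await task;`. The caveat from the bug described above still applies: a loop that never checks the token keeps burning CPU after the timeout; this only stops the queue from waiting on it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutRunner
{
    // Runs one work item with its own deadline, linked to the host's shutdown token.
    // Returns true if the item ran to completion (a faulted item rethrows),
    // false if it was cancelled or the deadline passed first.
    public static async Task<bool> RunWithTimeoutAsync(
        Func<CancellationToken, Task> workItem,
        TimeSpan timeout,
        CancellationToken shutdown)
    {
        var linked = CancellationTokenSource.CreateLinkedTokenSource(shutdown);
        linked.CancelAfter(timeout);

        Task work = Task.Run(() => workItem(linked.Token));
        // Completes (as cancelled) when the deadline or shutdown fires.
        Task deadline = Task.Delay(Timeout.Infinite, linked.Token);

        Task first = await Task.WhenAny(work, deadline);
        if (first == work)
        {
            linked.Dispose(); // item is done: release the CancelAfter timer
            if (work.IsCanceled) return false;
            await work; // rethrows if the item faulted
            return true;
        }

        // Deadline reached: the item may still be running and holding linked.Token,
        // so 'linked' is deliberately not disposed here.
        return false;
    }
}
```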

Note that my code above is just an example of how I did this in my code, and you can modify it freely. At this point I sincerely doubt this will make it into the library, and I am not sure there is enough utility in it to roll it into its own NuGet package.

@hempels

hempels commented Apr 1, 2020

> 2. Regarding multiple queues in parallel; I think it could be as simple as adding multiple instances of the IHostedService, basically multiple calls to services.AddHostedService<QueuedHostedService>();. Not sure if the runtime will prevent you from doing that though.

This works, the runtime has no issue with it. We run multiple hosted service instances so that we can have parallel queues for different task types.

@omatrot

omatrot commented Apr 2, 2020

@hempels how do you access a particular instance?

@hempels

hempels commented Apr 3, 2020

> @hempels how do you access a particular instance?

Each instance is registered separately via DI, which resolves them as an IEnumerable<IBackgroundTaskQueue>.

services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue<UpdateNamesAndRolesBackgroundTaskQueue>>();
services.AddHostedService<QueuedHostedService<UpdateNamesAndRolesBackgroundTaskQueue>>();

public NamesAndRolesSync(IEnumerable<IBackgroundTaskQueue> queues)
{
    // Pick the queue whose generic argument is the marker type for this job.
    var queue = queues.FirstOrDefault(q =>
        q.GetType().GenericTypeArguments.FirstOrDefault() == typeof(UpdateNamesAndRolesBackgroundTaskQueue));
    ...

This can be generalized into helper methods, or done more automagically with DI.
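A generalized version of that lookup might be an extension method that filters by the closed generic type rather than by comparing names. This is a sketch with hypothetical types: `IBackgroundTaskQueue` and `BackgroundTaskQueue<TMarker>` are reduced to empty shells, `ReportsBackgroundTaskQueue` is made up, and `GetQueue` is not an existing API. The marker type also matters at registration time: in current versions of Microsoft.Extensions.Hosting (3.0 onward, as far as I know), `AddHostedService<T>` registers through `TryAddEnumerable`, so repeating it with the same closed type adds a single hosted service, whereas `QueuedHostedService<A>` and `QueuedHostedService<B>` are distinct types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IBackgroundTaskQueue { }

// Hypothetical generic queue: the marker type only makes each closed type distinct.
public class BackgroundTaskQueue<TMarker> : IBackgroundTaskQueue { }

// Marker types, one per logical queue (the second one is made up for this example).
public sealed class UpdateNamesAndRolesBackgroundTaskQueue { }
public sealed class ReportsBackgroundTaskQueue { }

public static class QueueSelection
{
    // Picks the queue for TMarker out of everything registered as IBackgroundTaskQueue.
    public static BackgroundTaskQueue<TMarker> GetQueue<TMarker>(this IEnumerable<IBackgroundTaskQueue> queues) =>
        queues.OfType<BackgroundTaskQueue<TMarker>>().Single();
}
```

A constructor would then call `queues.GetQueue<UpdateNamesAndRolesBackgroundTaskQueue>()`. An alternative is to also register each closed `BackgroundTaskQueue<TMarker>` type directly and inject that, which avoids the filtering altogether.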

@vflame

vflame commented Apr 15, 2020

Any advantages of using System.Threading.Channels versus this for the implementation?

@davidfowl
Member Author

@vflame Advantages over which approach? There are varying implementations on this thread alone. Channels work fine, the thread pool works fine; it depends on the set of tradeoffs you're willing to make.

  • Do you care about execution order of the scheduled tasks?
  • Do you care about a best effort way of completing tasks in case of a shutdown?
  • Do you care about knowing that the task definitely completed (what happens if it crashes?). None of these solutions handle that.

etc. These solutions are now pretty trivial to build because of the primitives in the system (channels, Task.Run, various synchronization primitives).
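As one data point for the channels option, a Channels-based queue could look like this. `ChannelBackgroundTaskQueue` is a hypothetical name; it swaps the `ConcurrentQueue` + `SemaphoreSlim` pair used earlier in the thread for a bounded `Channel<T>`, which adds back-pressure and a clean "no more items" signal via `Writer.Complete()`. The consumer loop runs items one at a time in FIFO order and, like the other solutions here, does not record whether an item completed if the process crashes.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class ChannelBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, Task>> _channel;

    public ChannelBackgroundTaskQueue(int capacity)
    {
        // Bounded: writers wait, rather than growing memory without limit, when the queue is full.
        _channel = Channel.CreateBounded<Func<CancellationToken, Task>>(
            new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait });
    }

    public ValueTask QueueAsync(Func<CancellationToken, Task> workItem, CancellationToken cancellationToken = default)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));
        return _channel.Writer.WriteAsync(workItem, cancellationToken);
    }

    public ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) =>
        _channel.Reader.ReadAsync(cancellationToken);

    // Stops accepting new items; readers can still drain what is already queued.
    public void Complete() => _channel.Writer.Complete();

    // Consumer loop: runs items one at a time until Complete() is called and the queue is drained.
    public async Task RunAsync(CancellationToken cancellationToken)
    {
        while (await _channel.Reader.WaitToReadAsync(cancellationToken))
        {
            while (_channel.Reader.TryRead(out var workItem))
            {
                try
                {
                    await workItem(cancellationToken);
                }
                catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
                {
                    // Keep the loop alive when one item fails; a real host would log this.
                    Console.Error.WriteLine(ex);
                }
            }
        }
    }
}
```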

@analogrelay

Triage: This thread has a lot of different suggestions for patterns and it feels like that means there's not a clear built-in solution for the base platform to provide. Closing as we don't plan to build this in to the product at this time. If there are issues blocking development of components that do provide queue-based background workers, we should look at addressing those in separate issues.

@dotnet dotnet locked as resolved and limited conversation to collaborators May 30, 2020