Cancel the CancellationToken when the job is deleted from dashboard #976

Open
wants to merge 28 commits into base: master

Conversation

8 participants
@pieceofsummer (Contributor) commented Aug 21, 2017

So that running async tasks can gracefully abort.
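
For illustration, a rough sketch of the kind of job this change targets; the ProcessAsync method and its url parameter are hypothetical, and it assumes IJobCancellationToken exposes the underlying token as the ShutdownToken property and that this change would also cancel that token when the job is deleted from the dashboard:

public async Task ProcessAsync(string url, IJobCancellationToken cancellationToken)
{
    using (var client = new HttpClient())
    {
        // Today this token is only cancelled on server shutdown; with this change it
        // would also be cancelled when the job is deleted, so the async call below
        // can abort gracefully instead of running to completion.
        var response = await client.GetAsync(url, cancellationToken.ShutdownToken);
        response.EnsureSuccessStatusCode();
    }
}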

Tsabo and others added some commits May 29, 2017

Change order of the QueueAttribute state filter to be always the last one (#900)

It would be better to make it run as an apply-state filter, but that is a
breaking change. This filter should always be called last, especially when
previous filters may change the candidate state to Enqueued, so that the
queue setting isn't ignored. The old behavior leads to a bug: when a
continuation is created after its antecedent job has already finished, the
continuation is enqueued to the default queue regardless of the specified one.
Allow Hangfire.SqlServer to use fetching based on invisibility timeout again

Transaction-based fetching, introduced in 1.5.0, may prevent transaction log backups from being taken when there are a lot of long-running background jobs. This commit brings back support for fetching without a transaction (see the configuration sketch after this commit list).
Allow transactional fetch to dequeue timed out jobs
When upgrading from version 1.4.7 or earlier, some messages may have the FetchedAt column already set due to an ungraceful shutdown. We should process those messages too.
Check continuation age before throwing timeout exception
If it is old enough, simply ignore the continuation: we can assume its
creation was aborted and it will never be initialized.
Don't wait for sp_getapplock on SQL Server's side by using zero timeout
sp_getapplock may quickly lead to SQL Server thread pool starvation when a
non-zero timeout is used (i.e. we are waiting on the SQL Server side) and
there are a lot of workers, especially on SQL Azure.
Automatically open a given existing connection if it's closed
Otherwise Dapper will close the underlying connection right after the
sp_getapplock execution, and all the just-acquired distributed locks will be
silently released by SQL Server. This will cause problems with data
consistency.
Throw exception when trying to acquire a lock on closed connection
Please see the description of the previous commit for details.
Call sp_releaseapplock only when connection is still open
If the connection is closed or broken, SQL Server has already released the
lock for us, and we shouldn't do anything. This commit removes some
"Cannot release the application lock because it is not currently held"
messages.
Optimize the distributed lock acquire algorithm introduced in 1.6.15
The previous implementation may issue many more queries when acquiring a
lock on a blocked resource and a high value is passed in the timeout
argument. We now pass a seconds-based timeout to the sp_getapplock procedure
instead: it's short enough not to cause thread pool starvation, and long
enough to significantly reduce the number of queries.
Remove length limitations on SqlServerDistributedLock timeout
Since that timeout isn't passed to sp_getapplock anymore, the limitations can
be removed.
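
For the invisibility-timeout commit above, a minimal configuration sketch, assuming the switch is exposed as the SlidingInvisibilityTimeout property on SqlServerStorageOptions:

GlobalConfiguration.Configuration.UseSqlServerStorage(
    "<connection string>", // placeholder
    new SqlServerStorageOptions
    {
        // Assumption: when this is set, workers fetch jobs using an invisibility
        // timeout instead of holding a long-running transaction, so transaction
        // log backups are no longer blocked by busy workers.
        SlidingInvisibilityTimeout = TimeSpan.FromMinutes(5)
    });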
@codecov-io commented Aug 21, 2017

Codecov Report

Merging #976 into master will increase coverage by 0.38%.
The diff coverage is 77.34%.

| Impacted Files | Coverage Δ |
| --- | --- |
| src/Hangfire.Core/BackgroundJobServer.cs | 0% <0%> (ø) ⬆️ |
| src/Hangfire.SqlServer/SqlServerMonitoringApi.cs | 1.18% <0%> (ø) ⬆️ |
| ...Hangfire.Core/Server/BackgroundProcessingServer.cs | 72.36% <100%> (+0.36%) ⬆️ |
| ...e.Core/Server/BackgroundProcessingServerOptions.cs | 100% <100%> (ø) ⬆️ |
| src/Hangfire.Core/QueueAttribute.cs | 100% <100%> (ø) ⬆️ |
| src/Hangfire.SqlServer/SqlServerTransactionJob.cs | 84.09% <100%> (ø) |
| src/Hangfire.Core/AutomaticRetryAttribute.cs | 80.28% <100%> (+0.28%) ⬆️ |
| src/Hangfire.Core/JobCancellationToken.cs | 100% <100%> (ø) ⬆️ |
| src/Hangfire.Core/BackgroundJobServerOptions.cs | 93.54% <100%> (+0.44%) ⬆️ |
| src/Hangfire.Core/Server/Worker.cs | 86.95% <100%> (ø) ⬆️ |
| ... and 17 more | |
@plaisted (Contributor) commented Aug 22, 2017

I've often started my job methods with something like:

// edited post to add cancel/dispose to the snippet
var source = new CancellationTokenSource();
try {
    // jobCancellationToken is Hangfire's IJobCancellationToken passed to the job method
    Task.Run(() => JobCancellationHelper(source, jobCancellationToken, 5000));
    // main code here; use source.Token instead of IJobCancellationToken
} finally {
    source.Cancel();
    source.Dispose();
}

and then pass the new CancellationToken from the CancellationTokenSource instead of using Hangfire's IJobCancellationToken. The JobCancellationHelper is a simple method like:

public async Task JobCancellationHelper(CancellationTokenSource source, IJobCancellationToken cancellationToken, int delayMs)
{
    while (!source.Token.IsCancellationRequested)
    {
        try
        {
            // requires a DB round trip
            cancellationToken.ThrowIfCancellationRequested();
        }
        catch (OperationCanceledException)
        {
            // the job was aborted (or the server is shutting down): cancel the local source
            source.Cancel();
            return;
        }

        await Task.Delay(delayMs, source.Token).ContinueWith(t => { }); // swallow the cancellation exception
    }
}

Having Hangfire handle that for me would be nice. I haven't looked at your suggested changes yet, but I'm posting this here in case others have this issue and want a workaround.

@pieceofsummer (Contributor) commented Aug 22, 2017

@plaisted I believe there's a serious issue with your snippet because multiple threads are using the same IStorageConnection instance, which could lead to race conditions and internal state corruption.

I also don't see how the task terminates when the job finishes normally. Do you need to manually call source.Cancel() when the job finishes?

@plaisted (Contributor) commented Aug 22, 2017

@pieceofsummer
Thanks for the comments. Regarding calling source.Cancel(): I've edited the code snippet I posted above to reflect cancelling and disposing of the CTS.

As far as the shared IStorageConnection goes, I'll look into the Worker code a little more. My assumption was that if it's safe to call from the main method, it should be safe to call from a spawned task as long as you don't access it from both. I think the JobContext and IJobCancellationToken share a single IStorageConnection, so there may be conflicts there if you use both. Can you elaborate on the issue a little more? Looking at the SqlServer storage, it appears to create a new SqlConnection for each command, so I'm not sure there would be an issue there.

@pieceofsummer (Contributor) commented Aug 23, 2017

@plaisted sure, it may only affect situations where you also use the connection from the job code, e.g. logging to Hangfire.Console.

The SQL Server storage does indeed create a new SqlConnection every time, but there's no guarantee that other storage providers will do the same.

Also, I have to point out that IJobCancellationToken.ThrowIfCancellationRequested() may throw an exception other than OperationCanceledException (for example, if a database error occurs), so you probably need to handle those too.
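
To illustrate, a variant of the polling loop from the earlier workaround that treats storage errors separately from cancellation (the logging call is only a placeholder):

while (!source.Token.IsCancellationRequested)
{
    try
    {
        cancellationToken.ThrowIfCancellationRequested();
    }
    catch (OperationCanceledException)
    {
        // The job was aborted or the server is shutting down: propagate via the local source.
        source.Cancel();
        return;
    }
    catch (Exception ex)
    {
        // A storage/database error rather than a cancellation: log it and keep polling.
        Console.WriteLine($"Cancellation poll failed: {ex.Message}"); // placeholder for a real logger
    }

    await Task.Delay(delayMs, source.Token).ContinueWith(t => { }); // swallow the cancellation exception
}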

@rossknudsen commented Nov 9, 2018

How about rewriting @plaisted's solution as a class implementing IDisposable? It would still be nicer if Hangfire could replace IJobCancellationToken with a CancellationToken, but any comments on the following would be appreciated.

public class JobCancellationTokenWrapper : IDisposable
{
    private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource();

    public JobCancellationTokenWrapper(IJobCancellationToken jobCancellationToken, int delayMs = 5000)
    {
        Task.Run(async () =>
        {
            while (!_cancellationSource.Token.IsCancellationRequested)
            {
                try
                {
                    jobCancellationToken.ThrowIfCancellationRequested();
                }
                catch (OperationCanceledException)
                {
                    _cancellationSource.Cancel();
                    return;
                }

                await Task.Delay(delayMs, _cancellationSource.Token).ContinueWith(t => { }); //swallow exception
            }
        });
    }

    public CancellationToken Token => _cancellationSource.Token;

    public void Dispose()
    {
        _cancellationSource.Cancel();
        _cancellationSource.Dispose();
    }
}

Usage:

public async Task Execute(int snapshotId, IJobCancellationToken cancellationToken)
{
    using (var wrapper = new JobCancellationTokenWrapper(cancellationToken, 5000))
    {
        // TODO implement work here.
        // pass wrapper.Token to methods that accept a CancellationToken
    }
}