Skip to content

Commit

Permalink
Fixed queue to utilize fetchToken and fixes #6
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-pat committed Oct 27, 2016
1 parent 31e1c58 commit 7031c05
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
16 changes: 16 additions & 0 deletions Hangfire.MySql/JobQueue/MySqlFetchedJob.cs
Expand Up @@ -14,6 +14,9 @@ internal class MySqlFetchedJob : IFetchedJob
private readonly MySqlStorage _storage;
private readonly IDbConnection _connection;
private readonly int _id;
private bool _removedFromQueue;
private bool _requeued;
private bool _disposed;

public MySqlFetchedJob(
MySqlStorage storage,
Expand All @@ -33,7 +36,17 @@ internal class MySqlFetchedJob : IFetchedJob

public void Dispose()
{

if (_disposed) return;

if (!_removedFromQueue && !_requeued)
{
Requeue();
}

_storage.ReleaseConnection(_connection);

_disposed = true;
}

public void RemoveFromQueue()
Expand All @@ -48,6 +61,8 @@ public void RemoveFromQueue()
{
id = _id
});

_removedFromQueue = true;
}

public void Requeue()
Expand All @@ -62,6 +77,7 @@ public void Requeue()
{
id = _id
});
_requeued = true;
}

public string JobId { get; private set; }
Expand Down
44 changes: 25 additions & 19 deletions Hangfire.MySql/JobQueue/MySqlJobQueue.cs
Expand Up @@ -43,27 +43,33 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
{
using (new MySqlDistributedLock(_storage, "JobQueue", TimeSpan.FromSeconds(30)))
{
fetchedJob =
connection
.Query<FetchedJob>(
"select Id, JobId, Queue " +
"from JobQueue " +
"where (FetchedAt is null or FetchedAt < DATE_ADD(UTC_TIMESTAMP(), INTERVAL @timeout SECOND)) " +
" and Queue in @queues " +
"limit 1;",
new
{
queues = queues,
timeout = _options.InvisibilityTimeout.Negate().TotalSeconds
})
.SingleOrDefault();
string token = Guid.NewGuid().ToString();

if (fetchedJob != null)
int nUpdated = connection.Execute(
"update JobQueue set FetchedAt = UTC_TIMESTAMP(), FetchToken = @fetchToken " +
"where (FetchedAt is null or FetchedAt < DATE_ADD(UTC_TIMESTAMP(), INTERVAL @timeout SECOND)) " +
" and Queue in @queues " +
"LIMIT 1;",
new
{
queues = queues,
timeout = _options.InvisibilityTimeout.Negate().TotalSeconds,
fetchToken = token
});

if(nUpdated != 0)
{
connection
.Execute(
"update JobQueue set FetchedAt = @fetchedAt where Id = @id",
new {fetchedAt = DateTime.UtcNow, id = fetchedJob.Id});
fetchedJob =
connection
.Query<FetchedJob>(
"select Id, JobId, Queue " +
"from JobQueue " +
"where FetchToken = @fetchToken;",
new
{
fetchToken = token
})
.SingleOrDefault();
}
}
}
Expand Down

0 comments on commit 7031c05

Please sign in to comment.