Skip to content

Commit

Permalink
Prevent zero delays between fetch retry attempts when lock acquisitio…
Browse files Browse the repository at this point in the history
…n failed without blocking

When using SlidingInvisibility-based fetching with sub-second polling intervals, lock acquisition on SQL Server's side may be failed without performing any blocking. In previous implementation this may result in busy loops without delays, causing high network consumption. Now it throws an exception caught by BackgroundExecution that's adding delays between invocations.

Relates to #1456
  • Loading branch information
odinserj committed Jun 28, 2019
1 parent 5c08eb4 commit 591e8ab
Showing 1 changed file with 24 additions and 21 deletions.
45 changes: 24 additions & 21 deletions src/Hangfire.SqlServer/SqlServerJobQueue.cs
Expand Up @@ -34,11 +34,11 @@ internal class SqlServerJobQueue : IPersistentJobQueue
// This is an optimization that helps to overcome the polling delay, when
// both client and server reside in the same process. Everything is working
// without this event, but it helps to reduce the delays in processing.
internal static readonly AutoResetEvent NewItemInQueueEvent = new AutoResetEvent(true);
internal static readonly AutoResetEvent NewItemInQueueEvent = new AutoResetEvent(false);

private static readonly TimeSpan LongPollingThreshold = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan PollingQuantum = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan MinPollingDelay = TimeSpan.FromMilliseconds(50);
private static readonly TimeSpan LongPollingThreshold = TimeSpan.FromSeconds(1);
private static readonly int PollingQuantumMs = 1000;
private static readonly int MinPollingDelayMs = 50;

private readonly SqlServerStorage _storage;
private readonly SqlServerStorageOptions _options;
Expand Down Expand Up @@ -92,10 +92,9 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu
var lockResource = $"{_storage.SchemaName}_FetchLockLock_{String.Join("_", queues.OrderBy(x => x))}";
var isBlocking = false;

var pollingInterval = _options.QueuePollInterval;
var pollingDelay = pollingInterval > MinPollingDelay && pollingInterval <= PollingQuantum
? pollingInterval
: MinPollingDelay;
var pollingDelayMs = Math.Min(
Math.Max((int)_options.QueuePollInterval.TotalMilliseconds, MinPollingDelayMs),
PollingQuantumMs);

SqlServerTimeoutJob fetched;

Expand All @@ -104,17 +103,17 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu
do
{
cancellationToken.ThrowIfCancellationRequested();
int? lockResult = null;

fetched = _storage.UseConnection(null, connection =>
{
var parameters = new
{
queues = queues,
timeout = _options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds,
lockResource = lockResource,
pollingDelayMs = (int)pollingDelay.TotalMilliseconds,
pollingQuantumMs = (int)PollingQuantum.TotalMilliseconds
};
var parameters = new DynamicParameters();
parameters.Add("@queues", queues);
parameters.Add("@timeout", (int)_options.SlidingInvisibilityTimeout.Value.Negate().TotalSeconds);
parameters.Add("@lockResource", lockResource);
parameters.Add("@pollingDelayMs", pollingDelayMs);
parameters.Add("@pollingQuantumMs", PollingQuantumMs);
parameters.Add("@result", dbType: DbType.Int32, direction: ParameterDirection.Output);
var query = isBlocking ? GetBlockingFetchSql() : GetNonBlockingFetchSql();
Expand All @@ -126,11 +125,12 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu
var fetchedJob = reader.Read<FetchedJob>().SingleOrDefault(x => x != null);
if (fetchedJob != null && !(fetchedJob.Id == 0 && fetchedJob.JobId == 0 && fetchedJob.Queue == null))
{
return new SqlServerTimeoutJob(_storage, fetchedJob.Id, fetchedJob.JobId.ToString(CultureInfo.InvariantCulture), fetchedJob.Queue, fetchedJob.FetchedAt);
return new SqlServerTimeoutJob(_storage, fetchedJob.Id, fetchedJob.JobId.ToString(CultureInfo.InvariantCulture), fetchedJob.Queue, fetchedJob.FetchedAt.Value);
}
}
}
lockResult = parameters.Get<int?>("@result");
return null;
});

Expand All @@ -139,13 +139,18 @@ private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queu
break;
}

if (pollingInterval < LongPollingThreshold)
if (lockResult.HasValue && lockResult.Value < -1)
{
throw new InvalidOperationException($"A call to sp_getapplock returned unexpected result '{lockResult.Value}' while fetching a job. Please report this problem to Hangfire developers and don't use sub-second values for the QueuePollInterval option.");
}

if (_options.QueuePollInterval < LongPollingThreshold)
{
isBlocking = true;
}
else
{
WaitHandle.WaitAny(new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }, pollingInterval);
WaitHandle.WaitAny(new WaitHandle[] { cancellationEvent.WaitHandle, NewItemInQueueEvent }, _options.QueuePollInterval);
cancellationToken.ThrowIfCancellationRequested();
}
} while (true);
Expand Down Expand Up @@ -176,8 +181,6 @@ private string GetBlockingFetchSql()
set xact_abort on;
set transaction isolation level read committed;
declare @result int;
EXEC @result = sp_getapplock @Resource = @lockResource, @LockMode = 'Exclusive', @LockTimeout = @pollingQuantumMs, @LockOwner = 'Session';
IF (@result >= 0)
Expand Down

0 comments on commit 591e8ab

Please sign in to comment.