Skip to content

Commit

Permalink
Fixes issues around delaying tasks affecting timers
Browse files Browse the repository at this point in the history
  • Loading branch information
Turnerj committed Dec 31, 2018
1 parent 4cfa85c commit 3f9a44e
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions src/InfinityCrawler/ParallelAsyncTask.cs
Expand Up @@ -36,9 +36,19 @@ ParallelAsyncTaskOptions options
{
if (itemsToProcess.TryDequeue(out var item))
{
var task = action(item, itemsToProcess);
var taskStartDelay = 0d;
//Task delaying and backoff
if (options.DelayBetweenTaskStart.TotalMilliseconds > 0)
{
taskStartDelay = options.DelayBetweenTaskStart.TotalMilliseconds;
taskStartDelay += random.NextDouble() * options.DelayJitter.TotalMilliseconds;
}

taskStartDelay += currentBackoff;

var timer = new Stopwatch();
timer.Start();
var task = RunAction(item, action, itemsToProcess, (int)taskStartDelay, timer);

activeTasks.TryAdd(task, timer);
taskCount++;

Expand All @@ -49,15 +59,6 @@ ParallelAsyncTaskOptions options
return;
}

//Task delaying and backoff
if (options.DelayBetweenTaskStart.TotalMilliseconds > 0)
{
var taskStartDelay = options.DelayBetweenTaskStart.TotalMilliseconds;
taskStartDelay += random.NextDouble() * options.DelayJitter.TotalMilliseconds;
taskStartDelay += currentBackoff;
Thread.Sleep((int)taskStartDelay);
}

if (activeTasks.Count == options.MaxNumberOfSimultaneousTasks)
{
break;
Expand All @@ -84,6 +85,7 @@ ParallelAsyncTaskOptions options
{
successesSinceLastThrottle = 0;
currentBackoff += (int)options.ThrottlingRequestBackoff.TotalMilliseconds;
Console.WriteLine($"New backoff: {currentBackoff}ms");
}
else if (currentBackoff > 0)
{
Expand All @@ -93,11 +95,23 @@ ParallelAsyncTaskOptions options
var newBackoff = currentBackoff - options.ThrottlingRequestBackoff.TotalMilliseconds;
currentBackoff = Math.Max(0, (int)newBackoff);
successesSinceLastThrottle = 0;
Console.WriteLine($"New backoff: {currentBackoff}ms");
}
}
}
}
}

private static async Task RunAction<TModel>(TModel item, Func<TModel, ConcurrentQueue<TModel>, Task> action, ConcurrentQueue<TModel> itemsToProcess, int delay, Stopwatch timer)
{
if (delay > 0)
{
await Task.Delay(delay);
}
timer.Start();
await action(item, itemsToProcess);
timer.Stop();
}
}

public class ParallelAsyncTaskOptions
Expand Down

0 comments on commit 3f9a44e

Please sign in to comment.