-
Notifications
You must be signed in to change notification settings - Fork 49
/
DownloaderQueue.cs
131 lines (118 loc) · 4.12 KB
/
DownloaderQueue.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Harvest
{
    /// <summary>
    /// Schedules <see cref="Downloader"/> instances: keeps a waiting queue, starts at most
    /// <see cref="MaxConcurrency"/> of them at a time (picking waiting ones at random),
    /// hands every downloaded page to a callback, and raises <see cref="OnCrawlingCompleted"/>
    /// once nothing is queued or executing.
    /// </summary>
    /// <remarks>
    /// All mutable state (<c>_queue</c>, <c>_executing</c>, <c>_finished</c>) is guarded by <c>_mutex</c>.
    /// </remarks>
    public class DownloaderQueue
    {
        /// <summary>
        /// Maximum number of downloaders allowed to execute at the same time.
        /// </summary>
        public const int MaxConcurrency = 5;

        /// <summary>
        /// How long (in milliseconds) an idle worker thread waits on <c>_mutex</c> before re-checking.
        /// A downloader's completion is only observable by polling <c>Downloader.Completed</c>,
        /// so the completion checker cannot rely on pulses alone.
        /// </summary>
        private const int PollIntervalMs = 10;

        /// <summary>
        /// The downloaders that are currently executing.
        /// </summary>
        private readonly IList<Downloader> _executing = new List<Downloader>();

        /// <summary>
        /// Downloaders that are waiting to be started.
        /// </summary>
        private readonly IList<Downloader> _queue = new List<Downloader>();

        private readonly object _mutex = new object();

        private readonly Action<Page> _onPageDownloaded;

        /// <summary>
        /// Set once the crawl has completed; tells both worker threads started by
        /// <see cref="Process"/> to exit. Guarded by <c>_mutex</c>.
        /// </summary>
        private bool _finished;

        /// <summary>
        /// Raised once when no downloader is queued or executing any more.
        /// Handlers are invoked on the completion-checker thread, outside the internal lock.
        /// </summary>
        public event Action OnCrawlingCompleted;

        /// <param name="onPageDownloaded">
        /// Invoked (while the internal lock is held) with each page a completed downloader produced.
        /// It may call <see cref="Enqueue"/> to schedule more work.
        /// </param>
        public DownloaderQueue(Action<Page> onPageDownloaded)
        {
            _onPageDownloaded = onPageDownloaded;
        }

        /// <summary>
        /// Enqueue a new downloader unless one with the same absolute URL is already waiting.
        /// </summary>
        /// <param name="downloader">Downloader to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="downloader"/> is null.</exception>
        public void Enqueue(Downloader downloader)
        {
            // A null entry would otherwise be accepted here and crash the dispatcher thread later.
            if (downloader == null)
            {
                throw new ArgumentNullException("downloader");
            }

            lock (_mutex)
            {
                if (_queue.All(x => x.Url.AbsoluteUri != downloader.Url.AbsoluteUri))
                {
                    _queue.Add(downloader);

                    // Wake the dispatcher so it does not have to wait out its poll interval.
                    Monitor.PulseAll(_mutex);
                }
            }
        }

        /// <summary>
        /// Process the downloader queue: starts one dispatcher thread and one completion-checker
        /// thread. Both threads exit once the crawl completes; call <see cref="Process"/> again
        /// to process work enqueued after completion.
        /// </summary>
        public void Process()
        {
            lock (_mutex)
            {
                _finished = false;
            }

            new Thread(DispatchLoop).Start();
            new Thread(CompletionLoop).Start();
        }

        /// <summary>
        /// Dispatcher thread body: moves random downloaders from the queue into the executing set
        /// (up to <see cref="MaxConcurrency"/>) and starts them, until the crawl is finished.
        /// </summary>
        private void DispatchLoop()
        {
            var rand = new Random();
            while (true)
            {
                lock (_mutex)
                {
                    if (_finished)
                    {
                        return;
                    }

                    if (_queue.Count == 0 || _executing.Count >= MaxConcurrency)
                    {
                        // Nothing to start: release the lock until pulsed (new work or a freed slot)
                        // or the poll interval elapses, instead of spinning.
                        Monitor.Wait(_mutex, PollIntervalMs);
                        continue;
                    }

                    var index = rand.Next(_queue.Count);
                    var fetcher = _queue[index];
                    _queue.RemoveAt(index);

                    // Drop it if a downloader for the same Uri is already running.
                    // NOTE(review): this uses Uri equality, while Enqueue compares AbsoluteUri strings.
                    if (_executing.Any(x => x.Url == fetcher.Url))
                    {
                        continue;
                    }

                    _executing.Add(fetcher);
                    fetcher.StartFetch();
                }
            }
        }

        /// <summary>
        /// Completion-checker thread body: retires completed downloaders, forwards their pages,
        /// and raises <see cref="OnCrawlingCompleted"/> once nothing is queued or executing.
        /// </summary>
        private void CompletionLoop()
        {
            while (true)
            {
                lock (_mutex)
                {
                    if (_finished)
                    {
                        return;
                    }

                    var downloader = _executing.FirstOrDefault(x => x.Completed);
                    if (downloader == null)
                    {
                        // Completion is only observable by polling; release the lock meanwhile.
                        Monitor.Wait(_mutex, PollIntervalMs);
                        continue;
                    }

                    RemoveFetcher(downloader);
                    if (downloader.DownloadedPage != null)
                    {
                        // We did download a page; the callback may enqueue further downloaders.
                        _onPageDownloaded(downloader.DownloadedPage);
                    }

                    // A slot in _executing was freed: wake the dispatcher.
                    Monitor.PulseAll(_mutex);

                    // Checked for every retired downloader, including ones that produced no page,
                    // so the crawl can also finish on a failed final download.
                    if (!CheckForCompleteness())
                    {
                        continue;
                    }

                    _finished = true;
                }

                // Raised outside the lock so a handler cannot deadlock against the worker threads.
                RaiseCrawlingCompleted();
                return;
            }
        }

        /// <summary>
        /// Invokes <see cref="OnCrawlingCompleted"/> if anyone is subscribed.
        /// </summary>
        private void RaiseCrawlingCompleted()
        {
            // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
            var handler = OnCrawlingCompleted;
            if (handler != null)
            {
                handler();
            }
        }

        /// <summary>
        /// True when no downloader is queued or executing. Caller must hold <c>_mutex</c>.
        /// </summary>
        private bool CheckForCompleteness()
        {
            return _queue.Count == 0 && _executing.Count == 0;
        }

        /// <summary>
        /// Remove a downloader from both the executing list and the queue.
        /// </summary>
        /// <param name="downloader">The downloader to remove; absent entries are ignored.</param>
        private void RemoveFetcher(Downloader downloader)
        {
            lock (_mutex)
            {
                _executing.Remove(downloader);
                _queue.Remove(downloader);
            }
        }
    }
}