hq.go
package crawl

import (
	"math"
	"net/url"
	"strings"
	"sync"
	"time"

	"git.archive.org/wb/gocrawlhq"
	"github.com/CorentinB/Zeno/internal/pkg/frontier"
	"github.com/CorentinB/Zeno/internal/pkg/utils"
	"github.com/sirupsen/logrus"
)
// HQWebsocket connects to HQ's websocket and listens for messages.
// It also sends an "identify" message to the HQ to let it know that
// Zeno is connected. This "identify" message is sent every second and
// contains the crawler's stats and details.
func (c *Crawl) HQWebsocket() {
	var (
		// the "identify" message will be sent every second
		// to the crawl HQ
		identifyTicker = time.NewTicker(time.Second)
	)

	defer func() {
		identifyTicker.Stop()
	}()

	// send an "identify" message to the crawl HQ every second
	for {
		err := c.HQClient.Identify(&gocrawlhq.IdentifyMessage{
			Project:   c.HQProject,
			Job:       c.Job,
			IP:        utils.GetOutboundIP().String(),
			Hostname:  utils.GetHostname(),
			GoVersion: utils.GetVersion().GoVersion,
		})
		if err != nil {
			logrus.WithFields(c.genLogFields(err, nil, nil)).Errorln("error sending identify payload to crawl HQ, trying to reconnect..")

			err = c.HQClient.InitWebsocketConn()
			if err != nil {
				logrus.WithFields(c.genLogFields(err, nil, nil)).Errorln("error initializing websocket connection to crawl HQ")
			}
		}

		<-identifyTicker.C
	}
}
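
// HQProducer batches discovered URLs and sends them to the crawl HQ. It
// drains c.HQProducerChannel into an in-memory slice that a background
// goroutine flushes to HQ, and pushes the final batch when the channel closes.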
func (c *Crawl) HQProducer() {
	defer c.HQChannelsWg.Done()

	var (
		discoveredArray   = []gocrawlhq.URL{}
		mutex             = sync.Mutex{}
		terminateProducer = make(chan bool)
	)

	// the discoveredArray is sent to the crawl HQ every 10 seconds, or as
	// soon as it holds at least half as many URLs as there are workers
	go func() {
		HQLastSent := time.Now()

		for {
			select {
			case <-terminateProducer:
				// no need to lock the mutex here, because the producer channel
				// is already closed, so no other goroutine can write to the slice
				if len(discoveredArray) > 0 {
					for {
						_, err := c.HQClient.Discovered(discoveredArray, "seed", false, false)
						if err != nil {
							logrus.WithFields(c.genLogFields(err, nil, nil)).Errorln("error sending payload to crawl HQ, waiting 1s then retrying..")
							time.Sleep(time.Second)
							continue
						}
						break
					}
				}

				return
			default:
				mutex.Lock()
				if (len(discoveredArray) >= int(math.Ceil(float64(c.Workers)/2)) || time.Since(HQLastSent) >= time.Second*10) && len(discoveredArray) > 0 {
					for {
						_, err := c.HQClient.Discovered(discoveredArray, "seed", false, false)
						if err != nil {
							logrus.WithFields(c.genLogFields(err, nil, nil)).Errorln("error sending payload to crawl HQ, waiting 1s then retrying..")
							time.Sleep(time.Second)
							continue
						}
						break
					}

					discoveredArray = []gocrawlhq.URL{}
					HQLastSent = time.Now()
				}
				mutex.Unlock()
			}
		}
	}()

	// listen to the discovered channel and add the URLs to the discoveredArray
	for discoveredItem := range c.HQProducerChannel {
		discoveredURL := gocrawlhq.URL{
			Value: utils.URLToString(discoveredItem.URL),
			Via:   utils.URLToString(discoveredItem.ParentItem.URL),
		}
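		// encode the hop count as a string of "L" characters in the Path
		// field (e.g. a hop count of 3 becomes "LLL"); HQConsumer decodes
		// it back with strings.Count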
		for i := 0; uint8(i) < discoveredItem.Hop; i++ {
			discoveredURL.Path += "L"
		}

		mutex.Lock()
		discoveredArray = append(discoveredArray, discoveredURL)
		mutex.Unlock()
	}

	// if we are here, it means that the HQProducerChannel has been closed
	// so we need to send the last payload to the crawl HQ
	terminateProducer <- true
}
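
// HQConsumer pulls batches of URLs to crawl from the crawl HQ and pushes
// them to the frontier until the crawl is finished.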
func (c *Crawl) HQConsumer() {
	for {
		// This is on purpose evaluated every time, because the number of
		// workers might change during the crawl in the future (to be implemented)
		var HQBatchSize = int(math.Ceil(float64(c.Workers) / 2))

		if c.Finished.Get() {
			break
		}

		if c.Paused.Get() {
			time.Sleep(time.Second)
			continue
		}

		// If HQContinuousPull is set to true, we will pull URLs from HQ
		// continuously, otherwise we will only pull URLs when needed
		if !c.HQContinuousPull {
			if c.ActiveWorkers.Value() >= int64(c.Workers-(c.Workers/10)) {
				time.Sleep(time.Millisecond * 100)
				continue
			}
		}

		// If a specific HQ batch size is set, use it
		if c.HQBatchSize != 0 {
			HQBatchSize = c.HQBatchSize
		}

		// get batch from crawl HQ
		batch, err := c.HQClient.Feed(HQBatchSize, c.HQStrategy)
		if err != nil {
			logrus.WithFields(c.genLogFields(err, nil, map[string]interface{}{
				"batchSize": HQBatchSize,
			})).Debugln("error getting new URLs from crawl HQ")
			continue
		}

		// send all URLs received in the batch to the frontier, decoding the
		// hop count from the number of "L" characters in the URL's Path
		for _, URL := range batch.URLs {
			newURL, err := url.Parse(URL.Value)
			if err != nil {
				logrus.WithFields(c.genLogFields(err, nil, map[string]interface{}{
					"batchSize": HQBatchSize,
				})).Errorln("unable to parse URL received from crawl HQ, discarding")
				continue
			}

			c.Frontier.PushChan <- frontier.NewItem(newURL, nil, "seed", uint8(strings.Count(URL.Path, "L")), URL.ID)
		}
	}
}
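
// HQFinisher drains c.HQFinishedChannel and reports finished URLs back to
// the crawl HQ in batches of half the number of workers, flushing whatever
// remains once the channel is closed.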
func (c *Crawl) HQFinisher() {
	defer c.HQChannelsWg.Done()

	var (
		finishedArray       = []gocrawlhq.URL{}
		locallyCrawledTotal int
	)

	for finishedItem := range c.HQFinishedChannel {
		if finishedItem.ID == "" {
			logWarning.WithFields(c.genLogFields(nil, finishedItem.URL, nil)).Warnln("URL has no ID, discarding")
			continue
		}

		locallyCrawledTotal += int(finishedItem.LocallyCrawled)
		finishedArray = append(finishedArray, gocrawlhq.URL{ID: finishedItem.ID, Value: utils.URLToString(finishedItem.URL)})

		if len(finishedArray) == int(math.Ceil(float64(c.Workers)/2)) {
			for {
				_, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal)
				if err != nil {
					logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{
						"finishedArray": finishedArray,
					})).Errorln("error submitting finished urls to crawl HQ. retrying in one second...")
					time.Sleep(time.Second)
					continue
				}
				break
			}

			finishedArray = []gocrawlhq.URL{}
			locallyCrawledTotal = 0
		}
	}

	// send remaining finished URLs
	if len(finishedArray) > 0 {
		for {
			_, err := c.HQClient.Finished(finishedArray, locallyCrawledTotal)
			if err != nil {
				logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{
					"finishedArray": finishedArray,
				})).Errorln("error submitting finished urls to crawl HQ. retrying in one second...")
				time.Sleep(time.Second)
				continue
			}
			break
		}
	}
}
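
// HQSeencheckURLs sends a batch of asset URLs to the crawl HQ for
// seenchecking and returns the URLs that HQ reports as new, i.e. the
// ones that should be crawled.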
func (c *Crawl) HQSeencheckURLs(URLs []*url.URL) (seencheckedBatch []*url.URL, err error) {
	var discoveredURLs []gocrawlhq.URL

	for _, URL := range URLs {
		discoveredURLs = append(discoveredURLs, gocrawlhq.URL{
			Value: utils.URLToString(URL),
		})
	}

	discoveredResponse, err := c.HQClient.Discovered(discoveredURLs, "asset", false, true)
	if err != nil {
		logError.WithFields(c.genLogFields(err, nil, map[string]interface{}{
			"batchLen": len(URLs),
			"urls":     discoveredURLs,
		})).Errorln("error sending seencheck payload to crawl HQ")
		return seencheckedBatch, err
	}

	if discoveredResponse.URLs != nil {
		for _, URL := range discoveredResponse.URLs {
			// the returned payload only contains new URLs to be crawled by Zeno
			newURL, err := url.Parse(URL.Value)
			if err != nil {
				logError.WithFields(c.genLogFields(err, URL, map[string]interface{}{
					"batchLen": len(URLs),
				})).Errorln("error parsing URL from HQ seencheck response")
				return seencheckedBatch, err
			}

			seencheckedBatch = append(seencheckedBatch, newURL)
		}
	}

	return seencheckedBatch, nil
}
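
// HQSeencheckURL seenchecks a single asset URL against the crawl HQ by
// sending it as a "discovered" payload with the seencheck flag set.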
func (c *Crawl) HQSeencheckURL(URL *url.URL) (bool, error) {
	discoveredURL := gocrawlhq.URL{
		Value: utils.URLToString(URL),
	}

	discoveredResponse, err := c.HQClient.Discovered([]gocrawlhq.URL{discoveredURL}, "asset", false, true)
	if err != nil {
		logrus.WithFields(c.genLogFields(err, URL, nil)).Errorln("error sending seencheck payload to crawl HQ")
		return false, err
	}

	if discoveredResponse.URLs != nil {
		for _, URL := range discoveredResponse.URLs {
			if URL.Value == discoveredURL.Value {
				return false, nil
			}
		}
	}

	// didn't find the URL in the response, so it's new and has been added
	// to HQ's seencheck database
	return true, nil
}
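
// Illustrative wiring (an assumed sketch, not taken from this file): the
// crawl startup code is expected to launch these loops as goroutines, along
// the lines of:
//
//	go c.HQWebsocket()
//	go c.HQConsumer()
//	c.HQChannelsWg.Add(2)
//	go c.HQProducer()
//	go c.HQFinisher()
//
// The WaitGroup count of 2 matches the two Done() calls in HQProducer and
// HQFinisher; the exact call site lives elsewhere in the Zeno codebase.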