-
Notifications
You must be signed in to change notification settings - Fork 3
/
crawl.go
322 lines (273 loc) · 9.16 KB
/
crawl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
package crawl
import (
"net/http"
"sync"
"time"
"git.archive.org/wb/gocrawlhq"
"github.com/CorentinB/Zeno/internal/pkg/frontier"
"github.com/CorentinB/Zeno/internal/pkg/utils"
"github.com/CorentinB/warc"
"github.com/paulbellamy/ratecounter"
"github.com/prometheus/client_golang/prometheus"
"github.com/remeh/sizedwaitgroup"
"github.com/sirupsen/logrus"
"github.com/telanflow/cookiejar"
"mvdan.cc/xurls/v2"
)
// Package-level loggers shared across the crawl package. They are assigned
// by utils.SetupLogging in Crawl.Start and, when an Elasticsearch URL is
// configured, reassigned once per day (shortly after midnight UTC) so the
// Elasticsearch index name can roll over.
var (
logInfo *logrus.Logger
logWarning *logrus.Logger
logError *logrus.Logger
)
// PrometheusMetrics define all the metrics exposed by the Prometheus exporter.
type PrometheusMetrics struct {
// Prefix is the string prepended to metric names when they are registered.
Prefix string
// DownloadedURI counts URIs downloaded over the lifetime of the crawl.
DownloadedURI prometheus.Counter
}
// Crawl define the parameters of a crawl process.
// It holds all runtime state and configuration for one crawl job: the
// frontier, worker pool, HTTP clients, WARC writing settings, optional
// Crawl HQ integration, and live statistics counters.
type Crawl struct {
// Embedded mutex guarding shared crawl state.
// NOTE(review): embedded as a pointer, so it must be initialized before
// Lock/Unlock are called — confirm where it is allocated (not visible here).
*sync.Mutex
StartTime time.Time
SeedList []frontier.Item
// Paused and Finished are atomic booleans used to signal crawl state
// across goroutines (e.g. pause on low disk space, stop on completion).
Paused *utils.TAtomBool
Finished *utils.TAtomBool
LiveStats bool
// ElasticSearchURL, when non-empty, enables logging to Elasticsearch
// with a daily index rollover (see Start).
ElasticSearchURL string
// Frontier
Frontier *frontier.Frontier
// Crawl settings
WorkerPool sizedwaitgroup.SizedWaitGroup
MaxConcurrentAssets int
// Client is the WARC-recording HTTP client; ClientProxied is the same
// but routed through Proxy (only initialized when Proxy is set).
Client *warc.CustomHTTPClient
ClientProxied *warc.CustomHTTPClient
Logger logrus.Logger
DisabledHTMLTags []string
ExcludedHosts []string
ExcludedStrings []string
UserAgent string
Job string
JobPath string
MaxHops uint8
MaxRetry int
MaxRedirect int
// HTTPTimeout is in seconds (converted to time.Duration in Start).
HTTPTimeout int
MaxConcurrentRequestsPerDomain int
RateLimitDelay int
DisableAssetsCapture bool
CaptureAlternatePages bool
DomainsCrawl bool
Headless bool
Seencheck bool
// Workers is the number of worker goroutines fired up by Start.
Workers int
// Cookie-related settings
CookieFile string
KeepCookies bool
CookieJar http.CookieJar
// proxy settings
Proxy string
BypassProxy []string
// API settings
API bool
APIPort string
Prometheus bool
PrometheusMetrics *PrometheusMetrics
// Real time statistics
URIsPerSecond *ratecounter.RateCounter
ActiveWorkers *ratecounter.Counter
CrawledSeeds *ratecounter.Counter
CrawledAssets *ratecounter.Counter
// WARC settings
WARCPrefix string
WARCOperator string
WARCWriter chan *warc.RecordBatch
WARCWriterFinish chan bool
WARCTempDir string
// CDXDedupeServer, when non-empty, enables CDX-based remote deduplication.
CDXDedupeServer string
WARCFullOnDisk bool
WARCPoolSize int
// WARCDedupSize is the minimum record size threshold for deduplication.
WARCDedupSize int
DisableLocalDedupe bool
CertValidation bool
// Crawl HQ settings
UseHQ bool
HQAddress string
HQProject string
HQKey string
HQSecret string
HQStrategy string
HQBatchSize int
HQContinuousPull bool
HQClient *gocrawlhq.Client
// HQFinishedChannel and HQProducerChannel carry items to the HQ finisher
// and producer goroutines; HQChannelsWg tracks those two goroutines.
HQFinishedChannel chan *frontier.Item
HQProducerChannel chan *frontier.Item
HQChannelsWg *sync.WaitGroup
}
// Start fire up the crawling process.
// It initializes logging, the frontier, the WARC-writing HTTP client(s),
// the worker pool, and (optionally) the Crawl HQ integration, then blocks:
// either in catchFinish (local queue mode) or in an infinite sleep loop
// (HQ mode, where HQ drives the crawl's lifetime).
func (c *Crawl) Start() (err error) {
c.StartTime = time.Now()
c.Paused = new(utils.TAtomBool)
c.Finished = new(utils.TAtomBool)
c.HQChannelsWg = new(sync.WaitGroup)
// regexOutlinks is a package-level variable declared elsewhere in the package.
regexOutlinks = xurls.Relaxed()
// Setup logging, every day at midnight UTC a new setup
// is triggered in order to change the ES index's name
if c.ElasticSearchURL != "" {
// Goroutine loop that fetches the machine's outbound IP address.
// NOTE(review): despite the original comment saying "every second",
// the loop sleeps 10 seconds between refreshes. `constants` is
// presumably a package-level sync.Map declared elsewhere — confirm.
go func() {
for {
ip := utils.GetOutboundIP().String()
constants.Store("ip", ip)
time.Sleep(time.Second * 10)
}
}()
logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL)
go func() {
// Get the current time in UTC and figure out when the next midnight will occur
now := time.Now().UTC()
midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
// midnight above is *today's* 00:00 UTC, so now is virtually always
// after it; this pushes the target to the upcoming midnight.
if now.After(midnight) {
midnight = midnight.Add(24 * time.Hour)
}
// Calculate the duration until midnight and add a little extra time to avoid calling your function just before midnight
duration := midnight.Sub(now) + time.Second*10
// Create a timer that will wait until midnight
timer := time.NewTimer(duration)
// Wait for the timer to finish (which will occur at midnight)
<-timer.C
// Re-run logging setup so the ES index name rolls over.
// NOTE(review): this goroutine runs once and exits — the rollover
// happens only for the first midnight after Start, not daily; confirm
// whether a loop was intended.
logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL)
}()
} else {
logInfo, logWarning, logError = utils.SetupLogging(c.JobPath, c.LiveStats, c.ElasticSearchURL)
}
// Start the background process that will handle os signals
// to exit Zeno, like CTRL+C
go c.setupCloseHandler()
// Initialize the frontier. The goroutine below fans frontier log
// messages out to the matching package-level logger by severity.
frontierLoggingChan := make(chan *frontier.FrontierLogMessage, 10)
go func() {
for log := range frontierLoggingChan {
switch log.Level {
case logrus.ErrorLevel:
logError.WithFields(c.genLogFields(nil, nil, log.Fields)).Error(log.Message)
case logrus.WarnLevel:
logWarning.WithFields(c.genLogFields(nil, nil, log.Fields)).Warn(log.Message)
case logrus.InfoLevel:
logInfo.WithFields(c.genLogFields(nil, nil, log.Fields)).Info(log.Message)
}
}
}()
c.Frontier.Init(c.JobPath, frontierLoggingChan, c.Workers, c.Seencheck)
c.Frontier.Load()
c.Frontier.Start()
// Start the background process that will periodically check if the disk
// have enough free space, and potentially pause the crawl if it doesn't
go c.handleCrawlPause()
// Function responsible for writing to disk the frontier's hosts pool
// and other stats needed to resume the crawl. The process happen every minute.
// The actual queue used during the crawl and seencheck aren't included in this,
// because they are written to disk in real-time.
go c.writeFrontierToDisk()
// Initialize WARC writer
logrus.Info("Initializing WARC writer..")
// Init WARC rotator settings
rotatorSettings := c.initWARCRotatorSettings()
// Change WARC pool size
rotatorSettings.WARCWriterPoolSize = c.WARCPoolSize
// Local dedupe is on unless explicitly disabled; CDX (remote) dedupe is
// layered on top only when a dedupe server URL is configured.
dedupeOptions := warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, SizeThreshold: c.WARCDedupSize}
if c.CDXDedupeServer != "" {
dedupeOptions = warc.DedupeOptions{LocalDedupe: !c.DisableLocalDedupe, CDXDedupe: true, CDXURL: c.CDXDedupeServer, SizeThreshold: c.WARCDedupSize}
}
// Init the HTTP client responsible for recording HTTP(s) requests / responses.
// HTTP 429 (Too Many Requests) responses are skipped from WARC capture.
HTTPClientSettings := warc.HTTPClientSettings{
RotatorSettings: rotatorSettings,
DedupeOptions: dedupeOptions,
DecompressBody: true,
SkipHTTPStatusCodes: []int{429},
VerifyCerts: c.CertValidation,
TempDir: c.WARCTempDir,
FullOnDisk: c.WARCFullOnDisk,
}
c.Client, err = warc.NewWARCWritingHTTPClient(HTTPClientSettings)
if err != nil {
logrus.Fatalf("Unable to init WARC writing HTTP client: %s", err)
}
// Drain the client's error channel so WARC errors are logged rather
// than blocking the writer.
go func() {
for err := range c.Client.ErrChan {
logError.WithFields(c.genLogFields(err, nil, nil)).Errorf("WARC HTTP client error")
}
}()
// HTTPTimeout is configured in seconds.
c.Client.Timeout = time.Duration(c.HTTPTimeout) * time.Second
logrus.Infof("HTTP client timeout set to %d seconds", c.HTTPTimeout)
// Optional second client with identical settings except it goes through
// the configured proxy.
if c.Proxy != "" {
proxyHTTPClientSettings := HTTPClientSettings
proxyHTTPClientSettings.Proxy = c.Proxy
c.ClientProxied, err = warc.NewWARCWritingHTTPClient(proxyHTTPClientSettings)
if err != nil {
logError.Fatal("unable to init WARC writing (proxy) HTTP client")
}
go func() {
for err := range c.ClientProxied.ErrChan {
logError.WithFields(c.genLogFields(err, nil, nil)).Error("WARC HTTP client error")
}
}()
}
logrus.Info("WARC writer initialized")
// Process responsible for slowing or pausing the crawl
// when the WARC writing queue gets too big
go c.crawlSpeedLimiter()
if c.API {
go c.startAPI()
}
// Parse input cookie file if specified.
// NOTE(review): the jar is set only on c.Client, not on c.ClientProxied —
// confirm whether proxied requests should also carry these cookies.
if c.CookieFile != "" {
cookieJar, err := cookiejar.NewFileJar(c.CookieFile, nil)
if err != nil {
logError.WithFields(c.genLogFields(err, nil, nil)).Fatal("unable to parse cookie file")
}
c.Client.Jar = cookieJar
}
// Fire up the desired amount of workers
for i := 0; i < c.Workers; i++ {
c.WorkerPool.Add()
go c.Worker()
}
// Start the process responsible for printing live stats on the standard output
if c.LiveStats {
go c.printLiveStats()
}
// If crawl HQ parameters are specified, then we start the background
// processes responsible for pulling and pushing seeds from and to HQ
if c.UseHQ {
c.HQClient, err = gocrawlhq.Init(c.HQKey, c.HQSecret, c.HQProject, c.HQAddress)
if err != nil {
logrus.Panic(err)
}
c.HQProducerChannel = make(chan *frontier.Item, c.Workers)
c.HQFinishedChannel = make(chan *frontier.Item, c.Workers)
// Two goroutines tracked by the wait group: the producer and the
// finisher (the consumer and websocket goroutines are not tracked here).
c.HQChannelsWg.Add(2)
go c.HQConsumer()
go c.HQProducer()
go c.HQFinisher()
go c.HQWebsocket()
} else {
// Push the seed list to the queue
logrus.Info("Pushing seeds in the local queue..")
for _, item := range c.SeedList {
// Shadow the loop variable so each pushed pointer refers to a
// distinct copy (required before Go 1.22 loop-var semantics).
item := item
c.Frontier.PushChan <- &item
}
// Release the seed list so it can be garbage collected.
c.SeedList = nil
logrus.Info("All seeds are now in queue, crawling will start")
}
// Start the background process that will catch when there
// is nothing more to crawl
if !c.UseHQ {
c.catchFinish()
} else {
// In HQ mode there is no local "done" condition: block forever and
// rely on the close handler / HQ to terminate the process.
for {
time.Sleep(time.Second)
}
}
return
}