-
Notifications
You must be signed in to change notification settings - Fork 11
/
finish.go
105 lines (86 loc) · 3.07 KB
/
finish.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package crawl
import (
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/sirupsen/logrus"
)
// catchFinish runs in the background and detects when the crawl has to be
// terminated because there is nothing left to crawl. It does not apply to
// Kafka-powered crawls.
//
// It first polls once per second until at least one seed or asset has been
// crawled. From then on, every five seconds it checks whether no worker is
// active, the frontier queue is empty and the crawl is not already marked as
// finished; when all of that holds it runs finish and exits the process with
// status 0.
func (crawl *Crawl) catchFinish() {
	// hasCrawledSomething reports whether any seed or asset was crawled yet.
	hasCrawledSomething := func() bool {
		return crawl.CrawledSeeds.Value()+crawl.CrawledAssets.Value() > 0
	}

	// Do not consider finishing before the crawl has produced anything.
	for !hasCrawledSomething() {
		time.Sleep(1 * time.Second)
	}

	for {
		time.Sleep(5 * time.Second)

		idle := crawl.ActiveWorkers.Value() == 0 && crawl.Frontier.QueueCount.Value() == 0
		if !idle || crawl.Finished.Get() || !hasCrawledSomething() {
			continue
		}

		logrus.Warning("No additional URL to archive, finishing")
		crawl.finish()
		os.Exit(0)
	}
}
// finish performs an ordered shutdown of the crawl:
//  1. marks the crawl as finished;
//  2. stops the frontier queue reader and closes PullChan;
//  3. waits for the worker pool;
//  4. when HQ is used, closes the HQ finished/producer channels and waits on HQChannelsWg;
//  5. closes PushChan and waits for the queue writer to become inactive;
//  6. closes crawl.Client (and crawl.ClientProxied when a proxy is configured),
//     which the logs label as the WARC writer(s);
//  7. closes the frontier queue and, if enabled, the seencheck database;
//  8. saves the frontier state (hosts pool and stats) to JobPath/frontier.gob.
//
// The order matters: each channel is closed only after the code that sends
// on it has been stopped or waited for.
//
// NOTE(review): finish closes channels unconditionally, so calling it twice
// (e.g. from both catchFinish and the signal handler) would panic with
// "close of closed channel"; callers must make sure it runs at most once.
func (crawl *Crawl) finish() {
crawl.Finished.Set(true)
// First we wait for the queue reader to finish its current work,
// and stop it, when it's stopped it won't dispatch any additional work
// so we can safely close the channel it is using, and wait for all the
// workers to notice the channel is closed, and terminate.
crawl.Frontier.FinishingQueueReader.Set(true)
// Poll every 500ms until the reader reports it is no longer active.
for crawl.Frontier.IsQueueReaderActive.Get() {
time.Sleep(time.Second / 2)
}
close(crawl.Frontier.PullChan)
logrus.Warning("[WORKERS] Waiting for workers to finish")
crawl.WorkerPool.Wait()
logrus.Warning("[WORKERS] All workers finished")
// When all workers are finished, we can safely close the HQ related channels,
// then wait on HQChannelsWg until the HQ-related functions have returned.
if crawl.UseHQ {
logrus.Warning("[HQ] Waiting for finished channel to be closed")
close(crawl.HQFinishedChannel)
logrus.Warning("[HQ] Finished channel closed")
logrus.Warning("[HQ] Waiting for producer to finish")
close(crawl.HQProducerChannel)
logrus.Warning("[HQ] Producer finished")
logrus.Warning("[HQ] Waiting for all functions to return")
crawl.HQChannelsWg.Wait()
logrus.Warning("[HQ] All functions returned")
}
// Once all workers are done, nothing should be sending to PushChan anymore,
// so it is closed here; then the queue writer is asked to terminate and we
// poll every 500ms until it reports it is no longer active.
// NOTE(review): an earlier version of this comment described the reverse
// order (ask the writer to stop first, then close the channel); confirm the
// queue writer is fine with PushChan being closed before
// FinishingQueueWriter is set.
close(crawl.Frontier.PushChan)
crawl.Frontier.FinishingQueueWriter.Set(true)
for crawl.Frontier.IsQueueWriterActive.Get() {
time.Sleep(time.Second / 2)
}
logrus.Warning("[WARC] Closing writer(s)..")
crawl.Client.Close()
// The proxied client is only used when a proxy is configured.
if crawl.Proxy != "" {
crawl.ClientProxied.Close()
}
logrus.Warning("[WARC] Writer(s) closed")
// Closing the local queue used by the frontier
crawl.Frontier.Queue.Close()
logrus.Warning("[FRONTIER] Queue closed")
// Closing the seencheck database (only opened when seencheck is enabled)
if crawl.Seencheck {
crawl.Frontier.Seencheck.SeenDB.Close()
logrus.Warning("[SEENCHECK] Database closed")
}
// Dumping hosts pool and frontier stats to disk
logrus.Warning("[FRONTIER] Dumping hosts pool and frontier stats to " + path.Join(crawl.Frontier.JobPath, "frontier.gob"))
crawl.Frontier.Save()
logrus.Warning("Finished!")
}
// setupCloseHandler blocks until SIGINT (os.Interrupt) or SIGTERM is
// received, then restores default signal handling, finishes the crawl and
// exits the process with status 0.
//
// finish closes channels unconditionally and must therefore run only once.
// If the crawl is already finishing when the signal arrives (for example
// because catchFinish detected that nothing is left to crawl), this handler
// does not start a second shutdown, which would panic on an already-closed
// channel. Instead it blocks and lets the in-progress shutdown complete and
// exit the process. Because signal.Stop has already been called, a second
// CTRL+C/SIGTERM falls back to Go's default behavior and terminates the
// process immediately, so the user can still force an exit.
func (crawl *Crawl) setupCloseHandler() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c
	logrus.Warning("CTRL+C catched.. cleaning up and exiting.")
	signal.Stop(c)

	if crawl.Finished.Get() {
		// Another caller is already running finish; calling it again would
		// close PullChan/PushChan a second time and panic mid-shutdown.
		// Block forever and let that caller exit the process when done.
		logrus.Warning("Crawl is already finishing, waiting for it to complete (press CTRL+C again to force exit)")
		select {}
	}

	crawl.finish()
	os.Exit(0)
}