forked from andeya/pholcus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
crawler.go
216 lines (186 loc) · 4.77 KB
/
crawler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package crawler
import (
"bytes"
"math/rand"
"runtime"
"time"
"fmt"
"github.com/hegeng1212/pholcus/app/downloader"
"github.com/hegeng1212/pholcus/app/downloader/request"
"github.com/hegeng1212/pholcus/app/pipeline"
"github.com/hegeng1212/pholcus/app/spider"
"github.com/hegeng1212/pholcus/logs"
"github.com/hegeng1212/pholcus/runtime/cache"
)
// 采集引擎
type (
Crawler interface {
Init(*spider.Spider) Crawler //初始化采集引擎
Run() //运行任务
Stop() //主动终止
CanStop() bool //能否终止
GetId() int //获取引擎ID
}
crawler struct {
*spider.Spider //执行的采集规则
downloader.Downloader //全局公用的下载器
pipeline.Pipeline //结果收集与输出管道
id int //引擎ID
pause [2]int64 //[请求间隔的最短时长,请求间隔的增幅时长]
}
)
func New(id int) Crawler {
return &crawler{
id: id,
Downloader: downloader.SurferDownloader,
}
}
func (self *crawler) Init(sp *spider.Spider) Crawler {
self.Spider = sp.ReqmatrixInit()
self.Pipeline = pipeline.New(sp)
self.pause[0] = sp.Pausetime / 2
if self.pause[0] > 0 {
self.pause[1] = self.pause[0] * 3
} else {
self.pause[1] = 1
}
return self
}
// 任务执行入口
func (self *crawler) Run() {
// 预先启动数据收集/输出管道
self.Pipeline.Start()
// 运行处理协程
c := make(chan bool)
go func() {
self.run()
close(c)
}()
// 启动任务
self.Spider.Start()
<-c // 等待处理协程退出
// 停止数据收集/输出管道
self.Pipeline.Stop()
}
// 主动终止
func (self *crawler) Stop() {
// 主动崩溃爬虫运行协程
self.Spider.Stop()
self.Pipeline.Stop()
}
func (self *crawler) run() {
for {
// 队列中取出一条请求并处理
req := self.GetOne()
if req == nil {
// 停止任务
if self.Spider.CanStop() {
break
}
time.Sleep(20 * time.Millisecond)
continue
}
// 执行请求
self.UseOne()
go func() {
defer func() {
self.FreeOne()
}()
logs.Log.Debug(" * Start: %v", req.GetUrl())
self.Process(req)
}()
// 随机等待
self.sleep()
}
// 等待处理中的任务完成
self.Spider.Defer()
}
// core processer
func (self *crawler) Process(req *request.Request) {
var (
downUrl = req.GetUrl()
sp = self.Spider
)
defer func() {
if p := recover(); p != nil {
if sp.IsStopping() {
// println("Process$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
return
}
// 返回是否作为新的失败请求被添加至队列尾部
if sp.DoHistory(req, false) {
// 统计失败数
cache.PageFailCount()
}
// 提示错误
stack := make([]byte, 4<<10) //4KB
length := runtime.Stack(stack, true)
start := bytes.Index(stack, []byte("/src/runtime/panic.go"))
stack = stack[start:length]
start = bytes.Index(stack, []byte("\n")) + 1
stack = stack[start:]
if end := bytes.Index(stack, []byte("\ngoroutine ")); end != -1 {
stack = stack[:end]
}
stack = bytes.Replace(stack, []byte("\n"), []byte("\r\n"), -1)
logs.Log.Error(" * Panic [process][%s]: %s\r\n[TRACE]\r\n%s", downUrl, p, stack)
}
}()
fmt.Println(fmt.Printf("Process req %#v", req))
var ctx = self.Downloader.Download(sp, req) // download page
if err := ctx.GetError(); err != nil {
// 返回是否作为新的失败请求被添加至队列尾部
if sp.DoHistory(req, false) {
// 统计失败数
cache.PageFailCount()
}
// 提示错误
logs.Log.Error(" * Fail [download][%v]: %v\n", downUrl, err)
return
}
// 过程处理,提炼数据
ctx.Parse(req.GetRuleName())
// 该条请求文件结果存入pipeline
for _, f := range ctx.PullFiles() {
if self.Pipeline.CollectFile(f) != nil {
break
}
}
// 该条请求文本结果存入pipeline
for _, item := range ctx.PullItems() {
if self.Pipeline.CollectData(item) != nil {
break
}
}
// 处理成功请求记录
sp.DoHistory(req, true)
// 统计成功页数
cache.PageSuccCount()
// 提示抓取成功
logs.Log.Informational(" * Success: %v\n", downUrl)
// 释放ctx准备复用
spider.PutContext(ctx)
}
// 常用基础方法
func (self *crawler) sleep() {
sleeptime := self.pause[0] + rand.Int63n(self.pause[1])
time.Sleep(time.Duration(sleeptime) * time.Millisecond)
}
// 从调度读取一个请求
func (self *crawler) GetOne() *request.Request {
return self.Spider.RequestPull()
}
//从调度使用一个资源空位
func (self *crawler) UseOne() {
self.Spider.RequestUse()
}
//从调度释放一个资源空位
func (self *crawler) FreeOne() {
self.Spider.RequestFree()
}
func (self *crawler) SetId(id int) {
self.id = id
}
func (self *crawler) GetId() int {
return self.id
}