forked from andeya/pholcus
/
matrix.go
284 lines (250 loc) · 6.97 KB
/
matrix.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package scheduler
import (
"sort"
"sync"
"sync/atomic"
"time"
"github.com/henrylee2cn/pholcus/app/aid/history"
"github.com/henrylee2cn/pholcus/app/downloader/request"
"github.com/henrylee2cn/pholcus/logs"
"github.com/henrylee2cn/pholcus/runtime/cache"
"github.com/henrylee2cn/pholcus/runtime/status"
)
// Matrix is the request matrix of a single Spider instance: a set of
// per-priority request queues plus the bookkeeping (history, temporary
// de-duplication records, failed requests) needed to schedule them.
type Matrix struct {
// NOTE(review): maxPage is accessed through sync/atomic and is the first
// field; keep it first so it stays 64-bit aligned on 32-bit platforms.
maxPage int64 // maximum number of pages to collect, stored as a negative number; Push adds 1 per queued request and the limit is reached once it is >= 0
resCount int32 // resource usage counter, incremented by Use and decremented by Free
spiderName string // name of the owning Spider
reqs map[int][]*request.Request // request queues keyed by priority; the default priority is 0
priorities []int // known priorities in ascending order (lowest first)
history history.Historier // history records
tempHistory map[string]bool // temporary records: [reqUnique(url+method)]true
failures map[string]*request.Request // failed requests, both inherited from history and from this run
tempHistoryLock sync.RWMutex // guards tempHistory
failureLock sync.Mutex // guards failures
sync.Mutex // held by Push, Pull and Len while they touch reqs and priorities
}
// newMatrix builds the request matrix for the Spider identified by
// spiderName and spiderSubName, with maxPage as its page budget.
//
// Outside of server mode it also reads the success and failure history
// (using the task's output type and inherit flags) and seeds the failure
// set with the failures pulled from that history.
func newMatrix(spiderName, spiderSubName string, maxPage int64) *Matrix {
	m := new(Matrix)
	m.spiderName = spiderName
	m.maxPage = maxPage
	m.reqs = map[int][]*request.Request{}
	m.priorities = []int{}
	m.history = history.New(spiderName, spiderSubName)
	m.tempHistory = map[string]bool{}
	m.failures = map[string]*request.Request{}

	// Server mode does not load any history.
	if cache.Task.Mode == status.SERVER {
		return m
	}

	m.history.ReadSuccess(cache.Task.OutType, cache.Task.SuccessInherit)
	m.history.ReadFailure(cache.Task.OutType, cache.Task.FailureInherit)
	pending := m.history.PullFailure()
	m.setFailures(pending)
	return m
}
// Push adds req to the queue of its priority. It is safe for concurrent use.
//
// The request is silently dropped when the scheduler is stopped, when the
// page budget is exhausted (maxPage has counted up to zero or above), or
// when req is not reloadable and already has a success or temporary history
// record. While the scheduler is paused, or while this matrix's resource
// count exceeds sdl.avgRes(), Push blocks (holding the matrix lock) so that
// pending requests do not pile up.
func (self *Matrix) Push(req *request.Request) {
	// Serialize producers to keep the backlog of pending requests small.
	self.Lock()
	defer self.Unlock()

	if sdl.checkStatus(status.STOP) {
		return
	}

	// The request limit has been reached: stop feeding this rule.
	// maxPage is updated with atomic.AddInt64 below, so it is read
	// atomically as well.
	if atomic.LoadInt64(&self.maxPage) >= 0 {
		return
	}

	// While paused, wait here to keep the backlog small.
	waited := false
	for sdl.checkStatus(status.PAUSE) {
		waited = true
		time.Sleep(time.Second)
	}
	if waited && sdl.checkStatus(status.STOP) {
		return
	}

	// While too many resources are in use, wait here to keep the backlog small.
	waited = false
	for atomic.LoadInt32(&self.resCount) > sdl.avgRes() {
		// Stop waiting as soon as the scheduler is stopped, so the matrix
		// lock is not held on behalf of a request that will be dropped anyway.
		if sdl.checkStatus(status.STOP) {
			return
		}
		waited = true
		time.Sleep(100 * time.Millisecond)
	}
	if waited && sdl.checkStatus(status.STOP) {
		return
	}

	// Requests that must not be downloaded twice.
	if !req.IsReloadable() {
		unique := req.Unique()
		// Already recorded (successfully done or currently queued): drop it.
		if self.hasHistory(unique) {
			return
		}
		// Remember it in the temporary records.
		self.insertTempHistory(unique)
	}

	priority := req.GetPriority()

	// First request at this priority: register the priority and its queue.
	if _, found := self.reqs[priority]; !found {
		self.priorities = append(self.priorities, priority)
		sort.Ints(self.priorities) // keep ascending order
		self.reqs[priority] = []*request.Request{}
	}

	// Append the request to its queue.
	self.reqs[priority] = append(self.reqs[priority], req)

	// Roughly limits how many requests get queued; under concurrency the
	// actual number may end up higher than maxPage.
	atomic.AddInt64(&self.maxPage, 1)
}
// Pull removes and returns the next request, or nil when there is none or
// when the scheduler is not in the RUN state. It is safe for concurrent use.
//
// Queues are visited from the highest priority to the lowest; within a
// queue the oldest request is returned first. Before returning, the
// request's proxy is set from sdl.proxy when sdl.useProxy is true and
// cleared otherwise.
func (self *Matrix) Pull() (req *request.Request) {
	self.Lock()
	defer self.Unlock()

	if !sdl.checkStatus(status.RUN) {
		return nil
	}

	// priorities is sorted ascending, so walk it backwards. The bound is the
	// length of the slice that is actually indexed.
	for i := len(self.priorities) - 1; i >= 0; i-- {
		priority := self.priorities[i]
		queue := self.reqs[priority]
		if len(queue) == 0 {
			continue
		}

		req = queue[0]
		// Clear the slot before reslicing so the backing array does not keep
		// the dequeued request reachable.
		queue[0] = nil
		self.reqs[priority] = queue[1:]

		if sdl.useProxy {
			req.SetProxy(sdl.proxy.GetOne(req.GetUrl()))
		} else {
			req.SetProxy("")
		}
		return req
	}
	return nil
}
// Use claims one resource slot: it sends a value on sdl.count and then
// increments this matrix's resource counter.
//
// A panic raised by either step is recovered and discarded; if the send
// panics, the counter is left unchanged.
// NOTE(review): presumably this guards against sending on a closed
// sdl.count — confirm against the scheduler's shutdown code.
func (self *Matrix) Use() {
	defer func() {
		_ = recover()
	}()

	sdl.count <- true
	atomic.AddInt32(&self.resCount, 1)
}
// Free releases one resource slot: it receives one value from sdl.count and
// then decrements this matrix's resource counter. It is the counterpart of Use.
func (self *Matrix) Free() {
<-sdl.count
atomic.AddInt32(&self.resCount, -1)
}
// DoHistory records the outcome of req, where ok tells whether it succeeded.
//
// For a non-reloadable request the temporary record is removed and, on
// success, a success record is upserted into the history. A failed request
// seen for the first time is stored in the failure set and true is
// returned, meaning it is kept as a new failed request to be retried. A
// request whose key is already in the failure set is upserted into the
// failure history instead and false is returned. Successful requests always
// yield false.
func (self *Matrix) DoHistory(req *request.Request, ok bool) bool {
	if !req.IsReloadable() {
		self.tempHistoryLock.Lock()
		delete(self.tempHistory, req.Unique())
		self.tempHistoryLock.Unlock()

		if ok {
			self.history.UpsertSuccess(req.Unique())
		}
	}

	if ok {
		return false
	}

	self.failureLock.Lock()
	defer self.failureLock.Unlock()

	key := req.Unique()
	if _, seen := self.failures[key]; seen {
		// Second failure: move it to the failure history.
		self.history.UpsertFailure(req)
		return false
	}

	// First failure: keep it so it is run once more at the end of the task.
	self.failures[key] = req
	logs.Log.Informational(" * + 失败请求: [%v]\n", req.GetUrl())
	return true
}
// CanStop reports whether this matrix has nothing left to do.
//
// It returns true when the scheduler is stopped or the page budget is
// exhausted. It returns false while resources are still in use or requests
// are still queued. Otherwise every failed request that has not been
// retried yet is handed to Push (its entry in the failure set is kept but
// set to nil so it is retried only once); if at least one request was
// handed over, false is returned so the retry can run.
//
// The failure lock is held while Push is called.
func (self *Matrix) CanStop() bool {
	if sdl.checkStatus(status.STOP) {
		return true
	}
	// maxPage is modified with atomic.AddInt64 in Push, and this method does
	// not hold the matrix lock, so the read must be atomic too.
	if atomic.LoadInt64(&self.maxPage) >= 0 {
		return true
	}
	if atomic.LoadInt32(&self.resCount) != 0 {
		return false
	}
	if self.Len() > 0 {
		return false
	}

	self.failureLock.Lock()
	defer self.failureLock.Unlock()

	// Re-queue the failed requests that are still pending a retry.
	requeued := false
	for reqUnique, req := range self.failures {
		if req == nil {
			// Already retried once.
			continue
		}
		self.failures[reqUnique] = nil
		requeued = true
		logs.Log.Informational(" * - 失败请求: [%v]\n", req.GetUrl())
		self.Push(req)
	}
	return !requeued
}
// TryFlushSuccess saves the success history using the task's output type.
// It does nothing in server mode or when success inheritance is disabled.
func (self *Matrix) TryFlushSuccess() {
	if cache.Task.Mode == status.SERVER || !cache.Task.SuccessInherit {
		return
	}
	self.history.FlushSuccess(cache.Task.OutType)
}
// TryFlushFailure saves the failure history using the task's output type.
// It does nothing in server mode or when failure inheritance is disabled.
func (self *Matrix) TryFlushFailure() {
	if cache.Task.Mode == status.SERVER || !cache.Task.FailureInherit {
		return
	}
	self.history.FlushFailure(cache.Task.OutType)
}
// Wait blocks until every request being processed has finished, i.e. until
// the resource counter drops to zero, polling every 500ms.
//
// When the scheduler is already stopped it returns immediately: an actively
// terminated task does not wait for running work to end on its own.
func (self *Matrix) Wait() {
	const pollInterval = 500 * time.Millisecond

	if sdl.checkStatus(status.STOP) {
		return
	}
	for {
		if atomic.LoadInt32(&self.resCount) == 0 {
			return
		}
		time.Sleep(pollInterval)
	}
}
// Len returns the total number of queued requests across all priorities.
// It takes the matrix lock while counting.
func (self *Matrix) Len() int {
	self.Lock()
	defer self.Unlock()

	total := 0
	for priority := range self.reqs {
		total += len(self.reqs[priority])
	}
	return total
}
// hasHistory reports whether reqUnique is already known, either as a
// success record in the history or as an entry in the temporary records.
func (self *Matrix) hasHistory(reqUnique string) bool {
	if !self.history.HasSuccess(reqUnique) {
		// Not in the history: fall back to the temporary records.
		self.tempHistoryLock.RLock()
		defer self.tempHistoryLock.RUnlock()
		return self.tempHistory[reqUnique]
	}
	return true
}
// insertTempHistory marks reqUnique in the temporary records, under the
// temporary-history lock.
func (self *Matrix) insertTempHistory(reqUnique string) {
	self.tempHistoryLock.Lock()
	defer self.tempHistoryLock.Unlock()

	self.tempHistory[reqUnique] = true
}
// setFailures copies every entry of reqs into the failure set, under the
// failure lock, and logs the URL of each added request.
func (self *Matrix) setFailures(reqs map[string]*request.Request) {
	self.failureLock.Lock()
	defer self.failureLock.Unlock()

	for key := range reqs {
		failed := reqs[key]
		self.failures[key] = failed
		logs.Log.Informational(" * + 失败请求: [%v]\n", failed.GetUrl())
	}
}
// // windup cleans up when the task is actively terminated (currently disabled).
// func (self *Matrix) windup() {
// self.Lock()
// self.reqs = make(map[int][]*request.Request)
// self.priorities = []int{}
// self.tempHistory = make(map[string]bool)
// self.failures = make(map[string]*request.Request)
// self.Unlock()
// }