/
runner.go
114 lines (100 loc) · 2.61 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// Copyright 2014, Hǎiliàng Wáng. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package getgo
import (
"net/http"
"sync"
)
// RetryNum is the attempt count that NewConcurrentRunner hands to the
// RetryDoer it builds. RetryDoer.Do uses the value as its loop bound, so it
// is the total number of attempts made per request rather than the number of
// additional retries. It is read when a runner is constructed.
var RetryNum = 3
// SequentialRunner is a simple single threaded task runner: Run fetches and
// handles one task in the calling goroutine before it returns.
type SequentialRunner struct {
// Client performs the HTTP request produced by each task.
Client Doer
// ErrorHandler is embedded; Run calls its HandleError method with any
// error from the fetch or from the task's handler.
ErrorHandler
}
// Run implements the Run method of the Runner interface.
//
// It sends the task's request through r.Client and passes the response to
// task.Handle. Any error, from the fetch or from the handler, is routed
// through HandleError, and whatever HandleError returns becomes the result
// of Run. When the fetch fails, the task is still notified with a nil
// response before the error is reported.
func (r SequentialRunner) Run(task Task) error {
	req := task.Request()

	resp, fetchErr := r.Client.Do(req)
	if fetchErr != nil {
		// Tell the task that the fetch failed; its own error is deliberately
		// dropped because the fetch error is the one being reported.
		_ = task.Handle(nil)
		return r.HandleError(req, fetchErr)
	}
	defer resp.Body.Close()

	if handleErr := task.Handle(resp); handleErr != nil {
		return r.HandleError(req, handleErr)
	}
	return nil
}
// Close implements the Close method of the Runner interface. A
// SequentialRunner has nothing to release, so Close does nothing.
func (SequentialRunner) Close() {}
// ConcurrentRunner runs tasks concurrently on worker goroutines started by
// NewConcurrentRunner.
type ConcurrentRunner struct {
// seq executes each task that a worker receives from ch.
seq SequentialRunner
// ch carries tasks from Run to the workers; Close closes it.
ch chan Task
// wg lets Close wait for the workers to exit. It is a pointer so that
// copies of the runner (every method has a value receiver) share one
// counter.
wg *sync.WaitGroup
}
// NewConcurrentRunner creates a concurrent runner and starts its worker
// goroutines.
//
// workerNum is the number of workers; values below 1 are treated as 1,
// because a runner without workers would block forever in Run and a negative
// count would panic in sync.WaitGroup.Add. client is wrapped in a RetryDoer
// configured with the current value of RetryNum, and errHandler receives the
// errors produced while running tasks.
//
// The caller must call Close on the returned runner to stop the workers.
func NewConcurrentRunner(workerNum int, client Doer, errHandler ErrorHandler) ConcurrentRunner {
	if workerNum < 1 {
		workerNum = 1
	}
	r := ConcurrentRunner{
		seq: SequentialRunner{
			Client:       RetryDoer{Doer: client, RetryTime: RetryNum},
			ErrorHandler: errHandler,
		},
		ch: make(chan Task),
		wg: new(sync.WaitGroup),
	}
	// Register every worker before any of them starts so that Close cannot
	// observe a partially populated WaitGroup.
	r.wg.Add(workerNum)
	for i := 0; i < workerNum; i++ {
		go r.work()
	}
	return r
}
// Run implements the Run method of the Runner interface. It hands the task
// to the workers and always returns nil; errors raised while the task runs
// are not reported here.
func (r ConcurrentRunner) Run(task Task) error {
	// The send completes once a worker receives the task.
	r.ch <- task
	return nil
}
// Close implements the Close method of the Runner interface. It closes the
// task channel and then blocks until every worker has returned.
func (r ConcurrentRunner) Close() {
	// Closing the channel ends each worker's receive loop.
	close(r.ch)
	r.wg.Wait()
}
// work is the body of a single worker goroutine. It runs tasks received from
// the task channel until that channel is closed, then marks itself done on
// the wait group.
func (r ConcurrentRunner) work() {
	defer r.wg.Done()
	for {
		task, open := <-r.ch
		if !open {
			return
		}
		// The result is dropped on purpose: SequentialRunner.Run has already
		// passed any error to the error handler.
		_ = r.seq.Run(task)
	}
}
// RetryDoer wraps a Doer and repeats a failed Do call.
type RetryDoer struct {
// Doer is the wrapped client that actually sends the request.
Doer
// RetryTime bounds how many times Do calls the wrapped Doer for one
// request.
RetryTime int
}
// Do implements the Doer interface. It calls the wrapped Doer until a call
// succeeds or RetryTime attempts have been made, and returns the first
// successful response or, failing that, the error from the last attempt.
//
// At least one attempt is always made. A RetryTime below 1 would otherwise
// skip the loop and yield a nil response together with a nil error, which
// callers would dereference.
//
// The same *http.Request is passed to every attempt.
// NOTE(review): a request body consumed by a failed attempt is not rewound
// here. Confirm that callers only retry requests that are safe to resend.
func (d RetryDoer) Do(req *http.Request) (resp *http.Response, err error) {
	attempts := d.RetryTime
	if attempts < 1 {
		attempts = 1
	}
	for i := 0; i < attempts; i++ {
		resp, err = d.Doer.Do(req)
		if err == nil {
			return resp, nil
		}
	}
	return nil, err
}
// Run executes the given tasks on runner. Each element of tasks is expected
// to be an HtmlTask, a TextTask or a Task. tx is committed if the tasks
// succeed and rolled back if they fail.
//
// With no tasks Run returns nil. A single task is converted with ToTask and
// run directly. Two or more tasks are collected into a task group created
// for tx, and the group is run as a whole.
func Run(runner Runner, tx Tx, tasks ...interface{}) error {
	if len(tasks) == 0 {
		return nil
	}
	if len(tasks) == 1 {
		return runner.Run(ToTask(tasks[0], tx))
	}

	group := NewTaskGroup(tx)
	for i := range tasks {
		addTask(tasks[i], group)
	}
	return group.Run(runner)
}