/
worker.go
67 lines (61 loc) · 1.27 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Package workpool provides workers that run queued jobs on their own goroutines.
package workpool
import (
"log"
"runtime/debug"
"time"
)
// worker holds the state of one pool worker: the channels its goroutine
// listens on, its pacing interval, and the wrapper used to run each job.
type worker struct {
	// WorkID identifies this worker in log output.
	WorkID string

	// jobData delivers jobs to the worker goroutine; stopWorker closes it.
	jobData chan *JobBag
	// quit carries the stop signal that makes the worker goroutine exit.
	quit chan struct{}

	// executeIntervalMS is the pause, in milliseconds, taken after each job.
	// Values <= 0 mean no pause.
	executeIntervalMS int64

	// safeFunc runs a single job and recovers from any panic it raises.
	safeFunc func(job *JobBag)
}
// newWork builds a worker identified by workID that pauses for
// executeIntervalMS milliseconds after each job (no pause when <= 0).
// Both of its channels are unbuffered. No goroutine is started here; that is
// done by startWorker.
func newWork(workID string, executeIntervalMS int64) *worker {
	w := &worker{
		WorkID:            workID,
		jobData:           make(chan *JobBag),
		quit:              make(chan struct{}),
		executeIntervalMS: executeIntervalMS,
	}

	// safeFunc invokes the job's function with its parameters. A panic raised
	// while doing so is recovered and logged together with a stack trace, so
	// it never propagates to the caller.
	w.safeFunc = func(job *JobBag) {
		defer func() {
			p := recover()
			if p == nil {
				return
			}
			log.Printf("worker: %s, error is %v.\n%s", w.WorkID, p, string(debug.Stack()))
		}()

		// A job without a function cannot be executed; report it and move on.
		if job.JobFunc == nil {
			log.Printf("worker: %s, execute taskfunc found some error, msg is taskfunc is nil.\n", w.WorkID)
			return
		}
		job.JobFunc(job.Params...)
	}

	return w
}
// startWorker launches the worker's goroutine and returns immediately.
//
// On every iteration the goroutine offers itself on wp.WorkerQueue, calls
// wp.sendFinishNotify, and then blocks until either a job arrives on
// w.jobData or a stop signal arrives on w.quit. A received job is executed
// through w.safeFunc; afterwards the goroutine sleeps for executeIntervalMS
// milliseconds when that value is positive. The goroutine exits when a stop
// signal is received or when w.jobData has been closed.
//
// A panic raised by the loop itself is recovered so it cannot crash the
// process, and it is logged so that a worker that died this way leaves a
// trace. Panics raised by jobs are handled separately inside safeFunc.
func (w *worker) startWorker(wp *data) {
	go func() {
		defer func() {
			// Recover to keep the process alive, but do not swallow the
			// failure silently: after this point the worker no longer serves
			// jobs. NOTE(review): presumably the usual trigger is a send on
			// a closed WorkerQueue during pool shutdown; confirm against the
			// pool code.
			if p := recover(); p != nil {
				log.Printf("worker: %s, worker loop stopped by panic: %v.\n%s", w.WorkID, p, string(debug.Stack()))
			}
		}()
		for {
			// Announce availability first, then wait for work or a stop signal.
			wp.WorkerQueue <- w
			wp.sendFinishNotify()
			select {
			case job, ok := <-w.jobData:
				if !ok {
					// jobData was closed: no more work will ever arrive.
					return
				}
				w.safeFunc(job)
			case <-w.quit:
				return
			}
			// Optional pacing between consecutive jobs.
			if w.executeIntervalMS > 0 {
				time.Sleep(time.Duration(w.executeIntervalMS) * time.Millisecond)
			}
		}
	}()
}
// stopWorker shuts the worker down. It first hands a stop signal to the
// worker goroutine over the unbuffered quit channel, so the call blocks until
// that goroutine receives it, and then closes jobData.
//
// NOTE(review): nothing here guards against the goroutine not being in its
// receive (never started, already exited, or busy elsewhere); in that case
// the send would block. Confirm callers stop each running worker only once.
func (w *worker) stopWorker() {
	var stop struct{}
	w.quit <- stop
	close(w.jobData)
}