forked from TarsCloud/TarsGo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gpool.go
93 lines (80 loc) · 1.57 KB
/
gpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package gpool
// Worker bundles the channels that one pool goroutine uses to announce
// itself, receive work, and be shut down.
type Worker struct {
	// WorkerQueue is the shared queue the worker sends itself on (see Start)
	// before each wait for a job or a stop request.
	WorkerQueue chan *Worker

	// JobChannel delivers the next job to this worker; Start runs each
	// job it receives here.
	JobChannel chan Job

	// Stop carries the stop request to the worker. The worker replies with
	// a value on this same channel just before its goroutine returns.
	Stop chan struct{}
}
// Start launches the worker's goroutine and returns immediately.
//
// The goroutine loops forever over two steps: it first sends the worker on
// WorkerQueue, then waits for whichever comes first:
//   - a job on JobChannel, which it runs before looping again, or
//   - a value on Stop, which it answers by sending a value back on Stop and
//     then exiting.
func (w *Worker) Start() {
	loop := func() {
		for {
			// Announce this worker before waiting for the next instruction.
			w.WorkerQueue <- w

			select {
			case run := <-w.JobChannel:
				run()
			case <-w.Stop:
				// Acknowledge the stop request so the sender can proceed.
				w.Stop <- struct{}{}
				return
			}
		}
	}
	go loop()
}
// newWorker returns a Worker that shares the given worker queue and owns
// freshly created, unbuffered JobChannel and Stop channels. The worker's
// goroutine is not started here; call Start for that.
func newWorker(pool chan *Worker) *Worker {
	w := new(Worker)
	w.WorkerQueue = pool
	w.JobChannel = make(chan Job)
	w.Stop = make(chan struct{})
	return w
}
// Job is a unit of work executed by a worker: a function that takes no
// arguments and returns nothing.
type Job func()
// Pool is a goroutine pool: a queue of pending jobs, a queue of workers, and
// a channel used to shut the pool down.
type Pool struct {
	// JobQueue holds pending jobs; the dispatch goroutine receives from it.
	// NewPool creates it with a buffer of jobQueueLen.
	// NOTE(review): callers presumably submit work by sending on this
	// channel; confirm against call sites.
	JobQueue chan Job

	// WorkerQueue holds the workers that have announced themselves.
	// Its capacity also determines how many workers Start creates.
	WorkerQueue chan *Worker

	// stop carries the shutdown request from Release to dispatch, and
	// dispatch's acknowledgement back to Release.
	stop chan struct{}
}
// NewPool creates a goroutine pool and starts it before returning.
//
// numWorkers is the capacity of the worker queue and therefore the number of
// workers Start creates; jobQueueLen is the buffer size of JobQueue.
func NewPool(numWorkers int, jobQueueLen int) *Pool {
	p := new(Pool)
	p.JobQueue = make(chan Job, jobQueueLen)
	p.WorkerQueue = make(chan *Worker, numWorkers)
	p.stop = make(chan struct{})

	// Spin up the workers and the dispatcher right away.
	p.Start()
	return p
}
// Start creates and starts one worker per slot of WorkerQueue's capacity,
// then launches the dispatch goroutine. NewPool calls Start itself.
func (p *Pool) Start() {
	for remaining := cap(p.WorkerQueue); remaining > 0; remaining-- {
		newWorker(p.WorkerQueue).Start()
	}
	go p.dispatch()
}
// dispatch is the pool's scheduling loop. It runs until a stop request
// arrives on p.stop.
//
// For each job received from JobQueue it takes the next worker from
// WorkerQueue and hands the job over on that worker's JobChannel.
//
// On a stop request it takes cap(WorkerQueue) workers from WorkerQueue one at
// a time, sends each a stop signal and waits for its reply, then sends the
// acknowledgement on p.stop and returns.
func (p *Pool) dispatch() {
	for {
		select {
		case <-p.stop:
			for remaining := cap(p.WorkerQueue); remaining > 0; remaining-- {
				idle := <-p.WorkerQueue
				idle.Stop <- struct{}{}
				// Wait for the worker's reply before moving on to the next one.
				<-idle.Stop
			}
			// Tell Release that every worker has been stopped.
			p.stop <- struct{}{}
			return

		case next := <-p.JobQueue:
			idle := <-p.WorkerQueue
			idle.JobChannel <- next
		}
	}
}
// Release shuts the pool down: it sends a stop request to the dispatch
// goroutine and blocks until dispatch acknowledges it, which dispatch does
// only after stopping every worker.
//
// dispatch returns after acknowledging, so nothing is left to receive a
// second stop request; Release is effectively single-use. Jobs still sitting
// in JobQueue when the stop request is handled are not dispatched.
func (p *Pool) Release() {
	done := p.stop
	done <- struct{}{} // request shutdown
	<-done             // wait for dispatch's acknowledgement
}