This repository has been archived by the owner on Jan 8, 2019. It is now read-only.
/
pool.go
124 lines (101 loc) · 2.17 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package routinepool
import (
"sync"
"github.com/cosiner/gohper/sync2"
)
type Job interface{}
type Pool struct {
processor func(Job)
maxIdle uint64
maxActive uint64
lock sync.RWMutex
jobs chan Job
numIdle uint64
numActive uint64
closeCond *sync2.LockCond
}
// New create a pool with fix number of goroutine, if maxActive is 0, there is no
// limit of goroutine number
func New(processor func(Job), jobBufsize int, maxIdle, maxActive uint64) *Pool {
p := &Pool{
processor: processor,
jobs: make(chan Job, jobBufsize),
maxIdle: maxIdle,
maxActive: maxActive,
}
return p
}
// Info return current infomation about idle and activing goroutine number of the pool
func (p *Pool) Info() (numIdle, numActive uint64) {
p.lock.RLock()
numIdle = p.numIdle
numActive = p.numActive
p.lock.RUnlock()
return
}
// Do process a job. If there is no goroutine available and goroutine number already
// reach the limitation, it will blocked untile a goroutine is free. Otherwise
// create a new goroutine. Return false only if pool already closed
func (p *Pool) Do(job Job) bool {
p.lock.RLock()
closeCond := p.closeCond
numIdle := p.numIdle
numActive := p.numActive
p.lock.RUnlock()
if closeCond != nil {
return false
}
if numIdle == 0 && (p.maxActive == 0 || numActive < p.maxActive) {
p.lock.Lock()
p.numActive++
p.numIdle++
p.lock.Unlock()
go p.routine()
}
p.jobs <- job
return true
}
func (p *Pool) routine() {
for {
job, ok := <-p.jobs
if !ok {
return
}
p.lock.Lock()
p.numIdle--
p.lock.Unlock()
p.processor(job)
p.lock.Lock()
closeCond := p.closeCond
jobs := len(p.jobs)
if jobs == 0 && closeCond != nil {
closeCond.Signal()
p.numActive--
p.lock.Unlock()
return
}
if p.numIdle+1 > p.maxIdle {
p.numActive--
p.lock.Unlock()
return
}
p.numIdle++
p.lock.Unlock()
}
}
// Close stop receive new job, and waiting for all exists jobs to be processed
func (p *Pool) Close() {
p.lock.Lock()
if p.closeCond != nil {
p.lock.Unlock()
return
}
p.closeCond = sync2.NewLockCond(nil)
if len(p.jobs) != 0 {
p.lock.Unlock()
p.closeCond.Wait()
} else {
p.lock.Unlock()
}
close(p.jobs)
}