-
Notifications
You must be signed in to change notification settings - Fork 16
/
goroutine.go
187 lines (157 loc) · 3.76 KB
/
goroutine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package pool
import (
"log"
"sync"
"sync/atomic"
"time"
)
type WorkerPool interface {
Schedule(task func())
Stop()
}
type GoroutinePool struct {
options Options
// capacity of the pool.
capacity int32
// running is the number of the currently running goroutines.
running int32
// freeSignal is used to notice pool there are available
// workers which can be sent to work.
freeSignal chan struct{}
// workers is a slice that store the available workers.
workers []*Worker
// stopped is used to check pool running status
stopped int32
// lock for synchronous operation
lock sync.Mutex
once sync.Once
done chan struct{}
}
type Options struct {
Max int32 // maximum number of workers
Idle int // idle number of workers
Timeout time.Duration // quit time for worker
}
type Option func(options *Options)
func NewGoroutinePool(opts ...Option) *GoroutinePool {
defaultOptions := Options{Max: 100, Idle: 10, Timeout: time.Second * 10}
for _, setter := range opts {
setter(&defaultOptions)
}
pool := &GoroutinePool{
options: defaultOptions,
capacity: defaultOptions.Max,
running: 0,
freeSignal: make(chan struct{}, defaultOptions.Max),
workers: make([]*Worker, 0, 20),
done: make(chan struct{}),
}
// 提前准备空闲协程池
pool.grow(defaultOptions.Idle)
pool.monitor()
return pool
}
func (pool *GoroutinePool) monitor() {
go func() {
for {
select {
case <-pool.done:
return
default:
// 定时缩容
pool.scaleDown()
}
}
}()
}
// grow 自动扩容worker数量
func (p *GoroutinePool) grow(n int) {
for i := 0; i < n; i++ {
// create instance of Worker
w := NewWorker(p, 10)
w.run()
// push into the slice
p.workers = append(p.workers, w)
p.running++
p.freeSignal <- struct{}{}
}
}
type f func()
// Stop 停止协程池
func (p *GoroutinePool) Stop() {
p.once.Do(func() {
atomic.StoreInt32(&p.stopped, 1)
close(p.done)
p.lock.Lock()
for _, worker := range p.workers {
worker.stop()
}
p.lock.Unlock()
})
}
// Schedule 执行任务
func (p *GoroutinePool) Schedule(task func()) {
// 判断pool是否已关闭
if atomic.LoadInt32(&p.stopped) == 1 {
return
}
// 调度一个worker来执行任务
p.acquireWorker().submit(task)
}
// acquireWorker 获取一个worker实例
func (p *GoroutinePool) acquireWorker() (w *Worker) {
p.lock.Lock()
defer p.lock.Unlock()
// 查看当前可用worker
available := len(p.workers)
if p.running < p.capacity {
if available == 0 {
p.grow(p.options.Idle)
}
<-p.freeSignal
w = p.popLastWorker()
return
}
// 当可用worker数为0且协程数达到上限时,
// 因为此时已被lock住,且无法通过releaseWorker释放,所以会导致死锁
// 所以这种情况下必须先释放锁
p.lock.Unlock()
<-p.freeSignal
p.lock.Lock()
w = p.popLastWorker()
return w
}
// popLastWorker 获取空闲队列里最尾部的一个worker
func (p *GoroutinePool) popLastWorker() (w *Worker) {
// 取数组最后一个worker
n := len(p.workers) - 1
w = p.workers[n]
p.workers[n] = nil
p.workers = p.workers[:n]
return
}
// releaseWorker puts a worker back into free pool, recycling the goroutines.
func (p *GoroutinePool) releaseWorker(worker *Worker) {
p.lock.Lock()
p.workers = append(p.workers, worker)
p.lock.Unlock()
p.freeSignal <- struct{}{}
}
// scaleDown 缩容
func (p *GoroutinePool) scaleDown() {
// 根据时间控制每隔一段时间按策略缩容
time.Sleep(p.options.Timeout)
p.lock.Lock()
defer p.lock.Unlock()
// 低于空闲数量则不缩容
available := len(p.workers)
if available <= p.options.Idle {
return
}
num := (available - p.options.Idle) / 4
log.Println("scale down: ", num)
for i := 0; i < num; i++ {
p.workers[i].stop()
}
p.workers = p.workers[num:]
}