/
pool.go
106 lines (92 loc) · 2.08 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package types
import (
"fmt"
"time"
)
// Sentinel errors reported by Pool implementations.
var (
	// ErrScheduleTimeout is returned by ScheduleTimeout when no worker
	// became available to accept the task before the timeout elapsed.
	ErrScheduleTimeout = fmt.Errorf("pool: schedule error: timed out")

	// ErrClosed is returned when a task is submitted to a pool that has
	// already been closed.
	ErrClosed = fmt.Errorf("pool: closed")
)
// Pool is a reusable set of goroutines ("workers") that execute submitted
// tasks, so callers do not have to start a fresh goroutine per task.
type Pool interface {
	// Schedule submits task for execution by one of the pool's workers.
	// It returns ErrClosed if the pool has already been closed.
	Schedule(task func()) error

	// ScheduleTimeout is like Schedule but gives up after timeout,
	// returning ErrScheduleTimeout if the task could not be handed to a
	// worker or queued in that time.
	ScheduleTimeout(timeout time.Duration, task func()) error

	// Close shuts the pool down. Tasks submitted after Close has returned
	// are rejected with ErrClosed.
	Close() error
}
// pool is the channel-based Pool implementation returned by NewPool.
//
// Shutdown is signalled exclusively by closing quit. The sem and work
// channels are never closed, because Schedule callers may be sending on them
// at any time and a send on a closed channel panics.
type pool struct {
	// quit is closed by Close to tell workers and schedulers to stop.
	quit chan struct{}
	// sem holds one token per live worker; its capacity is the maximum
	// number of workers.
	sem chan struct{}
	// work is the task queue that workers receive from.
	work chan func()
}

// NewPool creates a goroutine pool.
//
// size is the maximum number of workers, queue is the capacity of the task
// queue, and spawn is the number of workers started immediately; further
// workers are started on demand by Schedule until size is reached.
//
// NewPool panics if spawn <= 0 while queue > 0 (a task could be queued with
// no worker ever started to pick it up) or if spawn > size.
func NewPool(size, queue, spawn int) Pool {
	if spawn <= 0 && queue > 0 {
		panic("dead queue configuration detected")
	}
	if spawn > size {
		panic("spawn > workers")
	}
	p := &pool{
		quit: make(chan struct{}),
		sem:  make(chan struct{}, size),
		work: make(chan func(), queue),
	}
	for i := 0; i < spawn; i++ {
		// Reserve the worker's slot before starting it; the no-op task
		// sends the worker straight into its receive loop.
		p.sem <- struct{}{}
		go p.worker(func() {})
	}
	return p
}

// Schedule schedules task to be executed by one of the pool's workers. It
// blocks until the task has been queued or handed to a worker, and returns
// ErrClosed if the pool is, or becomes, closed before that happens.
func (p *pool) Schedule(task func()) error {
	return p.schedule(task, nil)
}

// ScheduleTimeout schedules task to be executed by one of the pool's workers.
// It returns ErrScheduleTimeout if no worker or queue slot became available
// within timeout, and ErrClosed if the pool is, or becomes, closed first.
func (p *pool) ScheduleTimeout(timeout time.Duration, task func()) error {
	// An explicit timer (rather than time.After) lets us release it as soon
	// as the task has been accepted instead of leaving it pending.
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	return p.schedule(task, timer.C)
}

// Close shuts the pool down and always returns nil. Schedule calls that are
// blocked are released with ErrClosed, tasks already in the queue are still
// executed, and idle workers exit.
//
// Calling Close again after it has returned is a no-op. The check-then-close
// below is not atomic, so concurrent calls to Close must be serialized by the
// caller.
func (p *pool) Close() error {
	select {
	case <-p.quit:
		// Already closed.
	default:
		close(p.quit)
	}
	return nil
}

// schedule hands task to the pool: it is either pushed onto the work queue
// or, if the worker limit has not been reached, run on a newly started
// worker. A nil timeout channel never fires, i.e. the call waits indefinitely.
func (p *pool) schedule(task func(), timeout <-chan time.Time) error {
	// Reject deterministically once the pool is closed; the select below
	// picks at random when several cases are ready.
	select {
	case <-p.quit:
		return ErrClosed
	default:
	}

	select {
	case <-p.quit:
		return ErrClosed
	case <-timeout:
		return ErrScheduleTimeout
	case p.work <- task:
		// If Close raced with this enqueue, the workers may already have
		// drained the queue and exited. Start a drainer so an accepted
		// task is never stranded. The drainer is not counted against sem.
		select {
		case <-p.quit:
			go p.drain()
		default:
		}
		return nil
	case p.sem <- struct{}{}:
		go p.worker(task)
		return nil
	}
}

// worker runs task and then keeps executing queued tasks until the pool is
// closed. On exit it releases its slot in sem.
func (p *pool) worker(task func()) {
	defer func() {
		<-p.sem
	}()

	task()

	for {
		select {
		case t := <-p.work:
			t()
		case <-p.quit:
			// Finish whatever was queued before shutdown, then exit.
			p.drain()
			return
		}
	}
}

// drain executes tasks from the work queue until it is empty, without
// blocking.
func (p *pool) drain() {
	for {
		select {
		case t := <-p.work:
			t()
		default:
			return
		}
	}
}