-
Notifications
You must be signed in to change notification settings - Fork 556
/
pool.go
131 lines (115 loc) · 2.55 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package pool
import (
"errors"
"sync"
)
var (
ErrClosed = errors.New("pool closed")
ErrMax = errors.New("reach max connection limit")
)
type request chan interface{}
type Config struct {
MaxIdle uint
MaxActive uint
}
// Pool stores object for reusing, such as redis connection
type Pool struct {
Config
factory func() (interface{}, error)
finalizer func(x interface{})
idles chan interface{}
waitingReqs []request
activeCount uint // increases during creating connection, decrease during destroying connection
mu sync.Mutex
closed bool
}
func New(factory func() (interface{}, error), finalizer func(x interface{}), cfg Config) *Pool {
return &Pool{
factory: factory,
finalizer: finalizer,
idles: make(chan interface{}, cfg.MaxIdle),
waitingReqs: make([]request, 0),
Config: cfg,
}
}
// getOnNoIdle try to create a new connection or waiting for connection being returned
// invoker should have pool.mu
func (pool *Pool) getOnNoIdle() (interface{}, error) {
if pool.activeCount >= pool.MaxActive {
// waiting for connection being returned
req := make(chan interface{}, 1)
pool.waitingReqs = append(pool.waitingReqs, req)
pool.mu.Unlock()
x, ok := <-req
if !ok {
return nil, ErrMax
}
return x, nil
}
// create a new connection
pool.activeCount++ // hold a place for new connection
pool.mu.Unlock()
x, err := pool.factory()
if err != nil {
// create failed return token
pool.mu.Lock()
pool.activeCount-- // release the holding place
pool.mu.Unlock()
return nil, err
}
return x, nil
}
func (pool *Pool) Get() (interface{}, error) {
pool.mu.Lock()
if pool.closed {
pool.mu.Unlock()
return nil, ErrClosed
}
select {
case item := <-pool.idles:
pool.mu.Unlock()
return item, nil
default:
// no pooled item, create one
return pool.getOnNoIdle()
}
}
func (pool *Pool) Put(x interface{}) {
pool.mu.Lock()
if pool.closed {
pool.mu.Unlock()
pool.finalizer(x)
return
}
if len(pool.waitingReqs) > 0 {
req := pool.waitingReqs[0]
copy(pool.waitingReqs, pool.waitingReqs[1:])
pool.waitingReqs = pool.waitingReqs[:len(pool.waitingReqs)-1]
req <- x
pool.mu.Unlock()
return
}
select {
case pool.idles <- x:
pool.mu.Unlock()
return
default:
// reach max idle, destroy redundant item
pool.mu.Unlock()
pool.activeCount--
pool.finalizer(x)
}
}
func (pool *Pool) Close() {
pool.mu.Lock()
if pool.closed {
pool.mu.Unlock()
return
}
pool.closed = true
close(pool.idles)
pool.mu.Unlock()
for x := range pool.idles {
pool.finalizer(x)
}
}