/
DbPool.go
153 lines (126 loc) · 3.13 KB
/
DbPool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package pool
import (
"database/sql"
"errors"
_ "github.com/go-sql-driver/mysql"
"sync"
)
// DbPool Connection Pool
type DbPool struct {
Url string
InitialSize int
ExpandSize int
MaxOpen int
numOpen int
MinOpen int
isInitialize bool
TestConn bool
PoolQueue DbPoolQueue
mutex sync.Mutex
}
// GetConn Get a connection
func (pool *DbPool) GetConn() (*Connection, error) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
// Verify connection pool configuration parameters and create initialized connections
err := pool.validAndInit()
if err != nil {
return nil, err
}
var conn *Connection
for i := 0; i < 3; i++ {
// Get a connection from the queue
conn = pool.PoolQueue.Poll()
// If there are no more connections in the queue, expand the connections and fetch them from the queue once more
if conn == nil {
err := pool.expansion()
if err != nil {
return nil, err
}
conn = pool.PoolQueue.Poll()
}
if conn != nil {
err := conn.DB.Ping()
if err == nil {
conn.Status = true
break
}
conn.DB.Close()
if pool.TestConn {
continue
}
}
}
// If idle connections are already <= minimum connections, expand connections
if pool.PoolQueue.Size <= pool.MinOpen {
pool.expansion()
}
return conn, nil
}
// Close Returning used connections to the connection pool
func (pool *DbPool) Close(conn *Connection) {
pool.mutex.Lock()
defer pool.mutex.Unlock()
conn.Status = false
pool.numOpen--
pool.PoolQueue.Add(conn)
}
// expansion Expanded Connectivity
func (pool *DbPool) expansion() error {
if pool.numOpen < pool.MaxOpen {
remaining := pool.MaxOpen - pool.numOpen
if remaining >= pool.ExpandSize {
return pool.createConn(pool.ExpandSize)
}
return pool.createConn(remaining)
}
return errors.New("the current number of active connections has exceeded MaxOpen")
}
// createConn Create the specified number of connections to the queue
func (pool *DbPool) createConn(size int) error {
if size <= 0 {
return errors.New("the number of expansions must be greater than 0")
}
for i := 0; i < size; i++ {
db, err := sql.Open("mysql", pool.Url)
if err != nil {
return err
}
conn := new(Connection)
conn.DB = db
conn.Pool = pool
conn.Status = true
pool.PoolQueue.Add(conn)
}
pool.numOpen += size
return nil
}
// validAndInit Verify connection pool configuration parameters and create initialized connections
func (pool *DbPool) validAndInit() error {
if pool.isInitialize == false {
if pool.Url == "" {
return errors.New("url must not be empty")
}
if pool.ExpandSize <= 0 {
return errors.New("ExpandSize must > 0")
}
if pool.InitialSize <= 0 {
return errors.New("InitialSize must > 0")
}
if pool.InitialSize < pool.MinOpen {
return errors.New("InitialSize cannot be smaller than MinOpen")
}
if pool.MaxOpen <= 0 {
return errors.New("MaxOpen must > 0")
}
if pool.MinOpen < 0 {
return errors.New("MaxOpen must >= 0")
}
if pool.MaxOpen < pool.MinOpen {
return errors.New("MinOpen must not be larger than MaxOpen")
}
pool.createConn(pool.InitialSize)
pool.isInitialize = true
}
return nil
}