/
database.go
280 lines (224 loc) · 6.36 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package rwdb
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"time"
)
// DB holds connection pool(s)
// This should be created once, and for each acquaring of
// new db pool, use New()
type DB struct {
cpool *CPool
sticky bool // sticky redirects subsequent queries after a write to Writer DB, this is default to true
modified bool
maxIdle int
maxOpen int
maxLifetime time.Duration
maxIdleModified bool
}
func walk(cpool *CPool, fn func(conn *sql.DB) error) error {
errors := make(chan error, 1)
cpool.lock.Lock()
defer cpool.lock.Unlock()
for _, conn := range cpool.pool {
if conn == nil {
continue
}
go func(conn *sql.DB) {
errors <- fn(conn)
}(conn)
}
// var e error
if err := <-errors; err != nil {
return err
}
return nil
}
// Open creates the DB instance
// The opening of each underline connection is non-blocking
func Open(driver string, dataSourceNames ...string) (*DB, error) {
var db = &DB{cpool: &CPool{}}
db.SetSticky(true)
if len(dataSourceNames) == 0 {
return nil, errors.New("no data source name available")
}
d, err := sql.Open(driver, dataSourceNames[0])
if err != nil {
// writer failed to open
// this is fatal
return nil, err
}
db.cpool.AddWriter(d)
for _, conn := range dataSourceNames[1:] {
go func(conn string) {
d, _ := sql.Open(driver, conn)
if db.maxIdleModified {
// we don't want set this to 0 blindly
// once 0 is used, it can't be 0 again
d.SetMaxIdleConns(db.maxIdle)
}
d.SetConnMaxLifetime(db.maxLifetime)
d.SetMaxOpenConns(db.maxOpen)
db.cpool.AddReader(d)
}(conn)
}
return db, nil
}
func (db *DB) next() (*sql.DB, error) {
if db.sticky && db.modified {
return db.cpool.Writer()
}
return db.cpool.Reader()
}
// SetSticky allows sticky be turned on and off
func (db *DB) SetSticky(stick bool) {
db.sticky = stick
}
// New creates a new DB with the same
// sticky and Connection pool, but reset modified
func (db *DB) New() *DB {
return &DB{
cpool: db.cpool,
sticky: db.sticky,
}
}
// Driver returns the driver of the DB
// The Writer's driver represents all readers
func (db *DB) Driver() driver.Driver {
writer, _ := db.cpool.Writer()
return writer.Driver()
}
// Begin starts a transaction on Writer
// It's likely the subsequent queries will perform a write
func (db *DB) Begin() (*sql.Tx, error) {
writer, err := db.cpool.Writer()
if err != nil {
return nil, err
}
return writer.Begin()
}
// Exec writes to Writer and mark db as modified
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.ExecContext(context.Background(), query, args...)
}
// ExecContext execute a query with context
// and mark the db as modified
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
writer, err := db.cpool.Writer()
if err != nil {
return nil, err
}
result, err := writer.ExecContext(ctx, query, args...)
if err == nil {
if rowAffected, _ := result.RowsAffected(); rowAffected > 0 {
db.modified = true
}
}
return result, err
}
// Ping execute a ping context with a background context
func (db *DB) Ping() error {
return db.PingContext(context.Background())
}
// PingContext pings all physical dbs with context
func (db *DB) PingContext(ctx context.Context) error {
return walk(db.cpool, func(conn *sql.DB) error {
return conn.PingContext(ctx)
})
}
// Prepare prepare stateuments with a background context
func (db *DB) Prepare(query string) (Stmt, error) {
return db.PrepareContext(context.Background(), query)
}
// PrepareContext two statements, one in Writer one in Reader
// The statement will be executed in the writer
// and queries in reader
func (db *DB) PrepareContext(ctx context.Context, query string) (Stmt, error) {
stmt := &stmt{}
writer, err := db.cpool.Writer()
if err != nil {
return nil, err
}
write, err := writer.Prepare(query)
if err != nil {
return nil, err
}
stmt.stmts = []*sql.Stmt{write}
if db.cpool.poolSize() > 1 {
reader, err := db.cpool.Reader()
if err == nil {
// we have writer statement prepared
// this error can be ignored
go func(reader *sql.DB) {
read, _ := reader.PrepareContext(ctx, query)
stmt.lock.Lock()
defer stmt.lock.Unlock()
stmt.stmts = append(stmt.stmts, read)
}(reader)
}
}
return stmt, nil
}
// Close closes all physical db connections
func (db *DB) Close() error {
return walk(db.cpool, func(conn *sql.DB) error {
return conn.Close()
})
}
// Query perform a query context with background context
func (db *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
return db.QueryContext(context.Background(), query, args...)
}
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// The query will be performed in the next connection
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
reader, err := db.next()
if err != nil {
return nil, err
}
return reader.QueryContext(ctx, query, args...)
}
// QueryRow runs the QueryRowContext with a background context
func (db *DB) QueryRow(query string, args ...interface{}) Row {
return db.QueryRowContext(context.Background(), query, args)
}
// QueryRowContext perform the underline QueryRowContext of sql.DB
// on the next connection
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) Row {
reader, err := db.next()
if err != nil {
return &row{err: err}
}
return reader.QueryRowContext(ctx, query, args...)
}
// SetMaxIdleConns sets the max idel conns
// for all connections. This is concurrency safe.
func (db *DB) SetMaxIdleConns(n int) {
db.maxIdle = n
db.maxIdleModified = true
walk(db.cpool, func(conn *sql.DB) error {
conn.SetMaxIdleConns(n)
return nil
})
}
// SetMaxOpenConns sets the max open connections limit
// for all connections. This is concurrency safe.
func (db *DB) SetMaxOpenConns(n int) {
db.maxOpen = n
walk(db.cpool, func(conn *sql.DB) error {
conn.SetMaxOpenConns(n)
return nil
})
}
// SetConnMaxLifetime sets the maximum amount of time a connection may be reused
// for all connections. This is concurrency safe.
func (db *DB) SetConnMaxLifetime(d time.Duration) {
db.maxLifetime = d
walk(db.cpool, func(conn *sql.DB) error {
conn.SetConnMaxLifetime(d)
return nil
})
}