forked from tsenart/nap
/
db.go
139 lines (117 loc) · 4.19 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package nap
import (
"database/sql"
"database/sql/driver"
"strings"
"sync/atomic"
)
// DB is a logical database with multiple underlying physical databases
// forming a single master multiple slaves topology.
// Reads and writes are automatically directed to the correct physical db.
type DB struct {
pdbs []*sql.DB // Physical databases
count uint64 // Monotonically incrementing counter on each query
}
// Open concurrently opens each underlying physical db.
// dataSourceNames must be a semi-comma separated list of DSNs with the first
// one being used as the master and the rest as slaves.
func Open(driverName, dataSourceNames string) (*DB, error) {
conns := strings.Split(dataSourceNames, ";")
db := &DB{pdbs: make([]*sql.DB, len(conns))}
err := scatter(len(db.pdbs), func(i int) (err error) {
db.pdbs[i], err = sql.Open(driverName, conns[i])
return err
})
if err != nil {
return nil, err
}
return db, nil
}
// Close closes all physical databases concurrently, releasing any open resources.
func (db *DB) Close() error {
return scatter(len(db.pdbs), func(i int) error {
return db.pdbs[i].Close()
})
}
// Driver returns the physical database's underlying driver.
func (db *DB) Driver() driver.Driver {
return db.pdbs[0].Driver()
}
// Begin starts a transaction on the master. The isolation level is dependent on the driver.
func (db *DB) Begin() (*sql.Tx, error) {
return db.pdbs[0].Begin()
}
// Exec executes a query without returning any rows.
// The args are for any placeholder parameters in the query.
// Exec uses the master as the underlying physical db.
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.pdbs[0].Exec(query, args...)
}
// Ping verifies if a connection to each physical database is still alive,
// establishing a connection if necessary.
func (db *DB) Ping() error {
return scatter(len(db.pdbs), func(i int) error {
return db.pdbs[i].Ping()
})
}
// Prepare creates a prepared statement for later queries or executions
// on each physical database, concurrently.
func (db *DB) Prepare(query string) (Stmt, error) {
stmts := make([]*sql.Stmt, len(db.pdbs))
err := scatter(len(db.pdbs), func(i int) (err error) {
stmts[i], err = db.pdbs[i].Prepare(query)
return err
})
if err != nil {
return nil, err
}
return &stmt{db: db, stmts: stmts}, nil
}
// Query executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
// Query uses a slave as the physical db.
func (db *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
return db.pdbs[db.slave(len(db.pdbs))].Query(query, args...)
}
// QueryRow executes a query that is expected to return at most one row.
// QueryRow always return a non-nil value.
// Errors are deferred until Row's Scan method is called.
// QueryRow uses a slave as the physical db.
func (db *DB) QueryRow(query string, args ...interface{}) *sql.Row {
return db.pdbs[db.slave(len(db.pdbs))].QueryRow(query, args...)
}
// SetMaxIdleConns sets the maximum number of connections in the idle
// connection pool for each underlying physical db.
// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns then the
// new MaxIdleConns will be reduced to match the MaxOpenConns limit
// If n <= 0, no idle connections are retained.
func (db *DB) SetMaxIdleConns(n int) {
for i := range db.pdbs {
db.pdbs[i].SetMaxIdleConns(n)
}
}
// SetMaxOpenConns sets the maximum number of open connections
// to each physical database.
// If MaxIdleConns is greater than 0 and the new MaxOpenConns
// is less than MaxIdleConns, then MaxIdleConns will be reduced to match
// the new MaxOpenConns limit. If n <= 0, then there is no limit on the number
// of open connections. The default is 0 (unlimited).
func (db *DB) SetMaxOpenConns(n int) {
for i := range db.pdbs {
db.pdbs[i].SetMaxOpenConns(n)
}
}
// Slave returns one of the physical databases which is a slave
func (db *DB) Slave() *sql.DB {
return db.pdbs[db.slave(len(db.pdbs))]
}
// Master returns the master physical database
func (db *DB) Master() *sql.DB {
return db.pdbs[0]
}
func (db *DB) slave(n int) int {
if n <= 1 {
return 0
}
return int(1 + (atomic.AddUint64(&db.count, 1) % uint64(n-1)))
}