-
Notifications
You must be signed in to change notification settings - Fork 23
/
connect.go
153 lines (122 loc) · 3.05 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package sqlagent
import (
"encoding/json"
"sync"
"time"
"github.com/jmoiron/sqlx"
)
// Pool settings applied to each database handle created by PersistentConnect.
// Handles returned directly by Connect are not adjusted with these values.
var (
// MaxIdleConns is passed to SetMaxIdleConns on every persistent handle.
MaxIdleConns = 10
// MaxConnLifetime is passed to SetConnMaxLifetime on every persistent handle.
MaxConnLifetime = 10 * time.Minute
)
// Record is a single database row represented as a map from column name to
// column value. Because the column name is the map key, every column in the
// result set must be uniquely named.
type Record map[string]interface{}
// Iterator provides lazy, row-at-a-time access to the rows of a query result.
// Call Next to advance, Scan or ScanRow to read the current row, and Close
// once the iterator is no longer needed.
type Iterator struct {
// Cols holds the column names of the result set.
Cols []string
// rows is the underlying sqlx result set that the methods delegate to.
rows *sqlx.Rows
}
// Close releases the underlying result set. Any error reported while closing
// is discarded because the method has no return value.
func (i *Iterator) Close() {
	_ = i.rows.Close()
}
// Next advances to the next row and reports whether one is available to be
// read with Scan or ScanRow.
func (i *Iterator) Next() bool {
	hasRow := i.rows.Next()
	return hasRow
}
// Scan reads the current row into r, keyed by column name.
//
// On success the record is additionally passed through mapBytesToString
// (defined elsewhere in this package; presumably it converts []byte values to
// strings — confirm against its definition). If the scan fails, the error is
// returned and that post-processing step is skipped.
func (i *Iterator) Scan(r Record) error {
	err := i.rows.MapScan(r)
	if err == nil {
		mapBytesToString(r)
	}
	return err
}
// ScanRow reads the current row positionally into the destinations in r by
// forwarding them to the underlying rows' Scan method. The destinations must
// satisfy whatever that Scan requires (for database/sql, one pointer per
// column).
func (i *Iterator) ScanRow(r []interface{}) error {
	err := i.rows.Scan(r...)
	return err
}
// Connect opens a database connection for the named driver using the supplied
// connection parameters. Each database supports a different set of parameters,
// but the following common ones are standardized:
//
// - `host` - The database host.
// - `port` - The database port.
// - `user` - The username to authenticate with.
// - `password` - The password to authenticate with.
// - `database` - The database to connect to.
//
// Other known database-specific parameters will be appended to the connection
// string and the remaining will be ignored. If, after cleaning, the parameters
// contain a string-valued `dsn` entry, it is used verbatim as the connection
// string instead of building one.
//
// ErrUnknownDriver is returned when the driver name is not present in Drivers.
func Connect(driver string, params map[string]interface{}) (*sqlx.DB, error) {
	// Resolve the requested name to the registered driver name.
	name, known := Drivers[driver]
	if !known {
		return nil, ErrUnknownDriver
	}

	params = cleanParams(params)

	// An explicit DSN takes precedence over one assembled from the parameters.
	if dsn, given := params["dsn"].(string); given {
		return sqlx.Connect(name, dsn)
	}

	// Otherwise let the driver-specific connector build the connection string.
	return sqlx.Connect(name, connectors[name](params))
}
// Execute runs the SQL statement against db and returns an Iterator over the
// resulting rows.
//
// When params is non-empty the statement is run as a named query with params
// supplying the named arguments; otherwise it is run as a plain query. The
// caller owns the returned Iterator and must Close it when done.
//
// A nil Iterator and the error are returned if the query fails or if the
// column names cannot be read. In the latter case the result set is closed
// before returning so that it is not leaked.
func Execute(db *sqlx.DB, sql string, params map[string]interface{}) (*Iterator, error) {
	var (
		err  error
		rows *sqlx.Rows
	)

	// len of a nil map is 0, so a separate nil check is unnecessary.
	if len(params) > 0 {
		rows, err = db.NamedQuery(sql, params)
	} else {
		rows, err = db.Queryx(sql)
	}
	if err != nil {
		return nil, err
	}

	cols, err := rows.Columns()
	if err != nil {
		// The caller never receives an Iterator on this path, so nobody else
		// can release the result set. The Columns error is the one worth
		// reporting; a secondary Close error is intentionally dropped.
		_ = rows.Close()
		return nil, err
	}

	return &Iterator{
		Cols: cols,
		rows: rows,
	}, nil
}
// Cache of persistent database handles used by PersistentConnect and Shutdown.
var (
// connMap holds the cached handles, keyed by the driver name followed by the
// JSON-encoded connection parameters.
connMap = make(map[string]*sqlx.DB)
// connMapMutex is held by PersistentConnect and Shutdown while they access
// connMap.
connMapMutex = &sync.Mutex{}
)
// PersistentConnect returns a cached database handle for the given driver and
// connection parameters, opening one with Connect on first use. Newly opened
// handles are configured with MaxIdleConns and MaxConnLifetime and are kept
// until Shutdown is called.
//
// Handles are cached under the driver name followed by the JSON encoding of
// params. An error is returned if params cannot be JSON-encoded or if Connect
// fails; nothing is cached in either case.
func PersistentConnect(driver string, params map[string]interface{}) (*sqlx.DB, error) {
	// encoding/json writes map keys in sorted order, so equal parameter sets
	// always yield the same key.
	connKey, err := json.Marshal(params)
	if err != nil {
		// Without a valid encoding the key would collapse to the bare driver
		// name, and unrelated parameter sets would share one handle.
		return nil, err
	}
	key := driver + string(connKey)

	connMapMutex.Lock()
	defer connMapMutex.Unlock()

	if db, ok := connMap[key]; ok {
		return db, nil
	}

	db, err := Connect(driver, params)
	if err != nil {
		return nil, err
	}

	db.SetMaxIdleConns(MaxIdleConns)
	db.SetConnMaxLifetime(MaxConnLifetime)
	connMap[key] = db

	return db, nil
}
// Shutdown closes all persisted database connections and removes them from
// the cache, so that a later PersistentConnect opens a fresh handle instead of
// returning one that has already been closed.
func Shutdown() {
	connMapMutex.Lock()
	defer connMapMutex.Unlock()

	// Deleting entries from a map while ranging over it is permitted in Go.
	for key, db := range connMap {
		// Best-effort close: Shutdown has no way to report an error, and one
		// failing handle must not prevent the others from being closed.
		_ = db.Close()
		delete(connMap, key)
	}
}