-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
313 lines (267 loc) · 8.73 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
Package db defines a simple Statement interface for querying Oracle
databases, abstracting some boilerplate (beginning transactions, wraping bind
values in data structures, etc.).
Usage:
// Creating a database connection is straightforward.
db, err := New(&Config{
User: "user",
Pass: "pass",
Host: "host",
})
// You can also ping the connection, useful for building auto-reconnect
// functionality.
err := db.Ping()
// Begin a new transaction and return a Statement type. Statements use named
// query parameters.
stmt, err := db.Prepare("...")
// Bind values to the query. This wraps creating NamedArg data structures
// via sql.Named() calls.
stmt.Bind("foo", 1)
stmt.Bind("bar", "baz")
// When a result cursor is required, use Query() to execute the statement.
// This wraps the sql.Stmt.Query() call to compile the bound data (and any
// additional data passed in the exec call). Query generates a cursor that
// can be used to iterate through the results.
stmt.Query()
// To iterate through the cursor, a Next() method is provided. This wraps
// the sql.Rows.Next() and sql.Rows.Scan() methods into a single call. The
// input arguments are destination targets for the query results.
// See https://golang.org/pkg/database/sql/#Rows.Scan for details.
var foo int
var bar string
for stmt.Next(&foo, &bar) {
doThings(foo, bar)
}
// When a result cursor is not required, use Exec() to execute the statement.
// This wraps the sql.Stmt.Exec() call to compile the bound data (and any
// additional data passed in the exec call). Exec returns a sql.Result
// interface to summarize the executed SQL command.
result, err := stmt.Exec()
fmt.Println(result.RowsAffected())
// All statements are transactions, commit the transaction to save data
// changes and close the transaction.
err = stmt.Commit()
// Close the database connection. This returns an error if no connection
// exists.
err := db.Close()
*/
package db
import (
"context"
"database/sql"
"time"
"github.com/bdlm/errors/v2"
"github.com/bdlm/log/v2"
nr "github.com/newrelic/go-agent/v3/newrelic"
)
// DB defines an Oracle database connection structure.
type DB struct {
// Database configuration
Cfg *Config
// Database connection
Conn *sql.DB
Ctx context.Context
}
// New returns a new database connection instance.
//
// - Validate required configuration parameters.
// - Init config values as necessary.
// - Begin a NewRelic transaction if applicable.
// - Instrument the database driver.
// - Initialize the database client and connect.
// - Start a shutdown handler.
func New(cfg *Config) (*DB, error) {
// Validate required configuration parameters.
if nil == cfg.Connector && nil == cfg.Driver {
return nil, errors.New("a database connector or driver is required (*cfg.Connector, *cfg.Driver)")
}
if nil == cfg.Ctx {
return nil, errors.New("a context is required (*Config.Ctx)")
}
if "" == cfg.DatabaseName {
return nil, errors.New("a database name is required (*Config.DatabaseName)")
}
if "" == cfg.DriverName {
return nil, errors.New("a database driver name is required (*Config.DriverName)")
}
// Init config values as necessary.
if nil == cfg.Loc {
cfg.Loc = time.UTC
}
if nil == cfg.Params {
cfg.Params = map[string]string{}
}
if "" == cfg.DatabaseName {
cfg.DatabaseName = "UndefinedDatabaseName"
}
if nil == cfg.Ctx {
cfg.Ctx, cfg.Cancel = context.WithCancel(context.Background())
} else {
cfg.Ctx, cfg.Cancel = context.WithCancel(cfg.Ctx)
}
// Instrument the database driver.
if nil == cfg.Driver {
cfg.Driver = cfg.Connector.Driver()
}
if nil != cfg.NewRelic {
cfg.Driver = InstrumentSQLDriver(cfg)
cfg.DriverName = cfg.DriverName + "-" + cfg.DatabaseName
sql.Register(cfg.DriverName, cfg.Driver)
}
// Initialize the database client and connect.
db := &DB{
Cfg: cfg,
Ctx: cfg.Ctx,
}
err := db.Connect()
if nil != err {
return nil, errors.Wrap(err, "connect failed")
}
// Start a shutdown handler.
go func() {
<-cfg.Ctx.Done()
db.Close()
}()
return db, nil
}
// BeginTx is the constructor for Transaction instances.
//
// Transaction instances handle multiple statements and can be committed or
// rolled back. If a New Relic application has been provided, transaction
// metrics will be written there.
// https://golang.org/pkg/database/sql/#Conn.BeginTx
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) {
return db.Conn.BeginTx(ctx, opts)
}
// Close closes the database, releasing any open resources. It is rare to
// Close a DB, as the DB handle is meant to be long-lived and shared
// between many goroutines.
// https://golang.org/pkg/database/sql/#DB.Close
func (db *DB) Close() error {
_ = db.Ping()
db.Cfg.Cancel()
return db.Conn.Close()
}
// Config returns the database configuration.
func (db *DB) Config() *Config {
return db.Cfg
}
// Connect opens a connection to the database with the provided credentials.
// If a database connection exists it will be disconnected before trying to
// reconnect.
func (db *DB) Connect() error {
if "" == db.Config().DriverName {
return errors.New("must provide a database driver name")
}
conn, err := sql.Open(db.Config().DriverName, db.Config().DSN())
if nil != err {
return errors.Wrap(err, "unable to open connection")
}
db.Conn = conn
return db.Ping()
}
// Exec implements database/sql.Exec
func (db *DB) Exec(query string, args ...interface{}) (sql.Result, error) {
return db.Conn.ExecContext(db.Ctx, query, args...)
}
// ExecContext implements database/sql.ExecContext
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return db.Conn.ExecContext(ctx, query, args...)
}
// Ping verifies a connection to the database is still alive, establishing a
// connection if necessary.
func (db *DB) Ping() error {
if nil == db || nil == db.Conn {
return errors.New("no database connection")
}
return db.Conn.PingContext(db.Ctx)
}
// Prepare is the constructor for Statement instances.
//
// Statement instances handle all transaction logic.
func (db *DB) Prepare(query string) (*Statement, error) {
return db.PrepareContext(context.Background(), query)
}
// PrepareContext is the constructor for Statement instances.
//
// Statement instances handle all transaction logic.
func (db *DB) PrepareContext(ctx context.Context, query string) (*Statement, error) {
err := db.Ping()
if nil != err {
err = errors.Wrap(err, "ping failed")
err2 := db.Connect()
if nil != err2 {
return nil, errors.WrapE(err, err2)
}
}
var nrtxn *nr.Transaction
if nil != db.Config().NewRelic {
nrtxn = db.Config().NewRelic.StartTransaction(db.Config().DriverName)
ctx = nr.NewContext(ctx, nrtxn)
}
txn, err := db.BeginTx(ctx, nil)
if nil != err {
return nil, errors.Wrap(err, "unable to initialize database transaction")
}
stmt, err := txn.PrepareContext(ctx, query)
if nil != err {
return nil, errors.Wrap(err, "error preparing statement")
}
return &Statement{
make([]sql.NamedArg, 0),
ctx,
db,
nil,
nrtxn,
nil,
nil,
query,
stmt,
txn,
}, nil
}
// Query implements Tx.Query. Query executes a query that returns rows,
// typically a SELECT.
// https://golang.org/pkg/database/sql/#Tx.Query
func (db *DB) Query(query string, args ...interface{}) (*sql.Rows, error) {
return db.QueryContext(context.Background(), query, args...)
}
// QueryContext implemtnts Tx.Query. Query executes a query that returns rows,
// typically a SELECT.
// https://golang.org/pkg/database/sql/#Tx.Query
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
var nrtxn *nr.Transaction
if nil != db.Config().NewRelic {
nrtxn = db.Config().NewRelic.StartTransaction(db.Config().DriverName)
ctx = nr.NewContext(ctx, nrtxn)
}
tx, err := db.BeginTx(ctx, nil)
if nil != err {
return nil, errors.Wrap(err, "unable to initialize database transaction")
}
return tx.QueryContext(ctx, query, args...)
}
func (db *DB) QueryRow(query string, args ...interface{}) *sql.Row {
return db.QueryRowContext(context.Background(), query, args...)
}
// QueryContext implemtnts Tx.Query. QueryRow executes a query that returns row,
// typically a SELECT.
// https://golang.org/pkg/database/sql/#Tx.Query
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
var nrtxn *nr.Transaction
if nil != db.Config().NewRelic {
nrtxn = db.Config().NewRelic.StartTransaction(db.Config().DriverName)
ctx = nr.NewContext(ctx, nrtxn)
}
tx, err := db.BeginTx(ctx, nil)
if nil != err {
log.WithError(errors.Wrap(err, "unable to initialize database transaction")).Error("query failed")
return nil
}
return tx.QueryRowContext(ctx, query, args...)
}
var (
// RFC3339Milli is RFC3339 with miliseconds
RFC3339Milli = "2006-01-02T15:04:05.000Z07:00"
)