/
rdb.go
465 lines (366 loc) · 10.9 KB
/
rdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
package db
import (
"database/sql"
"fmt"
or "github.com/Cepave/open-falcon-backend/common/runtime"
"github.com/Cepave/open-falcon-backend/common/utils"
)
// TxFinale is the answer of a transaction callback, naming how the
// transaction should be ended.
type TxFinale byte

const (
	// TxCommit (1) asks for the transaction to be committed.
	TxCommit TxFinale = iota + 1
	// TxRollback (2) asks for the transaction to be rolled back.
	TxRollback
)
// DbConfig is the configuration of database.
type DbConfig struct {
	// Dsn is the data source name of the database.
	Dsn string
	// MaxIdle is the configured "max idle" value.
	MaxIdle int
}

// String renders the configuration as "DSN: [<Dsn>]. Max Idle: [<MaxIdle>]".
func (config *DbConfig) String() string {
	const layout = "DSN: [%s]. Max Idle: [%d]"

	dsn, maxIdle := config.Dsn, config.MaxIdle
	return fmt.Sprintf(layout, dsn, maxIdle)
}
// The main function of this file is to provide IoC (Inversion of Control) for database (RDB) objects.
//
// For exception handling, all callback methods should use panic() or log.Panicf() to raise the error object.
//
// You may use PanicIfError to ease the processing of error objects.
// DbController is the main controller of database.
//
// It keeps the *sql.DB handed to NewDbController() and the panic handlers
// added by RegisterPanicHandler().
type DbController struct {
// The database object the callbacks operate on; Release() closes it and sets it to nil
dbObject *sql.DB
// The handlers called by handlePanic() with the recovered value of a panic
panicHandlers []utils.PanicHandler
}
// DbCallback is the callback working on the database object of sql package.
type DbCallback interface {
	// OnDb gets called with the database object to operate on.
	OnDb(db *sql.DB)
}

// DbCallbackFunc adapts a plain function to the DbCallback interface.
type DbCallbackFunc func(*sql.DB)

// OnDb invokes the adapted function with db.
func (fn DbCallbackFunc) OnDb(db *sql.DB) {
	fn(db)
}
// RowsCallback consumes the rows of a query result, one row per call.
type RowsCallback interface {
	// NextRow gets called for a fetched row; the returned IterateControl
	// tells the iterating code whether to continue or to stop.
	NextRow(row *sql.Rows) IterateControl
}

// RowsCallbackFunc adapts a plain function to the RowsCallback interface.
type RowsCallbackFunc func(*sql.Rows) IterateControl

// NextRow invokes the adapted function with rows and hands its answer back.
func (fn RowsCallbackFunc) NextRow(rows *sql.Rows) IterateControl {
	control := fn(rows)
	return control
}
// RowCallback consumes the single row of a query result.
type RowCallback interface {
	// ResultRow gets called with the row produced by the query.
	ResultRow(row *sql.Row)
}

// RowCallbackFunc adapts a plain function to the RowCallback interface.
type RowCallbackFunc func(*sql.Row)

// ResultRow invokes the adapted function with row.
func (fn RowCallbackFunc) ResultRow(row *sql.Row) {
	fn(row)
}
// TxCallback is the callback working inside a transaction of sql package.
type TxCallback interface {
	// InTx gets called with the transaction; the returned TxFinale names
	// how the transaction should be ended.
	InTx(tx *sql.Tx) TxFinale
}

// TxCallbackFunc adapts a plain function to the TxCallback interface.
type TxCallbackFunc func(*sql.Tx) TxFinale

// InTx invokes the adapted function with tx and hands its answer back.
func (fn TxCallbackFunc) InTx(tx *sql.Tx) TxFinale {
	finale := fn(tx)
	return finale
}
// BuildTxForSqls builds a TxCallback for execution of multiple SQLs.
//
// The built callback executes the queries in the given order through
// TxExt.Exec() and always answers TxCommit.
func BuildTxForSqls(queries ...string) TxCallback {
	execAll := func(tx *sql.Tx) TxFinale {
		txExt := ToTxExt(tx)
		for i := range queries {
			txExt.Exec(queries[i])
		}
		return TxCommit
	}

	return TxCallbackFunc(execAll)
}
// ExecuteIfByTx executes callbacks in transaction if the boot callback has true value.
//
// DbController.InTxForIf() runs both callbacks with the same transaction.
type ExecuteIfByTx interface {
// First calling of database for boolean result
BootCallback(tx *sql.Tx) bool
// If the boot callback has true result, this callback would get called
IfTrue(tx *sql.Tx)
}
// RowsExt is the extension for sql.Rows; its methods panic instead of
// returning an error.
type RowsExt sql.Rows

// ToRowsExt converts the sql.Rows to RowsExt.
//
// This is a pure pointer conversion: the result refers to the same object.
func ToRowsExt(rows *sql.Rows) *RowsExt {
	converted := (*RowsExt)(rows)
	return converted
}
// Columns gets the column names, with panic instead of returned error.
func (rowsExt *RowsExt) Columns() []string {
	sqlRows := (*sql.Rows)(rowsExt)

	names, err := sqlRows.Columns()
	PanicIfError(utils.BuildErrorWithCaller(err))

	return names
}
// Scan copies the values of current row into dest, with panic instead of
// returned error.
func (rowsExt *RowsExt) Scan(dest ...interface{}) {
	sqlRows := (*sql.Rows)(rowsExt)

	scanErr := sqlRows.Scan(dest...)
	PanicIfError(utils.BuildErrorWithCaller(scanErr))
}
// RowExt is the extension for sql.Row; its methods panic instead of
// returning an error.
type RowExt sql.Row

// ToRowExt converts the sql.Row to RowExt.
//
// This is a pure pointer conversion: the result refers to the same object.
func ToRowExt(row *sql.Row) *RowExt {
	converted := (*RowExt)(row)
	return converted
}
// Scan copies the values of the row into dest, with panic instead of
// returned error.
func (rowExt *RowExt) Scan(dest ...interface{}) {
	sqlRow := (*sql.Row)(rowExt)

	scanErr := sqlRow.Scan(dest...)
	PanicIfError(utils.BuildErrorWithCaller(scanErr))
}
// StmtExt is the extension for sql.Stmt; its methods panic instead of
// returning an error.
type StmtExt sql.Stmt

// ToStmtExt converts the sql.Stmt to StmtExt.
//
// This is a pure pointer conversion: the result refers to the same object.
func ToStmtExt(stmt *sql.Stmt) *StmtExt {
	converted := (*StmtExt)(stmt)
	return converted
}
// Exec executes the prepared statement with args, with panic instead of
// returned error.
func (stmtExt *StmtExt) Exec(args ...interface{}) sql.Result {
	sqlStmt := (*sql.Stmt)(stmtExt)

	execResult, err := sqlStmt.Exec(args...)
	PanicIfError(utils.BuildErrorWithCaller(err))

	return execResult
}
// Query runs the prepared statement with args and gives the rows, with panic
// instead of returned error.
func (stmtExt *StmtExt) Query(args ...interface{}) *sql.Rows {
	sqlStmt := (*sql.Stmt)(stmtExt)

	resultRows, err := sqlStmt.Query(args...)
	PanicIfError(utils.BuildErrorWithCaller(err))

	return resultRows
}
// TxExt is the extension for sql.Tx; its methods panic instead of
// returning an error.
type TxExt sql.Tx

// ToTxExt converts the sql.Tx to TxExt.
//
// This is a pure pointer conversion: the result refers to the same object.
func ToTxExt(tx *sql.Tx) *TxExt {
	converted := (*TxExt)(tx)
	return converted
}
// Commit commits the transaction, with panic instead of returned error.
func (txExt *TxExt) Commit() {
	sqlTx := (*sql.Tx)(txExt)

	commitErr := sqlTx.Commit()
	PanicIfError(utils.BuildErrorWithCaller(commitErr))
}
// Exec executes the query with args in the transaction, with panic instead
// of returned error.
func (txExt *TxExt) Exec(query string, args ...interface{}) sql.Result {
	sqlTx := (*sql.Tx)(txExt)

	execResult, err := sqlTx.Exec(query, args...)
	PanicIfError(utils.BuildErrorWithCaller(err))

	return execResult
}
// Prepare creates a prepared statement for query in the transaction, with
// panic instead of returned error.
func (txExt *TxExt) Prepare(query string) *sql.Stmt {
	sqlTx := (*sql.Tx)(txExt)

	prepared, err := sqlTx.Prepare(query)
	PanicIfError(utils.BuildErrorWithCaller(err))

	return prepared
}
// Query runs the query with args in the transaction and gives the rows,
// with panic instead of returned error.
//
// The args are the values for the placeholders of query; they are forwarded
// to sql.Tx.Query() as-is.
func (txExt *TxExt) Query(query string, args ...interface{}) *sql.Rows {
	// args must be forwarded: without them a query with placeholders is
	// sent to the database with no values for its parameters.
	rows, err := ((*sql.Tx)(txExt)).Query(query, args...)
	PanicIfError(utils.BuildErrorWithCaller(err))
	return rows
}
// Rollback aborts the transaction, with panic instead of returned error.
func (txExt *TxExt) Rollback() {
	sqlTx := (*sql.Tx)(txExt)

	rollbackErr := sqlTx.Rollback()
	PanicIfError(utils.BuildErrorWithCaller(rollbackErr))
}
// ResultExt is the extension for sql.Result; its methods panic instead of
// returning an error.
type ResultExt struct {
	// The wrapped result every method delegates to
	sqlResult sql.Result
}

// ToResultExt converts the sql.Result to ResultExt by wrapping it in a new
// object.
func ToResultExt(result sql.Result) *ResultExt {
	wrapped := new(ResultExt)
	wrapped.sqlResult = result
	return wrapped
}
// LastInsertId gets the last id of insert, with panic instead of returned
// error.
func (resultExt *ResultExt) LastInsertId() int64 {
	wrapped := resultExt.sqlResult

	lastId, err := wrapped.LastInsertId()
	PanicIfError(utils.BuildErrorWithCaller(err))

	return lastId
}
// RowsAffected gets the number of affected rows, with panic instead of
// returned error.
func (resultExt *ResultExt) RowsAffected() int64 {
	wrapped := resultExt.sqlResult

	affected, err := wrapped.RowsAffected()
	PanicIfError(utils.BuildErrorWithCaller(err))

	return affected
}
// IterateControl is the control of iterating, answered by a rows callback.
type IterateControl byte

const (
	// IterateStop (0) ends the iteration.
	IterateStop IterateControl = iota
	// IterateContinue (1) keeps the iteration going.
	IterateContinue
)
// NewDbController initializes a controller for database.
//
// A nil newDbObject is reported through PanicIfError().
//
// Without RegisterPanicHandler() any PanicHandler,
// the raised panic would be re-panicked.
func NewDbController(newDbObject *sql.DB) *DbController {
	if newDbObject == nil {
		missingDb := fmt.Errorf("Need viable DB object(sql.DB)")
		PanicIfError(utils.BuildErrorWithCaller(missingDb))
	}

	controller := &DbController{
		dbObject:      newDbObject,
		panicHandlers: []utils.PanicHandler{},
	}
	return controller
}
// RegisterPanicHandler registers a handler called while a panic is raised.
//
// This object may register multiple handlers for panic; they are kept in
// the order of registration.
func (dbController *DbController) RegisterPanicHandler(panicHandler utils.PanicHandler) {
	handlers := append(dbController.panicHandlers, panicHandler)
	dbController.panicHandlers = handlers
}
// OperateOnDb operates on database: the callback is called with the
// database object of this controller.
//
// The controller must be initialized (checked before anything else).
// A panic raised by the callback goes through the function built by
// utils.DeferCatchPanicWithCaller() first and through handlePanic() after.
func (dbController *DbController) OperateOnDb(dbCallback DbCallback) {
	dbController.needInitializedOrPanic()

	// Deferred calls run in reverse order: handlePanic() is the last one
	defer dbController.handlePanic()

	catchPanic := utils.DeferCatchPanicWithCaller()
	defer catchPanic()

	db := dbController.dbObject
	dbCallback.OnDb(db)
}
// Exec executes the query string with args, or panics.
//
// The returned value is the result of sql.DB.Exec(); it is set only if the
// execution passed the error check.
func (dbController *DbController) Exec(query string, args ...interface{}) sql.Result {
	// Captured in this frame (not in the callback) for the error built below
	callerInfo := or.GetCallerInfo()

	var finalResult sql.Result
	dbController.OperateOnDb(DbCallbackFunc(func(db *sql.DB) {
		execResult, execErr := db.Exec(query, args...)
		PanicIfError(utils.BuildErrorWithCallerInfo(execErr, callerInfo))

		finalResult = execResult
	}))

	return finalResult
}
// QueryForRows runs sqlQuery with args and feeds every fetched row to
// rowsCallback.
//
// The iteration ends when there is no more row or as soon as the callback
// answers IterateStop. The returned value is the number of rows handed to
// the callback (the row answering IterateStop is counted).
//
// Failures are reported by panic: a failed query and, checked through
// rows.Err() once the loop is over, a failure raised while fetching rows.
// Without that check a broken iteration would look like a normal, shorter
// result set.
func (dbController *DbController) QueryForRows(
	rowsCallback RowsCallback,
	sqlQuery string, args ...interface{},
) (numberOfRows uint) {
	defer utils.DeferCatchPanicWithCaller()()

	var dbFunc DbCallbackFunc = func(db *sql.DB) {
		rows, err := db.Query(
			sqlQuery, args...,
		)
		if err != nil {
			PanicIfError(utils.BuildErrorWithCaller(fmt.Errorf(
				"Query SQL with exception: %v. SQL: \"%s\" Params: %#v",
				err, sqlQuery, args,
			)))
		}
		defer rows.Close()

		for rows.Next() {
			numberOfRows++

			if rowsCallback.NextRow(rows) == IterateStop {
				break
			}
		}

		// Next() answers false for both "no more rows" and "fetching
		// failed"; only Err() tells them apart.
		if iterateErr := rows.Err(); iterateErr != nil {
			PanicIfError(utils.BuildErrorWithCaller(fmt.Errorf(
				"Iterate rows with exception: %v. SQL: \"%s\" Params: %#v",
				iterateErr, sqlQuery, args,
			)))
		}
	}

	dbController.OperateOnDb(dbFunc)

	// numberOfRows is the named result updated by the callback above
	return
}
// QueryForRow queries for a row and calls rowCallback with it.
//
// The row comes from sql.DB.QueryRow(), which gives no error by itself; the
// callback gets the row as-is.
func (dbController *DbController) QueryForRow(
	rowCallback RowCallback,
	sqlQuery string, args ...interface{},
) {
	catchPanic := utils.DeferCatchPanicWithCaller()
	defer catchPanic()

	dbController.OperateOnDb(DbCallbackFunc(func(db *sql.DB) {
		resultRow := db.QueryRow(sqlQuery, args...)
		rowCallback.ResultRow(resultRow)
	}))
}
// InTx executes the callback in transaction.
//
// The TxFinale answered by the callback decides the ending of transaction:
// TxCommit commits it, TxRollback rolls it back.
//
// This method would commit the transaction if there is no raised panic,
// rollback it otherwise. The recovered panic is converted to an error with
// caller information and handed to PanicIfError().
//
// NOTE(review): a TxFinale other than TxCommit/TxRollback matches no case of the
// switch, so the transaction is neither committed nor rolled back by this
// method - confirm whether this is intended.
func (dbController *DbController) InTx(txCallback TxCallback) {
defer utils.DeferCatchPanicWithCaller()()
var dbFunc DbCallbackFunc = func(db *sql.DB) {
// NOTE(review): the caller information is captured inside the callback, not in
// the frame of InTx() as DbController.Exec() does - confirm which caller is meant.
callerInfo := or.GetCallerInfo()
tx, err := db.Begin()
PanicIfError(utils.BuildErrorWithCallerInfo(err, callerInfo))
/**
* Rollback the transaction when panic is raised
*
* This covers everything below: the callback and the commit/rollback of TxExt
*/
defer func() {
p := recover()
// Nothing is raised, the transaction has been ended by the switch below
if p == nil {
return
}
var finalError = utils.BuildErrorWithCallerInfo(
utils.SimpleErrorConverter(p), callerInfo,
)
/**
* Rollback the transaction
*
* A failed rollback replaces the final error by one containing
* both the error of rollback and the cause error
*/
rollbackError := tx.Rollback()
if rollbackError != nil {
finalError = utils.BuildErrorWithCallerInfo(
fmt.Errorf("Rollback has error: %v. Cause Error: %v", rollbackError, finalError), callerInfo,
)
}
// :~)
PanicIfError(finalError)
}()
// :~)
// The callback gets the plain *sql.Tx; the extension is used for ending with panic on error
txExt := ToTxExt(tx)
switch txCallback.InTx(tx) {
case TxCommit:
txExt.Commit()
case TxRollback:
txExt.Rollback()
}
}
dbController.OperateOnDb(dbFunc)
}
// InTxForIf executes the complex statement in transaction.
//
// BootCallback() is called first; IfTrue() is called only if the boot
// callback has true result. The transaction callback answers TxCommit in
// both cases.
func (dbController *DbController) InTxForIf(ifCallbacks ExecuteIfByTx) {
	catchPanic := utils.DeferCatchPanicWithCaller()
	defer catchPanic()

	dbController.InTx(TxCallbackFunc(func(tx *sql.Tx) TxFinale {
		if !ifCallbacks.BootCallback(tx) {
			return TxCommit
		}

		ifCallbacks.IfTrue(tx)
		return TxCommit
	}))
}
// ExecQueriesInTx executes the queries in one transaction, using the
// callback built by BuildTxForSqls().
func (dbController *DbController) ExecQueriesInTx(queries ...string) {
	catchPanic := utils.DeferCatchPanicWithCaller()
	defer catchPanic()

	execAll := BuildTxForSqls(queries...)
	dbController.InTx(execAll)
}
// Release releases the database object under this object.
//
// The database object is closed and, if the closing has no error, the
// controller drops it (the controller is not initialized any more).
//
// As of service application(web, daemon...), this method is rarely get called
func (dbController *DbController) Release() {
	dbController.needInitializedOrPanic()
	defer dbController.handlePanic()

	if closeErr := dbController.dbObject.Close(); closeErr != nil {
		releaseErr := fmt.Errorf("Release database connection error. %v", closeErr)
		PanicIfError(utils.BuildErrorWithCaller(releaseErr))
	}

	dbController.dbObject = nil
}
// needInitializedOrPanic checks that this controller holds a database
// object; a missing one is reported through PanicIfError() with the caller
// information taken by or.GetCallerInfoWithDepth(1).
func (dbController *DbController) needInitializedOrPanic() {
	if dbController.dbObject == nil {
		PanicIfError(utils.BuildErrorWithCallerInfo(
			fmt.Errorf("The controller is not initialized"),
			or.GetCallerInfoWithDepth(1),
		))
	}
}
// handlePanic recovers a raised panic and hands the recovered value to every
// registered panic handler, in the order of registration.
//
// If there is no registered handler, the recovered value is panicked again.
// This method calls recover() by itself, so it has to be used as the
// deferred call directly.
func (dbController *DbController) handlePanic() {
	recovered := recover()
	handlers := dbController.panicHandlers

	switch {
	case recovered == nil:
		return
	case len(handlers) == 0:
		panic(recovered)
	}

	for i := range handlers {
		handlers[i](recovered)
	}
}