forked from vitessio/vitess
/
dbclient.go
101 lines (89 loc) · 2.42 KB
/
dbclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package binlogplayer
import (
"fmt"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/dbconfigs"
)
// DBClient is a real VtClient backed by a mysql connection.
// It keeps the connection parameters it was created with and the
// connection itself once Connect has been called.
type DBClient struct {
// dbConfig holds the connection parameters supplied to NewDbClient;
// Connect passes them through dbconfigs.MysqlParams before connecting.
dbConfig *sqldb.ConnParams
// dbConn is the connection assigned by Connect. Close resets it to nil
// after closing it.
dbConn sqldb.Conn
}
// NewDbClient returns a DBClient that remembers the given connection
// parameters. It does not open a connection; dbConn is left unset.
func NewDbClient(params *sqldb.ConnParams) *DBClient {
	client := new(DBClient)
	client.dbConfig = params
	return client
}
// handleError inspects err and closes the client's connection when it is
// a *sqldb.SQLError whose number is in the 2000-2018 range or equals 1317.
// Any other error (including non-SQLError values) is ignored.
func (dc *DBClient) handleError(err error) {
	sqlErr, isSQLErr := err.(*sqldb.SQLError)
	if !isSQLErr {
		return
	}
	switch code := sqlErr.Number(); {
	case code >= 2000 && code <= 2018: // mysql connection errors
		dc.Close()
	case code == 1317: // Query was interrupted
		dc.Close()
	}
}
// Connect resolves the stored parameters via dbconfigs.MysqlParams and
// opens a connection with sqldb.Connect, storing it in dc.dbConn.
//
// An error from parameter resolution is returned unchanged; an error from
// sqldb.Connect is returned wrapped in a descriptive message. Note that
// dc.dbConn is assigned whatever sqldb.Connect returned even on failure.
func (dc *DBClient) Connect() error {
	mysqlParams, paramsErr := dbconfigs.MysqlParams(dc.dbConfig)
	if paramsErr != nil {
		return paramsErr
	}

	var connErr error
	if dc.dbConn, connErr = sqldb.Connect(mysqlParams); connErr != nil {
		return fmt.Errorf("error in connecting to mysql db, err %v", connErr)
	}
	return nil
}
// Begin issues a "begin" statement on the current connection.
// On failure the error is logged, passed to handleError (which may close
// the connection), and then returned to the caller.
func (dc *DBClient) Begin() error {
	if _, err := dc.dbConn.ExecuteFetch("begin", 1, false); err != nil {
		log.Errorf("BEGIN failed w/ error %v", err)
		dc.handleError(err)
		return err
	}
	return nil
}
// Commit issues a "commit" statement on the current connection.
// On any failure the error is logged and the underlying connection is
// closed directly; dc.dbConn is not reset to nil here. The error is
// returned to the caller.
func (dc *DBClient) Commit() error {
	if _, err := dc.dbConn.ExecuteFetch("commit", 1, false); err != nil {
		log.Errorf("COMMIT failed w/ error %v", err)
		dc.dbConn.Close()
		return err
	}
	return nil
}
// Rollback issues a "rollback" statement on the current connection.
// On any failure the error is logged and the underlying connection is
// closed directly; dc.dbConn is not reset to nil here. The error is
// returned to the caller.
func (dc *DBClient) Rollback() error {
	if _, err := dc.dbConn.ExecuteFetch("rollback", 1, false); err != nil {
		log.Errorf("ROLLBACK failed w/ error %v", err)
		dc.dbConn.Close()
		return err
	}
	return nil
}
// Close shuts down the connection to the db server, if one is held, and
// clears dc.dbConn so that a repeated Close is a no-op.
func (dc *DBClient) Close() {
	if dc.dbConn == nil {
		return
	}
	dc.dbConn.Close()
	dc.dbConn = nil
}
// ExecuteFetch runs query on the current connection and returns its result.
// maxrows and wantfields are forwarded unchanged to the connection's
// ExecuteFetch. On failure the error is logged, passed to handleError
// (which may close the connection), and returned with a nil result.
func (dc *DBClient) ExecuteFetch(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
	result, err := dc.dbConn.ExecuteFetch(query, maxrows, wantfields)
	if err == nil {
		return result, nil
	}
	log.Errorf("ExecuteFetch failed w/ error %v", err)
	dc.handleError(err)
	return nil, err
}