Skip to content

Commit

Permalink
refacter: add trace for BackendConnection (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Jul 20, 2022
1 parent b184688 commit 36b3894
Show file tree
Hide file tree
Showing 13 changed files with 38 additions and 34 deletions.
14 changes: 11 additions & 3 deletions pkg/driver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/mysql"
"github.com/cectc/dbpack/pkg/packet"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/pools"
)

Expand Down Expand Up @@ -875,13 +876,16 @@ func (conn *BackendConnection) ExecuteMulti(query string, wantFields bool) (resu
// ExecuteWithWarningCount is for fetching results and a warning count
// Note: In a future iteration this should be abolished and merged into the
// Execute API.
func (conn *BackendConnection) ExecuteWithWarningCount(query string, wantFields bool) (result *mysql.Result, warnings uint16, err error) {
func (conn *BackendConnection) ExecuteWithWarningCount(ctx context.Context, query string, wantFields bool) (result *mysql.Result, warnings uint16, err error) {
_, span := tracing.GetTraceSpan(ctx, tracing.ConnQuery)
defer func() {
if err != nil {
if sqlerr, ok := err.(*err2.SQLError); ok {
sqlerr.Query = query
}
span.RecordError(err)
}
span.End()
}()

// Send the query as a COM_QUERY packet.
Expand All @@ -901,12 +905,16 @@ func (conn *BackendConnection) PrepareExecuteArgs(query string, args []interface
return stmt.execArgs(args)
}

func (conn *BackendConnection) PrepareQueryArgs(query string, data []interface{}) (Result *mysql.Result, warnings uint16, err error) {
func (conn *BackendConnection) PrepareQueryArgs(ctx context.Context, query string, args []interface{}) (Result *mysql.Result, warnings uint16, err error) {
_, span := tracing.GetTraceSpan(ctx, tracing.ConnStmtExecute)
defer span.End()

stmt, err := conn.prepare(query)
if err != nil {
span.RecordError(err)
return nil, 0, err
}
return stmt.queryArgs(data)
return stmt.queryArgs(args)
}

func (conn *BackendConnection) PrepareExecute(query string, data []byte) (result *mysql.Result, warnings uint16, err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/prepare_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema
args = append(args, executor.args[parameterID])
}

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
result, _, err := executor.conn.PrepareQueryArgs(spanCtx, sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/prepare_global_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (executor *prepareGlobalLockExecutor) BeforeImage(ctx context.Context) (*sc
parameterID := fmt.Sprintf("v%d", begin+1)
args = append(args, executor.args[parameterID])
}
result, _, err := executor.conn.PrepareQueryArgs(sql, args)
result, _, err := executor.conn.PrepareQueryArgs(ctx, sql, args)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/prepare_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (executor *prepareInsertExecutor) buildTableRecords(ctx context.Context, pk
}

afterImageSql := executor.buildAfterImageSql(tableMeta, pkValues)
result, _, err := executor.conn.PrepareQueryArgs(afterImageSql, pkValues)
result, _, err := executor.conn.PrepareQueryArgs(ctx, afterImageSql, pkValues)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/dt/exec/prepare_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema
args = append(args, executor.args[parameterID])
}

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
result, _, err := executor.conn.PrepareQueryArgs(spanCtx, sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand All @@ -94,7 +94,7 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
for _, field := range executor.beforeImage.PKFields() {
args = append(args, field.Value)
}
result, _, err := executor.conn.PrepareQueryArgs(afterImageSql, args)
result, _, err := executor.conn.PrepareQueryArgs(spanCtx, afterImageSql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/query_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (executor *queryDeleteExecutor) BeforeImage(ctx context.Context) (*schema.T
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
result, _, err := executor.conn.ExecuteWithWarningCount(spanCtx, sql, true)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/query_global_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (executor *queryGlobalLockExecutor) BeforeImage(ctx context.Context) (*sche
}

sql := executor.buildBeforeImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
result, _, err := executor.conn.ExecuteWithWarningCount(ctx, sql, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/dt/exec/query_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (executor *queryInsertExecutor) buildTableRecords(ctx context.Context, pkVa
}

afterImageSql := executor.buildAfterImageSql(tableMeta, pkValues)
result, _, err := executor.conn.ExecuteWithWarningCount(afterImageSql, true)
result, _, err := executor.conn.ExecuteWithWarningCount(ctx, afterImageSql, true)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/dt/exec/query_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (executor *queryUpdateExecutor) BeforeImage(ctx context.Context) (*schema.T
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
result, _, err := executor.conn.ExecuteWithWarningCount(spanCtx, sql, true)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand All @@ -79,7 +79,7 @@ func (executor *queryUpdateExecutor) AfterImage(ctx context.Context) (*schema.Ta
}

afterImageSql := executor.buildAfterImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(afterImageSql, true)
result, _, err := executor.conn.ExecuteWithWarningCount(spanCtx, afterImageSql, true)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/listener/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (l *MysqlListener) handle(conn net.Conn, connectionID uint32) {
if err != nil {
writeErr := c.WriteErrorPacketFromError(err)
if writeErr != nil {
log.Errorf("Cannot write error packet to %s: %v", c, writeErr)
log.Warnf("Cannot write error packet to %s: %v", c, writeErr)
return
}
return
Expand Down
16 changes: 6 additions & 10 deletions pkg/sql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (db *DB) Query(ctx context.Context, query string) (proto.Result, uint16, er
return nil, 0, err
}

result, warn, err := conn.ExecuteWithWarningCount(query, true)
result, warn, err := conn.ExecuteWithWarningCount(spanCtx, query, true)
if err != nil {
return result, warn, err
}
Expand Down Expand Up @@ -171,15 +171,11 @@ func (db *DB) ExecuteStmt(ctx context.Context, stmt *proto.Stmt) (proto.Result,
if err := db.doConnectionPreFilter(spanCtx, conn); err != nil {
return nil, 0, err
}
if stmt.HasLongDataParam {
for i := 0; i < len(stmt.BindVars); i++ {
parameterID := fmt.Sprintf("v%d", i+1)
args = append(args, stmt.BindVars[parameterID])
}
result, warn, err = conn.PrepareQueryArgs(query, args)
} else {
result, warn, err = conn.PrepareQuery(query, stmt.ParamData)
for i := 0; i < len(stmt.BindVars); i++ {
parameterID := fmt.Sprintf("v%d", i+1)
args = append(args, stmt.BindVars[parameterID])
}
result, warn, err = conn.PrepareQueryArgs(spanCtx, query, args)
if err != nil {
return result, warn, err
}
Expand Down Expand Up @@ -209,7 +205,7 @@ func (db *DB) ExecuteSql(ctx context.Context, sql string, args ...interface{}) (
return nil, 0, err
}
// TODO PrepareQueryArgs support ctx
result, warn, err := conn.PrepareQueryArgs(sql, args)
result, warn, err := conn.PrepareQueryArgs(spanCtx, sql, args)
if err != nil {
return result, warn, err
}
Expand Down
16 changes: 6 additions & 10 deletions pkg/sql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (tx *Tx) Query(ctx context.Context, query string) (proto.Result, uint16, er
if err := tx.db.doConnectionPreFilter(spanCtx, tx.conn); err != nil {
return nil, 0, err
}
result, warn, err := tx.conn.ExecuteWithWarningCount(query, true)
result, warn, err := tx.conn.ExecuteWithWarningCount(spanCtx, query, true)
if err != nil {
return result, warn, err
}
Expand Down Expand Up @@ -77,15 +77,11 @@ func (tx *Tx) ExecuteStmt(ctx context.Context, stmt *proto.Stmt) (proto.Result,
warn uint16
err error
)
if stmt.HasLongDataParam {
for i := 0; i < len(stmt.BindVars); i++ {
parameterID := fmt.Sprintf("v%d", i+1)
args = append(args, stmt.BindVars[parameterID])
}
result, warn, err = tx.conn.PrepareQueryArgs(query, args)
} else {
result, warn, err = tx.conn.PrepareQuery(query, stmt.ParamData)
for i := 0; i < len(stmt.BindVars); i++ {
parameterID := fmt.Sprintf("v%d", i+1)
args = append(args, stmt.BindVars[parameterID])
}
result, warn, err = tx.conn.PrepareQueryArgs(spanCtx, query, args)
if err != nil {
return result, warn, err
}
Expand All @@ -107,7 +103,7 @@ func (tx *Tx) ExecuteSql(ctx context.Context, sql string, args ...interface{}) (
if err := tx.db.doConnectionPreFilter(spanCtx, tx.conn); err != nil {
return nil, 0, err
}
result, warn, err := tx.conn.PrepareQueryArgs(sql, args)
result, warn, err := tx.conn.PrepareQueryArgs(spanCtx, sql, args)
if err != nil {
return result, warn, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/tracing/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ const (
TxExecStmt = "tx_exec_stmt"
TxCommit = "db_local_transaction_commit"
TxRollback = "db_local_transaction_rollback"

// conn
ConnQuery = "conn_com_query"
ConnStmtExecute = "conn_com_stmt_exec"
)

0 comments on commit 36b3894

Please sign in to comment.