Skip to content

Commit

Permalink
feat: add more trace span and integrate with jaeger (#177)
Browse files Browse the repository at this point in the history
* feat: add more trace span and integrate with jaeger
  • Loading branch information
fatelei committed Jul 17, 2022
1 parent 8290190 commit d703959
Show file tree
Hide file tree
Showing 30 changed files with 420 additions and 203 deletions.
25 changes: 18 additions & 7 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/server"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/pools"
_ "github.com/cectc/dbpack/third_party/types/parser_driver"
)
Expand Down Expand Up @@ -154,12 +155,6 @@ var (
}
}

// temporarily turn off tracer output
//tracingMgr, err := tracing.NewTracer(Version, "console")
//if err != nil {
// log.Fatalf("could not setup tracing manager: %s", err.Error())
//}

ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
Expand All @@ -172,7 +167,6 @@ var (
cancel()
}()
<-c
//_ = tracingMgr.Shutdown(ctx)
os.Exit(1) // second signal. Exit directly.
}()

Expand All @@ -189,8 +183,13 @@ var (
if lisErr != nil {
log.Fatalf("unable init metrics server: %+v", lisErr)
}

go initServer(ctx, lis)

if conf.Trace != nil {
go initTracing(ctx, conf.Trace.JaegerEndpoint)
}

dbpack.Start(ctx)
},
}
Expand All @@ -202,6 +201,18 @@ func init() {
rootCommand.AddCommand(startCommand)
}

func initTracing(ctx context.Context, jaegerEndpoint string) {
traceCtl, err := tracing.NewTracer(Version, jaegerEndpoint)
if err != nil {
log.Fatalf("could not setup tracing manager: %s", err.Error())
}

go func() {
<-ctx.Done()
traceCtl.Shutdown(ctx)
}()
}

func initServer(ctx context.Context, lis net.Listener) {
go func() {
<-ctx.Done()
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b // indirect
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -807,6 +808,7 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -915,9 +917,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 h1:wXgjiRldljksZkZrldGVe6XrG9u3kYDyQmkZwmm5dI0=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0/go.mod h1:PwQAOqBgqbLQRKlj466DuD2qyMjbtcPpfPfj+AqbSBs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0 h1:8hPcgCg0rUJiKE6VWahRvjgLUrNl7rW2hffUEPKXVEM=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0/go.mod h1:K4GDXPY6TjUiwbOh+DkKaEdCF8y+lvMoM6SeAPyfCCM=
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Configuration struct {
TerminationDrainDuration time.Duration `yaml:"termination_drain_duration" json:"termination_drain_duration"`

HTTPListenPort *int `yaml:"http_listen_port"`

Trace *Trace `yaml:"trace"`
}

type (
Expand Down Expand Up @@ -112,6 +114,10 @@ type (
Timeout time.Duration `yaml:"timeout" json:"-"`
PermitWithoutStream bool `yaml:"permit_without_stream"`
}

Trace struct {
JaegerEndpoint string `yaml:"jaeger_endpoint"`
}
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/dt/mysql_undo_log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (manager MysqlUndoLogManager) DeleteUndoLogByXID(db proto.DB, xid string) e
}

func (manager MysqlUndoLogManager) DeleteUndoLogByLogCreated(db proto.DB, logCreated time.Time, limitRows int) error {
// TODO pass ctx.
result, _, err := db.ExecuteSql(context.Background(), DeleteUndoLogByCreateSql, logCreated, limitRows)
if err != nil {
return err
Expand Down
22 changes: 12 additions & 10 deletions pkg/executor/read_write_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (executor *ReadWriteSplittingExecutor) ExecuteFieldList(ctx context.Context
}

func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context, sql string) (proto.Result, uint16, error) {
newCtx, span := tracing.GetTraceSpan(ctx, "rw_execute_com_query")
newCtx, span := tracing.GetTraceSpan(ctx, tracing.RWExecComQuery)
defer span.End()
var (
db *DataSourceBrief
Expand Down Expand Up @@ -272,31 +272,33 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
}

func (executor *ReadWriteSplittingExecutor) ExecutorComStmtExecute(ctx context.Context, stmt *proto.Stmt) (proto.Result, uint16, error) {
connectionID := proto.ConnectionID(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.RWExecComStmt)
defer span.End()
connectionID := proto.ConnectionID(newCtx)
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
// in local transaction
tx := txi.(proto.Tx)
return tx.ExecuteStmt(ctx, stmt)
return tx.ExecuteStmt(newCtx, stmt)
}
switch st := stmt.StmtNode.(type) {
case *ast.InsertStmt, *ast.DeleteStmt, *ast.UpdateStmt:
db := executor.masters.Next(proto.WithMaster(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(ctx), stmt)
db := executor.masters.Next(proto.WithMaster(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(newCtx), stmt)
case *ast.SelectStmt:
var db *DataSourceBrief
if has, dsName := hasUseDBHint(st.TableHints); has {
protoDB := resource.GetDBManager().GetDB(dsName)
if protoDB == nil {
log.Debugf("data source %d not found", dsName)
db = executor.reads.Next(proto.WithSlave(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(ctx), stmt)
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
} else {
return protoDB.ExecuteStmt(proto.WithSlave(ctx), stmt)
return protoDB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
}
}
db = executor.reads.Next(proto.WithSlave(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(ctx), stmt)
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
default:
return nil, 0, errors.Errorf("unsupported %t statement", stmt.StmtNode)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (executor *ShardingExecutor) ExecutorComQuery(ctx context.Context, sql stri
plan proto.Plan
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, "sharding_com_query")
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ShardExecComQuery)
defer span.End()

log.Debugf("query: %s", sql)
Expand Down Expand Up @@ -245,7 +245,7 @@ func (executor *ShardingExecutor) ExecutorComStmtExecute(ctx context.Context, st
plan proto.Plan
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, "sharding_com_stmt_execute")
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ShardExecComStmt)
defer span.End()

for i := 0; i < len(stmt.BindVars); i++ {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/single_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (executor *SingleDBExecutor) ExecutorComQuery(ctx context.Context, sql stri
result proto.Result
err error
)
newCtx, span := tracing.GetTraceSpan(ctx, "sdb_execute_com_query")
newCtx, span := tracing.GetTraceSpan(ctx, tracing.SDBExecComQuery)
defer span.End()
connectionID := proto.ConnectionID(newCtx)
queryStmt := proto.QueryStmt(newCtx)
Expand Down
7 changes: 6 additions & 1 deletion pkg/filter/dt/exec/prepare_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -49,8 +50,11 @@ func NewPrepareDeleteExecutor(
}

func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
Expand All @@ -64,6 +68,7 @@ func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand Down
9 changes: 7 additions & 2 deletions pkg/filter/dt/exec/prepare_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand Down Expand Up @@ -58,19 +59,23 @@ func (executor *prepareInsertExecutor) BeforeImage(ctx context.Context) (*schema
}

func (executor *prepareInsertExecutor) AfterImage(ctx context.Context) (*schema.TableRecords, error) {
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
defer span.End()
var afterImage *schema.TableRecords
var err error
pkValues, err := executor.getPKValuesByColumn(ctx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
if executor.getPKIndex(ctx) >= 0 {
afterImage, err = executor.buildTableRecords(ctx, pkValues)
afterImage, err = executor.buildTableRecords(newCtx, pkValues)
} else {
pk, _ := executor.result.LastInsertId()
afterImage, err = executor.buildTableRecords(ctx, []interface{}{pk})
afterImage, err = executor.buildTableRecords(newCtx, []interface{}{pk})
}
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return afterImage, nil
Expand Down
9 changes: 7 additions & 2 deletions pkg/filter/dt/exec/prepare_select_for_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"github.com/cectc/dbpack/pkg/tracing"

"github.com/cectc/dbpack/pkg/driver"
"github.com/cectc/dbpack/pkg/dt"
"github.com/cectc/dbpack/pkg/dt/schema"
Expand Down Expand Up @@ -54,7 +56,10 @@ func NewPrepareSelectForUpdateExecutor(
}

func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context, xid string, lockRetryInterval time.Duration, lockRetryTimes int) (bool, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, "executable")
defer span.End()

tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
return false, err
}
Expand All @@ -70,7 +75,7 @@ func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context,
err error
)
for i := 0; i < lockRetryTimes; i++ {
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(ctx,
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(newCtx,
executor.conn.DataSourceName(), lockKeys, xid)
if lockable && err == nil {
break
Expand Down
14 changes: 11 additions & 3 deletions pkg/filter/dt/exec/prepare_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -52,8 +53,11 @@ func NewPrepareUpdateExecutor(
}

func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
Expand All @@ -67,6 +71,7 @@ func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand All @@ -76,9 +81,11 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
if executor.beforeImage == nil || len(executor.beforeImage.Rows) == 0 {
return nil, nil
}

tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}

Expand All @@ -89,6 +96,7 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
}
result, _, err := executor.conn.PrepareQueryArgs(afterImageSql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/filter/dt/exec/query_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -46,13 +47,17 @@ func NewQueryDeleteExecutor(
}

func (executor *queryDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildTextRecords(tableMeta, result), nil
Expand Down

0 comments on commit d703959

Please sign in to comment.