diff --git a/agent/agent.go b/agent/agent.go index 956235caa..1f95c1b36 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -15,13 +15,12 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/memberlist" - ucli "github.com/actiontech/dtle/internal/client" uconf "github.com/actiontech/dtle/internal/config" ulog "github.com/actiontech/dtle/internal/logger" umodel "github.com/actiontech/dtle/internal/models" usrv "github.com/actiontech/dtle/internal/server" + "github.com/hashicorp/memberlist" ) // Agent is a long running daemon that is used to run both diff --git a/agent/command.go b/agent/command.go index 52c58c4c6..c5a598a11 100644 --- a/agent/command.go +++ b/agent/command.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "log" + "net/http" "os" "os/signal" "path/filepath" @@ -19,16 +20,17 @@ import ( "strings" "syscall" "time" - "net/http" "github.com/actiontech/dtle/internal/g" + ulog "github.com/actiontech/dtle/internal/logger" "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/mitchellh/cli" - - ulog "github.com/actiontech/dtle/internal/logger" + opentracing "github.com/opentracing/opentracing-go" "github.com/rakyll/autopprof" + jaeger "github.com/uber/jaeger-client-go" + jaegercnf "github.com/uber/jaeger-client-go/config" report "github.com/ikarishinjieva/golang-live-coverage-report/pkg" ) @@ -94,6 +96,8 @@ func (c *Command) readConfig() *Config { flags.IntVar(&cmdConfig.CoverageReportPort, "coverage-report-port", 0, "") flags.StringVar(&cmdConfig.CoverageReportRawCodeDir, "coverage-report-raw-code-dir", "/usr/lib/dtle", "") flags.StringVar(&cmdConfig.NodeName, "node", "", "") + flags.StringVar(&cmdConfig.JaegerAgentAddress, "jaeger-agent-address", "", "") + flags.StringVar(&cmdConfig.JaegerAgentPort, "jaeger-agent-port", "", "") if err := flags.Parse(c.args); err != nil { return nil @@ -248,6 +252,7 @@ func (c *Command) setupLoggers(config *Config) (io.Writer, error) { // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer) error { + c.logger.Printf("Starting Dtle server...") agent, err := NewAgent(config, logOutput, c.logger) if err != nil { @@ -275,6 +280,36 @@ func (c *Command) Run(args []string) int { if config == nil { return 1 } + if config.JaegerAgentAddress != "" && config.JaegerAgentPort != "" { + cfg := jaegercnf.Configuration{ + Sampler: &jaegercnf.SamplerConfig{ + Type: "const", + Param: 1, + }, + ServiceName: "dtle", + Reporter: &jaegercnf.ReporterConfig{ + LogSpans: true, + BufferFlushInterval: 1 * time.Second, + }, + } + sender, err := jaeger.NewUDPTransport(config.JaegerAgentAddress+":"+config.JaegerAgentPort, 0) + if err != nil { + return 1 + } + + reporter := jaeger.NewRemoteReporter(sender) + // Initialize tracer with a logger and a metrics factory + tracer, closer, err := cfg.NewTracer( + jaegercnf.Reporter(reporter), + ) + + /*tracer, closer, err := cfg.NewTracer()*/ + if err != nil { + return 1 + } + opentracing.SetGlobalTracer(tracer) + defer closer.Close() + } // Setup the log outputs logOutput, err := c.setupLoggers(config) diff --git a/agent/config.go b/agent/config.go index a9916911e..d4fe69b60 100644 --- a/agent/config.go +++ b/agent/config.go @@ -134,6 +134,11 @@ type Config struct { // CoverageReportRawCodeDir is the root deploy directory of coverage report raw code CoverageReportRawCodeDir string `mapstructure:"coverage_report_raw_code_dir"` + + //jaegerAgentAddress is jaeger tracing Data reporting address + JaegerAgentAddress string `mapstructure:"jaeger_agent_address"` + //jaegerAgentPort is jaeger tracing Data reporting port + JaegerAgentPort string `mapstructure:"jaeger_agent_port"` } // ClientConfig is configuration specific to the client mode @@ -252,15 +257,17 @@ type Node struct { // DefaultConfig is a the baseline configuration for Udup func DefaultConfig() *Config { return &Config{ - LogLevel: "INFO", - LogFile: "/var/log/dtle/dtle.log", - LogToStdout: false, - PprofSwitch: false, - PprofTime: 0, - PidFile: "/var/run/dtle/dtle.pid", - Region: "global", - Datacenter: "dc1", - BindAddr: "0.0.0.0", + LogLevel: "INFO", + LogFile: "/var/log/dtle/dtle.log", + LogToStdout: false, + PprofSwitch: false, + PprofTime: 0, + PidFile: "/var/run/dtle/dtle.pid", + Region: "global", + Datacenter: "dc1", + BindAddr: "0.0.0.0", + JaegerAgentAddress: "", + JaegerAgentPort: "", Ports: &Ports{ HTTP: 8190, RPC: 8191, @@ -376,6 +383,12 @@ func (c *Config) Merge(b *Config) *Config { if b.LeaveOnTerm { result.LeaveOnTerm = true } + if b.JaegerAgentAddress != "" { + result.JaegerAgentAddress = b.JaegerAgentAddress + } + if b.JaegerAgentPort != "" { + result.JaegerAgentPort = b.JaegerAgentPort + } // Apply the metric config if result.Metric == nil && b.Metric != nil { diff --git a/agent/config_parse.go b/agent/config_parse.go index 30c1db64e..9d7f21248 100644 --- a/agent/config_parse.go +++ b/agent/config_parse.go @@ -104,6 +104,8 @@ func parseConfig(result *Config, list *ast.ObjectList) error { "consul", "http_api_response_headers", "dtle_schema_name", + "jaeger_agent_address", + "jaeger_agent_port", } if err := checkHCLKeys(list, valid); err != nil { return multierror.Prefix(err, "config:") diff --git a/cmd/dtle/main.go b/cmd/dtle/main.go index c10f0ba4f..a3190fe11 100644 --- a/cmd/dtle/main.go +++ b/cmd/dtle/main.go @@ -10,14 +10,12 @@ import ( "fmt" "io/ioutil" "log" - "os" - - "github.com/mitchellh/cli" - _ "net/http/pprof" + "os" "github.com/actiontech/dtle/agent" "github.com/actiontech/dtle/cmd/dtle/command" + "github.com/mitchellh/cli" ) // The git commit that was compiled. This will be filled in by the compiler. @@ -33,9 +31,6 @@ func main() { func realMain() int { log.SetOutput(ioutil.Discard) - - // Get the command line args. We shortcut "--version" and "-v" to - // just show the version. args := os.Args[1:] for _, arg := range args { if arg == "-v" || arg == "--version" { diff --git a/git b/git new file mode 100644 index 000000000..e69de29bb diff --git a/internal/client/driver/mysql/applier.go b/internal/client/driver/mysql/applier.go index cdfb2f050..4348b08dd 100644 --- a/internal/client/driver/mysql/applier.go +++ b/internal/client/driver/mysql/applier.go @@ -12,6 +12,8 @@ import ( "fmt" "github.com/actiontech/dtle/internal/g" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" //"math" "bytes" @@ -43,6 +45,7 @@ import ( "github.com/actiontech/dtle/internal/models" "github.com/actiontech/dtle/utils" + "github.com/not.go" "github.com/satori/go.uuid" ) @@ -241,10 +244,6 @@ type Applier struct { nDumpEntry int64 stubFullApplyDelay bool - - // TODO we might need to save these from DumpEntry for reconnecting. - //SystemVariablesStatement string - //SqlMode string } func NewApplier(subject, tp string, cfg *config.MySQLDriverConfig, logger *log.Logger) (*Applier, error) { @@ -291,7 +290,7 @@ func (a *Applier) MtsWorker(workerIndex int) { case tx := <-a.applyBinlogMtsTxQueue: a.logger.Debugf("mysql.applier: a binlogEntry MTS dequeue, worker: %v. GNO: %v", workerIndex, tx.Coordinates.GNO) - if err := a.ApplyBinlogEvent(workerIndex, tx); err != nil { + if err := a.ApplyBinlogEvent(nil, workerIndex, tx); err != nil { a.onError(TaskStateDead, err) // TODO coordinate with other goroutine keepLoop = false } else { @@ -495,7 +494,6 @@ func (a *Applier) initNatSubClient() (err error) { a.natsConn = sc return nil } - func DecodeDumpEntry(data []byte) (entry *DumpEntry, err error) { msg, err := snappy.Decode(nil, data) if err != nil { @@ -513,6 +511,7 @@ func DecodeDumpEntry(data []byte) (entry *DumpEntry, err error) { } return entry, nil } + // Decode func Decode(data []byte, vPtr interface{}) (err error) { msg, err := snappy.Decode(nil, data) @@ -590,13 +589,16 @@ func (a *Applier) heterogeneousReplay() { var err error stopSomeLoop := false prevDDL := false + var ctx context.Context for !stopSomeLoop { select { case binlogEntry := <-a.applyDataEntryQueue: if nil == binlogEntry { continue } - + spanContext := binlogEntry.SpanContext + span := opentracing.GlobalTracer().StartSpan("dest use binlogEntry ", opentracing.FollowsFrom(spanContext)) + ctx = opentracing.ContextWithSpan(ctx, span) a.logger.Debugf("mysql.applier: a binlogEntry. remaining: %v. gno: %v, lc: %v, seq: %v", len(a.applyDataEntryQueue), binlogEntry.Coordinates.GNO, binlogEntry.Coordinates.LastCommitted, binlogEntry.Coordinates.SeqenceNumber) @@ -605,7 +607,6 @@ func (a *Applier) heterogeneousReplay() { a.logger.Debugf("mysql.applier: skipping a dtle tx. osid: %v", binlogEntry.Coordinates.OSID) continue } - // region TestIfExecuted if a.gtidExecuted == nil { // udup crash recovery or never executed @@ -615,7 +616,6 @@ func (a *Applier) heterogeneousReplay() { return } } - txSid := binlogEntry.Coordinates.GetSid() gtidSetItem, hasSid := a.gtidExecuted[binlogEntry.Coordinates.SID] @@ -629,7 +629,6 @@ func (a *Applier) heterogeneousReplay() { continue } // endregion - // this must be after duplication check var rotated bool if a.currentCoordinates.File == binlogEntry.Coordinates.LogFile { @@ -656,7 +655,6 @@ func (a *Applier) heterogeneousReplay() { newInterval := append(gtidSetItem.Intervals, thisInterval).Normalize() // TODO this is assigned before real execution gtidSetItem.Intervals = newInterval - if binlogEntry.Coordinates.SeqenceNumber == 0 { // MySQL 5.6: non mts err := a.setTableItemForBinlogEntry(binlogEntry) @@ -664,7 +662,7 @@ func (a *Applier) heterogeneousReplay() { a.onError(TaskStateDead, err) return } - if err := a.ApplyBinlogEvent(0, binlogEntry); err != nil { + if err := a.ApplyBinlogEvent(ctx, 0, binlogEntry); err != nil { a.onError(TaskStateDead, err) return } @@ -680,13 +678,11 @@ func (a *Applier) heterogeneousReplay() { a.logger.Warnf("DTLE_BUG: len(a.mtsManager.m) should be 0") } } - // If there are TXs skipped by udup source-side for a.mtsManager.lastEnqueue+1 < binlogEntry.Coordinates.SeqenceNumber { a.mtsManager.lastEnqueue += 1 a.mtsManager.chExecuted <- a.mtsManager.lastEnqueue } - hasDDL := func() bool { for i := range binlogEntry.Events { dmlEvent := &binlogEntry.Events[i] @@ -698,7 +694,6 @@ func (a *Applier) heterogeneousReplay() { } return false }() - // DDL must be executed separatedly if hasDDL || prevDDL { a.logger.Debugf("mysql.applier: gno: %v MTS found DDL(%v,%v). WaitForAllCommitted", @@ -707,7 +702,6 @@ func (a *Applier) heterogeneousReplay() { return // shutdown } } - if hasDDL { prevDDL = true } else { @@ -717,15 +711,16 @@ func (a *Applier) heterogeneousReplay() { if !a.mtsManager.WaitForExecution(binlogEntry) { return // shutdown } - a.logger.Debugf("mysql.applier: a binlogEntry MTS enqueue. gno: %v", binlogEntry.Coordinates.GNO) err = a.setTableItemForBinlogEntry(binlogEntry) if err != nil { a.onError(TaskStateDead, err) return } + binlogEntry.SpanContext = span.Context() a.applyBinlogMtsTxQueue <- binlogEntry } + span.Finish() if !a.shutdown { // TODO what is this used for? a.mysqlContext.Gtid = fmt.Sprintf("%s:1-%d", txSid, binlogEntry.Coordinates.GNO) @@ -789,10 +784,20 @@ OUTER: func (a *Applier) initiateStreaming() error { a.mysqlContext.MarkRowCopyStartTime() a.logger.Debugf("mysql.applier: nats subscribe") + tracer := opentracing.GlobalTracer() _, err := a.natsConn.Subscribe(fmt.Sprintf("%s_full", a.subject), func(m *gonats.Msg) { a.logger.Debugf("mysql.applier: full. recv a msg. copyRowsQueue: %v", len(a.copyRowsQueue)) - - dumpData, err := DecodeDumpEntry(m.Data) + t := not.NewTraceMsg(m) + // Extract the span context from the request message. + sc, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + a.logger.Debugf("applier:get data") + } + // Setup a span referring to the span context of the incoming NATS message. + replySpan := tracer.StartSpan("Service Responder", ext.SpanKindRPCServer, ext.RPCServerOption(sc)) + ext.MessageBusDestination.Set(replySpan, m.Subject) + defer replySpan.Finish() + dumpData, err := DecodeDumpEntry(t.Bytes()) if err != nil { a.onError(TaskStateDead, err) // TODO return? @@ -823,7 +828,17 @@ func (a *Applier) initiateStreaming() error { _, err = a.natsConn.Subscribe(fmt.Sprintf("%s_full_complete", a.subject), func(m *gonats.Msg) { dumpData := &dumpStatResult{} - if err := Decode(m.Data, dumpData); err != nil { + t := not.NewTraceMsg(m) + // Extract the span context from the request message. + sc, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + a.logger.Debugf("applier:get data") + } + // Setup a span referring to the span context of the incoming NATS message. + replySpan := tracer.StartSpan("Service Responder", ext.SpanKindRPCServer, ext.RPCServerOption(sc)) + ext.MessageBusDestination.Set(replySpan, m.Subject) + defer replySpan.Finish() + if err := Decode(t.Bytes(), dumpData); err != nil { a.onError(TaskStateDead, err) } a.currentCoordinates.RetrievedGtidSet = dumpData.Gtid @@ -851,7 +866,17 @@ func (a *Applier) initiateStreaming() error { if a.mysqlContext.ApproveHeterogeneous { _, err := a.natsConn.Subscribe(fmt.Sprintf("%s_incr_hete", a.subject), func(m *gonats.Msg) { var binlogEntries binlog.BinlogEntries - if err := Decode(m.Data, &binlogEntries); err != nil { + t := not.NewTraceMsg(m) + // Extract the span context from the request message. + spanContext, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + a.logger.Debugf("applier:get data") + } + // Setup a span referring to the span context of the incoming NATS message. + replySpan := tracer.StartSpan("nast : dest to get data ", ext.SpanKindRPCServer, ext.RPCServerOption(spanContext)) + ext.MessageBusDestination.Set(replySpan, m.Subject) + defer replySpan.Finish() + if err := Decode(t.Bytes(), &binlogEntries); err != nil { a.onError(TaskStateDead, err) } @@ -867,6 +892,7 @@ func (a *Applier) initiateStreaming() error { } else { a.logger.Debugf("applier. incr. applyDataEntryQueue enqueue") for _, binlogEntry := range binlogEntries.Entries { + binlogEntry.SpanContext = replySpan.Context() a.applyDataEntryQueue <- binlogEntry a.currentCoordinates.RetrievedGtidSet = binlogEntry.Coordinates.GetGtidForThisTx() atomic.AddInt64(&a.mysqlContext.DeltaEstimate, 1) @@ -895,7 +921,17 @@ func (a *Applier) initiateStreaming() error { } else { _, err := a.natsConn.Subscribe(fmt.Sprintf("%s_incr", a.subject), func(m *gonats.Msg) { var binlogTx []*binlog.BinlogTx - if err := Decode(m.Data, &binlogTx); err != nil { + t := not.NewTraceMsg(m) + // Extract the span context from the request message. + sc, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + a.logger.Debugf("applier:get data") + } + // Setup a span referring to the span context of the incoming NATS message. + replySpan := tracer.StartSpan(a.subject, ext.SpanKindRPCServer, ext.RPCServerOption(sc)) + ext.MessageBusDestination.Set(replySpan, m.Subject) + defer replySpan.Finish() + if err := Decode(t.Bytes(), &binlogTx); err != nil { a.onError(TaskStateDead, err) } for _, tx := range binlogTx { @@ -1173,11 +1209,12 @@ func (a *Applier) getTableItem(schema string, table string) *applierTableItem { // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog // event entry on the original table. -func (a *Applier) buildDMLEventQuery(dmlEvent binlog.DataEvent, workerIdx int) (query *gosql.Stmt, args []interface{}, rowsDelta int64, err error) { +func (a *Applier) buildDMLEventQuery(dmlEvent binlog.DataEvent, workerIdx int, spanContext opentracing.SpanContext) (query *gosql.Stmt, args []interface{}, rowsDelta int64, err error) { // Large piece of code deleted here. See git annotate. tableItem := dmlEvent.TableItem.(*applierTableItem) var tableColumns = tableItem.columns - + span := opentracing.GlobalTracer().StartSpan("desc buildDMLEventQuery ", opentracing.FollowsFrom(spanContext)) + defer span.Finish() doPrepareIfNil := func(stmts []*gosql.Stmt, query string) (*gosql.Stmt, error) { var err error if stmts[workerIdx] == nil { @@ -1237,12 +1274,26 @@ func (a *Applier) buildDMLEventQuery(dmlEvent binlog.DataEvent, workerIdx int) ( } // ApplyEventQueries applies multiple DML queries onto the dest table -func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntry) error { +func (a *Applier) ApplyBinlogEvent(ctx context.Context, workerIdx int, binlogEntry *binlog.BinlogEntry) error { dbApplier := a.dbs[workerIdx] var totalDelta int64 var err error + var spanContext opentracing.SpanContext + var span opentracing.Span + if ctx != nil { + spanContext = opentracing.SpanFromContext(ctx).Context() + span = opentracing.GlobalTracer().StartSpan(" desc single binlogEvent transform to sql ", opentracing.ChildOf(spanContext)) + span.SetTag("start insert sql ", time.Now().UnixNano()/1e6) + defer span.Finish() + } else { + spanContext = binlogEntry.SpanContext + span = opentracing.GlobalTracer().StartSpan("desc mts binlogEvent transform to sql ", opentracing.ChildOf(spanContext)) + span.SetTag("start insert sql ", time.Now().UnixNano()/1e6) + defer span.Finish() + spanContext = span.Context() + } txSid := binlogEntry.Coordinates.GetSid() dbApplier.DbMutex.Lock() @@ -1251,6 +1302,7 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr return err } defer func() { + span.SetTag("begin commit sql ", time.Now().UnixNano()/1e6) if err := tx.Commit(); err != nil { a.onError(TaskStateDead, err) } else { @@ -1259,10 +1311,11 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr if a.printTps { atomic.AddUint32(&a.txLastNSeconds, 1) } + span.SetTag("after commit sql ", time.Now().UnixNano()/1e6) dbApplier.DbMutex.Unlock() }() - + span.SetTag("begin transform binlogEvent to sql time ", time.Now().UnixNano()/1e6) for i, event := range binlogEntry.Events { a.logger.Debugf("mysql.applier: ApplyBinlogEvent. gno: %v, event: %v", binlogEntry.Coordinates.GNO, i) @@ -1319,7 +1372,7 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr a.logger.Debugf("mysql.applier: Exec [%s]", event.Query) default: a.logger.Debugf("mysql.applier: ApplyBinlogEvent: a dml event") - stmt, args, rowDelta, err := a.buildDMLEventQuery(event, workerIdx) + stmt, args, rowDelta, err := a.buildDMLEventQuery(event, workerIdx, spanContext) if err != nil { a.logger.Errorf("mysql.applier: Build dml query error: %v", err) return err @@ -1342,7 +1395,7 @@ func (a *Applier) ApplyBinlogEvent(workerIdx int, binlogEntry *binlog.BinlogEntr totalDelta += rowDelta } } - + span.SetTag("after transform binlogEvent to sql ", time.Now().UnixNano()/1e6) a.logger.Debugf("ApplyBinlogEvent. insert gno: %v", binlogEntry.Coordinates.GNO) _, err = dbApplier.PsInsertExecutedGtid.Exec(binlogEntry.Coordinates.SID.Bytes(), binlogEntry.Coordinates.GNO) if err != nil { diff --git a/internal/client/driver/mysql/binlog/binlog_entry.go b/internal/client/driver/mysql/binlog/binlog_entry.go index c647ac137..8cb4f2186 100644 --- a/internal/client/driver/mysql/binlog/binlog_entry.go +++ b/internal/client/driver/mysql/binlog/binlog_entry.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/actiontech/dtle/internal/client/driver/mysql/base" + opentracing "github.com/opentracing/opentracing-go" ) type BinlogEntries struct { @@ -20,9 +21,9 @@ type BinlogEntries struct { type BinlogEntry struct { hasBeginQuery bool Coordinates base.BinlogCoordinateTx - - Events []DataEvent - OriginalSize int // size of binlog entry + SpanContext opentracing.SpanContext + Events []DataEvent + OriginalSize int // size of binlog entry } // NewBinlogEntry creates an empty, ready to go BinlogEntry object diff --git a/internal/client/driver/mysql/binlog/binlog_reader.go b/internal/client/driver/mysql/binlog/binlog_reader.go index 055f0bab9..5dabdb187 100644 --- a/internal/client/driver/mysql/binlog/binlog_reader.go +++ b/internal/client/driver/mysql/binlog/binlog_reader.go @@ -12,7 +12,6 @@ import ( "time" "github.com/actiontech/dtle/internal/g" - //"encoding/hex" "fmt" "regexp" @@ -24,10 +23,10 @@ import ( "github.com/issuj/gofaster/base64" "github.com/pingcap/parser" - ast "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/ast" _ "github.com/pingcap/tidb/types/parser_driver" - uuid "github.com/satori/go.uuid" + "github.com/satori/go.uuid" gomysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "golang.org/x/net/context" @@ -42,6 +41,7 @@ import ( log "github.com/actiontech/dtle/internal/logger" "github.com/actiontech/dtle/internal/models" "github.com/actiontech/dtle/utils" + "github.com/opentracing/opentracing-go" ) // BinlogReader is a general interface whose implementations can choose their methods of reading @@ -309,6 +309,11 @@ type parseDDLResult struct { // StreamEvents func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel chan<- *BinlogEntry) error { + spanContext := ev.SpanContest + trace := opentracing.GlobalTracer() + span := trace.StartSpan("incremental binlogEvent translation to sql", opentracing.ChildOf(spanContext)) + span.SetTag("begin to translation", time.Now().Unix()) + defer span.Finish() if b.currentCoordinates.SmallerThanOrEquals(&b.LastAppliedRowsEventHint) { b.logger.Debugf("mysql.reader: Skipping handled query at %+v", b.currentCoordinates) return nil @@ -378,6 +383,7 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c NotDML, ) b.currentBinlogEntry.Events = append(b.currentBinlogEntry.Events, event) + b.currentBinlogEntry.SpanContext = span.Context() entriesChannel <- b.currentBinlogEntry b.LastAppliedRowsEventHint = b.currentCoordinates return nil @@ -520,11 +526,13 @@ func (b *BinlogReader) handleEvent(ev *replication.BinlogEvent, entriesChannel c b.currentBinlogEntry.Events = append(b.currentBinlogEntry.Events, event) } } + b.currentBinlogEntry.SpanContext = span.Context() entriesChannel <- b.currentBinlogEntry b.LastAppliedRowsEventHint = b.currentCoordinates } } case replication.XID_EVENT: + b.currentBinlogEntry.SpanContext = span.Context() entriesChannel <- b.currentBinlogEntry b.LastAppliedRowsEventHint = b.currentCoordinates default: @@ -706,11 +714,17 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *BinlogEntry) erro break } + trace := opentracing.GlobalTracer() ev, err := b.binlogStreamer.GetEvent(context.Background()) if err != nil { b.logger.Errorf("mysql.reader error GetEvent. err: %v", err) return err } + spanContext := ev.SpanContest + span := trace.StartSpan("DataStreamEvents() get binlogEvent from mysql-go ", opentracing.FollowsFrom(spanContext)) + span.SetTag("time", time.Now().Unix()) + ev.SpanContest = span.Context() + if ev.Header.EventType == replication.HEARTBEAT_EVENT { continue } @@ -739,6 +753,7 @@ func (b *BinlogReader) DataStreamEvents(entriesChannel chan<- *BinlogEntry) erro return err } } + span.Finish() } return nil diff --git a/internal/client/driver/mysql/extractor.go b/internal/client/driver/mysql/extractor.go index ebc88f8f2..00cbbe70d 100644 --- a/internal/client/driver/mysql/extractor.go +++ b/internal/client/driver/mysql/extractor.go @@ -12,6 +12,8 @@ import ( "fmt" "github.com/actiontech/dtle/internal/g" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" "github.com/pkg/errors" //"math" @@ -26,6 +28,7 @@ import ( "github.com/golang/snappy" gonats "github.com/nats-io/go-nats" + "github.com/not.go" gomysql "github.com/siddontang/go-mysql/mysql" "os" @@ -34,6 +37,8 @@ import ( "net" + "context" + "github.com/actiontech/dtle/internal/client/driver/mysql/base" "github.com/actiontech/dtle/internal/client/driver/mysql/binlog" "github.com/actiontech/dtle/internal/client/driver/mysql/sql" @@ -58,15 +63,15 @@ const ( // Extractor is the main schema extract flow manager. type Extractor struct { - logger *log.Entry - subject string - tp string - maxPayload int - mysqlContext *config.MySQLDriverConfig + logger *log.Entry + subject string + tp string + maxPayload int + mysqlContext *config.MySQLDriverConfig mysqlVersionDigit int - db *gosql.DB - singletonDB *gosql.DB - dumpers []*dumper + db *gosql.DB + singletonDB *gosql.DB + dumpers []*dumper // db.tb exists when creating the job, for full-copy. // vs e.mysqlContext.ReplicateDoDb: all user assigned db.tb replicateDoDb []*config.DataSource @@ -225,6 +230,10 @@ func (e *Extractor) Run() { } if fullCopy { + var ctx context.Context + span := opentracing.GlobalTracer().StartSpan("span_full_complete") + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) e.mysqlContext.MarkRowCopyStartTime() if err := e.mysqlDump(); err != nil { e.onError(TaskStateDead, err) @@ -234,7 +243,7 @@ func (e *Extractor) Run() { if err != nil { e.onError(TaskStateDead, err) } - if err := e.publish(fmt.Sprintf("%s_full_complete", e.subject), "", dumpMsg); err != nil { + if err := e.publish(ctx, fmt.Sprintf("%s_full_complete", e.subject), "", dumpMsg); err != nil { e.onError(TaskStateDead, err) } } else { @@ -794,13 +803,14 @@ func Encode(v interface{}) ([]byte, error) { // StreamEvents will begin streaming events. It will be blocking, so should be // executed by a goroutine func (e *Extractor) StreamEvents() error { + var ctx context.Context + //tracer := opentracing.GlobalTracer() + if e.mysqlContext.ApproveHeterogeneous { go func() { defer e.logger.Debugf("extractor. StreamEvents goroutine exited") - entries := binlog.BinlogEntries{} entriesSize := 0 - sendEntries := func() error { var gno int64 = 0 if len(entries.Entries) > 0 { @@ -812,7 +822,7 @@ func (e *Extractor) StreamEvents() error { return err } e.logger.Debugf("mysql.extractor: sending gno: %v, n: %v", gno, len(entries.Entries)) - if err = e.publish(fmt.Sprintf("%s_incr_hete", e.subject), "", txMsg); err != nil { + if err = e.publish(ctx, fmt.Sprintf("%s_incr_hete", e.subject), "", txMsg); err != nil { return err } e.logger.Debugf("mysql.extractor: send acked gno: %v, n: %v", gno, len(entries.Entries)) @@ -835,6 +845,12 @@ func (e *Extractor) StreamEvents() error { var addrs []net.Addr select { case binlogEntry := <-e.dataChannel: + spanContext := binlogEntry.SpanContext + span := opentracing.GlobalTracer().StartSpan("nat send :begin send binlogEntry from src dtle to desc dtle", opentracing.ChildOf(spanContext)) + span.SetTag("time", time.Now().Unix()) + ctx = opentracing.ContextWithSpan(ctx, span) + //span.SetTag("timetag", time.Now().Unix()) + binlogEntry.SpanContext = nil entries.Entries = append(entries.Entries, binlogEntry) entriesSize += binlogEntry.OriginalSize if int64(len(entries.Entries)) <= 1 { @@ -858,17 +874,18 @@ func (e *Extractor) StreamEvents() error { e.logger.Debugf("mysql.extractor: err is : %v", err != nil) if entriesSize >= e.mysqlContext.GroupMaxSize || int64(len(entries.Entries)) == e.mysqlContext.ReplChanBufferSize { - e.logger.Debugf("extractor. incr. send by GroupLimit. entriesSize: %v", entriesSize) + e.logger.Debugf("extractor. incr. send by GroupLimit. entriesSize: %v , groupMaxSize: %v,Entries.len: %v", entriesSize, e.mysqlContext.GroupMaxSize, len(entries.Entries)) err = sendEntries() if !timer.Stop() { <-timer.C } timer.Reset(groupTimeoutDuration) } + span.Finish() case <-timer.C: nEntries := len(entries.Entries) if nEntries > 0 { - e.logger.Debugf("extractor. incr. send by timeout. entriesSize: %v", entriesSize) + e.logger.Debugf("extractor. incr. send by timeout. entriesSize: %v,timeout time: %v", entriesSize, e.mysqlContext.GroupTimeout) err = sendEntries() } timer.Reset(groupTimeoutDuration) @@ -978,7 +995,7 @@ func (e *Extractor) StreamEvents() error { if len(txMsg) > e.maxPayload { e.onError(TaskStateDead, gonats.ErrMaxPayload) } - if err = e.publish(subject, fmt.Sprintf("%s:1-%d", binlogTx.SID, binlogTx.GNO), txMsg); err != nil { + if err = e.publish(ctx, subject, fmt.Sprintf("%s:1-%d", binlogTx.SID, binlogTx.GNO), txMsg); err != nil { e.onError(TaskStateDead, err) break L } @@ -999,7 +1016,7 @@ func (e *Extractor) StreamEvents() error { if len(txMsg) > e.maxPayload { e.onError(TaskStateDead, gonats.ErrMaxPayload) } - if err = e.publish(subject, + if err = e.publish(ctx, subject, fmt.Sprintf("%s:1-%d", txArray[len(txArray)-1].SID, txArray[len(txArray)-1].GNO), @@ -1033,10 +1050,32 @@ func (e *Extractor) StreamEvents() error { // retryOperation attempts up to `count` attempts at running given function, // exiting as soon as it returns with non-error. -func (e *Extractor) publish(subject, gtid string, txMsg []byte) (err error) { +func (e *Extractor) publish(ctx context.Context, subject, gtid string, txMsg []byte) (err error) { + tracer := opentracing.GlobalTracer() + var t not.TraceMsg + var spanctx opentracing.SpanContext + if ctx != nil { + spanctx = opentracing.SpanFromContext(ctx).Context() + } else { + parent := tracer.StartSpan("no parent ", ext.SpanKindProducer) + defer parent.Finish() + spanctx = parent.Context() + } + + span := tracer.StartSpan("nat: src publish() to send data ", ext.SpanKindProducer, opentracing.ChildOf(spanctx)) + + ext.MessageBusDestination.Set(span, subject) + + // Inject span context into our traceMsg. + if err := tracer.Inject(span.Context(), opentracing.Binary, &t); err != nil { + e.logger.Debugf("mysql.extractor: start tracer fail, got %v", err) + } + // Add the payload. + t.Write(txMsg) + defer span.Finish() for { e.logger.Debugf("mysql.extractor: publish. gtid: %v, msg_len: %v", gtid, len(txMsg)) - _, err = e.natsConn.Request(subject, txMsg, DefaultConnectWait) + _, err = e.natsConn.Request(subject, t.Bytes(), DefaultConnectWait) if err == nil { if gtid != "" { e.mysqlContext.Gtid = gtid @@ -1390,13 +1429,17 @@ func (e *Extractor) mysqlDump() error { return nil } func (e *Extractor) encodeDumpEntry(entry *DumpEntry) error { + var ctx context.Context + //tracer := opentracing.GlobalTracer() + span := opentracing.GlobalTracer().StartSpan("span_full") + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) bs, err := entry.Marshal(nil) if err != nil { return err } txMsg := snappy.Encode(nil, bs) - - if err := e.publish(fmt.Sprintf("%s_full", e.subject), "", txMsg); err != nil { + if err := e.publish(ctx, fmt.Sprintf("%s_full", e.subject), "", txMsg); err != nil { return err } e.mysqlContext.Stage = models.StageSendingData diff --git a/vendor/github.com/not.go/.gitignore b/vendor/github.com/not.go/.gitignore new file mode 100644 index 000000000..f1c181ec9 --- /dev/null +++ b/vendor/github.com/not.go/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/vendor/github.com/not.go/CODE-OF-CONDUCT.md b/vendor/github.com/not.go/CODE-OF-CONDUCT.md new file mode 100644 index 000000000..0dfc5eb7f --- /dev/null +++ b/vendor/github.com/not.go/CODE-OF-CONDUCT.md @@ -0,0 +1,3 @@ +# Community Code of Conduct + +NATS follows the [CNCF Code of Conduct](https://github.com/cncf/foundation/blob/master/code-of-conduct.md). diff --git a/vendor/github.com/not.go/GOVERNANCE.md b/vendor/github.com/not.go/GOVERNANCE.md new file mode 100644 index 000000000..609f2ef94 --- /dev/null +++ b/vendor/github.com/not.go/GOVERNANCE.md @@ -0,0 +1,3 @@ +# NATS OpenTracing Governance + +This repository is part of the NATS project and is subject to the [NATS Governance](https://github.com/nats-io/nats-general/blob/master/GOVERNANCE.md). diff --git a/vendor/github.com/not.go/LICENSE b/vendor/github.com/not.go/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/not.go/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/not.go/MAINTAINERS.md b/vendor/github.com/not.go/MAINTAINERS.md new file mode 100644 index 000000000..687408558 --- /dev/null +++ b/vendor/github.com/not.go/MAINTAINERS.md @@ -0,0 +1,10 @@ +# Maintainers + +Maintainership is on a per project basis. + +### Core-maintainers + - Derek Collison [@derekcollison](https://github.com/derekcollison) + - Colin Sullivan [@ColinSullivan1](https://github.com/ColinSullivan1) + +### Maintainers + - Waldemar Quevedo [@wallyqs](https://github.com/wallyqs) diff --git a/vendor/github.com/not.go/README.md b/vendor/github.com/not.go/README.md new file mode 100644 index 000000000..2a2267c96 --- /dev/null +++ b/vendor/github.com/not.go/README.md @@ -0,0 +1,215 @@ +# OpenTracing with NATS + +Over the years, we've had periodic requests to support distributed tracing in +[NATS](https://nats.io). While distributed tracing is valuable, +philosophically we did not want to add external dependencies to NATS, +internally or via API. Nor did we want to provide guidance that would make +developers work in ways that didn't feel natural or aligned with the tenets +of NATS. We left it to the application developers using NATS. + +[OpenTracing](https://opentracing.io) changes this and offers a way to +implement distributed tracing with NATS that aligns with our goals and +philosophy of simplicity and ease of use and does not require adding +dependencies into NATS. This repository provides a reference to +facilitate the use of OpenTracing with NATS enabled applications. + +## What is OpenTracing + +OpenTracing provides a non-intrusive vendor-neutral API and instrumentation +for distributed tracing, with wide language support. Because each use case +is slightly different, we've decided not to provide specific implementations +at this point. Instead we are providing a reference architecture with +examples demonstrating easy usage of OpenTracing with NATS. In line with +other NATS projects, these canonical examples are provided in +[Go](https://golang.org), but this approach should port smoothly into many +other languages. More language examples will be added soon. + +## How it works + +OpenTracing is actually fairly simple to implement in an applicaton. +A "Trace" is defined, and then sets up "spans" to represent an operation +or event and log information, which is reported to the OpenTracing aggregator +for reporting and display. + +To propogate, span contexts are serialized into a NATS message using the +binary format. We provide a `not.TraceMsg` which will do what is needed to +[inject](https://opentracing.io/docs/overview/inject-extract/) span contexts +into messages and to extract them on the other side. + +Here's how to send a span context over NATS. + +```go +// A NATS OpenTracing Message. +var t not.TraceMsg + +// Setup a span for the operation to publish a message. +pubSpan := tracer.StartSpan("Published Message", ext.SpanKindProducer) +ext.MessageBusDestination.Set(pubSpan, subj) +defer pubSpan.Finish() + +// Inject span context into our traceMsg. +if err := tracer.Inject(pubSpan.Context(), opentracing.Binary, &t); err != nil { + log.Fatalf("%v for Inject.", err) +} + +// Add the payload. +t.Write(msg) + +// Send the message over NATS. +nc.Publish(subj, t.Bytes()) +``` + +Note that the payload is added after injection. This order is a requirement, but +simplifies the API and feels natural. + +Retrieving a span from an inbound message and associating with a new response +span is straightforward as well. + +```go +// Create new TraceMsg from the NATS message. +t := not.NewTraceMsg(msg) + +// Extract the span context from the request message. +sc, err := tracer.Extract(opentracing.Binary, t) +if err != nil { + log.Printf("Extract error: %v", err) +} + +// Setup a span referring to the span context of the incoming NATS message. +replySpan := tracer.StartSpan("Service Responder", ext.SpanKindRPCServer, ext.RPCServerOption(sc)) +ext.MessageBusDestination.Set(replySpan, msg.Subject) +defer replySpan.Finish() + +nc.Publish(msg.Reply, reply) +replySpan.LogEvent(fmt.Sprintf("Response msg: %s", reply)) + +``` + +Check out the [examples](./examples) for additional usage. + +## Setting up an OpenTracing Tracer + +To run the the examples, we setup [Jaeger](https://www.jaegertracing.io/) +as the OpenTracing tracer with its convenient "all-in-one" docker image. +Jaeger is a CNCF open source, end-to-end distributed tracing project. + +```bash +docker run -d --name jaeger \ + -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \ + -p 5775:5775/udp \ + -p 6831:6831/udp \ + -p 6832:6832/udp \ + -p 5778:5778 \ + -p 16686:16686 \ + -p 14268:14268 \ + -p 9411:9411 \ + jaegertracing/all-in-one:1.9 +``` + +See Jaeger's [getting started](https://www.jaegertracing.io/docs/1.9/getting-started/) +documentation for more information. + +## Request/Reply Examples + +* [Requestor Example](./examples/request/main.go) +* [Replier Example](./examples/reply/main.go) + +Open two terminals, in one terminal go to the reply example directory +and run: + +```bash +./reply foo "here's some help" +``` + +In the other terminal, go to the request example direcory and run: + +```bash +./request foo help +``` + +### Request Output + +```text +Initializing logging reporter +Published [foo] : 'help' +Received [_INBOX.4fkjE3Kld4s26FoJxnfxNy.dyOD78y3] : 'here's some help' +Reporting span 3007527f6bf0a38e:3007527f6bf0a38e:0:1 +``` + +### Reply Output + +```text +Initializing logging reporter +Listening on [foo] +Received request: help +Reporting span 3007527f6bf0a38e:66628b457927103d:3007527f6bf0a38e:1 +``` + +### Viewing the Request/Reply output in the Jaeger UI + +Navigate with a browser to http://localhost:16686. Find the _NATS Requestor_ +service in the services list and click the _Find Traces_ button. Click on +the _NATS Requestor_ service and you will see a screen similar to the following: + +![Jaeger UI Request Reply](./images/RequestReply.jpg) + +You can see the entire span of the request and the associated replier span. + +## Publish/Subscribe Examples + +* [Publisher Example](./examples/publish/main.go) +* [Subscriber Example](./examples/subscribe/main.go) + +Open three terminals, in the first two terminals go to the subscribe example +directory and run: + +```bash +go build +./subscribe foo +``` + +and in the second terminal: + +```bash +./subscribe foo +``` + +And finally in the third terminal go to the publish example directory: + +```bash +go build +./publish foo hello +``` + +Navigate with a browser to http://localhost:16686. Find the _NATS Publisher_ +service in the services list and click the _Find Traces_ button. Click on the +_NATS Publisher_ service and you will see a screen to the following: + +![Jaeger UI Publish Subscribe](./images/PubSub.jpg) + +You can see the publish span and the two associated subscriber spans. The gap +the middle includes the NATS client library publishing the message, the NATS server +routing and fanning out the message, and the subscriber NATS clients receiving the +messages and passing them to application code where the subscriber span is reported. + +### Subscriber Output + +```text +Initializing logging reporter +Listening on [foo] +Received msg: "hello" +Reporting span 2b78c114fc32bcad:132b0c35588f3c16:2b78c114fc32bcad:1 +``` + +### Publisher Output + +```text +Initializing logging reporter +Published [foo] : 'hello' +Reporting span 2b78c114fc32bcad:2b78c114fc32bcad:0:1 +``` + +## Our sponsor for this project + +Many thanks to [MasterCard](http://mastercard.com) for sponsoring this project. +We appreciate MasterCard's support of NATS, CNCF, and the OSS community. \ No newline at end of file diff --git a/vendor/github.com/not.go/examples/publish/main.go b/vendor/github.com/not.go/examples/publish/main.go new file mode 100644 index 000000000..2d22154b6 --- /dev/null +++ b/vendor/github.com/not.go/examples/publish/main.go @@ -0,0 +1,88 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "log" + + "github.com/nats-io/go-nats" + "github.com/nats-io/not.go" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +func usage() { + log.Fatalf("Usage: publish [-s server] [-creds file] ") +} + +func main() { + var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") + var userCreds = flag.String("creds", "", "User Credentials File") + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) != 2 { + usage() + } + + tracer, closer := not.InitTracing("NATS Publisher") + opentracing.SetGlobalTracer(tracer) + defer closer.Close() + + // Connect Options. + opts := []nats.Option{nats.Name("NATS Sample Tracing Publisher")} + + // Use UserCredentials. + if *userCreds != "" { + opts = append(opts, nats.UserCredentials(*userCreds)) + } + + // Connect to NATS. + nc, err := nats.Connect(*urls, opts...) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + subj, msg := args[0], []byte(args[1]) + + // A NATS OpenTracing Message. + var t not.TraceMsg + + // Setup a span for the operation to publish a message. + pubSpan := tracer.StartSpan("Published Message", ext.SpanKindProducer) + ext.MessageBusDestination.Set(pubSpan, subj) + defer pubSpan.Finish() + + // Inject span context into our traceMsg. + if err := tracer.Inject(pubSpan.Context(), opentracing.Binary, &t); err != nil { + log.Fatalf("%v for Inject.", err) + } + + // Add the payload. + t.Write(msg) + + // Send the message over NATS. + nc.Publish(subj, t.Bytes()) + + if err := nc.LastError(); err != nil { + log.Fatal(err) + } else { + log.Printf("Published [%s] : '%s'\n", subj, msg) + } +} diff --git a/vendor/github.com/not.go/examples/reply/main.go b/vendor/github.com/not.go/examples/reply/main.go new file mode 100644 index 000000000..9dd28d9ef --- /dev/null +++ b/vendor/github.com/not.go/examples/reply/main.go @@ -0,0 +1,106 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "log" + "sync" + + "github.com/nats-io/go-nats" + "github.com/nats-io/not.go" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +func usage() { + log.Fatalf("Usage: reply [-s server] [-creds file] [-t] ") +} + +func main() { + var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") + var userCreds = flag.String("creds", "", "User Credentials File") + var showTime = flag.Bool("t", false, "Display timestamps") + var numMsgs = flag.Int("n", 1, "Exit after N msgs processed.") + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 2 { + usage() + } + + tracer, closer := not.InitTracing("NATS Responder") + opentracing.SetGlobalTracer(tracer) + defer closer.Close() + + // Connect Options. + opts := []nats.Option{nats.Name("NATS Sample Responder")} + opts = not.SetupConnOptions(tracer, opts) + + // Use UserCredentials + if *userCreds != "" { + opts = append(opts, nats.UserCredentials(*userCreds)) + } + // Connect to NATS + nc, err := nats.Connect(*urls, opts...) + if err != nil { + log.Fatal(err) + } + + wg := sync.WaitGroup{} + wg.Add(*numMsgs) + + subj, reply := args[0], []byte(args[1]) + + nc.Subscribe(subj, func(msg *nats.Msg) { + defer wg.Done() + + // Create new TraceMsg from the NATS message. + t := not.NewTraceMsg(msg) + + // Extract the span context from the request message. + sc, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + log.Printf("Extract error: %v", err) + } + + // Setup a span referring to the span context of the incoming NATS message. + replySpan := tracer.StartSpan("Service Responder", ext.SpanKindRPCServer, ext.RPCServerOption(sc)) + ext.MessageBusDestination.Set(replySpan, msg.Subject) + defer replySpan.Finish() + + log.Printf("Received request: %s", t) + + if err := nc.Publish(msg.Reply, reply); err != nil { + replySpan.LogEvent(fmt.Sprintf("error: %v", err)) + } + + replySpan.LogEvent(fmt.Sprintf("Response msg: %s", reply)) + }) + + if err := nc.LastError(); err != nil { + log.Fatal(err) + } + + log.Printf("Listening on [%s]", subj) + if *showTime { + log.SetFlags(log.LstdFlags) + } + + wg.Wait() +} diff --git a/vendor/github.com/not.go/examples/request/main.go b/vendor/github.com/not.go/examples/request/main.go new file mode 100644 index 000000000..b6f799a39 --- /dev/null +++ b/vendor/github.com/not.go/examples/request/main.go @@ -0,0 +1,97 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "log" + "time" + + "github.com/nats-io/go-nats" + "github.com/nats-io/not.go" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +func usage() { + log.Fatalf("Usage: request [-s server] [-creds file] ") +} + +func main() { + var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") + var userCreds = flag.String("creds", "", "User Credentials File") + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) < 2 { + usage() + } + + // Connect Options. + opts := []nats.Option{nats.Name("NATS Sample Requestor")} + + // Use UserCredentials + if *userCreds != "" { + opts = append(opts, nats.UserCredentials(*userCreds)) + } + + tracer, closer := not.InitTracing("NATS Requestor") + opentracing.SetGlobalTracer(tracer) + defer closer.Close() + + // Connect to NATS + nc, err := nats.Connect(*urls, opts...) + if err != nil { + log.Fatal(err) + } + defer nc.Close() + subj, payload := args[0], []byte(args[1]) + + // Setup our request span + reqSpan := tracer.StartSpan("Service Request", ext.SpanKindRPCClient) + ext.MessageBusDestination.Set(reqSpan, subj) + defer reqSpan.Finish() + + // Log our request + reqSpan.LogEvent("Starting request.") + + // A NATS OpenTracing Message. + var t not.TraceMsg + + // Inject the span context into the TraceMsg. + if err := tracer.Inject(reqSpan.Context(), opentracing.Binary, &t); err != nil { + log.Printf("%v for Inject.", err) + } + + // Add the payload. + t.Write(payload) + + // Make the request. + msg, err := nc.Request(subj, t.Bytes(), time.Second) + if err != nil { + if nc.LastError() != nil { + log.Fatalf("%v for request", nc.LastError()) + } + log.Fatalf("%v for request", err) + } else { + log.Printf("Published [%s] : '%s'", subj, payload) + log.Printf("Received [%v] : '%s'", msg.Subject, string(msg.Data)) + } + + // Log that we've completed the request. + reqSpan.LogEvent("Request Complete") +} diff --git a/vendor/github.com/not.go/examples/subscribe/main.go b/vendor/github.com/not.go/examples/subscribe/main.go new file mode 100644 index 000000000..484bb7d4a --- /dev/null +++ b/vendor/github.com/not.go/examples/subscribe/main.go @@ -0,0 +1,107 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "log" + "sync" + + "github.com/nats-io/go-nats" + "github.com/nats-io/not.go" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +func usage() { + log.Fatalf("Usage: subscribe [-s server] [-creds file] [-t] [-n msgs] ") +} + +func printMsg(m *nats.Msg, i int) { + log.Printf("[#%d] Received on [%s]: '%s'", i, m.Subject, string(m.Data)) +} + +func main() { + var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") + var userCreds = flag.String("creds", "", "User Credentials File") + var showTime = flag.Bool("t", false, "Display timestamps") + var numMsgs = flag.Int("n", 1, "Exit after N msgs received.") + + log.SetFlags(0) + flag.Usage = usage + flag.Parse() + + args := flag.Args() + if len(args) != 1 { + usage() + } + + tracer, closer := not.InitTracing("NATS Subscriber") + opentracing.SetGlobalTracer(tracer) + defer closer.Close() + + // Connect Options. + opts := []nats.Option{nats.Name("NATS Sample Tracing Subscriber")} + opts = not.SetupConnOptions(tracer, opts) + + // Use UserCredentials. + if *userCreds != "" { + opts = append(opts, nats.UserCredentials(*userCreds)) + } + + // Connect to NATS. + nc, err := nats.Connect(*urls, opts...) + if err != nil { + log.Fatal(err) + } + + // Process N messages then exit. + wg := sync.WaitGroup{} + wg.Add(*numMsgs) + + subj := args[0] + + nc.Subscribe(subj, func(msg *nats.Msg) { + defer wg.Done() + + // Create new TraceMsg from normal NATS message. + t := not.NewTraceMsg(msg) + + // Extract the span context. + sc, err := tracer.Extract(opentracing.Binary, t) + if err != nil { + log.Printf("Extract error: %v", err) + } + + // Setup a span referring to the span context of the incoming NATS message. + span := tracer.StartSpan("Received Message", ext.SpanKindConsumer, opentracing.FollowsFrom(sc)) + ext.MessageBusDestination.Set(span, msg.Subject) + defer span.Finish() + + // The rest of t that has not been read is the payload. + fmt.Printf("Received msg: %q\n", t) + }) + + if err := nc.LastError(); err != nil { + log.Fatal(err) + } + + log.Printf("Listening on [%s]", subj) + if *showTime { + log.SetFlags(log.LstdFlags) + } + + wg.Wait() +} diff --git a/vendor/github.com/not.go/images/PubSub.jpg b/vendor/github.com/not.go/images/PubSub.jpg new file mode 100644 index 000000000..5a54e15f1 Binary files /dev/null and b/vendor/github.com/not.go/images/PubSub.jpg differ diff --git a/vendor/github.com/not.go/images/RequestReply.jpg b/vendor/github.com/not.go/images/RequestReply.jpg new file mode 100644 index 000000000..528838ca4 Binary files /dev/null and b/vendor/github.com/not.go/images/RequestReply.jpg differ diff --git a/vendor/github.com/not.go/not.go b/vendor/github.com/not.go/not.go new file mode 100644 index 000000000..9d25786c9 --- /dev/null +++ b/vendor/github.com/not.go/not.go @@ -0,0 +1,107 @@ +// Copyright 2019 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package not + +import ( + "bytes" + "fmt" + "io" + "log" + "time" + + "github.com/nats-io/go-nats" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-lib/metrics" + + jaeger "github.com/uber/jaeger-client-go" + jaegercfg "github.com/uber/jaeger-client-go/config" + jaegerlog "github.com/uber/jaeger-client-go/log" +) + +// TraceMsg will be used as an io.Writer and io.Reader for the span's context and +// the payload. The span will have to be written first and read first. +type TraceMsg struct { + bytes.Buffer +} + +// NewTraceMsg creates a trace msg from a NATS message's data payload. +func NewTraceMsg(m *nats.Msg) *TraceMsg { + b := bytes.NewBuffer(m.Data) + return &TraceMsg{*b} +} + +// InitTracing handles the common tracing setup functionality, and keeps +// implementation specific (Jaeger) configuration here. +func InitTracing(serviceName string) (opentracing.Tracer, io.Closer) { + // Sample configuration for testing. Use constant sampling to sample every trace + // and enable LogSpan to log every span via configured Logger. + cfg := jaegercfg.Configuration{ + ServiceName: serviceName, + Sampler: &jaegercfg.SamplerConfig{ + Type: jaeger.SamplerTypeConst, + Param: 1, + }, + Reporter: &jaegercfg.ReporterConfig{ + LogSpans: true, + }, + } + + // Example logger and metrics factory. Use github.com/uber/jaeger-client-go/log + // and github.com/uber/jaeger-lib/metrics respectively to bind to real logging and metrics + // frameworks. + jLogger := jaegerlog.StdLogger + jMetricsFactory := metrics.NullFactory + + // Initialize tracer with a logger and a metrics factory + tracer, closer, err := cfg.NewTracer( + jaegercfg.Logger(jLogger), + jaegercfg.Metrics(jMetricsFactory), + ) + if err != nil { + log.Fatalf("couldn't setup tracing: %v", err) + } + return tracer, closer +} + +// SetupConnOptions sets up connection options with a tracer to trace +// salient events. +func SetupConnOptions(tracer opentracing.Tracer, opts []nats.Option) []nats.Option { + totalWait := 10 * time.Minute + reconnectDelay := time.Second + + opts = append(opts, nats.ReconnectWait(reconnectDelay)) + opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay))) + opts = append(opts, nats.DisconnectHandler(func(nc *nats.Conn) { + span := tracer.StartSpan("Disconnect Handler") + s := fmt.Sprintf("Disconnected: will attempt reconnects for %.0fm", totalWait.Minutes()) + span.LogEvent(s) + span.Finish() + log.Printf(s) + })) + opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { + span := tracer.StartSpan("Reconnect Handler") + s := fmt.Sprintf("Reconnected [%s]", nc.ConnectedUrl()) + span.LogEvent(s) + span.Finish() + log.Printf(s) + })) + opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) { + span := tracer.StartSpan("Closed Handler") + s := "Exiting, no servers available" + span.LogEvent(s) + span.Finish() + log.Printf(s) + })) + return opts +} diff --git a/vendor/github.com/not.go/scripts/start_jaeger.sh b/vendor/github.com/not.go/scripts/start_jaeger.sh new file mode 100644 index 000000000..bd854c557 --- /dev/null +++ b/vendor/github.com/not.go/scripts/start_jaeger.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +docker run -d --name jaeger \ + -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \ + -p 5775:5775/udp \ + -p 6831:6831/udp \ + -p 6832:6832/udp \ + -p 5778:5778 \ + -p 16686:16686 \ + -p 14268:14268 \ + -p 9411:9411 \ + jaegertracing/all-in-one:1.9 + diff --git a/vendor/github.com/opentracing/opentracing-go b/vendor/github.com/opentracing/opentracing-go new file mode 160000 index 000000000..135aa78c6 --- /dev/null +++ b/vendor/github.com/opentracing/opentracing-go @@ -0,0 +1 @@ +Subproject commit 135aa78c6f95b4a199daf2f0470d231136cbbd0c diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go index 2dcd0c161..22243eedc 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogstreamer.go @@ -3,7 +3,10 @@ package replication import ( "context" + "time" + "github.com/juju/errors" + "github.com/opentracing/opentracing-go" "github.com/siddontang/go-log/log" ) @@ -28,6 +31,10 @@ func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) { select { case c := <-s.ch: + span := opentracing.StartSpan("send binlogEvent from go-mysql", opentracing.FollowsFrom(c.SpanContest)) + span.SetTag("send event from go mysql time ", time.Now().Unix()) + c.SpanContest = span.Context() + span.Finish() return c, nil case s.err = <-s.ech: return nil, s.err @@ -36,7 +43,7 @@ func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) { } } -// DumpEvents dumps all left events +// DumpEvents dumps all left eventsmax_payload func (s *BinlogStreamer) DumpEvents() []*BinlogEvent { count := len(s.ch) events := make([]*BinlogEvent, 0, count) diff --git a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go index 371212f4b..dc03074d5 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go +++ b/vendor/github.com/siddontang/go-mysql/replication/binlogsyncer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/juju/errors" + "github.com/opentracing/opentracing-go" "github.com/satori/go.uuid" "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/client" @@ -618,7 +619,10 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { }() for { + span := opentracing.StartSpan("data source: get incremental data from ReadPacket()") + span.SetTag("before get incremental data time:", time.Now().Unix()) data, err := b.c.ReadPacket() + span.SetTag("after get incremental data time:", time.Now().Unix()) if err != nil { log.Error(err) @@ -655,7 +659,6 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { // we connect the server and begin to re-sync again. continue } - //set read timeout if b.cfg.ReadTimeout > 0 { b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout)) @@ -666,7 +669,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + if err = b.parseEvent(span.Context(), s, data); err != nil { s.closeWithError(err) return } @@ -685,13 +688,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { log.Errorf("invalid stream header %c", data[0]) continue } + span.Finish() } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { +func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *BinlogStreamer, data []byte) error { //skip OK byte, 0x00 data = data[1:] - + span := opentracing.GlobalTracer().StartSpan(" incremental data are conversion to BinlogEvent", opentracing.ChildOf(spanContext)) + span.SetTag("time", time.Now().Unix()) + defer span.Finish() needACK := false if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { needACK = (data[1] == 0x01) @@ -700,6 +706,8 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } e, err := b.parser.Parse(data) + e.SpanContest = span.Context() + span.SetTag("tx timestap", e.Header.Timestamp) if err != nil { return errors.Trace(err) } diff --git a/vendor/github.com/siddontang/go-mysql/replication/event.go b/vendor/github.com/siddontang/go-mysql/replication/event.go index 737b431a0..0c102abf3 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/event.go @@ -10,6 +10,7 @@ import ( "unicode" "github.com/juju/errors" + "github.com/opentracing/opentracing-go" "github.com/satori/go.uuid" . "github.com/siddontang/go-mysql/mysql" ) @@ -26,8 +27,9 @@ type BinlogEvent struct { // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists RawData []byte - Header *EventHeader - Event Event + Header *EventHeader + Event Event + SpanContest opentracing.SpanContext } func (e *BinlogEvent) Dump(w io.Writer) { diff --git a/vendor/github.com/siddontang/go-mysql/replication/parser.go b/vendor/github.com/siddontang/go-mysql/replication/parser.go index 221122999..7698a35b9 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/parser.go +++ b/vendor/github.com/siddontang/go-mysql/replication/parser.go @@ -317,7 +317,7 @@ func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { return nil, err } - return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil + return &BinlogEvent{RawData: rawData, Header: h, Event: e, SpanContest: nil}, nil } func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error { diff --git a/vendor/github.com/uber/jaeger-client-go b/vendor/github.com/uber/jaeger-client-go new file mode 160000 index 000000000..402bec9e6 --- /dev/null +++ b/vendor/github.com/uber/jaeger-client-go @@ -0,0 +1 @@ +Subproject commit 402bec9e6ead856e972bfa449c9c5d6b8965aef1 diff --git a/vendor/github.com/uber/jaeger-lib b/vendor/github.com/uber/jaeger-lib new file mode 160000 index 000000000..d036253de --- /dev/null +++ b/vendor/github.com/uber/jaeger-lib @@ -0,0 +1 @@ +Subproject commit d036253de8f5b698150d81b922486f1e8e7628ec