From 29540b57cdb4a5aaea2cfbb3a52c0789a48e2fe3 Mon Sep 17 00:00:00 2001 From: Fu Wenhui <_@_._> Date: Thu, 8 Aug 2019 16:03:37 +0800 Subject: [PATCH] changes from dtle #477 by 790493303 --- replication/binlogstreamer.go | 7 ++++++- replication/binlogsyncer.go | 16 ++++++++++++---- replication/event.go | 6 ++++-- replication/parser.go | 2 +- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/replication/binlogstreamer.go b/replication/binlogstreamer.go index bd9679a8a..cd5f23c23 100644 --- a/replication/binlogstreamer.go +++ b/replication/binlogstreamer.go @@ -3,6 +3,7 @@ package replication import ( "context" "github.com/pingcap/errors" + "github.com/opentracing/opentracing-go" "github.com/siddontang/go-log/log" "time" ) @@ -28,6 +29,10 @@ func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error) { select { case c := <-s.ch: + span := opentracing.StartSpan("send binlogEvent from go-mysql", opentracing.FollowsFrom(c.SpanContest)) + span.SetTag("send event from go mysql time ", time.Now().Unix()) + c.SpanContest = span.Context() + span.Finish() return c, nil case s.err = <-s.ech: return nil, s.err @@ -56,7 +61,7 @@ func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime ti } } -// DumpEvents dumps all left events +// DumpEvents dumps all left eventsmax_payload func (s *BinlogStreamer) DumpEvents() []*BinlogEvent { count := len(s.ch) events := make([]*BinlogEvent, 0, count) diff --git a/replication/binlogsyncer.go b/replication/binlogsyncer.go index 2cec5de87..10311aea9 100644 --- a/replication/binlogsyncer.go +++ b/replication/binlogsyncer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/opentracing/opentracing-go" uuid "github.com/satori/go.uuid" "github.com/siddontang/go-log/log" "github.com/siddontang/go-mysql/client" @@ -645,7 +646,10 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { }() for { + span := opentracing.StartSpan("data source: get incremental data from ReadPacket()") + span.SetTag("before get incremental data time:", time.Now().Unix()) data, err := b.c.ReadPacket() + span.SetTag("after get incremental data time:", time.Now().Unix()) select { case <-b.ctx.Done(): s.close() @@ -694,7 +698,6 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { // we connect the server and begin to re-sync again. continue } - //set read timeout if b.cfg.ReadTimeout > 0 { b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout)) @@ -705,7 +708,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { switch data[0] { case OK_HEADER: - if err = b.parseEvent(s, data); err != nil { + if err = b.parseEvent(span.Context(), s, data); err != nil { s.closeWithError(err) return } @@ -724,13 +727,16 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) { log.Errorf("invalid stream header %c", data[0]) continue } + span.Finish() } } -func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { +func (b *BinlogSyncer) parseEvent(spanContext opentracing.SpanContext, s *BinlogStreamer, data []byte) error { //skip OK byte, 0x00 data = data[1:] - + span := opentracing.GlobalTracer().StartSpan(" incremental data are conversion to BinlogEvent", opentracing.ChildOf(spanContext)) + span.SetTag("time", time.Now().Unix()) + defer span.Finish() needACK := false if b.cfg.SemiSyncEnabled && (data[0] == SemiSyncIndicator) { needACK = (data[1] == 0x01) @@ -739,6 +745,8 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error { } e, err := b.parser.Parse(data) + e.SpanContest = span.Context() + span.SetTag("tx timestap", e.Header.Timestamp) if err != nil { return errors.Trace(err) } diff --git a/replication/event.go b/replication/event.go index e5457c8cb..43897fa3e 100644 --- a/replication/event.go +++ b/replication/event.go @@ -11,6 +11,7 @@ import ( "unicode" "github.com/pingcap/errors" + "github.com/opentracing/opentracing-go" "github.com/satori/go.uuid" . "github.com/siddontang/go-mysql/mysql" ) @@ -28,8 +29,9 @@ type BinlogEvent struct { // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists RawData []byte - Header *EventHeader - Event Event + Header *EventHeader + Event Event + SpanContest opentracing.SpanContext } func (e *BinlogEvent) Dump(w io.Writer) { diff --git a/replication/parser.go b/replication/parser.go index a0cc46cc7..4d220c030 100644 --- a/replication/parser.go +++ b/replication/parser.go @@ -335,7 +335,7 @@ func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) { return nil, err } - return &BinlogEvent{RawData: rawData, Header: h, Event: e}, nil + return &BinlogEvent{RawData: rawData, Header: h, Event: e, SpanContest: nil}, nil } func (p *BinlogParser) verifyCrc32Checksum(rawData []byte) error {