Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions driver/common/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,28 @@ type ThroughputStat struct {
Time uint64
}
type BufferStat struct {
BinlogEventQueueSize int
ExtractorTxQueueSize int
ApplierMsgQueueSize int
ApplierTxQueueSize int
SendByTimeout int
SendBySizeFull int
BinlogEventQueueSize int
ExtractorTxQueueSize int
ApplierMsgQueueSize int
ApplierTxQueueSize int
SendByTimeout int
SendBySizeFull int
}
type MemoryStat struct {
Full int64
Incr int64
Full int64
Incr int64
}

type TxCount struct {
ExtractedTxCount *uint32
AppliedTxCount *uint32
}

type QueryCount struct {
ExtractedQueryCount *uint64
AppliedQueryCount *uint64
}

type TaskStatistics struct {
CurrentCoordinates *CurrentCoordinates
TableStats *TableStats
Expand All @@ -79,4 +84,5 @@ type TaskStatistics struct {
Timestamp int64
MemoryStat MemoryStat
HandledTxCount TxCount
HandledQueryCount QueryCount
}
7 changes: 7 additions & 0 deletions driver/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,13 @@ func (h *taskHandle) emitStats(ru *common.TaskStatistics) {
if nil != ru.HandledTxCount.ExtractedTxCount {
metrics.SetGaugeWithLabels([]string{"src_extracted_incr_tx_count"}, float32(*ru.HandledTxCount.ExtractedTxCount), labels)
}

if nil != ru.HandledQueryCount.AppliedQueryCount {
metrics.SetGaugeWithLabels([]string{"dest_applied_incr_query_count"}, float32(*ru.HandledQueryCount.AppliedQueryCount), labels)
}
if nil != ru.HandledQueryCount.ExtractedQueryCount {
metrics.SetGaugeWithLabels([]string{"src_extracted_incr_query_count"}, float32(*ru.HandledQueryCount.ExtractedQueryCount), labels)
}
}

func (h *taskHandle) Destroy() bool {
Expand Down
5 changes: 5 additions & 0 deletions driver/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,8 +967,10 @@ func (a *Applier) Stats() (*common.TaskStatistics, error) {
}

var txCount uint32
var queryCount uint64
if a.ai != nil {
txCount = a.ai.appliedTxCount
queryCount = a.ai.appliedQueryCount
}
taskResUsage := common.TaskStatistics{
ExecMasterRowCount: totalRowsReplay,
Expand Down Expand Up @@ -1003,6 +1005,9 @@ func (a *Applier) Stats() (*common.TaskStatistics, error) {
HandledTxCount: common.TxCount{
AppliedTxCount: &txCount,
},
HandledQueryCount: common.QueryCount{
AppliedQueryCount: &queryCount,
},
}
if a.natsConn != nil {
taskResUsage.MsgStat = a.natsConn.Statistics
Expand Down
14 changes: 8 additions & 6 deletions driver/mysql/applier_incr.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ type ApplierIncr struct {
ctx context.Context
shutdownCh chan struct{}

memory2 *int64
printTps bool
txLastNSeconds uint32
appliedTxCount uint32
timestampCtx *TimestampContext
TotalDeltaCopied int64
memory2 *int64
printTps bool
txLastNSeconds uint32
appliedTxCount uint32
appliedQueryCount uint64
timestampCtx *TimestampContext
TotalDeltaCopied int64

EntryExecutedHook func(entry *common.DataEntry)

Expand Down Expand Up @@ -652,6 +653,7 @@ func (a *ApplierIncr) ApplyBinlogEvent(workerIdx int, binlogEntryCtx *common.Ent
}
}
timestamp = event.Timestamp
atomic.AddUint64(&a.appliedQueryCount, uint64(1))
}

if binlogEntry.Final {
Expand Down
1 change: 0 additions & 1 deletion driver/mysql/binlog/binlog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1937,7 +1937,6 @@ func (b *BinlogReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *r
} else {
b.logger.Debug("event has not passed 'where'")
}

if b.entryContext.OriginalSize >= bigTxSplittingSize {
b.logger.Debug("splitting big tx", "index", b.entryContext.Entry.Index)
b.entryContext.Entry.Final = false
Expand Down
13 changes: 8 additions & 5 deletions driver/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ type Extractor struct {
// Original comment: TotalRowsCopied returns the accurate number of rows being copied (affected)
// This is not exactly the same as the rows being iterated via chunks, but potentially close enough.
// TODO What is the difference between mysqlContext.RowsEstimate ?
TotalRowsCopied int64
natsAddr string
TotalRowsCopied int64
extractorQueryCount uint64
natsAddr string

mysqlVersionDigit int
NetWriteTimeout int
Expand Down Expand Up @@ -999,7 +1000,6 @@ func (e *Extractor) StreamEvents() error {
for _, entry := range entries.Entries {
atomic.AddInt64(e.memory2, -int64(entry.Size()))
}

e.logger.Debug("publish.after", "gno", gno, "n", len(entries.Entries))
entries.Entries = nil
entriesSize = 0
Expand All @@ -1016,7 +1016,7 @@ func (e *Extractor) StreamEvents() error {
select {
case entryCtx := <-e.dataChannel:
binlogEntry := entryCtx.Entry

atomic.AddUint64(&e.extractorQueryCount, uint64(len(binlogEntry.Events)))
entries.Entries = append(entries.Entries, binlogEntry)
entriesSize += entryCtx.OriginalSize

Expand Down Expand Up @@ -1529,6 +1529,9 @@ func (e *Extractor) Stats() (*common.TaskStatistics, error) {
HandledTxCount: common.TxCount{
ExtractedTxCount: &extractedTxCount,
},
HandledQueryCount: common.QueryCount{
ExtractedQueryCount: &e.extractorQueryCount,
},
}
if e.natsConn != nil {
taskResUsage.MsgStat = e.natsConn.Statistics
Expand Down Expand Up @@ -1619,7 +1622,7 @@ func (e *Extractor) Shutdown() error {

func (e *Extractor) sendFullComplete() (err error) {
dumpMsg, err := common.Encode(&common.DumpStatResult{
Coord: e.initialBinlogCoordinates,
Coord: e.initialBinlogCoordinates,
TableSpecs: e.tableSpecs,
})
if err != nil {
Expand Down