Skip to content

Commit

Permalink
fix a bug
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed Jun 14, 2023
1 parent f0a57cc commit c51e4a2
Show file tree
Hide file tree
Showing 10 changed files with 12 additions and 85 deletions.
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
ResolvedTs: sortStats.ReceivedMaxResolvedTs,
}
stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
CheckpointTs: sinkStats.ReceivedMaxCommitTs,
ResolvedTs: sinkStats.ReceivedMaxCommitTs,
CheckpointTs: sinkStats.ResolvedTs,
ResolvedTs: sinkStats.ResolvedTs,
}

return stats
Expand Down
15 changes: 5 additions & 10 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ type TableStats struct {
CheckpointTs model.Ts
ResolvedTs model.Ts
BarrierTs model.Ts
// From sorter.
ReceivedMaxCommitTs model.Ts
ReceivedMaxResolvedTs model.Ts
}

// SinkManager is the implementation of SinkManager.
Expand Down Expand Up @@ -934,23 +931,21 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
if m.redoDMLMgr != nil {
resolvedTs = m.redoDMLMgr.GetResolvedTs(span)
} else {
resolvedTs = m.sourceManager.GetTableResolvedTs(span)
resolvedTs = tableSink.getReceivedSorterResolvedTs()
}

if resolvedTs < checkpointTs.ResolvedMark() {
log.Error("sinkManager: resolved ts should not less than checkpoint ts",
log.Panic("sinkManager: resolved ts should not less than checkpoint ts",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("resolvedTs", resolvedTs),
zap.Any("checkpointTs", checkpointTs))
}
return TableStats{
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
BarrierTs: tableSink.barrierTs.Load(),
ReceivedMaxCommitTs: tableSink.getReceivedSorterCommitTs(),
ReceivedMaxResolvedTs: tableSink.getReceivedSorterResolvedTs(),
CheckpointTs: checkpointTs.ResolvedMark(),
ResolvedTs: resolvedTs,
BarrierTs: tableSink.barrierTs.Load(),
}
}

Expand Down
1 change: 0 additions & 1 deletion cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
cachedSize := uint64(0)

defer func() {
task.tableSink.updateReceivedSorterCommitTs(advancer.lastTxnCommitTs)
eventCount := newRangeEventCount(advancer.lastPos, allEventCount)
task.tableSink.updateRangeEventCounts(eventCount)

Expand Down
1 change: 0 additions & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// If eventCache is nil, update sorter commit ts and range event count.
if w.eventCache == nil {
task.tableSink.updateReceivedSorterCommitTs(advancer.currTxnCommitTs)
eventCount := newRangeEventCount(advancer.lastPos, allEventCount)
task.tableSink.updateRangeEventCounts(eventCount)
}
Expand Down
13 changes: 0 additions & 13 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ type tableSinkWrapper struct {
// receivedSorterResolvedTs is the resolved ts received from the sorter.
// We use this to advance the redo log.
receivedSorterResolvedTs atomic.Uint64
// receivedSorterCommitTs is the commit ts received from the sorter.
// We use this to statistics the latency of the table sorter.
receivedSorterCommitTs atomic.Uint64
// receivedEventCount is the number of events received from the sorter.
receivedEventCount atomic.Int64

Expand Down Expand Up @@ -189,12 +186,6 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
}
}

func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) {
if ts > t.receivedSorterCommitTs.Load() {
t.receivedSorterCommitTs.Store(ts)
}
}

func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
t.tableSinkMu.RLock()
defer t.tableSinkMu.RUnlock()
Expand Down Expand Up @@ -225,10 +216,6 @@ func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
return t.receivedSorterResolvedTs.Load()
}

func (t *tableSinkWrapper) getReceivedSorterCommitTs() model.Ts {
return t.receivedSorterCommitTs.Load()
}

func (t *tableSinkWrapper) getReceivedEventCount() int64 {
return t.receivedEventCount.Load()
}
Expand Down
3 changes: 0 additions & 3 deletions cdc/processor/sourcemanager/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type SortEngine interface {
// events are available for fetching, OnResolve is what you want.
Add(span tablepb.Span, events ...*model.PolymorphicEvent)

// GetResolvedTs gets resolved timestamp of the given table.
GetResolvedTs(span tablepb.Span) model.Ts

// OnResolve pushes action into SortEngine's hook list, which
// will be called after any events are resolved.
OnResolve(action func(tablepb.Span, model.Ts))
Expand Down
10 changes: 0 additions & 10 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,6 @@ func (s *EventSorter) Add(span tablepb.Span, events ...*model.PolymorphicEvent)
}
}

// GetResolvedTs implements engine.SortEngine.
func (s *EventSorter) GetResolvedTs(span tablepb.Span) model.Ts {
value, exists := s.tables.Load(span)
if !exists {
log.Panic("get resolved ts from an unexist table", zap.Stringer("span", &span))
}

return value.(*tableSorter).getResolvedTs()
}

// OnResolve implements engine.SortEngine.
func (s *EventSorter) OnResolve(action func(tablepb.Span, model.Ts)) {
s.mu.Lock()
Expand Down
14 changes: 0 additions & 14 deletions cdc/processor/sourcemanager/engine/mock/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 5 additions & 26 deletions cdc/processor/sourcemanager/engine/pebble/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,44 +160,23 @@ func (s *EventSorter) Add(span tablepb.Span, events ...*model.PolymorphicEvent)
zap.Stringer("span", &span))
}

maxCommitTs := model.Ts(0)
maxResolvedTs := model.Ts(0)
maxCommitTs := state.maxReceivedCommitTs.Load()
maxResolvedTs := state.maxReceivedResolvedTs.Load()
for _, event := range events {
state.ch.In() <- eventWithTableID{uniqueID: state.uniqueID, span: span, event: event}
if event.IsResolved() {
if event.CRTs > maxResolvedTs {
maxResolvedTs = event.CRTs
state.maxReceivedResolvedTs.Store(maxResolvedTs)
}
} else {
state.receivedEvents.Add(1)
if event.CRTs > maxCommitTs {
maxCommitTs = event.CRTs
state.maxReceivedCommitTs.Store(maxCommitTs)
}
}
state.ch.In() <- eventWithTableID{uniqueID: state.uniqueID, span: span, event: event}
}

if maxCommitTs > state.maxReceivedCommitTs.Load() {
state.maxReceivedCommitTs.Store(maxCommitTs)
}
if maxResolvedTs > state.maxReceivedResolvedTs.Load() {
state.maxReceivedResolvedTs.Store(maxResolvedTs)
}
}

// GetResolvedTs implements engine.SortEngine.
func (s *EventSorter) GetResolvedTs(span tablepb.Span) model.Ts {
s.mu.RLock()
state, exists := s.tables.Get(span)
s.mu.RUnlock()

if !exists {
log.Panic("get resolved ts from an non-existent table",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.Stringer("span", &span))
}

return state.maxReceivedResolvedTs.Load()
}

// OnResolve implements engine.SortEngine.
Expand Down
5 changes: 0 additions & 5 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,6 @@ func (m *SourceManager) CleanByTable(span tablepb.Span, upperBound engine.Positi
return m.engine.CleanByTable(span, upperBound)
}

// GetTableResolvedTs returns the resolved ts of the table.
func (m *SourceManager) GetTableResolvedTs(span tablepb.Span) model.Ts {
return m.engine.GetResolvedTs(span)
}

// GetTablePullerStats returns the puller stats of the table.
func (m *SourceManager) GetTablePullerStats(span tablepb.Span) puller.Stats {
if m.multiplexing {
Expand Down

0 comments on commit c51e4a2

Please sign in to comment.