Skip to content

Commit

Permalink
Fixes from feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Nov 20, 2018
1 parent 03d9484 commit 3a95b69
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
9 changes: 8 additions & 1 deletion data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,14 @@ func (d *DataIterator) Run() {
}

for table, maxPk := range tablesWithData {
d.targetPKs.Store(table.String(), maxPk)
tableName := table.String()
if d.StateTracker.IsTableComplete(tableName) {
// In a previous run, the table may have been completed.
// We don't need to reiterate those tables as it has already been done.
delete(tablesWithData, tableName)
} else {
d.targetPKs.Store(table.String(), maxPk)
}
}

tablesQueue := make(chan *schema.Table)
Expand Down
6 changes: 0 additions & 6 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,6 @@ func (f *Ferry) NewBinlogStreamer() *BinlogStreamer {
}
}

// Even though this function is identical to NewBinlogStreamer, it is here
// for consistency so it will lead to less confusion.
func (f *Ferry) NewBinlogStreamerWithoutStateTracker() *BinlogStreamer {
return f.NewBinlogStreamer()
}

func (f *Ferry) NewBinlogWriter() *BinlogWriter {
return &BinlogWriter{
DB: f.TargetDB,
Expand Down
7 changes: 7 additions & 0 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ func (s *StateTracker) MarkTableAsCompleted(table string) {
s.completedTables[table] = true
}

func (s *StateTracker) IsTableComplete(table string) {
s.tableMutex.Lock()
defer s.tableMutex.Unlock()

return s.completedTables[table]
}

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos mysql.Position) {
s.binlogMutex.Lock()
defer s.binlogMutex.Unlock()
Expand Down

0 comments on commit 3a95b69

Please sign in to comment.