Skip to content

Commit

Permalink
Removed StateTracker complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jul 15, 2019
1 parent afca2e5 commit 4f17d4d
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 255 deletions.
2 changes: 1 addition & 1 deletion batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type BatchWriter struct {
DB *sql.DB
InlineVerifier *InlineVerifier
StateTracker *CopyStateTracker
StateTracker *StateTracker

DatabaseRewrites map[string]string
TableRewrites map[string]string
Expand Down
4 changes: 2 additions & 2 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type BinlogWriter struct {
WriteRetries int

ErrorHandler ErrorHandler
StateTracker *CopyStateTracker
StateTracker *StateTracker

binlogEventBuffer chan DMLEvent
logger *logrus.Entry
Expand Down Expand Up @@ -110,7 +110,7 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
}

if b.StateTracker != nil {
b.StateTracker.UpdateLastProcessedBinlogPosition(events[len(events)-1].BinlogPosition())
b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition())
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type DataIterator struct {

ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *CopyStateTracker
StateTracker *StateTracker

targetPKs *sync.Map
batchListeners []func(*RowBatch) error
Expand All @@ -32,7 +32,7 @@ func (d *DataIterator) Run(tables []*TableSchema) {
// tracking state. However, some methods are still useful so we initialize
// a minimal local instance.
if d.StateTracker == nil {
d.StateTracker = NewCopyStateTracker(0)
d.StateTracker = NewStateTracker(0)
}

d.logger.WithField("tablesCount", len(tables)).Info("starting data iterator run")
Expand Down
13 changes: 7 additions & 6 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
BatchSize: f.Config.DataIterationBatchSize,
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker.CopyStage,
StateTracker: f.StateTracker,
}

if f.CopyFilter != nil {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (f *Ferry) NewBinlogWriter() *BinlogWriter {
WriteRetries: f.Config.DBWriteRetries,

ErrorHandler: f.ErrorHandler,
StateTracker: f.StateTracker.CopyStage,
StateTracker: f.StateTracker,
}
}

Expand All @@ -159,7 +159,7 @@ func (f *Ferry) NewBatchWriter() *BatchWriter {

batchWriter := &BatchWriter{
DB: f.TargetDB,
StateTracker: f.StateTracker.CopyStage,
StateTracker: f.StateTracker,

DatabaseRewrites: f.Config.DatabaseRewrites,
TableRewrites: f.Config.TableRewrites,
Expand Down Expand Up @@ -480,7 +480,8 @@ func (f *Ferry) Start() error {
// If we don't set this now, there is a race condition where Ghostferry
// is terminated with some rows copied but no binlog events are written.
// This guarentees that we are able to restart from a valid location.
f.StateTracker.CopyStage.UpdateLastProcessedBinlogPosition(pos)
f.StateTracker.UpdateLastWrittenBinlogPosition(pos)
f.StateTracker.UpdateLastStoredBinlogPositionForInlineVerifier(pos)

return nil
}
Expand Down Expand Up @@ -698,7 +699,7 @@ func (f *Ferry) Progress() *Progress {
s.FinalBinlogPos = f.BinlogStreamer.targetBinlogPosition

// Table Progress
serializedState := f.StateTracker.CopyStage.Serialize()
serializedState := f.StateTracker.Serialize(nil)
s.Tables = make(map[string]TableProgress)
targetPKs := make(map[string]uint64)
f.DataIterator.targetPKs.Range(func(k, v interface{}) bool {
Expand Down Expand Up @@ -731,7 +732,7 @@ func (f *Ferry) Progress() *Progress {
// ETA
var totalPKsToCopy uint64 = 0
var completedPKs uint64 = 0
estimatedPKsPerSecond := f.StateTracker.CopyStage.EstimatedPKsPerSecond()
estimatedPKsPerSecond := f.StateTracker.EstimatedPKsPerSecond()
for _, targetPK := range targetPKs {
totalPKsToCopy += targetPK
}
Expand Down
2 changes: 1 addition & 1 deletion sharding/test/callbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t *CallbacksTestSuite) TestFailsRunOnPanicError() {
stateDump := &ghostferry.SerializableState{}
jsonErr = json.Unmarshal([]byte(errorData["StateDump"]), stateDump)
t.Require().Nil(jsonErr)
t.Require().NotNil(stateDump.CopyStage)
t.Require().NotNil(stateDump)
t.Require().True(len(stateDump.LastKnownTableSchemaCache) > 0)

w.WriteHeader(http.StatusInternalServerError)
Expand Down
Loading

0 comments on commit 4f17d4d

Please sign in to comment.