Skip to content

Commit

Permalink
First implementing the capability to resume
Browse files Browse the repository at this point in the history
The library gained the ability to resume the BinlogStreamer and the
DataIterator from arbitrary states. We're still missing the
reconciliation process to make the interrupt/resume behaviour correct
with respect to data safety. We're also missing the ability to interrupt
from within the IterativeVerifier.

Additionally, this patch exposes the need to refactor how we initialize
DataIterator/IterativeVerifiers, and the other "public" components that
are below the level of Ferry. It is currently difficult to initialize
these structs without Ferry and causes things like the hacks within
RunOnlyDataCopyForTables (previously named RunStandaloneDataCopy). We
can get rid of this in a future PR.
  • Loading branch information
shuhaowu committed Sep 25, 2018
1 parent 4252d9e commit ccf1caa
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 38 deletions.
15 changes: 10 additions & 5 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,27 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error {
err := s.createBinlogSyncer()
currentPosition, err := ShowMasterStatusBinlogPosition(s.Db)
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return err
}

s.logger.Info("reading current binlog position")
s.lastStreamedBinlogPosition, err = ShowMasterStatusBinlogPosition(s.Db)
return s.ConnectBinlogStreamerToMysqlFrom(currentPosition)
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) error {
err := s.createBinlogSyncer()
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition

s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"pos": s.lastStreamedBinlogPosition.Pos,
}).Info("found binlog position, starting synchronization")
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ type Config struct {
// Config for the ControlServer
ServerBindAddr string
WebBasedir string

// The state to resume from as dumped by the PanicErrorHandler.
// If this is null, a new Ghostferry run will be started. Otherwise, the
// reconciliation process will start and Ghostferry will resume after that.
StateToResumeFrom *SerializableState
}

func (c *Config) ValidateConfig() error {
Expand Down
23 changes: 12 additions & 11 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,24 @@ type CursorConfig struct {
}

// returns a new Cursor with an embedded copy of itself
func (c *CursorConfig) NewCursor(table *schema.Table, maxPk uint64) *Cursor {
func (c *CursorConfig) NewCursor(table *schema.Table, startPk, maxPk uint64) *Cursor {
return &Cursor{
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: true,
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: true,
lastSuccessfulPrimaryKey: startPk,
}
}

// returns a new Cursor with an embedded copy of itself
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor {
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, startPk, maxPk uint64) *Cursor {
return &Cursor{
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: false,
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: false,
lastSuccessfulPrimaryKey: startPk,
}
}

Expand All @@ -72,7 +74,6 @@ type Cursor struct {
}

func (c *Cursor) Each(f func(*RowBatch) error) error {
c.lastSuccessfulPrimaryKey = 0
c.logger = logrus.WithFields(logrus.Fields{
"table": c.Table.String(),
"tag": "cursor",
Expand Down
19 changes: 14 additions & 5 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"errors"
"fmt"
"math"
"sync"

"github.com/siddontang/go-mysql/schema"
Expand All @@ -15,9 +16,10 @@ type DataIterator struct {
Tables []*schema.Table
Concurrency int

ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *StateTracker
ErrorHandler ErrorHandler
CursorConfig *CursorConfig
StateTracker *StateTracker
ResumingFromKnownState bool

targetPKs *sync.Map
batchListeners []func(*RowBatch) error
Expand All @@ -38,7 +40,6 @@ func (d *DataIterator) Initialize() error {

func (d *DataIterator) Run() {
d.logger.WithField("tablesCount", len(d.Tables)).Info("starting data iterator run")

tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, d.Tables, d.logger)
if err != nil {
d.ErrorHandler.Fatal("data_iterator", err)
Expand Down Expand Up @@ -76,7 +77,15 @@ func (d *DataIterator) Run() {
return
}

cursor := d.CursorConfig.NewCursor(table, targetPKInterface.(uint64))
startPk := d.StateTracker.LastSuccessfulPK(table.String())
if startPk == math.MaxUint64 {
err := fmt.Errorf("%v has been marked as completed but a table iterator has been spawned, this is likely a programmer error which resulted in the inconsistent starting state", table.String())
logger.WithError(err).Error("this is definitely a bug")
d.ErrorHandler.Fatal("data_iterator", err)
return
}

cursor := d.CursorConfig.NewCursor(table, startPk, targetPKInterface.(uint64))
err := cursor.Each(func(batch *RowBatch) error {
metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{
MetricTag{"table", table.Name},
Expand Down
55 changes: 41 additions & 14 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Ferry struct {
rowCopyCompleteCh chan struct{}
}

func (f *Ferry) newDataIterator() (*DataIterator, error) {
func (f *Ferry) newDataIterator(resumingFromKnownState bool) (*DataIterator, error) {
dataIterator := &DataIterator{
DB: f.SourceDB,
Concurrency: f.Config.DataIterationConcurrency,
Expand All @@ -90,7 +90,8 @@ func (f *Ferry) newDataIterator() (*DataIterator, error) {
BatchSize: f.Config.DataIterationBatchSize,
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker,
StateTracker: f.StateTracker,
ResumingFromKnownState: resumingFromKnownState,
}

if f.CopyFilter != nil {
Expand All @@ -117,7 +118,8 @@ func (f *Ferry) Initialize() (err error) {
// dumping states due to an abort.
siddontanglog.SetDefaultLogger(siddontanglog.NewDefault(&siddontanglog.NullHandler{}))

// Connect to the database
// Connect to the source and target databases and check the validity
// of the connections
f.SourceDB, err = f.Source.SqlDB(f.logger.WithField("dbname", "source"))
if err != nil {
f.logger.WithError(err).Error("failed to connect to source database")
Expand Down Expand Up @@ -157,6 +159,9 @@ func (f *Ferry) Initialize() (err error) {
return fmt.Errorf("@@read_only must be OFF on target db")
}

// Check if we're running from a replica or not and sanity check
// the configurations given to Ghostferry as well as the configurations
// of the MySQL databases.
if f.WaitUntilReplicaIsCaughtUpToMaster != nil {
f.WaitUntilReplicaIsCaughtUpToMaster.ReplicaDB = f.SourceDB

Expand Down Expand Up @@ -205,6 +210,7 @@ func (f *Ferry) Initialize() (err error) {
}
}

// Initializing the necessary components of Ghostferry.
if f.ErrorHandler == nil {
f.ErrorHandler = &PanicErrorHandler{
Ferry: f,
Expand All @@ -215,7 +221,11 @@ func (f *Ferry) Initialize() (err error) {
f.Throttler = &PauserThrottler{}
}

f.StateTracker = NewStateTracker(f.DataIterationConcurrency * 10)
if f.StateToResumeFrom == nil {
f.StateTracker = NewStateTracker(f.DataIterationConcurrency * 10)
} else {
f.StateTracker = NewStateTrackerFromSerializedState(f.DataIterationConcurrency*10, f.StateToResumeFrom)
}

f.BinlogStreamer = &BinlogStreamer{
Db: f.SourceDB,
Expand Down Expand Up @@ -246,7 +256,7 @@ func (f *Ferry) Initialize() (err error) {
return err
}

f.DataIterator, err = f.newDataIterator()
f.DataIterator, err = f.newDataIterator(f.StateToResumeFrom != nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -288,7 +298,12 @@ func (f *Ferry) Start() error {
// miss some records that are inserted between the time the
// DataIterator determines the range of IDs to copy and the time that
// the starting binlog coordinates are determined.
err := f.BinlogStreamer.ConnectBinlogStreamerToMysql()
var err error
if f.StateToResumeFrom != nil {
err = f.BinlogStreamer.ConnectBinlogStreamerToMysqlFrom(f.StateToResumeFrom.LastWrittenBinlogPosition)
} else {
err = f.BinlogStreamer.ConnectBinlogStreamerToMysql()
}
if err != nil {
return err
}
Expand All @@ -298,11 +313,15 @@ func (f *Ferry) Start() error {
// in order to determine the PrimaryKey of each table as well as finding
// which value in the binlog event correspond to which field in the
// table.
metrics.Measure("LoadTables", nil, 1.0, func() {
f.Tables, err = LoadTables(f.SourceDB, f.TableFilter)
})
if err != nil {
return err
if f.StateToResumeFrom != nil {
f.Tables = f.StateToResumeFrom.LastKnownTableSchemaCache
} else {
metrics.Measure("LoadTables", nil, 1.0, func() {
f.Tables, err = LoadTables(f.SourceDB, f.TableFilter)
})
if err != nil {
return err
}
}

// TODO(pushrax): handle changes to schema during copying and clean this up.
Expand Down Expand Up @@ -397,19 +416,27 @@ func (f *Ferry) Run() {
supportingServicesWg.Wait()
}

func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error {
func (f *Ferry) RunOnlyDataCopyForTables(tables []*schema.Table) error {
if len(tables) == 0 {
return nil
}

dataIterator, err := f.newDataIterator()
dataIterator, err := f.newDataIterator(false)
if err != nil {
return err
}

// HACK:
// This process is not interruptible, so we can override the previous
// StateTracker for this instance only.
// However, if the PanicErrorHandler fires while running this method, we will
// get an error dump even though we should not get one.
// TODO: refactor this to make it better
dataIterator.StateTracker = NewStateTracker(f.DataIterationConcurrency * 10)

dataIterator.Tables = tables
dataIterator.AddBatchListener(f.BatchWriter.WriteRowBatch)
f.logger.WithField("tables", tables).Info("starting standalone table copy")
f.logger.WithField("tables", tables).Info("starting delta table copy in cutover")

dataIterator.Run()

Expand Down
2 changes: 1 addition & 1 deletion iterative_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func (v *IterativeVerifier) iterateAllTables(mismatchedPkFunc func(uint64, *sche
func (v *IterativeVerifier) iterateTableFingerprints(table *schema.Table, mismatchedPkFunc func(uint64, *schema.Table) error) error {
// The cursor will stop iterating when it cannot find anymore rows,
// so it will not iterate until MaxUint64.
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, math.MaxUint64)
cursor := v.CursorConfig.NewCursorWithoutRowLock(table, 0, math.MaxUint64)

// It only needs the PKs, not the entire row.
cursor.ColumnsToSelect = []string{fmt.Sprintf("`%s`", table.GetPKColumn(0).Name)}
Expand Down
4 changes: 2 additions & 2 deletions sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (r *ShardingFerry) deltaCopyJoinedTables() error {
}
}

err := r.Ferry.RunStandaloneDataCopy(tables)
err := r.Ferry.RunOnlyDataCopyForTables(tables)
if err != nil {
return err
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (r *ShardingFerry) copyPrimaryKeyTables() error {
return fmt.Errorf("expected primary key tables could not be found")
}

err = r.Ferry.RunStandaloneDataCopy(tables)
err = r.Ferry.RunOnlyDataCopyForTables(tables)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions sharding/testhelpers/unit_test_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (t *ShardingUnitTestSuite) SetupTest() {
t.setupShardingFerry()
t.dropTestDbs()

testhelpers.SetupTest()

testhelpers.SeedInitialData(t.Ferry.Ferry.SourceDB, sourceDbName, testTable, 1000)
testhelpers.SeedInitialData(t.Ferry.Ferry.TargetDB, targetDbName, testTable, 0)

Expand Down
18 changes: 18 additions & 0 deletions state_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ghostferry

import (
"container/ring"
"math"
"sync"
"time"

Expand Down Expand Up @@ -44,6 +45,23 @@ func (s *StateTracker) UpdateLastSuccessfulPK(table string, pk uint64) {
s.updateSpeedLog(deltaPK)
}

func (s *StateTracker) LastSuccessfulPK(table string) uint64 {
s.tableMutex.RLock()
defer s.tableMutex.RUnlock()

_, found := s.completedTables[table]
if found {
return math.MaxUint64
}

pk, found := s.lastSuccessfulPrimaryKeys[table]
if !found {
return 0
}

return pk
}

func (s *StateTracker) MarkTableAsCompleted(table string) {
s.tableMutex.Lock()
defer s.tableMutex.Unlock()
Expand Down

0 comments on commit ccf1caa

Please sign in to comment.