Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt Ghostferry via signals and resume from SerializedState #67

Merged
merged 7 commits into from
Nov 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 4 additions & 9 deletions batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ghostferry

import (
"database/sql"
"errors"
"fmt"
"sync"

Expand All @@ -24,15 +23,9 @@ type BatchWriter struct {
logger *logrus.Entry
}

func (w *BatchWriter) Initialize() error {
func (w *BatchWriter) Initialize() {
w.statements = make(map[string]*sql.Stmt)
w.logger = logrus.WithField("tag", "batch_writer")

if w.StateTracker == nil {
return errors.New("StateTracker must be defined")
}

return nil
}

func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
Expand Down Expand Up @@ -74,7 +67,9 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {

// Note that the state tracker expects us the track based on the original
// database and table names as opposed to the target ones.
w.StateTracker.UpdateLastSuccessfulPK(batch.TableSchema().String(), pkpos)
if w.StateTracker != nil {
w.StateTracker.UpdateLastSuccessfulPK(batch.TableSchema().String(), pkpos)
}

return nil
})
Expand Down
56 changes: 34 additions & 22 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
const caughtUpThreshold = 10 * time.Second

type BinlogStreamer struct {
Db *sql.DB
Config *Config
DB *sql.DB
DBConfig DatabaseConfig
MyServerId uint32
ErrorHandler ErrorHandler
Filter CopyFilter

Expand All @@ -35,37 +36,37 @@ type BinlogStreamer struct {
eventListeners []func([]DMLEvent) error
}

func (s *BinlogStreamer) Initialize() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reasoning for removing the Initialize method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a legacy of the original Ghostferry design. This method doesn't do much and just complicates the API. Anything it does can be done directly in the Run method, or within the Ferry.NewBinlogStreamer methods.

s.logger = logrus.WithField("tag", "binlog_streamer")
s.stopRequested = false
return nil
func (s *BinlogStreamer) ensureLogger() {
if s.logger == nil {
s.logger = logrus.WithField("tag", "binlog_streamer")
}
}

func (s *BinlogStreamer) createBinlogSyncer() error {
var err error
var tlsConfig *tls.Config

if s.Config.Source.TLS != nil {
tlsConfig, err = s.Config.Source.TLS.BuildConfig()
if s.DBConfig.TLS != nil {
tlsConfig, err = s.DBConfig.TLS.BuildConfig()
if err != nil {
return err
}
}

if s.Config.MyServerId == 0 {
s.Config.MyServerId, err = s.generateNewServerId()
if s.MyServerId == 0 {
s.MyServerId, err = s.generateNewServerId()
if err != nil {
s.logger.WithError(err).Error("could not generate unique server_id")
return err
}
}

syncerConfig := replication.BinlogSyncerConfig{
ServerID: s.Config.MyServerId,
Host: s.Config.Source.Host,
Port: s.Config.Source.Port,
User: s.Config.Source.User,
Password: s.Config.Source.Pass,
ServerID: s.MyServerId,
Host: s.DBConfig.Host,
Port: s.DBConfig.Port,
User: s.DBConfig.User,
Password: s.DBConfig.Pass,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

TLSConfig: tlsConfig,
UseDecimal: true,
TimestampStringLocation: time.UTC,
Expand All @@ -76,22 +77,31 @@ func (s *BinlogStreamer) createBinlogSyncer() error {
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error {
err := s.createBinlogSyncer()
s.ensureLogger()

currentPosition, err := ShowMasterStatusBinlogPosition(s.DB)
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return err
}

s.logger.Info("reading current binlog position")
s.lastStreamedBinlogPosition, err = ShowMasterStatusBinlogPosition(s.Db)
return s.ConnectBinlogStreamerToMysqlFrom(currentPosition)
}

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) error {
s.ensureLogger()

err := s.createBinlogSyncer()
if err != nil {
s.logger.WithError(err).Error("failed to read current binlog position")
return err
}

s.lastStreamedBinlogPosition = startFromBinlogPosition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feels like this should only be set after the binlogSyncer has actually streamed the position. i.e. initialize the syncer and let the Run method set this after it has streamed the starting position

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it set there would be kind of redundant as this is an internal tracking variable for the main BinlogStreamer loop. Basically, it would have to look like:

ConnectBinlogStreamerToMySqlFrom():
  s.startFromBinlogPosition = startFromBinlogPosition

Run():
  s.lastStreamedBinlogPosition = s.startFromBinlogPosition
  for ... s.lastStreamedBinlogPosition.Compare(s.targetBinlogPosition) < 0:
     ....

Without setting lastStreamedBinlogPosition before we start that loop, the conditional would be more complex.

Note that this variable has nothing to do with the StateTracker binlog position, which is set in BinlogWriter and once in Ferry during initial connection.


s.logger.WithFields(logrus.Fields{
"file": s.lastStreamedBinlogPosition.Name,
"pos": s.lastStreamedBinlogPosition.Pos,
}).Info("found binlog position, starting synchronization")
}).Info("starting binlog streaming")

s.binlogStreamer, err = s.binlogSyncer.StartSync(s.lastStreamedBinlogPosition)
if err != nil {
Expand All @@ -103,6 +113,8 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error {
}

func (s *BinlogStreamer) Run() {
s.ensureLogger()

defer func() {
s.logger.Info("exiting binlog streamer")
s.binlogSyncer.Close()
Expand Down Expand Up @@ -192,7 +204,7 @@ func (s *BinlogStreamer) FlushAndStop() {
// passed the initial target position.
err := WithRetries(100, 600*time.Millisecond, s.logger, "read current binlog position", func() error {
var err error
s.targetBinlogPosition, err = ShowMasterStatusBinlogPosition(s.Db)
s.targetBinlogPosition, err = ShowMasterStatusBinlogPosition(s.DB)
return err
})

Expand Down Expand Up @@ -290,7 +302,7 @@ func (s *BinlogStreamer) generateNewServerId() (uint32, error) {
for {
id = randomServerId()

exists, err := idExistsOnServer(id, s.Db)
exists, err := idExistsOnServer(id, s.DB)
if err != nil {
return 0, err
}
Expand Down
14 changes: 4 additions & 10 deletions binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ghostferry

import (
"database/sql"
"errors"
"fmt"

"github.com/siddontang/go-mysql/schema"
Expand All @@ -25,17 +24,10 @@ type BinlogWriter struct {
logger *logrus.Entry
}

func (b *BinlogWriter) Initialize() error {
if b.StateTracker == nil {
return errors.New("must specify StateTracker")
}

func (b *BinlogWriter) Run() {
b.logger = logrus.WithField("tag", "binlog_writer")
b.binlogEventBuffer = make(chan DMLEvent, b.BatchSize)
return nil
}

func (b *BinlogWriter) Run() {
batch := make([]DMLEvent, 0, b.BatchSize)
for {
firstEvent := <-b.binlogEventBuffer
Expand Down Expand Up @@ -116,7 +108,9 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
return fmt.Errorf("exec query (%d bytes): %v", len(query), err)
}

b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition())
if b.StateTracker != nil {
b.StateTracker.UpdateLastWrittenBinlogPosition(events[len(events)-1].BinlogPosition())
}

return nil
}
14 changes: 14 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,19 @@ type Config struct {
// Optional: defaults to false
AutomaticCutover bool

// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
// by dumping the current state to stdout.
// This state can be used to resume Ghostferry.
DumpStateToStdoutOnSignal bool

// Config for the ControlServer
ServerBindAddr string
WebBasedir string

// The state to resume from as dumped by the PanicErrorHandler.
// If this is null, a new Ghostferry run will be started. Otherwise, the
// reconciliation process will start and Ghostferry will resume after that.
StateToResumeFrom *SerializableState
}

func (c *Config) ValidateConfig() error {
Expand All @@ -255,6 +265,10 @@ func (c *Config) ValidateConfig() error {
return fmt.Errorf("Table filter function must be provided")
}

if c.StateToResumeFrom != nil && c.StateToResumeFrom.GhostferryVersion != VersionString {
return fmt.Errorf("StateToResumeFrom version mismatch: resume = %s, current = %s", c.StateToResumeFrom.GhostferryVersion, VersionString)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just being defensive or there's a legitimate reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defensive for now as there's no guarantee that we won't change things. If we want to force resume, we can just set the version passed to Ghostferry to the same one as the Ghostferry instance that's launched.


if c.DBWriteRetries == 0 {
c.DBWriteRetries = 5
}
Expand Down
23 changes: 10 additions & 13 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,21 @@ type CursorConfig struct {
}

// returns a new Cursor with an embedded copy of itself
func (c *CursorConfig) NewCursor(table *schema.Table, maxPk uint64) *Cursor {
func (c *CursorConfig) NewCursor(table *schema.Table, startPk, maxPk uint64) *Cursor {
return &Cursor{
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: true,
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: true,
lastSuccessfulPrimaryKey: startPk,
}
}

// returns a new Cursor with an embedded copy of itself
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor {
return &Cursor{
CursorConfig: *c,
Table: table,
MaxPrimaryKey: maxPk,
RowLock: false,
}
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, startPk, maxPk uint64) *Cursor {
cursor := c.NewCursor(table, startPk, maxPk)
cursor.RowLock = false
return cursor
}

type Cursor struct {
Expand All @@ -72,7 +70,6 @@ type Cursor struct {
}

func (c *Cursor) Each(f func(*RowBatch) error) error {
c.lastSuccessfulPrimaryKey = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was this here for to begin with?

Copy link
Contributor Author

@shuhaowu shuhaowu Oct 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to ensure that subsequent calls to .Each would start from scratch. It turns out we don't actually use this method in this way.

We only ever call this method once.

c.logger = logrus.WithFields(logrus.Fields{
"table": c.Table.String(),
"tag": "cursor",
Expand Down
33 changes: 23 additions & 10 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ghostferry

import (
"database/sql"
"errors"
"fmt"
"math"
"sync"

"github.com/siddontang/go-mysql/schema"
Expand All @@ -25,20 +25,18 @@ type DataIterator struct {
logger *logrus.Entry
}

func (d *DataIterator) Initialize() error {
func (d *DataIterator) Run() {
d.logger = logrus.WithField("tag", "data_iterator")
d.targetPKs = &sync.Map{}

// If a state tracker is not provided, then the caller doesn't care about
// tracking state. However, some methods are still useful so we initialize
// a minimal local instance.
if d.StateTracker == nil {
return errors.New("StateTracker must be defined")
d.StateTracker = NewStateTracker(0)
}

return nil
}

func (d *DataIterator) Run() {
d.logger.WithField("tablesCount", len(d.Tables)).Info("starting data iterator run")

tablesWithData, emptyTables, err := MaxPrimaryKeys(d.DB, d.Tables, d.logger)
if err != nil {
d.ErrorHandler.Fatal("data_iterator", err)
Expand All @@ -49,7 +47,14 @@ func (d *DataIterator) Run() {
}

for table, maxPk := range tablesWithData {
d.targetPKs.Store(table.String(), maxPk)
tableName := table.String()
if d.StateTracker.IsTableComplete(tableName) {
// In a previous run, the table may have been completed.
// We don't need to reiterate those tables as it has already been done.
delete(tablesWithData, table)
} else {
d.targetPKs.Store(table.String(), maxPk)
}
}

tablesQueue := make(chan *schema.Table)
Expand All @@ -76,7 +81,15 @@ func (d *DataIterator) Run() {
return
}

cursor := d.CursorConfig.NewCursor(table, targetPKInterface.(uint64))
startPk := d.StateTracker.LastSuccessfulPK(table.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does table.String() include the schema? i.e. is it db.table or just table? former is more correct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's db.table. There are some issues if either your schema name or your table name includes a `. However, this problem is somewhat wide spread.

if startPk == math.MaxUint64 {
err := fmt.Errorf("%v has been marked as completed but a table iterator has been spawned, this is likely a programmer error which resulted in the inconsistent starting state", table.String())
logger.WithError(err).Error("this is definitely a bug")
d.ErrorHandler.Fatal("data_iterator", err)
return
}

cursor := d.CursorConfig.NewCursor(table, startPk, targetPKInterface.(uint64))
err := cursor.Each(func(batch *RowBatch) error {
metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{
MetricTag{"table", table.Name},
Expand Down
Loading