Skip to content

Commit

Permalink
Clean up the ddl handlers a little
Browse files Browse the repository at this point in the history
Co-authored-by: Author name <pawan.dubey@shopify.com>
  • Loading branch information
shivnagarajan and pawandubey committed Dec 8, 2021
1 parent a70b04b commit acec2f4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 70 deletions.
98 changes: 33 additions & 65 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"context"
"crypto/tls"
sqlorig "database/sql"
"errors"
"fmt"
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"
"github.com/blastrain/vitess-sqlparser/sqlparser"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
Expand All @@ -18,7 +16,7 @@ import (

const caughtUpThreshold = 10 * time.Second

type ReplicationEventContext struct {
type BinlogEventState struct {
evPosition mysql.Position
isEventPositionResumable bool
isEventPositionValid bool
Expand All @@ -45,7 +43,6 @@ type BinlogStreamer struct {
// See https://github.com/Shopify/ghostferry/pull/258 for details
DatabaseRewrites map[string]string
TableRewrites map[string]string
EventHandlers map[string]func(*replication.BinlogEvent, []byte, *ReplicationEventContext) ([]byte, error)

lastStreamedBinlogPosition mysql.Position
lastResumableBinlogPosition mysql.Position
Expand All @@ -58,6 +55,7 @@ type BinlogStreamer struct {

logger *logrus.Entry
eventListeners []func([]DMLEvent) error
eventHandlers map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)
}

func (s *BinlogStreamer) ensureLogger() {
Expand Down Expand Up @@ -143,43 +141,12 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio
return s.lastStreamedBinlogPosition, err
}

type HandlerError struct {
err error
fatal bool
}

func (h *HandlerError) Error() string {
return fmt.Sprintf("%v", h.err)
}

func QueryEventHandler(ev *replication.BinlogEvent, query []byte, reContext *ReplicationEventContext) ([]byte, error) {
handlerError := HandlerError{}
var err error
query = ev.Event.(*replication.QueryEvent).Query
if string(query) == "BEGIN" {
return query, nil
}
statement, err := sqlparser.Parse(string(query))
if err != nil {
handlerError.err = errors.New("failed to parse query event")
handlerError.fatal = true
return query, &handlerError
}
switch statement.(type) {
case *sqlparser.DDL:
handlerError.err = errors.New("unexpected DDL event")
handlerError.fatal = true
return query, &handlerError
}
return query, nil
}

func (s *BinlogStreamer) DefaultEventHandler(ev *replication.BinlogEvent, query []byte, reContext *ReplicationEventContext) ([]byte, error) {
func (s *BinlogStreamer) defaultEventHandler(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) {
var err error
switch e := ev.Event.(type) {
case *replication.RotateEvent:
// This event is used to keep the "current binlog filename" of the binlog streamer in sync.
reContext.nextFilename = string(e.NextLogName)
es.nextFilename = string(e.NextLogName)

isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0
if isFakeRotateEvent {
Expand All @@ -188,15 +155,15 @@ func (s *BinlogStreamer) DefaultEventHandler(ev *replication.BinlogEvent, query
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907

// However, we can always advance our lastStreamedBinlogPosition according to its data fields
reContext.evPosition = mysql.Position{
es.evPosition = mysql.Position{
Name: string(e.NextLogName),
Pos: uint32(e.Position),
}
}

s.logger.WithFields(logrus.Fields{
"new_position": reContext.evPosition.Pos,
"new_filename": reContext.evPosition.Name,
"new_position": es.evPosition.Pos,
"new_filename": es.evPosition.Name,
"last_position": s.lastStreamedBinlogPosition.Pos,
"last_filename": s.lastStreamedBinlogPosition.Name,
}).Info("binlog file rotated")
Expand All @@ -210,7 +177,7 @@ func (s *BinlogStreamer) DefaultEventHandler(ev *replication.BinlogEvent, query
// is explicitly set to 0 and should not be considered valid according to
// the mysql source. See:
// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
reContext.isEventPositionValid = ev.Header.LogPos != 0
es.isEventPositionValid = ev.Header.LogPos != 0
case *replication.RowsQueryEvent:
// A RowsQueryEvent will always precede the corresponding RowsEvent
// if binlog_rows_query_log_events is enabled, and is used to get
Expand Down Expand Up @@ -248,7 +215,7 @@ func (s *BinlogStreamer) DefaultEventHandler(ev *replication.BinlogEvent, query
// As a result, the following case will set the last resumable position for
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction
reContext.isEventPositionResumable = true
es.isEventPositionResumable = true

// Here we also reset the query event as we are either at the beginning
// or the end of the current/next transaction. As such, the query will be
Expand All @@ -270,13 +237,13 @@ func (s *BinlogStreamer) Run() {
}()

var query []byte
reContext := ReplicationEventContext{}
es := BinlogEventState{}

currentFilename := s.lastStreamedBinlogPosition.Name
reContext.nextFilename = s.lastStreamedBinlogPosition.Name
es.nextFilename = s.lastStreamedBinlogPosition.Name
s.logger.Info("starting binlog streamer")
for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.stopAtBinlogPosition) < 0) {
currentFilename = reContext.nextFilename
currentFilename = es.nextFilename
var ev *replication.BinlogEvent
var timedOut bool
var err error
Expand All @@ -299,46 +266,47 @@ func (s *BinlogStreamer) Run() {
continue
}

reContext.evPosition = mysql.Position{
es.evPosition = mysql.Position{
Name: currentFilename,
Pos: ev.Header.LogPos,
}

s.logger.WithFields(logrus.Fields{
"position": reContext.evPosition.Pos,
"file": reContext.evPosition.Name,
"position": es.evPosition.Pos,
"file": es.evPosition.Name,
"type": fmt.Sprintf("%T", ev.Event),
"lastStreamedBinlogPosition": s.lastStreamedBinlogPosition,
}).Debug("reached position")

reContext.isEventPositionResumable = false
reContext.isEventPositionValid = true
es.isEventPositionResumable = false
es.isEventPositionValid = true

eventTypeString := ev.Header.EventType.String()
handler := s.EventHandlers[eventTypeString]
if handler == nil {
query, err = s.DefaultEventHandler(ev, query, &reContext)
} else {
query, err = handler(ev, query, &reContext)
handlerError, ok := err.(*HandlerError)
if ok {
if handlerError.err != nil {
s.logger.WithError(handlerError.err).Error("failed to handle event")
if handlerError.fatal {
s.ErrorHandler.Fatal("binlog_streamer", handlerError.err)
}
}
if handler := s.eventHandlers[eventTypeString]; handler != nil {
query, err = handler(ev, query, &es)
if err != nil {
s.logger.WithError(err).Error("failed to handle event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}
} else {
query, err = s.defaultEventHandler(ev, query, &es)
}

if reContext.isEventPositionValid {
if es.isEventPositionValid {
evType := fmt.Sprintf("%T", ev.Event)
evTimestamp := ev.Header.Timestamp
s.updateLastStreamedPosAndTime(evTimestamp, reContext.evPosition, evType, reContext.isEventPositionResumable)
s.updateLastStreamedPosAndTime(evTimestamp, es.evPosition, evType, es.isEventPositionResumable)
}
}
}

func (s *BinlogStreamer) AddBinlogEventHandler(ev string, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) {
if s.eventHandlers == nil {
s.eventHandlers = make(map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error))
}
s.eventHandlers[ev] = eh
}

func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error) {
s.eventListeners = append(s.eventListeners, listener)
}
Expand Down
26 changes: 21 additions & 5 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"
"github.com/blastrain/vitess-sqlparser/sqlparser"

_ "net/http/pprof"

Expand Down Expand Up @@ -128,7 +129,9 @@ func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator {
}

func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer {
return f.newBinlogStreamer(f.SourceDB, f.Config.Source, nil, nil, "source_binlog_streamer")
b := f.newBinlogStreamer(f.SourceDB, f.Config.Source, nil, nil, "source_binlog_streamer")
b.AddBinlogEventHandler("QueryEvent", queryEventHandler)
return b
}

func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error) {
Expand All @@ -145,12 +148,26 @@ func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error) {
return f.newBinlogStreamer(f.TargetDB, f.Config.Target, schemaRewrites, tableRewrites, "target_binlog_streamer"), nil
}

func queryEventHandler(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) {
var err error
query = ev.Event.(*replication.QueryEvent).Query
if string(query) == "BEGIN" {
return query, nil
}
statement, err := sqlparser.Parse(string(query))
if err != nil {
return query, errors.New("failed to parse query event")
}
switch statement.(type) {
case *sqlparser.DDL:
return query, errors.New("unexpected DDL event")
}
return query, nil
}

func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewrites, tableRewrites map[string]string, logTag string) *BinlogStreamer {
f.ensureInitialized()

eventHandlers := make(map[string]func(*replication.BinlogEvent, []byte, *ReplicationEventContext) ([]byte, error))
eventHandlers["QueryEvent"] = QueryEventHandler

return &BinlogStreamer{
DB: db,
DBConfig: dbConf,
Expand All @@ -161,7 +178,6 @@ func (f *Ferry) newBinlogStreamer(db *sql.DB, dbConf *DatabaseConfig, schemaRewr
LogTag: logTag,
DatabaseRewrites: schemaRewrites,
TableRewrites: tableRewrites,
EventHandlers: eventHandlers,
}
}

Expand Down

0 comments on commit acec2f4

Please sign in to comment.