diff --git a/binlog_streamer.go b/binlog_streamer.go index 4bd7d546..fd4178e5 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" sqlorig "database/sql" + "errors" "fmt" "time" @@ -16,6 +17,14 @@ import ( const caughtUpThreshold = 10 * time.Second +// this is passed into event handlers to keep track of state of the binlog event stream. +type BinlogEventState struct { + evPosition mysql.Position + isEventPositionResumable bool + isEventPositionValid bool + nextFilename string +} + type BinlogStreamer struct { DB *sql.DB DBConfig *DatabaseConfig @@ -48,6 +57,11 @@ type BinlogStreamer struct { logger *logrus.Entry eventListeners []func([]DMLEvent) error + // eventhandlers can be attached to binlog Replication Events + // for any event that does not have a specific handler attached, a default eventHandler + // is provided (defaultEventHandler). Event handlers are provided the replication binLogEvent + // and a state object that carries information about the state of the binlog event stream. + eventHandlers map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error) } func (s *BinlogStreamer) ensureLogger() { @@ -133,6 +147,93 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio return s.lastStreamedBinlogPosition, err } +// the default event handler is called for replication binLogEvents that do not have a +// separate event Handler registered. + +func (s *BinlogStreamer) defaultEventHandler(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) { + var err error + switch e := ev.Event.(type) { + case *replication.RotateEvent: + // This event is used to keep the "current binlog filename" of the binlog streamer in sync. + es.nextFilename = string(e.NextLogName) + + isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0 + if isFakeRotateEvent { + // Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events + // https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287 + // https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907 + + // However, we can always advance our lastStreamedBinlogPosition according to its data fields + es.evPosition = mysql.Position{ + Name: string(e.NextLogName), + Pos: uint32(e.Position), + } + } + + s.logger.WithFields(logrus.Fields{ + "new_position": es.evPosition.Pos, + "new_filename": es.evPosition.Name, + "last_position": s.lastStreamedBinlogPosition.Pos, + "last_filename": s.lastStreamedBinlogPosition.Name, + }).Info("binlog file rotated") + case *replication.FormatDescriptionEvent: + // This event is sent: + // 1) when our replication client connects to mysql + // 2) at the beginning of each binlog file + // + // For (1), if we are starting the binlog from a position that's greater + // than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position + // is explicitly set to 0 and should not be considered valid according to + // the mysql source. See: + // https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026 + es.isEventPositionValid = ev.Header.LogPos != 0 + case *replication.RowsQueryEvent: + // A RowsQueryEvent will always precede the corresponding RowsEvent + // if binlog_rows_query_log_events is enabled, and is used to get + // the full query that was executed on the master (with annotations) + // that is otherwise not possible to reconstruct + query = ev.Event.(*replication.RowsQueryEvent).Query + case *replication.RowsEvent: + err = s.handleRowsEvent(ev, query) + if err != nil { + s.logger.WithError(err).Error("failed to handle rows event") + s.ErrorHandler.Fatal("binlog_streamer", err) + } + case *replication.XIDEvent, *replication.GTIDEvent: + // With regards to DMLs, we see (at least) the following sequence + // of events in the binlog stream: + // + // - GTIDEvent <- START of transaction + // - QueryEvent + // - RowsQueryEvent + // - TableMapEvent + // - RowsEvent + // - RowsEvent + // - XIDEvent <- END of transaction + // + // *NOTE* + // + // First, RowsQueryEvent is only available with `binlog_rows_query_log_events` + // set to "ON". + // + // Second, there will be at least one (but potentially more) RowsEvents + // depending on the number of rows updated in the transaction. + // + // Lastly, GTIDEvents will only be available if they are enabled. + // + // As a result, the following case will set the last resumable position for + // interruption to EITHER the start (if using GTIDs) or the end of the + // last transaction + es.isEventPositionResumable = true + + // Here we also reset the query event as we are either at the beginning + // or the end of the current/next transaction. As such, the query will be + // reset following the next RowsQueryEvent before the corresponding RowsEvent(s) + query = nil + } + return query, err +} + func (s *BinlogStreamer) Run() { s.ensureLogger() @@ -145,13 +246,13 @@ func (s *BinlogStreamer) Run() { }() var query []byte + es := BinlogEventState{} currentFilename := s.lastStreamedBinlogPosition.Name - nextFilename := s.lastStreamedBinlogPosition.Name - + es.nextFilename = s.lastStreamedBinlogPosition.Name s.logger.Info("starting binlog streamer") for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.stopAtBinlogPosition) < 0) { - currentFilename = nextFilename + currentFilename = es.nextFilename var ev *replication.BinlogEvent var timedOut bool var err error @@ -174,109 +275,62 @@ func (s *BinlogStreamer) Run() { continue } - evPosition := mysql.Position{ + es.evPosition = mysql.Position{ Name: currentFilename, Pos: ev.Header.LogPos, } s.logger.WithFields(logrus.Fields{ - "position": evPosition.Pos, - "file": evPosition.Name, + "position": es.evPosition.Pos, + "file": es.evPosition.Name, "type": fmt.Sprintf("%T", ev.Event), "lastStreamedBinlogPosition": s.lastStreamedBinlogPosition, }).Debug("reached position") - isEventPositionResumable := false - isEventPositionValid := true - - switch e := ev.Event.(type) { - case *replication.RotateEvent: - // This event is used to keep the "current binlog filename" of the binlog streamer in sync. - nextFilename = string(e.NextLogName) - - isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0 - if isFakeRotateEvent { - // Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events - // https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287 - // https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907 - - // However, we can always advance our lastStreamedBinlogPosition according to its data fields - evPosition = mysql.Position{ - Name: string(e.NextLogName), - Pos: uint32(e.Position), - } - } + es.isEventPositionResumable = false + es.isEventPositionValid = true - s.logger.WithFields(logrus.Fields{ - "new_position": evPosition.Pos, - "new_filename": evPosition.Name, - "last_position": s.lastStreamedBinlogPosition.Pos, - "last_filename": s.lastStreamedBinlogPosition.Name, - }).Info("binlog file rotated") - case *replication.FormatDescriptionEvent: - // This event is sent: - // 1) when our replication client connects to mysql - // 2) at the beginning of each binlog file - // - // For (1), if we are starting the binlog from a position that's greater - // than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position - // is explicitly set to 0 and should not be considered valid according to - // the mysql source. See: - // https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026 - isEventPositionValid = ev.Header.LogPos != 0 - case *replication.RowsQueryEvent: - // A RowsQueryEvent will always precede the corresponding RowsEvent - // if binlog_rows_query_log_events is enabled, and is used to get - // the full query that was executed on the master (with annotations) - // that is otherwise not possible to reconstruct - query = ev.Event.(*replication.RowsQueryEvent).Query - case *replication.RowsEvent: - err = s.handleRowsEvent(ev, query) + // if there is a handler associated with this eventType, call it + eventTypeString := ev.Header.EventType.String() + if handler, ok := s.eventHandlers[eventTypeString]; ok { + query, err = handler(ev, query, &es) if err != nil { - s.logger.WithError(err).Error("failed to handle rows event") + s.logger.WithError(err).Error("failed to handle event") s.ErrorHandler.Fatal("binlog_streamer", err) } - case *replication.XIDEvent, *replication.GTIDEvent: - // With regards to DMLs, we see (at least) the following sequence - // of events in the binlog stream: - // - // - GTIDEvent <- START of transaction - // - QueryEvent - // - RowsQueryEvent - // - TableMapEvent - // - RowsEvent - // - RowsEvent - // - XIDEvent <- END of transaction - // - // *NOTE* - // - // First, RowsQueryEvent is only available with `binlog_rows_query_log_events` - // set to "ON". - // - // Second, there will be at least one (but potentially more) RowsEvents - // depending on the number of rows updated in the transaction. - // - // Lastly, GTIDEvents will only be available if they are enabled. - // - // As a result, the following case will set the last resumable position for - // interruption to EITHER the start (if using GTIDs) or the end of the - // last transaction - isEventPositionResumable = true - - // Here we also reset the query event as we are either at the beginning - // or the end of the current/next transaction. As such, the query will be - // reset following the next RowsQueryEvent before the corresponding RowsEvent(s) - query = nil + } else { + // call the default event handler for everything else + query, err = s.defaultEventHandler(ev, query, &es) } - if isEventPositionValid { + if es.isEventPositionValid { evType := fmt.Sprintf("%T", ev.Event) evTimestamp := ev.Header.Timestamp - s.updateLastStreamedPosAndTime(evTimestamp, evPosition, evType, isEventPositionResumable) + s.updateLastStreamedPosAndTime(evTimestamp, es.evPosition, evType, es.isEventPositionResumable) } } } +// Attach an event handler to a replication BinLogEvent +// We only support attaching events to any of the events defined in +// https://github.com/go-mysql-org/go-mysql/blob/master/replication/const.go +// custom event handlers are provided the replication BinLogEvent and a state object +// that carries the current state of the binlog event stream. +func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) error { + // verify that event-type is valid + // if eventTypeString is unrecognized, bail + eventTypeString := evType.String() + if eventTypeString == "UnknownEvent" { + return errors.New("Unknown event type") + } + + if s.eventHandlers == nil { + s.eventHandlers = make(map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) + } + s.eventHandlers[eventTypeString] = eh + return nil +} + func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error) { s.eventListeners = append(s.eventListeners, listener) } diff --git a/sharding/test/trivial_integration_test.go b/sharding/test/trivial_integration_test.go index 9c3ee47e..4c27a23b 100644 --- a/sharding/test/trivial_integration_test.go +++ b/sharding/test/trivial_integration_test.go @@ -1,10 +1,11 @@ package test import ( - sql "github.com/Shopify/ghostferry/sqlwrapper" "math/rand" "testing" + sql "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/Shopify/ghostferry/sharding" "github.com/Shopify/ghostferry/testhelpers" "github.com/stretchr/testify/assert" diff --git a/test/go/binlog_streamer_test.go b/test/go/binlog_streamer_test.go index e1a204f7..b16a4088 100644 --- a/test/go/binlog_streamer_test.go +++ b/test/go/binlog_streamer_test.go @@ -11,6 +11,7 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/go-mysql-org/go-mysql/replication" "github.com/stretchr/testify/suite" ) @@ -195,6 +196,20 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsQueryEventOnRowsEvent this.Require().True(eventAsserted) } +func (this *BinlogStreamerTestSuite) TestBinlogStreamerAddEventHandlerEventTypes() { + qe := func(ev *replication.BinlogEvent, query []byte, es *ghostferry.BinlogEventState) ([]byte, error) { + return query, nil + } + + // try attaching a handler to a valid event type + err := this.binlogStreamer.AddBinlogEventHandler(replication.TABLE_MAP_EVENT, qe) + this.Require().Nil(err) + + // try attaching a handler to an invalid event type + err = this.binlogStreamer.AddBinlogEventHandler(replication.EventType(byte(0)), qe) + this.Require().NotNil(err) +} + func TestBinlogStreamerTestSuite(t *testing.T) { testhelpers.SetupTest() suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}}) diff --git a/test/integration/ddl_events_test.rb b/test/integration/ddl_events_test.rb new file mode 100644 index 00000000..c6d608d4 --- /dev/null +++ b/test/integration/ddl_events_test.rb @@ -0,0 +1,47 @@ +require "test_helper" + +class DdlEventsTest < GhostferryTestCase + DDL_GHOSTFERRY = "ddl_ghostferry" + + def test_default_event_handler + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + ghostferry.run_with_logs() + + assert_ghostferry_completed(ghostferry, times: 1) + end + + def test_ddl_event_handler + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(DDL_GHOSTFERRY) + ghostferry.run_with_logs() + + assert_ghostferry_completed(ghostferry, times: 1) + end + + def test_ddl_event_handler_with_ddl_events + seed_simple_database_with_single_table + + table_name = full_table_name(DEFAULT_DB, DEFAULT_TABLE) + + ghostferry = new_ghostferry(DDL_GHOSTFERRY) + + ghostferry.on_status(GhostferryHelper::Ghostferry::Status::BINLOG_STREAMING_STARTED) do + source_db.query("INSERT INTO #{table_name} VALUES (9000, 'test')") + source_db.query("ALTER TABLE #{table_name} ADD INDEX (data(100))") + source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9001, 'test')") + end + + ghostferry.run_expecting_failure + + source, target = source_and_target_table_metrics + source_count = source[DEFAULT_FULL_TABLE_NAME][:row_count] + target_count = target[DEFAULT_FULL_TABLE_NAME][:row_count] + + refute_equal(source_count, target_count, "target should have fewer rows than source") + + end +end diff --git a/test/lib/go/ddl_ghostferry/main.go b/test/lib/go/ddl_ghostferry/main.go new file mode 100644 index 00000000..8d867982 --- /dev/null +++ b/test/lib/go/ddl_ghostferry/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "errors" + + "github.com/Shopify/ghostferry" + tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry" + "github.com/go-mysql-org/go-mysql/replication" +) + +func queryEventHandler(ev *replication.BinlogEvent, query []byte, es *ghostferry.BinlogEventState) ([]byte, error) { + query = ev.Event.(*replication.QueryEvent).Query + return query, errors.New("Query event") +} + +func AfterInitialize(f *tf.IntegrationFerry) error { + f.Ferry.BinlogStreamer.AddBinlogEventHandler(replication.QUERY_EVENT, queryEventHandler) + return nil +} + +func main() { + c := tf.RunCallbacks{ + AfterInitialize: AfterInitialize, + } + f := tf.Setup(&c) /* pass in initializers */ + + err := tf.Run(f) + if err != nil { + panic(err) + } +} diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry/ferry.go similarity index 81% rename from test/lib/go/integrationferry.go rename to test/lib/go/integrationferry/ferry.go index 9f43c3e3..f1b4ce89 100644 --- a/test/lib/go/integrationferry.go +++ b/test/lib/go/integrationferry/ferry.go @@ -1,4 +1,4 @@ -package main +package integrationferry import ( "encoding/json" @@ -39,8 +39,14 @@ const ( StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY" ) +type RunCallbacks struct { + BeforeInitialize func(*IntegrationFerry) error + AfterInitialize func(*IntegrationFerry) error +} + type IntegrationFerry struct { *ghostferry.Ferry + callbacks *RunCallbacks } // ========================================= @@ -257,7 +263,86 @@ func NewStandardConfig() (*ghostferry.Config, error) { return config, config.ValidateConfig() } -func main() { +func Run(f *IntegrationFerry) error { + err := f.SendStatusAndWaitUntilContinue(StatusReady) + if err != nil { + return err + } + + if f.callbacks.BeforeInitialize != nil { + if err := f.callbacks.BeforeInitialize(f); err != nil { + return err + } + } + + // run before callback + err = f.Initialize() + if err != nil { + return err + } + + // run after callback + if f.callbacks.AfterInitialize != nil { + if err := f.callbacks.AfterInitialize(f); err != nil { + return err + } + } + + err = f.Start() + if err != nil { + return err + } + + defer f.StopTargetVerifier() + + err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. + f.FlushBinlogAndStopStreaming() + wg.Wait() + + if f.Verifier != nil { + err := f.SendStatusAndWaitUntilContinue(StatusVerifyDuringCutover) + if err != nil { + return err + } + + result, err := f.Verifier.VerifyDuringCutover() + if err != nil { + return err + } + + // We now send the results back to the integration server as each verifier + // might log them differently, making it difficult to assert that the + // incorrect table was caught from the logs + err = f.SendStatusAndWaitUntilContinue(StatusVerified, result.IncorrectTables...) + if err != nil { + return err + } + + } + return f.SendStatusAndWaitUntilContinue(StatusDone) +} + +func Setup(c *RunCallbacks) *IntegrationFerry { logrus.SetFormatter(&logrus.JSONFormatter{}) logrus.SetLevel(logrus.DebugLevel) if os.Getenv("CI") == "true" { @@ -297,6 +382,7 @@ func main() { Ferry: &ghostferry.Ferry{ Config: config, }, + callbacks: c, } integrationPort := os.Getenv(portEnvName) @@ -312,8 +398,5 @@ func main() { DumpStateToStdoutOnError: true, } - err = f.Main() - if err != nil { - panic(err) - } + return f } diff --git a/test/lib/go/minimal_ghostferry/main.go b/test/lib/go/minimal_ghostferry/main.go new file mode 100644 index 00000000..7c11c46e --- /dev/null +++ b/test/lib/go/minimal_ghostferry/main.go @@ -0,0 +1,15 @@ +package main + +import ( + tf "github.com/Shopify/ghostferry/test/lib/go/integrationferry" +) + +func main() { + c := tf.RunCallbacks{} + f := tf.Setup(&c) /* pass in initializers */ + + err := tf.Run(f) + if err != nil { + panic(err) + } +} diff --git a/test/test_helper.rb b/test/test_helper.rb index f4cddca3..59427bb2 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -44,18 +44,18 @@ class GhostferryTestCase < Minitest::Test include DbHelper include DataWriterHelper - MINIMAL_GHOSTFERRY = "integrationferry.go" + MINIMAL_GHOSTFERRY = "minimal_ghostferry" - def new_ghostferry(filename, config: {}) + def new_ghostferry(filepath, config: {}) # Transform path to something ruby understands - path = File.join(GO_CODE_PATH, filename) + path = File.join(GO_CODE_PATH, filepath, "main.go") g = Ghostferry.new(path, config: config, logger: @log_capturer.logger) @ghostferry_instances << g g end - def new_ghostferry_with_interrupt_after_row_copy(filename, config: {}, after_batches_written: 0) - g = new_ghostferry(filename, config) + def new_ghostferry_with_interrupt_after_row_copy(filepath, config: {}, after_batches_written: 0) + g = new_ghostferry(filepath, config) batches_written = 0 g.on_status(Ghostferry::Status::AFTER_ROW_COPY) do