Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect ddl add handlers #323

Merged
merged 3 commits into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 139 additions & 85 deletions binlog_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
sqlorig "database/sql"
"errors"
"fmt"
"time"

Expand All @@ -16,6 +17,14 @@ import (

const caughtUpThreshold = 10 * time.Second

// BinlogEventState is passed into event handlers to keep track of the state
// of the binlog event stream.
type BinlogEventState struct {
	// evPosition is the binlog position of the event currently being processed.
	evPosition mysql.Position
	// isEventPositionResumable is true when the stream can safely be
	// interrupted and resumed at this event's position (set at transaction
	// boundaries, i.e. on XIDEvent/GTIDEvent).
	isEventPositionResumable bool
	// isEventPositionValid is false when the event's header position must be
	// ignored (e.g. a FormatDescriptionEvent whose LogPos is 0).
	isEventPositionValid bool
	// nextFilename is the binlog filename the stream reads next; it is
	// updated when a RotateEvent is seen.
	nextFilename string
}

type BinlogStreamer struct {
DB *sql.DB
DBConfig *DatabaseConfig
Expand Down Expand Up @@ -48,6 +57,11 @@ type BinlogStreamer struct {

logger *logrus.Entry
eventListeners []func([]DMLEvent) error
// eventhandlers can be attached to binlog Replication Events
// for any event that does not have a specific handler attached, a default eventHandler
// is provided (defaultEventHandler). Event handlers are provided the replication binLogEvent
// and a state object that carries information about the state of the binlog event stream.
eventHandlers map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)
}

func (s *BinlogStreamer) ensureLogger() {
Expand Down Expand Up @@ -133,6 +147,93 @@ func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPositio
return s.lastStreamedBinlogPosition, err
}

// the default event handler is called for replication binLogEvents that do not have a
// separate event Handler registered.

func (s *BinlogStreamer) defaultEventHandler(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) {
shivnagarajan marked this conversation as resolved.
Show resolved Hide resolved
var err error
switch e := ev.Event.(type) {
case *replication.RotateEvent:
// This event is used to keep the "current binlog filename" of the binlog streamer in sync.
es.nextFilename = string(e.NextLogName)

isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0
if isFakeRotateEvent {
// Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907

// However, we can always advance our lastStreamedBinlogPosition according to its data fields
es.evPosition = mysql.Position{
Name: string(e.NextLogName),
Pos: uint32(e.Position),
}
}

s.logger.WithFields(logrus.Fields{
"new_position": es.evPosition.Pos,
"new_filename": es.evPosition.Name,
"last_position": s.lastStreamedBinlogPosition.Pos,
"last_filename": s.lastStreamedBinlogPosition.Name,
}).Info("binlog file rotated")
case *replication.FormatDescriptionEvent:
// This event is sent:
// 1) when our replication client connects to mysql
// 2) at the beginning of each binlog file
//
// For (1), if we are starting the binlog from a position that's greater
// than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position
// is explicitly set to 0 and should not be considered valid according to
// the mysql source. See:
// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
es.isEventPositionValid = ev.Header.LogPos != 0
case *replication.RowsQueryEvent:
// A RowsQueryEvent will always precede the corresponding RowsEvent
// if binlog_rows_query_log_events is enabled, and is used to get
// the full query that was executed on the master (with annotations)
// that is otherwise not possible to reconstruct
query = ev.Event.(*replication.RowsQueryEvent).Query
case *replication.RowsEvent:
err = s.handleRowsEvent(ev, query)
if err != nil {
s.logger.WithError(err).Error("failed to handle rows event")
s.ErrorHandler.Fatal("binlog_streamer", err)
}
case *replication.XIDEvent, *replication.GTIDEvent:
// With regards to DMLs, we see (at least) the following sequence
// of events in the binlog stream:
//
// - GTIDEvent <- START of transaction
// - QueryEvent
// - RowsQueryEvent
// - TableMapEvent
// - RowsEvent
// - RowsEvent
// - XIDEvent <- END of transaction
//
// *NOTE*
//
// First, RowsQueryEvent is only available with `binlog_rows_query_log_events`
// set to "ON".
//
// Second, there will be at least one (but potentially more) RowsEvents
// depending on the number of rows updated in the transaction.
//
// Lastly, GTIDEvents will only be available if they are enabled.
//
// As a result, the following case will set the last resumable position for
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction
es.isEventPositionResumable = true

// Here we also reset the query event as we are either at the beginning
// or the end of the current/next transaction. As such, the query will be
// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
query = nil
}
return query, err
}

func (s *BinlogStreamer) Run() {
s.ensureLogger()

Expand All @@ -145,13 +246,13 @@ func (s *BinlogStreamer) Run() {
}()

var query []byte
es := BinlogEventState{}

currentFilename := s.lastStreamedBinlogPosition.Name
nextFilename := s.lastStreamedBinlogPosition.Name

es.nextFilename = s.lastStreamedBinlogPosition.Name
s.logger.Info("starting binlog streamer")
for !s.stopRequested || (s.stopRequested && s.lastStreamedBinlogPosition.Compare(s.stopAtBinlogPosition) < 0) {
currentFilename = nextFilename
currentFilename = es.nextFilename
var ev *replication.BinlogEvent
var timedOut bool
var err error
Expand All @@ -174,109 +275,62 @@ func (s *BinlogStreamer) Run() {
continue
}

evPosition := mysql.Position{
es.evPosition = mysql.Position{
Name: currentFilename,
Pos: ev.Header.LogPos,
}

s.logger.WithFields(logrus.Fields{
"position": evPosition.Pos,
"file": evPosition.Name,
"position": es.evPosition.Pos,
"file": es.evPosition.Name,
"type": fmt.Sprintf("%T", ev.Event),
"lastStreamedBinlogPosition": s.lastStreamedBinlogPosition,
}).Debug("reached position")

isEventPositionResumable := false
isEventPositionValid := true

switch e := ev.Event.(type) {
case *replication.RotateEvent:
// This event is used to keep the "current binlog filename" of the binlog streamer in sync.
nextFilename = string(e.NextLogName)

isFakeRotateEvent := ev.Header.LogPos == 0 && ev.Header.Timestamp == 0
if isFakeRotateEvent {
// Sometimes the RotateEvent is fake and not a real rotation. we want to ignore the log position in the header for those events
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L278-L287
// https://github.com/percona/percona-server/blob/3ff016a46ce2cde58d8007ec9834f958da53cbea/sql/rpl_binlog_sender.cc#L904-L907

// However, we can always advance our lastStreamedBinlogPosition according to its data fields
evPosition = mysql.Position{
Name: string(e.NextLogName),
Pos: uint32(e.Position),
}
}
es.isEventPositionResumable = false
es.isEventPositionValid = true

s.logger.WithFields(logrus.Fields{
"new_position": evPosition.Pos,
"new_filename": evPosition.Name,
"last_position": s.lastStreamedBinlogPosition.Pos,
"last_filename": s.lastStreamedBinlogPosition.Name,
}).Info("binlog file rotated")
case *replication.FormatDescriptionEvent:
// This event is sent:
// 1) when our replication client connects to mysql
// 2) at the beginning of each binlog file
//
// For (1), if we are starting the binlog from a position that's greater
// than BIN_LOG_HEADER_SIZE (currently, 4th byte), this event's position
// is explicitly set to 0 and should not be considered valid according to
// the mysql source. See:
// https://github.com/percona/percona-server/blob/93165de1451548ff11dd32c3d3e5df0ff28cfcfa/sql/rpl_binlog_sender.cc#L1020-L1026
isEventPositionValid = ev.Header.LogPos != 0
case *replication.RowsQueryEvent:
// A RowsQueryEvent will always precede the corresponding RowsEvent
// if binlog_rows_query_log_events is enabled, and is used to get
// the full query that was executed on the master (with annotations)
// that is otherwise not possible to reconstruct
query = ev.Event.(*replication.RowsQueryEvent).Query
case *replication.RowsEvent:
err = s.handleRowsEvent(ev, query)
// if there is a handler associated with this eventType, call it
eventTypeString := ev.Header.EventType.String()
if handler, ok := s.eventHandlers[eventTypeString]; ok {
query, err = handler(ev, query, &es)
if err != nil {
s.logger.WithError(err).Error("failed to handle rows event")
s.logger.WithError(err).Error("failed to handle event")
s.ErrorHandler.Fatal("binlog_streamer", err)
pawandubey marked this conversation as resolved.
Show resolved Hide resolved
}
case *replication.XIDEvent, *replication.GTIDEvent:
// With regards to DMLs, we see (at least) the following sequence
// of events in the binlog stream:
//
// - GTIDEvent <- START of transaction
// - QueryEvent
// - RowsQueryEvent
// - TableMapEvent
// - RowsEvent
// - RowsEvent
// - XIDEvent <- END of transaction
//
// *NOTE*
//
// First, RowsQueryEvent is only available with `binlog_rows_query_log_events`
// set to "ON".
//
// Second, there will be at least one (but potentially more) RowsEvents
// depending on the number of rows updated in the transaction.
//
// Lastly, GTIDEvents will only be available if they are enabled.
//
// As a result, the following case will set the last resumable position for
// interruption to EITHER the start (if using GTIDs) or the end of the
// last transaction
isEventPositionResumable = true

// Here we also reset the query event as we are either at the beginning
// or the end of the current/next transaction. As such, the query will be
// reset following the next RowsQueryEvent before the corresponding RowsEvent(s)
query = nil
} else {
// call the default event handler for everything else
query, err = s.defaultEventHandler(ev, query, &es)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the defaultEventHandler fires events to the eventListeners, but if one were to override the eventHandler, eventListeners will no longer be called? This also means that the default event listener that handles the row event (by sending it to the BinlogWriter) will no longer be called. You would have to manually reregister handleRowEvent as a eventHandler? This feels very confusing and somewhat error prone? Are there any cases where we don't want to row events to be handled as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shivnagarajan and I discussed this briefly: the event listeners would only be missed if a custom handler for ROWS_EVENT was registered (overwriting the default handler behaviour). The problem is that the callers would not have access to s.eventListeners if they did want to fire them off themselves. That's probably okay since we are providing a hook into event handling and not a hook into the bodies of built-in event handlers themselves.

There are a few ways to tackle this, that I wanted to summarize:

  • Accept this limitation and note it in the documentation (possibly as a gotcha).
    This is the current approach. If there is demand for this, we can later come back and adopt one of the alternative approaches later.

  • Always firing off the eventListeners for ROWS_EVENT, even if a custom handler had been registered.
    This solves the specific problem of firing off the listeners for ROWS_EVENT without increasing the surface area of the API exposed to the handler, but it will be a surprising, snowflake-y behaviour which cannot be turned off even if you wanted to.

  • Make eventListeners available to the callers inside the handler so that they can do with them as they wish.
    This solves the whole problem at the cost of muddling the handler signature with data unrelated to the event state. Moreover, the listeners are currently only useful for one specific event but would need to be a part of the signature for handlers of all events (this is perhaps not too bad but still seems unnecessary).

  • Another approach not considered here?

I think we should go with the first option for now and revisit later if needed, FWIW.


Separately, I think if we wanted to be "truly" flexible in handler design then we should get to a place where all our default handlers could be registered as custom handlers instead of being implemented in a private fallback function. But we don't need to take on that complexity all at once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK the first option sounds OK for the time being.

I suspect this part of the code will need to be revisited if we ever look into a larger restructure of Ghostferry as I think the complexity is increasing with this change (and also would with any other changes proposed). Perhaps the callback system would need to be revamped 🤷 , but that's a problem for another day.

}

if isEventPositionValid {
if es.isEventPositionValid {
evType := fmt.Sprintf("%T", ev.Event)
evTimestamp := ev.Header.Timestamp
s.updateLastStreamedPosAndTime(evTimestamp, evPosition, evType, isEventPositionResumable)
s.updateLastStreamedPosAndTime(evTimestamp, es.evPosition, evType, es.isEventPositionResumable)
}
}
}

// Attach an event handler to a replication BinLogEvent
// We only support attaching events to any of the events defined in
// https://github.com/go-mysql-org/go-mysql/blob/master/replication/const.go
shivnagarajan marked this conversation as resolved.
Show resolved Hide resolved
// custom event handlers are provided the replication BinLogEvent and a state object
// that carries the current state of the binlog event stream.
func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) error {
// verify that event-type is valid
// if eventTypeString is unrecognized, bail
eventTypeString := evType.String()
if eventTypeString == "UnknownEvent" {
return errors.New("Unknown event type")
}

if s.eventHandlers == nil {
s.eventHandlers = make(map[string]func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error))
}
s.eventHandlers[eventTypeString] = eh
return nil
}

// AddEventListener registers a callback to receive batches of DMLEvents
// streamed from the binlog. Listeners are appended in registration order.
func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error) {
	s.eventListeners = append(s.eventListeners, listener)
}
Expand Down
3 changes: 2 additions & 1 deletion sharding/test/trivial_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package test

import (
sql "github.com/Shopify/ghostferry/sqlwrapper"
"math/rand"
"testing"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/Shopify/ghostferry/sharding"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/stretchr/testify/assert"
Expand Down
15 changes: 15 additions & 0 deletions test/go/binlog_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -195,6 +196,20 @@ func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsQueryEventOnRowsEvent
this.Require().True(eventAsserted)
}

func (this *BinlogStreamerTestSuite) TestBinlogStreamerAddEventHandlerEventTypes() {
qe := func(ev *replication.BinlogEvent, query []byte, es *ghostferry.BinlogEventState) ([]byte, error) {
return query, nil
}

// try attaching a handler to a valid event type
err := this.binlogStreamer.AddBinlogEventHandler(replication.TABLE_MAP_EVENT, qe)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice!

this.Require().Nil(err)

// try attaching a handler to an invalid event type
err = this.binlogStreamer.AddBinlogEventHandler(replication.EventType(byte(0)), qe)
this.Require().NotNil(err)
}

func TestBinlogStreamerTestSuite(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down
47 changes: 47 additions & 0 deletions test/integration/ddl_events_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "test_helper"

# Integration tests for custom DDL (binlog event) handlers in Ghostferry.
class DdlEventsTest < GhostferryTestCase
  # Name of the test ghostferry binary; presumably it registers a custom
  # binlog event handler for DDL events — TODO confirm against the fixture.
  DDL_GHOSTFERRY = "ddl_ghostferry"

  # With no custom handler registered, a run over a simple seeded table
  # completes normally.
  def test_default_event_handler
    seed_simple_database_with_single_table

    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

    ghostferry.run_with_logs()

    assert_ghostferry_completed(ghostferry, times: 1)
  end

  # The DDL-handling binary also completes normally when no DDL occurs
  # during the run.
  def test_ddl_event_handler
    seed_simple_database_with_single_table

    ghostferry = new_ghostferry(DDL_GHOSTFERRY)
    ghostferry.run_with_logs()

    assert_ghostferry_completed(ghostferry, times: 1)
  end

  # When a DDL statement is issued mid-stream, the run is expected to fail,
  # leaving the target with fewer rows than the source.
  def test_ddl_event_handler_with_ddl_events
    seed_simple_database_with_single_table

    table_name = full_table_name(DEFAULT_DB, DEFAULT_TABLE)

    ghostferry = new_ghostferry(DDL_GHOSTFERRY)

    # Issue an INSERT, then a DDL (ALTER TABLE), then another INSERT once
    # binlog streaming has started, so the DDL lands inside the stream.
    ghostferry.on_status(GhostferryHelper::Ghostferry::Status::BINLOG_STREAMING_STARTED) do
      source_db.query("INSERT INTO #{table_name} VALUES (9000, 'test')")
      source_db.query("ALTER TABLE #{table_name} ADD INDEX (data(100))")
      source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9001, 'test')")
    end

    ghostferry.run_expecting_failure

    source, target = source_and_target_table_metrics
    source_count = source[DEFAULT_FULL_TABLE_NAME][:row_count]
    target_count = target[DEFAULT_FULL_TABLE_NAME][:row_count]

    # The interrupted run must not have copied everything.
    refute_equal(source_count, target_count, "target should have fewer rows than source")

  end
end
Loading