Skip to content

Commit

Permalink
Pass the current BinlogPosition to DMLEvent
Browse files Browse the repository at this point in the history
Since the BinlogWriter is a separate goroutine to the BinlogStreamer,
the position that it wrote to is important if we want to interrupt and
resume Ghostferry (interrupt saves BinlogWriter position, and resume
starts the BinlogStreamer from the BinlogWriter position).

To allow for this, we need to pass the binlog position from the binlog
streamer, via the DMLEvent, to the binlog writer. This patch adds
BinlogPosition() to DMLEvent to make this possible.
  • Loading branch information
shuhaowu committed Sep 18, 2018
1 parent 5d6b1f5 commit bb97a4f
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 26 deletions.
14 changes: 13 additions & 1 deletion binlog_streamer.go
Expand Up @@ -226,12 +226,24 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent) error {
eventTime := time.Unix(int64(ev.Header.Timestamp), 0)
rowsEvent := ev.Event.(*replication.RowsEvent)

if ev.Header.LogPos == 0 {
// This shouldn't happen, as rows events always have a logpos.
s.logger.Panicf("logpos: %d %d %T", ev.Header.LogPos, ev.Header.Timestamp, ev.Event)
}

pos := mysql.Position{
// The filename is only changed and visible during the RotateEvent, which
// is handled transparently in Run().
Name: s.lastStreamedBinlogPosition.Name,
Pos: ev.Header.LogPos,
}

table := s.TableSchema.Get(string(rowsEvent.Table.Schema), string(rowsEvent.Table.Table))
if table == nil {
return nil
}

dmlEvs, err := NewBinlogDMLEvents(table, ev)
dmlEvs, err := NewBinlogDMLEvents(table, ev, pos)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions binlog_writer.go
Expand Up @@ -109,5 +109,8 @@ func (b *BinlogWriter) writeEvents(events []DMLEvent) error {
if err != nil {
return fmt.Errorf("exec query (%d bytes): %v", len(query), err)
}

// lastWrittenBinlogPosition = events[len(events)-1].BinlogPosition()

return nil
}
27 changes: 17 additions & 10 deletions dml_events.go
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/shopspring/decimal"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
Expand Down Expand Up @@ -42,13 +43,15 @@ type DMLEvent interface {
OldValues() RowData
NewValues() RowData
PK() (uint64, error)
BinlogPosition() mysql.Position
}

// The base of DMLEvent to provide the necessary methods.
// This desires a copy of the struct in case we want to deal with schema
// changes in the future.
type DMLEventBase struct {
table schema.Table
pos mysql.Position
}

func (e *DMLEventBase) Database() string {
Expand All @@ -63,18 +66,22 @@ func (e *DMLEventBase) TableSchema() *schema.Table {
return &e.table
}

func (e *DMLEventBase) BinlogPosition() mysql.Position {
return e.pos
}

type BinlogInsertEvent struct {
newValues RowData
*DMLEventBase
}

func NewBinlogInsertEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
func NewBinlogInsertEvents(table *schema.Table, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
insertEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
insertEvents[i] = &BinlogInsertEvent{
newValues: row,
DMLEventBase: &DMLEventBase{table: *table},
DMLEventBase: &DMLEventBase{table: *table, pos: pos},
}
}

Expand Down Expand Up @@ -113,7 +120,7 @@ type BinlogUpdateEvent struct {
*DMLEventBase
}

func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
// UPDATE events have two rows in the RowsEvent. The first row is the
// entries of the old record (for WHERE) and the second row is the
// entries of the new record (for SET).
Expand All @@ -129,7 +136,7 @@ func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent
updateEvents[i/2] = &BinlogUpdateEvent{
oldValues: row,
newValues: rowsEvent.Rows[i+1],
DMLEventBase: &DMLEventBase{table: *table},
DMLEventBase: &DMLEventBase{table: *table, pos: pos},
}
}

Expand Down Expand Up @@ -174,13 +181,13 @@ func (e *BinlogDeleteEvent) NewValues() RowData {
return nil
}

func NewBinlogDeleteEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) {
func NewBinlogDeleteEvents(table *schema.Table, rowsEvent *replication.RowsEvent, pos mysql.Position) ([]DMLEvent, error) {
deleteEvents := make([]DMLEvent, len(rowsEvent.Rows))

for i, row := range rowsEvent.Rows {
deleteEvents[i] = &BinlogDeleteEvent{
oldValues: row,
DMLEventBase: &DMLEventBase{table: *table},
DMLEventBase: &DMLEventBase{table: *table, pos: pos},
}
}

Expand All @@ -203,7 +210,7 @@ func (e *BinlogDeleteEvent) PK() (uint64, error) {
return pkFromEventData(&e.table, e.oldValues)
}

func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error) {
func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent, pos mysql.Position) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
Expand Down Expand Up @@ -236,11 +243,11 @@ func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DML

switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
return NewBinlogInsertEvents(table, rowsEvent)
return NewBinlogInsertEvents(table, rowsEvent, pos)
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
return NewBinlogDeleteEvents(table, rowsEvent)
return NewBinlogDeleteEvents(table, rowsEvent, pos)
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
return NewBinlogUpdateEvents(table, rowsEvent)
return NewBinlogUpdateEvents(table, rowsEvent, pos)
default:
return nil, fmt.Errorf("unrecognized rows event: %s", ev.Header.EventType.String())
}
Expand Down
5 changes: 3 additions & 2 deletions sharding/test/copy_filter_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/sharding"

"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -129,15 +130,15 @@ func (t *CopyFilterTestSuite) TestShardingValueTypes() {
}

for _, tenantId := range tenantIds {
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}))
dmlEvents, _ := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, tenantId, "data"}), mysql.Position{})
applicable, err := t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Nil(err)
t.Require().True(applicable, fmt.Sprintf("value %t wasn't applicable", tenantId))
}
}

func (t *CopyFilterTestSuite) TestInvalidShardingValueTypesErrors() {
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}))
dmlEvents, err := ghostferry.NewBinlogInsertEvents(t.normalTable, t.newRowsEvent([]interface{}{1001, string("1"), "data"}), mysql.Position{})
_, err = t.filter.ApplicableEvent(dmlEvents[0])
t.Require().Equal("parsing new sharding key: invalid type %!t(string=1)", err.Error())
}
Expand Down
47 changes: 45 additions & 2 deletions test/binlog_streamer_test.go
Expand Up @@ -2,6 +2,8 @@ package test

import (
"database/sql"
"strings"
"sync"
"testing"

"github.com/Shopify/ghostferry"
Expand All @@ -15,6 +17,7 @@ type BinlogStreamerTestSuite struct {

config *ghostferry.Config
binlogStreamer *ghostferry.BinlogStreamer
sourceDb *sql.DB
}

func (this *BinlogStreamerTestSuite) SetupTest() {
Expand All @@ -26,16 +29,27 @@ func (this *BinlogStreamerTestSuite) SetupTest() {
this.Require().Nil(err)

sourceDSN := sourceConfig.FormatDSN()
sourceDb, err := sql.Open("mysql", sourceDSN)
this.sourceDb, err = sql.Open("mysql", sourceDSN)
if err != nil {
this.Fail("failed to connect to source database")
}

testhelpers.SeedInitialData(this.sourceDb, "gftest", "test_table_1", 0)
tableSchemaCache, err := ghostferry.LoadTables(
this.sourceDb,
&testhelpers.TestTableFilter{
DbsFunc: testhelpers.DbApplicabilityFilter([]string{testhelpers.TestSchemaName}),
TablesFunc: nil,
},
)
this.Require().Nil(err)

this.binlogStreamer = &ghostferry.BinlogStreamer{
Db: sourceDb,
Db: this.sourceDb,
Config: testFerry.Config,
ErrorHandler: testFerry.ErrorHandler,
Filter: testFerry.CopyFilter,
TableSchema: tableSchemaCache,
}

this.Require().Nil(this.binlogStreamer.Initialize())
Expand Down Expand Up @@ -70,6 +84,35 @@ func (this *BinlogStreamerTestSuite) TestConnectErrorsOutIfErrorInServerIdGenera
this.Require().Zero(this.binlogStreamer.Config.MyServerId)
}

func (this *BinlogStreamerTestSuite) TestBinlogStreamerSetsBinlogPositionOnDMLEvent() {
err := this.binlogStreamer.ConnectBinlogStreamerToMysql()
this.Require().Nil(err)

eventAsserted := false

this.binlogStreamer.AddEventListener(func(evs []ghostferry.DMLEvent) error {
eventAsserted = true
this.Require().Equal(1, len(evs))
this.Require().True(strings.HasPrefix(evs[0].BinlogPosition().Name, "mysql-bin."))
this.Require().True(evs[0].BinlogPosition().Pos > 0)
this.binlogStreamer.FlushAndStop()
return nil
})

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
this.binlogStreamer.Run()
}()

_, err = this.sourceDb.Exec("INSERT INTO gftest.test_table_1 VALUES (null, 'testdata')")
this.Require().Nil(err)

wg.Wait()
this.Require().True(eventAsserted)
}

func TestBinlogStreamerTestSuite(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &BinlogStreamerTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down
23 changes: 12 additions & 11 deletions test/dml_events_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/Shopify/ghostferry"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() {
},
}

dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(2, len(dmlEvents))

Expand All @@ -75,7 +76,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro
Rows: [][]interface{}{{1000}},
}

dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))

Expand All @@ -90,7 +91,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() {
Rows: [][]interface{}{{1000}},
}

dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))
this.Require().Equal("test_schema", dmlEvents[0].Database())
Expand All @@ -110,7 +111,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventGeneratesUpdateQuery() {
},
}

dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(2, len(dmlEvents))

Expand All @@ -129,7 +130,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithWrongColumnsReturnsErro
Rows: [][]interface{}{{1000}, {1000}},
}

dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))

Expand All @@ -147,7 +148,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventWithNull() {
},
}

dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))

Expand All @@ -162,7 +163,7 @@ func (this *DMLEventsTestSuite) TestBinlogUpdateEventMetadata() {
Rows: [][]interface{}{{1000}, {1001}},
}

dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogUpdateEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))
this.Require().Equal("test_schema", dmlEvents[0].Database())
Expand All @@ -180,7 +181,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventGeneratesDeleteQuery() {
},
}

dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(2, len(dmlEvents))

Expand All @@ -201,7 +202,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithNull() {
},
}

dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))

Expand All @@ -216,7 +217,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventWithWrongColumnsReturnsErro
Rows: [][]interface{}{{1000}},
}

dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))

Expand All @@ -231,7 +232,7 @@ func (this *DMLEventsTestSuite) TestBinlogDeleteEventMetadata() {
Rows: [][]interface{}{{1000}},
}

dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent)
dmlEvents, err := ghostferry.NewBinlogDeleteEvents(this.sourceTable, rowsEvent, mysql.Position{})
this.Require().Nil(err)
this.Require().Equal(1, len(dmlEvents))
this.Require().Equal("test_schema", dmlEvents[0].Database())
Expand Down

0 comments on commit bb97a4f

Please sign in to comment.