Skip to content

Commit

Permalink
Added table complete listener
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Nov 20, 2018
1 parent 6e4b28b commit 345f072
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
27 changes: 22 additions & 5 deletions data_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type DataIterator struct {
CursorConfig *CursorConfig
StateTracker *StateTracker

targetPKs *sync.Map
batchListeners []func(*RowBatch) error
doneListeners []func() error
logger *logrus.Entry
targetPKs *sync.Map
batchListeners []func(*RowBatch) error
tableCompleteListeners []func(*schema.Table) error
doneListeners []func() error
logger *logrus.Entry
}

func (d *DataIterator) Run() {
Expand Down Expand Up @@ -114,6 +115,14 @@ func (d *DataIterator) Run() {

logger.Debug("table iteration completed")
d.StateTracker.MarkTableAsCompleted(table.String())

for _, listener := range d.tableCompleteListeners {
err := listener(table)
if err != nil {
logger.WithError(err).Error("failed to run table complete listener")
d.ErrorHandler.Fatal("data_iterator", err)
}
}
}
}()
}
Expand All @@ -127,7 +136,11 @@ func (d *DataIterator) Run() {

wg.Wait()
for _, listener := range d.doneListeners {
listener()
err := listener()
if err != nil {
d.logger.WithError(err).Error("failed to run done listener")
d.ErrorHandler.Fatal("data_iterator", err)
}
}
}

Expand All @@ -138,3 +151,7 @@ func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error) {
func (d *DataIterator) AddDoneListener(listener func() error) {
d.doneListeners = append(d.doneListeners, listener)
}

func (d *DataIterator) AddTableCompleteListener(listener func(*schema.Table) error) {
d.tableCompleteListeners = append(d.tableCompleteListeners, listener)
}
6 changes: 6 additions & 0 deletions test/integration/go/integrationferry/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/siddontang/go-mysql/schema"
)

const (
Expand All @@ -36,6 +37,7 @@ const (
StatusAfterRowCopy string = "AFTER_ROW_COPY"
StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY"
StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY"
StatusTableComplete string = "TABLE_COMPLETE"
)

type IntegrationFerry struct {
Expand Down Expand Up @@ -113,6 +115,10 @@ func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, argumen
// Method override for Start in order to send status to the integration
// server.
func (f *IntegrationFerry) Start() error {
f.Ferry.DataIterator.AddTableCompleteListener(func(table *schema.Table) error {
return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.Name())
})

f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error {
return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy, rowBatch.TableSchema().Name)
})
Expand Down
1 change: 1 addition & 0 deletions test/integration/ruby/ghostferry_integration/ghostferry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ module Status
AFTER_ROW_COPY = "AFTER_ROW_COPY"
BEFORE_BINLOG_APPLY = "BEFORE_BINLOG_APPLY"
AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY"
TABLE_COMPLETE = "TABLE_COMPLETE"
end

attr_reader :stdout, :stderr, :exit_status, :pid
Expand Down

0 comments on commit 345f072

Please sign in to comment.