Skip to content

Commit

Permalink
Added completed table case to integration test
Browse files Browse the repository at this point in the history
When a table is completed and we resume, Ghostferry should not error.
  • Loading branch information
shuhaowu committed Nov 20, 2018
1 parent b53e6b7 commit 3f45878
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
31 changes: 31 additions & 0 deletions test/integration/cases/trivial_integration_tests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,35 @@ def test_interrupt_resume_with_writes_to_source
assert_test_table_is_identical
end
end

def test_interrupt_resume_when_table_has_completed
@dbs.seed_simple_database_with_single_table
dumped_state = nil

results = @dbs.source.query("SELECT COUNT(*) as cnt FROM #{GhostferryIntegration::DbManager::DEFAULT_FULL_TABLE_NAME}")
rows = results.first["cnt"]

with_state_cleanup do
use_datawriter

@ghostferry.on_status(Ghostferry::Status::TABLE_COMPLETE) do |table_name|
assert_equal(
"#{GhostferryIntegration::DbManager::DEFAULT_DB}.#{GhostferryIntegration::DbManager::DEFAULT_TABLE}",
table_name
)

@ghostferry.send_signal("TERM")
end

dumped_state = @ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)
end

with_state_cleanup do
use_datawriter
@ghostferry.run(dumped_state)

assert_test_table_is_identical
end
end
end
4 changes: 2 additions & 2 deletions test/integration/go/integrationferry/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...strin
return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize)
}

_, err := conn.Write([]byte(data))
_, err := conn.Write(data)
return err
}

Expand Down Expand Up @@ -116,7 +116,7 @@ func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, argumen
// server.
func (f *IntegrationFerry) Start() error {
f.Ferry.DataIterator.AddTableCompleteListener(func(table *schema.Table) error {
return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.Name())
return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.String())
})

f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error {
Expand Down
11 changes: 10 additions & 1 deletion test/integration/ruby/ghostferry_integration/ghostferry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,16 @@ def start_server
status = data.shift

@status_handlers[status].call(*data) unless @status_handlers[status].nil?
socket.write(CONTINUE)
begin
socket.write(CONTINUE)
rescue Errno::EPIPE
# It is possible for the status handler to kill Ghostferry.
# In such scenarios, this write may result in a broken pipe as
# the socket is closed.
#
# We rescue this and move on.
@logger.debug("can't send CONTINUE due to broken pipe")
end

reads.delete(socket)
end
Expand Down

0 comments on commit 3f45878

Please sign in to comment.