Skip to content

Commit

Permalink
Added completed table case to integration test
Browse files Browse the repository at this point in the history
When a table is completed and we resume, Ghostferry should not error.
  • Loading branch information
shuhaowu committed Nov 20, 2018
1 parent 345f072 commit a456cd5
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
31 changes: 31 additions & 0 deletions test/integration/cases/trivial_integration_tests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,35 @@ def test_interrupt_resume_with_writes_to_source
assert_test_table_is_identical
end
end

def test_interrupt_resume_when_table_has_completed
@dbs.seed_simple_database_with_single_table
dumped_state = nil

results = @dbs.source.query("SELECT COUNT(*) as cnt FROM #{GhostferryIntegration::DbManager::DEFAULT_FULL_TABLE_NAME}")
rows = results.first["cnt"]

with_state_cleanup do
use_datawriter

@ghostferry.on_status(Ghostferry::Status::TABLE_COMPLETE) do |table_name|
assert_equal(
"#{GhostferryIntegration::DbManager::DEFAULT_DB}.#{GhostferryIntegration::DbManager::DEFAULT_TABLE}",
table_name
)

@ghostferry.send_signal("TERM")
end

dumped_state = @ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)
end

with_state_cleanup do
use_datawriter
@ghostferry.run(dumped_state)

assert_test_table_is_identical
end
end
end
2 changes: 1 addition & 1 deletion test/integration/go/integrationferry/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, argumen
// server.
func (f *IntegrationFerry) Start() error {
f.Ferry.DataIterator.AddTableCompleteListener(func(table *schema.Table) error {
return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.Name())
return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.String())
})

f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error {
Expand Down
11 changes: 10 additions & 1 deletion test/integration/ruby/ghostferry_integration/ghostferry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,16 @@ def start_server
status = data.shift

@status_handlers[status].call(*data) unless @status_handlers[status].nil?
socket.write(CONTINUE)
begin
socket.write(CONTINUE)
rescue Errno::EPIPE
# It is possible for the status handler to kill Ghostferry.
# In such scenarios, this write may result in a broken pipe as
# the socket is closed.
#
# We rescue this and move on.
@logger.debug("can't send CONTINUE due to broken pipe")
end

reads.delete(socket)
end
Expand Down
2 changes: 1 addition & 1 deletion test/integration/ruby/ghostferry_integration/test_case.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestCase < Minitest::Test

def before_all
@logger = Logger.new(STDOUT)
@logger.level = Logger::INFO
@logger.level = Logger::DEBUG
@ghostferry = Ghostferry.new(ghostferry_main_path, logger: @logger)
end

Expand Down

0 comments on commit a456cd5

Please sign in to comment.