From 2b25a1d9c1e74850e5d4ee0363132a26af6d85c9 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 19 Nov 2018 21:45:45 -0500 Subject: [PATCH] Added completed table case to integration test When a table is completed and we resume, Ghostferry should not error. --- .../cases/trivial_integration_tests.rb | 31 +++++++++++++++++++ .../go/integrationferry/integrationferry.go | 4 +-- .../ruby/ghostferry_integration/ghostferry.rb | 11 ++++++- 3 files changed, 43 insertions(+), 3 deletions(-) diff --git a/test/integration/cases/trivial_integration_tests.rb b/test/integration/cases/trivial_integration_tests.rb index 5d24b6f9..a5d054ef 100644 --- a/test/integration/cases/trivial_integration_tests.rb +++ b/test/integration/cases/trivial_integration_tests.rb @@ -48,4 +48,35 @@ def test_interrupt_resume_with_writes_to_source assert_test_table_is_identical end end + + def test_interrupt_resume_when_table_has_completed + @dbs.seed_simple_database_with_single_table + dumped_state = nil + + results = @dbs.source.query("SELECT COUNT(*) as cnt FROM #{GhostferryIntegration::DbManager::DEFAULT_FULL_TABLE_NAME}") + rows = results.first["cnt"] + + with_state_cleanup do + use_datawriter + + @ghostferry.on_status(Ghostferry::Status::TABLE_COMPLETE) do |table_name| + assert_equal( + "#{GhostferryIntegration::DbManager::DEFAULT_DB}.#{GhostferryIntegration::DbManager::DEFAULT_TABLE}", + table_name + ) + + @ghostferry.send_signal("TERM") + end + + dumped_state = @ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + end + + with_state_cleanup do + use_datawriter + @ghostferry.run(dumped_state) + + assert_test_table_is_identical + end + end end diff --git a/test/integration/go/integrationferry/integrationferry.go b/test/integration/go/integrationferry/integrationferry.go index 35ff134b..5b7ac7c5 100644 --- a/test/integration/go/integrationferry/integrationferry.go +++ b/test/integration/go/integrationferry/integrationferry.go @@ -66,7 +66,7 @@ func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...strin return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize) } - _, err := conn.Write([]byte(data)) + _, err := conn.Write(data) return err } @@ -116,7 +116,7 @@ func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, argumen // server. func (f *IntegrationFerry) Start() error { f.Ferry.DataIterator.AddTableCompleteListener(func(table *schema.Table) error { - return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.Name()) + return f.SendStatusAndWaitUntilContinue(StatusTableComplete, table.String()) }) f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { diff --git a/test/integration/ruby/ghostferry_integration/ghostferry.rb b/test/integration/ruby/ghostferry_integration/ghostferry.rb index 747ab5ae..8110cc33 100644 --- a/test/integration/ruby/ghostferry_integration/ghostferry.rb +++ b/test/integration/ruby/ghostferry_integration/ghostferry.rb @@ -142,7 +142,16 @@ def start_server status = data.shift @status_handlers[status].call(*data) unless @status_handlers[status].nil? - socket.write(CONTINUE) + begin + socket.write(CONTINUE) + rescue Errno::EPIPE + # It is possible for the status handler to kill Ghostferry. + # In such scenarios, this write may result in a broken pipe as + # the socket is closed. + # + # We rescue this and move on. + @logger.debug("can't send CONTINUE due to broken pipe") + end reads.delete(socket) end