From b8fb91a8497626e8d626a5d03a5d2f6c5e7c5d8c Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 21 Jan 2019 11:19:33 -0500 Subject: [PATCH] Updated for comments --- test/helpers/ghostferry_helper.rb | 99 +++++++++++-------- test/integration/interrupt_resume_test.rb | 30 +++++- .../integrationferry.go | 29 +++++- test/lib/go/minimal.go | 28 ------ test/test_helper.rb | 10 +- 5 files changed, 113 insertions(+), 83 deletions(-) rename test/lib/go/{integrationferry => }/integrationferry.go (90%) delete mode 100644 test/lib/go/minimal.go diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 01506117..6d78f0bf 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -31,7 +31,6 @@ class Ghostferry # Keep these in sync with integrationferry.go ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" - MAX_MESSAGE_SIZE = 256 module Status # This should be in sync with integrationferry.go @@ -49,22 +48,14 @@ module Status attr_reader :stdout, :stderr, :exit_status, :pid def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) - @main_path = main_path - @message_timeout = message_timeout @logger = logger if @logger.nil? @logger = Logger.new(STDOUT) @logger.level = Logger::DEBUG end - FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) - - # full name relative to the ghostferry root dir, with / replaced with _ - # and the extension stripped. 
- full_path = File.absolute_path(@main_path) - full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory - binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") - @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) + @main_path = main_path + @message_timeout = message_timeout @status_handlers = {} @@ -80,14 +71,54 @@ def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) @exit_status = nil @stdout = [] @stderr = [] + + # Set up the directory to the compiled binary under the system temporary + # directory. + FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) + + # To guarantee that the compiled binary will have a unique name, we use + # the full path of the file relative to the ghostferry root directory as + # the binary name. In order to avoid having / in the binary name all / in + # the full path are replaced with _. The .go extension is also stripped to + # avoid confusion. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) end - def on_status(status, &block) - raise "must specify a block" unless block_given? - @status_handlers[status] ||= [] - @status_handlers[status] << block + # The main method to call to run a Ghostferry subprocess. + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + start_ghostferry(resuming_state) + start_server_watchdog + + @subprocess_thread.join + @server_watchdog_thread.join + @server_thread.join + ensure + kill + raise @server_last_error unless @server_last_error.nil? 
end + # When using this method, you need to ensure that the datawriter has been + # stopped properly (if you're using stop_datawriter_during_cutover). + def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + + ###################################################### + # Methods representing the different stages of `run` # + ###################################################### + def compile_binary return if File.exist?(@compiled_binary_path) @@ -220,6 +251,16 @@ def start_server_watchdog end end + ################### + # Utility methods # + ################### + + def on_status(status, &block) + raise "must specify a block" unless block_given? + @status_handlers[status] ||= [] + @status_handlers[status] << block + end + def send_signal(signal) Process.kill(signal, @pid) if @pid != 0 end @@ -239,33 +280,5 @@ def kill # ignore end end - - def run(resuming_state = nil) - resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) - - compile_binary - start_server - start_ghostferry(resuming_state) - start_server_watchdog - - @subprocess_thread.join - @server_watchdog_thread.join - @server_thread.join - ensure - kill - raise @server_last_error unless @server_last_error.nil? - end - - # When using this method, you need to call it within the block of - # GhostferryIntegration::TestCase#with_state_cleanup to ensure that the - # integration server is shutdown properly. 
- def run_expecting_interrupt(resuming_state = nil) - run(resuming_state) - rescue GhostferryExitFailure - dumped_state = @stdout.join("") - JSON.parse(dumped_state) - else - raise "Ghostferry did not get interrupted" - end end end diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 2156912d..894766a0 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -3,9 +3,32 @@ require "json" class InterruptResumeTest < GhostferryTestCase - def test_interrupt_resume_with_writes_to_source + def setup seed_simple_database_with_single_table + end + + def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_interrupted + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + # Writes one batch + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + ghostferry.send_signal("TERM") + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + result = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}") + count = result.first["cnt"] + assert_equal 200, count + result = target_db.query("SELECT MAX(id) AS max_id FROM #{DEFAULT_FULL_TABLE_NAME}") + last_successful_id = result.first["max_id"] + assert last_successful_id > 0 + assert_equal last_successful_id, dumped_state["LastSuccessfulPrimaryKeys"]["#{DEFAULT_DB}.#{DEFAULT_TABLE}"] + end + + def test_interrupt_resume_with_writes_to_source # Start a ghostferry run expecting it to be interrupted. 
datawriter = new_source_datawriter ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) @@ -36,8 +59,6 @@ def test_interrupt_resume_with_writes_to_source end def test_interrupt_resume_when_table_has_completed - seed_simple_database_with_single_table - # Start a run of Ghostferry expecting to be interrupted datawriter = new_source_datawriter ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) @@ -58,6 +79,9 @@ def test_interrupt_resume_when_table_has_completed start_datawriter_with_ghostferry(datawriter, ghostferry) stop_datawriter_during_cutover(datawriter, ghostferry) + ghostferry.run(dumped_state) + + assert_test_table_is_identical end end diff --git a/test/lib/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry.go similarity index 90% rename from test/lib/go/integrationferry/integrationferry.go rename to test/lib/go/integrationferry.go index 94211102..055a852a 100644 --- a/test/lib/go/integrationferry/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -1,4 +1,4 @@ -package integrationferry +package main import ( "encoding/json" @@ -12,13 +12,13 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/sirupsen/logrus" ) const ( // These should be kept in sync with ghostferry.rb - portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" - timeout time.Duration = 30 * time.Second - maxMessageSize int = 256 + portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" + timeout time.Duration = 30 * time.Second ) const ( @@ -194,3 +194,24 @@ func NewStandardConfig() (*ghostferry.Config, error) { return config, config.ValidateConfig() } + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config, err := NewStandardConfig() + if err != nil { + panic(err) + } + + f := &IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} diff --git a/test/lib/go/minimal.go b/test/lib/go/minimal.go deleted file 
mode 100644 index 3220d359..00000000 --- a/test/lib/go/minimal.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/test/lib/go/integrationferry" - "github.com/sirupsen/logrus" -) - -func main() { - logrus.SetFormatter(&logrus.JSONFormatter{}) - logrus.SetLevel(logrus.DebugLevel) - - config, err := integrationferry.NewStandardConfig() - if err != nil { - panic(err) - } - - f := &integrationferry.IntegrationFerry{ - Ferry: &ghostferry.Ferry{ - Config: config, - }, - } - - err = f.Main() - if err != nil { - panic(err) - } -} diff --git a/test/test_helper.rb b/test/test_helper.rb index 409979ba..94c3439b 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -19,7 +19,7 @@ class GhostferryTestCase < Minitest::Test include DbHelper include DataWriterHelper - MINIMAL_GHOSTFERRY = "minimal.go" + MINIMAL_GHOSTFERRY = "integrationferry.go" def new_ghostferry(filename) # Transform path to something ruby understands @@ -83,13 +83,13 @@ def assert_test_table_is_identical assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 assert_equal( - source[DEFAULT_FULL_TABLE_NAME][:checksum], - target[DEFAULT_FULL_TABLE_NAME][:checksum], + source[DEFAULT_FULL_TABLE_NAME][:row_count], + target[DEFAULT_FULL_TABLE_NAME][:row_count], ) assert_equal( - source[DEFAULT_FULL_TABLE_NAME][:sample_row], - target[DEFAULT_FULL_TABLE_NAME][:sample_row], + source[DEFAULT_FULL_TABLE_NAME][:checksum], + target[DEFAULT_FULL_TABLE_NAME][:checksum], ) end