diff --git a/test/integration/Gemfile b/Gemfile
similarity index 86%
rename from test/integration/Gemfile
rename to Gemfile
index cac02822..e444bd32 100644
--- a/test/integration/Gemfile
+++ b/Gemfile
@@ -3,3 +3,5 @@ source "https://rubygems.org"
 gem "minitest"
 gem "minitest-hooks"
 gem "mysql2"
+
+gem "rake"
diff --git a/test/integration/Gemfile.lock b/Gemfile.lock
similarity index 90%
rename from test/integration/Gemfile.lock
rename to Gemfile.lock
index 265557af..e9b0961f 100644
--- a/test/integration/Gemfile.lock
+++ b/Gemfile.lock
@@ -5,6 +5,7 @@ GEM
     minitest-hooks (1.5.0)
       minitest (> 5.3)
     mysql2 (0.5.2)
+    rake (12.3.2)
 
 PLATFORMS
   ruby
@@ -13,6 +14,7 @@ DEPENDENCIES
   minitest
   minitest-hooks
   mysql2
+  rake
 
 BUNDLED WITH
    1.16.1
diff --git a/Rakefile b/Rakefile
new file mode 100644
index 00000000..fba6a93c
--- /dev/null
+++ b/Rakefile
@@ -0,0 +1,9 @@
+require 'rake/testtask'
+
+task :default => [:test]
+
+Rake::TestTask.new do |t|
+  t.test_files = FileList['test/**/*test.rb']
+  t.verbose = true
+  t.libs << ["test", "test/helpers", "test/lib"]
+end
diff --git a/test/integration/ruby/ghostferry_integration/data_writer.rb b/test/helpers/data_writer_helper.rb
similarity index 77%
rename from test/integration/ruby/ghostferry_integration/data_writer.rb
rename to test/helpers/data_writer_helper.rb
index 1c5f7ea9..591a0f3b 100644
--- a/test/integration/ruby/ghostferry_integration/data_writer.rb
+++ b/test/helpers/data_writer_helper.rb
@@ -1,16 +1,32 @@
 require "logger"
 require "thread"
 
+require "db_helper"
+require "ghostferry_helper"
+
 require "mysql2"
 
-module GhostferryIntegration
+module DataWriterHelper
+  def start_datawriter_with_ghostferry(dw, gf, &on_write)
+    gf.on_status(GhostferryHelper::Ghostferry::Status::READY) do
+      dw.start(&on_write)
+    end
+  end
+
+  def stop_datawriter_during_cutover(dw, gf)
+    gf.on_status(GhostferryHelper::Ghostferry::Status::ROW_COPY_COMPLETED) do
+      # At the start of the cutover phase, we have to set the database to
+      # read-only. This is done by stopping the datawriter.
+      dw.stop_and_join
+    end
+  end
+
   class DataWriter
     # A threaded data writer that just hammers the database with write
     # queries as much as possible.
    #
     # This is used essentially for random testing.
     def initialize(db_config,
-                   tables: [DbManager::DEFAULT_FULL_TABLE_NAME],
+                   tables: [DbHelper::DEFAULT_FULL_TABLE_NAME],
                    insert_probability: 0.33,
                    update_probability: 0.33,
                    delete_probability: 0.34,
@@ -26,6 +42,7 @@ def initialize(db_config,
       @delete_probability = [@update_probability[1], @update_probability[1] + delete_probability]
 
       @threads = []
+      @started = false
       @stop_requested = false
 
       @logger = logger
@@ -36,6 +53,8 @@
     end
 
     def start(&on_write)
+      raise "Cannot start DataWriter multiple times. Use a new instance instead." if @started
+      @started = true
       @number_of_writers.times do |i|
         @threads << Thread.new do
           @logger.info("starting data writer thread #{i}")
@@ -50,8 +69,9 @@
       end
     end
 
-    def stop
+    def stop_and_join
       @stop_requested = true
+      join
     end
 
     def join
@@ -81,7 +101,7 @@ def write_data(connection, &on_write)
     def insert_data(connection)
       table = @tables.sample
       insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)")
-      insert_statement.execute(nil, GhostferryIntegration.rand_data)
+      insert_statement.execute(nil, DbHelper.rand_data)
       connection.last_id
     end
 
@@ -89,7 +109,7 @@
     def update_data(connection)
       table = @tables.sample
       id = random_real_id(connection, table)
       update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1")
-      update_statement.execute(GhostferryIntegration.rand_data, id)
+      update_statement.execute(DbHelper.rand_data, id)
       id
     end
diff --git a/test/helpers/db_helper.rb b/test/helpers/db_helper.rb
new file mode 100644
index 00000000..a8c8e61d
--- /dev/null
+++ b/test/helpers/db_helper.rb
@@ -0,0 +1,150 @@
+require "logger"
+require "mysql2"
+
+module DbHelper
+  ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a
+
+  DB_PORTS = {source: 29291, target: 29292}
+
+  DEFAULT_DB = "gftest"
+  DEFAULT_TABLE = "test_table_1"
+
+  def self.full_table_name(db, table)
+    "`#{db}`.`#{table}`"
+  end
+
+  def self.rand_data(length: 32)
+    ALPHANUMERICS.sample(length).join("") + "👻⛴️"
+  end
+
+  DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE)
+
+  def full_table_name(db, table)
+    DbHelper.full_table_name(db, table)
+  end
+
+  def rand_data(length: 32)
+    DbHelper.rand_data(length: length)
+  end
+
+  def default_db_config(port:)
+    {
+      host: "127.0.0.1",
+      port: port,
+      username: "root",
+      password: "",
+      encoding: "utf8mb4",
+      collation: "utf8mb4_unicode_ci",
+    }
+  end
+
+  def transaction(connection)
+    raise ArgumentError, "must pass a block" if !block_given?
+
+    begin
+      connection.query("BEGIN")
+      yield
+    rescue
+      connection.query("ROLLBACK")
+      raise
+    else
+      connection.query("COMMIT")
+    end
+  end
+
+  def initialize_db_connections
+    @connections = {}
+    DB_PORTS.each do |name, port|
+      @connections[name] = Mysql2::Client.new(default_db_config(port: port))
+    end
+  end
+
+  def source_db
+    @connections[:source]
+  end
+
+  def target_db
+    @connections[:target]
+  end
+
+  def source_db_config
+    default_db_config(port: DB_PORTS[:source])
+  end
+
+  def target_db_config
+    default_db_config(port: DB_PORTS[:target])
+  end
+
+  # Database Seeding Methods
+  ##########################
+  # Each test case can choose what kind of database it wants to set up by
+  # calling one of these methods.
+
+  def reset_data
+    @connections.each do |_, connection|
+      connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
+    end
+  end
+
+  def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
+    dbtable = full_table_name(database_name, table_name)
+
+    connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}")
+    connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")
+
+    transaction(connection) do
+      insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)")
+
+      number_of_rows.times do
+        insert_statement.execute(nil, rand_data)
+      end
+    end
+  end
+
+  def seed_simple_database_with_single_table
+    # Setup the source database with data.
+    max_id = 1111
+    seed_random_data(source_db, number_of_rows: max_id)
+
+    # Create some holes in the data.
+    delete_statement = source_db.prepare("DELETE FROM #{full_table_name(DEFAULT_DB, DEFAULT_TABLE)} WHERE id = ?")
+    140.times do
+      delete_statement.execute(Random.rand(max_id) + 1)
+    end
+
+    # Setup the target database with no data but the correct schema.
+    seed_random_data(target_db, number_of_rows: 0)
+  end
+
+  # Get some overall metrics like CHECKSUM, row count, sample row from tables.
+  # Generally used for test validation.
+  def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
+    source_metrics = {}
+    target_metrics = {}
+
+    tables.each do |table|
+      source_metrics[table] = table_metric(source_db, table)
+      target_metrics[table] = table_metric(target_db, table, sample_id: source_metrics[table][:sample_row]["id"])
+    end
+
+    [source_metrics, target_metrics]
+  end
+
+  def table_metric(conn, table, sample_id: nil)
+    metrics = {}
+    result = conn.query("CHECKSUM TABLE #{table}")
+    metrics[:checksum] = result.first["Checksum"]
+
+    result = conn.query("SELECT COUNT(*) AS cnt FROM #{table}")
+    metrics[:row_count] = result.first["cnt"]
+
+    if sample_id.nil?
+      result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1")
+      metrics[:sample_row] = result.first
+    else
+      result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1")
+      metrics[:sample_row] = result.first
+    end
+
+    metrics
+  end
+end
diff --git a/test/integration/ruby/ghostferry_integration/ghostferry.rb b/test/helpers/ghostferry_helper.rb
similarity index 92%
rename from test/integration/ruby/ghostferry_integration/ghostferry.rb
rename to test/helpers/ghostferry_helper.rb
index ffb18702..fc51fd84 100644
--- a/test/integration/ruby/ghostferry_integration/ghostferry.rb
+++ b/test/helpers/ghostferry_helper.rb
@@ -1,3 +1,4 @@
+require "fileutils"
 require "json"
 require "logger"
 require "open3"
@@ -5,13 +6,20 @@
 require "thread"
 require "tmpdir"
 
-module GhostferryIntegration
+module GhostferryHelper
+  GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")
+
+  def self.remove_all_binaries
+    FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR)
+  end
+
   class GhostferryExitFailure < StandardError
   end
 
   class Ghostferry
     # Manages compiling, running, and communicating with Ghostferry.
     #
+    #
     # To use this class:
     #
     # ghostferry = Ghostferry.new("path/to/main.go")
@@ -52,22 +60,21 @@ def initialize(main_path, logger: nil, message_timeout: 30)
         @logger.level = Logger::DEBUG
       end
 
-      @tempdir = Dir.mktmpdir("ghostferry-integration")
+      FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700)
 
       # full name relative to the ghostferry root dir, with / replaced with _
      # and the extension stripped.
       full_path = File.absolute_path(@main_path)
       full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory
       binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_")
-      @compiled_binary_path = File.join(@tempdir, binary_name)
-
-      reset_state
-    end
+      @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name)
 
-    def reset_state
       @status_handlers = {}
       @stop_requested = false
 
+      @server_thread = nil
+      @subprocess_thread = nil
+
       @server = nil
       @server_started_notifier = Queue.new
@@ -84,7 +91,7 @@ def on_status(status, &block)
     end
 
     def compile_binary
-      return if File.exists?(@compiled_binary_path)
+      return if File.exist?(@compiled_binary_path)
 
       @logger.info("compiling test binary to #{@compiled_binary_path}")
       rc = system(
@@ -224,19 +231,12 @@ def wait_until_ghostferry_run_is_complete
       @subprocess_thread.join if @subprocess_thread
     end
 
-    def remove_socket
-      File.unlink(SOCKET_PATH) if File.exists?(SOCKET_PATH)
-    end
-
-    def remove_binary
-      FileUtils.remove_entry(@tempdir) unless @tempdir.nil?
-    end
-
     def send_signal(signal)
       Process.kill(signal, @pid) if @pid != 0
     end
 
-    def stop_and_cleanup
+    def kill
+      @logger.info("killing ghostferry")
       @stop_requested = true
       send_signal("KILL")
       begin
@@ -244,7 +244,8 @@ def stop_and_cleanup
       rescue GhostferryExitFailure
         # ignore
       end
-      reset_state
+
+      File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH)
     end
 
     def run(resuming_state = nil)
@@ -256,7 +257,7 @@ def run(resuming_state = nil)
       start_ghostferry(resuming_state)
       wait_until_ghostferry_run_is_complete
     ensure
-      remove_socket
+      kill
     end
 
     # When using this method, you need to call it within the block of
diff --git a/test/integration/cases/trivial_integration_tests.rb b/test/integration/cases/trivial_integration_tests.rb
deleted file mode 100644
index c13b7299..00000000
--- a/test/integration/cases/trivial_integration_tests.rb
+++ /dev/null
@@ -1,84 +0,0 @@
-require "json"
-require "ghostferry_integration"
-
-class TrivialIntegrationTests < GhostferryIntegration::TestCase
-  def ghostferry_main_path
-    "go/minimal.go"
-  end
-
-  def test_copy_data_without_any_writes_to_source
-    @dbs.seed_simple_database_with_single_table
-    @ghostferry.run
-    assert_test_table_is_identical
-  end
-
-  def test_copy_data_with_writes_to_source
-    use_datawriter
-
-    @dbs.seed_simple_database_with_single_table
-
-    @ghostferry.run
-    assert_test_table_is_identical
-  end
-
-  def test_interrupt_resume_with_writes_to_source
-    @dbs.seed_simple_database_with_single_table
-
-    dumped_state = nil
-    with_isolated_setup_and_teardown do
-      use_datawriter
-
-      batches_written = 0
-      @ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
-        batches_written += 1
-        if batches_written >= 2
-          @ghostferry.send_signal("TERM")
-        end
-      end
-
-      dumped_state = @ghostferry.run_expecting_interrupt
-      assert_basic_fields_exist_in_dumped_state(dumped_state)
-    end
-
-    # We want to write some data to the source database while Ghostferry is down
-    # to verify that it is copied over.
-    5.times do
-      @datawriter.insert_data(@dbs.source)
-      @datawriter.update_data(@dbs.source)
-      @datawriter.delete_data(@dbs.source)
-    end
-
-    with_isolated_setup_and_teardown do
-      use_datawriter
-      @ghostferry.run(dumped_state)
-
-      assert_test_table_is_identical
-    end
-  end
-
-  def test_interrupt_resume_when_table_has_completed
-    @dbs.seed_simple_database_with_single_table
-    dumped_state = nil
-
-    results = @dbs.source.query("SELECT COUNT(*) as cnt FROM #{GhostferryIntegration::DbManager::DEFAULT_FULL_TABLE_NAME}")
-    rows = results.first["cnt"]
-
-    with_isolated_setup_and_teardown do
-      use_datawriter
-
-      @ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
-        @ghostferry.send_signal("TERM")
-      end
-
-      dumped_state = @ghostferry.run_expecting_interrupt
-      assert_basic_fields_exist_in_dumped_state(dumped_state)
-    end
-
-    with_isolated_setup_and_teardown do
-      use_datawriter
-      @ghostferry.run(dumped_state)
-
-      assert_test_table_is_identical
-    end
-  end
-end
diff --git a/test/integration/ruby/ghostferry_integration.rb b/test/integration/ruby/ghostferry_integration.rb
deleted file mode 100644
index 18aed064..00000000
--- a/test/integration/ruby/ghostferry_integration.rb
+++ /dev/null
@@ -1,13 +0,0 @@
-require "thread"
-
-module GhostferryIntegration
-  ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a
-  def self.rand_data(length: 32)
-    ALPHANUMERICS.sample(32).join("") + "👻⛴️"
-  end
-end
-
-require_relative "ghostferry_integration/db_manager"
-require_relative "ghostferry_integration/data_writer"
-require_relative "ghostferry_integration/ghostferry"
-require_relative "ghostferry_integration/test_case"
diff --git a/test/integration/ruby/ghostferry_integration/db_manager.rb b/test/integration/ruby/ghostferry_integration/db_manager.rb
deleted file mode 100644
index 56f0a893..00000000
--- a/test/integration/ruby/ghostferry_integration/db_manager.rb
+++ /dev/null
@@ -1,147 +0,0 @@
-require "logger"
-
-require "mysql2"
-
-module GhostferryIntegration
-  class DbManager
-    KNOWN_PORTS = [29291, 29292]
-
-    DEFAULT_DB = "gftest"
-    DEFAULT_TABLE = "test_table_1"
-
-    def self.full_table_name(db, table)
-      "`#{db}`.`#{table}`"
-    end
-
-    DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE)
-
-    def self.default_db_config(port:)
-      {
-        host: "127.0.0.1",
-        port: port,
-        username: "root",
-        password: "",
-        encoding: "utf8mb4",
-        collation: "utf8mb4_unicode_ci",
-      }
-    end
-
-    def self.transaction(connection)
-      raise ArgumentError, "must pass a block" if !block_given?
-
-      begin
-        connection.query("BEGIN")
-        yield
-      rescue
-        connection.query("ROLLBACK")
-        raise
-      else
-        connection.query("COMMIT")
-      end
-    end
-
-    def initialize(ports: KNOWN_PORTS, logger: nil)
-      @ports = ports
-
-      @connections = []
-      ports.each do |port|
-        @connections << Mysql2::Client.new(self.class.default_db_config(port: port))
-      end
-
-      @logger = logger
-      if @logger.nil?
-        @logger = Logger.new(STDOUT)
-        @logger.level = Logger::DEBUG
-      end
-    end
-
-    # Do not use these methods in a separate thread as these are the raw mysql2
-    # connections, which are not threadsafe.
-    #
-    # If you need to use connections in another thread for some reason, create
-    # your own connections via source_db_config, such as in the case of the
-    # DataWriter.
-    def source
-      @connections[0]
-    end
-
-    def target
-      @connections[1]
-    end
-
-    def source_db_config
-      self.class.default_db_config(port: @ports[0])
-    end
-
-    def target_db_config
-      self.class.default_db_config(port: @ports[1])
-    end
-
-    def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
-      full_table_name = self.class.full_table_name(database_name, table_name)
-
-      connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}")
-      connection.query("CREATE TABLE IF NOT EXISTS #{full_table_name} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")
-
-      self.class.transaction(connection) do
-        insert_statement = connection.prepare("INSERT INTO #{full_table_name} (id, data) VALUES (?, ?)")
-
-        number_of_rows.times do
-          insert_statement.execute(nil, GhostferryIntegration.rand_data)
-        end
-      end
-    end
-
-    def seed_simple_database_with_single_table
-      # Setup the source database with data.
-      max_id = 1111
-      seed_random_data(source, number_of_rows: max_id)
-
-      # Create some holes in the data.
-      delete_statement = source.prepare("DELETE FROM #{self.class.full_table_name(DEFAULT_DB, DEFAULT_TABLE)} WHERE id = ?")
-      140.times do
-        delete_statement.execute(Random.rand(max_id) + 1)
-      end
-
-      # Setup the target database with no data but the correct schema.
-      seed_random_data(target, number_of_rows: 0)
-    end
-
-    def reset_data
-      @connections.each do |connection|
-        connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
-      end
-    end
-
-    def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
-      source_metrics = {}
-      target_metrics = {}
-
-      tables.each do |table|
-        source_metrics[table] = table_metric(source, table)
-        target_metrics[table] = table_metric(target, table, sample_id: source_metrics[table][:sample_row]["id"])
-      end
-
-      [source_metrics, target_metrics]
-    end
-
-    def table_metric(conn, table, sample_id: nil)
-      metrics = {}
-      result = conn.query("CHECKSUM TABLE #{table}")
-      metrics[:checksum] = result.first["Checksum"]
-
-      result = conn.query("SELECT COUNT(*) AS cnt FROM #{table}")
-      metrics[:row_count] = result.first["cnt"]
-
-      if sample_id.nil?
-        result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1")
-        metrics[:sample_row] = result.first
-      else
-        result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1")
-        metrics[:sample_row] = result.first
-      end
-
-      metrics
-    end
-  end
-end
diff --git a/test/integration/ruby/ghostferry_integration/test_case.rb b/test/integration/ruby/ghostferry_integration/test_case.rb
deleted file mode 100644
index c97efada..00000000
--- a/test/integration/ruby/ghostferry_integration/test_case.rb
+++ /dev/null
@@ -1,139 +0,0 @@
-require "logger"
-require "minitest"
-require "minitest/hooks/test"
-
-module GhostferryIntegration
-  class TestCase < Minitest::Test
-    include Minitest::Hooks
-    include GhostferryIntegration
-
-    ##############
-    # Test Hooks #
-    ##############
-
-    def before_all
-      @logger = Logger.new(STDOUT)
-      if ENV["DEBUG"] == "1"
-        @logger.level = Logger::DEBUG
-      else
-        @logger.level = Logger::INFO
-      end
-      @ghostferry = Ghostferry.new(ghostferry_main_path, logger: @logger)
-    end
-
-    def after_all
-      @ghostferry.remove_binary
-    end
-
-    def before_setup
-      @dbs = DbManager.new(logger: @logger)
-      @dbs.reset_data
-
-      setup_ghostferry_datawriter
-    end
-
-    def after_teardown
-      teardown_ghostferry_datawriter
-    end
-
-    ######################
-    # Test Setup Helpers #
-    ######################
-
-    # If multiple Ghostferry runs are needed within a single test, such as in
-    # the case of interrupt/resume testing, we will need to wrap each
-    # @ghostferry.run within a block for this method.
-    #
-    # This method doesn't destroy the database state like before_setup and
-    # after_teardown does.
-    def with_isolated_setup_and_teardown
-      setup_ghostferry_datawriter
-      yield
-      teardown_ghostferry_datawriter
-    end
-
-    # This setup the datawriter to start when Ghostferry start and stop when
-    # cutover is about to take place.
-    #
-    # The on_write block is called everytime the datawriter writes a row with
-    # the argument op, id.
-    #
-    # op: "INSERT"/"UPDATE"/"DELETE"
-    # id: the primary id of the row inserted
-    def use_datawriter(&on_write)
-      start_datawriter_with_ghostferry(&on_write)
-      stop_datawriter_during_cutover
-    end
-
-    #####################
-    # Assertion Helpers #
-    #####################
-
-    def assert_test_table_is_identical
-      source, target = @dbs.source_and_target_table_metrics
-
-      assert source[DbManager::DEFAULT_FULL_TABLE_NAME][:row_count] > 0
-      assert target[DbManager::DEFAULT_FULL_TABLE_NAME][:row_count] > 0
-
-      assert_equal(
-        source[DbManager::DEFAULT_FULL_TABLE_NAME][:checksum],
-        target[DbManager::DEFAULT_FULL_TABLE_NAME][:checksum],
-      )
-
-      assert_equal(
-        source[DbManager::DEFAULT_FULL_TABLE_NAME][:sample_row],
-        target[DbManager::DEFAULT_FULL_TABLE_NAME][:sample_row],
-      )
-    end
-
-    # Use this method to assert the validity of the structure of the dumped
-    # state.
-    #
-    # To actually assert the validity of the data within the dumped state, you
-    # have to do it manually.
-    def assert_basic_fields_exist_in_dumped_state(dumped_state)
-      refute dumped_state.nil?
-      refute dumped_state["GhostferryVersion"].nil?
-      refute dumped_state["LastKnownTableSchemaCache"].nil?
-      refute dumped_state["LastSuccessfulPrimaryKeys"].nil?
-      refute dumped_state["CompletedTables"].nil?
-      refute dumped_state["LastWrittenBinlogPosition"].nil?
-    end
-
-    protected
-    def ghostferry_main_path
-      raise NotImplementedError
-    end
-
-    private
-
-    def start_datawriter_with_ghostferry(&on_write)
-      @ghostferry.on_status(Ghostferry::Status::READY) do
-        @datawriter.start(&on_write)
-      end
-    end
-
-    def stop_datawriter_during_cutover
-      @ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
-        # At the start of the cutover phase, we have to set the database to
-        # read-only. This is done by stopping the datawriter.
-        @datawriter.stop
-        @datawriter.join
-      end
-    end
-
-    def setup_ghostferry_datawriter
-      @ghostferry.reset_state
-      @datawriter = DataWriter.new(@dbs.source_db_config, logger: @logger)
-    end
-
-    # This should be a no op if ghostferry and datawriter have already been
-    # stopped.
-    def teardown_ghostferry_datawriter
-      @datawriter.stop
-      @datawriter.join
-
-      @ghostferry.stop_and_cleanup
-    end
-  end
-end
diff --git a/test/integration/test.rb b/test/integration/test.rb
deleted file mode 100644
index ece2b0d3..00000000
--- a/test/integration/test.rb
+++ /dev/null
@@ -1,6 +0,0 @@
-require "minitest/autorun"
-
-ruby_path = File.join(File.absolute_path(File.dirname(__FILE__)), "ruby")
-$LOAD_PATH.unshift(ruby_path) unless $LOAD_PATH.include?(ruby_path)
-
-require_relative "cases/trivial_integration_tests"
diff --git a/test/integration/trivial_integration_test.rb b/test/integration/trivial_integration_test.rb
new file mode 100644
index 00000000..a73283da
--- /dev/null
+++ b/test/integration/trivial_integration_test.rb
@@ -0,0 +1,94 @@
+require "test_helper"
+
+require "json"
+
+class TrivialIntegrationTests < GhostferryTestCase
+  def test_copy_data_without_any_writes_to_source
+    seed_simple_database_with_single_table
+
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+    ghostferry.run
+
+    assert_test_table_is_identical
+  end
+
+  def test_copy_data_with_writes_to_source
+    seed_simple_database_with_single_table
+
+    datawriter = new_source_datawriter
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    start_datawriter_with_ghostferry(datawriter, ghostferry)
+    stop_datawriter_during_cutover(datawriter, ghostferry)
+
+    ghostferry.run
+    assert_test_table_is_identical
+  end
+
+  def test_interrupt_resume_with_writes_to_source
+    seed_simple_database_with_single_table
+
+    # Start a ghostferry run expecting it to be interrupted.
+    datawriter = new_source_datawriter
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    start_datawriter_with_ghostferry(datawriter, ghostferry)
+    stop_datawriter_during_cutover(datawriter, ghostferry)
+
+    batches_written = 0
+    ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
+      batches_written += 1
+      if batches_written >= 2
+        ghostferry.send_signal("TERM")
+      end
+    end
+
+    dumped_state = ghostferry.run_expecting_interrupt
+    assert_basic_fields_exist_in_dumped_state(dumped_state)
+
+    # We want to write some data to the source database while Ghostferry is down
+    # to verify that it is copied over.
+    5.times do
+      datawriter.insert_data(source_db)
+      datawriter.update_data(source_db)
+      datawriter.delete_data(source_db)
+    end
+
+    # Resume Ghostferry with dumped state
+    datawriter = new_source_datawriter
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    start_datawriter_with_ghostferry(datawriter, ghostferry)
+    stop_datawriter_during_cutover(datawriter, ghostferry)
+
+    ghostferry.run(dumped_state)
+
+    assert_test_table_is_identical
+  end
+
+  def test_interrupt_resume_when_table_has_completed
+    seed_simple_database_with_single_table
+
+    # Start a run of Ghostferry expecting to be interrupted
+    datawriter = new_source_datawriter
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    start_datawriter_with_ghostferry(datawriter, ghostferry)
+    stop_datawriter_during_cutover(datawriter, ghostferry)
+
+    ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do
+      ghostferry.send_signal("TERM")
+    end
+
+    dumped_state = ghostferry.run_expecting_interrupt
+    assert_basic_fields_exist_in_dumped_state(dumped_state)
+
+    # Resume ghostferry from interrupted state
+    datawriter = new_source_datawriter
+    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
+
+    start_datawriter_with_ghostferry(datawriter, ghostferry)
+    stop_datawriter_during_cutover(datawriter, ghostferry)
+    ghostferry.run(dumped_state)
+  end
+end
diff --git a/test/integration/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry/integrationferry.go
similarity index 100%
rename from test/integration/go/integrationferry/integrationferry.go
rename to test/lib/go/integrationferry/integrationferry.go
diff --git a/test/integration/go/minimal.go b/test/lib/go/minimal.go
similarity index 85%
rename from test/integration/go/minimal.go
rename to test/lib/go/minimal.go
index 4cf778af..3220d359 100644
--- a/test/integration/go/minimal.go
+++ b/test/lib/go/minimal.go
@@ -2,7 +2,7 @@ package main
 
 import (
 	"github.com/Shopify/ghostferry"
-	"github.com/Shopify/ghostferry/test/integration/go/integrationferry"
+	"github.com/Shopify/ghostferry/test/lib/go/integrationferry"
 
 	"github.com/sirupsen/logrus"
 )
diff --git a/test/test_helper.rb b/test/test_helper.rb
new file mode 100644
index 00000000..2c18b0fe
--- /dev/null
+++ b/test/test_helper.rb
@@ -0,0 +1,107 @@
+require "logger"
+require "minitest"
+require "minitest/autorun"
+require "minitest/hooks/test"
+
+GO_CODE_PATH = File.join(File.absolute_path(File.dirname(__FILE__)), "lib", "go")
+
+require "db_helper"
+require "ghostferry_helper"
+require "data_writer_helper"
+
+Minitest.after_run do
+  GhostferryHelper.remove_all_binaries
+end
+
+class GhostferryTestCase < Minitest::Test
+  include Minitest::Hooks
+  include GhostferryHelper
+  include DbHelper
+  include DataWriterHelper
+
+  MINIMAL_GHOSTFERRY = "minimal.go"
+
+  def new_ghostferry(filename)
+    # Transform path to something ruby understands
+    path = File.join(GO_CODE_PATH, filename)
+    g = Ghostferry.new(path, logger: @logger)
+    @ghostferry_instances << g
+    g
+  end
+
+  def new_source_datawriter(*args)
+    dw = DataWriter.new(source_db_config, *args, logger: @logger)
+    @datawriter_instances << dw
+    dw
+  end
+
+  ##############
+  # Test Hooks #
+  ##############
+
+  def before_all
+    @logger = Logger.new(STDOUT)
+    if ENV["DEBUG"] == "1"
+      @logger.level = Logger::DEBUG
+    else
+      @logger.level = Logger::INFO
+    end
+
+    initialize_db_connections
+  end
+
+  def before_setup
+    # Any ghostferry instances created via the new_ghostferry method will be
+    # pushed to here, which allows the test to kill the process after each test
+    # should there be a hung process/failed test/errored test.
+    @ghostferry_instances = []
+
+    # Same thing with DataWriter as above
+    @datawriter_instances = []
+  end
+
+  def after_teardown
+    @ghostferry_instances.each do |ghostferry|
+      ghostferry.kill
+    end
+
+    @datawriter_instances.each do |datawriter|
+      datawriter.stop_and_join
+    end
+  end
+
+  #####################
+  # Assertion Helpers #
+  #####################
+
+  def assert_test_table_is_identical
+    source, target = source_and_target_table_metrics
+
+    assert source[DEFAULT_FULL_TABLE_NAME][:row_count] > 0
+    assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0
+
+    assert_equal(
+      source[DEFAULT_FULL_TABLE_NAME][:checksum],
+      target[DEFAULT_FULL_TABLE_NAME][:checksum],
+    )
+
+    assert_equal(
+      source[DEFAULT_FULL_TABLE_NAME][:sample_row],
+      target[DEFAULT_FULL_TABLE_NAME][:sample_row],
+    )
+  end
+
+  # Use this method to assert the validity of the structure of the dumped
+  # state.
+  #
+  # To actually assert the validity of the data within the dumped state, you
+  # have to do it manually.
+  def assert_basic_fields_exist_in_dumped_state(dumped_state)
+    refute dumped_state.nil?
+    refute dumped_state["GhostferryVersion"].nil?
+    refute dumped_state["LastKnownTableSchemaCache"].nil?
+    refute dumped_state["LastSuccessfulPrimaryKeys"].nil?
+    refute dumped_state["CompletedTables"].nil?
+    refute dumped_state["LastWrittenBinlogPosition"].nil?
+  end
+end
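
For reference, here is how a new test would look under the layout this patch introduces. The sketch below is illustrative only: the file name, class name, and test name are hypothetical and not part of this patch. It assumes the load paths configured in the Rakefile above, so any file matching the test/**/*test.rb pattern is picked up by "bundle exec rake test", and it relies only on helpers the patch itself defines (GhostferryTestCase, seed_simple_database_with_single_table, new_source_datawriter, new_ghostferry, MINIMAL_GHOSTFERRY, and the DataWriterHelper hooks).

# test/integration/example_copy_test.rb (hypothetical file, not part of this patch)
require "test_helper"

class ExampleCopyTest < GhostferryTestCase
  def test_copy_with_concurrent_writes
    # Seed the source with random rows and create the empty target schema.
    seed_simple_database_with_single_table

    # Instances created through these factory methods are tracked by
    # GhostferryTestCase and cleaned up automatically in after_teardown.
    datawriter = new_source_datawriter
    ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

    # Write to the source while Ghostferry is running, and stop writing
    # when the cutover phase begins.
    start_datawriter_with_ghostferry(datawriter, ghostferry)
    stop_datawriter_during_cutover(datawriter, ghostferry)

    ghostferry.run
    assert_test_table_is_identical
  end
end

Because every Ghostferry and DataWriter instance is registered with the test case, a failing or hung run is still torn down in after_teardown, which is the main design change this refactor makes over the old reset_state/stop_and_cleanup flow.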