diff --git a/Gemfile b/Gemfile new file mode 100644 index 00000000..e444bd32 --- /dev/null +++ b/Gemfile @@ -0,0 +1,7 @@ +source "https://rubygems.org" + +gem "minitest" +gem "minitest-hooks" +gem "mysql2" + +gem "rake" diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 00000000..e9b0961f --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,20 @@ +GEM + remote: https://rubygems.org/ + specs: + minitest (5.11.3) + minitest-hooks (1.5.0) + minitest (> 5.3) + mysql2 (0.5.2) + rake (12.3.2) + +PLATFORMS + ruby + +DEPENDENCIES + minitest + minitest-hooks + mysql2 + rake + +BUNDLED WITH + 1.16.1 diff --git a/Makefile b/Makefile index 1100bbb3..92dc5e16 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,8 @@ $(GOBIN): test: @go version - go test ./test ./copydb/test ./sharding/test -p 1 -v + go test ./test/go ./copydb/test ./sharding/test -p 1 -v + bundle install && bundle exec rake test clean: rm -rf build diff --git a/Rakefile b/Rakefile new file mode 100644 index 00000000..fba6a93c --- /dev/null +++ b/Rakefile @@ -0,0 +1,9 @@ +require 'rake/testtask' + +task :default => [:test] + +Rake::TestTask.new do |t| + t.test_files = FileList['test/**/*test.rb'] + t.verbose = true + t.libs << ["test", "test/helpers", "test/lib"] +end diff --git a/dev.yml b/dev.yml index e47fbda2..9bbe23b4 100644 --- a/dev.yml +++ b/dev.yml @@ -4,6 +4,8 @@ up: - homebrew: - glide - mysql + - ruby: 2.5.1 + - bundler - go: version: 1.10.3 - custom: diff --git a/test/binlog_streamer_test.go b/test/go/binlog_streamer_test.go similarity index 100% rename from test/binlog_streamer_test.go rename to test/go/binlog_streamer_test.go diff --git a/test/config_test.go b/test/go/config_test.go similarity index 100% rename from test/config_test.go rename to test/go/config_test.go diff --git a/test/control_server_test.go b/test/go/control_server_test.go similarity index 100% rename from test/control_server_test.go rename to test/go/control_server_test.go diff --git a/test/data_iterator_test.go 
b/test/go/data_iterator_test.go similarity index 100% rename from test/data_iterator_test.go rename to test/go/data_iterator_test.go diff --git a/test/dml_events_test.go b/test/go/dml_events_test.go similarity index 100% rename from test/dml_events_test.go rename to test/go/dml_events_test.go diff --git a/test/error_handler_test.go b/test/go/error_handler_test.go similarity index 100% rename from test/error_handler_test.go rename to test/go/error_handler_test.go diff --git a/test/ferry_test.go b/test/go/ferry_test.go similarity index 100% rename from test/ferry_test.go rename to test/go/ferry_test.go diff --git a/test/iterative_verifier_collation_test.go b/test/go/iterative_verifier_collation_test.go similarity index 100% rename from test/iterative_verifier_collation_test.go rename to test/go/iterative_verifier_collation_test.go diff --git a/test/iterative_verifier_integration_test.go b/test/go/iterative_verifier_integration_test.go similarity index 100% rename from test/iterative_verifier_integration_test.go rename to test/go/iterative_verifier_integration_test.go diff --git a/test/iterative_verifier_test.go b/test/go/iterative_verifier_test.go similarity index 100% rename from test/iterative_verifier_test.go rename to test/go/iterative_verifier_test.go diff --git a/test/lag_throttler_test.go b/test/go/lag_throttler_test.go similarity index 100% rename from test/lag_throttler_test.go rename to test/go/lag_throttler_test.go diff --git a/test/metrics_test.go b/test/go/metrics_test.go similarity index 100% rename from test/metrics_test.go rename to test/go/metrics_test.go diff --git a/test/race_conditions_integration_test.go b/test/go/race_conditions_integration_test.go similarity index 100% rename from test/race_conditions_integration_test.go rename to test/go/race_conditions_integration_test.go diff --git a/test/replication_config_test.go b/test/go/replication_config_test.go similarity index 100% rename from test/replication_config_test.go rename to 
test/go/replication_config_test.go diff --git a/test/row_batch_test.go b/test/go/row_batch_test.go similarity index 100% rename from test/row_batch_test.go rename to test/go/row_batch_test.go diff --git a/test/status_test.go b/test/go/status_test.go similarity index 100% rename from test/status_test.go rename to test/go/status_test.go diff --git a/test/table_schema_cache_test.go b/test/go/table_schema_cache_test.go similarity index 100% rename from test/table_schema_cache_test.go rename to test/go/table_schema_cache_test.go diff --git a/test/throttler_test.go b/test/go/throttler_test.go similarity index 100% rename from test/throttler_test.go rename to test/go/throttler_test.go diff --git a/test/trivial_integration_test.go b/test/go/trivial_integration_test.go similarity index 100% rename from test/trivial_integration_test.go rename to test/go/trivial_integration_test.go diff --git a/test/types_integration_test.go b/test/go/types_integration_test.go similarity index 100% rename from test/types_integration_test.go rename to test/go/types_integration_test.go diff --git a/test/utils_test.go b/test/go/utils_test.go similarity index 100% rename from test/utils_test.go rename to test/go/utils_test.go diff --git a/test/verifier_test.go b/test/go/verifier_test.go similarity index 100% rename from test/verifier_test.go rename to test/go/verifier_test.go diff --git a/test/wait_until_replica_is_caught_up_to_master_test.go b/test/go/wait_until_replica_is_caught_up_to_master_test.go similarity index 100% rename from test/wait_until_replica_is_caught_up_to_master_test.go rename to test/go/wait_until_replica_is_caught_up_to_master_test.go diff --git a/test/helpers/data_writer_helper.rb b/test/helpers/data_writer_helper.rb new file mode 100644 index 00000000..41341195 --- /dev/null +++ b/test/helpers/data_writer_helper.rb @@ -0,0 +1,132 @@ +require "logger" +require "thread" + +require "db_helper" +require "ghostferry_helper" +require "mysql2" + +module DataWriterHelper + def 
class DataWriter
  # A threaded data writer that just hammers the database with write
  # queries as much as possible.
  #
  # This is used essentially for random testing.
  #
  # db_config          - options hash passed to Mysql2::Client.new by every
  #                      writer thread (each thread owns its own connection).
  # tables             - table names to write to; one is sampled per write.
  # insert_probability,
  # update_probability,
  # delete_probability - relative weights of each operation. They are laid
  #                      out as consecutive half-open ranges starting at 0
  #                      and compared against a draw from rand.
  # number_of_writers  - number of threads spawned by #start.
  # logger             - Logger to use; defaults to a DEBUG logger on STDOUT.
  def initialize(db_config,
                 tables: [DbHelper::DEFAULT_FULL_TABLE_NAME],
                 insert_probability: 0.33,
                 update_probability: 0.33,
                 delete_probability: 0.34,
                 number_of_writers: 1,
                 logger: nil)
    @db_config = db_config
    @tables = tables

    @number_of_writers = number_of_writers
    @insert_probability = [0, insert_probability]
    @update_probability = [@insert_probability[1], @insert_probability[1] + update_probability]
    @delete_probability = [@update_probability[1], @update_probability[1] + delete_probability]

    @threads = []
    @started = false
    @stop_requested = false

    @logger = logger
    if @logger.nil?
      @logger = Logger.new(STDOUT)
      @logger.level = Logger::DEBUG
    end
  end

  # Spawns the writer threads. Each thread opens its own connection and keeps
  # calling #write_data until #stop_and_join is called.
  #
  # on_write - optional block called as on_write.call(op, id) after each write.
  #
  # Raises RuntimeError if the writer was already started.
  def start(&on_write)
    raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
    @started = true
    @number_of_writers.times do |i|
      @threads << Thread.new do
        @logger.info("starting data writer thread #{i}")

        connection = Mysql2::Client.new(@db_config)
        begin
          write_data(connection, &on_write) until @stop_requested
        ensure
          # Always release the connection, including when a write raises;
          # otherwise every writer thread leaks one server connection.
          connection.close
        end

        @logger.info("stopped data writer thread #{i}")
      end
    end
  end

  # Asks every writer thread to stop after its current write and waits for
  # all of them to finish.
  def stop_and_join
    @stop_requested = true
    join
  end

  # Waits for all writer threads to finish. An exception raised inside a
  # writer thread is re-raised here by Thread#join.
  def join
    @threads.each(&:join)
  end

  # Performs one randomly chosen INSERT, UPDATE or DELETE on +connection+,
  # then logs it and reports it to +on_write+ (if given) as (op, id).
  #
  # If the configured probabilities sum to less than 1 and the draw lands in
  # the uncovered gap, nothing is written and the callback is not invoked.
  def write_data(connection, &on_write)
    r = rand

    if r >= @insert_probability[0] && r < @insert_probability[1]
      id = insert_data(connection)
      op = "INSERT"
    elsif r >= @update_probability[0] && r < @update_probability[1]
      id = update_data(connection)
      op = "UPDATE"
    elsif r >= @delete_probability[0] && r < @delete_probability[1]
      id = delete_data(connection)
      op = "DELETE"
    else
      # No operation selected: do not report a write that never happened.
      return
    end

    @logger.debug("writing data: #{op} #{id}")
    on_write.call(op, id) unless on_write.nil?
  end

  # Inserts one row of random data into a random table and returns the id
  # reported by the connection for that insert.
  def insert_data(connection)
    table = @tables.sample
    insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)")
    insert_statement.execute(nil, DbHelper.rand_data)
    connection.last_id
  end

  # Overwrites the data of one row (the first with id >= a randomly picked
  # existing id) in a random table. Returns the picked id.
  def update_data(connection)
    table = @tables.sample
    id = random_real_id(connection, table)
    update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1")
    update_statement.execute(DbHelper.rand_data, id)
    id
  end

  # Deletes one row (the first with id >= a randomly picked existing id)
  # from a random table. Returns the picked id.
  def delete_data(connection)
    table = @tables.sample
    id = random_real_id(connection, table)
    delete_statement = connection.prepare("DELETE FROM #{table} WHERE id >= ? LIMIT 1")
    delete_statement.execute(id)
    id
  end

  # Returns the id of a random existing row in +table+.
  #
  # Raises RuntimeError if the table has no rows.
  def random_real_id(connection, table)
    # This query is slow for large datasets.
    # For testing purposes, this should be okay.
    result = connection.query("SELECT id FROM #{table} ORDER BY RAND() LIMIT 1")
    raise "No rows in the database?" if result.first.nil?
    result.first["id"]
  end
end
# Creates the database and table if they do not exist yet and inserts
# +number_of_rows+ rows of random data inside a single transaction.
#
# connection     - client used for all statements (it must respond to
#                  query and prepare).
# database_name  - database to create/use; quoted with backticks so names
#                  that need quoting work the same way here as in
#                  full_table_name.
# table_name     - table to create/fill.
# number_of_rows - how many rows to insert; 0 only creates the schema.
def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
  dbtable = full_table_name(database_name, table_name)

  connection.query("CREATE DATABASE IF NOT EXISTS `#{database_name}`")
  connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")

  transaction(connection) do
    insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)")

    number_of_rows.times do
      # A nil id lets auto_increment assign the primary key.
      insert_statement.execute(nil, rand_data)
    end
  end
end
+ result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1") + metrics[:sample_row] = result.first + else + result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1") + metrics[:sample_row] = result.first + end + + metrics + end +end diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb new file mode 100644 index 00000000..6d78f0bf --- /dev/null +++ b/test/helpers/ghostferry_helper.rb @@ -0,0 +1,284 @@ +require "fileutils" +require "json" +require "logger" +require "open3" +require "thread" +require "tmpdir" +require "webrick" +require "cgi" + +module GhostferryHelper + GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") + + def self.remove_all_binaries + FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR) + end + + class GhostferryExitFailure < StandardError + end + + class Ghostferry + # Manages compiling, running, and communicating with Ghostferry. + # + # + # To use this class: + # + # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do + # # do custom work here, such as injecting data into the database + # end + # ghostferry.run + + # Keep these in sync with integrationferry.go + ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" + + module Status + # This should be in sync with integrationferry.go + READY = "READY" + BINLOG_STREAMING_STARTED = "BINLOG_STREAMING_STARTED" + ROW_COPY_COMPLETED = "ROW_COPY_COMPLETED" + DONE = "DONE" + + BEFORE_ROW_COPY = "BEFORE_ROW_COPY" + AFTER_ROW_COPY = "AFTER_ROW_COPY" + BEFORE_BINLOG_APPLY = "BEFORE_BINLOG_APPLY" + AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" + end + + attr_reader :stdout, :stderr, :exit_status, :pid + + def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) + @logger = logger + if @logger.nil? 
+ @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + end + + @main_path = main_path + @message_timeout = message_timeout + + @status_handlers = {} + + @server_thread = nil + @server_watchdog_thread = nil + @subprocess_thread = nil + + @server = nil + @server_last_error = nil + @server_port = port + + @pid = 0 + @exit_status = nil + @stdout = [] + @stderr = [] + + # Setup the directory to the compiled binary under the system temporary + # directory. + FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) + + # To guarentee that the compiled binary will have an unique name, we use + # the full path of the file relative to the ghostferry root directory as + # the binary name. In order to avoid having / in the binary name all / in + # the full path is replaced with _. The .go extension is also stripped to + # avoid confusion. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) + end + + # The main method to call to run a Ghostferry subprocess. + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + start_ghostferry(resuming_state) + start_server_watchdog + + @subprocess_thread.join + @server_watchdog_thread.join + @server_thread.join + ensure + kill + raise @server_last_error unless @server_last_error.nil? + end + + # When using this method, you need to ensure that the datawriter has been + # stopped properly (if you're using stop_datawriter_during_cutover). 
+ def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + + ###################################################### + # Methods representing the different stages of `run` # + ###################################################### + + def compile_binary + return if File.exist?(@compiled_binary_path) + + @logger.info("compiling test binary to #{@compiled_binary_path}") + rc = system( + "go", "build", + "-o", @compiled_binary_path, + @main_path + ) + + raise "could not compile ghostferry" unless rc + end + + def start_server + @server_last_error = nil + + @last_message_time = Time.now + @server = WEBrick::HTTPServer.new( + BindAddress: "127.0.0.1", + Port: @server_port, + Logger: @logger, + AccessLog: [], + ) + + @server.mount_proc "/" do |req, resp| + begin + unless req.body + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send form data") + resp.status = 400 + @server.shutdown + end + + query = CGI::parse(req.body) + + status = query["status"] + data = query["data"] + + unless status + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") + resp.status = 400 + @server.shutdown + end + + status = status.first + + @last_message_time = Time.now + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? + rescue StandardError => e + # errors are not reported from WEBrick but the server should fail early + # as this indicates there is likely a programming error. 
+ @server_last_error = e + @server.shutdown + end + end + + @server_thread = Thread.new do + @logger.info("starting server thread") + @server.start + @logger.info("server thread stopped") + end + end + + def start_ghostferry(resuming_state = nil) + @subprocess_thread = Thread.new do + # No need to spam the logs with Ghostferry interrupted exceptions if + # Ghostferry is supposed to be interrupted. + Thread.current.report_on_exception = false + + environment = { + ENV_KEY_PORT => @server_port.to_s + } + + @logger.info("starting ghostferry test binary #{@compiled_binary_path}") + Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| + stdin.puts(resuming_state) unless resuming_state.nil? + stdin.close + + @pid = wait_thr.pid + + reads = [stdout, stderr] + until reads.empty? do + ready_reads, _, _ = IO.select(reads) + ready_reads.each do |reader| + line = reader.gets + if line.nil? + # EOF effectively + reads.delete(reader) + next + end + + line.tr!("\n", '') # remove trailing newline + + if reader == stdout + @stdout << line + @logger.debug("stdout: #{line}") + elsif reader == stderr + @stderr << line + @logger.debug("stderr: #{line}") + end + end + end + + @exit_status = wait_thr.value + @pid = 0 + end + + @logger.info("ghostferry test binary exitted: #{@exit_status}") + if @exit_status.exitstatus != 0 + raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + end + end + end + + def start_server_watchdog + # If the subprocess hangs or exits abnormally due to a bad implementation + # (panic/other unexpected errors) we need to make sure to terminate the + # HTTP server to free up the port. + @server_watchdog_thread = Thread.new do + while @subprocess_thread.alive? 
# Sends +signal+ to the running Ghostferry subprocess.
#
# Does nothing when no subprocess is running (@pid is 0). A subprocess that
# has already exited is treated the same way, so cleanup paths such as kill
# can still go on to join their threads.
def send_signal(signal)
  return if @pid == 0

  Process.kill(signal, @pid)
rescue Errno::ESRCH
  # The process exited before @pid was reset; there is nothing to signal.
  nil
end
result.first["max_id"] + assert last_successful_id > 0 + assert_equal last_successful_id, dumped_state["LastSuccessfulPrimaryKeys"]["#{DEFAULT_DB}.#{DEFAULT_TABLE}"] + end + + def test_interrupt_resume_with_writes_to_source + # Start a ghostferry run expecting it to be interrupted. + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + + batches_written = 0 + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + batches_written += 1 + if batches_written >= 2 + ghostferry.send_signal("TERM") + end + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + # Resume Ghostferry with dumped state + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + # The datawriter is still writing to the database since earlier, so we need + # to stop it during cutover. + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end + + def test_interrupt_resume_when_table_has_completed + # Start a run of Ghostferry expecting to be interrupted + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + ghostferry.send_signal("TERM") + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + # Resume ghostferry from interrupted state + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end +end diff --git a/test/integration/trivial_test.rb 
b/test/integration/trivial_test.rb new file mode 100644 index 00000000..009edd4b --- /dev/null +++ b/test/integration/trivial_test.rb @@ -0,0 +1,25 @@ +require "test_helper" + +class TrivialIntegrationTests < GhostferryTestCase + def test_copy_data_without_any_writes_to_source + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry.run + + assert_test_table_is_identical + end + + def test_copy_data_with_writes_to_source + seed_simple_database_with_single_table + + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.run + assert_test_table_is_identical + end +end diff --git a/test/lib/go/integrationferry.go b/test/lib/go/integrationferry.go new file mode 100644 index 00000000..055a852a --- /dev/null +++ b/test/lib/go/integrationferry.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "sync" + "time" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/testhelpers" + "github.com/sirupsen/logrus" +) + +const ( + // These should be kept in sync with ghostferry.rb + portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" + timeout time.Duration = 30 * time.Second +) + +const ( + // These should be kept in sync with ghostferry.rb + + // Could only be sent once by the main thread + StatusReady string = "READY" + StatusBinlogStreamingStarted string = "BINLOG_STREAMING_STARTED" + StatusRowCopyCompleted string = "ROW_COPY_COMPLETED" + StatusDone string = "DONE" + + // Could be sent by multiple goroutines in parallel + StatusBeforeRowCopy string = "BEFORE_ROW_COPY" + StatusAfterRowCopy string = "AFTER_ROW_COPY" + StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY" + StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY" +) + +type IntegrationFerry struct { + *ghostferry.Ferry +} + 
+// ========================================= +// Code for integration server communication +// ========================================= + +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, data ...string) error { + integrationPort := os.Getenv(portEnvName) + if integrationPort == "" { + return fmt.Errorf("environment variable %s must be specified", portEnvName) + } + + client := &http.Client{ + Timeout: timeout, + } + + resp, err := client.PostForm(fmt.Sprintf("http://localhost:%s", integrationPort), url.Values{ + "status": []string{status}, + "data": data, + }) + + if err != nil { + return err + } + + if resp.StatusCode != 200 { + return fmt.Errorf("server returned invalid status: %d", resp.StatusCode) + } + + return nil +} + +// Method override for Start in order to send status to the integration +// server. +func (f *IntegrationFerry) Start() error { + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy, rowBatch.TableSchema().Name) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeBinlogApply) + }) + + err := f.Ferry.Start() + if err != nil { + return err + } + + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterRowCopy, rowBatch.TableSchema().Name) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterBinlogApply) + }) + + return nil +} + +// =========================================== +// Code to handle an almost standard Ferry run +// =========================================== +func (f *IntegrationFerry) Main() error { + var err error + + err = f.SendStatusAndWaitUntilContinue(StatusReady) + if err != nil { + return err + } + + err = f.Initialize() + if err != nil { + return 
err + } + + err = f.Start() + if err != nil { + return err + } + + err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. + f.FlushBinlogAndStopStreaming() + wg.Wait() + + return f.SendStatusAndWaitUntilContinue(StatusDone) +} + +func NewStandardConfig() (*ghostferry.Config, error) { + config := &ghostferry.Config{ + Source: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29291), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + Target: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29292), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + AutomaticCutover: true, + TableFilter: &testhelpers.TestTableFilter{ + DbsFunc: testhelpers.DbApplicabilityFilter([]string{"gftest"}), + TablesFunc: nil, + }, + + DumpStateToStdoutOnSignal: true, + } + + resumeStateJSON, err := ioutil.ReadAll(os.Stdin) + if err != nil { + return nil, err + } + + if len(resumeStateJSON) > 0 { + config.StateToResumeFrom = &ghostferry.SerializableState{} + err = json.Unmarshal(resumeStateJSON, config.StateToResumeFrom) + if err != nil { + return nil, err + } + } + + return config, config.ValidateConfig() +} + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config, err := NewStandardConfig() + if err != nil { + panic(err) + } + + f := &IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} diff --git 
# Base class for the Ruby integration tests. It wires up the database
# connections, tracks every Ghostferry and DataWriter instance a test
# creates, and cleans them up after each test.
class GhostferryTestCase < Minitest::Test
  include Minitest::Hooks
  include GhostferryHelper
  include DbHelper
  include DataWriterHelper

  MINIMAL_GHOSTFERRY = "integrationferry.go"

  # Builds a Ghostferry runner for the Go program +filename+ (relative to
  # GO_CODE_PATH) and registers it for cleanup in after_teardown.
  def new_ghostferry(filename)
    # Transform path to something ruby understands
    path = File.join(GO_CODE_PATH, filename)
    g = Ghostferry.new(path, logger: @logger)
    @ghostferry_instances << g
    g
  end

  # Builds a DataWriter against the source database and registers it for
  # cleanup in after_teardown.
  #
  # options - keyword options forwarded to DataWriter.new (for example
  #           number_of_writers: 2). They are taken as keywords because a
  #           splatted positional hash would be passed as a second positional
  #           argument and rejected by DataWriter.new.
  def new_source_datawriter(**options)
    dw = DataWriter.new(source_db_config, logger: @logger, **options)
    @datawriter_instances << dw
    dw
  end

  ##############
  # Test Hooks #
  ##############

  # Runs once per test class: sets up the logger (DEBUG when DEBUG=1,
  # otherwise INFO) and opens the database connections.
  def before_all
    super

    @logger = Logger.new(STDOUT)
    if ENV["DEBUG"] == "1"
      @logger.level = Logger::DEBUG
    else
      @logger.level = Logger::INFO
    end

    initialize_db_connections
  end

  # Runs before every test (and before its setup): drops the test database
  # and resets the per-test instance registries.
  def before_setup
    super

    reset_data

    # Any ghostferry instances created via the new_ghostferry method will be
    # pushed to here, which allows the test to kill the process after each test
    # should there be a hung process/failed test/errored test.
    @ghostferry_instances = []

    # Same thing with DataWriter as above
    @datawriter_instances = []
  end

  # Runs after every test: kills every Ghostferry instance and stops every
  # DataWriter. All cleanups are attempted even if one of them raises, so a
  # failure cannot leave writer threads or processes running into the next
  # test; the first error is re-raised once everything has been attempted.
  def after_teardown
    first_error = nil

    cleanups = @ghostferry_instances.map { |ghostferry| ghostferry.method(:kill) }
    cleanups += @datawriter_instances.map { |datawriter| datawriter.method(:stop_and_join) }

    cleanups.each do |cleanup|
      begin
        cleanup.call
      rescue StandardError => e
        first_error ||= e
      end
    end

    super
    raise first_error unless first_error.nil?
  end

  #####################
  # Assertion Helpers #
  #####################

  # Asserts that the default test table is non-empty on both sides and that
  # source and target agree on row count and table checksum.
  def assert_test_table_is_identical
    source, target = source_and_target_table_metrics

    assert source[DEFAULT_FULL_TABLE_NAME][:row_count] > 0
    assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0

    assert_equal(
      source[DEFAULT_FULL_TABLE_NAME][:row_count],
      target[DEFAULT_FULL_TABLE_NAME][:row_count],
    )

    assert_equal(
      source[DEFAULT_FULL_TABLE_NAME][:checksum],
      target[DEFAULT_FULL_TABLE_NAME][:checksum],
    )
  end

  # Use this method to assert the validity of the structure of the dumped
  # state.
  #
  # To actually assert the validity of the data within the dumped state, you
  # have to do it manually.
  def assert_basic_fields_exist_in_dumped_state(dumped_state)
    refute dumped_state.nil?
    refute dumped_state["GhostferryVersion"].nil?
    refute dumped_state["LastKnownTableSchemaCache"].nil?
    refute dumped_state["LastSuccessfulPrimaryKeys"].nil?
    refute dumped_state["CompletedTables"].nil?
    refute dumped_state["LastWrittenBinlogPosition"].nil?
  end
end