From ea7428bc03d2ba38eca3d645e1d1969940ce935d Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Jan 2019 10:06:06 -0500 Subject: [PATCH 1/6] Moved Go test files from test/*.go to test/go/*.go Makes room to make the tests more like ruby tests, which will be employed heavily. --- test/{ => go}/binlog_streamer_test.go | 0 test/{ => go}/config_test.go | 0 test/{ => go}/control_server_test.go | 0 test/{ => go}/data_iterator_test.go | 0 test/{ => go}/dml_events_test.go | 0 test/{ => go}/error_handler_test.go | 0 test/{ => go}/ferry_test.go | 0 test/{ => go}/iterative_verifier_collation_test.go | 0 test/{ => go}/iterative_verifier_integration_test.go | 0 test/{ => go}/iterative_verifier_test.go | 0 test/{ => go}/lag_throttler_test.go | 0 test/{ => go}/metrics_test.go | 0 test/{ => go}/race_conditions_integration_test.go | 0 test/{ => go}/replication_config_test.go | 0 test/{ => go}/row_batch_test.go | 0 test/{ => go}/status_test.go | 0 test/{ => go}/table_schema_cache_test.go | 0 test/{ => go}/throttler_test.go | 0 test/{ => go}/trivial_integration_test.go | 0 test/{ => go}/types_integration_test.go | 0 test/{ => go}/utils_test.go | 0 test/{ => go}/verifier_test.go | 0 test/{ => go}/wait_until_replica_is_caught_up_to_master_test.go | 0 23 files changed, 0 insertions(+), 0 deletions(-) rename test/{ => go}/binlog_streamer_test.go (100%) rename test/{ => go}/config_test.go (100%) rename test/{ => go}/control_server_test.go (100%) rename test/{ => go}/data_iterator_test.go (100%) rename test/{ => go}/dml_events_test.go (100%) rename test/{ => go}/error_handler_test.go (100%) rename test/{ => go}/ferry_test.go (100%) rename test/{ => go}/iterative_verifier_collation_test.go (100%) rename test/{ => go}/iterative_verifier_integration_test.go (100%) rename test/{ => go}/iterative_verifier_test.go (100%) rename test/{ => go}/lag_throttler_test.go (100%) rename test/{ => go}/metrics_test.go (100%) rename test/{ => go}/race_conditions_integration_test.go (100%) rename test/{ => go}/replication_config_test.go (100%) rename test/{ => go}/row_batch_test.go (100%) rename test/{ => go}/status_test.go (100%) rename test/{ => go}/table_schema_cache_test.go (100%) rename test/{ => go}/throttler_test.go (100%) rename test/{ => go}/trivial_integration_test.go (100%) rename test/{ => go}/types_integration_test.go (100%) rename test/{ => go}/utils_test.go (100%) rename test/{ => go}/verifier_test.go (100%) rename test/{ => go}/wait_until_replica_is_caught_up_to_master_test.go (100%) diff --git a/test/binlog_streamer_test.go b/test/go/binlog_streamer_test.go similarity index 100% rename from test/binlog_streamer_test.go rename to test/go/binlog_streamer_test.go diff --git a/test/config_test.go b/test/go/config_test.go similarity index 100% rename from test/config_test.go rename to test/go/config_test.go diff --git a/test/control_server_test.go b/test/go/control_server_test.go similarity index 100% rename from test/control_server_test.go rename to test/go/control_server_test.go diff --git a/test/data_iterator_test.go b/test/go/data_iterator_test.go similarity index 100% rename from test/data_iterator_test.go rename to test/go/data_iterator_test.go diff --git a/test/dml_events_test.go b/test/go/dml_events_test.go similarity index 100% rename from test/dml_events_test.go rename to test/go/dml_events_test.go diff --git a/test/error_handler_test.go b/test/go/error_handler_test.go similarity index 100% rename from test/error_handler_test.go rename to test/go/error_handler_test.go diff --git a/test/ferry_test.go b/test/go/ferry_test.go similarity index 100% rename from test/ferry_test.go rename to test/go/ferry_test.go diff --git a/test/iterative_verifier_collation_test.go b/test/go/iterative_verifier_collation_test.go similarity index 100% rename from test/iterative_verifier_collation_test.go rename to test/go/iterative_verifier_collation_test.go diff --git a/test/iterative_verifier_integration_test.go b/test/go/iterative_verifier_integration_test.go similarity index 100% rename from test/iterative_verifier_integration_test.go rename to test/go/iterative_verifier_integration_test.go diff --git a/test/iterative_verifier_test.go b/test/go/iterative_verifier_test.go similarity index 100% rename from test/iterative_verifier_test.go rename to test/go/iterative_verifier_test.go diff --git a/test/lag_throttler_test.go b/test/go/lag_throttler_test.go similarity index 100% rename from test/lag_throttler_test.go rename to test/go/lag_throttler_test.go diff --git a/test/metrics_test.go b/test/go/metrics_test.go similarity index 100% rename from test/metrics_test.go rename to test/go/metrics_test.go diff --git a/test/race_conditions_integration_test.go b/test/go/race_conditions_integration_test.go similarity index 100% rename from test/race_conditions_integration_test.go rename to test/go/race_conditions_integration_test.go diff --git a/test/replication_config_test.go b/test/go/replication_config_test.go similarity index 100% rename from test/replication_config_test.go rename to test/go/replication_config_test.go diff --git a/test/row_batch_test.go b/test/go/row_batch_test.go similarity index 100% rename from test/row_batch_test.go rename to test/go/row_batch_test.go diff --git a/test/status_test.go b/test/go/status_test.go similarity index 100% rename from test/status_test.go rename to test/go/status_test.go diff --git a/test/table_schema_cache_test.go b/test/go/table_schema_cache_test.go similarity index 100% rename from test/table_schema_cache_test.go rename to test/go/table_schema_cache_test.go diff --git a/test/throttler_test.go b/test/go/throttler_test.go similarity index 100% rename from test/throttler_test.go rename to test/go/throttler_test.go diff --git a/test/trivial_integration_test.go b/test/go/trivial_integration_test.go similarity index 100% rename from test/trivial_integration_test.go rename to test/go/trivial_integration_test.go diff --git a/test/types_integration_test.go b/test/go/types_integration_test.go similarity index 100% rename from test/types_integration_test.go rename to test/go/types_integration_test.go diff --git a/test/utils_test.go b/test/go/utils_test.go similarity index 100% rename from test/utils_test.go rename to test/go/utils_test.go diff --git a/test/verifier_test.go b/test/go/verifier_test.go similarity index 100% rename from test/verifier_test.go rename to test/go/verifier_test.go diff --git a/test/wait_until_replica_is_caught_up_to_master_test.go b/test/go/wait_until_replica_is_caught_up_to_master_test.go similarity index 100% rename from test/wait_until_replica_is_caught_up_to_master_test.go rename to test/go/wait_until_replica_is_caught_up_to_master_test.go From cbdb3a2324119874e096b90acf00a14167b7f48e Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Jan 2019 10:14:46 -0500 Subject: [PATCH 2/6] Integration test running Ghostferry binary Instead of running integration tests directly in Go, we run it in Ruby by calling Ghostferry as a subprocess. The idea is that we will be able to test the interrupt/resume work more easily as well as making integration tests easier to write. In order to allow the ruby tests to inject race conditions for testing purposes, we need a way for the ruby code to pause and restart the execution of Go code at strategic locations. This is done with a local Unix socket. At locations of interests, the Go code will send some string via the Unix socket to the Ruby code. The ruby code will perform some sort of callback (such as injecting data, or locking a row) and then send a command back to the Go code via the same Unix socket, allowing it to continue executing. Note that each time we send some status to the ruby server, we do it in a new connection. This is to avoid race conditions if one shared connection was used between all the goroutines we employ inside Ghostferry. Stdout and stderr are captured and they can be asserted against/examined, more helper methods can be created later to give better structures for the stdout/stderr output such as being able to recognize a panic, or recognize particular stages of execution from within the logs. Lastly, the intention is to replace all integration tests with this framework in the future as it would likely be more robust and cause less issues such as goroutine leaks during the go test run. However, for the time being we will have both integration tests in Go and Ruby. --- Gemfile | 7 + Gemfile.lock | 20 ++ Makefile | 3 +- Rakefile | 9 + dev.yml | 2 + test/helpers/ghostferry_helper.rb | 274 ++++++++++++++++++ .../go/integrationferry/integrationferry.go | 238 +++++++++++++++ test/lib/go/minimal.go | 28 ++ 8 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 Gemfile create mode 100644 Gemfile.lock create mode 100644 Rakefile create mode 100644 test/helpers/ghostferry_helper.rb create mode 100644 test/lib/go/integrationferry/integrationferry.go create mode 100644 test/lib/go/minimal.go diff --git a/Gemfile b/Gemfile new file mode 100644 index 00000000..e444bd32 --- /dev/null +++ b/Gemfile @@ -0,0 +1,7 @@ +source "https://rubygems.org" + +gem "minitest" +gem "minitest-hooks" +gem "mysql2" + +gem "rake" diff --git a/Gemfile.lock b/Gemfile.lock new file mode 100644 index 00000000..e9b0961f --- /dev/null +++ b/Gemfile.lock @@ -0,0 +1,20 @@ +GEM + remote: https://rubygems.org/ + specs: + minitest (5.11.3) + minitest-hooks (1.5.0) + minitest (> 5.3) + mysql2 (0.5.2) + rake (12.3.2) + +PLATFORMS + ruby + +DEPENDENCIES + minitest + minitest-hooks + mysql2 + rake + +BUNDLED WITH + 1.16.1 diff --git a/Makefile b/Makefile index 1100bbb3..92dc5e16 100644 --- a/Makefile +++ b/Makefile @@ -51,7 +51,8 @@ $(GOBIN): test: @go version - go test ./test ./copydb/test ./sharding/test -p 1 -v + go test ./test/go ./copydb/test ./sharding/test -p 1 -v + bundle install && bundle exec rake test clean: rm -rf build diff --git a/Rakefile b/Rakefile new file mode 100644 index 00000000..fba6a93c --- /dev/null +++ b/Rakefile @@ -0,0 +1,9 @@ +require 'rake/testtask' + +task :default => [:test] + +Rake::TestTask.new do |t| + t.test_files = FileList['test/**/*test.rb'] + t.verbose = true + t.libs << ["test", "test/helpers", "test/lib"] +end diff --git a/dev.yml b/dev.yml index e47fbda2..9bbe23b4 100644 --- a/dev.yml +++ b/dev.yml @@ -4,6 +4,8 @@ up: - homebrew: - glide - mysql + - ruby: 2.5.1 + - bundler - go: version: 1.10.3 - custom: diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb new file mode 100644 index 00000000..f28108a0 --- /dev/null +++ b/test/helpers/ghostferry_helper.rb @@ -0,0 +1,274 @@ +require "fileutils" +require "json" +require "logger" +require "open3" +require "socket" +require "thread" +require "tmpdir" + +module GhostferryHelper + GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") + + def self.remove_all_binaries + FileUtils.remove_entry(GHOSTFERRY_TEMPDIR) if Dir.exist?(GHOSTFERRY_TEMPDIR) + end + + class GhostferryExitFailure < StandardError + end + + class Ghostferry + # Manages compiling, running, and communicating with Ghostferry. + # + # + # To use this class: + # + # ghostferry = Ghostferry.new("path/to/main.go") + # ghostferry.on_status(Ghostferry::Status::BEFORE_ROW_COPY) do + # # do custom work here, such as injecting data into the database + # end + # ghostferry.run + + # Keep these in sync with integrationferry.go + ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + MAX_MESSAGE_SIZE = 256 + + SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock" + + CONTINUE = "CONTINUE" + + module Status + # This should be in sync with integrationferry.go + READY = "READY" + BINLOG_STREAMING_STARTED = "BINLOG_STREAMING_STARTED" + ROW_COPY_COMPLETED = "ROW_COPY_COMPLETED" + DONE = "DONE" + + BEFORE_ROW_COPY = "BEFORE_ROW_COPY" + AFTER_ROW_COPY = "AFTER_ROW_COPY" + BEFORE_BINLOG_APPLY = "BEFORE_BINLOG_APPLY" + AFTER_BINLOG_APPLY = "AFTER_BINLOG_APPLY" + end + + attr_reader :stdout, :stderr, :exit_status, :pid + + def initialize(main_path, logger: nil, message_timeout: 30) + @main_path = main_path + @message_timeout = message_timeout + @logger = logger + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + end + + FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) + + # full name relative to the ghostferry root dir, with / replaced with _ + # and the extension stripped. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) + + @status_handlers = {} + @stop_requested = false + + @server_thread = nil + @subprocess_thread = nil + + @server = nil + @server_started_notifier = Queue.new + + @pid = 0 + @exit_status = nil + @stdout = [] + @stderr = [] + end + + def on_status(status, &block) + raise "must specify a block" unless block_given? + @status_handlers[status] ||= [] + @status_handlers[status] << block + end + + def compile_binary + return if File.exist?(@compiled_binary_path) + + @logger.info("compiling test binary to #{@compiled_binary_path}") + rc = system( + "go", "build", + "-o", @compiled_binary_path, + @main_path + ) + + raise "could not compile ghostferry" unless rc + end + + def start_server + @server_thread = Thread.new do + @logger.info("starting integration test server") + @server = UNIXServer.new(SOCKET_PATH) + @server_started_notifier.push(true) + + reads = [@server] + last_message_time = Time.now + + while (!@stop_requested && @exit_status.nil?) do + ready = IO.select(reads, nil, nil, 0.2) + + if ready.nil? + next if Time.now - last_message_time < @message_timeout + + raise "ghostferry did not report to the integration test server for the last #{@message_timeout}" + end + + last_message_time = Time.now + + # Each client should send one message, expects a message back, and + # then close the connection. + # + # This is done because there are many goroutines operating in + # parallel and sending messages over a shared connection would result + # in multiplexing issues. Using separate connections gets around this + # problem. + ready[0].each do |socket| + if socket == @server + # A new message is to be sent by a goroutine + client = @server.accept_nonblock + reads << client + elsif socket.eof? + # A message was complete + @logger.warn("client disconnected?") + socket.close + reads.delete(socket) + else + # Receiving a message + data = socket.read_nonblock(MAX_MESSAGE_SIZE) + data = data.split("\0") + @logger.debug("server received status: #{data}") + + status = data.shift + + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? + begin + socket.write(CONTINUE) + rescue Errno::EPIPE + # It is possible for the status handler to kill Ghostferry. + # In such scenarios, this write may result in a broken pipe as + # the socket is closed. + # + # We rescue this and move on. + @logger.debug("can't send CONTINUE due to broken pipe") + end + + reads.delete(socket) + end + end + end + + @server.close + @logger.info("server thread stopped") + end + end + + def start_ghostferry(resuming_state = nil) + @subprocess_thread = Thread.new do + Thread.current.report_on_exception = false + + environment = { + ENV_KEY_SOCKET_PATH => SOCKET_PATH + } + + @logger.info("starting ghostferry test binary #{@compiled_binary_path}") + Open3.popen3(environment, @compiled_binary_path) do |stdin, stdout, stderr, wait_thr| + stdin.puts(resuming_state) unless resuming_state.nil? + stdin.close + + @pid = wait_thr.pid + + reads = [stdout, stderr] + until reads.empty? do + ready_reads, _, _ = IO.select(reads) + ready_reads.each do |reader| + line = reader.gets + if line.nil? + # EOF effectively + reads.delete(reader) + next + end + + line.tr!("\n", '') # remove trailing newline + + if reader == stdout + @stdout << line + @logger.debug("stdout: #{line}") + elsif reader == stderr + @stderr << line + @logger.debug("stderr: #{line}") + end + end + end + + @exit_status = wait_thr.value + @pid = 0 + end + + @logger.info("ghostferry test binary exitted: #{@exit_status}") + if @exit_status.exitstatus != 0 + raise GhostferryExitFailure, "ghostferry test binary returned non-zero status: #{@exit_status}" + end + end + end + + def wait_until_server_has_started + @server_started_notifier.pop + @logger.info("integration test server started and listening for connection") + end + + def wait_until_ghostferry_run_is_complete + # Server thread should always join first because the loop within it + # should exit if @exit_status != nil. + @server_thread.join if @server_thread + @subprocess_thread.join if @subprocess_thread + end + + def send_signal(signal) + Process.kill(signal, @pid) if @pid != 0 + end + + def kill + @stop_requested = true + send_signal("KILL") + begin + wait_until_ghostferry_run_is_complete + rescue GhostferryExitFailure + # ignore + end + + File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH) + end + + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + wait_until_server_has_started + start_ghostferry(resuming_state) + wait_until_ghostferry_run_is_complete + ensure + kill + end + + # When using this method, you need to call it within the block of + # GhostferryIntegration::TestCase#with_state_cleanup to ensure that the + # integration server is shutdown properly. + def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + end +end diff --git a/test/lib/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry/integrationferry.go new file mode 100644 index 00000000..ddaf572b --- /dev/null +++ b/test/lib/go/integrationferry/integrationferry.go @@ -0,0 +1,238 @@ +package integrationferry + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/testhelpers" +) + +const ( + // These should be kept in sync with ghostferry.rb + socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + socketTimeout time.Duration = 30 * time.Second + maxMessageSize int = 256 +) + +const ( + CommandContinue string = "CONTINUE" +) + +const ( + // These should be kept in sync with ghostferry.rb + + // Could only be sent once by the main thread + StatusReady string = "READY" + StatusBinlogStreamingStarted string = "BINLOG_STREAMING_STARTED" + StatusRowCopyCompleted string = "ROW_COPY_COMPLETED" + StatusDone string = "DONE" + + // Could be sent by multiple goroutines in parallel + StatusBeforeRowCopy string = "BEFORE_ROW_COPY" + StatusAfterRowCopy string = "AFTER_ROW_COPY" + StatusBeforeBinlogApply string = "BEFORE_BINLOG_APPLY" + StatusAfterBinlogApply string = "AFTER_BINLOG_APPLY" +) + +type IntegrationFerry struct { + *ghostferry.Ferry +} + +// ========================================= +// Code for integration server communication +// ========================================= +func (f *IntegrationFerry) connect() (net.Conn, error) { + socketAddress := os.Getenv(socketEnvName) + if socketAddress == "" { + return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName) + } + + return net.DialTimeout("unix", socketAddress, socketTimeout) +} + +func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...string) error { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + arguments = append([]string{status}, arguments...) + data := []byte(strings.Join(arguments, "\x00")) + + if len(data) > maxMessageSize { + return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize) + } + + _, err := conn.Write(data) + return err +} + +func (f *IntegrationFerry) receive(conn net.Conn) (string, error) { + conn.SetDeadline(time.Now().Add(socketTimeout)) + + var buf [maxMessageSize]byte + + n, err := conn.Read(buf[:]) + if err != nil { + return "", err + } + + return string(buf[0:n]), nil +} + +// Sends a status string to the integration server and block until we receive +// "CONTINUE" from the server. +// +// We need to establish a new connection to the integration server for each +// message as there are multiple goroutines sending messages simultaneously. +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, arguments ...string) error { + conn, err := f.connect() + if err != nil { + return err + } + defer conn.Close() + + err = f.send(conn, status, arguments...) + if err != nil { + return err + } + + command, err := f.receive(conn) + if err != nil { + return err + } + + if command == CommandContinue { + return nil + } + + return fmt.Errorf("unrecognized command %s from integration server", command) +} + +// Method override for Start in order to send status to the integration +// server. +func (f *IntegrationFerry) Start() error { + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeRowCopy, rowBatch.TableSchema().Name) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusBeforeBinlogApply) + }) + + err := f.Ferry.Start() + if err != nil { + return err + } + + f.Ferry.DataIterator.AddBatchListener(func(rowBatch *ghostferry.RowBatch) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterRowCopy, rowBatch.TableSchema().Name) + }) + + f.Ferry.BinlogStreamer.AddEventListener(func(events []ghostferry.DMLEvent) error { + return f.SendStatusAndWaitUntilContinue(StatusAfterBinlogApply) + }) + + return nil +} + +// =========================================== +// Code to handle an almost standard Ferry run +// =========================================== +func (f *IntegrationFerry) Main() error { + var err error + + err = f.SendStatusAndWaitUntilContinue(StatusReady) + if err != nil { + return err + } + + err = f.Initialize() + if err != nil { + return err + } + + err = f.Start() + if err != nil { + return err + } + + err = f.SendStatusAndWaitUntilContinue(StatusBinlogStreamingStarted) + if err != nil { + return err + } + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + defer wg.Done() + f.Run() + }() + + f.WaitUntilRowCopyIsComplete() + err = f.SendStatusAndWaitUntilContinue(StatusRowCopyCompleted) + if err != nil { + return err + } + + // TODO: this method should return errors rather than calling + // the error handler to panic directly. + f.FlushBinlogAndStopStreaming() + wg.Wait() + + return f.SendStatusAndWaitUntilContinue(StatusDone) +} + +func NewStandardConfig() (*ghostferry.Config, error) { + config := &ghostferry.Config{ + Source: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29291), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + Target: ghostferry.DatabaseConfig{ + Host: "127.0.0.1", + Port: uint16(29292), + User: "root", + Pass: "", + Collation: "utf8mb4_unicode_ci", + Params: map[string]string{ + "charset": "utf8mb4", + }, + }, + + AutomaticCutover: true, + TableFilter: &testhelpers.TestTableFilter{ + DbsFunc: testhelpers.DbApplicabilityFilter([]string{"gftest"}), + TablesFunc: nil, + }, + + DumpStateToStdoutOnSignal: true, + } + + resumeStateJSON, err := ioutil.ReadAll(os.Stdin) + if err != nil { + return nil, err + } + + if len(resumeStateJSON) > 0 { + config.StateToResumeFrom = &ghostferry.SerializableState{} + err = json.Unmarshal(resumeStateJSON, config.StateToResumeFrom) + if err != nil { + return nil, err + } + } + + return config, config.ValidateConfig() +} diff --git a/test/lib/go/minimal.go b/test/lib/go/minimal.go new file mode 100644 index 00000000..3220d359 --- /dev/null +++ b/test/lib/go/minimal.go @@ -0,0 +1,28 @@ +package main + +import ( + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/test/lib/go/integrationferry" + "github.com/sirupsen/logrus" +) + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config, err := integrationferry.NewStandardConfig() + if err != nil { + panic(err) + } + + f := &integrationferry.IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} From 5942c65352ed73ee69e3fc6c59bd7bbbd02ffde6 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Jan 2019 10:16:07 -0500 Subject: [PATCH 3/6] Added some helpers to handle the database. The DataWriterHelper is a straight port from data_writer.go, which is a multithreaded data writer that writes random data to the database. The DbHelper helps connects to the database, seed it, and verify that the source matches the target so that tests don't have to implement it over and over again. --- test/helpers/data_writer_helper.rb | 132 +++++++++++++++++++++++++ test/helpers/db_helper.rb | 150 +++++++++++++++++++++++++++++ 2 files changed, 282 insertions(+) create mode 100644 test/helpers/data_writer_helper.rb create mode 100644 test/helpers/db_helper.rb diff --git a/test/helpers/data_writer_helper.rb b/test/helpers/data_writer_helper.rb new file mode 100644 index 00000000..591a0f3b --- /dev/null +++ b/test/helpers/data_writer_helper.rb @@ -0,0 +1,132 @@ +require "logger" +require "thread" + +require "db_helper" +require "ghostferry_helper" +require "mysql2" + +module DataWriterHelper + def start_datawriter_with_ghostferry(dw, gf, &on_write) + gf.on_status(GhostferryHelper::Ghostferry::Status::READY) do + dw.start(&on_write) + end + end + + def stop_datawriter_during_cutover(dw, gf) + gf.on_status(GhostferryHelper::Ghostferry::Status::ROW_COPY_COMPLETED) do + # At the start of the cutover phase, we have to set the database to + # read-only. This is done by stopping the datawriter. + dw.stop_and_join + end + end + + class DataWriter + # A threaded data writer that just hammers the database with write + # queries as much as possible. + # + # This is used essentially for random testing. + def initialize(db_config, + tables: [DbHelper::DEFAULT_FULL_TABLE_NAME], + insert_probability: 0.33, + update_probability: 0.33, + delete_probability: 0.34, + number_of_writers: 2, + logger: nil + ) + @db_config = db_config + @tables = tables + + @number_of_writers = number_of_writers + @insert_probability = [0, insert_probability] + @update_probability = [@insert_probability[1], @insert_probability[1] + update_probability] + @delete_probability = [@update_probability[1], @update_probability[1] + delete_probability] + + @threads = [] + @started = false + @stop_requested = false + + @logger = logger + if @logger.nil? + @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + end + end + + def start(&on_write) + raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started + @started = true + @number_of_writers.times do |i| + @threads << Thread.new do + @logger.info("starting data writer thread #{i}") + + connection = Mysql2::Client.new(@db_config) + until @stop_requested do + write_data(connection, &on_write) + end + + @logger.info("stopped data writer thread #{i}") + end + end + end + + def stop_and_join + @stop_requested = true + join + end + + def join + @threads.each do |t| + t.join + end + end + + def write_data(connection, &on_write) + r = rand + + if r >= @insert_probability[0] && r < @insert_probability[1] + id = insert_data(connection) + op = "INSERT" + elsif r >= @update_probability[0] && r < @update_probability[1] + id = update_data(connection) + op = "UPDATE" + elsif r >= @delete_probability[0] && r < @delete_probability[1] + id = delete_data(connection) + op = "DELETE" + end + + @logger.debug("writing data: #{op} #{id}") + on_write.call(op, id) unless on_write.nil? + end + + def insert_data(connection) + table = @tables.sample + insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)") + insert_statement.execute(nil, DbHelper.rand_data) + connection.last_id + end + + def update_data(connection) + table = @tables.sample + id = random_real_id(connection, table) + update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1") + update_statement.execute(DbHelper.rand_data, id) + id + end + + def delete_data(connection) + table = @tables.sample + id = random_real_id(connection, table) + delete_statement = connection.prepare("DELETE FROM #{table} WHERE id >= ? LIMIT 1") + delete_statement.execute(id) + id + end + + def random_real_id(connection, table) + # This query is slow for large datasets. + # For testing purposes, this should be okay. + result = connection.query("SELECT id FROM #{table} ORDER BY RAND() LIMIT 1") + raise "No rows in the database?" if result.first.nil? + result.first["id"] + end + end +end diff --git a/test/helpers/db_helper.rb b/test/helpers/db_helper.rb new file mode 100644 index 00000000..a8c8e61d --- /dev/null +++ b/test/helpers/db_helper.rb @@ -0,0 +1,150 @@ +require "logger" +require "mysql2" + +module DbHelper + ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a + DB_PORTS = {source: 29291, target: 29292} + + DEFAULT_DB = "gftest" + DEFAULT_TABLE = "test_table_1" + + def self.full_table_name(db, table) + "`#{db}`.`#{table}`" + end + + def self.rand_data(length: 32) + ALPHANUMERICS.sample(length).join("") + "👻⛴️" + end + + DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE) + + def full_table_name(db, table) + DbHelper.full_table_name(db, table) + end + + def rand_data(length: 32) + DbHelper.rand_data(length: length) + end + + def default_db_config(port:) + { + host: "127.0.0.1", + port: port, + username: "root", + password: "", + encoding: "utf8mb4", + collation: "utf8mb4_unicode_ci", + } + end + + def transaction(connection) + raise ArgumentError, "must pass a block" if !block_given? + + begin + connection.query("BEGIN") + yield + rescue + connection.query("ROLLBACK") + raise + else + connection.query("COMMIT") + end + end + + def initialize_db_connections + @connections = {} + DB_PORTS.each do |name, port| + @connections[name] = Mysql2::Client.new(default_db_config(port: port)) + end + end + + def source_db + @connections[:source] + end + + def target_db + @connections[:target] + end + + def source_db_config + default_db_config(port: DB_PORTS[:source]) + end + + def target_db_config + default_db_config(port: DB_PORTS[:target]) + end + + # Database Seeding Methods + ########################## + # Each test case can choose what kind of database it wants to setup by + # calling one of these methods. + + def reset_data + @connections.each do |_, connection| + connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`") + end + end + + def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111) + dbtable = full_table_name(database_name, table_name) + + connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}") + connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))") + + transaction(connection) do + insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)") + + number_of_rows.times do + insert_statement.execute(nil, rand_data) + end + end + end + + def seed_simple_database_with_single_table + # Setup the source database with data. + max_id = 1111 + seed_random_data(source_db, number_of_rows: max_id) + + # Create some holes in the data. + delete_statement = source_db.prepare("DELETE FROM #{full_table_name(DEFAULT_DB, DEFAULT_TABLE)} WHERE id = ?") + 140.times do + delete_statement.execute(Random.rand(max_id) + 1) + end + + # Setup the target database with no data but the correct schema. + seed_random_data(target_db, number_of_rows: 0) + end + + # Get some overall metrics like CHECKSUM, row count, sample row from tables. + # Generally used for test validation. + def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME]) + source_metrics = {} + target_metrics = {} + + tables.each do |table| + source_metrics[table] = table_metric(source_db, table) + target_metrics[table] = table_metric(target_db, table, sample_id: source_metrics[table][:sample_row]["id"]) + end + + [source_metrics, target_metrics] + end + + def table_metric(conn, table, sample_id: nil) + metrics = {} + result = conn.query("CHECKSUM TABLE #{table}") + metrics[:checksum] = result.first["Checksum"] + + result = conn.query("SELECT COUNT(*) AS cnt FROM #{table}") + metrics[:row_count] = result.first["cnt"] + + if sample_id.nil? + result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1") + metrics[:sample_row] = result.first + else + result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1") + metrics[:sample_row] = result.first + end + + metrics + end +end From f1465a7195b975e4ec7b021618b68febeed70261 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Jan 2019 10:18:12 -0500 Subject: [PATCH 4/6] Added some integration tests Added some trivial integration tests and some interrupt/resume tests. Note the interrupt and resume works on master as of this commit without reconciler because the schema didn't change. --- test/integration/interrupt_resume_test.rb | 63 +++++++++++++ test/integration/trivial_test.rb | 25 +++++ test/test_helper.rb | 109 ++++++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 test/integration/interrupt_resume_test.rb create mode 100644 test/integration/trivial_test.rb create mode 100644 test/test_helper.rb diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb new file mode 100644 index 00000000..2156912d --- /dev/null +++ b/test/integration/interrupt_resume_test.rb @@ -0,0 +1,63 @@ +require "test_helper" + +require "json" + +class InterruptResumeTest < GhostferryTestCase + def test_interrupt_resume_with_writes_to_source + seed_simple_database_with_single_table + + # Start a ghostferry run expecting it to be interrupted. + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + + batches_written = 0 + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + batches_written += 1 + if batches_written >= 2 + ghostferry.send_signal("TERM") + end + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + # Resume Ghostferry with dumped state + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + # The datawriter is still writing to the database since earlier, so we need + # to stop it during cutover. + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.run(dumped_state) + + assert_test_table_is_identical + end + + def test_interrupt_resume_when_table_has_completed + seed_simple_database_with_single_table + + # Start a run of Ghostferry expecting to be interrupted + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.on_status(Ghostferry::Status::ROW_COPY_COMPLETED) do + ghostferry.send_signal("TERM") + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + # Resume ghostferry from interrupted state + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + ghostferry.run(dumped_state) + end +end diff --git a/test/integration/trivial_test.rb b/test/integration/trivial_test.rb new file mode 100644 index 00000000..009edd4b --- /dev/null +++ b/test/integration/trivial_test.rb @@ -0,0 +1,25 @@ +require "test_helper" + +class TrivialIntegrationTests < GhostferryTestCase + def test_copy_data_without_any_writes_to_source + seed_simple_database_with_single_table + + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + ghostferry.run + + assert_test_table_is_identical + end + + def test_copy_data_with_writes_to_source + seed_simple_database_with_single_table + + datawriter = new_source_datawriter + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + start_datawriter_with_ghostferry(datawriter, ghostferry) + stop_datawriter_during_cutover(datawriter, ghostferry) + + ghostferry.run + assert_test_table_is_identical + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb new file mode 100644 index 00000000..409979ba --- /dev/null +++ b/test/test_helper.rb @@ -0,0 +1,109 @@ +require "logger" +require "minitest" +require "minitest/autorun" +require "minitest/hooks/test" + +GO_CODE_PATH = File.join(File.absolute_path(File.dirname(__FILE__)), "lib", "go") + +require "db_helper" +require "ghostferry_helper" +require "data_writer_helper" + +Minitest.after_run do + GhostferryHelper.remove_all_binaries +end + +class GhostferryTestCase < Minitest::Test + include Minitest::Hooks + include GhostferryHelper + include DbHelper + include DataWriterHelper + + MINIMAL_GHOSTFERRY = "minimal.go" + + def new_ghostferry(filename) + # Transform path to something ruby understands + path = File.join(GO_CODE_PATH, filename) + g = Ghostferry.new(path, logger: @logger) + @ghostferry_instances << g + g + end + + def new_source_datawriter(*args) + dw = DataWriter.new(source_db_config, *args, logger: @logger) + @datawriter_instances << dw + dw + end + + ############## + # Test Hooks # + ############## + + def before_all + @logger = Logger.new(STDOUT) + if ENV["DEBUG"] == "1" + @logger.level = Logger::DEBUG + else + @logger.level = Logger::INFO + end + + initialize_db_connections + end + + def before_setup + reset_data + + # Any ghostferry instances created via the new_ghostferry method will be + # pushed to here, which allows the test to kill the process after each test + # should there be a hung process/failed test/errored test. + @ghostferry_instances = [] + + # Same thing with DataWriter as above + @datawriter_instances = [] + end + + def after_teardown + @ghostferry_instances.each do |ghostferry| + ghostferry.kill + end + + @datawriter_instances.each do |datawriter| + datawriter.stop_and_join + end + end + + ##################### + # Assertion Helpers # + ##################### + + def assert_test_table_is_identical + source, target = source_and_target_table_metrics + + assert source[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 + assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 + + assert_equal( + source[DEFAULT_FULL_TABLE_NAME][:checksum], + target[DEFAULT_FULL_TABLE_NAME][:checksum], + ) + + assert_equal( + source[DEFAULT_FULL_TABLE_NAME][:sample_row], + target[DEFAULT_FULL_TABLE_NAME][:sample_row], + ) + end + + # Use this method to assert the validity of the structure of the dumped + # state. + # + # To actually assert the validity of the data within the dumped state, you + # have to do it manually. + def assert_basic_fields_exist_in_dumped_state(dumped_state) + refute dumped_state.nil? + refute dumped_state["GhostferryVersion"].nil? + refute dumped_state["LastKnownTableSchemaCache"].nil? + refute dumped_state["LastSuccessfulPrimaryKeys"].nil? + refute dumped_state["CompletedTables"].nil? + refute dumped_state["LastWrittenBinlogPosition"].nil? + end +end From bd606751dd1420cf37d656703c41faace2087099 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 15 Jan 2019 14:45:27 -0500 Subject: [PATCH 5/6] Switched to HTTP callbacks instead of socket --- test/helpers/data_writer_helper.rb | 2 +- test/helpers/ghostferry_helper.rb | 155 +++++++++--------- .../go/integrationferry/integrationferry.go | 76 ++------- 3 files changed, 94 insertions(+), 139 deletions(-) diff --git a/test/helpers/data_writer_helper.rb b/test/helpers/data_writer_helper.rb index 591a0f3b..41341195 100644 --- a/test/helpers/data_writer_helper.rb +++ b/test/helpers/data_writer_helper.rb @@ -30,7 +30,7 @@ def initialize(db_config, insert_probability: 0.33, update_probability: 0.33, delete_probability: 0.34, - number_of_writers: 2, + number_of_writers: 1, logger: nil ) @db_config = db_config diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index f28108a0..01506117 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -2,9 +2,10 @@ require "json" require "logger" require "open3" -require "socket" require "thread" require "tmpdir" +require "webrick" +require "cgi" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -29,13 +30,9 @@ class Ghostferry # ghostferry.run # Keep these in sync with integrationferry.go - ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" MAX_MESSAGE_SIZE = 256 - SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock" - - CONTINUE = "CONTINUE" - module Status # This should be in sync with integrationferry.go READY = "READY" @@ -51,7 +48,7 @@ module Status attr_reader :stdout, :stderr, :exit_status, :pid - def initialize(main_path, logger: nil, message_timeout: 30) + def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) @main_path = main_path @message_timeout = message_timeout @logger = logger @@ -70,13 +67,14 @@ def initialize(main_path, logger: nil, message_timeout: 30) @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) @status_handlers = {} - @stop_requested = false @server_thread = nil + @server_watchdog_thread = nil @subprocess_thread = nil @server = nil - @server_started_notifier = Queue.new + @server_last_error = nil + @server_port = port @pid = 0 @exit_status = nil @@ -104,78 +102,62 @@ def compile_binary end def start_server - @server_thread = Thread.new do - @logger.info("starting integration test server") - @server = UNIXServer.new(SOCKET_PATH) - @server_started_notifier.push(true) + @server_last_error = nil + + @last_message_time = Time.now + @server = WEBrick::HTTPServer.new( + BindAddress: "127.0.0.1", + Port: @server_port, + Logger: @logger, + AccessLog: [], + ) - reads = [@server] - last_message_time = Time.now + @server.mount_proc "/" do |req, resp| + begin + unless req.body + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send form data") + resp.status = 400 + @server.shutdown + end - while (!@stop_requested && @exit_status.nil?) do - ready = IO.select(reads, nil, nil, 0.2) + query = CGI::parse(req.body) - if ready.nil? - next if Time.now - last_message_time < @message_timeout + status = query["status"] + data = query["data"] - raise "ghostferry did not report to the integration test server for the last #{@message_timeout}" + unless status + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") + resp.status = 400 + @server.shutdown end - last_message_time = Time.now - - # Each client should send one message, expects a message back, and - # then close the connection. - # - # This is done because there are many goroutines operating in - # parallel and sending messages over a shared connection would result - # in multiplexing issues. Using separate connections gets around this - # problem. - ready[0].each do |socket| - if socket == @server - # A new message is to be sent by a goroutine - client = @server.accept_nonblock - reads << client - elsif socket.eof? - # A message was complete - @logger.warn("client disconnected?") - socket.close - reads.delete(socket) - else - # Receiving a message - data = socket.read_nonblock(MAX_MESSAGE_SIZE) - data = data.split("\0") - @logger.debug("server received status: #{data}") - - status = data.shift - - @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? - begin - socket.write(CONTINUE) - rescue Errno::EPIPE - # It is possible for the status handler to kill Ghostferry. - # In such scenarios, this write may result in a broken pipe as - # the socket is closed. - # - # We rescue this and move on. - @logger.debug("can't send CONTINUE due to broken pipe") - end + status = status.first - reads.delete(socket) - end - end + @last_message_time = Time.now + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? + rescue StandardError => e + # errors are not reported from WEBrick but the server should fail early + # as this indicates there is likely a programming error. + @server_last_error = e + @server.shutdown end + end - @server.close + @server_thread = Thread.new do + @logger.info("starting server thread") + @server.start @logger.info("server thread stopped") - end + end end def start_ghostferry(resuming_state = nil) @subprocess_thread = Thread.new do + # No need to spam the logs with Ghostferry interrupted exceptions if + # Ghostferry is supposed to be interrupted. Thread.current.report_on_exception = false environment = { - ENV_KEY_SOCKET_PATH => SOCKET_PATH + ENV_KEY_PORT => @server_port.to_s } @logger.info("starting ghostferry test binary #{@compiled_binary_path}") @@ -219,16 +201,23 @@ def start_ghostferry(resuming_state = nil) end end - def wait_until_server_has_started - @server_started_notifier.pop - @logger.info("integration test server started and listening for connection") - end + def start_server_watchdog + # If the subprocess hangs or exits abnormally due to a bad implementation + # (panic/other unexpected errors) we need to make sure to terminate the + # HTTP server to free up the port. + @server_watchdog_thread = Thread.new do + while @subprocess_thread.alive? do + if Time.now - @last_message_time > @message_timeout + @server.shutdown + raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + end - def wait_until_ghostferry_run_is_complete - # Server thread should always join first because the loop within it - # should exit if @exit_status != nil. - @server_thread.join if @server_thread - @subprocess_thread.join if @subprocess_thread + sleep 1 + end + + @server.shutdown + @logger.info("server watchdog thread stopped") + end end def send_signal(signal) @@ -236,15 +225,19 @@ def send_signal(signal) end def kill - @stop_requested = true + @server.shutdown unless @server.nil? send_signal("KILL") + + # Need to ensure the server shutdown before returning so the port gets + # freed and can be reused. + @server_thread.join if @server_thread + @server_watchdog_thread.join if @server_watchdog_thread + begin - wait_until_ghostferry_run_is_complete + @subprocess_thread.join if @subprocess_thread rescue GhostferryExitFailure # ignore end - - File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH) end def run(resuming_state = nil) @@ -252,11 +245,15 @@ def run(resuming_state = nil) compile_binary start_server - wait_until_server_has_started start_ghostferry(resuming_state) - wait_until_ghostferry_run_is_complete + start_server_watchdog + + @subprocess_thread.join + @server_watchdog_thread.join + @server_thread.join ensure kill + raise @server_last_error unless @server_last_error.nil? end # When using this method, you need to call it within the block of diff --git a/test/lib/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry/integrationferry.go index ddaf572b..94211102 100644 --- a/test/lib/go/integrationferry/integrationferry.go +++ b/test/lib/go/integrationferry/integrationferry.go @@ -4,9 +4,9 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net" + "net/http" + "net/url" "os" - "strings" "sync" "time" @@ -16,15 +16,11 @@ import ( const ( // These should be kept in sync with ghostferry.rb - socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" - socketTimeout time.Duration = 30 * time.Second + portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" + timeout time.Duration = 30 * time.Second maxMessageSize int = 256 ) -const ( - CommandContinue string = "CONTINUE" -) - const ( // These should be kept in sync with ghostferry.rb @@ -48,69 +44,31 @@ type IntegrationFerry struct { // ========================================= // Code for integration server communication // ========================================= -func (f *IntegrationFerry) connect() (net.Conn, error) { - socketAddress := os.Getenv(socketEnvName) - if socketAddress == "" { - return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName) - } - - return net.DialTimeout("unix", socketAddress, socketTimeout) -} - -func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...string) error { - conn.SetDeadline(time.Now().Add(socketTimeout)) - - arguments = append([]string{status}, arguments...) - data := []byte(strings.Join(arguments, "\x00")) - - if len(data) > maxMessageSize { - return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize) - } - - _, err := conn.Write(data) - return err -} -func (f *IntegrationFerry) receive(conn net.Conn) (string, error) { - conn.SetDeadline(time.Now().Add(socketTimeout)) - - var buf [maxMessageSize]byte - - n, err := conn.Read(buf[:]) - if err != nil { - return "", err +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, data ...string) error { + integrationPort := os.Getenv(portEnvName) + if integrationPort == "" { + return fmt.Errorf("environment variable %s must be specified", portEnvName) } - return string(buf[0:n]), nil -} - -// Sends a status string to the integration server and block until we receive -// "CONTINUE" from the server. -// -// We need to establish a new connection to the integration server for each -// message as there are multiple goroutines sending messages simultaneously. -func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, arguments ...string) error { - conn, err := f.connect() - if err != nil { - return err + client := &http.Client{ + Timeout: timeout, } - defer conn.Close() - err = f.send(conn, status, arguments...) - if err != nil { - return err - } + resp, err := client.PostForm(fmt.Sprintf("http://localhost:%s", integrationPort), url.Values{ + "status": []string{status}, + "data": data, + }) - command, err := f.receive(conn) if err != nil { return err } - if command == CommandContinue { - return nil + if resp.StatusCode != 200 { + return fmt.Errorf("server returned invalid status: %d", resp.StatusCode) } - return fmt.Errorf("unrecognized command %s from integration server", command) + return nil } // Method override for Start in order to send status to the integration From b8fb91a8497626e8d626a5d03a5d2f6c5e7c5d8c Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Mon, 21 Jan 2019 11:19:33 -0500 Subject: [PATCH 6/6] Updated for comments --- test/helpers/ghostferry_helper.rb | 99 +++++++++++-------- test/integration/interrupt_resume_test.rb | 30 +++++- .../integrationferry.go | 29 +++++- test/lib/go/minimal.go | 28 ------ test/test_helper.rb | 10 +- 5 files changed, 113 insertions(+), 83 deletions(-) rename test/lib/go/{integrationferry => }/integrationferry.go (90%) delete mode 100644 test/lib/go/minimal.go diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index 01506117..6d78f0bf 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -31,7 +31,6 @@ class Ghostferry # Keep these in sync with integrationferry.go ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" - MAX_MESSAGE_SIZE = 256 module Status # This should be in sync with integrationferry.go @@ -49,22 +48,14 @@ module Status attr_reader :stdout, :stderr, :exit_status, :pid def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) - @main_path = main_path - @message_timeout = message_timeout @logger = logger if @logger.nil? @logger = Logger.new(STDOUT) @logger.level = Logger::DEBUG end - FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) - - # full name relative to the ghostferry root dir, with / replaced with _ - # and the extension stripped. - full_path = File.absolute_path(@main_path) - full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory - binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") - @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) + @main_path = main_path + @message_timeout = message_timeout @status_handlers = {} @@ -80,14 +71,54 @@ def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) @exit_status = nil @stdout = [] @stderr = [] + + # Setup the directory to the compiled binary under the system temporary + # directory. + FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700) + + # To guarentee that the compiled binary will have an unique name, we use + # the full path of the file relative to the ghostferry root directory as + # the binary name. In order to avoid having / in the binary name all / in + # the full path is replaced with _. The .go extension is also stripped to + # avoid confusion. + full_path = File.absolute_path(@main_path) + full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory + binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_") + @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) end - def on_status(status, &block) - raise "must specify a block" unless block_given? - @status_handlers[status] ||= [] - @status_handlers[status] << block + # The main method to call to run a Ghostferry subprocess. + def run(resuming_state = nil) + resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) + + compile_binary + start_server + start_ghostferry(resuming_state) + start_server_watchdog + + @subprocess_thread.join + @server_watchdog_thread.join + @server_thread.join + ensure + kill + raise @server_last_error unless @server_last_error.nil? end + # When using this method, you need to ensure that the datawriter has been + # stopped properly (if you're using stop_datawriter_during_cutover). + def run_expecting_interrupt(resuming_state = nil) + run(resuming_state) + rescue GhostferryExitFailure + dumped_state = @stdout.join("") + JSON.parse(dumped_state) + else + raise "Ghostferry did not get interrupted" + end + + ###################################################### + # Methods representing the different stages of `run` # + ###################################################### + def compile_binary return if File.exist?(@compiled_binary_path) @@ -220,6 +251,16 @@ def start_server_watchdog end end + ################### + # Utility methods # + ################### + + def on_status(status, &block) + raise "must specify a block" unless block_given? + @status_handlers[status] ||= [] + @status_handlers[status] << block + end + def send_signal(signal) Process.kill(signal, @pid) if @pid != 0 end @@ -239,33 +280,5 @@ def kill # ignore end end - - def run(resuming_state = nil) - resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash) - - compile_binary - start_server - start_ghostferry(resuming_state) - start_server_watchdog - - @subprocess_thread.join - @server_watchdog_thread.join - @server_thread.join - ensure - kill - raise @server_last_error unless @server_last_error.nil? - end - - # When using this method, you need to call it within the block of - # GhostferryIntegration::TestCase#with_state_cleanup to ensure that the - # integration server is shutdown properly. - def run_expecting_interrupt(resuming_state = nil) - run(resuming_state) - rescue GhostferryExitFailure - dumped_state = @stdout.join("") - JSON.parse(dumped_state) - else - raise "Ghostferry did not get interrupted" - end end end diff --git a/test/integration/interrupt_resume_test.rb b/test/integration/interrupt_resume_test.rb index 2156912d..894766a0 100644 --- a/test/integration/interrupt_resume_test.rb +++ b/test/integration/interrupt_resume_test.rb @@ -3,9 +3,32 @@ require "json" class InterruptResumeTest < GhostferryTestCase - def test_interrupt_resume_with_writes_to_source + def setup seed_simple_database_with_single_table + end + + def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_interrupted + ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) + + # Writes one batch + ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do + ghostferry.send_signal("TERM") + end + + dumped_state = ghostferry.run_expecting_interrupt + assert_basic_fields_exist_in_dumped_state(dumped_state) + + result = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}") + count = result.first["cnt"] + assert_equal 200, count + result = target_db.query("SELECT MAX(id) AS max_id FROM #{DEFAULT_FULL_TABLE_NAME}") + last_successful_id = result.first["max_id"] + assert last_successful_id > 0 + assert_equal last_successful_id, dumped_state["LastSuccessfulPrimaryKeys"]["#{DEFAULT_DB}.#{DEFAULT_TABLE}"] + end + + def test_interrupt_resume_with_writes_to_source # Start a ghostferry run expecting it to be interrupted. datawriter = new_source_datawriter ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) @@ -36,8 +59,6 @@ def test_interrupt_resume_with_writes_to_source end def test_interrupt_resume_when_table_has_completed - seed_simple_database_with_single_table - # Start a run of Ghostferry expecting to be interrupted datawriter = new_source_datawriter ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY) @@ -58,6 +79,9 @@ def test_interrupt_resume_when_table_has_completed start_datawriter_with_ghostferry(datawriter, ghostferry) stop_datawriter_during_cutover(datawriter, ghostferry) + ghostferry.run(dumped_state) + + assert_test_table_is_identical end end diff --git a/test/lib/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry.go similarity index 90% rename from test/lib/go/integrationferry/integrationferry.go rename to test/lib/go/integrationferry.go index 94211102..055a852a 100644 --- a/test/lib/go/integrationferry/integrationferry.go +++ b/test/lib/go/integrationferry.go @@ -1,4 +1,4 @@ -package integrationferry +package main import ( "encoding/json" @@ -12,13 +12,13 @@ import ( "github.com/Shopify/ghostferry" "github.com/Shopify/ghostferry/testhelpers" + "github.com/sirupsen/logrus" ) const ( // These should be kept in sync with ghostferry.rb - portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" - timeout time.Duration = 30 * time.Second - maxMessageSize int = 256 + portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" + timeout time.Duration = 30 * time.Second ) const ( @@ -194,3 +194,24 @@ func NewStandardConfig() (*ghostferry.Config, error) { return config, config.ValidateConfig() } + +func main() { + logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetLevel(logrus.DebugLevel) + + config, err := NewStandardConfig() + if err != nil { + panic(err) + } + + f := &IntegrationFerry{ + Ferry: &ghostferry.Ferry{ + Config: config, + }, + } + + err = f.Main() + if err != nil { + panic(err) + } +} diff --git a/test/lib/go/minimal.go b/test/lib/go/minimal.go deleted file mode 100644 index 3220d359..00000000 --- a/test/lib/go/minimal.go +++ /dev/null @@ -1,28 +0,0 @@ -package main - -import ( - "github.com/Shopify/ghostferry" - "github.com/Shopify/ghostferry/test/lib/go/integrationferry" - "github.com/sirupsen/logrus" -) - -func main() { - logrus.SetFormatter(&logrus.JSONFormatter{}) - logrus.SetLevel(logrus.DebugLevel) - - config, err := integrationferry.NewStandardConfig() - if err != nil { - panic(err) - } - - f := &integrationferry.IntegrationFerry{ - Ferry: &ghostferry.Ferry{ - Config: config, - }, - } - - err = f.Main() - if err != nil { - panic(err) - } -} diff --git a/test/test_helper.rb b/test/test_helper.rb index 409979ba..94c3439b 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -19,7 +19,7 @@ class GhostferryTestCase < Minitest::Test include DbHelper include DataWriterHelper - MINIMAL_GHOSTFERRY = "minimal.go" + MINIMAL_GHOSTFERRY = "integrationferry.go" def new_ghostferry(filename) # Transform path to something ruby understands @@ -83,13 +83,13 @@ def assert_test_table_is_identical assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0 assert_equal( - source[DEFAULT_FULL_TABLE_NAME][:checksum], - target[DEFAULT_FULL_TABLE_NAME][:checksum], + source[DEFAULT_FULL_TABLE_NAME][:row_count], + target[DEFAULT_FULL_TABLE_NAME][:row_count], ) assert_equal( - source[DEFAULT_FULL_TABLE_NAME][:sample_row], - target[DEFAULT_FULL_TABLE_NAME][:sample_row], + source[DEFAULT_FULL_TABLE_NAME][:checksum], + target[DEFAULT_FULL_TABLE_NAME][:checksum], ) end