diff --git a/test/helpers/data_writer_helper.rb b/test/helpers/data_writer_helper.rb index 591a0f3b..41341195 100644 --- a/test/helpers/data_writer_helper.rb +++ b/test/helpers/data_writer_helper.rb @@ -30,7 +30,7 @@ def initialize(db_config, insert_probability: 0.33, update_probability: 0.33, delete_probability: 0.34, - number_of_writers: 2, + number_of_writers: 1, logger: nil ) @db_config = db_config diff --git a/test/helpers/ghostferry_helper.rb b/test/helpers/ghostferry_helper.rb index f28108a0..99bff3c3 100644 --- a/test/helpers/ghostferry_helper.rb +++ b/test/helpers/ghostferry_helper.rb @@ -2,9 +2,10 @@ require "json" require "logger" require "open3" -require "socket" require "thread" require "tmpdir" +require "webrick" +require "cgi" module GhostferryHelper GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration") @@ -29,13 +30,9 @@ class Ghostferry # ghostferry.run # Keep these in sync with integrationferry.go - ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" + ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT" MAX_MESSAGE_SIZE = 256 - SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock" - - CONTINUE = "CONTINUE" - module Status # This should be in sync with integrationferry.go READY = "READY" @@ -51,7 +48,7 @@ module Status attr_reader :stdout, :stderr, :exit_status, :pid - def initialize(main_path, logger: nil, message_timeout: 30) + def initialize(main_path, logger: nil, message_timeout: 30, port: 39393) @main_path = main_path @message_timeout = message_timeout @logger = logger @@ -70,13 +67,14 @@ def initialize(main_path, logger: nil, message_timeout: 30) @compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name) @status_handlers = {} - @stop_requested = false @server_thread = nil + @server_watchdog_thread = nil @subprocess_thread = nil @server = nil - @server_started_notifier = Queue.new + @server_last_error = nil + @server_port = port @pid = 0 @exit_status = nil @@ -104,78 +102,60 @@ def compile_binary end def start_server - @server_thread = Thread.new do - @logger.info("starting integration test server") - @server = UNIXServer.new(SOCKET_PATH) - @server_started_notifier.push(true) + @server_last_error = nil + + @last_message_time = Time.now + @server = WEBrick::HTTPServer.new( + BindAddress: "127.0.0.1", + Port: @server_port, + Logger: @logger, + AccessLog: [], + ) - reads = [@server] - last_message_time = Time.now + @server.mount_proc "/" do |req, resp| + unless req.body + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send form data") + resp.status = 400 + @server.shutdown + end - while (!@stop_requested && @exit_status.nil?) do - ready = IO.select(reads, nil, nil, 0.2) + query = CGI::parse(req.body) - if ready.nil? - next if Time.now - last_message_time < @message_timeout + status = query["status"] + data = query["data"] - raise "ghostferry did not report to the integration test server for the last #{@message_timeout}" - end + unless status + @server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status") + resp.status = 400 + @server.shutdown + end - last_message_time = Time.now - - # Each client should send one message, expects a message back, and - # then close the connection. - # - # This is done because there are many goroutines operating in - # parallel and sending messages over a shared connection would result - # in multiplexing issues. Using separate connections gets around this - # problem. - ready[0].each do |socket| - if socket == @server - # A new message is to be sent by a goroutine - client = @server.accept_nonblock - reads << client - elsif socket.eof? - # A message was complete - @logger.warn("client disconnected?") - socket.close - reads.delete(socket) - else - # Receiving a message - data = socket.read_nonblock(MAX_MESSAGE_SIZE) - data = data.split("\0") - @logger.debug("server received status: #{data}") - - status = data.shift - - @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? - begin - socket.write(CONTINUE) - rescue Errno::EPIPE - # It is possible for the status handler to kill Ghostferry. - # In such scenarios, this write may result in a broken pipe as - # the socket is closed. - # - # We rescue this and move on. - @logger.debug("can't send CONTINUE due to broken pipe") - end + status = status.first - reads.delete(socket) - end - end - end + @last_message_time = Time.now + @status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil? + rescue StandardError => e + # errors are not reported from WEBrick but the server should fail early + # as this indicates there is likely a programming error. + @server_last_error = e + @server.shutdown + end - @server.close + @server_thread = Thread.new do + @logger.info("starting server thread") + @server.start @logger.info("server thread stopped") - end + end end def start_ghostferry(resuming_state = nil) @subprocess_thread = Thread.new do + # No need to spam the logs with Ghostferry interrupted exceptions if + # Ghostferry is supposed to be interrupted. Thread.current.report_on_exception = false environment = { - ENV_KEY_SOCKET_PATH => SOCKET_PATH + ENV_KEY_PORT => @server_port.to_s } @logger.info("starting ghostferry test binary #{@compiled_binary_path}") @@ -219,16 +199,23 @@ def start_ghostferry(resuming_state = nil) end end - def wait_until_server_has_started - @server_started_notifier.pop - @logger.info("integration test server started and listening for connection") - end + def start_server_watchdog + # If the subprocess hangs or exits abnormally due to a bad implementation + # (panic/other unexpected errors) we need to make sure to terminate the + # HTTP server to free up the port. + @server_watchdog_thread = Thread.new do + while @subprocess_thread.alive? do + if Time.now - @last_message_time > @message_timeout + @server.shutdown + raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s" + end - def wait_until_ghostferry_run_is_complete - # Server thread should always join first because the loop within it - # should exit if @exit_status != nil. - @server_thread.join if @server_thread - @subprocess_thread.join if @subprocess_thread + sleep 1 + end + + @server.shutdown + @logger.info("server watchdog thread stopped") + end end def send_signal(signal) @@ -236,15 +223,19 @@ def send_signal(signal) end def kill - @stop_requested = true + @server.shutdown unless @server.nil? send_signal("KILL") + + # Need to ensure the server shutdown before returning so the port gets + # freed and can be reused. + @server_thread.join if @server_thread + @server_watchdog_thread.join if @server_watchdog_thread + begin - wait_until_ghostferry_run_is_complete + @subprocess_thread.join if @subprocess_thread rescue GhostferryExitFailure # ignore end - - File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH) end def run(resuming_state = nil) @@ -252,11 +243,15 @@ def run(resuming_state = nil) compile_binary start_server - wait_until_server_has_started start_ghostferry(resuming_state) - wait_until_ghostferry_run_is_complete + start_server_watchdog + + @subprocess_thread.join + @server_watchdog_thread.join + @server_thread.join ensure kill + raise @server_last_error unless @server_last_error.nil? end # When using this method, you need to call it within the block of diff --git a/test/lib/go/integrationferry/integrationferry.go b/test/lib/go/integrationferry/integrationferry.go index ddaf572b..94211102 100644 --- a/test/lib/go/integrationferry/integrationferry.go +++ b/test/lib/go/integrationferry/integrationferry.go @@ -4,9 +4,9 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net" + "net/http" + "net/url" "os" - "strings" "sync" "time" @@ -16,15 +16,11 @@ import ( const ( // These should be kept in sync with ghostferry.rb - socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH" - socketTimeout time.Duration = 30 * time.Second + portEnvName string = "GHOSTFERRY_INTEGRATION_PORT" + timeout time.Duration = 30 * time.Second maxMessageSize int = 256 ) -const ( - CommandContinue string = "CONTINUE" -) - const ( // These should be kept in sync with ghostferry.rb @@ -48,69 +44,31 @@ type IntegrationFerry struct { // ========================================= // Code for integration server communication // ========================================= -func (f *IntegrationFerry) connect() (net.Conn, error) { - socketAddress := os.Getenv(socketEnvName) - if socketAddress == "" { - return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName) - } - - return net.DialTimeout("unix", socketAddress, socketTimeout) -} - -func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...string) error { - conn.SetDeadline(time.Now().Add(socketTimeout)) - - arguments = append([]string{status}, arguments...) - data := []byte(strings.Join(arguments, "\x00")) - - if len(data) > maxMessageSize { - return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize) - } - - _, err := conn.Write(data) - return err -} -func (f *IntegrationFerry) receive(conn net.Conn) (string, error) { - conn.SetDeadline(time.Now().Add(socketTimeout)) - - var buf [maxMessageSize]byte - - n, err := conn.Read(buf[:]) - if err != nil { - return "", err +func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, data ...string) error { + integrationPort := os.Getenv(portEnvName) + if integrationPort == "" { + return fmt.Errorf("environment variable %s must be specified", portEnvName) } - return string(buf[0:n]), nil -} - -// Sends a status string to the integration server and block until we receive -// "CONTINUE" from the server. -// -// We need to establish a new connection to the integration server for each -// message as there are multiple goroutines sending messages simultaneously. -func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, arguments ...string) error { - conn, err := f.connect() - if err != nil { - return err + client := &http.Client{ + Timeout: timeout, } - defer conn.Close() - err = f.send(conn, status, arguments...) - if err != nil { - return err - } + resp, err := client.PostForm(fmt.Sprintf("http://localhost:%s", integrationPort), url.Values{ + "status": []string{status}, + "data": data, + }) - command, err := f.receive(conn) if err != nil { return err } - if command == CommandContinue { - return nil + if resp.StatusCode != 200 { + return fmt.Errorf("server returned invalid status: %d", resp.StatusCode) } - return fmt.Errorf("unrecognized command %s from integration server", command) + return nil } // Method override for Start in order to send status to the integration