Skip to content

Commit

Permalink
Switched to HTTP callbacks instead of socket
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jan 15, 2019
1 parent f1465a7 commit 0e2a271
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 141 deletions.
2 changes: 1 addition & 1 deletion test/helpers/data_writer_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def initialize(db_config,
insert_probability: 0.33,
update_probability: 0.33,
delete_probability: 0.34,
number_of_writers: 2,
number_of_writers: 1,
logger: nil
)
@db_config = db_config
Expand Down
157 changes: 76 additions & 81 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
require "json"
require "logger"
require "open3"
require "socket"
require "thread"
require "tmpdir"
require "webrick"
require "cgi"

module GhostferryHelper
GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")
Expand All @@ -29,13 +30,9 @@ class Ghostferry
# ghostferry.run

# Keep these in sync with integrationferry.go
ENV_KEY_SOCKET_PATH = "GHOSTFERRY_INTEGRATION_SOCKET_PATH"
ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT"
MAX_MESSAGE_SIZE = 256

SOCKET_PATH = ENV[ENV_KEY_SOCKET_PATH] || "/tmp/ghostferry-integration.sock"

CONTINUE = "CONTINUE"

module Status
# This should be in sync with integrationferry.go
READY = "READY"
Expand All @@ -51,7 +48,7 @@ module Status

attr_reader :stdout, :stderr, :exit_status, :pid

def initialize(main_path, logger: nil, message_timeout: 30)
def initialize(main_path, logger: nil, message_timeout: 30, port: 39393)
@main_path = main_path
@message_timeout = message_timeout
@logger = logger
Expand All @@ -70,13 +67,14 @@ def initialize(main_path, logger: nil, message_timeout: 30)
@compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name)

@status_handlers = {}
@stop_requested = false

@server_thread = nil
@server_watchdog_thread = nil
@subprocess_thread = nil

@server = nil
@server_started_notifier = Queue.new
@server_last_error = nil
@server_port = port

@pid = 0
@exit_status = nil
Expand Down Expand Up @@ -104,78 +102,60 @@ def compile_binary
end

def start_server
@server_thread = Thread.new do
@logger.info("starting integration test server")
@server = UNIXServer.new(SOCKET_PATH)
@server_started_notifier.push(true)
@server_last_error = nil

@last_message_time = Time.now
@server = WEBrick::HTTPServer.new(
BindAddress: "127.0.0.1",
Port: @server_port,
Logger: @logger,
AccessLog: [],
)

reads = [@server]
last_message_time = Time.now
@server.mount_proc "/" do |req, resp|
unless req.body
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send form data")
resp.status = 400
@server.shutdown
end

while (!@stop_requested && @exit_status.nil?) do
ready = IO.select(reads, nil, nil, 0.2)
query = CGI::parse(req.body)

if ready.nil?
next if Time.now - last_message_time < @message_timeout
status = query["status"]
data = query["data"]

raise "ghostferry did not report to the integration test server for the last #{@message_timeout}"
end
unless status
@server_last_error = ArgumentError.new("Ghostferry is improperly implemented and did not send a status")
resp.status = 400
@server.shutdown
end

last_message_time = Time.now

# Each client should send one message, expects a message back, and
# then close the connection.
#
# This is done because there are many goroutines operating in
# parallel and sending messages over a shared connection would result
# in multiplexing issues. Using separate connections gets around this
# problem.
ready[0].each do |socket|
if socket == @server
# A new message is to be sent by a goroutine
client = @server.accept_nonblock
reads << client
elsif socket.eof?
# A message was complete
@logger.warn("client disconnected?")
socket.close
reads.delete(socket)
else
# Receiving a message
data = socket.read_nonblock(MAX_MESSAGE_SIZE)
data = data.split("\0")
@logger.debug("server received status: #{data}")

status = data.shift

@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil?
begin
socket.write(CONTINUE)
rescue Errno::EPIPE
# It is possible for the status handler to kill Ghostferry.
# In such scenarios, this write may result in a broken pipe as
# the socket is closed.
#
# We rescue this and move on.
@logger.debug("can't send CONTINUE due to broken pipe")
end
status = status.first

reads.delete(socket)
end
end
end
@last_message_time = Time.now
@status_handlers[status].each { |f| f.call(*data) } unless @status_handlers[status].nil?
rescue StandardError => e
# errors are not reported from WEBrick but the server should fail early
# as this indicates there is likely a programming error.
@server_last_error = e
@server.shutdown
end

@server.close
@server_thread = Thread.new do
@logger.info("starting server thread")
@server.start
@logger.info("server thread stopped")
end
end
end

def start_ghostferry(resuming_state = nil)
@subprocess_thread = Thread.new do
# No need to spam the logs with Ghostferry interrupted exceptions if
# Ghostferry is supposed to be interrupted.
Thread.current.report_on_exception = false

environment = {
ENV_KEY_SOCKET_PATH => SOCKET_PATH
ENV_KEY_PORT => @server_port.to_s
}

@logger.info("starting ghostferry test binary #{@compiled_binary_path}")
Expand Down Expand Up @@ -219,44 +199,59 @@ def start_ghostferry(resuming_state = nil)
end
end

def wait_until_server_has_started
@server_started_notifier.pop
@logger.info("integration test server started and listening for connection")
end
def start_server_watchdog
# If the subprocess hangs or exits abnormally due to a bad implementation
# (panic/other unexpected errors) we need to make sure to terminate the
# HTTP server to free up the port.
@server_watchdog_thread = Thread.new do
while @subprocess_thread.alive? do
if Time.now - @last_message_time > @message_timeout
@server.shutdown
raise "ghostferry did not report to the integration test server for the last #{@message_timeout}s"
end

def wait_until_ghostferry_run_is_complete
# Server thread should always join first because the loop within it
# should exit if @exit_status != nil.
@server_thread.join if @server_thread
@subprocess_thread.join if @subprocess_thread
sleep 1
end

@server.shutdown
@logger.info("server watchdog thread stopped")
end
end

def send_signal(signal)
Process.kill(signal, @pid) if @pid != 0
end

def kill
@stop_requested = true
@server.shutdown unless @server.nil?
send_signal("KILL")

# Need to ensure the server shutdown before returning so the port gets
# freed and can be reused.
@server_thread.join if @server_thread
@server_watchdog_thread.join if @server_watchdog_thread

begin
wait_until_ghostferry_run_is_complete
@subprocess_thread.join if @subprocess_thread
rescue GhostferryExitFailure
# ignore
end

File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH)
end

def run(resuming_state = nil)
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
start_server
wait_until_server_has_started
start_ghostferry(resuming_state)
wait_until_ghostferry_run_is_complete
start_server_watchdog

@subprocess_thread.join
@server_watchdog_thread.join
@server_thread.join
ensure
kill
raise @server_last_error unless @server_last_error.nil?
end

# When using this method, you need to call it within the block of
Expand Down
76 changes: 17 additions & 59 deletions test/lib/go/integrationferry/integrationferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

Expand All @@ -16,15 +16,11 @@ import (

const (
// These should be kept in sync with ghostferry.rb
socketEnvName string = "GHOSTFERRY_INTEGRATION_SOCKET_PATH"
socketTimeout time.Duration = 30 * time.Second
portEnvName string = "GHOSTFERRY_INTEGRATION_PORT"
timeout time.Duration = 30 * time.Second
maxMessageSize int = 256
)

const (
CommandContinue string = "CONTINUE"
)

const (
// These should be kept in sync with ghostferry.rb

Expand All @@ -48,69 +44,31 @@ type IntegrationFerry struct {
// =========================================
// Code for integration server communication
// =========================================
func (f *IntegrationFerry) connect() (net.Conn, error) {
socketAddress := os.Getenv(socketEnvName)
if socketAddress == "" {
return nil, fmt.Errorf("environment variable %s must be specified", socketEnvName)
}

return net.DialTimeout("unix", socketAddress, socketTimeout)
}

func (f *IntegrationFerry) send(conn net.Conn, status string, arguments ...string) error {
conn.SetDeadline(time.Now().Add(socketTimeout))

arguments = append([]string{status}, arguments...)
data := []byte(strings.Join(arguments, "\x00"))

if len(data) > maxMessageSize {
return fmt.Errorf("message %v is greater than maxMessageSize %v", arguments, maxMessageSize)
}

_, err := conn.Write(data)
return err
}

func (f *IntegrationFerry) receive(conn net.Conn) (string, error) {
conn.SetDeadline(time.Now().Add(socketTimeout))

var buf [maxMessageSize]byte

n, err := conn.Read(buf[:])
if err != nil {
return "", err
func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, data ...string) error {
integrationPort := os.Getenv(portEnvName)
if integrationPort == "" {
return fmt.Errorf("environment variable %s must be specified", portEnvName)
}

return string(buf[0:n]), nil
}

// Sends a status string to the integration server and block until we receive
// "CONTINUE" from the server.
//
// We need to establish a new connection to the integration server for each
// message as there are multiple goroutines sending messages simultaneously.
func (f *IntegrationFerry) SendStatusAndWaitUntilContinue(status string, arguments ...string) error {
conn, err := f.connect()
if err != nil {
return err
client := &http.Client{
Timeout: timeout,
}
defer conn.Close()

err = f.send(conn, status, arguments...)
if err != nil {
return err
}
resp, err := client.PostForm(fmt.Sprintf("http://localhost:%s", integrationPort), url.Values{
"status": []string{status},
"data": data,
})

command, err := f.receive(conn)
if err != nil {
return err
}

if command == CommandContinue {
return nil
if resp.StatusCode != 200 {
return fmt.Errorf("server returned invalid status: %d", resp.StatusCode)
}

return fmt.Errorf("unrecognized command %s from integration server", command)
return nil
}

// Method override for Start in order to send status to the integration
Expand Down

0 comments on commit 0e2a271

Please sign in to comment.