Skip to content

Commit

Permalink
Updated for comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jan 21, 2019
1 parent bd60675 commit b8fb91a
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 83 deletions.
99 changes: 56 additions & 43 deletions test/helpers/ghostferry_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class Ghostferry

# Keep these in sync with integrationferry.go
ENV_KEY_PORT = "GHOSTFERRY_INTEGRATION_PORT"
MAX_MESSAGE_SIZE = 256

module Status
# This should be in sync with integrationferry.go
Expand All @@ -49,22 +48,14 @@ module Status
attr_reader :stdout, :stderr, :exit_status, :pid

def initialize(main_path, logger: nil, message_timeout: 30, port: 39393)
@main_path = main_path
@message_timeout = message_timeout
@logger = logger
if @logger.nil?
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG
end

FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700)

# full name relative to the ghostferry root dir, with / replaced with _
# and the extension stripped.
full_path = File.absolute_path(@main_path)
full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory
binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_")
@compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name)
@main_path = main_path
@message_timeout = message_timeout

@status_handlers = {}

Expand All @@ -80,14 +71,54 @@ def initialize(main_path, logger: nil, message_timeout: 30, port: 39393)
@exit_status = nil
@stdout = []
@stderr = []

# Setup the directory to the compiled binary under the system temporary
# directory.
FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700)

# To guarentee that the compiled binary will have an unique name, we use
# the full path of the file relative to the ghostferry root directory as
# the binary name. In order to avoid having / in the binary name all / in
# the full path is replaced with _. The .go extension is also stripped to
# avoid confusion.
full_path = File.absolute_path(@main_path)
full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory
binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_")
@compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name)
end

def on_status(status, &block)
raise "must specify a block" unless block_given?
@status_handlers[status] ||= []
@status_handlers[status] << block
# The main method to call to run a Ghostferry subprocess.
def run(resuming_state = nil)
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
start_server
start_ghostferry(resuming_state)
start_server_watchdog

@subprocess_thread.join
@server_watchdog_thread.join
@server_thread.join
ensure
kill
raise @server_last_error unless @server_last_error.nil?
end

# When using this method, you need to ensure that the datawriter has been
# stopped properly (if you're using stop_datawriter_during_cutover).
def run_expecting_interrupt(resuming_state = nil)
run(resuming_state)
rescue GhostferryExitFailure
dumped_state = @stdout.join("")
JSON.parse(dumped_state)
else
raise "Ghostferry did not get interrupted"
end

######################################################
# Methods representing the different stages of `run` #
######################################################

def compile_binary
return if File.exist?(@compiled_binary_path)

Expand Down Expand Up @@ -220,6 +251,16 @@ def start_server_watchdog
end
end

###################
# Utility methods #
###################

def on_status(status, &block)
raise "must specify a block" unless block_given?
@status_handlers[status] ||= []
@status_handlers[status] << block
end

def send_signal(signal)
Process.kill(signal, @pid) if @pid != 0
end
Expand All @@ -239,33 +280,5 @@ def kill
# ignore
end
end

def run(resuming_state = nil)
resuming_state = JSON.generate(resuming_state) if resuming_state.is_a?(Hash)

compile_binary
start_server
start_ghostferry(resuming_state)
start_server_watchdog

@subprocess_thread.join
@server_watchdog_thread.join
@server_thread.join
ensure
kill
raise @server_last_error unless @server_last_error.nil?
end

# When using this method, you need to call it within the block of
# GhostferryIntegration::TestCase#with_state_cleanup to ensure that the
# integration server is shutdown properly.
def run_expecting_interrupt(resuming_state = nil)
run(resuming_state)
rescue GhostferryExitFailure
dumped_state = @stdout.join("")
JSON.parse(dumped_state)
else
raise "Ghostferry did not get interrupted"
end
end
end
30 changes: 27 additions & 3 deletions test/integration/interrupt_resume_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,32 @@
require "json"

class InterruptResumeTest < GhostferryTestCase
def test_interrupt_resume_with_writes_to_source
def setup
seed_simple_database_with_single_table
end

def test_interrupt_resume_without_writes_to_source_to_check_target_state_when_interrupted
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)

# Writes one batch
ghostferry.on_status(Ghostferry::Status::AFTER_ROW_COPY) do
ghostferry.send_signal("TERM")
end

dumped_state = ghostferry.run_expecting_interrupt
assert_basic_fields_exist_in_dumped_state(dumped_state)

result = target_db.query("SELECT COUNT(*) AS cnt FROM #{DEFAULT_FULL_TABLE_NAME}")
count = result.first["cnt"]
assert_equal 200, count

result = target_db.query("SELECT MAX(id) AS max_id FROM #{DEFAULT_FULL_TABLE_NAME}")
last_successful_id = result.first["max_id"]
assert last_successful_id > 0
assert_equal last_successful_id, dumped_state["LastSuccessfulPrimaryKeys"]["#{DEFAULT_DB}.#{DEFAULT_TABLE}"]
end

def test_interrupt_resume_with_writes_to_source
# Start a ghostferry run expecting it to be interrupted.
datawriter = new_source_datawriter
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
Expand Down Expand Up @@ -36,8 +59,6 @@ def test_interrupt_resume_with_writes_to_source
end

def test_interrupt_resume_when_table_has_completed
seed_simple_database_with_single_table

# Start a run of Ghostferry expecting to be interrupted
datawriter = new_source_datawriter
ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
Expand All @@ -58,6 +79,9 @@ def test_interrupt_resume_when_table_has_completed

start_datawriter_with_ghostferry(datawriter, ghostferry)
stop_datawriter_during_cutover(datawriter, ghostferry)

ghostferry.run(dumped_state)

assert_test_table_is_identical
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package integrationferry
package main

import (
"encoding/json"
Expand All @@ -12,13 +12,13 @@ import (

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/testhelpers"
"github.com/sirupsen/logrus"
)

const (
// These should be kept in sync with ghostferry.rb
portEnvName string = "GHOSTFERRY_INTEGRATION_PORT"
timeout time.Duration = 30 * time.Second
maxMessageSize int = 256
portEnvName string = "GHOSTFERRY_INTEGRATION_PORT"
timeout time.Duration = 30 * time.Second
)

const (
Expand Down Expand Up @@ -194,3 +194,24 @@ func NewStandardConfig() (*ghostferry.Config, error) {

return config, config.ValidateConfig()
}

func main() {
logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetLevel(logrus.DebugLevel)

config, err := NewStandardConfig()
if err != nil {
panic(err)
}

f := &IntegrationFerry{
Ferry: &ghostferry.Ferry{
Config: config,
},
}

err = f.Main()
if err != nil {
panic(err)
}
}
28 changes: 0 additions & 28 deletions test/lib/go/minimal.go

This file was deleted.

10 changes: 5 additions & 5 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GhostferryTestCase < Minitest::Test
include DbHelper
include DataWriterHelper

MINIMAL_GHOSTFERRY = "minimal.go"
MINIMAL_GHOSTFERRY = "integrationferry.go"

def new_ghostferry(filename)
# Transform path to something ruby understands
Expand Down Expand Up @@ -83,13 +83,13 @@ def assert_test_table_is_identical
assert target[DEFAULT_FULL_TABLE_NAME][:row_count] > 0

assert_equal(
source[DEFAULT_FULL_TABLE_NAME][:checksum],
target[DEFAULT_FULL_TABLE_NAME][:checksum],
source[DEFAULT_FULL_TABLE_NAME][:row_count],
target[DEFAULT_FULL_TABLE_NAME][:row_count],
)

assert_equal(
source[DEFAULT_FULL_TABLE_NAME][:sample_row],
target[DEFAULT_FULL_TABLE_NAME][:sample_row],
source[DEFAULT_FULL_TABLE_NAME][:checksum],
target[DEFAULT_FULL_TABLE_NAME][:checksum],
)
end

Expand Down

0 comments on commit b8fb91a

Please sign in to comment.