Skip to content

Commit

Permalink
More ruby idiomatic
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Jan 15, 2019
1 parent c0c64e7 commit 15c373e
Show file tree
Hide file tree
Showing 15 changed files with 410 additions and 414 deletions.
2 changes: 2 additions & 0 deletions test/integration/Gemfile → Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ source "https://rubygems.org"
gem "minitest"
gem "minitest-hooks"
gem "mysql2"

gem "rake"
2 changes: 2 additions & 0 deletions test/integration/Gemfile.lock → Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ GEM
minitest-hooks (1.5.0)
minitest (> 5.3)
mysql2 (0.5.2)
rake (12.3.2)

PLATFORMS
ruby
Expand All @@ -13,6 +14,7 @@ DEPENDENCIES
minitest
minitest-hooks
mysql2
rake

BUNDLED WITH
1.16.1
9 changes: 9 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
require 'rake/testtask'

# Running plain `rake` runs the test suite.
task default: :test

# Defines the test task over every file matching test/**/*test.rb.
Rake::TestTask.new do |t|
  t.test_files = FileList['test/**/*test.rb']
  t.verbose = true
  # Append each directory as its own entry instead of pushing a single nested
  # array onto the list of load-path directories.
  t.libs.concat(["test", "test/helpers", "test/lib"])
end
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
require "logger"
require "thread"

require "db_helper"
require "ghostferry_helper"
require "mysql2"

module GhostferryIntegration
module DataWriterHelper
# Registers a handler on +gf+ for the READY status that starts the data
# writer +dw+, forwarding the optional +on_write+ block to DataWriter#start.
#
# Nothing is started at call time; the writer only starts when +gf+ invokes
# the registered handler.
def start_datawriter_with_ghostferry(dw, gf, &on_write)
  ready_status = GhostferryHelper::Ghostferry::Status::READY
  gf.on_status(ready_status) { dw.start(&on_write) }
end

# Registers a handler on +gf+ for the ROW_COPY_COMPLETED status that stops
# the data writer +dw+ via DataWriter#stop_and_join.
def stop_datawriter_during_cutover(dw, gf)
  cutover_status = GhostferryHelper::Ghostferry::Status::ROW_COPY_COMPLETED
  # The database has to become read-only when the cutover phase begins;
  # stopping the datawriter is how that is achieved here.
  gf.on_status(cutover_status) { dw.stop_and_join }
end

class DataWriter
# A threaded data writer that just hammers the database with write
# queries as much as possible.
#
# This is used essentially for random testing.
def initialize(db_config,
tables: [DbManager::DEFAULT_FULL_TABLE_NAME],
tables: [DbHelper::DEFAULT_FULL_TABLE_NAME],
insert_probability: 0.33,
update_probability: 0.33,
delete_probability: 0.34,
Expand All @@ -26,6 +42,7 @@ def initialize(db_config,
@delete_probability = [@update_probability[1], @update_probability[1] + delete_probability]

@threads = []
@started = false
@stop_requested = false

@logger = logger
Expand All @@ -36,6 +53,8 @@ def initialize(db_config,
end

def start(&on_write)
raise "Cannot start DataWriter multiple times. Use a new instance instead " if @started
@started = true
@number_of_writers.times do |i|
@threads << Thread.new do
@logger.info("starting data writer thread #{i}")
Expand All @@ -50,8 +69,9 @@ def start(&on_write)
end
end

def stop
def stop_and_join
@stop_requested = true
join
end

def join
Expand Down Expand Up @@ -81,15 +101,15 @@ def write_data(connection, &on_write)
def insert_data(connection)
table = @tables.sample
insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)")
insert_statement.execute(nil, GhostferryIntegration.rand_data)
insert_statement.execute(nil, DbHelper.rand_data)
connection.last_id
end

def update_data(connection)
table = @tables.sample
id = random_real_id(connection, table)
update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1")
update_statement.execute(GhostferryIntegration.rand_data, id)
update_statement.execute(DbHelper.rand_data, id)
id
end

Expand Down
150 changes: 150 additions & 0 deletions test/helpers/db_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "logger"
require "mysql2"

# Database helpers for the integration tests: connection setup for the source
# and target MySQL servers, data seeding, and table metrics for validation.
#
# DbHelper.full_table_name and DbHelper.rand_data can be called directly on the
# module. The remaining methods are instance methods intended to be mixed in;
# most of them read @connections, which initialize_db_connections populates.
module DbHelper
  ALPHANUMERICS = (("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a).freeze
  # Ports of the MySQL servers, keyed by connection name.
  DB_PORTS = {source: 29291, target: 29292}.freeze

  DEFAULT_DB = "gftest".freeze
  DEFAULT_TABLE = "test_table_1".freeze

  # Returns the backtick-quoted, fully qualified name "`db`.`table`".
  def self.full_table_name(db, table)
    "`#{db}`.`#{table}`"
  end

  # Returns +length+ random alphanumeric characters followed by a fixed emoji
  # suffix (NOTE(review): the suffix presumably exercises multi-byte utf8mb4
  # handling -- confirm).
  #
  # Characters are drawn with replacement. Array#sample(n) never repeats an
  # element, so it would cap the random part at ALPHANUMERICS.size characters
  # no matter how large +length+ is.
  def self.rand_data(length: 32)
    Array.new(length) { ALPHANUMERICS.sample }.join + "👻⛴️"
  end

  DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE).freeze

  # Instance-level convenience wrapper around DbHelper.full_table_name.
  def full_table_name(db, table)
    DbHelper.full_table_name(db, table)
  end

  # Instance-level convenience wrapper around DbHelper.rand_data.
  def rand_data(length: 32)
    DbHelper.rand_data(length: length)
  end

  # Builds the Mysql2::Client option hash for the local server on +port+.
  def default_db_config(port:)
    {
      host: "127.0.0.1",
      port: port,
      username: "root",
      password: "",
      encoding: "utf8mb4",
      collation: "utf8mb4_unicode_ci",
    }
  end

  # Runs the given block between BEGIN and COMMIT on +connection+.
  #
  # If the block (or BEGIN itself) raises a StandardError, ROLLBACK is issued
  # and the error is re-raised. Raises ArgumentError when no block is given.
  def transaction(connection)
    raise ArgumentError, "must pass a block" unless block_given?

    begin
      connection.query("BEGIN")
      yield
    rescue
      connection.query("ROLLBACK")
      raise
    else
      connection.query("COMMIT")
    end
  end

  # Opens one Mysql2 client per entry in DB_PORTS and stores them in
  # @connections under the same keys (:source and :target).
  def initialize_db_connections
    @connections = {}
    DB_PORTS.each do |name, port|
      @connections[name] = Mysql2::Client.new(default_db_config(port: port))
    end
  end

  # Connection to the source server; requires initialize_db_connections.
  def source_db
    @connections[:source]
  end

  # Connection to the target server; requires initialize_db_connections.
  def target_db
    @connections[:target]
  end

  # Connection options for the source server.
  def source_db_config
    default_db_config(port: DB_PORTS[:source])
  end

  # Connection options for the target server.
  def target_db_config
    default_db_config(port: DB_PORTS[:target])
  end

  # Database Seeding Methods
  ##########################
  # Each test case can choose what kind of database it wants to setup by
  # calling one of these methods.

  # Drops the default test database on every open connection.
  def reset_data
    @connections.each_value do |connection|
      connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
    end
  end

  # Creates the database and table if they do not exist yet, then inserts
  # +number_of_rows+ rows of random data inside a single transaction.
  def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
    dbtable = full_table_name(database_name, table_name)

    # Quote the database name the same way reset_data and full_table_name do.
    connection.query("CREATE DATABASE IF NOT EXISTS `#{database_name}`")
    connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")

    transaction(connection) do
      insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)")

      number_of_rows.times do
        # A nil id lets the auto_increment column assign the value.
        insert_statement.execute(nil, rand_data)
      end
    end
  end

  # Seeds the source with random rows (with some gaps in the id sequence) and
  # creates the same, empty schema on the target.
  def seed_simple_database_with_single_table
    # Setup the source database with data.
    max_id = 1111
    seed_random_data(source_db, number_of_rows: max_id)

    # Create some holes in the data. Ids are picked at random, so the same id
    # may be picked more than once.
    delete_statement = source_db.prepare("DELETE FROM #{DEFAULT_FULL_TABLE_NAME} WHERE id = ?")
    140.times do
      delete_statement.execute(Random.rand(max_id) + 1)
    end

    # Setup the target database with no data but the correct schema.
    seed_random_data(target_db, number_of_rows: 0)
  end

  # Get some overall metrics like CHECKSUM, row count, sample row from tables.
  # Generally used for test validation.
  #
  # Returns [source_metrics, target_metrics], each a hash keyed by table name.
  # The target's sample row is looked up by the id of the source's sample row
  # so that the two rows are comparable.
  def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
    source_metrics = {}
    target_metrics = {}

    tables.each do |table|
      source_metrics[table] = table_metric(source_db, table)

      # An empty source table has no sample row; pass nil on instead of
      # failing with NoMethodError on nil.
      source_sample = source_metrics[table][:sample_row]
      sample_id = source_sample && source_sample["id"]
      target_metrics[table] = table_metric(target_db, table, sample_id: sample_id)
    end

    [source_metrics, target_metrics]
  end

  # Collects :checksum, :row_count and :sample_row for +table+ over +conn+.
  #
  # With +sample_id+ the sample row is the row with that id (nil if absent);
  # without it a random row is picked (nil if the table is empty).
  def table_metric(conn, table, sample_id: nil)
    metrics = {}
    metrics[:checksum] = conn.query("CHECKSUM TABLE #{table}").first["Checksum"]
    metrics[:row_count] = conn.query("SELECT COUNT(*) AS cnt FROM #{table}").first["cnt"]

    sample_query =
      if sample_id.nil?
        "SELECT * FROM #{table} ORDER BY RAND() LIMIT 1"
      else
        "SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1"
      end
    metrics[:sample_row] = conn.query(sample_query).first

    metrics
  end
end
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
require "fileutils"
require "json"
require "logger"
require "open3"
require "socket"
require "thread"
require "tmpdir"

module GhostferryIntegration
module GhostferryHelper
GHOSTFERRY_TEMPDIR = File.join(Dir.tmpdir, "ghostferry-integration")

# Deletes the GHOSTFERRY_TEMPDIR directory tree, if that directory exists.
def self.remove_all_binaries
  return unless Dir.exist?(GHOSTFERRY_TEMPDIR)

  FileUtils.remove_entry(GHOSTFERRY_TEMPDIR)
end

# Error type for a failed ghostferry run.
# NOTE(review): presumably raised when the ghostferry subprocess exits
# unsuccessfully -- confirm at the raise site.
class GhostferryExitFailure < StandardError; end

class Ghostferry
# Manages compiling, running, and communicating with Ghostferry.
#
#
# To use this class:
#
# ghostferry = Ghostferry.new("path/to/main.go")
Expand Down Expand Up @@ -52,22 +60,21 @@ def initialize(main_path, logger: nil, message_timeout: 30)
@logger.level = Logger::DEBUG
end

@tempdir = Dir.mktmpdir("ghostferry-integration")
FileUtils.mkdir_p(GHOSTFERRY_TEMPDIR, mode: 0700)

# full name relative to the ghostferry root dir, with / replaced with _
# and the extension stripped.
full_path = File.absolute_path(@main_path)
full_path = full_path.split("/ghostferry/")[-1] # Assuming that ghostferry will show up in the path as its own directory
binary_name = File.join(File.dirname(full_path), File.basename(full_path, ".*")).gsub("/", "_")
@compiled_binary_path = File.join(@tempdir, binary_name)

reset_state
end
@compiled_binary_path = File.join(GHOSTFERRY_TEMPDIR, binary_name)

def reset_state
@status_handlers = {}
@stop_requested = false

@server_thread = nil
@subprocess_thread = nil

@server = nil
@server_started_notifier = Queue.new

Expand All @@ -84,7 +91,7 @@ def on_status(status, &block)
end

def compile_binary
return if File.exists?(@compiled_binary_path)
return if File.exist?(@compiled_binary_path)

@logger.info("compiling test binary to #{@compiled_binary_path}")
rc = system(
Expand Down Expand Up @@ -224,27 +231,21 @@ def wait_until_ghostferry_run_is_complete
@subprocess_thread.join if @subprocess_thread
end

def remove_socket
File.unlink(SOCKET_PATH) if File.exists?(SOCKET_PATH)
end

def remove_binary
FileUtils.remove_entry(@tempdir) unless @tempdir.nil?
end

# Sends +signal+ to the process whose id is stored in @pid. Does nothing when
# @pid is 0 (NOTE(review): 0 presumably means no subprocess is running --
# confirm where @pid is assigned).
def send_signal(signal)
  return if @pid == 0

  Process.kill(signal, @pid)
end

def stop_and_cleanup
def kill
@logger.info("killing ghostferry")
@stop_requested = true
send_signal("KILL")
begin
wait_until_ghostferry_run_is_complete
rescue GhostferryExitFailure
# ignore
end
reset_state

File.unlink(SOCKET_PATH) if File.exist?(SOCKET_PATH)
end

def run(resuming_state = nil)
Expand All @@ -256,7 +257,7 @@ def run(resuming_state = nil)
start_ghostferry(resuming_state)
wait_until_ghostferry_run_is_complete
ensure
remove_socket
kill
end

# When using this method, you need to call it within the block of
Expand Down
Loading

0 comments on commit 15c373e

Please sign in to comment.