Integration tests by compiling and calling Ghostferry from Ruby #64

Merged
merged 6 commits into from
Jan 21, 2019
7 changes: 7 additions & 0 deletions Gemfile
@@ -0,0 +1,7 @@
source "https://rubygems.org"

gem "minitest"
gem "minitest-hooks"
gem "mysql2"

gem "rake"
20 changes: 20 additions & 0 deletions Gemfile.lock
@@ -0,0 +1,20 @@
GEM
  remote: https://rubygems.org/
  specs:
    minitest (5.11.3)
    minitest-hooks (1.5.0)
      minitest (> 5.3)
    mysql2 (0.5.2)
    rake (12.3.2)

PLATFORMS
  ruby

DEPENDENCIES
  minitest
  minitest-hooks
  mysql2
  rake

BUNDLED WITH
   1.16.1
3 changes: 2 additions & 1 deletion Makefile
@@ -51,7 +51,8 @@ $(GOBIN):

 test:
 	@go version
-	go test ./test ./copydb/test ./sharding/test -p 1 -v
+	go test ./test/go ./copydb/test ./sharding/test -p 1 -v
+	bundle install && bundle exec rake test

 clean:
 	rm -rf build
9 changes: 9 additions & 0 deletions Rakefile
@@ -0,0 +1,9 @@
require 'rake/testtask'

task :default => [:test]

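Rake::TestTask.new do |t|
  t.test_files = FileList['test/**/*test.rb']
  t.verbose = true
  # concat adds each directory as its own entry; `<<` would push the whole
  # array in as a single nested element.
  t.libs.concat(["test", "test/helpers", "test/lib"])
end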
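
The `test_files` pattern in the Rakefile selects every Ruby file under `test/` whose name ends in `test.rb`, at any depth. The sketch below approximates that with `Dir.glob` instead of `Rake::FileList` so that it runs without Rake. The file names and the `matching_test_files` helper are made up for illustration and are not part of this PR.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical helper (not in the PR): list the files under `root` that match
# the Rakefile's pattern, as paths relative to `root`.
def matching_test_files(root, pattern = "test/**/*test.rb")
  Dir.glob(pattern, base: root).sort
end

Dir.mktmpdir do |root|
  # Made-up layout: two test files and one helper.
  %w[
    test/trivial_test.rb
    test/integration/trivial_test.rb
    test/helpers/db_helper.rb
  ].each do |path|
    full_path = File.join(root, path)
    FileUtils.mkdir_p(File.dirname(full_path))
    FileUtils.touch(full_path)
  end

  # Lists the two *test.rb files; the helper is not matched.
  puts matching_test_files(root)
end
```

Because helpers such as `db_helper.rb` do not end in `test.rb`, they can live under `test/` without being picked up as test files.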
2 changes: 2 additions & 0 deletions dev.yml
@@ -4,6 +4,8 @@ up:
   - homebrew:
     - glide
     - mysql
+  - ruby: 2.5.1
+  - bundler
   - go:
       version: 1.10.3
   - custom:
File renamed without changes.
132 changes: 132 additions & 0 deletions test/helpers/data_writer_helper.rb
@@ -0,0 +1,132 @@
require "logger"
require "thread"

require "db_helper"
require "ghostferry_helper"
require "mysql2"

module DataWriterHelper
  def start_datawriter_with_ghostferry(dw, gf, &on_write)
    gf.on_status(GhostferryHelper::Ghostferry::Status::READY) do
      dw.start(&on_write)
    end
  end

  def stop_datawriter_during_cutover(dw, gf)
    gf.on_status(GhostferryHelper::Ghostferry::Status::ROW_COPY_COMPLETED) do
      # At the start of the cutover phase, we have to set the database to
      # read-only. This is done by stopping the datawriter.
      dw.stop_and_join
    end
  end

  class DataWriter
Review comment (Contributor Author): Much of this is translated directly from data_writer.go. The goal is that eventually we can use Ruby tests only.

    # A threaded data writer that just hammers the database with write
    # queries as much as possible.
    #
    # This is used essentially for random testing.
    def initialize(db_config,
                   tables: [DbHelper::DEFAULT_FULL_TABLE_NAME],
                   insert_probability: 0.33,
                   update_probability: 0.33,
                   delete_probability: 0.34,
                   number_of_writers: 1,
                   logger: nil
                  )
      @db_config = db_config
      @tables = tables

      @number_of_writers = number_of_writers
      # Each operation owns a half-open [low, high) slice of [0, 1). The slices
      # are laid end to end so that a single rand draw selects one operation.
      @insert_probability = [0, insert_probability]
      @update_probability = [@insert_probability[1], @insert_probability[1] + update_probability]
      @delete_probability = [@update_probability[1], @update_probability[1] + delete_probability]

      @threads = []
      @started = false
      @stop_requested = false

      @logger = logger
      if @logger.nil?
        @logger = Logger.new(STDOUT)
        @logger.level = Logger::DEBUG
      end
    end

    def start(&on_write)
      raise "Cannot start DataWriter multiple times. Use a new instance instead." if @started
      @started = true
      @number_of_writers.times do |i|
        @threads << Thread.new do
          @logger.info("starting data writer thread #{i}")

          # Each writer thread uses its own connection.
          connection = Mysql2::Client.new(@db_config)
          until @stop_requested do
            write_data(connection, &on_write)
          end

          @logger.info("stopped data writer thread #{i}")
        end
      end
    end

    def stop_and_join
      @stop_requested = true
      join
    end

    def join
      @threads.each do |t|
        t.join
      end
    end

    def write_data(connection, &on_write)
      r = rand

      if r >= @insert_probability[0] && r < @insert_probability[1]
        id = insert_data(connection)
        op = "INSERT"
      elsif r >= @update_probability[0] && r < @update_probability[1]
        id = update_data(connection)
        op = "UPDATE"
      elsif r >= @delete_probability[0] && r < @delete_probability[1]
        id = delete_data(connection)
        op = "DELETE"
      else
        # The probabilities sum to less than 1 and r fell outside every range:
        # nothing was written, so there is nothing to log or report.
        return
      end

      @logger.debug("writing data: #{op} #{id}")
      on_write.call(op, id) unless on_write.nil?
    end

    def insert_data(connection)
      table = @tables.sample
      insert_statement = connection.prepare("INSERT INTO #{table} (id, data) VALUES (?, ?)")
      insert_statement.execute(nil, DbHelper.rand_data)
      connection.last_id
    end

    def update_data(connection)
      table = @tables.sample
      id = random_real_id(connection, table)
      update_statement = connection.prepare("UPDATE #{table} SET data = ? WHERE id >= ? LIMIT 1")
      update_statement.execute(DbHelper.rand_data, id)
      id
    end

    def delete_data(connection)
      table = @tables.sample
      id = random_real_id(connection, table)
      delete_statement = connection.prepare("DELETE FROM #{table} WHERE id >= ? LIMIT 1")
      delete_statement.execute(id)
      id
    end

    def random_real_id(connection, table)
      # This query is slow for large datasets.
      # For testing purposes, this should be okay.
      result = connection.query("SELECT id FROM #{table} ORDER BY RAND() LIMIT 1")
      raise "No rows in the database?" if result.first.nil?
      result.first["id"]
    end
  end
end
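
`DataWriter` picks each operation by laying the three probabilities end to end as half-open ranges over [0, 1) and testing a single `rand` draw against them. Below is a minimal standalone sketch of that selection, with the database calls left out. `build_ranges` and `pick_operation` are illustrative names that do not appear in the PR.

```ruby
# Hypothetical helpers (not in the PR) that mirror the range arithmetic in
# DataWriter#initialize and the range checks in DataWriter#write_data.

# Build one [low, high) range per operation, laid end to end from 0.
def build_ranges(insert:, update:, delete:)
  insert_range = [0, insert]
  update_range = [insert_range[1], insert_range[1] + update]
  delete_range = [update_range[1], update_range[1] + delete]
  { "INSERT" => insert_range, "UPDATE" => update_range, "DELETE" => delete_range }
end

# Return the operation whose range contains r, or nil if no range does
# (possible when the probabilities sum to less than 1).
def pick_operation(ranges, r)
  ranges.each do |op, (low, high)|
    return op if r >= low && r < high
  end
  nil
end

ranges = build_ranges(insert: 0.33, update: 0.33, delete: 0.34)
[0.10, 0.50, 0.90].each do |r|
  puts "#{r} -> #{pick_operation(ranges, r)}"
end
```

With the default probabilities, draws below 0.33 select an insert, draws from 0.33 up to 0.66 an update, and the rest a delete.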
150 changes: 150 additions & 0 deletions test/helpers/db_helper.rb
@@ -0,0 +1,150 @@
require "logger"
require "mysql2"

module DbHelper
  ALPHANUMERICS = ("0".."9").to_a + ("a".."z").to_a + ("A".."Z").to_a
  DB_PORTS = {source: 29291, target: 29292}

  DEFAULT_DB = "gftest"
  DEFAULT_TABLE = "test_table_1"

  def self.full_table_name(db, table)
    "`#{db}`.`#{table}`"
  end

  def self.rand_data(length: 32)
    # Draw each character independently. Array#sample(length) samples without
    # replacement and would silently cap the result at ALPHANUMERICS.size (62).
    Array.new(length) { ALPHANUMERICS.sample }.join + "👻⛴️"
  end

  DEFAULT_FULL_TABLE_NAME = full_table_name(DEFAULT_DB, DEFAULT_TABLE)

  def full_table_name(db, table)
    DbHelper.full_table_name(db, table)
  end

  def rand_data(length: 32)
    DbHelper.rand_data(length: length)
  end

  def default_db_config(port:)
    {
      host: "127.0.0.1",
      port: port,
      username: "root",
      password: "",
      encoding: "utf8mb4",
      collation: "utf8mb4_unicode_ci",
    }
  end

  def transaction(connection)
    raise ArgumentError, "must pass a block" unless block_given?

    begin
      connection.query("BEGIN")
      yield
    rescue
      connection.query("ROLLBACK")
      raise
    else
      connection.query("COMMIT")
    end
  end

  def initialize_db_connections
    @connections = {}
    DB_PORTS.each do |name, port|
      @connections[name] = Mysql2::Client.new(default_db_config(port: port))
    end
  end

  def source_db
    @connections[:source]
  end

  def target_db
    @connections[:target]
  end

  def source_db_config
    default_db_config(port: DB_PORTS[:source])
  end

  def target_db_config
    default_db_config(port: DB_PORTS[:target])
  end

  # Database Seeding Methods
  ##########################
  # Each test case can choose what kind of database it wants to set up by
  # calling one of these methods.

  def reset_data
    @connections.each do |_, connection|
      connection.query("DROP DATABASE IF EXISTS `#{DEFAULT_DB}`")
    end
  end

  def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_TABLE, number_of_rows: 1111)
    dbtable = full_table_name(database_name, table_name)

    # Quote the database name with backticks, as everywhere else in this helper.
    connection.query("CREATE DATABASE IF NOT EXISTS `#{database_name}`")
    connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))")

    transaction(connection) do
      insert_statement = connection.prepare("INSERT INTO #{dbtable} (id, data) VALUES (?, ?)")

      number_of_rows.times do
        insert_statement.execute(nil, rand_data)
      end
    end
  end

  def seed_simple_database_with_single_table
    # Set up the source database with data.
    max_id = 1111
    seed_random_data(source_db, number_of_rows: max_id)

    # Create some holes in the data.
    delete_statement = source_db.prepare("DELETE FROM #{full_table_name(DEFAULT_DB, DEFAULT_TABLE)} WHERE id = ?")
    140.times do
      delete_statement.execute(Random.rand(max_id) + 1)
    end

    # Set up the target database with no data but the correct schema.
    seed_random_data(target_db, number_of_rows: 0)
  end

  # Get some overall metrics like CHECKSUM, row count, sample row from tables.
  # Generally used for test validation.
  def source_and_target_table_metrics(tables: [DEFAULT_FULL_TABLE_NAME])
    source_metrics = {}
    target_metrics = {}

    tables.each do |table|
      source_metrics[table] = table_metric(source_db, table)
      target_metrics[table] = table_metric(target_db, table, sample_id: source_metrics[table][:sample_row]["id"])
    end

    [source_metrics, target_metrics]
  end

  def table_metric(conn, table, sample_id: nil)
Review comment (Contributor Author): Not a big fan of this name. Any ideas?

    metrics = {}
    result = conn.query("CHECKSUM TABLE #{table}")
    metrics[:checksum] = result.first["Checksum"]

    result = conn.query("SELECT COUNT(*) AS cnt FROM #{table}")
    metrics[:row_count] = result.first["cnt"]

    if sample_id.nil?
      result = conn.query("SELECT * FROM #{table} ORDER BY RAND() LIMIT 1")
      metrics[:sample_row] = result.first
    else
      result = conn.query("SELECT * FROM #{table} WHERE id = #{sample_id} LIMIT 1")
      metrics[:sample_row] = result.first
    end

    metrics
  end
end
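
The `transaction` helper in `db_helper.rb` wraps a block in `BEGIN` and `COMMIT`, and issues `ROLLBACK` before re-raising if the block fails. The sketch below shows that behaviour without a MySQL server. `FakeConnection` is a hypothetical stand-in for `Mysql2::Client` that only records the SQL it receives, and the method body is copied here so the example is self-contained.

```ruby
# Hypothetical stand-in for Mysql2::Client (not in the PR): it records every
# query string instead of sending it to a server.
class FakeConnection
  attr_reader :queries

  def initialize
    @queries = []
  end

  def query(sql)
    @queries << sql
    nil
  end
end

# Same shape as DbHelper#transaction, copied so the sketch runs standalone.
def transaction(connection)
  raise ArgumentError, "must pass a block" unless block_given?

  begin
    connection.query("BEGIN")
    yield
  rescue
    connection.query("ROLLBACK")
    raise
  else
    connection.query("COMMIT")
  end
end

# Successful block: the statement is bracketed by BEGIN and COMMIT.
ok = FakeConnection.new
transaction(ok) { ok.query("INSERT INTO t (data) VALUES ('x')") }
puts ok.queries.inspect

# Failing block: ROLLBACK is issued and the error propagates to the caller.
failed = FakeConnection.new
begin
  transaction(failed) { raise "boom" }
rescue RuntimeError => e
  puts "re-raised: #{e.message}"
end
puts failed.queries.inspect
```

The bare `rescue` only catches `StandardError` and its subclasses, so a failed insert during seeding rolls the whole batch back rather than leaving a partially seeded table.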