Skip to content

Commit

Permalink
online schema change plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
bobpattersonjr committed May 3, 2013
1 parent d0e320e commit af959a2
Show file tree
Hide file tree
Showing 6 changed files with 308 additions and 0 deletions.
45 changes: 45 additions & 0 deletions doc/online_schema_change.rdoc
@@ -0,0 +1,45 @@
= online_schema_change

== OVERVIEW:

This \Jetpants plugin allows the use of pt-online-schema-change in the \Jetpants suite of tools. With this plugin there will be checks to see if there is already an online schema change going on. Also you can run a online schema change against all your shards.

== CONFIGURATION:

This plugin has no extra options, just add the name to your plugins section and that is it.

To enable this plugin, add it to your \Jetpants configuration file (either <tt>/etc/jetpants.yaml</tt> or <tt>~/.jetpants.yaml</tt>). For example you configuration might look like this:

# ... rest of Jetpants config here

plugins:
online_schema_change:
# ... other plugins configured here

== ASSUMPTIONS AND REQUIREMENTS:

Use of this plugin assumes that you already have pt-online-schema-change installed

Also you should be using \Collins and the jetpants_collins plugin

== EXAMPLES:

dry run of an alter on a single pool
jetpants alter_table --database=allmydata --table=somedata --pool=users --dry-run --alter='ADD COLUMN c1 INT'

alter a single pool
jetpants alter_table --database=allmydata --table=somedata --pool=users --alter='ADD COLUMN c1 INT'

dry run of an alter of all the shards of your topology
jetpants alter_table --database=allmydata --table=somedata --all_shards --dry-run --alter='ADD COLUMN c1 INT'

alter all the shards of your topology
jetpants alter_table --database=allmydata --table=somedata --all_shards --alter='ADD COLUMN c1 INT'


the alter table does not drop the old table automatically, so to remove the table on a sigle pool
jetpants alter_table_drop --database=allmydata --table=somedata --pool=users

to drop the tables on all your shards
jetpants alter_table_drop --database=allmydata --table=somedata --all_shards

63 changes: 63 additions & 0 deletions plugins/online_schema_change/commandsuite.rb
@@ -0,0 +1,63 @@
require 'thor'

module Jetpants
class CommandSuite < Thor

desc 'alter_table', 'perform an alter table'
method_option :pool, :desc => 'Name of pool to run the alter table on'
method_option :dry_run, :desc => 'Dry run of the alter table'
method_option :alter, :desc => 'The alter statment (eg ADD COLUMN c1 INT)'
method_option :database, :desc => 'Database to run the alter table on'
method_option :table, :desc => 'Table to run the alter table on'
method_option :all_shards, :desc => 'To run on all the shards'
def alter_table
unless options[:all_shards]
pool_name = options[:pool] || ask('Please enter a name of a pool: ')
pool = Jetpants.topology.pool(pool_name)
raise "#{pool_name} is not a pool name" unless pool
end

all_shards = !!options[:all_shards]

database = options[:database] || false
table = options[:table] || ask('Please enter a name of a table: ')
alter = options[:alter] || ask('Please enter a alter table statment (eg ADD COLUMN c1 INT): ')

if options[:dry_run] then dry_run=true else dry_run=false end

if all_shards
Jetpants.topology.alter_table_shards(database, table, alter, dry_run)
else
unless pool.alter_table(database, table, alter, dry_run)
print "check for errors during online schema change\n"
end
end
end

desc 'alter_table_drop', 'drop the old table after the alter table is complete'
method_option :pool, :desc => 'Name of pool that your ran the alter table on'
method_option :table, :desc => 'Table you ran the alter table on'
method_option :database, :desc => 'Database you ran the alter table on'
method_option :all_shards, :desc => 'To run on all the shards'
def alter_table_drop
unless options[:all_shards]
pool_name = options[:pool] || ask('Please enter a name of a pool: ')
pool = Jetpants.topology.pool(pool_name)
raise "#{pool_name} is not a pool name" unless pool
end

all_shards = !!options[:all_shards]

database = options[:database] || false
table = options[:table] || ask('Please enter a name of a table: ')


if all_shards
Jetpants.topology.drop_old_alter_table_shards(database, table)
else
pool.drop_old_alter_table(database, table)
end
end

end
end
34 changes: 34 additions & 0 deletions plugins/online_schema_change/db.rb
@@ -0,0 +1,34 @@
module Jetpants
class DB

# Creates a temporary user for use of pt-table-checksum and pt-upgrade,
# yields to the supplied block, and then drops the user.
# The user will have a randomly-generated 50-character password, and will
# have elevated permissions (ALL PRIVILEGES on the application schema, and
# a few global privs as well) since these are necessary to run the tools.
# The block will be passed the randomly-generated password.
def with_online_schema_change_user(username, database)
o = [('a'..'z'),('A'..'Z'),(0..9)].map{|i| i.to_a}.flatten
password = (0...50).map{ o[rand(o.length)] }.join
create_user username, password
grant_privileges username, '*', 'PROCESS', 'REPLICATION CLIENT', 'REPLICATION SLAVE', 'SUPER'
grant_privileges username, database, 'ALL PRIVILEGES'
begin
yield password
rescue StandardError, Interrupt, IOError
drop_user username
raise
end
drop_user username
end

# make sure there is enough space to do an online schema change
def has_space_for_alter?(table_name, database_name=nil)
database_name ||= app_schema
table_size = dir_size("#{mysql_directory}/#{database_name}/#{table_name}.ibd")

table_size < mount_stats['available']
end

end
end
5 changes: 5 additions & 0 deletions plugins/online_schema_change/online_schema_change.rb
@@ -0,0 +1,5 @@
require 'json'


# load all the monkeypatches for other Jetpants classes
%w(pool db topology commandsuite).each {|mod| require "online_schema_change/#{mod}"}
105 changes: 105 additions & 0 deletions plugins/online_schema_change/pool.rb
@@ -0,0 +1,105 @@
# JetCollins monkeypatches to add Collins integration

module Jetpants
class Pool
collins_attr_accessor :online_schema_change

def alter_table(database, table, alter, dry_run=true, force=false)
database ||= app_schema
error = false

raise "not enough space to run alter table on #{table}" unless master.has_space_for_alter?(table, database)

unless(check_collins_for_alter)
raise "alter table already running on #{@name}"
end

max_threads_running = max_threads_running(30,1)
max_threads_running = 50 unless max_threads_running > 50

critical_threads_running = 2 * max_threads_running > 500 ? 2 * max_threads_running : 500

update_collins_for_alter(database, table, alter)

master.with_online_schema_change_user('pt-osc', database) do |password|

command = "pt-online-schema-change --nocheck-replication-filters --max-load='Threads_running:#{max_threads_running}' --critical-load='Threads_running:#{critical_threads_running}' --nodrop-old-table --retries=10 --set-vars='wait_timeout=100000' --dry-run --print --alter '#{alter}' D=#{database},t=#{table},h=#{master.ip},u=#{'pt-osc'},p=#{password}"

print "[#{@name.to_s.red}][#{Time.now.to_s.blue}]---------------------------------------------------------------------------------------\n"
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}] #{command}\n"
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}]---------------------------------------------------------------------------------------\n"

IO.popen command do |io|
io.each do |line|
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}] #{line.gsub("\n","")}\n"
end
error = true if $?.to_i > 0
end

if !(dry_run || error)
continue = 'no'
unless force
continue = ask('Dry run complete would you like to continue?: (YES/no)')
end

if force || continue == 'YES'
command = "pt-online-schema-change --nocheck-replication-filters --max-load='Threads_running:#{max_threads_running}' --critical-load='Threads_running:#{critical_threads_running}' --nodrop-old-table --retries=10 --set-vars='wait_timeout=100000' --execute --print --alter '#{alter}' D=#{database},t=#{table},h=#{master.ip},u=#{'pt-osc'},p=#{password}"

print "[#{@name.to_s.red}][#{Time.now.to_s.blue}]---------------------------------------------------------------------------------------\n\n\n"
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}] #{command}\n"
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}]\n\n---------------------------------------------------------------------------------------\n\n\n"

IO.popen command do |io|
io.each do |line|
print "[#{@name.to_s.red}][#{Time.now.to_s.blue}] #{line.gsub("\n","")}\n"
end
error = true if $?.to_i > 0
end #end execute

end #end continue

end #end if ! dry run

end #end user grant block

clean_up_collins_for_alter

! error
end

# update collins for tracking alters, so there is only one running at a time
def update_collins_for_alter(database, table, alter)
self.collins_online_schema_change = JSON.pretty_generate({
'running' => true,
'started' => Time.now.to_i,
'database' => database,
'table' => table,
'alter' => alter
})

end

# check if a alter is already running
def check_collins_for_alter()
return true if self.collins_online_schema_change.empty?
meta = JSON.parse(self.collins_online_schema_change)
if(meta['running'])
return false
end
return true
end

# clean up collins after alter
def clean_up_collins_for_alter()
self.collins_online_schema_change = ''
end

# drop old table after an alter, this is because
# we do not drop the table after an alter
def drop_old_alter_table(database, table)
database ||= app_schema
master.mysql_root_cmd("USE #{database}; DROP TABLE _#{table}_old")
end

end
end
56 changes: 56 additions & 0 deletions plugins/online_schema_change/topology.rb
@@ -0,0 +1,56 @@
module Jetpants
class Topology

# run an alter table on all the sharded pools
# if you specify dry run it will run a dry run on all the shards
# otherwise it will run on the first shard and ask if you want to
# continue on the rest of the shards, 10 shards at a time
def alter_table_shards(database, table, alter, dry_run=true)
my_shards = shards.dup
first_shard = my_shards.shift
print "Will run on first shard and prompt for going past the dry run only on the first shard\n\n"
print "[#{Time.now.to_s.blue}] #{first_shard.pool.to_s}\n"
unless first_shard.alter_table(database, table, alter, dry_run, false)
print "First shard had an error, please check output\n"
return
end

continue = 'no'
continue = ask('First shard complete would you like to continue with the rest of the shards?: (YES/no)')
if continue == 'YES'
errors = []

my_shards.limited_concurrent_map(10) do |shard|
print "[#{Time.now.to_s.blue}] #{shard.pool.to_s}\n"
errors << shard unless shard.alter_table(database, table, alter, dry_run, true)
end

errors.each do |shard|
print "check #{shard.name} for errors during online schema change\n"
end
end
end

# will drop old table from the shards after a alter table
# this is because we do not drop the old table in the osc
# also I will do the first shard and ask if you want to
# continue, after that it will do each table serially
def drop_old_alter_table_shards(database, table)
my_shards = shards.dup
first_shard = my_shards.shift
print "Will run on first shard and prompt before going on to the rest\n\n"
print "[#{Time.now.to_s.blue}] #{first_shard.pool.to_s}\n"
first_shard.drop_old_alter_table(database, table)

continue = 'no'
continue = ask('First shard complete would you like to continue with the rest of the shards?: (YES/no)')
if continue == 'YES'
my_shards.each do |shard|
print "[#{Time.now.to_s.blue}] #{shard.pool.to_s}\n"
shard.drop_old_alter_table(database, table)
end
end
end

end
end

0 comments on commit af959a2

Please sign in to comment.