Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
lhm (3.0.0)
retriable

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -51,6 +52,7 @@ GEM
http-cookie (>= 1.0.2, < 2.0)
mime-types (>= 1.16, < 4.0)
netrc (~> 0.8)
retriable (3.1.2)
simplecov (0.16.1)
docile (~> 1.1)
json (>= 1.8, < 3)
Expand Down
4 changes: 4 additions & 0 deletions lhm.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Gem::Specification.new do |s|
s.require_paths = ['lib']
s.executables = ['lhm-kill-queue']

s.required_ruby_version = '>= 2.3.0'

s.add_dependency 'retriable'

s.add_development_dependency 'minitest'
s.add_development_dependency 'mocha'
s.add_development_dependency 'rake'
Expand Down
33 changes: 9 additions & 24 deletions lib/lhm/atomic_switcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

require 'lhm/command'
require 'lhm/migration'
require 'lhm/sql_helper'
require 'lhm/retry_helper'

module Lhm
# Switches origin with destination table using an atomic rename.
Expand All @@ -13,20 +13,19 @@ module Lhm
# Lhm::SqlHelper.supports_atomic_switch?.
class AtomicSwitcher
include Command
RETRY_SLEEP_TIME = 10
MAX_RETRIES = 600
include RetryHelper

attr_reader :connection, :retries
attr_writer :max_retries, :retry_sleep_time
attr_reader :connection

def initialize(migration, connection = nil)
def initialize(migration, connection = nil, options = {})
@migration = migration
@connection = connection
@origin = migration.origin
@destination = migration.destination
@retries = 0
@max_retries = MAX_RETRIES
@retry_sleep_time = RETRY_SLEEP_TIME
configure_retry({
tries: options.dig(:retriable, :tries) || 600,
base_interval: options.dig(:retriable, :base_interval) || 10
})
end

def atomic_switch
Expand All @@ -44,21 +43,7 @@ def validate
private

def execute
begin
@connection.execute(SqlHelper.tagged(atomic_switch))
rescue ActiveRecord::StatementInvalid => error
if should_retry_exception?(error) && (@retries += 1) < @max_retries
sleep(@retry_sleep_time)
Lhm.logger.warn "Retrying sql=#{atomic_switch} error=#{error} retries=#{@retries}"
retry
else
raise
end
end
end

def should_retry_exception?(error)
error.message =~ /Lock wait timeout exceeded/
execute_with_retries(atomic_switch)
end
end
end
31 changes: 8 additions & 23 deletions lib/lhm/entangler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,27 @@

require 'lhm/command'
require 'lhm/sql_helper'
require 'lhm/retry_helper'

module Lhm
class Entangler
include Command
include SqlHelper
include RetryHelper

attr_reader :connection

LOCK_WAIT_RETRIES = 10
RETRY_WAIT = 1

# Creates entanglement between two tables. All creates, updates and deletes
# to origin will be repeated on the destination table.
def initialize(migration, connection = nil, options = {})
@intersection = migration.intersection
@origin = migration.origin
@destination = migration.destination
@connection = connection
@max_retries = options[:lock_wait_retries] || LOCK_WAIT_RETRIES
@sleep_duration = options[:retry_wait] || RETRY_WAIT
configure_retry({
tries: options.dig(:retriable, :tries) || 10,
base_interval: options.dig(:retriable, :base_interval) || 1
})
end

def entangle
Expand Down Expand Up @@ -88,13 +89,13 @@ def validate

def before
entangle.each do |stmt|
with_retry { @connection.execute(tagged(stmt)) }
execute_with_retries(stmt)
end
end

def after
untangle.each do |stmt|
with_retry { @connection.execute(tagged(stmt)) }
execute_with_retries(stmt)
end
end

Expand All @@ -107,21 +108,5 @@ def revert
def strip(sql)
sql.strip.gsub(/\n */, "\n")
end

def with_retry
begin
retries ||= 0
yield
rescue StandardError => e
if e.message =~ /Lock wait timeout exceeded/ && retries < @max_retries
retries += 1
Lhm.logger.info("#{e} - retrying #{retries} time(s)")
Kernel.sleep @sleep_duration
retry
else
raise e
end
end
end
end
end
43 changes: 43 additions & 0 deletions lib/lhm/retry_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
require 'retriable'
require 'lhm/sql_helper'

module Lhm
# RetryHelper standardizes the interface for retry behavior in components like
# Entangler and AtomicSwitcher.
#
# To retry some behavior, use `execute_with_retries(statement)`
# which assumes `@connection` is available.
#
# `execute_with_retries` expects the caller to invoke `configure_retry` first, providing:
# * `tries` as an integer
# * `base_interval` as an integer
#
# For a full list of configuration options see https://github.com/kamui/retriable
module RetryHelper
def execute_with_retries(statement)
Retriable.retriable(retry_config) do
@connection.execute(SqlHelper.tagged(statement))
end
end

def configure_retry(options)
@retry_config = DEFAULT_RETRY_CONFIG.merge(options)
end

attr_reader :retry_config

private

DEFAULT_RETRY_CONFIG = {
on: {
StandardError => [/Lock wait timeout exceeded/]
},
multiplier: 1.5, # each successive interval grows by this factor
rand_factor: 0.25, # percentage to randomize the next retry interval time
max_elapsed_time: Float::INFINITY, # max total time in seconds that code is allowed to keep being retried
on_retry: Proc.new do |exception, try, elapsed_time, next_interval|
Lhm.logger.info("#{exception.class}: '#{exception.message}' - #{try} tries in #{elapsed_time} seconds and #{next_interval} seconds until the next try.")
end
}.freeze
end
end
14 changes: 7 additions & 7 deletions spec/integration/atomic_switcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
connection.stubs(:data_source_exists?).returns(true)
connection.stubs(:execute).raises(ActiveRecord::StatementInvalid, 'Lock wait timeout exceeded; try restarting transaction.').then.returns(true)

switcher = Lhm::AtomicSwitcher.new(@migration, connection)
switcher.retry_sleep_time = 0
switcher = Lhm::AtomicSwitcher.new(@migration, connection, retriable: {base_interval: 0})

assert switcher.run
assert_equal(2, @logs.string.split("\n").length)
assert @logs.string.split("\n")[1].include?("error=Lock wait timeout exceeded; try restarting transaction. retries=1")
actual_message = @logs.string.split("\n")[1]
expected_message = "Lock wait timeout exceeded; try restarting transaction."

assert actual_message.include?(expected_message), "Expected '#{actual_message}' to include '#{expected_message}'"
end

it 'should give up on lock wait timeouts after MAX_RETRIES' do
it 'should give up on lock wait timeouts after a configured number of tries' do
connection = mock()
connection.stubs(:data_source_exists?).returns(true)
connection.stubs(:execute).twice.raises(ActiveRecord::StatementInvalid, 'Lock wait timeout exceeded; try restarting transaction.')

switcher = Lhm::AtomicSwitcher.new(@migration, connection)
switcher.max_retries = 2
switcher.retry_sleep_time = 0
switcher = Lhm::AtomicSwitcher.new(@migration, connection, retriable: {tries: 2, base_interval: 0})

assert_raises(ActiveRecord::StatementInvalid) { switcher.run }
end
Expand Down
28 changes: 8 additions & 20 deletions spec/unit/entangler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,48 +60,36 @@
end

it 'should retry trigger creation when it hits a lock wait timeout' do
expected_calls = Lhm::Entangler::LOCK_WAIT_RETRIES + 1
connection = mock()
@entangler = Lhm::Entangler.new(@migration, connection, retriable: {base_interval: 0})
expected_calls = @entangler.retry_config[:tries]
connection.expects(:execute).times(expected_calls).raises(Mysql2::Error, 'Lock wait timeout exceeded; try restarting transaction')

@entangler.instance_variable_set(:@connection, connection)
assert_raises(Mysql2::Error) { @entangler.before }
end

it 'should not retry trigger creation with other mysql errors' do
connection = mock()
connection.expects(:execute).once.raises(Mysql2::Error, 'The MySQL server is running with the --read-only option so it cannot execute this statement.')

@entangler.instance_variable_set(:@connection, connection)
@entangler = Lhm::Entangler.new(@migration, connection, retriable: {base_interval: 0})
assert_raises(Mysql2::Error) { @entangler.before }
end

it 'should succesfully finish after retrying' do
connection = mock()
connection.stubs(:execute).raises(Mysql2::Error, 'Lock wait timeout exceeded; try restarting transaction').then.returns(true)
@entangler.instance_variable_set(:@connection, connection)

Kernel.expects(:sleep).once
@entangler = Lhm::Entangler.new(@migration, connection, retriable: {base_interval: 0})

assert @entangler.before
end

it 'should retry as many times as specified by lock_wait_retries' do
connection = mock()
connection.expects(:execute).times(6).raises(Mysql2::Error, 'Lock wait timeout exceeded; try restarting transaction')
entangler = Lhm::Entangler.new(@migration, connection, {lock_wait_retries: 5})

assert_raises(Mysql2::Error) { entangler.before }
end

it 'should sleep as long as specified by retry_wait' do
it 'should retry as many times as specified by configuration' do
connection = mock()
connection.stubs(:execute).raises(Mysql2::Error, 'Lock wait timeout exceeded; try restarting transaction').then.returns(true)
entangler = Lhm::Entangler.new(@migration, connection, {retry_wait: 3})
connection.expects(:execute).times(5).raises(Mysql2::Error, 'Lock wait timeout exceeded; try restarting transaction')
@entangler = Lhm::Entangler.new(@migration, connection, retriable: {tries: 5, base_interval: 0})

Kernel.expects(:sleep).with(3)

assert entangler.before
assert_raises(Mysql2::Error) { @entangler.before }
end

describe 'super long table names' do
Expand Down