Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Force redis disconnect if lock acquisition takes too long #8

Closed
wants to merge 11 commits into from
29 changes: 20 additions & 9 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@ jobs:
build:
runs-on: ubuntu-latest

env:
BUILD_TYPE: ${{ matrix.build_types }}

strategy:
fail-fast: false
matrix:
ruby:
- 2.6
- 2.7
build_types: ["LINT", "REDIS", "ACTIVERECORD"]

steps:
- uses: actions/checkout@v1
Expand All @@ -27,31 +32,37 @@ jobs:
ruby-version: ${{ matrix.ruby }}
architecture: 'x64'

- name: Setup redis
run: sudo apt-get install redis-server

- name: Setup bundler
run: gem install bundler

- name: Setup gems
run: bundle install

- name: Setup test app gems
run: cd spec/support/dummy_app && bundle install

- name: Rubocop
run: bundle exec rubocop
if: env.BUILD_TYPE == 'LINT'

- name: Setup redis
run: sudo apt-get install redis-server
if: env.BUILD_TYPE == 'REDIS'

- name: Redis specs
run: bin/rspec redis
if: env.BUILD_TYPE == 'REDIS'

- name: Setup test app gems
run: cd spec/support/dummy_app && bundle install
if: env.BUILD_TYPE == 'ACTIVERECORD'

- name: Setup postgres
run: |
make setup_pg
make start_pg
if: env.BUILD_TYPE == 'ACTIVERECORD'

- name: ActiveRecord specs
run: bin/rspec active_record

- name: Redis specs
run: bin/rspec redis
if: env.BUILD_TYPE == 'ACTIVERECORD'

publish:
if: contains(github.ref, 'refs/tags/v')
Expand Down
1 change: 1 addition & 0 deletions lib/rails_failover/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
end

require_relative "../redis/patches/client"
require_relative "../redis/patches/connection"
require_relative 'redis/connector'

module RailsFailover
Expand Down
37 changes: 27 additions & 10 deletions lib/rails_failover/redis/handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Handler
PRIMARY_ROLE_STATUS = "role:master"
PRIMARY_LOADED_STATUS = "loading:0"
VERIFY_FREQUENCY_BUFFER_PRECENT = 20
SOFT_DISCONNECT_TIMEOUT_SECONDS = 0.05

def initialize
@primaries_down = {}
Expand Down Expand Up @@ -177,21 +178,37 @@ def disconnect_clients(options)
key = options[:id]

mon_synchronize do
if to_disconnect = clients[key].dup
# Don't disconnect connections abruptly since it may lead to unexepcted
# errors. Is there a better way to do this without having to monkey patch
# the redis-rb gem heavily?
ObjectSpace.each_object(::Redis).each do |redis|
to_disconnect.each do |c|
if redis._client == c
redis.synchronize { |_client| _client.disconnect }
end
end
to_disconnect = clients[key].dup&.to_set
return if !to_disconnect

ObjectSpace.each_object(::Redis) do |redis|
redis_client = redis._client

# Sometimes Redis#_client is not a Redis::Client
# In this case, the real client can only be accessed via
# an instance variable
if redis_client.is_a? ::Redis::SubscribedClient
redis_client = redis.instance_variable_get(:@original_client)
end

next if !to_disconnect.include?(redis_client)

soft_disconnect_client(redis, redis_client)
end
end
end

def soft_disconnect_client(redis, client)
has_lock = Timeout::timeout(SOFT_DISCONNECT_TIMEOUT_SECONDS) { redis.mon_enter }
return if !client.connected?
client.disconnect
rescue Timeout::Error
return if !client.connected?
client.disconnect # Force disconnect
ensure
redis.mon_exit if has_lock
end

def logger
RailsFailover::Redis.logger
end
Expand Down
14 changes: 14 additions & 0 deletions lib/redis/patches/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true
class Redis
module Connection
class Ruby
def disconnect
@sock.shutdown
@sock.close
rescue
ensure
@sock = nil
end
end
end
end
6 changes: 4 additions & 2 deletions spec/helpers/redis_helper.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# frozen_string_literal: true

module RedisHelper
REDIS_PRIMARY_PORT = 6381
REDIS_REPLICA_PORT = 6382
def create_redis_client(opts = {})
Redis.new({
host: "127.0.0.1",
port: 6381,
port: REDIS_PRIMARY_PORT,
replica_host: "127.0.0.1",
replica_port: 6382,
replica_port: REDIS_REPLICA_PORT,
connector: RailsFailover::Redis::Connector
}.merge(opts))
end
Expand Down
85 changes: 85 additions & 0 deletions spec/integration/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,91 @@
system("make start_redis_primary")
end

it 'does not deadlock during disconnections' do
redis1 = create_redis_client
redis2 = create_redis_client

expect(redis1.ping).to eq("PONG")
expect(redis2.ping).to eq("PONG")

class <<redis1._client
attr_accessor :fake_longrunning_command
def call(commands)
sleep 0.1 while fake_longrunning_command
super
end
end
redis1._client.fake_longrunning_command = true

t1 = Thread.new do
redis1.ping
end

system("make stop_redis_primary")

t2 = Thread.new do
expect { redis2.ping }.to raise_error(Redis::CannotConnectError)
end

redis1._client.fake_longrunning_command = false

Timeout::timeout(5) do
t1.join
t2.join
end

# And now they should be failed over
expect(redis1.ping).to eq("PONG")
expect(redis2.ping).to eq("PONG")

expect(redis1.info("replication")["role"]).to eq("slave")
expect(redis2.info("replication")["role"]).to eq("slave")
rescue Timeout::Error
fail "Deadlock detected.\nThread 1: \n\n#{t1.backtrace.join("\n")}\n\nThread 2:\n\n#{t2.backtrace.join("\n")}\n\n"
ensure
system("make start_redis_primary")
t1.exit
t2.exit
end

it "handles long-running redis commands during fallback" do
simple_redis = create_redis_client
sub_redis = create_redis_client

expect(simple_redis.ping).to eq("PONG")
expect(sub_redis.ping).to eq("PONG")

# Infinitely subscribe
# This mimics things like message_bus
subscriber = Thread.new do
sub_redis.subscribe("mychannel") {}
rescue Redis::BaseConnectionError
retry
end

system("make stop_redis_primary")
sleep 0.03

expect(simple_redis.ping).to eq("PONG")
expect(simple_redis.connection[:port]).to eq(RedisHelper::REDIS_REPLICA_PORT)

expect(sub_redis.connected?).to eq(true)
expect(sub_redis.connection[:port]).to eq(RedisHelper::REDIS_REPLICA_PORT)

system("make start_redis_primary")
sleep 0.2

expect(simple_redis.ping).to eq("PONG")
expect(simple_redis.connection[:port]).to eq(RedisHelper::REDIS_PRIMARY_PORT)

expect(sub_redis.connected?).to eq(true)
expect(sub_redis.connection[:port]).to eq(RedisHelper::REDIS_PRIMARY_PORT)

ensure
system("make start_redis_primary")
subscriber&.exit
end

it 'handles failover and fallback for different host/port combinations' do
redis1 = create_redis_client
redis2 = create_redis_client(host: "0.0.0.0", replica_host: "0.0.0.0")
Expand Down