Skip to content

Commit

Permalink
Don't reconnect nor raise on SHUTDOWN in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern committed Dec 6, 2011
1 parent 2e349ff commit 158b94a
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 5 deletions.
31 changes: 26 additions & 5 deletions lib/redis/client.rb
Expand Up @@ -88,11 +88,32 @@ def call_loop(*args)
end

def call_pipeline(pipeline, options = {})
call_pipelined(pipeline.commands, options).each_with_index.map do |reply, i|
if block = pipeline.blocks[i]
block.call(reply)
else
reply
without_reconnect_wrapper = lambda do |&blk| blk.call end
without_reconnect_wrapper = lambda do |&blk|
without_reconnect(&blk)
end if pipeline.without_reconnect?

shutdown_wrapper = lambda do |&blk| blk.call end
shutdown_wrapper = lambda do |&blk|
begin
blk.call
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL
# Assume the pipeline was sent in one piece, but execution of
# SHUTDOWN caused none of the replies for commands that were executed
# prior to it from coming back around.
[]
end
end if pipeline.shutdown?

without_reconnect_wrapper.call do
shutdown_wrapper.call do
call_pipelined(pipeline.commands, options).each_with_index.map do |reply, i|
if block = pipeline.blocks[i]
block.call(reply)
else
reply
end
end
end
end
end
Expand Down
19 changes: 19 additions & 0 deletions lib/redis/pipeline.rb
Expand Up @@ -4,10 +4,20 @@ class Pipeline
attr :blocks

def initialize
@without_reconnect = false
@shutdown = false
@commands = []
@blocks = []
end

def without_reconnect?
@without_reconnect
end

def shutdown?
@shutdown
end

# Starting with 2.2.1, assume that this method is called with a single
# array argument. Check its size for backwards compat.
def call(*args, &block)
Expand All @@ -17,15 +27,24 @@ def call(*args, &block)
command = args
end

# A pipeline that contains a shutdown should not raise ECONNRESET when
# the connection is gone.
@shutdown = true if command.first == :shutdown
@commands << command
@blocks << block
nil
end

def call_pipeline(pipeline, options = {})
@shutdown = true if pipeline.shutdown?
@commands.concat(pipeline.commands)
@blocks.concat(pipeline.blocks)
nil
end

def without_reconnect(&block)
@without_reconnect = true
yield
end
end
end
90 changes: 90 additions & 0 deletions test/connection_handling_test.rb
Expand Up @@ -79,6 +79,96 @@
end
end

test "SHUTDOWN from pipeline" do
commands = {
:shutdown => lambda { :exit }
}

redis_mock(commands) do
redis = Redis.new(OPTIONS.merge(:port => 6380))

result = redis.pipelined do
redis.shutdown
end

assert [] == result
assert !redis.client.connected?
end
end

test "SHUTDOWN with error from pipeline" do
connections = 0
commands = {
:select => lambda { |*_| connections += 1; "+OK\r\n" },
:connections => lambda { ":#{connections}\r\n" },
:shutdown => lambda { "-ERR could not shutdown\r\n" }
}

redis_mock(commands) do
redis = Redis.new(OPTIONS.merge(:port => 6380))

connections = redis.connections

# SHUTDOWN replies with an error: test that it gets raised
assert_raise Redis::Error do
redis.pipelined do
redis.shutdown
end
end

# The connection should remain in tact
assert connections == redis.connections
end
end

test "SHUTDOWN from MULTI/EXEC" do
commands = {
:multi => lambda { "+OK\r\n" },
:shutdown => lambda { "+QUEUED\r\n" },
:exec => lambda { :exit }
}

redis_mock(commands) do
redis = Redis.new(OPTIONS.merge(:port => 6380))

result = redis.multi do
redis.shutdown
end

assert nil == result
assert !redis.client.connected?
end
end

test "SHUTDOWN with error from MULTI/EXEC" do
connections = 0
commands = {
:select => lambda { |*_| connections += 1; "+OK\r\n" },
:connections => lambda { ":#{connections}\r\n" },
:multi => lambda { "+OK\r\n" },
:shutdown => lambda { "+QUEUED\r\n" },
:exec => lambda { "*1\r\n-ERR could not shutdown\r\n" }
}

redis_mock(commands) do
redis = Redis.new(OPTIONS.merge(:port => 6380))

connections = redis.connections

# SHUTDOWN replies with an error: test that it gets returned
result = redis.multi do
redis.shutdown
end

# We should test for Redis::Error here, but hiredis doesn't yet do custom error classes.
assert result[0].is_a?(StandardError)
assert result[0].message.match /could not shutdown/i

# The connection should remain in tact
assert connections == redis.connections
end
end

test "SLAVEOF" do
redis_mock(:slaveof => lambda { |host, port| "+SLAVEOF #{host} #{port}" }) do
redis = Redis.new(OPTIONS.merge(:port => 6380))
Expand Down

0 comments on commit 158b94a

Please sign in to comment.