Permalink
Browse files

Make MRI 1.9 behave properly w.r.t. timeouts

This change modifies the Ruby connection class to use IO#read_nonblock
and IO#select to make it time out when it is supposed to time out.

The previous implementation relied on raw socket timeouts, which don't
work for MRI 1.9. The use of IO#select is a little slower, but fixes
read timeouts. Connect timeouts are also enforced using IO#select in
this patch.

Inspiration for the dual socket implementation was drawn from the
`dalli` gem by Mike Perham.
  • Loading branch information...
1 parent 5ed1f1b commit b7767031388d693fcecafbcb4a12a5dcc099433d @pietern pietern committed Mar 13, 2012
Showing with 177 additions and 64 deletions.
  1. +159 −43 lib/redis/connection/ruby.rb
  2. +18 −21 test/internals_test.rb
@@ -5,6 +5,161 @@
class Redis
module Connection
+ module SocketMixin
+
+ CRLF = "\r\n".freeze
+
+ def initialize(*args)
+ super(*args)
+
+ @timeout = nil
+ @buffer = ""
+ end
+
+ def timeout=(timeout)
+ if timeout && timeout > 0
+ @timeout = timeout
+ else
+ @timeout = nil
+ end
+ end
+
+ def read(nbytes)
+ result = _read_from_buffer(nbytes)
+
+ while result.bytesize < nbytes
+ result << _read_from_socket(nbytes - result.bytesize)
+ end
+
+ result
+ end
+
+ def gets
+ crlf = nil
+ start = 0
+
+ while (crlf = @buffer.index(CRLF, start)) == nil
+ start = @buffer.bytesize
+ @buffer << _read_from_socket(16384)
+ end
+
+ _read_from_buffer(crlf + 2)
+ end
+
+ def _read_from_buffer(nbytes)
+ @buffer.slice!(0, nbytes)
+ end
+
+ def _read_from_socket(nbytes)
+ begin
+ read_nonblock(nbytes)
+
+ rescue Errno::EWOULDBLOCK, Errno::EAGAIN
+ if IO.select([self], nil, nil, @timeout)
+ retry
+ else
+ raise Redis::TimeoutError
+ end
+ end
+
+ rescue EOFError
+ raise Errno::ECONNRESET
+ end
+ end
+
+ if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
+
+ require "timeout"
+
+ class TCPSocket < ::TCPSocket
+
+ include SocketMixin
+
+ def self.connect(host, port, timeout)
+ Timeout.timeout(timeout) do
+ sock = new(host, port)
+ sock
+ end
+ rescue Timeout::Error
+ raise TimeoutError
+ end
+ end
+
+ class UNIXSocket < ::UNIXSocket
+
+ # This class doesn't include the mixin, because JRuby raises
+ # Errno::EAGAIN on #read_nonblock even when IO.select says it is
+ # readable. This behavior shows in 1.6.6 in both 1.8 and 1.9 mode.
+ # Therefore, fall back on the default Unix socket implementation,
+ # without timeouts.
+
+ def self.connect(path, timeout)
+ Timeout.timeout(timeout) do
+ sock = new(path)
+ sock
+ end
+ rescue Timeout::Error
+ raise TimeoutError
+ end
+ end
+
+ else
+
+ class TCPSocket < ::Socket
+
+ include SocketMixin
+
+ def self.connect(host, port, timeout)
+ # Limit lookup to IPv4, as Redis doesn't yet do IPv6...
+ addr = ::Socket.getaddrinfo(host, nil, Socket::AF_INET)
+ sock = new(::Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
+ sockaddr = ::Socket.pack_sockaddr_in(port, addr[0][3])
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EINPROGRESS
+ if IO.select(nil, [sock], nil, timeout) == nil
+ raise TimeoutError
+ end
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EISCONN
+ end
+ end
+
+ sock
+ end
+ end
+
+ class UNIXSocket < ::Socket
+
+ # This class doesn't include the mixin to keep its behavior in sync
+ # with the JRuby implementation.
+
+ def self.connect(path, timeout)
+ sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
+ sockaddr = ::Socket.pack_sockaddr_un(path)
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EINPROGRESS
+ if IO.select(nil, [sock], nil, timeout) == nil
+ raise TimeoutError
+ end
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EISCONN
+ end
+ end
+
+ sock
+ end
+ end
+
+ end
+
class Ruby
include Redis::Connection::CommandHelper
@@ -23,20 +178,11 @@ def connected?
end
def connect(host, port, timeout)
- with_timeout(timeout) do
- @sock = TCPSocket.new(host, port)
- @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
- end
- rescue Timeout::Error
- raise TimeoutError
+ @sock = TCPSocket.connect(host, port, timeout)
end
def connect_unix(path, timeout)
- with_timeout(timeout) do
- @sock = UNIXSocket.new(path)
- end
- rescue Timeout::Error
- raise TimeoutError
+ @sock = UNIXSocket.connect(path, timeout)
end
def disconnect
@@ -47,15 +193,8 @@ def disconnect
end
def timeout=(timeout)
- tv_sec = Integer(timeout)
- tv_usec = Integer((timeout - tv_sec) * 1_000_000) # 0 - 999_999
-
- optval = [tv_sec, tv_usec].pack("l_2")
-
- begin
- @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
- @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
- rescue Errno::ENOPROTOOPT
+ if @sock.respond_to?(:timeout=)
+ @sock.timeout = timeout
end
end
@@ -113,29 +252,6 @@ def format_multi_bulk_reply(line)
Array.new(n) { read }
end
-
- protected
-
- begin
- require "system_timer"
-
- def with_timeout(seconds, &block)
- SystemTimer.timeout_after(seconds, &block)
- end
-
- rescue LoadError
- if ! defined?(RUBY_ENGINE)
- # MRI 1.8, all other interpreters define RUBY_ENGINE, JRuby and
- # Rubinius should have no issues with timeout.
- warn "WARNING: using the built-in Timeout class which is known to have issues when used for opening connections. Install the SystemTimer gem if you want to make sure the Redis client will not hang."
- end
-
- require "timeout"
-
- def with_timeout(seconds, &block)
- Timeout.timeout(seconds, &block)
- end
- end
end
end
end
View
@@ -129,24 +129,21 @@
end
end
-# if driver == :ruby || driver == :hiredis
-# # Using a mock server in a thread doesn't work here (possibly because blocking
-# # socket ops, raw socket timeouts and Ruby's thread scheduling don't mix).
-# test "Bubble EAGAIN without retrying" do
-# cmd = %{(sleep 0.3; echo "+PONG\r\n") | nc -l 6380}
-# IO.popen(cmd) do |_|
-# sleep 0.1 # Give nc a little time to start listening
-# redis = Redis.connect(:port => 6380, :timeout => 0.1)
-#
-# begin
-# assert_raise(Errno::EAGAIN) { redis.ping }
-# ensure
-# # Explicitly close connection so nc can quit
-# redis.client.disconnect
-#
-# # Make the reactor loop do a tick to really close
-# EM::Synchrony.sleep(0) if driver == :synchrony
-# end
-# end
-# end
-# end
+if driver == :ruby || driver == :hiredis
+ # Using a mock server in a thread doesn't work here (possibly because blocking
+ # socket ops, raw socket timeouts and Ruby's thread scheduling don't mix).
+ test "Bubble EAGAIN without retrying" do
+ cmd = %{(sleep 0.3; echo "+PONG\r\n") | nc -l 6380}
+ IO.popen(cmd) do |_|
+ sleep 0.1 # Give nc a little time to start listening
+ redis = Redis.connect(:port => 6380, :timeout => 0.1)
+
+ begin
+ assert_raise(Redis::TimeoutError) { redis.ping }
+ ensure
+ # Explicitly close connection so nc can quit
+ redis.client.disconnect
+ end
+ end
+ end
+end

0 comments on commit b776703

Please sign in to comment.