Permalink
Browse files

Merge pull request #192 from redis/socket

IO#read_nonblock and IO#select
  • Loading branch information...
2 parents 260fe2a + 0271777 commit 6810f18ae99540ce25fdab82e17cf0acf06e5d57 @djanowski djanowski committed Mar 14, 2012
Showing with 199 additions and 94 deletions.
  1. +7 −17 lib/redis/client.rb
  2. +11 −6 lib/redis/connection/hiredis.rb
  3. +159 −46 lib/redis/connection/ruby.rb
  4. +4 −4 lib/redis/connection/synchrony.rb
  5. +18 −21 test/internals_test.rb
View
@@ -169,7 +169,7 @@ def reconnect
def io
yield
- rescue Errno::EAGAIN
+ rescue TimeoutError
raise TimeoutError, "Connection timed out"
rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EBADF, Errno::EINVAL => e
raise ConnectionError, "Connection lost (%s)" % [e.class.name.split("::").last]
@@ -191,10 +191,10 @@ def without_socket_timeout
connect unless connected?
begin
- self.timeout = 0
+ connection.timeout = 0
yield
ensure
- self.timeout = @timeout if connected?
+ connection.timeout = @timeout if connected?
end
end
@@ -220,41 +220,31 @@ def logging(commands)
begin
commands.each do |name, *args|
- @logger.debug("Redis >> #{name.to_s.upcase} #{args.join(" ")}")
+ @logger.debug("Redis >> #{name.to_s.upcase} #{args.map(&:to_s).join(" ")}")
end
t1 = Time.now
yield
ensure
- @logger.debug("Redis >> %0.2fms" % ((Time.now - t1) * 1000))
+ @logger.debug("Redis >> %0.2fms" % ((Time.now - t1) * 1000)) if t1
end
end
def establish_connection
- # Need timeout in usecs, like socket timeout.
- timeout = Integer(@timeout * 1_000_000)
-
if @path
connection.connect_unix(@path, timeout)
else
connection.connect(@host, @port, timeout)
end
- # If the timeout is set we set the low level socket options in order
- # to make sure a blocking read will return after the specified number
- # of seconds. This hack is from memcached ruby client.
- self.timeout = @timeout
+ connection.timeout = @timeout
- rescue Timeout::Error
+ rescue TimeoutError
raise CannotConnectError, "Timed out connecting to Redis on #{location}"
rescue Errno::ECONNREFUSED
raise CannotConnectError, "Error connecting to Redis on #{location} (ECONNREFUSED)"
end
- def timeout=(timeout)
- connection.timeout = Integer(timeout * 1_000_000)
- end
-
def ensure_connected
tries = 0
@@ -14,20 +14,21 @@ def connected?
@connection.connected?
end
- def timeout=(usecs)
- @connection.timeout = usecs
+ def timeout=(timeout)
+ # Hiredis works with microsecond timeouts
+ @connection.timeout = Integer(timeout * 1_000_000)
end
def connect(host, port, timeout)
- @connection.connect(host, port, timeout)
+ @connection.connect(host, port, Integer(timeout * 1_000_000))
rescue Errno::ETIMEDOUT
- raise Timeout::Error
+ raise TimeoutError
end
def connect_unix(path, timeout)
- @connection.connect_unix(path, timeout)
+ @connection.connect_unix(path, Integer(timeout * 1_000_000))
rescue Errno::ETIMEDOUT
- raise Timeout::Error
+ raise TimeoutError
end
def disconnect
@@ -36,12 +37,16 @@ def disconnect
def write(command)
@connection.write(command.flatten(1))
+ rescue Errno::EAGAIN
+ raise TimeoutError
end
def read
reply = @connection.read
reply = CommandError.new(reply.message) if reply.is_a?(RuntimeError)
reply
+ rescue Errno::EAGAIN
+ raise TimeoutError
rescue RuntimeError => err
raise ProtocolError.new(err.message)
end
@@ -5,6 +5,155 @@
class Redis
module Connection
+ module SocketMixin
+
+ CRLF = "\r\n".freeze
+
+ def initialize(*args)
+ super(*args)
+
+ @timeout = nil
+ @buffer = ""
+ end
+
+ def timeout=(timeout)
+ if timeout && timeout > 0
+ @timeout = timeout
+ else
+ @timeout = nil
+ end
+ end
+
+ def read(nbytes)
+ result = @buffer.slice!(0, nbytes)
+
+ while result.bytesize < nbytes
+ result << _read_from_socket(nbytes - result.bytesize)
+ end
+
+ result
+ end
+
+ def gets
+ crlf = nil
+
+ while (crlf = @buffer.index(CRLF)) == nil
+ @buffer << _read_from_socket(1024)
+ end
+
+ @buffer.slice!(0, crlf + CRLF.bytesize)
+ end
+
+ def _read_from_socket(nbytes)
+ begin
+ read_nonblock(nbytes)
+
+ rescue Errno::EWOULDBLOCK, Errno::EAGAIN
+ if IO.select([self], nil, nil, @timeout)
+ retry
+ else
+ raise Redis::TimeoutError
+ end
+ end
+
+ rescue EOFError
+ raise Errno::ECONNRESET
+ end
+ end
+
+ if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
+
+ require "timeout"
+
+ class TCPSocket < ::TCPSocket
+
+ include SocketMixin
+
+ def self.connect(host, port, timeout)
+ Timeout.timeout(timeout) do
+ sock = new(host, port)
+ sock
+ end
+ rescue Timeout::Error
+ raise TimeoutError
+ end
+ end
+
+ class UNIXSocket < ::UNIXSocket
+
+ # This class doesn't include the mixin, because JRuby raises
+ # Errno::EAGAIN on #read_nonblock even when IO.select says it is
+ # readable. This behavior shows in 1.6.6 in both 1.8 and 1.9 mode.
+ # Therefore, fall back on the default Unix socket implementation,
+ # without timeouts.
+
+ def self.connect(path, timeout)
+ Timeout.timeout(timeout) do
+ sock = new(path)
+ sock
+ end
+ rescue Timeout::Error
+ raise TimeoutError
+ end
+ end
+
+ else
+
+ class TCPSocket < ::Socket
+
+ include SocketMixin
+
+ def self.connect(host, port, timeout)
+ # Limit lookup to IPv4, as Redis doesn't yet do IPv6...
+ addr = ::Socket.getaddrinfo(host, nil, Socket::AF_INET)
+ sock = new(::Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
+ sockaddr = ::Socket.pack_sockaddr_in(port, addr[0][3])
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EINPROGRESS
+ if IO.select(nil, [sock], nil, timeout) == nil
+ raise TimeoutError
+ end
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EISCONN
+ end
+ end
+
+ sock
+ end
+ end
+
+ class UNIXSocket < ::Socket
+
+ # This class doesn't include the mixin to keep its behavior in sync
+ # with the JRuby implementation.
+
+ def self.connect(path, timeout)
+ sock = new(::Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
+ sockaddr = ::Socket.pack_sockaddr_un(path)
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EINPROGRESS
+ if IO.select(nil, [sock], nil, timeout) == nil
+ raise TimeoutError
+ end
+
+ begin
+ sock.connect_nonblock(sockaddr)
+ rescue Errno::EISCONN
+ end
+ end
+
+ sock
+ end
+ end
+
+ end
+
class Ruby
include Redis::Connection::CommandHelper
@@ -23,16 +172,11 @@ def connected?
end
def connect(host, port, timeout)
- with_timeout(timeout.to_f / 1_000_000) do
- @sock = TCPSocket.new(host, port)
- @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
- end
+ @sock = TCPSocket.connect(host, port, timeout)
end
def connect_unix(path, timeout)
- with_timeout(timeout.to_f / 1_000_000) do
- @sock = UNIXSocket.new(path)
- end
+ @sock = UNIXSocket.connect(path, timeout)
end
def disconnect
@@ -42,16 +186,9 @@ def disconnect
@sock = nil
end
- def timeout=(usecs)
- secs = Integer(usecs / 1_000_000)
- usecs = Integer(usecs - (secs * 1_000_000)) # 0 - 999_999
-
- optval = [secs, usecs].pack("l_2")
-
- begin
- @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
- @sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
- rescue Errno::ENOPROTOOPT
+ def timeout=(timeout)
+ if @sock.respond_to?(:timeout=)
+ @sock.timeout = timeout
end
end
@@ -60,13 +197,12 @@ def write(command)
end
def read
- # We read the first byte using read() mainly because gets() is
- # immune to raw socket timeouts.
- reply_type = @sock.read(1)
-
- raise Errno::ECONNRESET unless reply_type
+ line = @sock.gets
+ reply_type = line.slice!(0, 1)
+ format_reply(reply_type, line)
- format_reply(reply_type, @sock.gets)
+ rescue Errno::EAGAIN
+ raise TimeoutError
end
def format_reply(reply_type, line)
@@ -106,29 +242,6 @@ def format_multi_bulk_reply(line)
Array.new(n) { read }
end
-
- protected
-
- begin
- require "system_timer"
-
- def with_timeout(seconds, &block)
- SystemTimer.timeout_after(seconds, &block)
- end
-
- rescue LoadError
- if ! defined?(RUBY_ENGINE)
- # MRI 1.8, all other interpreters define RUBY_ENGINE, JRuby and
- # Rubinius should have no issues with timeout.
- warn "WARNING: using the built-in Timeout class which is known to have issues when used for opening connections. Install the SystemTimer gem if you want to make sure the Redis client will not hang."
- end
-
- require "timeout"
-
- def with_timeout(seconds, &block)
- Timeout.timeout(seconds, &block)
- end
- end
end
end
end
@@ -61,21 +61,21 @@ class Synchrony
include Redis::Connection::CommandHelper
def initialize
- @timeout = 5_000_000
+ @timeout = 5.0
@connection = nil
end
def connected?
@connection && @connection.connected?
end
- def timeout=(usecs)
- @timeout = usecs
+ def timeout=(timeout)
+ @timeout = timeout
end
def connect(host, port, timeout)
conn = EventMachine.connect(host, port, RedisClient) do |c|
- c.pending_connect_timeout = [Float(timeout / 1_000_000), 0.1].max
+ c.pending_connect_timeout = [timeout, 0.1].max
end
setup_connect_callbacks(conn, Fiber.current)
Oops, something went wrong.

0 comments on commit 6810f18

Please sign in to comment.