-
Notifications
You must be signed in to change notification settings - Fork 31
/
io.rb
52 lines (44 loc) · 1.34 KB
/
io.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
require "io/wait"
# this is the necessary magic to get a line-oriented protocol to
# respect a read timeout. unfortunately Ruby sockets do not provide any
# timeout support directly, delegating that to the IO reactor.
module Faktory
class TimeoutError < Timeout::Error; end
module ReadTimeout
CRLF = "\r\n"
BUFSIZE = 16_384
# Ruby's TCP sockets do not implement timeouts.
# We have to implement them ourselves by using
# nonblocking IO and IO.select.
def initialize(**opts)
@buf = +""
@timeout = opts[:timeout] || 5
end
def gets
while (crlf = @buf.index(CRLF)).nil?
@buf << read_timeout(BUFSIZE)
end
@buf.slice!(0, crlf + 2)
end
def read(nbytes)
result = @buf.slice!(0, nbytes)
result << read_timeout(nbytes - result.bytesize) while result.bytesize < nbytes
result
end
private
def read_timeout(nbytes)
loop do
result = @sock.read_nonblock(nbytes, exception: false)
if result == :wait_readable
raise Faktory::TimeoutError unless @sock.to_io.wait_readable(@timeout)
elsif result == :wait_writable
raise Faktory::TimeoutError unless @sock.to_io.wait_writeable(@timeout)
elsif result.nil?
raise Errno::ECONNRESET
else
return result
end
end
end
end
end