Skip to content

Commit

Permalink
Merge pull request #796 from vasi-stripe/vasi-readline
Browse files Browse the repository at this point in the history
Reduce allocations/syscalls for readline
  • Loading branch information
geemus committed Nov 8, 2022
2 parents 17ed919 + 7ac3568 commit 68452a7
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 17 deletions.
50 changes: 33 additions & 17 deletions lib/excon/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ def params=(new_params)

def_delegators(:@socket, :close)


def initialize(data = {})
@data = data
@nonblock = data[:nonblock]
@port ||= @data[:port] || 80
@read_buffer = String.new
@eof = false
@backend_eof = false
connect
end

Expand All @@ -61,9 +63,22 @@ def read(max_length = nil)

def readline
if @nonblock && RUBY_VERSION.to_f > 1.8_7
buffer = String.new
buffer << (read_nonblock(1) || raise(EOFError)) while buffer[-1] != "\n"
buffer
result = String.new
block = @read_buffer
@read_buffer = String.new

loop do
idx = block.index("\n")
if idx.nil?
result << block
else
add_to_read_buffer(block.slice!(idx+1, block.length))
result << block
break
end
block = read_nonblock(@data[:chunk_size]) || raise(EOFError)
end
result
else # nonblock/legacy
begin
Timeout.timeout(@data[:read_timeout]) do
Expand Down Expand Up @@ -172,20 +187,21 @@ def connect
end
end

def add_to_read_buffer(str)
@read_buffer << str
@eof = false
end

def read_nonblock(max_length)
begin
if max_length
until @read_buffer.length >= max_length
@read_buffer << @socket.read_nonblock(max_length - @read_buffer.length)
end
else
loop do
@read_buffer << @socket.read_nonblock(@data[:chunk_size])
end
while !@backend_eof && (!max_length || @read_buffer.length < max_length)
@read_buffer << @socket.read_nonblock(@data[:chunk_size])
end
rescue OpenSSL::SSL::SSLError => error
if error.message == 'read would block'
select_with_timeout(@socket, :read) && retry
if @read_buffer.empty?
select_with_timeout(@socket, :read) && retry
end
else
raise(error)
end
Expand All @@ -195,10 +211,10 @@ def read_nonblock(max_length)
select_with_timeout(@socket, :read) && retry
end
rescue EOFError
@eof = true
@backend_eof = true
end

if max_length
ret = if max_length
if @read_buffer.empty?
nil # EOF met at beginning
else
Expand All @@ -208,6 +224,8 @@ def read_nonblock(max_length)
# read until EOFError, so return everything
@read_buffer.slice!(0, @read_buffer.length)
end
@eof = @backend_eof && @read_buffer.empty?
ret
end

def read_block(max_length)
Expand All @@ -219,9 +237,7 @@ def read_block(max_length)
raise(error)
end
rescue *READ_RETRY_EXCEPTION_CLASSES
if @read_buffer.empty?
select_with_timeout(@socket, :read) && retry
end
select_with_timeout(@socket, :read) && retry
rescue EOFError
@eof = true
end
Expand Down
114 changes: 114 additions & 0 deletions tests/socket_tests.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# A fake Excon::Socket that allows passing in an arbitrary backend @socket
class MockExconSocket < Excon::Socket
attr_reader :read_count

def initialize(backend_socket, *args)
super(*args)
@read_count = 0
@socket = backend_socket
end

def read_nonblock(*args)
@read_count += 1
super
end

def connect
# pass
end

def select_with_timeout(*args)
# don't actually wait, assume we're ready
end
end

# A socket whose read_nonblock returns from an input list,
# and which counts the number of reads
class MockNonblockRubySocket
attr_reader :sequence

def initialize(nonblock_reads)
@nonblock_reads = nonblock_reads
@sequence = []
end

def read_nonblock(maxlen)
if @nonblock_reads.empty?
@sequence << 'EOF'
raise EOFError
elsif @nonblock_reads.first.empty?
@nonblock_reads.shift
if @nonblock_reads.empty?
@sequence << 'EOF'
raise EOFError
end
@sequence << 'EAGAIN'
raise Errno::EAGAIN
elsif
len = maxlen ? maxlen : @nonblock_reads.first.length
ret = @nonblock_reads.first.slice!(0, len)
@sequence << ret.length
ret
end
end

# Returns the results of `block`, as well as how many times we called read on the Excon
# socket, and the sequence of reads on the backend socket
def self.check_reads(nonblock_reads, socket_args, &block)
backend_socket = MockNonblockRubySocket.new(nonblock_reads)
socket = MockExconSocket.new(backend_socket, { nonblock: true }.merge(socket_args))
ret = block[socket]
[ret, socket.read_count, backend_socket.sequence]
end
end

Shindo.tests('socket') do
CHUNK_SIZES = [nil, 512]
CHUNK_SIZES.each do |chunk_size|
tests("chunk_size: #{chunk_size}") do
socket_args = {chunk_size: chunk_size}
tests('read_nonblock') do
tests('readline nonblock is efficient') do
returns(["one\n", 1, [8, 'EOF']]) do
MockNonblockRubySocket.check_reads(["one\ntwo\n"], socket_args) do |sock|
sock.readline
end
end
end

tests('readline nonblock works sequentially') do
returns([["one\n", "two\n"], 1, [8, 'EOF']]) do
MockNonblockRubySocket.check_reads(["one\ntwo\n"], socket_args) do |sock|
2.times.map { sock.readline }
end
end
end

tests('readline nonblock can handle partial reads') do
returns([["one\n", "two\n"], 2, [5, 'EAGAIN', 3, 'EOF']]) do
MockNonblockRubySocket.check_reads(["one\nt", "wo\n"], socket_args) do |sock|
2.times.map { sock.readline }
end
end
end

tests('readline nonblock before read') do
returns([["one\n", "two\n"], 2, [8, 'EOF']]) do
MockNonblockRubySocket.check_reads(["one\ntwo\n"], socket_args) do |sock|
[sock.readline, sock.read(6)]
end
end
end

tests('read_nonblock does not EOF early') do
returns([["one", "two"], 2, [3, 'EAGAIN', 3, 'EOF']]) do
# Data, EAGAIN, data, EOF
MockNonblockRubySocket.check_reads(["one", "two"], socket_args) do |sock|
[sock.read, sock.read]
end
end
end
end
end
end
end

0 comments on commit 68452a7

Please sign in to comment.