Skip to content

Commit

Permalink
Merge pull request #647 from djberg96/requests_in_batches
Browse files Browse the repository at this point in the history
Add Connection#requests_in_batches method
  • Loading branch information
geemus committed Oct 18, 2017
2 parents 669cc67 + 5c14216 commit c92f240
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 0 deletions.
7 changes: 7 additions & 0 deletions README.md
Expand Up @@ -215,6 +215,13 @@ connection.requests([{:method => :get}, {:method => :get}])
By default, each call to `requests` will use a separate persistent socket connection. To make multiple `requests` calls
using a single persistent connection, set `:persistent => true` when establishing the connection.

For large numbers of simultaneous requests please consider using the `batch_requests` method. This will automatically slice up the requests into batches based on the file descriptor limit of your operating system. The results are the same as the `requests` method, but using this method can help prevent timeout errors.

```ruby
large_array_of_requests = [{:method => :get, :path => 'some_path'}, { ... }] # Hundreds of items
connection.batch_requests(large_array_of_requests)
```

## Streaming Responses

You can stream responses by passing a block that will receive each chunk.
Expand Down
15 changes: 15 additions & 0 deletions lib/excon/connection.rb
Expand Up @@ -301,6 +301,21 @@ def requests(pipeline_params)
responses
end

# Sends the supplied requests to the destination host using pipelining in
# batches of @limit [Numeric] requests. This is your soft file descriptor
# limit by default, typically 256.
# @pipeline_params [Array<Hash>] pipeline_params An array of one or more optional params, override defaults set in Connection.new, see #request for details
def batch_requests(pipeline_params, limit = nil)
limit ||= Process.respond_to?(:getrlimit) ? Process.getrlimit(:NOFILE).first : 256
responses = []

pipeline_params.each_slice(limit) do |params|
responses.concat(requests(params))
end

responses
end

def reset
if old_socket = sockets.delete(@socket_key)
old_socket.close rescue nil
Expand Down
133 changes: 133 additions & 0 deletions tests/batch_requests.rb
@@ -0,0 +1,133 @@
require 'shindo'

Shindo.tests('Batch Requests') do
with_server('good') do
tests('with batch request size 2') do
returns(%w{ 1 2 1 2 }, 'batch request size 2') do
connection = Excon.new('http://127.0.0.1:9292')

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 2).map(&:body)

ret.flatten
end
end

tests('peristent with batch request size 2') do
returns(%w{ 1 2 3 4 }, 'persistent batch request size 2') do
connection = Excon.new('http://127.0.0.1:9292', :persistent => true)

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 2).map(&:body)

ret.flatten
end
end

tests('with batch request size 3') do
returns(%w{ 1 2 3 1 }, 'batch request size 3') do
connection = Excon.new('http://127.0.0.1:9292')

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 3).map(&:body)

ret.flatten
end
end

tests('persistent with batch request size 3') do
returns(%w{ 1 2 3 4 }, 'persistent batch request size 3') do
connection = Excon.new('http://127.0.0.1:9292', :persistent => true)

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 3).map(&:body)

ret.flatten
end
end

tests('with batch request size 4') do
returns(%w{ 1 2 3 4 }, 'batch request size 4') do
connection = Excon.new('http://127.0.0.1:9292')

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 4).map(&:body)

ret.flatten
end
end

tests('persistent with batch request size 4') do
returns(%w{ 1 2 3 4 }, 'persistent batch request size 4') do
connection = Excon.new('http://127.0.0.1:9292', :persistent => true)

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 4).map(&:body)

ret.flatten
end
end

tests('with batch request size 8') do
returns(%w{ 1 2 3 4 }, 'batch request size 8') do
connection = Excon.new('http://127.0.0.1:9292')

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 8).map(&:body)

ret.flatten
end
end

tests('persistent with batch request size 8') do
returns(%w{ 1 2 3 4 }, 'persistent batch request size 8') do
connection = Excon.new('http://127.0.0.1:9292', :persistent => true)

ret = []
ret << connection.batch_requests([
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'},
{:method => :get, :path => '/echo/request_count'}
], 8).map(&:body)

ret.flatten
end
end
end
end

0 comments on commit c92f240

Please sign in to comment.