Skip to content

Commit

Permalink
Merge pull request #184 from zanker/reusable-conns
Browse files Browse the repository at this point in the history
Added connection reuse
  • Loading branch information
tarcieri committed Mar 26, 2015
2 parents bf3e0b3 + 9d163bd commit 4bf5a21
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 87 deletions.
1 change: 1 addition & 0 deletions lib/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "http/errors"
require "http/chainable"
require "http/client"
require "http/connection"
require "http/options"
require "http/request"
require "http/request/writer"
Expand Down
7 changes: 7 additions & 0 deletions lib/http/chainable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def request(verb, uri, options = {})
branch(options).request verb, uri
end

# Flag as persistent
# @param [String] host
# @raise [Request::Error] if Host is invalid
def persistent(host)
branch default_options.with_persistent host
end

# Make a request through an HTTP proxy
# @param [Array] proxy
# @raise [Request::Error] if HTTP proxy is invalid
Expand Down
121 changes: 49 additions & 72 deletions lib/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ module HTTP
class Client
include Chainable

# Input buffer size
BUFFER_SIZE = 16_384
CONNECTION = "Connection".freeze
KEEP_ALIVE = "Keep-Alive".freeze
CLOSE = "close".freeze

attr_reader :default_options

def initialize(default_options = {})
@default_options = HTTP::Options.new(default_options)
@parser = HTTP::Response::Parser.new
@socket = nil
end

# Make an HTTP request
Expand All @@ -28,6 +27,13 @@ def request(verb, uri, opts = {})
proxy = opts.proxy
body = make_request_body(opts, headers)

# Tell the server to keep the conn open
if default_options.persistent?
headers[CONNECTION] = KEEP_ALIVE
else
headers[CONNECTION] = CLOSE
end

req = HTTP::Request.new(verb, uri, headers, proxy, body)
res = perform req, opts

Expand All @@ -48,67 +54,55 @@ def perform(req, options)
end

def make_request(req, options)
# finish previous response if client was re-used
# TODO: this is pretty wrong, as socket shoud be part of response
# connection, so that re-use of client will not break multiple
# chunked responses
finish_response

uri = req.uri

# TODO: keep-alive support
@socket = options[:socket_class].open(req.socket_host, req.socket_port)
@socket = start_tls(@socket, uri.host, options) if uri.is_a?(URI::HTTPS) && !req.using_proxy?

req.stream @socket
verify_connection!(req.uri)

read_headers!
@connection ||= HTTP::Connection.new(req, options)
@connection.send_request(req)
@connection.read_headers!

body = Response::Body.new(self)
res = Response.new(@parser.status_code, @parser.http_version, @parser.headers, body, uri)
res = Response.new(
@connection.parser.status_code,
@connection.parser.http_version,
@connection.parser.headers,
Response::Body.new(@connection),
req.uri
)

finish_response if :head == req.verb
@connection.finish_response if req.verb == :head

res
end

# Read a chunk of the body
#
# @return [String] data chunk
# @return [Nil] when no more data left
def readpartial(size = BUFFER_SIZE)
return unless @socket

begin
read_more size
finished = @parser.finished?
rescue EOFError
finished = true
# On any exception we reset the conn. This is a safety measure, to ensure
# we don't have conns in a bad state resulting in mixed requests/responses
rescue
if default_options.persistent? && @connection
@connection.close
@connection = nil
end

chunk = @parser.chunk

finish_response if finished

chunk.to_s
raise
end

private

# Initialize TLS connection
def start_tls(socket, host, options)
# TODO: abstract away SSLContexts so we can use other TLS libraries
context = options[:ssl_context] || OpenSSL::SSL::SSLContext.new
socket = options[:ssl_socket_class].new(socket, context)
socket.sync_close = true if socket.respond_to?(:sync_close=)

socket.connect
# Verify our request isn't going to be made against another URI
def verify_connection!(uri)
if default_options.persistent? && base_host(uri) != default_options.persistent
fail StateError, "Persistence is enabled for #{default_options.persistent}, but we got #{base_host(uri)}"

if context.verify_mode == OpenSSL::SSL::VERIFY_PEER
socket.post_connection_check(host)
# We re-create the connection object because we want to let prior requests
# lazily load the body as long as possible, and this mimics prior functionality.
elsif !default_options.persistent? || (@connection && !@connection.keep_alive?)
@connection = nil
end
end

socket
# Strips out query/path to give us a consistent way of comparing hosts
def base_host(uri)
base = uri.dup
base.query = nil
base.path = ""
base.to_s
end

# Merges query params if needed
Expand All @@ -128,7 +122,11 @@ def make_request_uri(uri, options)
# @param [#to_s] uri
# @return [URI]
def normalize_uri(uri)
uri = URI uri.to_s
if default_options.persistent? && uri !~ /^http|https/
uri = URI("#{default_options.persistent}#{uri}")
else
uri = URI(uri.to_s)
end

# Some proxies (seen on WEBRick) fail if URL has
# empty path (e.g. `http://example.com`) while it's RFC-complaint:
Expand All @@ -153,26 +151,5 @@ def make_request_body(opts, headers)
MimeType[:json].encode opts.json
end
end

# Reads data from socket up until headers
def read_headers!
read_more BUFFER_SIZE until @parser.headers
rescue IOError, Errno::ECONNRESET, Errno::EPIPE => ex
return if ex.is_a?(EOFError) && @parser.headers
raise IOError, "problem making HTTP request: #{ex}"
end

# Callback for when we've reached the end of a response
def finish_response
@socket.close if @socket && !@socket.closed?
@parser.reset

@socket = nil
end

# Feeds some more data into parser
def read_more(size)
@parser << @socket.readpartial(size) unless @parser.finished?
end
end
end
139 changes: 139 additions & 0 deletions lib/http/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
require "http/response/parser"

module HTTP
# A connection to the HTTP server
class Connection
attr_reader :socket, :parser, :persistent,
:pending_request, :pending_response, :sequence_id

# Attempt to read this much data
BUFFER_SIZE = 16_384

def initialize(req, options)
@persistent = options.persistent?

@parser = Response::Parser.new
@sequence_id = 0

@socket = options[:socket_class].open(req.socket_host, req.socket_port)

start_tls(req.uri.host, options[:ssl_socket_class], options[:ssl_context]) if req.uri.is_a?(URI::HTTPS) && !req.using_proxy?
end

# Send a request to the server
def send_request(req)
if pending_request
fail StateError, "Tried to send a request while one is pending already. This cannot be called from multiple threads!"
elsif pending_request
fail StateError, "Tried to send a request while a response is pending. Make sure you've fully read the body from the request."
end

@pending_request = true
@sequence_id += 1

req.stream socket

@pending_response = true
@pending_request = nil
end

# Read a chunk of the body
#
# @return [String] data chunk
# @return [Nil] when no more data left
def readpartial(size = BUFFER_SIZE)
return unless pending_response

begin
read_more size
finished = parser.finished?
rescue EOFError
finished = true
end

chunk = parser.chunk

finish_response if finished

chunk.to_s
end

# Reads data from socket up until headers
def read_headers!
read_more BUFFER_SIZE until parser.headers
set_keep_alive

rescue IOError, Errno::ECONNRESET, Errno::EPIPE => ex
return if ex.is_a?(EOFError) && parser.headers
raise IOError, "problem making HTTP request: #{ex}"
end

# Callback for when we've reached the end of a response
def finish_response
close unless keep_alive?

parser.reset

@pending_response = nil
end

# Close the connection
def close
socket.close unless socket.closed?

@pending_response = nil
@pending_request = nil
end

# Whether we're keeping the conn alive
def keep_alive?
!!@keep_alive && !socket.closed?
end

# Store whether the connection should be kept alive.
# Once we reset the parser, we lose all of this state.
def set_keep_alive
return @keep_alive = false unless persistent

# HTTP/1.0 requires opt in for Keep Alive
if parser.http_version == "1.0"
@keep_alive = parser.headers["Connection"] == HTTP::Client::KEEP_ALIVE

# HTTP/1.1 is opt-out
elsif parser.http_version == "1.1"
@keep_alive = parser.headers["Connection"] != HTTP::Client::CLOSE

# Anything else we assume doesn't supportit
else
@keep_alive = false
end
end

private :set_keep_alive

# Feeds some more data into parser
def read_more(size)
parser << socket.readpartial(size) unless parser.finished?
end

private :read_more

# Starts the SSL connection
def start_tls(host, ssl_socket_class, ssl_context)
# TODO: abstract away SSLContexts so we can use other TLS libraries
ssl_context ||= OpenSSL::SSL::SSLContext.new
@socket = ssl_socket_class.new(socket, ssl_context)
socket.sync_close = true

socket.connect

if ssl_context.verify_mode == OpenSSL::SSL::VERIFY_PEER
socket.post_connection_check(host)
end

socket
end

private :start_tls
end
end
6 changes: 5 additions & 1 deletion lib/http/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def initialize(options = {})
self.headers.merge(headers)
end

%w(proxy params form json body follow response socket_class ssl_socket_class ssl_context).each do |method_name|
%w(proxy params form json body follow response socket_class ssl_socket_class ssl_context persistent).each do |method_name|
def_option method_name
end

Expand All @@ -71,6 +71,10 @@ def initialize(options = {})
end
end

def persistent?
!persistent.nil? && persistent != ""
end

def [](option)
send(option) rescue nil
end
Expand Down
Loading

0 comments on commit 4bf5a21

Please sign in to comment.