Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added connection reuse #184

Merged
merged 1 commit into from
Mar 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "http/errors"
require "http/chainable"
require "http/client"
require "http/connection"
require "http/options"
require "http/request"
require "http/request/writer"
Expand Down
7 changes: 7 additions & 0 deletions lib/http/chainable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def request(verb, uri, options = {})
branch(options).request verb, uri
end

# Flag as persistent
# @param [String] host
# @raise [Request::Error] if Host is invalid
def persistent(host)
branch default_options.with_persistent host
end

# Make a request through an HTTP proxy
# @param [Array] proxy
# @raise [Request::Error] if HTTP proxy is invalid
Expand Down
121 changes: 49 additions & 72 deletions lib/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ module HTTP
class Client
include Chainable

# Input buffer size
BUFFER_SIZE = 16_384
CONNECTION = "Connection".freeze
KEEP_ALIVE = "Keep-Alive".freeze
CLOSE = "close".freeze

attr_reader :default_options

def initialize(default_options = {})
@default_options = HTTP::Options.new(default_options)
@parser = HTTP::Response::Parser.new
@socket = nil
end

# Make an HTTP request
Expand All @@ -28,6 +27,13 @@ def request(verb, uri, opts = {})
proxy = opts.proxy
body = make_request_body(opts, headers)

# Tell the server to keep the conn open
if default_options.persistent?
headers[CONNECTION] = KEEP_ALIVE
else
headers[CONNECTION] = CLOSE
end

req = HTTP::Request.new(verb, uri, headers, proxy, body)
res = perform req, opts

Expand All @@ -48,67 +54,55 @@ def perform(req, options)
end

def make_request(req, options)
# finish previous response if client was re-used
# TODO: this is pretty wrong, as socket shoud be part of response
# connection, so that re-use of client will not break multiple
# chunked responses
finish_response

uri = req.uri

# TODO: keep-alive support
@socket = options[:socket_class].open(req.socket_host, req.socket_port)
@socket = start_tls(@socket, uri.host, options) if uri.is_a?(URI::HTTPS) && !req.using_proxy?

req.stream @socket
verify_connection!(req.uri)

read_headers!
@connection ||= HTTP::Connection.new(req, options)
@connection.send_request(req)
@connection.read_headers!

body = Response::Body.new(self)
res = Response.new(@parser.status_code, @parser.http_version, @parser.headers, body, uri)
res = Response.new(
@connection.parser.status_code,
@connection.parser.http_version,
@connection.parser.headers,
Response::Body.new(@connection),
req.uri
)

finish_response if :head == req.verb
@connection.finish_response if req.verb == :head

res
end

# Read a chunk of the body
#
# @return [String] data chunk
# @return [Nil] when no more data left
def readpartial(size = BUFFER_SIZE)
return unless @socket

begin
read_more size
finished = @parser.finished?
rescue EOFError
finished = true
# On any exception we reset the conn. This is a safety measure, to ensure
# we don't have conns in a bad state resulting in mixed requests/responses
rescue
if default_options.persistent? && @connection
@connection.close
@connection = nil
end

chunk = @parser.chunk

finish_response if finished

chunk.to_s
raise
end

private

# Initialize TLS connection
def start_tls(socket, host, options)
# TODO: abstract away SSLContexts so we can use other TLS libraries
context = options[:ssl_context] || OpenSSL::SSL::SSLContext.new
socket = options[:ssl_socket_class].new(socket, context)
socket.sync_close = true if socket.respond_to?(:sync_close=)

socket.connect
# Verify our request isn't going to be made against another URI
def verify_connection!(uri)
if default_options.persistent? && base_host(uri) != default_options.persistent
fail StateError, "Persistence is enabled for #{default_options.persistent}, but we got #{base_host(uri)}"

if context.verify_mode == OpenSSL::SSL::VERIFY_PEER
socket.post_connection_check(host)
# We re-create the connection object because we want to let prior requests
# lazily load the body as long as possible, and this mimics prior functionality.
elsif !default_options.persistent? || (@connection && !@connection.keep_alive?)
@connection = nil
end
end

socket
# Strips out query/path to give us a consistent way of comparing hosts
def base_host(uri)
base = uri.dup
base.query = nil
base.path = ""
base.to_s
end

# Merges query params if needed
Expand All @@ -128,7 +122,11 @@ def make_request_uri(uri, options)
# @param [#to_s] uri
# @return [URI]
def normalize_uri(uri)
uri = URI uri.to_s
if default_options.persistent? && uri !~ /^http|https/
uri = URI("#{default_options.persistent}#{uri}")
else
uri = URI(uri.to_s)
end

# Some proxies (seen on WEBRick) fail if URL has
# empty path (e.g. `http://example.com`) while it's RFC-complaint:
Expand All @@ -153,26 +151,5 @@ def make_request_body(opts, headers)
MimeType[:json].encode opts.json
end
end

# Reads data from socket up until headers
def read_headers!
read_more BUFFER_SIZE until @parser.headers
rescue IOError, Errno::ECONNRESET, Errno::EPIPE => ex
return if ex.is_a?(EOFError) && @parser.headers
raise IOError, "problem making HTTP request: #{ex}"
end

# Callback for when we've reached the end of a response
def finish_response
@socket.close if @socket && !@socket.closed?
@parser.reset

@socket = nil
end

# Feeds some more data into parser
def read_more(size)
@parser << @socket.readpartial(size) unless @parser.finished?
end
end
end
139 changes: 139 additions & 0 deletions lib/http/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
require "http/response/parser"

module HTTP
# A connection to the HTTP server
class Connection
attr_reader :socket, :parser, :persistent,
:pending_request, :pending_response, :sequence_id

# Attempt to read this much data
BUFFER_SIZE = 16_384

def initialize(req, options)
@persistent = options.persistent?

@parser = Response::Parser.new
@sequence_id = 0

@socket = options[:socket_class].open(req.socket_host, req.socket_port)

start_tls(req.uri.host, options[:ssl_socket_class], options[:ssl_context]) if req.uri.is_a?(URI::HTTPS) && !req.using_proxy?
end

# Send a request to the server
def send_request(req)
if pending_request
fail StateError, "Tried to send a request while one is pending already. This cannot be called from multiple threads!"
elsif pending_request
fail StateError, "Tried to send a request while a response is pending. Make sure you've fully read the body from the request."
end

@pending_request = true
@sequence_id += 1

req.stream socket

@pending_response = true
@pending_request = nil
end

# Read a chunk of the body
#
# @return [String] data chunk
# @return [Nil] when no more data left
def readpartial(size = BUFFER_SIZE)
return unless pending_response

begin
read_more size
finished = parser.finished?
rescue EOFError
finished = true
end

chunk = parser.chunk

finish_response if finished

chunk.to_s
end

# Reads data from socket up until headers
def read_headers!
read_more BUFFER_SIZE until parser.headers
set_keep_alive

rescue IOError, Errno::ECONNRESET, Errno::EPIPE => ex
return if ex.is_a?(EOFError) && parser.headers
raise IOError, "problem making HTTP request: #{ex}"
end

# Callback for when we've reached the end of a response
def finish_response
close unless keep_alive?

parser.reset

@pending_response = nil
end

# Close the connection
def close
socket.close unless socket.closed?

@pending_response = nil
@pending_request = nil
end

# Whether we're keeping the conn alive
def keep_alive?
!!@keep_alive && !socket.closed?
end

# Store whether the connection should be kept alive.
# Once we reset the parser, we lose all of this state.
def set_keep_alive
return @keep_alive = false unless persistent

# HTTP/1.0 requires opt in for Keep Alive
if parser.http_version == "1.0"
@keep_alive = parser.headers["Connection"] == HTTP::Client::KEEP_ALIVE

# HTTP/1.1 is opt-out
elsif parser.http_version == "1.1"
@keep_alive = parser.headers["Connection"] != HTTP::Client::CLOSE

# Anything else we assume doesn't supportit
else
@keep_alive = false
end
end

private :set_keep_alive

# Feeds some more data into parser
def read_more(size)
parser << socket.readpartial(size) unless parser.finished?
end

private :read_more

# Starts the SSL connection
def start_tls(host, ssl_socket_class, ssl_context)
# TODO: abstract away SSLContexts so we can use other TLS libraries
ssl_context ||= OpenSSL::SSL::SSLContext.new
@socket = ssl_socket_class.new(socket, ssl_context)
socket.sync_close = true

socket.connect

if ssl_context.verify_mode == OpenSSL::SSL::VERIFY_PEER
socket.post_connection_check(host)
end

socket
end

private :start_tls
end
end
6 changes: 5 additions & 1 deletion lib/http/options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def initialize(options = {})
self.headers.merge(headers)
end

%w(proxy params form json body follow response socket_class ssl_socket_class ssl_context).each do |method_name|
%w(proxy params form json body follow response socket_class ssl_socket_class ssl_context persistent).each do |method_name|
def_option method_name
end

Expand All @@ -71,6 +71,10 @@ def initialize(options = {})
end
end

def persistent?
!persistent.nil? && persistent != ""
end

def [](option)
send(option) rescue nil
end
Expand Down
Loading