Skip to content

Commit

Permalink
reconnect logic change
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidTompkins committed May 21, 2014
1 parent 6749fcf commit 7dfb7b2
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 46 deletions.
86 changes: 45 additions & 41 deletions lib/http_streaming_client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,52 +124,55 @@ def request(method, uri, opts = {}, &block)
uri = URI.parse(uri)
end

options_factory = opts.delete(:options_factory)
if !options_factory.nil? then
if options_factory.respond_to? "get_options" then
logger.debug("Client::request:options_factory detected")
generated_options = options_factory.get_options
logger.debug("Client::request:options_factory:#{generated_options}")
opts.merge!(generated_options || {})
else
logger.warn("Client::request:options_factory detected, but does not respond to get_options(). Ignoring.")
end
end
@reconnect_count = 0 if @reconnect_requested
@options_factory = opts.delete(:options_factory)
@static_body_option = opts[:body]

begin

default_headers = {
"User-Agent" => opts["User-Agent"] || "HttpStreamingClient #{HttpStreamingClient::VERSION}",
"Accept" => "*/*",
"Accept-Charset" => "utf-8"
}

if method == "POST" || method == "PUT"
default_headers["Content-Type"] = opts["Content-Type"] || "application/x-www-form-urlencoded;charset=UTF-8"
body = opts.delete(:body)
if body.is_a?(Hash)
body = body.keys.collect {|param| "#{URI.escape(param.to_s)}=#{URI.escape(body[param].to_s)}"}.join('&')
opts[:body] = @static_body_option unless @static_body_option.nil?

if !@options_factory.nil? then
if @options_factory.respond_to? "get_options" then
logger.info("Client::request:options_factory detected")
generated_options = @options_factory.get_options
logger.info("Client::request:options_factory:#{generated_options}")
opts.merge!(generated_options || {})
else
logger.warn("Client::request:options_factory detected, but does not respond to get_options(). Ignoring.")
end
end
default_headers["Content-Length"] = body.length
end

unless uri.userinfo.nil?
default_headers["Authorization"] = "Basic #{[uri.userinfo].pack('m').strip!}\r\n"
end
default_headers = {
"User-Agent" => opts["User-Agent"] || "HttpStreamingClient #{HttpStreamingClient::VERSION}",
"Accept" => "*/*",
"Accept-Charset" => "utf-8"
}

if method == "POST" || method == "PUT"
default_headers["Content-Type"] = opts["Content-Type"] || "application/x-www-form-urlencoded;charset=UTF-8"
body = opts.delete(:body)
if body.is_a?(Hash)
body = body.keys.collect {|param| "#{URI.escape(param.to_s)}=#{URI.escape(body[param].to_s)}"}.join('&')
end
default_headers["Content-Length"] = body.length
end

encodings = []
encodings << "gzip" if (@compression_requested and opts[:compression].nil?) or opts[:compression]
if encodings.any?
default_headers["Accept-Encoding"] = "#{encodings.join(',')}"
end
unless uri.userinfo.nil?
default_headers["Authorization"] = "Basic #{[uri.userinfo].pack('m').strip!}\r\n"
end

headers = default_headers.merge(opts[:headers] || {})
logger.debug "request headers: #{headers}"
encodings = []
encodings << "gzip" if (@compression_requested and opts[:compression].nil?) or opts[:compression]
if encodings.any?
default_headers["Accept-Encoding"] = "#{encodings.join(',')}"
end

begin
headers = default_headers.merge(opts[:headers] || {})
logger.debug "request headers: #{headers}"

socket = initialize_socket(uri, opts)

@reconnect_count = 0 if @reconnect_requested

request = "#{method} #{uri.path}#{uri.query ? "?"+uri.query : nil} HTTP/1.1\r\n"
request << "Host: #{uri.host}\r\n"
headers.each do |k, v|
Expand Down Expand Up @@ -254,7 +257,7 @@ def request(method, uri, opts = {}, &block)
logger.debug "read #{line.size} uncompressed bytes, #{response.size} bytes total, decoder queue bytes:#{decoder.size}"
response << line unless @interrupted }
end
else
else
logger.debug "chunked transfer encoding with no compression detected"
if block_given? then
decoder = HttpStreamingClient::Decoders::Chunked.new { |line|
Expand Down Expand Up @@ -295,8 +298,8 @@ def request(method, uri, opts = {}, &block)
decoder << partial

if !block_given? then
logger.debug "no block specified, returning chunk results and halting streaming response"
return response
logger.debug "no block specified, returning chunk results and halting streaming response"
return response
else
return if @interrupted and response_compression
return response if @interrupted
Expand Down Expand Up @@ -380,11 +383,12 @@ def request(method, uri, opts = {}, &block)
if @reconnect_requested then
logger.info "Connection closed. Reconnect requested. Trying..."
@reconnect_count = @reconnect_count + 1
logger.info "@reconnect_count is #{@reconnect_count} of #{@reconnect_attempts}, sleeping for #{@reconnect_interval}..."
sleep @reconnect_interval
retry if @reconnect_count < @reconnect_attempts
logger.info "Maximum number of failed reconnect attempts reached (#{@reconnect_attempts}). Exiting."
end

raise e unless e.instance_of? ReconnectRequest
end
ensure
Expand Down
10 changes: 5 additions & 5 deletions spec/reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@

count = 0
client = HttpStreamingClient::Client.new(compression: false)
response = client.get("http://localhost:3000/outbounds/consumer/1") { |line|
response = client.get("http://localhost:3000/outbounds/consumer") { |line|
logger.debug "line received: #{line}"
count = count + 1
}
expect(response).to be_true
expect(count).to be(10)
end

#it "should reconnect on any error or EOF" do
it "should reconnect on any error or EOF", :disabled => true do
it "should reconnect on any error or EOF" do
#it "should reconnect on any error or EOF", :disabled => true do

client = HttpStreamingClient::Client.new(compression: false, reconnect: true, reconnect_attempts: 5, reconnect_interval: 1)
client = HttpStreamingClient::Client.new(compression: false, reconnect: true, reconnect_attempts: 5, reconnect_interval: 5)
count = 0
response = client.get("http://localhost:3000/outbounds/consumer/1") { |line|
response = client.get("http://localhost:3000/outbounds/consumer") { |line|
logger.debug "line received: #{line}"
count = count + 1
client.interrupt if count > 20
Expand Down

0 comments on commit 7dfb7b2

Please sign in to comment.