Skip to content

Commit

Permalink
v0.9.0, updated support for latest MRI and JRuby versions, improved c…
Browse files Browse the repository at this point in the history
…hunked encoding handling
  • Loading branch information
DavidTompkins committed Apr 28, 2014
1 parent 51f9aa7 commit 5d7f7ba
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 44 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
http_streaming_client 0.9.0 (04.28.2014)
========================================
* Tested and fixed for MRI ruby-2.0.0-p451 and JRuby jruby-1.7.12
* Fixed chunked encoding support handler, support for gzip'd blocks spanning chunks

http_streaming_client 0.8.11 ()
========================================
* Fixed warn logging for non-200 HTTP response codes
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Ruby HTTP client with support for HTTP 1.1 streaming, GZIP and zlib compressed s

## Ruby Version

MRI ruby-2.0.0-p353. If you need it, install via rvm: https://rvm.io/
MRI ruby-2.0.0-p451 and JRuby jruby-1.7.12. Install via rvm: https://rvm.io/

## Installation

Expand Down
33 changes: 21 additions & 12 deletions lib/http_streaming_client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
require "http_streaming_client/custom_logger"
require "http_streaming_client/errors"
require "http_streaming_client/decoders/gzip"
require "http_streaming_client/decoders/chunked"

module HttpStreamingClient

Expand Down Expand Up @@ -231,7 +232,7 @@ def request(method, uri, opts = {}, &block)
response = ""

if response_compression then
logger.debug "response compression detected"
logger.debug "chunked transfer encoding with compression detected"
if block_given? then
decoder = HttpStreamingClient::Decoders::GZip.new { |line|
logger.debug "read #{line.size} uncompressed bytes, decoder queue bytes:#{decoder.size}"
Expand All @@ -241,6 +242,17 @@ def request(method, uri, opts = {}, &block)
logger.debug "read #{line.size} uncompressed bytes, #{response.size} bytes total, decoder queue bytes:#{decoder.size}"
response << line unless @interrupted }
end
else
logger.debug "chunked transfer encoding with no compression detected"
if block_given? then
decoder = HttpStreamingClient::Decoders::Chunked.new { |line|
logger.debug "read #{line.size} uncompressed bytes, decoder queue bytes:#{decoder.size}"
block.call(line) unless @interrupted }
else
decoder = HttpStreamingClient::Decoders::Chunked.new { |line|
logger.debug "read #{line.size} uncompressed bytes, #{response.size} bytes total, decoder queue bytes:#{decoder.size}"
response << line unless @interrupted }
end
end

while !socket.eof? && (line = socket.gets)
Expand Down Expand Up @@ -268,19 +280,16 @@ def request(method, uri, opts = {}, &block)
logger.debug "read #{partial.size} bytes, #{remaining} bytes remaining"
end

if response_compression then
return if @interrupted
decoder << partial
decoder << partial

if !block_given? then
logger.debug "no block specified, returning chunk results and halting streaming response"
return response
else
if block_given? then
yield partial
else
return response if @interrupted
logger.debug "no block specified, returning chunk results and halting streaming response"
response << partial
return response
end
return if @interrupted and response_compression
return response if @interrupted
end

end

logger.debug "socket EOF detected" if socket.eof?
Expand Down
98 changes: 98 additions & 0 deletions lib/http_streaming_client/decoders/chunked.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
###########################################################################
##
## http_streaming_client
##
## Ruby HTTP client with support for HTTP 1.1 streaming, GZIP compressed
## streams, and chunked transfer encoding. Includes extensible OAuth
## support for the Adobe Analytics Firehose and Twitter Streaming APIs.
##
## David Tompkins -- 4/25/2014
## tompkins@adobe_dot_com
##
###########################################################################
##
## Copyright (c) 2014 Adobe Systems Incorporated. All rights reserved.
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###########################################################################

require "http_streaming_client/errors"

module HttpStreamingClient

module Decoders

class Chunked

def logger
HttpStreamingClient.logger
end

def initialize(&packet_callback)
logger.debug "Chunked:initialize"
@packet_callback = packet_callback
end

def <<(chunk)
return unless chunk && chunk.size > 0
chunk_io = StringIO.new(chunk)
while true
line = nonblock_readline(chunk_io)
break if line.nil?
process_line(line)
end
end

def size
logger.debug "Chunked:size"
return @line_buffer.size unless @line_buffer.nil?
return 0
end

def close
logger.debug "Chunked:close"
end

protected

def nonblock_readline(io)
@line_buffer ||= ""
ch = nil
begin
while ch = io.getc
@line_buffer += ch
if ch == "\n" then
result = @line_buffer
@line_buffer = ""
return result
end
end
rescue => e
logger.debug "nonblock_readline:error received:#{e.class}:#{e}"
return nil
end
end

private

def process_line(line)
logger.debug "Chunked:process_line:size:#{line.nil? ? "nil" : line.size}"
if line && line.size > 0
@packet_callback.call(line) unless @packet_callback.nil?
end
end

end
end
end
73 changes: 46 additions & 27 deletions lib/http_streaming_client/decoders/gzip.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# encoding: utf-8

###########################################################################
##
## http_streaming_client
Expand Down Expand Up @@ -37,6 +39,14 @@ module Decoders

class GZip

if defined?(JRUBY_VERSION) then
# JRuby: pass at least 8k bytes to GzipReader to avoid zlib EOF
GZIP_READER_MIN_BUF_SIZE = 8192
else
# MRI: pass at least 2k bytes to GzipReader to avoid zlib EOF
GZIP_READER_MIN_BUF_SIZE = 2048
end

def logger
HttpStreamingClient.logger
end
Expand All @@ -46,37 +56,19 @@ def initialize(&packet_callback)
@packet_callback = packet_callback
end

def nonblock_readline(io)
@line_buffer ||= ""
ch = nil
begin
while ch = io.getc
@line_buffer += ch
if ch == "\n" then
result = @line_buffer
@line_buffer = ""
return result
end
end
rescue Zlib::GzipFile::Error
# this is raised on EOF by ZLib, return nil to indicate EOF and leave partial line in the buffer
return nil
end
end

def <<(compressed_packet)
return unless compressed_packet && compressed_packet.size > 0
@buf ||= GZipBufferIO.new
@buf << compressed_packet

# pass at least 2k bytes to GzipReader to avoid zlib EOF
if @buf.size > 2048 then
# pass at least GZIP_READER_MIN_BUF_SIZE bytes to GzipReader to avoid zlib EOF
if @buf.size > GZIP_READER_MIN_BUF_SIZE then

@gzip ||= Zlib::GzipReader.new @buf

while true do
decompressed_packet = nonblock_readline(@gzip)
#logger.debug "decompressed_packet:#{decompressed_packet}"
#logger.debug "GZip:<<:decompressed_packet:#{decompressed_packet}"
break if decompressed_packet.nil?
process_decompressed_packet(decompressed_packet)
end
Expand All @@ -91,7 +83,7 @@ def close

while true do
decompressed_packet = nonblock_readline(@gzip)
#logger.debug "decompressed_packet:#{decompressed_packet}"
#logger.debug "GZip:close:decompressed_packet:#{decompressed_packet}"
break if decompressed_packet.nil?
process_decompressed_packet(decompressed_packet)
end
Expand All @@ -100,14 +92,40 @@ def close
raise HttpStreamingClient::DecoderError.new(e.message)
end
end

def size
@buf.size
end

def nonblock_readline(io)
@line_buffer ||= ""
ch = nil
begin
while ch = io.getc
@line_buffer += ch
if ch == "\n" then
result = @line_buffer
@line_buffer = ""
return result
end
end
rescue Zlib::GzipFile::Error
# this is raised on EOF by ZLib in MRI, return nil to indicate EOF and leave partial line in the buffer
logger.debug "Gzip:nonblock_readline:Zlib::GzipFile::Error:line_buffer.size:#{@line_buffer.size}"
return nil
rescue IOError
# this is raised on EOF by ZLib in JRuby, return nil to indicate EOF and leave partial line in the buffer
logger.debug "Gzip:nonblock_readline:IOError:line_buffer.size:#{@line_buffer.size}"
return nil
rescue => e
logger.debug "Gzip:nonblock_readline:error received:#{e.class}:#{e}"
raise e
end
end

protected

class GZipBufferIO < IO
class GZipBufferIO < StringIO

def logger
HttpStreamingClient.logger
Expand All @@ -116,6 +134,7 @@ def logger
def initialize(string="")
logger.debug "GZipBufferIO:initialize"
@packet_stream = string
@packet_stream.force_encoding("BINARY")
end

def <<(string)
Expand All @@ -124,7 +143,7 @@ def <<(string)

# called by GzipReader
def readpartial(length=nil, buffer=nil)
logger.debug "GZipBufferIO:readpartial:length:#{length}:@packet_stream:#{@packet_stream.nil? ? 'nil' : 'not null'}"
logger.debug "GZipBufferIO:readpartial:length:#{length}:@packet_stream:#{@packet_stream.nil? ? 'nil' : @packet_stream.size}"
buffer ||= ""

raise EOFError "" if @packet_stream.size == 0
Expand All @@ -150,14 +169,14 @@ def readpartial(length=nil, buffer=nil)

# called by GzipReader
def read(length=nil, buffer=nil)
logger.debug "read:length:#{length}"
logger.debug "GZipBufferIO:read:length:#{length}"
return nil if @packet_stream.size == 0
readpartial(length, buffer)
end

# called by GzipReader
def size
logger.debug "size():#{@packet_stream.size}"
logger.debug "GZipBufferIO:size():#{@packet_stream.size}"
@packet_stream.size
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/http_streaming_client/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@
###########################################################################

module HttpStreamingClient
VERSION = "0.8.11"
VERSION = "0.9.0"
end
7 changes: 4 additions & 3 deletions spec/reconnect_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

# currently disabled, requires a server that can be killed to simulate dropped connections

it "should receive exactly 10 messages, no reconnect" do
#it "should receive exactly 10 messages, no reconnect", :disabled => true do
#it "should receive exactly 10 messages, no reconnect" do
it "should receive exactly 10 messages, no reconnect", :disabled => true do

count = 0
client = HttpStreamingClient::Client.new(compression: false)
Expand All @@ -18,7 +18,8 @@
expect(count).to be(10)
end

it "should reconnect on any error or EOF" do
#it "should reconnect on any error or EOF" do
it "should reconnect on any error or EOF", :disabled => true do

client = HttpStreamingClient::Client.new(compression: false, reconnect: true, reconnect_attempts: 5, reconnect_interval: 1)
count = 0
Expand Down

0 comments on commit 5d7f7ba

Please sign in to comment.