Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Reel::RequestBody #75

Merged
merged 2 commits into from
Aug 11, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.5.0
-----
* Pipelining support
* Reel::Request#body now returns a Reel::RequestBody object instead of a String

0.4.0
----
* Allow Reel to stop cleanly
Expand Down
3 changes: 2 additions & 1 deletion lib/reel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
require 'reel/logger'
require 'reel/request_info'
require 'reel/request'
require 'reel/response'
require 'reel/request_body'
require 'reel/request_parser'
require 'reel/response'
require 'reel/response_writer'
require 'reel/server'
require 'reel/ssl_server'
Expand Down
2 changes: 1 addition & 1 deletion lib/reel/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(host, port)
super()
@server = Reel::Server.supervise(host, port) do |connection|
while request = connection.request
status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body)
status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body.to_s)
response_klass = body.is_a?(Stream) ? StreamResponse : Response
connection.respond(response_klass.new(status_symbol(status), headers, body))
end
Expand Down
2 changes: 1 addition & 1 deletion lib/reel/rack_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def response_body(body_parts)
def env request
env = Hash[PROTO_RACK_ENV]

env[RACK_INPUT] = StringIO.new(request.body || INITIAL_BODY)
env[RACK_INPUT] = StringIO.new(request.body.to_s || INITIAL_BODY)
env[RACK_INPUT].set_encoding(Encoding::BINARY) if env[RACK_INPUT].respond_to?(:set_encoding)
env[SERVER_NAME], env[SERVER_PORT] = (request[HOST]||'').split(':', 2)
env[SERVER_PORT] ||= @handler[:port].to_s
Expand Down
89 changes: 29 additions & 60 deletions lib/reel/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,37 @@ def self.build(request_info, connection)
end

def_delegators :@connection, :<<, :write, :respond, :finish_response
attr_reader :body

# request_info is a RequestInfo object including the headers and
# the url, method and http version.
#
# Access it through the RequestMixin methods.
def initialize(request_info, connection = nil)
@request_info = request_info
@connection = connection
@request_info = request_info
@connection = connection
@finished = false
@buffer = ""
@body = RequestBody.new(self)
@finished_read = false
end

# Returns true if request fully finished reading
def finished_reading?; @finished_read; end
def finished_reading?; @finished_read; end

# When HTTP Parser marks the message parsing as complete, this will be set.
def finish_reading!
raise StateError, "already finished" if @finished_read
@finished_read = true
end

# Buffer body sent from connection, or send it directly to
# the @on_body callback if set (calling #body with a block)
def add_body(chunk)
if @on_body
@on_body.call(chunk)
else
@body ||= ""
@body << chunk
end
end

# Returns the body, if a block is given, the body is streamed
# to the block as the chunks become available, until the body
# has been read.
#
# If no block is given, the entire body will be read from the
# connection into the body buffer and then returned.
def body
raise "no connection given" unless @connection

if block_given?
# Callback from the http_parser will be calling add_body directly
@on_body = Proc.new

# clear out body buffered so far
yield read_from_body(nil) if @body

until finished_reading?
@connection.readpartial
end
@on_body = nil
else
until finished_reading?
@connection.readpartial
end
@body
end
# Fill the request buffer with data as it becomes available
def fill_buffer(chunk)
@buffer << chunk
end

# Read a number of bytes, looping until they are available or until
# read_from_body returns nil, indicating there are no more bytes to read
#
# Note that bytes read from the body buffer will be cleared as they are
# read.
# readpartial returns nil, indicating there are no more bytes to read
def read(length = nil, buffer = nil)
raise ArgumentError, "negative length #{length} given" if length && length < 0

Expand All @@ -89,7 +57,7 @@ def read(length = nil, buffer = nil)
chunk_size = length.nil? ? @connection.buffer_size : length
begin
while chunk_size > 0
chunk = read_from_body(chunk_size)
chunk = readpartial(chunk_size)
break unless chunk
res << chunk
chunk_size = length - res.length unless length.nil?
Expand All @@ -99,26 +67,27 @@ def read(length = nil, buffer = nil)
return length && res.length == 0 ? nil : res
end

# @private
# Reads a number of bytes from the byte buffer, asking
# the connection to add to the buffer if there are not enough
# bytes available.
#
# Body buffer is cleared as bytes are read from it.
def read_from_body(length = nil)
if length.nil?
slice = @body
@body = nil
# Read a string up to the given number of bytes, blocking until some
# data is available but returning immediately if some data is available
def readpartial(length = nil)
if length.nil? && @buffer.length > 0
slice = @buffer
@buffer = ""
else
@body ||= ''
unless finished_reading? || @body.length >= length
@connection.readpartial(length - @body.length)
unless finished_reading? || (length && length <= @buffer.length)
@connection.readpartial(length ? length - @buffer.length : Connection::BUFFER_SIZE)
end

if length
slice = @buffer.slice!(0, length)
else
slice = @buffer
@buffer = ""
end
slice = @body.slice!(0, length)
end

slice && slice.length == 0 ? nil : slice
end
private :read_from_body

end
end
47 changes: 47 additions & 0 deletions lib/reel/request_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
module Reel
# Represents the bodies of Requests
class RequestBody
def initialize(request)
@request = request
@streaming = nil
@contents = nil
end

# Read exactly the given amount of data
def read(length)
stream!
@request.read(length)
end

# Read up to length bytes, but return any data that's available
def readpartial(length = nil)
stream!
@request.readpartial(length)
end

# Eagerly consume the entire body as a string
def to_s
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ponder whether this might be better called string as in StringIO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps. That said I'm trying to allow this body to passed directly through in e.g. Webmachine, and they probably want a #to_s style API. Still waiting to hear back from @seancribbs on that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either could be solved with an alias.

return @contents if @contents
raise StateError, "body is being streamed" unless @streaming.nil?

begin
@streaming = false
@contents = ""
while chunk = @request.readpartial
@contents << chunk
end
rescue
@contents = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to trash the partially-read data?
In the error case, perhaps it is useful to see this information.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this use case, I think so. This is eagerly attempting to read the entire body into a string. If it can't be read in its entirety, I think the best API here is just to raise.

If people want chunks of the body and to handle errors that occur partway through, they can use the streaming APIs.

raise
end

@contents
end

# Assert that the body is actively being streamed
def stream!
raise StateError, "body has already been consumed" if @streaming == false
@streaming = true
end
end
end
2 changes: 1 addition & 1 deletion lib/reel/request_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def on_headers_complete(headers)

# Send body directly to Reel::Response to be buffered.
def on_body(chunk)
@currently_reading.add_body(chunk)
@currently_reading.fill_buffer(chunk)
end

# Mark current request as complete, set this as ready to respond.
Expand Down
22 changes: 13 additions & 9 deletions spec/reel/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
request.url.should eq "/"
request.version.should eq "1.1"
request['Content-Length'].should eq body.length.to_s
request.body.should eq example_request.body
request.body.to_s.should eq example_request.body
end
end

Expand Down Expand Up @@ -108,6 +108,7 @@
}.should raise_error(Reel::Connection::StateError)
end
end

it "reads pipelined requests without bodies" do
with_socket_pair do |client, connection|
3.times do
Expand Down Expand Up @@ -149,12 +150,13 @@
request.url.should eq "/"
request.version.should eq "1.1"
request['Content-Length'].should eq expected_body.length.to_s
request.body.should eq expected_body
request.body.to_s.should eq expected_body

connection.respond :ok, {}, ""
end
end
end

it "reads pipelined requests with streamed bodies" do
with_socket_pair(4) do |client, connection|
3.times do |i|
Expand All @@ -174,7 +176,7 @@
request['Content-Length'].should eq expected_body.length.to_s
request.should_not be_finished_reading
new_content = ""
request.body do |chunk|
while chunk = request.body.readpartial(1)
new_content << chunk
end
new_content.should == expected_body
Expand All @@ -187,7 +189,7 @@

# This test will deadlock rspec waiting unless
# connection.request works properly
it 'does not block waiting for body to read before handling request' do
it "does not block waiting for body to read before handling request" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "Hi guys! Sorry I'm late to the party."
Expand All @@ -197,11 +199,11 @@
request = connection.request
request.should be_a(Reel::Request)
client << content
request.body.should == content
request.body.to_s.should == content
end
end

it 'blocks on read until written' do
it "blocks on read until written" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "Hi guys! Sorry I'm late to the party."
Expand All @@ -225,7 +227,7 @@
end
end

it 'streams body properly with #read and buffered body' do
it "streams body properly with #read and buffered body" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "I'm data you can stream!"
Expand All @@ -245,7 +247,8 @@
rebuilt.should == ["I'm data", " you can", " stream!"]
end
end
it 'streams body properly with #body &block' do

it "streams request bodies" do
with_socket_pair(8) do |client, connection|
example_request = ExampleRequest.new
content = "I'm data you can stream!"
Expand All @@ -257,7 +260,7 @@
request.should_not be_finished_reading
client << content
rebuilt = []
request.body do |chunk|
while chunk = request.body.readpartial(8)
rebuilt << chunk
end
request.should be_finished_reading
Expand Down Expand Up @@ -301,6 +304,7 @@
request.read.should eq "Hello, world!"
end
end

it "reads to EOF if length is nil" do
with_socket_pair do |client, connection|
body = "Hello, world!"
Expand Down
2 changes: 1 addition & 1 deletion spec/reel/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
begin
request = connection.request
request.method.should eq 'POST'
connection.respond :ok, request.body
connection.respond :ok, request.body.to_s
rescue => ex
end
end
Expand Down