Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Commit

Permalink
Merge 76055f3 into f7eb96a
Browse files Browse the repository at this point in the history
  • Loading branch information
tarcieri committed Jul 28, 2013
2 parents f7eb96a + 76055f3 commit 8da96a1
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 77 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
0.5.0
-----
* Pipelining support
* Reel::Request#body now returns a Reel::RequestBody object instead of a String

0.4.0
----
* Allow Reel to stop cleanly
Expand Down
3 changes: 2 additions & 1 deletion lib/reel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
require 'reel/logger'
require 'reel/request_info'
require 'reel/request'
require 'reel/response'
require 'reel/request_body'
require 'reel/request_parser'
require 'reel/response'
require 'reel/response_writer'
require 'reel/server'
require 'reel/ssl_server'
Expand Down
2 changes: 1 addition & 1 deletion lib/reel/app.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(host, port)
super()
@server = Reel::Server.supervise(host, port) do |connection|
while request = connection.request
status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body)
status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body.to_s)
response_klass = body.is_a?(Stream) ? StreamResponse : Response
connection.respond(response_klass.new(status_symbol(status), headers, body))
end
Expand Down
2 changes: 1 addition & 1 deletion lib/reel/rack_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def response_body(body_parts)
def env request
env = Hash[PROTO_RACK_ENV]

env[RACK_INPUT] = StringIO.new(request.body || INITIAL_BODY)
env[RACK_INPUT] = StringIO.new(request.body.to_s || INITIAL_BODY)
env[RACK_INPUT].set_encoding(Encoding::BINARY) if env[RACK_INPUT].respond_to?(:set_encoding)
env[SERVER_NAME], env[SERVER_PORT] = (request[HOST]||'').split(':', 2)
env[SERVER_PORT] ||= @handler[:port].to_s
Expand Down
89 changes: 29 additions & 60 deletions lib/reel/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,37 @@ def self.build(request_info, connection)
end

def_delegators :@connection, :<<, :write, :respond, :finish_response
attr_reader :body

# request_info is a RequestInfo object including the headers and
# the url, method and http version.
#
# Access it through the RequestMixin methods.
def initialize(request_info, connection = nil)
@request_info = request_info
@connection = connection
@request_info = request_info
@connection = connection
@finished = false
@buffer = ""
@body = RequestBody.new(self)
@finished_read = false
end

# Returns true if request fully finished reading
def finished_reading?; @finished_read; end
def finished_reading?; @finished_read; end

# When HTTP Parser marks the message parsing as complete, this will be set.
def finish_reading!
raise StateError, "already finished" if @finished_read
@finished_read = true
end

# Buffer body sent from connection, or send it directly to
# the @on_body callback if set (calling #body with a block)
def add_body(chunk)
if @on_body
@on_body.call(chunk)
else
@body ||= ""
@body << chunk
end
end

# Returns the body, if a block is given, the body is streamed
# to the block as the chunks become available, until the body
# has been read.
#
# If no block is given, the entire body will be read from the
# connection into the body buffer and then returned.
def body
raise "no connection given" unless @connection

if block_given?
# Callback from the http_parser will be calling add_body directly
@on_body = Proc.new

# clear out body buffered so far
yield read_from_body(nil) if @body

until finished_reading?
@connection.readpartial
end
@on_body = nil
else
until finished_reading?
@connection.readpartial
end
@body
end
# Fill the request buffer with data as it becomes available
def fill_buffer(chunk)
@buffer << chunk
end

# Read a number of bytes, looping until they are available or until
# read_from_body returns nil, indicating there are no more bytes to read
#
# Note that bytes read from the body buffer will be cleared as they are
# read.
# readpartial returns nil, indicating there are no more bytes to read
def read(length = nil, buffer = nil)
raise ArgumentError, "negative length #{length} given" if length && length < 0

Expand All @@ -89,7 +57,7 @@ def read(length = nil, buffer = nil)
chunk_size = length.nil? ? @connection.buffer_size : length
begin
while chunk_size > 0
chunk = read_from_body(chunk_size)
chunk = readpartial(chunk_size)
break unless chunk
res << chunk
chunk_size = length - res.length unless length.nil?
Expand All @@ -99,26 +67,27 @@ def read(length = nil, buffer = nil)
return length && res.length == 0 ? nil : res
end

# @private
# Reads a number of bytes from the byte buffer, asking
# the connection to add to the buffer if there are not enough
# bytes available.
#
# Body buffer is cleared as bytes are read from it.
def read_from_body(length = nil)
if length.nil?
slice = @body
@body = nil
# Read a string up to the given number of bytes, blocking until some
# data is available but returning immediately if some data is available
def readpartial(length = nil)
if length.nil? && @buffer.length > 0
slice = @buffer
@buffer = ""
else
@body ||= ''
unless finished_reading? || @body.length >= length
@connection.readpartial(length - @body.length)
unless finished_reading? || (length && length <= @buffer.length)
@connection.readpartial(length ? length - @buffer.length : Connection::BUFFER_SIZE)
end

if length
slice = @buffer.slice!(0, length)
else
slice = @buffer
@buffer = ""
end
slice = @body.slice!(0, length)
end

slice && slice.length == 0 ? nil : slice
end
private :read_from_body

end
end
52 changes: 52 additions & 0 deletions lib/reel/request_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
require 'forwardable'

module Reel
# Represents the bodies of Requests
# TODO: this is an initial attempt at adding a standardized body object
# Much of the logic that belongs here is in Reel::Request and should be
# extracted.
class RequestBody
def initialize(request)
@request = request
@streaming = nil
@contents = nil
end

# Read exactly the given amount of data
def read(length)
stream!
@request.read(length)
end

# Read up to length bytes, but return any data that's available
def readpartial(length = nil)
stream!
@request.readpartial(length)
end

# Eagerly consume the entire body as a string
def to_s
return @contents if @contents
raise StateError, "body is being streamed" unless @streaming.nil?

begin
@streaming = false
@contents = ""
while chunk = @request.readpartial
@contents << chunk
end
rescue
@contents = nil
raise
end

@contents
end

# Assert that the body is actively being streamed
def stream!
raise StateError, "body has already been consumed" if @streaming == false
@streaming = true
end
end
end
2 changes: 1 addition & 1 deletion lib/reel/request_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def on_headers_complete(headers)

# Send body directly to Reel::Response to be buffered.
def on_body(chunk)
@currently_reading.add_body(chunk)
@currently_reading.fill_buffer(chunk)
end

# Mark current request as complete, set this as ready to respond.
Expand Down
24 changes: 13 additions & 11 deletions spec/reel/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
request.url.should eq "/"
request.version.should eq "1.1"
request['Content-Length'].should eq body.length.to_s
request.body.should eq example_request.body
request.body.to_s.should eq example_request.body
end
end

Expand Down Expand Up @@ -108,6 +108,7 @@
}.should raise_error(Reel::Connection::StateError)
end
end

it "reads pipelined requests without bodies" do
with_socket_pair do |client, connection|
3.times do
Expand Down Expand Up @@ -149,12 +150,13 @@
request.url.should eq "/"
request.version.should eq "1.1"
request['Content-Length'].should eq expected_body.length.to_s
request.body.should eq expected_body
request.body.to_s.should eq expected_body

connection.respond :ok, {}, ""
end
end
end

it "reads pipelined requests with streamed bodies" do
with_socket_pair(4) do |client, connection|
3.times do |i|
Expand All @@ -174,7 +176,7 @@
request['Content-Length'].should eq expected_body.length.to_s
request.should_not be_finished_reading
new_content = ""
request.body do |chunk|
while chunk = request.body.readpartial(1)
new_content << chunk
end
new_content.should == expected_body
Expand All @@ -185,9 +187,7 @@
end
end

# This test will deadlock rspec waiting unless
# connection.request works properly
it 'does not block waiting for body to read before handling request' do
it "does not block waiting for body to read before handling request" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "Hi guys! Sorry I'm late to the party."
Expand All @@ -197,11 +197,11 @@
request = connection.request
request.should be_a(Reel::Request)
client << content
request.body.should == content
request.body.to_s.should == content
end
end

it 'blocks on read until written' do
it "blocks on read until written" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "Hi guys! Sorry I'm late to the party."
Expand All @@ -225,7 +225,7 @@
end
end

it 'streams body properly with #read and buffered body' do
it "streams body properly with #read and buffered body" do
with_socket_pair do |client, connection|
example_request = ExampleRequest.new
content = "I'm data you can stream!"
Expand All @@ -245,7 +245,8 @@
rebuilt.should == ["I'm data", " you can", " stream!"]
end
end
it 'streams body properly with #body &block' do

it "streams request bodies" do
with_socket_pair(8) do |client, connection|
example_request = ExampleRequest.new
content = "I'm data you can stream!"
Expand All @@ -257,7 +258,7 @@
request.should_not be_finished_reading
client << content
rebuilt = []
request.body do |chunk|
while chunk = request.body.readpartial(8)
rebuilt << chunk
end
request.should be_finished_reading
Expand Down Expand Up @@ -301,6 +302,7 @@
request.read.should eq "Hello, world!"
end
end

it "reads to EOF if length is nil" do
with_socket_pair do |client, connection|
body = "Hello, world!"
Expand Down
2 changes: 1 addition & 1 deletion spec/reel/rack_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@

raise ex if ex
end
end
end
2 changes: 1 addition & 1 deletion spec/reel/server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
begin
request = connection.request
request.method.should eq 'POST'
connection.respond :ok, request.body
connection.respond :ok, request.body.to_s
rescue => ex
end
end
Expand Down

0 comments on commit 8da96a1

Please sign in to comment.