diff --git a/CHANGES.md b/CHANGES.md index 7f960e9..31777ad 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +0.5.0 +----- +* Pipelining support +* Reel::Request#body now returns a Reel::RequestBody object instead of a String + 0.4.0 ---- * Allow Reel to stop cleanly diff --git a/lib/reel.rb b/lib/reel.rb index ab6d3ef..61694c5 100644 --- a/lib/reel.rb +++ b/lib/reel.rb @@ -13,8 +13,9 @@ require 'reel/logger' require 'reel/request_info' require 'reel/request' -require 'reel/response' +require 'reel/request_body' require 'reel/request_parser' +require 'reel/response' require 'reel/response_writer' require 'reel/server' require 'reel/ssl_server' diff --git a/lib/reel/app.rb b/lib/reel/app.rb index 027c8e1..a45f76e 100644 --- a/lib/reel/app.rb +++ b/lib/reel/app.rb @@ -16,7 +16,7 @@ def initialize(host, port) super() @server = Reel::Server.supervise(host, port) do |connection| while request = connection.request - status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body) + status, headers, body = call Rack::MockRequest.env_for(request.url, :method => request.method, :input => request.body.to_s) response_klass = body.is_a?(Stream) ? StreamResponse : Response connection.respond(response_klass.new(status_symbol(status), headers, body)) end diff --git a/lib/reel/rack_worker.rb b/lib/reel/rack_worker.rb index 3ecedd2..6c8d070 100644 --- a/lib/reel/rack_worker.rb +++ b/lib/reel/rack_worker.rb @@ -111,7 +111,7 @@ def response_body(body_parts) def env request env = Hash[PROTO_RACK_ENV] - env[RACK_INPUT] = StringIO.new(request.body || INITIAL_BODY) + env[RACK_INPUT] = StringIO.new(request.body.to_s || INITIAL_BODY) env[RACK_INPUT].set_encoding(Encoding::BINARY) if env[RACK_INPUT].respond_to?(:set_encoding) env[SERVER_NAME], env[SERVER_PORT] = (request[HOST]||'').split(':', 2) env[SERVER_PORT] ||= @handler[:port].to_s diff --git a/lib/reel/request.rb b/lib/reel/request.rb index 4f15564..0128c3e 100644 --- a/lib/reel/request.rb +++ b/lib/reel/request.rb @@ -17,69 +17,37 @@ def self.build(request_info, connection) end def_delegators :@connection, :<<, :write, :respond, :finish_response + attr_reader :body # request_info is a RequestInfo object including the headers and # the url, method and http version. # # Access it through the RequestMixin methods. def initialize(request_info, connection = nil) - @request_info = request_info - @connection = connection + @request_info = request_info + @connection = connection + @finished = false + @buffer = "" + @body = RequestBody.new(self) @finished_read = false end # Returns true if request fully finished reading - def finished_reading?; @finished_read; end + def finished_reading?; @finished_read; end # When HTTP Parser marks the message parsing as complete, this will be set. def finish_reading! + raise StateError, "already finished" if @finished_read @finished_read = true end - # Buffer body sent from connection, or send it directly to - # the @on_body callback if set (calling #body with a block) - def add_body(chunk) - if @on_body - @on_body.call(chunk) - else - @body ||= "" - @body << chunk - end - end - - # Returns the body, if a block is given, the body is streamed - # to the block as the chunks become available, until the body - # has been read. - # - # If no block is given, the entire body will be read from the - # connection into the body buffer and then returned. - def body - raise "no connection given" unless @connection - - if block_given? - # Callback from the http_parser will be calling add_body directly - @on_body = Proc.new - - # clear out body buffered so far - yield read_from_body(nil) if @body - - until finished_reading? - @connection.readpartial - end - @on_body = nil - else - until finished_reading? - @connection.readpartial - end - @body - end + # Fill the request buffer with data as it becomes available + def fill_buffer(chunk) + @buffer << chunk end # Read a number of bytes, looping until they are available or until - # read_from_body returns nil, indicating there are no more bytes to read - # - # Note that bytes read from the body buffer will be cleared as they are - # read. + # readpartial returns nil, indicating there are no more bytes to read def read(length = nil, buffer = nil) raise ArgumentError, "negative length #{length} given" if length && length < 0 @@ -89,7 +57,7 @@ def read(length = nil, buffer = nil) chunk_size = length.nil? ? @connection.buffer_size : length begin while chunk_size > 0 - chunk = read_from_body(chunk_size) + chunk = readpartial(chunk_size) break unless chunk res << chunk chunk_size = length - res.length unless length.nil? @@ -99,26 +67,27 @@ def read(length = nil, buffer = nil) return length && res.length == 0 ? nil : res end - # @private - # Reads a number of bytes from the byte buffer, asking - # the connection to add to the buffer if there are not enough - # bytes available. - # - # Body buffer is cleared as bytes are read from it. - def read_from_body(length = nil) - if length.nil? - slice = @body - @body = nil + # Read a string up to the given number of bytes, blocking until some + # data is available but returning immediately if some data is available + def readpartial(length = nil) + if length.nil? && @buffer.length > 0 + slice = @buffer + @buffer = "" else - @body ||= '' - unless finished_reading? || @body.length >= length - @connection.readpartial(length - @body.length) + unless finished_reading? || (length && length <= @buffer.length) + @connection.readpartial(length ? length - @buffer.length : Connection::BUFFER_SIZE) + end + + if length + slice = @buffer.slice!(0, length) + else + slice = @buffer + @buffer = "" end - slice = @body.slice!(0, length) end + slice && slice.length == 0 ? nil : slice end - private :read_from_body end end diff --git a/lib/reel/request_body.rb b/lib/reel/request_body.rb new file mode 100644 index 0000000..6500092 --- /dev/null +++ b/lib/reel/request_body.rb @@ -0,0 +1,52 @@ +require 'forwardable' + +module Reel + # Represents the bodies of Requests + # TODO: this is an initial attempt at adding a standardized body object + # Much of the logic that belongs here is in Reel::Request and should be + # extracted. + class RequestBody + def initialize(request) + @request = request + @streaming = nil + @contents = nil + end + + # Read exactly the given amount of data + def read(length) + stream! + @request.read(length) + end + + # Read up to length bytes, but return any data that's available + def readpartial(length = nil) + stream! + @request.readpartial(length) + end + + # Eagerly consume the entire body as a string + def to_s + return @contents if @contents + raise StateError, "body is being streamed" unless @streaming.nil? + + begin + @streaming = false + @contents = "" + while chunk = @request.readpartial + @contents << chunk + end + rescue + @contents = nil + raise + end + + @contents + end + + # Assert that the body is actively being streamed + def stream! + raise StateError, "body has already been consumed" if @streaming == false + @streaming = true + end + end +end diff --git a/lib/reel/request_parser.rb b/lib/reel/request_parser.rb index 7dbf149..0fad0d3 100644 --- a/lib/reel/request_parser.rb +++ b/lib/reel/request_parser.rb @@ -60,7 +60,7 @@ def on_headers_complete(headers) # Send body directly to Reel::Response to be buffered. def on_body(chunk) - @currently_reading.add_body(chunk) + @currently_reading.fill_buffer(chunk) end # Mark current request as complete, set this as ready to respond. diff --git a/spec/reel/connection_spec.rb b/spec/reel/connection_spec.rb index 9fba965..c2b1bd5 100644 --- a/spec/reel/connection_spec.rb +++ b/spec/reel/connection_spec.rb @@ -33,7 +33,7 @@ request.url.should eq "/" request.version.should eq "1.1" request['Content-Length'].should eq body.length.to_s - request.body.should eq example_request.body + request.body.to_s.should eq example_request.body end end @@ -108,6 +108,7 @@ }.should raise_error(Reel::Connection::StateError) end end + it "reads pipelined requests without bodies" do with_socket_pair do |client, connection| 3.times do @@ -149,12 +150,13 @@ request.url.should eq "/" request.version.should eq "1.1" request['Content-Length'].should eq expected_body.length.to_s - request.body.should eq expected_body + request.body.to_s.should eq expected_body connection.respond :ok, {}, "" end end end + it "reads pipelined requests with streamed bodies" do with_socket_pair(4) do |client, connection| 3.times do |i| @@ -174,7 +176,7 @@ request['Content-Length'].should eq expected_body.length.to_s request.should_not be_finished_reading new_content = "" - request.body do |chunk| + while chunk = request.body.readpartial(1) new_content << chunk end new_content.should == expected_body @@ -185,9 +187,7 @@ end end - # This test will deadlock rspec waiting unless - # connection.request works properly - it 'does not block waiting for body to read before handling request' do + it "does not block waiting for body to read before handling request" do with_socket_pair do |client, connection| example_request = ExampleRequest.new content = "Hi guys! Sorry I'm late to the party." @@ -197,11 +197,11 @@ request = connection.request request.should be_a(Reel::Request) client << content - request.body.should == content + request.body.to_s.should == content end end - it 'blocks on read until written' do + it "blocks on read until written" do with_socket_pair do |client, connection| example_request = ExampleRequest.new content = "Hi guys! Sorry I'm late to the party." @@ -225,7 +225,7 @@ end end - it 'streams body properly with #read and buffered body' do + it "streams body properly with #read and buffered body" do with_socket_pair do |client, connection| example_request = ExampleRequest.new content = "I'm data you can stream!" @@ -245,7 +245,8 @@ rebuilt.should == ["I'm data", " you can", " stream!"] end end - it 'streams body properly with #body &block' do + + it "streams request bodies" do with_socket_pair(8) do |client, connection| example_request = ExampleRequest.new content = "I'm data you can stream!" @@ -257,7 +258,7 @@ request.should_not be_finished_reading client << content rebuilt = [] - request.body do |chunk| + while chunk = request.body.readpartial(8) rebuilt << chunk end request.should be_finished_reading @@ -301,6 +302,7 @@ request.read.should eq "Hello, world!" end end + it "reads to EOF if length is nil" do with_socket_pair do |client, connection| body = "Hello, world!" diff --git a/spec/reel/server_spec.rb b/spec/reel/server_spec.rb index c9e4158..bb708ea 100644 --- a/spec/reel/server_spec.rb +++ b/spec/reel/server_spec.rb @@ -35,7 +35,7 @@ begin request = connection.request request.method.should eq 'POST' - connection.respond :ok, request.body + connection.respond :ok, request.body.to_s rescue => ex end end