Permalink
Browse files

Don't memoize connection.request

Memoizing connection.request was causing a lot of problems when
interacting with websockets. This change avoids memoization of the
request, instead opting to read and return it on-the-fly when requested.

Some robustness guarantees around connection state aren't yet in place,
but overall things seem to be working and best of all, the websockets
example works.
  • Loading branch information...
1 parent 3af40c9 commit 53ad93b1ae55219648d219650b63b708908297ee @tarcieri tarcieri committed Aug 10, 2012
View
@@ -23,14 +23,19 @@ def run
class TimeClient
include Celluloid
include Celluloid::Notifications
+ include Celluloid::Logger
def initialize(websocket)
+ info "Streaming time changes to client"
@socket = websocket
subscribe('time_change', :notify_time_change)
end
def notify_time_change(topic, new_time)
@socket << new_time.inspect
+ rescue Reel::SocketError
+ info "Time client disconnected"
+ terminate
end
end
@@ -48,7 +53,8 @@ def on_connection(connection)
when Reel::Request
route_request connection, request
when Reel::WebSocket
- TimeClient.new(request)
+ info "Received a WebSocket connection"
+ route_websocket request
end
end
end
@@ -62,6 +68,15 @@ def route_request(connection, request)
connection.respond :not_found, "Not found"
end
+ def route_websocket(socket)
+ if socket.url == "/timeinfo"
+ TimeClient.new(socket)
+ else
+ info "Received invalid WebSocket request for: #{socket.url}"
+ socket.close
+ end
+ end
+
def render_index(connection)
info "200 OK: /"
connection.respond :ok, <<-HTML
View
@@ -40,7 +40,7 @@ def start
Celluloid::Actor[:reel_rack_pool] = ::Reel::RackWorker.pool(size: options[:workers], args: [self])
::Reel::Server.supervise_as(:reel_server, options[:host], options[:port]) do |connection|
- Celluloid::Actor[:reel_rack_pool].handle(connection)
+ Celluloid::Actor[:reel_rack_pool].handle(connection.detach)
end
sleep
View
@@ -3,15 +3,16 @@ module Reel
class Connection
class StateError < RuntimeError; end # wrong state for a given operation
- attr_reader :request, :socket, :parser
+ attr_reader :socket, :parser
# Attempt to read this much data
BUFFER_SIZE = 4096
def initialize(socket)
- @socket = socket
+ @attached = true
+ @socket = socket
@keepalive = true
- @parser = Request::Parser.new
+ @parser = Request::Parser.new
reset_request
@response_state = :header
@@ -21,10 +22,18 @@ def initialize(socket)
# Is the connection still active?
def alive?; @keepalive; end
+ # Is the connection still attached to a Reel::Server?
+ def attached?; @attached; end
+
+ # Detach this connection from the Reel::Server and manage it independently
+ def detach
+ @attached = false
+ self
+ end
+
# Reset the current request state
def reset_request(state = :header)
@request_state = state
- @request = nil
@header_buffer = "" # Buffer headers in case of an upgrade request
@parser.reset
end
@@ -39,25 +48,27 @@ def local_address
# Read a request object from the connection
def request
- @request ||= begin
- Request.read(self).tap do |request|
- case request
- when Request
- @request_state = :body
- @keepalive = false if request['Connection'] == 'close' || request.version == "1.0"
- @body_remaining = Integer(request['Content-Length']) if request['Content-Length']
- when WebSocket
- @request_state = @response_state = :websocket
- @body_remaining = nil
- @socket = nil
- else raise "unexpected request type: #{request.class}"
- end
- end
+ return if @request_state == :websocket
+ req = Request.read(self)
+
+ case req
+ when Request
+ @request_state = :body
+ @keepalive = false if req['Connection'] == 'close' || req.version == "1.0"
+ @body_remaining = Integer(req['Content-Length']) if req['Content-Length']
+ when WebSocket
+ @request_state = @response_state = :websocket
+ @body_remaining = nil
+ @socket = nil
+ else raise "unexpected request type: #{req.class}"
end
+ req
rescue IOError, Errno::ECONNRESET, Errno::EPIPE
# The client is disconnected
+ @request_state = :closed
@keepalive = false
+ nil
end
# Read a chunk from the request
View
@@ -6,11 +6,9 @@ class Request
def self.read(connection)
parser = connection.parser
- header_buffer = ''
begin
data = connection.socket.readpartial(Connection::BUFFER_SIZE)
- header_buffer << data
parser << data
end until parser.headers
View
@@ -20,13 +20,15 @@ def run
def handle_connection(socket)
connection = Connection.new(socket)
begin
- @callback[connection] if connection.request
- end while connection.alive?
- rescue RequestError
- connection.close
- rescue EOFError
+ @callback[connection]
+ ensure
+ if connection.attached?
+ connection.close rescue nil
+ end
+ end
+ rescue RequestError, EOFError
# Client disconnected prematurely
- # FIXME: should probably do something here
+ # TODO: log this?
end
end
end
View
@@ -48,6 +48,8 @@ def read
def write(msg)
@socket << ::WebSocket::Message.new(msg).to_data
msg
+ rescue Errno::EPIPE
+ raise SocketError, "error writing to socket"
end
alias_method :<<, :write
@@ -49,7 +49,7 @@
handler = proc do |connection|
begin
- worker.handle!(connection)
+ worker.handle!(connection.detach)
rescue => ex
end
end

0 comments on commit 53ad93b

Please sign in to comment.