Skip to content
This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Commit

Permalink
Merge b83c068 into a273967
Browse files Browse the repository at this point in the history
  • Loading branch information
digitalextremist committed Apr 26, 2013
2 parents a273967 + b83c068 commit 66c5605
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 15 deletions.
3 changes: 3 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ source 'https://rubygems.org'

gem 'celluloid', github: 'celluloid/celluloid', branch: 'master'
gem 'celluloid-io', github: 'celluloid/celluloid-io', branch: 'master'
gem 'websocket-protocol', github: 'faye/websocket-protocol-ruby', branch: 'master'

gem 'rack', github: 'rack/rack', branch: 'master'

gem 'jruby-openssl' if defined? JRUBY_VERSION
gem 'coveralls', require: false
Expand Down
12 changes: 9 additions & 3 deletions bin/reel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ parser = OptionParser.new do |opts|
options[:workers] = arg
end

opts.on "-x", "--pidfile FILE",
"Store PID of Reel's process in this file (default: none)" do |arg|
options[:pidfile] = arg
end

opts.on "-v", "--version", "Show version instead of slogan" do |arg|
options[:slogan] = false
end

opts.on "-r", "--rackup FILE",
"Load Rack config from this file (default: config.ru)" do |arg|
options[:rackup] = arg
Expand All @@ -57,9 +66,6 @@ end

handler = Rack::Handler::Reel.new(options)

Reel::Logger.info "A Reel good HTTP server!"
Reel::Logger.info "Listening on #{handler[:host]}:#{handler[:port]}"

handler.start

sleep
48 changes: 48 additions & 0 deletions examples/websocket_hijack.ru
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Run this as
# bundle exec rackup -E production -s reel examples/websocket_hijack.ru

require 'websocket/protocol'

class WS
attr_reader :env, :url

def initialize(env)
@env = env

secure = Rack::Request.new(env).ssl?
scheme = secure ? 'wss:' : 'ws:'
@url = scheme + '//' + env['HTTP_HOST'] + env['PATH_INFO']

@handler = WebSocket::Protocol.server(self)
end

def setup
env['rack.hijack'].call
@io = env['rack.hijack_io']

@handler.start

Celluloid::Actor.current.after(1) do
loop do
@handler.parse(@io.readpartial(1024))
end
end

@handler
end

def write(string)
@io.write(string)
end
end

class App
def self.call(env)
if WebSocket::Protocol.websocket?(env)
handler = WS.new(env).setup
handler.text "fofofo"
end
end
end

run App
28 changes: 23 additions & 5 deletions lib/rack/handler/reel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@ class Reel
:port => 3000,
:quiet => false,
:workers => 10,
:rackup => "config.ru"
:pidfile => nil,
:rackup => "config.ru",
:slogan => true
}

def self.run(app, options = {})

@handler = Reel.new(options.merge :app => app)

::Reel::Logger.info "A Reel good HTTP server!"
::Reel::Logger.info "Listening on #{@handler[:host]}:#{@handler[:port]}"


yield @handler if block_given?
@handler.start
end

def cli_announcement
::Reel::Logger.info ( @options[:slogan] ) ? "A Reel good HTTP server!" : "Reel #{::Reel::VERSION} "
::Reel::Logger.info "Listening on #{@options[:host]}:#{@options[:port]}"
::Reel::Logger.info "Number of workers: #{@options[:workers]}"
::Reel::Logger.info "Process ID saved to: #{@options[:pidfile]}" if @options[:pidfile]
::Reel::Logger.info "Process ID: #{Process.pid}"
end

def initialize(opts = {})
opts = normalize_options(opts)
Expand All @@ -38,18 +45,29 @@ def initialize(opts = {})
end

def start

cli_announcement
Celluloid::Actor[:reel_rack_pool] = ::Reel::RackWorker.pool(size: options[:workers], args: [self])

::Reel::Server.supervise_as(:reel_server, options[:host], options[:port]) do |connection|
Celluloid::Actor[:reel_rack_pool].handle(connection.detach)
end

if pidfile = @options[:pidfile]
File.open(pidfile, "w") { |f|
f.puts Process.pid
}
end

sleep
end

def stop
Celluloid::Actor[:reel_server].terminate!
Celluloid::Actor[:reel_rack_pool].terminate!

File.delete( pidfile ) if pidfile = @options[:pidfile] && File.file?( pidfile )

exit
end

Expand Down
2 changes: 1 addition & 1 deletion lib/reel/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def respond(response, headers_or_body = {}, body = nil)
# The client disconnected early
@keepalive = false
ensure
if @keepalive
if @keepalive || body.nil?
reset_request(:header)
else
@socket.close unless @socket.closed?
Expand Down
21 changes: 20 additions & 1 deletion lib/reel/rack_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,24 @@ def handle(connection)
when WebSocket
handle_websocket(request, connection)
end
return if request.hijacked?
end
end

def handle_request(request, connection)
status, headers, body_parts = @app.call(request_env(request, connection))
return if request.hijacked?

# XXX delete is not actually part of the Rack SPEC...
if hijacking_callback = headers.delete('rack.hijack')
connection.respond Response.new(status, headers)
hijacking_callback.call(request.hijack)
return
end

body, is_stream = response_body(body_parts)
connection.respond (is_stream ? StreamResponse : Response).new(status, headers, body)
request.close
end

def handle_websocket(request, connection)
Expand Down Expand Up @@ -113,12 +124,20 @@ def env request

env[RACK_INPUT] = StringIO.new(request.body || INITIAL_BODY)
env[RACK_INPUT].set_encoding(Encoding::BINARY) if env[RACK_INPUT].respond_to?(:set_encoding)
env[SERVER_NAME], env[SERVER_PORT] = (request[HOST]||'').split(':', 2)
if request[HOST]
env[SERVER_NAME], env[SERVER_PORT] = (request[HOST]||'').split(':', 2)
else
env[SERVER_NAME] = env[SERVER_PORT] = ""
end
env[SERVER_PORT] ||= @handler[:port].to_s
env[HTTP_VERSION] = request.version || env[SERVER_PROTOCOL]
env[REQUEST_METHOD] = request.method
env[PATH_INFO] = request.path
env[QUERY_STRING] = request.query_string || ''
env["rack.hijack?"] = true
env["rack.hijack"] = proc do
env["rack.hijack_io"] ||= request.hijack
end

(_ = request.headers.delete CONTENT_TYPE_ORIG) && (env[CONTENT_TYPE] = _)
(_ = request.headers.delete CONTENT_LENGTH_ORIG) && (env[CONTENT_LENGTH] = _)
Expand Down
19 changes: 14 additions & 5 deletions lib/reel/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@ def self.read(connection)
REQUEST_METHODS[parser.http_method] ||
raise(ArgumentError, "Unknown Request Method: %s" % parser.http_method)

upgrade = parser.headers[UPGRADE]
if upgrade && upgrade.downcase == WEBSOCKET
WebSocket.new(parser, connection.socket)
else
#upgrade = parser.headers[UPGRADE]
#if upgrade && upgrade.downcase == WEBSOCKET
# WebSocket.new(parser, connection.socket)
#else
Request.new(parser, connection)
end
#end
end

def_delegators :@connection, :respond, :finish_response, :close, :read

def initialize(http_parser, connection = nil)
@http_parser, @connection = http_parser, connection
@hijacked = false
end

def hijack
@hijacked = true
@connection.detach
@connection.socket
end

def hijacked?; @hijacked; end

def body
@body ||= begin
raise "no connection given" unless @connection
Expand Down
26 changes: 26 additions & 0 deletions lib/reel/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ class Server

def initialize(host, port, backlog = DEFAULT_BACKLOG, &callback)
# This is actually an evented Celluloid::IO::TCPServer

@server = TCPServer.new(host, port)
@server.listen(backlog)
@callback = callback
async.run

# TODO: Catch Errno::EADDRINUSE and kill overall process, even if supervised.

end

execute_block_on_receiver :initialize
Expand All @@ -28,6 +32,7 @@ def run

def handle_connection(socket)
connection = Connection.new(socket)
optimize_socket socket
begin
@callback[connection]
ensure
Expand All @@ -36,8 +41,29 @@ def handle_connection(socket)
end
end
rescue RequestError, EOFError
deoptimize_socket
# Client disconnected prematurely
# TODO: log this?
end

if RUBY_PLATFORM =~ /linux/
def optimize_socket(socket)
if socket.kind_of? TCPSocket
socket.setsockopt( Socket::IPPROTO_TCP, :TCP_NODELAY, 1 )
socket.setsockopt( Socket::IPPROTO_TCP, 3, 1 ) # TCP_CORK
socket.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1 )
end
end

def deoptimize_socket(socket)
socket.setsockopt(6, 3, 0) if socket.kind_of? TCPSocket
end
else
def optimize_socket(socket)
end

def deoptimize_socket(socket)
end
end
end
end

0 comments on commit 66c5605

Please sign in to comment.