Browse files

make it eventmachine driven

  • Loading branch information...
1 parent 82e25ae commit c1f8982a6a2f9efb51624563ca1adf36d4218767 @ichiban committed Dec 4, 2011
View
1 Gemfile
@@ -4,3 +4,4 @@ gemspec
# For development
gem 'yajl-ruby', '~> 0.7.8', :require => 'yajl'
+gem "async_sinatra", "~> 0.5.0"
View
4 example/.gitignore
@@ -0,0 +1,4 @@
+*.sqlite
+logs/
+tmp/
+
View
14 example/async_sinatra/app.rb
@@ -0,0 +1,14 @@
+require 'sinatra'
+require 'sinatra/async'
+require 'yajl/json_gem'
+
+class AsyncExample < Sinatra::Base
+ register Sinatra::Async
+
+ aget '*' do
+ EM.add_timer(5) do
+ body request.env.to_json
+ end
+ end
+end
+
View
8 example/async_sinatra/config.ru
@@ -0,0 +1,8 @@
+$: << File.expand_path(File.join(File.dirname(__FILE__), '..', '..', 'lib'))
+$:.unshift(File.expand_path('.')) # Ruby 1.9 doesn't have . in the load path...
+
+require 'rack/handler/mongrel2'
+require 'app'
+
+Rack::Handler::Mongrel2.run AsyncExample, :uuid => 'sinatra', :block => true
+exit(0)
View
4 example/mongrel2.conf
@@ -3,6 +3,10 @@ darkblog2 = Host(name='localhost', routes={
send_ident='9539ED88-1B33-4D19-A9F9-283E5BF11AC7',
recv_spec='tcp://127.0.0.1:9996',
recv_ident='')
+ '/async/': Handler(send_spec='tcp://127.0.0.1:9999',
+ send_ident='AEE66029-E420-42E7-A7C8-6C37BBFC7B9F',
+ recv_spec='tcp://127.0.0.1:9998',
+ recv_ident='')
})
main = Server(
View
25 example/sinatra/mongrel2.conf
@@ -1,25 +0,0 @@
-darkblog2 = Host(name='localhost', routes={
- '/': Handler(send_spec='tcp://127.0.0.1:9997',
- send_ident='sinatra',
- recv_spec='tcp://127.0.0.1:9996',
- recv_ident='')
-})
-
-main = Server(
- uuid='sinatra-example',
- chroot='.',
- access_log='/run/access.log',
- error_log='/run/error.log',
- pid_file='/run/mongrel2.pid',
- default_host='localhost',
- name='main',
- port=8080,
- hosts=[darkblog2]
-)
-
-
-settings = {
- 'zeromq.threads': 1
-}
-
-servers = [main]
View
61 lib/mongrel2/connection.rb
@@ -1,37 +1,70 @@
-require 'ffi-rzmq'
+require 'em-zeromq'
require 'mongrel2/request'
require 'mongrel2/response'
module Mongrel2
class Connection
+ attr_reader :received
@context = nil
def self.context
- @context ||= ZMQ::Context.new(1)
+ @context ||= EM::ZeroMQ::Context.new(1)
end
- def initialize(uuid, sub, pub)
- @uuid, @sub, @pub = uuid, sub, pub
+ def initialize(uuid, sub, pub, app)
+ @uuid, @sub, @pub, @app = uuid, sub, pub, app
# Connect to receive requests
- @reqs = self.class.context.socket(ZMQ::PULL)
- @reqs.connect(sub)
+ @reqs = self.class.context.connect(ZMQ::PULL, sub, self)
# Connect to send responses
- @resp = self.class.context.socket(ZMQ::PUB)
- @resp.connect(pub)
- @resp.setsockopt(ZMQ::IDENTITY, uuid)
+ @resp = self.class.context.connect(ZMQ::PUB, pub, nil, :identity => uuid)
end
- def recv
- msg = @reqs.recv_string(0)
- msg.nil? ? nil : Request.parse(msg)
+ def on_readable(socket, messages)
+ messages.each do |msg|
+ req = msg.nil? ? nil : Request.parse(msg.copy_out_string, self)
+ next if req.nil? || req.disconnect?
+ process req
+ end
+ end
+
+ def process(req)
+ pre = Proc.new do
+ method(:pre_process).call(req)
+ end
+
+ post = Proc.new do |resp|
+ method(:post_process).call(resp, req)
+ end
+
+ EM.defer pre, post
+ end
+
+ def pre_process(req)
+ status, headers, rack_response = -1, {}, []
+
+ catch(:async) do
+ status, headers, rack_response = @app.call(req.env)
+ end
+
+ [status, headers, rack_response]
+ end
+
+ def post_process(response, req)
+ status, headers, rack_response = *response
+ # Status code -1 indicates that we're going to respond later (async).
+ return if -1 == status
+
+ body = ''
+ rack_response.each { |b| body << b }
+ reply req, body, status, headers
end
def reply(req, body, status = 200, headers = {})
resp = Response.new(@resp)
- resp.send_http(req, body, status, headers)
- resp.close(req) if req.close?
+ resp.send_http req, body, status, headers
+ resp.close req if req.close?
end
def close
View
44 lib/mongrel2/request.rb
@@ -2,16 +2,16 @@
module Mongrel2
class Request
- attr_reader :headers, :body, :uuid, :conn_id, :path
+ attr_reader :headers, :body, :uuid, :conn_id, :path, :connection
class << self
- def parse(msg)
+ def parse(msg, connection)
# UUID CONN_ID PATH SIZE:HEADERS,SIZE:BODY,
uuid, conn_id, path, rest = msg.split(' ', 4)
headers, rest = parse_netstring(rest)
body, _ = parse_netstring(rest)
headers = Mongrel2::JSON.parse(headers)
- new(uuid, conn_id, path, headers, body)
+ new(uuid, conn_id, path, headers, body, connection)
end
def parse_netstring(ns)
@@ -24,9 +24,10 @@ def parse_netstring(ns)
end
end
- def initialize(uuid, conn_id, path, headers, body)
+ def initialize(uuid, conn_id, path, headers, body, connection)
@uuid, @conn_id, @path, @headers, @body = uuid, conn_id, path, headers, body
@data = headers['METHOD'] == 'JSON' ? Mongrel2::JSON.parse(body) : {}
+ @connection = connection
end
def disconnect?
@@ -36,5 +37,38 @@ def disconnect?
def close?
headers['connection'] == 'close' || headers['VERSION'] == 'HTTP/1.0'
end
+
+ def env
+ script_name = ENV['RACK_RELATIVE_URL_ROOT'] || headers['PATTERN'].split('(', 2).first.gsub(/\/$/, '')
+ env = {
+ 'rack.version' => Rack::VERSION,
+ 'rack.url_scheme' => 'http', # Only HTTP for now
+ 'rack.input' => StringIO.new(body),
+ 'rack.errors' => $stderr,
+ 'rack.multithread' => true,
+ 'rack.multiprocess' => true,
+ 'rack.run_once' => false,
+ 'mongrel2.pattern' => headers['PATTERN'],
+ 'REQUEST_METHOD' => headers['METHOD'],
+ 'CONTENT_TYPE' => headers['content-type'],
+ 'SCRIPT_NAME' => script_name,
+ 'PATH_INFO' => headers['PATH'].gsub(script_name, ''),
+ 'QUERY_STRING' => headers['QUERY'] || '',
+ 'async.callback' => Proc.new { |resp|
+ connection.method(:post_process).call(resp, self)
+ },
+ 'async.close' => EM::DefaultDeferrable.new
+ }
+
+ env['SERVER_NAME'], env['SERVER_PORT'] = headers['host'].split(':', 2)
+ headers.each do |key, val|
+ unless key =~ /content_(type|length)/i
+ key = "HTTP_#{key.upcase.gsub('-', '_')}"
+ end
+ env[key] = val
+ end
+
+ env
+ end
end
-end
+end
View
4 lib/mongrel2/response.rb
@@ -58,7 +58,7 @@ def close(req)
private
def send_resp(uuid, conn_id, data)
- @resp.send_string('%s %d:%s, %s' % [uuid, conn_id.size, conn_id, data])
+ @resp.send_msg('%s %d:%s, %s' % [uuid, conn_id.size, conn_id, data])
end
def build_http_response(body, status, headers)
@@ -67,4 +67,4 @@ def build_http_response(body, status, headers)
"HTTP/1.1 #{status} #{StatusMessage[status.to_i]}\r\n#{headers}\r\n\r\n#{body}"
end
end
-end
+end
View
53 lib/rack/handler/mongrel2.rb
@@ -1,5 +1,6 @@
require 'mongrel2/connection'
require 'stringio'
+require 'eventmachine'
module Rack
module Handler
@@ -14,52 +15,18 @@ def run(app, options = {})
raise ArgumentError.new('Must specify an :uuid or set RACK_MONGREL2_UUID') if options[:uuid].nil?
- conn = ::Mongrel2::Connection.new(options[:uuid], options[:recv], options[:send])
+ conn = nil
- running = true
-
- # This doesn't work at all until zmq fixes their shit (in 2.1.x I think), but trap it now anyway.
- %w(INT TERM KILL).each do |sig|
- trap(sig) do
- conn.close
- running = false
- end
- end
-
- while running
- req = conn.recv rescue nil
- next if req.nil? || req.disconnect?
- break if !running
-
- script_name = ENV['RACK_RELATIVE_URL_ROOT'] || req.headers['PATTERN'].split('(', 2).first.gsub(/\/$/, '')
- env = {
- 'rack.version' => Rack::VERSION,
- 'rack.url_scheme' => 'http', # Only HTTP for now
- 'rack.input' => StringIO.new(req.body),
- 'rack.errors' => $stderr,
- 'rack.multithread' => true,
- 'rack.multiprocess' => true,
- 'rack.run_once' => false,
- 'mongrel2.pattern' => req.headers['PATTERN'],
- 'REQUEST_METHOD' => req.headers['METHOD'],
- 'CONTENT_TYPE' => req.headers['content-type'],
- 'SCRIPT_NAME' => script_name,
- 'PATH_INFO' => req.headers['PATH'].gsub(script_name, ''),
- 'QUERY_STRING' => req.headers['QUERY'] || ''
- }
-
- env['SERVER_NAME'], env['SERVER_PORT'] = req.headers['host'].split(':', 2)
- req.headers.each do |key, val|
- unless key =~ /content_(type|length)/i
- key = "HTTP_#{key.upcase.gsub('-', '_')}"
+ EM.run do
+ conn = ::Mongrel2::Connection.new(options[:uuid], options[:recv], options[:send], app)
+
+ # This doesn't work at all until zmq fixes their shit (in 2.1.x I think), but trap it now anyway.
+ %w(INT TERM KILL).each do |sig|
+ trap(sig) do
+ conn.close
+ EM.stop
end
- env[key] = val
end
-
- status, headers, rack_response = app.call(env)
- body = ''
- rack_response.each { |b| body << b }
- conn.reply(req, body, status, headers)
end
ensure
conn.close if conn.respond_to?(:close)
View
2 rack-mongrel2.gemspec
@@ -41,7 +41,7 @@ Gem::Specification.new do |s|
## List your runtime dependencies here. Runtime dependencies are those
## that are needed for an end user to actually USE your code.
s.add_dependency('ffi', ['~> 1.0.0'])
- s.add_dependency('ffi-rzmq', ['~> 0.7.0'])
+ s.add_dependency('em-zeromq', ['~> 0.2.2'])
## List your development dependencies here. Development dependencies are
## those that are only needed during development
View
4 spec/request_spec.rb
@@ -26,7 +26,7 @@
it "should parse a Mongrel2 message and have all parts populated" do
netstring = "UUID CON PATH 253:{\"PATH\":\"/\",\"user-agent\":\"curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8l zlib/1.2.3\",\"host\":\"localhost:6767\",\"accept\":\"*/*\",\"connection\":\"close\",\"x-forwarded-for\":\"::1\",\"METHOD\":\"GET\",\"VERSION\":\"HTTP/1.1\",\"URI\":\"/\",\"PATTERN\":\"/\"},0:,"
- r = Mongrel2::Request.parse(netstring)
+ r = Mongrel2::Request.parse(netstring, double())
r.should_not be_nil
r.uuid.should eql('UUID')
r.conn_id.should eql('CON')
@@ -44,4 +44,4 @@
r.headers['PATTERN'].should eql('/')
r.close?.should be_true
end
-end
+end
View
6 spec/response_spec.rb
@@ -13,7 +13,7 @@
@req.should_receive(:conn_id) { 'CONN_ID' }
httpreq = "UUID 7:CONN_ID, HTTP/1.1 200 OK\r\nContent-Length: 4\r\n\r\nBoo!"
- @resp.should_receive(:send_string).with(httpreq)
+ @resp.should_receive(:send_msg).with(httpreq)
@response.send_http(@req, 'Boo!', 200, {})
end
@@ -23,8 +23,8 @@
@req.should_receive(:conn_id) { 'CONN_ID' }
httpreq = 'UUID 7:CONN_ID, '
- @resp.should_receive(:send_string).with(httpreq)
+ @resp.should_receive(:send_msg).with(httpreq)
@response.close(@req)
end
-end
+end

0 comments on commit c1f8982

Please sign in to comment.