Permalink
Browse files

take 1: HTTP -> SPDY -> ZMQ (XREQ) router

  • Loading branch information...
0 parents commit 7f4f752755616aa9a8d2e9d74e4be6cf7e0ac4af @igrigorik committed Apr 23, 2011
Showing with 120 additions and 0 deletions.
  1. +54 −0 config/router.rb
  2. +66 −0 router.rb
@@ -0,0 +1,54 @@
+@router = {}
+
+config['stream'] = 1
+config['identity'] = 'http-spdy-bridge'
+config['router'] = @router
+
+Thread.abort_on_exception = true
+ctx = EM::ZeroMQ::Context.new(1)
+
+class ZMQHandler
+ attr_reader :received
+
+ def initialize(router)
+ @router = router
+ @p = SPDY::Parser.new
+
+ @p.on_headers_complete do |stream, astream, priority, head|
+ p [:ZMQ_REPLY, :stream, stream, :headers, head]
+
+ status = head.delete('status')
+ version = head.delete('version')
+
+ headers = "#{version} #{status}\r\n"
+ head.each do |k,v|
+ headers << "%s: %s\r\n" % [k.capitalize, v]
+ end
+
+ @router[stream].stream_send(headers + "\r\n")
+ end
+
+ @p.on_body do |stream, data|
+ p [:SPDY_BODY, stream, data]
+ @router[stream].stream_send data
+ end
+
+ @p.on_message_complete do |stream|
+ p [:SPDY_FIN, stream]
+
+ @router[stream].stream_close
+ @router.delete stream
+ end
+ end
+
+ def on_readable(socket, messages)
+ messages.each do |m|
+ @p << m.copy_out_string
+ end
+ end
+end
+
+handler = ZMQHandler.new(@router)
+config['zmq'] = ctx.bind(ZMQ::XREQ, 'tcp://127.0.0.1:8000', handler, :identity => config['identity'])
+
+puts "Bound XREQ handler to port 8000, let the games begin!"
@@ -0,0 +1,66 @@
+$: << '/git/spdy/lib'
+$: << '/git/goliath/lib'
+$: << '/git/em-zeromq/lib'
+
+require 'em-zeromq'
+require 'goliath'
+require 'spdy'
+
+class Router < Goliath::API
+
+ def proxy(env, data)
+ sent = env.config['zmq'].send_msg('', data)
+ env.logger.info "Proxying: #{data.size} to ZMQ worker, #{sent}"
+ end
+
+ def on_headers(env, headers)
+ env.logger.info 'received headers: ' + headers.inspect
+
+ sr = SPDY::Protocol::Control::SynStream.new
+ headers = headers.inject({}) {|h,(k,v)| h[k.downcase] = v; h}
+ headers.merge!({
+ 'version' => env['HTTP_VERSION'],
+ 'method' => env['REQUEST_METHOD'],
+ 'url' => env['REQUEST_URI']
+ })
+
+ # assign a unique stream ID and store it
+ # in the HTTP > SPDY stream_id routing table
+ env['stream_id'] = env.config['stream']
+ env.config['router'][env['stream_id']] = env
+ env.config['stream'] += 1
+
+ sr.create(:stream_id => env['stream_id'], :headers => headers)
+ proxy(env, sr.to_binary_s)
+ end
+
+ def on_body(env, data)
+ env.logger.info 'received data: ' + data
+
+ body = SPDY::Protocol::Data::Frame.new
+ body.create(:stream_id => env['stream_id'], :data => data)
+
+ proxy(env, body.to_binary_s)
+ end
+
+ def on_close(env)
+ env.logger.info "client closed connection, stream: #{env['stream_id']}"
+ env.config['router'].delete env['stream_id']
+
+ # TODO: SEND RST
+ end
+
+ def response(env)
+ env.logger.info "Finished connection-request"
+
+ fin = SPDY::Protocol::Data::Frame.new
+ fin.create(:stream_id => env['stream_id'], :flags => 1)
+
+ proxy(env, fin.to_binary_s)
+
+ # TODO: merge upstream Goliath return
+ # Goliath::Connection::AsyncResponse
+ nil
+ end
+
+end

0 comments on commit 7f4f752

Please sign in to comment.