diff --git a/config/router.rb b/config/router.rb index 87928b9..7619acc 100644 --- a/config/router.rb +++ b/config/router.rb @@ -25,17 +25,21 @@ def initialize(router) headers << "%s: %s\r\n" % [k.capitalize, v] end + # emit HTTP headers generated by ZMQ worker @router[stream].stream_send(headers + "\r\n") end @p.on_body do |stream, data| p [:SPDY_BODY, stream, data] + + # emit data chunk generated by ZMQ worker @router[stream].stream_send data end @p.on_message_complete do |stream| p [:SPDY_FIN, stream] + # terminate connection when ZMQ worker sends a FIN flag @router[stream].stream_close @router.delete stream end diff --git a/router.rb b/router.rb index 0b11d70..a3a7343 100644 --- a/router.rb +++ b/router.rb @@ -8,13 +8,22 @@ class Router < Goliath::API - def proxy(env, data) - sent = env.config['zmq'].send_msg('', data) - env.logger.info "Proxying: #{data.size} to ZMQ worker, #{sent}" + def proxy(env, data, flush = false) + if flush + data = [env['spdy'], data].flatten + env.logger.info data + sent = env.config['zmq'].send_msg('', *data) + env.logger.info "Proxying: #{data.size} messages to ZMQ worker, status: #{sent}" + + else + env['spdy'] ||= [] + env['spdy'] << data + env.logger.info "Buffered SPDY message. buffer size: #{env['spdy'].size}" + end end def on_headers(env, headers) - env.logger.info 'received headers: ' + headers.inspect + env.logger.info 'received HTTP headers: ' + headers.inspect sr = SPDY::Protocol::Control::SynStream.new headers = headers.inject({}) {|h,(k,v)| h[k.downcase] = v; h} @@ -35,12 +44,15 @@ def on_headers(env, headers) end def on_body(env, data) - env.logger.info 'received data: ' + data + return if data.empty? + env.logger.info 'received HTTP data: ' + data - body = SPDY::Protocol::Data::Frame.new - body.create(:stream_id => env['stream_id'], :data => data) + # TODO: Proxy body data in chunks + # body = SPDY::Protocol::Data::Frame.new + # body.create(:stream_id => env['stream_id'], :data => data) + # proxy(env, body.to_binary_s) - proxy(env, body.to_binary_s) + (env['body'] ||= '') << data end def on_close(env) @@ -54,9 +66,9 @@ def response(env) env.logger.info "Finished connection-request" fin = SPDY::Protocol::Data::Frame.new - fin.create(:stream_id => env['stream_id'], :flags => 1) + fin.create(:stream_id => env['stream_id'], :data => env['body'], :flags => 1) - proxy(env, fin.to_binary_s) + proxy(env, fin.to_binary_s, true) # TODO: merge upstream Goliath return # Goliath::Connection::AsyncResponse