Permalink
Browse files

Make sure all event registration and input processing happens on the …

…EventMachine thread, so that we don't get race conditions where incoming messages are processed before the socket's onmessage event handler has been registered.
  • Loading branch information...
1 parent c05e090 commit a65f5d9455a4466d4e2fba4f18dc5e5a9c21f256 @jcoglan jcoglan committed Mar 19, 2013
Showing with 29 additions and 13 deletions.
  1. +3 −0 examples/app.rb
  2. +1 −7 examples/server.rb
  3. +6 −3 lib/faye/rack_stream.rb
  4. +19 −3 lib/faye/websocket/api/event_target.rb
View
@@ -48,3 +48,6 @@
end
end
+def App.log(message)
+end
+
View
@@ -7,12 +7,6 @@
engine = ARGV[2] || 'thin'
spec = File.expand_path('../../spec', __FILE__)
-module Logger
- def self.log(message)
- $stdout.puts(message)
- end
-end
-
require File.expand_path('../app', __FILE__)
if %[goliath thin].include?(engine)
Faye::WebSocket.load_adapter(engine)
@@ -32,7 +26,7 @@ def response(env)
when 'puma'
events = Puma::Events.new($stdout, $stderr)
binder = Puma::Binder.new(events)
- binder.parse(["tcp://0.0.0.0:#{port}"], Logger)
+ binder.parse(["tcp://0.0.0.0:#{port}"], App)
server = Puma::Server.new(App, events)
server.binder = binder
server.run.join
View
@@ -23,16 +23,19 @@ def initialize(socket_object)
if socket_object.env['rack.hijack?']
socket_object.env['rack.hijack'].call
@rack_hijack_io = socket_object.env['rack.hijack_io']
- EventMachine.attach(@rack_hijack_io, Reader) { |r| r.stream = self }
+ EventMachine.attach(@rack_hijack_io, Reader) do |reader|
+ @rack_hijack_io_reader = reader
+ reader.stream = self
+ end
end
@connection.socket_stream = self if @connection.respond_to?(:socket_stream)
end
def clean_rack_hijack
return unless @rack_hijack_io
- @rack_hijack_io.close
- @rack_hijack_io = nil
+ @rack_hijack_io_reader.close_connection_after_writing
+ @rack_hijack_io = @rack_hijack_io_reader = nil
end
def close_connection
@@ -1,7 +1,18 @@
module Faye::WebSocket::API
module EventTarget
- attr_accessor :onopen, :onmessage, :onerror, :onclose
+ events = %w[open message error close]
+
+ events.each do |event_type|
+ define_method "on#{event_type}=" do |handler|
+ EventMachine.next_tick do
+ if buffer = @buffers && @buffers.delete(event_type)
+ buffer.each { |event| handler.call(event) }
+ end
+ instance_variable_set("@on#{event_type}", handler)
+ end
+ end
+ end
def add_event_listener(event_type, listener, use_capture = false)
@listeners ||= {}
@@ -20,8 +31,13 @@ def dispatch_event(event)
event.target = event.current_target = self
event.event_phase = Event::AT_TARGET
- callback = __send__("on#{ event.type }")
- callback.call(event) if callback
+ callback = instance_variable_get("@on#{ event.type }")
+ if callback
+ callback.call(event)
+ else
+ @buffers ||= Hash.new { |k,v| k[v] = [] }
+ @buffers[event.type].push(event)
+ end
return unless @listeners and @listeners[event.type]
@listeners[event.type].each do |listener|

0 comments on commit a65f5d9

Please sign in to comment.