Permalink
Browse files

Cachig json representation of events

  • Loading branch information...
1 parent 772c119 commit 9597f14575379d0089997cc4e78eba2c7ea807fe @afcapel committed Sep 4, 2011
View
@@ -3,10 +3,10 @@
require_relative 'alondra/connection'
require_relative 'alondra/channel'
require_relative 'alondra/command'
+require_relative 'alondra/command_dispatcher'
require_relative 'alondra/event_router'
require_relative 'alondra/message_queue_client'
require_relative 'alondra/message_queue'
-require_relative 'alondra/message_dispatcher'
require_relative 'alondra/pushing'
require_relative 'alondra/event_listener'
require_relative 'alondra/session_parser'
@@ -73,7 +73,7 @@ def unsubscribe(connection)
:channel => name }
- event = Event.new event_hash, connection
+ event = Event.new(event_hash, nil, connection)
event.fire!
end
@@ -32,8 +32,7 @@ def fire_event(event_type)
:resource_type => @connection.session.class.name,
:channel => @channel_name}
- event = Event.new event_hash, connection
- event.fire!
+ Event.new(event_hash, nil, connection).fire!
end
end
end
@@ -0,0 +1,22 @@
+module Alondra
+
+ class NotRecognizedCommand < StandardError; end
+
+ module CommandDispatcher
+ extend self
+
+ def dispatch(input, connection)
+ msg = parse(input)
+
+ unless msg.kind_of?(Hash) && msg[:command].present?
+ raise NotRecognizedCommand.new("Unrecognized command: #{input}")
+ end
+
+ Command.new(connection, msg).execute!
+ end
+
+ def parse(string)
+ msg = ActiveSupport::JSON.decode(string).symbolize_keys
+ end
+ end
+end
View
@@ -6,9 +6,10 @@ class Event
attr_reader :resource_type
attr_reader :connection
- def initialize(event_hash, connection = nil)
- @connection = connection
- @type = event_hash[:event].to_sym
+ def initialize(event_hash, from_json = nil, connection = nil)
+ @connection = connection
+ @type = event_hash[:event].to_sym
+ @json_encoded = from_json
if Hash === event_hash[:resource]
@resource = fetch(event_hash[:resource_type], event_hash[:resource])
@@ -32,7 +33,10 @@ def channel
def fire!
if connection
- MessageQueue.instance.receive self
+ # We are inside the Alondra Server
+ EM.schedule do
+ MessageQueue.instance.receive self
+ end
else
MessageQueueClient.push self
end
@@ -66,6 +70,5 @@ def fetch(resource_type_name, attributes)
resource.assign_attributes(filtered_attributes, :without_protection => true)
resource
end
-
end
end
@@ -8,6 +8,8 @@ def self.listeners
def process(event)
event.channel.receive(event)
+ # Event listeners callback can manipulate AR objects and so can potentially
+ # block the EM reactor thread. To avoid that, we defer them to another thread.
EM.defer do
# Ensure the connection associated with the thread is checked in
@@ -1,22 +0,0 @@
-module Alondra
- module MessageDispatcher
- extend self
-
- def parse(string)
- msg = ActiveSupport::JSON.decode(string).symbolize_keys
- end
-
- def dispatch(input, connection)
- msg = parse(input)
-
- raise 'Unrecognized message' unless msg.kind_of?(Hash)
-
- if msg[:command]
- Command.new(connection, msg).execute!
- elsif msg[:event]
- event = Event.new(msg, connection)
- MessageQueueClient.push(event)
- end
- end
- end
-end
@@ -34,7 +34,8 @@ def parse(received_string)
received_hash = ActiveSupport::JSON.decode(received_string).symbolize_keys
if received_hash[:event]
- receive(Event.new(received_hash))
+ event = Event.new(received_hash, received_string)
+ receive(event)
elsif received_hash[:message]
message = Message.new(received_hash[:message], received_hash[:channel_names])
message.send_to_channels
@@ -31,7 +31,7 @@ def run
websocket.onmessage do |msg|
Rails.logger.info "received: #{msg}"
- MessageDispatcher.dispatch(msg, Connections[websocket])
+ CommandDispatcher.dispatch(msg, Connections[websocket])
end
end

0 comments on commit 9597f14

Please sign in to comment.