Permalink
Browse files

Merge pull request #22 from tech-angels/ce327d73fc38bf394fc08d4188335…

…05678a55137

Added some more comments.
  • Loading branch information...
stevegraham committed Mar 5, 2012
2 parents 8726ac0 + ce327d7 commit a63ee308b8475619f71b8f7fcd9d0d32a9bce1d9
Showing with 57 additions and 3 deletions.
  1. +4 −1 lib/slanger/api_server.rb
  2. +12 −0 lib/slanger/channel.rb
  3. +2 −0 lib/slanger/config.rb
  4. +13 −2 lib/slanger/handler.rb
  5. +22 −0 lib/slanger/presence_channel.rb
  6. +4 −0 lib/slanger/redis.rb
@@ -18,11 +18,14 @@ class ApiServer < Sinatra::Base
error(Signature::AuthenticationError) { |c| halt 401, "401 UNAUTHORIZED\n" }
post '/apps/:app_id/channels/:channel_id/events' do
- # authenticate request. exclude 'channel_id' and 'app_id' included by sinatra but not sent by Pusher
+ # authenticate request. exclude 'channel_id' and 'app_id' included by sinatra but not sent by Pusher.
+ # Raises Signature::AuthenticationError if request does not authenticate.
Signature::Request.new('POST', env['PATH_INFO'], params.except('channel_id', 'app_id')).
authenticate { |key| Signature::Token.new key, Slanger::Config.secret }
f = Fiber.current
+ # Publish the event in Redis and translate the result into an HTTP
+ # status to return to the client.
Slanger::Redis.publish(params[:channel_id], payload).tap do |r|
r.callback { f.resume [202, {}, "202 ACCEPTED\n"] }
r.errback { f.resume [500, {}, "500 INTERNAL SERVER ERROR\n"] }
View
@@ -1,3 +1,10 @@
+# Channel class.
+#
+# Uses an EventMachine channel to let clients interact with the
+# Pusher channel. Relay events received from Redis into the
+# EM channel.
+#
+
require 'glamazon'
require 'eventmachine'
require 'forwardable'
@@ -18,10 +25,15 @@ def channel
@channel ||= EM::Channel.new
end
+ # Send a client event to the EventMachine channel.
+ # Only events to channels requiring authentication (private or presence)
+ # are accepted. Public channels only get events from the API.
def send_client_message(message)
push message.to_json if authenticated?
end
+ # Send an event received from Redis to the EventMachine channel
+ # which will send it to subscribed clients.
def dispatch(message, channel)
push(message.to_json) unless channel =~ /^slanger:/
end
View
@@ -1,3 +1,5 @@
+# Config singleton holding the configuration.
+
module Slanger
module Config
def load(opts={})
View
@@ -1,3 +1,6 @@
+# Handler class.
+# Handles a client connected via a websocket connection.
+
require 'active_support/json'
require 'active_support/core_ext/hash'
require 'securerandom'
@@ -18,16 +21,17 @@ def onmessage(msg)
event = msg['event'].gsub('pusher:', 'pusher_')
if event =~ /^pusher_/
+ # Pusher event, call method if it exists.
send(event, msg) if respond_to? event, true
elsif event =~ /^client-/
+ # Client event. Send it to the destination channel.
msg['socket_id'] = @socket_id
channel = find_channel msg['channel']
channel.try :send_client_message, msg
end
-
end
- # Unsubscribe this connection from the channel
+ # Unsubscribe this connection from all the channels on close.
def onclose
@subscriptions.each do |channel_id, subscription_id|
channel = find_channel channel_id
@@ -79,8 +83,11 @@ def pusher_pong msg; end
def subscribe_channel(channel_id)
channel = Slanger::Channel.find_or_create_by_channel_id(channel_id)
@socket.send(payload channel_id, 'pusher_internal:subscription_succeeded')
+ # Subscribe to the channel and have the events received from it
+ # sent to the client's socket.
subscription_id = channel.subscribe do |msg|
msg = JSON.parse(msg)
+ # Don't send the event if it was sent by the client
socket_id = msg.delete 'socket_id'
@socket.send msg.to_json unless socket_id == @socket_id
end
@@ -120,7 +127,11 @@ def handle_presence_subscription(msg)
}
})
}
+ # Subscribe to channel, call callback when done to send a
+ # subscription_succeeded event to the client.
channel.subscribe(msg, callback) do |msg|
+ # Send channel messages to the client, unless it is the
+ # sender of the event.
msg = JSON.parse(msg)
socket_id = msg.delete 'socket_id'
@socket.send msg.to_json unless socket_id == @socket_id
@@ -1,3 +1,10 @@
+# PresenceChannel class.
+#
+# Uses an EventMachine channel to let handlers interact with the
+# Pusher channel. Relay events received from Redis into the
+# EM channel. Keeps data on the subscribers to send it to clients.
+#
+
require 'glamazon'
require 'eventmachine'
require 'forwardable'
@@ -7,8 +14,11 @@ module Slanger
class PresenceChannel < Channel
def_delegators :channel, :push
+ # Send an event received from Redis to the EventMachine channel
def dispatch(message, channel)
if channel =~ /^slanger:/
+ # Messages received from the Redis channel slanger:* carry info on
+ # subscriptions. Update our subscribers accordingly.
update_subscribers message
else
push message.to_json
@@ -17,22 +27,28 @@ def dispatch(message, channel)
def initialize(attrs)
super
+ # Also subscribe the slanger daemon to a Redis channel used for events concerning subscriptions.
Slanger::Redis.subscribe 'slanger:connection_notification'
end
def subscribe(msg, callback, &blk)
channel_data = JSON.parse msg['data']['channel_data']
public_subscription_id = SecureRandom.uuid
+ # Send event about the new subscription to the Redis slanger:connection_notification Channel.
publisher = publish_connection_notification subscription_id: public_subscription_id, online: true,
channel_data: channel_data, channel: channel_id
+ # Associate the subscription data to the public id in Redis.
roster_add public_subscription_id, channel_data
# fuuuuuuuuuccccccck!
publisher.callback do
EM.next_tick do
+ # The Subscription event has been sent to Redis successfully.
+ # Call the provided callback.
callback.call
+ # Add the subscription to our table.
internal_subscription_table[public_subscription_id] = channel.subscribe &blk
end
end
@@ -51,6 +67,7 @@ def subscribers
def unsubscribe(public_subscription_id)
# Unsubcribe from EM::Channel
channel.unsubscribe(internal_subscription_table.delete(public_subscription_id)) # if internal_subscription_table[public_subscription_id]
+ # Remove subscription data from Redis
roster_remove public_subscription_id
# Notify all instances
publish_connection_notification subscription_id: public_subscription_id, online: false, channel: channel_id
@@ -59,6 +76,7 @@ def unsubscribe(public_subscription_id)
private
def get_roster
+ # Read subscription infos from Redis.
Fiber.new do
f = Fiber.current
Slanger::Redis.hgetall(channel_id).
@@ -68,14 +86,18 @@ def get_roster
end
def roster_add(key, value)
+ # Add subscription info to Redis.
Slanger::Redis.hset(channel_id, key, value)
end
def roster_remove(key)
+ # Remove subscription info from Redis.
Slanger::Redis.hdel(channel_id, key)
end
def publish_connection_notification(payload, retry_count=0)
+ # Send a subscription notification to the global slanger:connection_notification
+ # channel.
Slanger::Redis.publish('slanger:connection_notification', payload.to_json).
tap { |r| r.errback { publish_connection_notification payload, retry_count.succ unless retry_count == 5 } }
end
View
@@ -1,10 +1,14 @@
+# Redis class.
+# Interface with Redis.
+
require 'forwardable'
module Slanger
module Redis
extend Forwardable
def self.extended base
+ # Dispatch messages received from Redis to their destination channel.
base.on(:message) do |channel, message|
message = JSON.parse message
const = message['channel'] =~ /^presence-/ ? 'PresenceChannel' : 'Channel'

0 comments on commit a63ee30

Please sign in to comment.