Permalink
Browse files

Added some more comments.

  • Loading branch information...
Gilbert Roulot
Gilbert Roulot committed Mar 1, 2012
1 parent 8726ac0 commit ce327d73fc38bf394fc08d418833505678a55137
Showing with 57 additions and 3 deletions.
  1. +4 −1 lib/slanger/api_server.rb
  2. +12 −0 lib/slanger/channel.rb
  3. +2 −0 lib/slanger/config.rb
  4. +13 −2 lib/slanger/handler.rb
  5. +22 −0 lib/slanger/presence_channel.rb
  6. +4 −0 lib/slanger/redis.rb
@@ -18,11 +18,14 @@ class ApiServer < Sinatra::Base
error(Signature::AuthenticationError) { |c| halt 401, "401 UNAUTHORIZED\n" }
post '/apps/:app_id/channels/:channel_id/events' do
# authenticate request. exclude 'channel_id' and 'app_id' included by sinatra but not sent by Pusher
# authenticate request. exclude 'channel_id' and 'app_id' included by sinatra but not sent by Pusher.
# Raises Signature::AuthenticationError if request does not authenticate.
Signature::Request.new('POST', env['PATH_INFO'], params.except('channel_id', 'app_id')).
authenticate { |key| Signature::Token.new key, Slanger::Config.secret }
f = Fiber.current
# Publish the event in Redis and translate the result into an HTTP
# status to return to the client.
Slanger::Redis.publish(params[:channel_id], payload).tap do |r|
r.callback { f.resume [202, {}, "202 ACCEPTED\n"] }
r.errback { f.resume [500, {}, "500 INTERNAL SERVER ERROR\n"] }
View
@@ -1,3 +1,10 @@
# Channel class.
#
# Uses an EventMachine channel to let clients interact with the
# Pusher channel. Relay events received from Redis into the
# EM channel.
#
require 'glamazon'
require 'eventmachine'
require 'forwardable'
@@ -18,10 +25,15 @@ def channel
@channel ||= EM::Channel.new
end
# Send a client event to the EventMachine channel.
# Only events to channels requiring authentication (private or presence)
# are accepted. Public channels only get events from the API.
def send_client_message(message)
push message.to_json if authenticated?
end
# Send an event received from Redis to the EventMachine channel
# which will send it to subscribed clients.
def dispatch(message, channel)
push(message.to_json) unless channel =~ /^slanger:/
end
View
@@ -1,3 +1,5 @@
# Config singleton holding the configuration.
module Slanger
module Config
def load(opts={})
View
@@ -1,3 +1,6 @@
# Handler class.
# Handles a client connected via a websocket connection.
require 'active_support/json'
require 'active_support/core_ext/hash'
require 'securerandom'
@@ -18,16 +21,17 @@ def onmessage(msg)
event = msg['event'].gsub('pusher:', 'pusher_')
if event =~ /^pusher_/
# Pusher event, call method if it exists.
send(event, msg) if respond_to? event, true
elsif event =~ /^client-/
# Client event. Send it to the destination channel.
msg['socket_id'] = @socket_id
channel = find_channel msg['channel']
channel.try :send_client_message, msg
end
end
# Unsubscribe this connection from the channel
# Unsubscribe this connection from all the channels on close.
def onclose
@subscriptions.each do |channel_id, subscription_id|
channel = find_channel channel_id
@@ -79,8 +83,11 @@ def pusher_pong msg; end
def subscribe_channel(channel_id)
channel = Slanger::Channel.find_or_create_by_channel_id(channel_id)
@socket.send(payload channel_id, 'pusher_internal:subscription_succeeded')
# Subscribe to the channel and have the events received from it
# sent to the client's socket.
subscription_id = channel.subscribe do |msg|
msg = JSON.parse(msg)
# Don't send the event if it was sent by the client
socket_id = msg.delete 'socket_id'
@socket.send msg.to_json unless socket_id == @socket_id
end
@@ -120,7 +127,11 @@ def handle_presence_subscription(msg)
}
})
}
# Subscribe to channel, call callback when done to send a
# subscription_succeeded event to the client.
channel.subscribe(msg, callback) do |msg|
# Send channel messages to the client, unless it is the
# sender of the event.
msg = JSON.parse(msg)
socket_id = msg.delete 'socket_id'
@socket.send msg.to_json unless socket_id == @socket_id
@@ -1,3 +1,10 @@
# PresenceChannel class.
#
# Uses an EventMachine channel to let handlers interact with the
# Pusher channel. Relay events received from Redis into the
# EM channel. Keeps data on the subscribers to send it to clients.
#
require 'glamazon'
require 'eventmachine'
require 'forwardable'
@@ -7,8 +14,11 @@ module Slanger
class PresenceChannel < Channel
def_delegators :channel, :push
# Send an event received from Redis to the EventMachine channel
def dispatch(message, channel)
if channel =~ /^slanger:/
# Messages received from the Redis channel slanger:* carry info on
# subscriptions. Update our subscribers accordingly.
update_subscribers message
else
push message.to_json
@@ -17,22 +27,28 @@ def dispatch(message, channel)
def initialize(attrs)
super
# Also subscribe the slanger daemon to a Redis channel used for events concerning subscriptions.
Slanger::Redis.subscribe 'slanger:connection_notification'
end
def subscribe(msg, callback, &blk)
channel_data = JSON.parse msg['data']['channel_data']
public_subscription_id = SecureRandom.uuid
# Send event about the new subscription to the Redis slanger:connection_notification Channel.
publisher = publish_connection_notification subscription_id: public_subscription_id, online: true,
channel_data: channel_data, channel: channel_id
# Associate the subscription data to the public id in Redis.
roster_add public_subscription_id, channel_data
# fuuuuuuuuuccccccck!
publisher.callback do
EM.next_tick do
# The Subscription event has been sent to Redis successfully.
# Call the provided callback.
callback.call
# Add the subscription to our table.
internal_subscription_table[public_subscription_id] = channel.subscribe &blk
end
end
@@ -51,6 +67,7 @@ def subscribers
def unsubscribe(public_subscription_id)
# Unsubcribe from EM::Channel
channel.unsubscribe(internal_subscription_table.delete(public_subscription_id)) # if internal_subscription_table[public_subscription_id]
# Remove subscription data from Redis
roster_remove public_subscription_id
# Notify all instances
publish_connection_notification subscription_id: public_subscription_id, online: false, channel: channel_id
@@ -59,6 +76,7 @@ def unsubscribe(public_subscription_id)
private
def get_roster
# Read subscription infos from Redis.
Fiber.new do
f = Fiber.current
Slanger::Redis.hgetall(channel_id).
@@ -68,14 +86,18 @@ def get_roster
end
def roster_add(key, value)
# Add subscription info to Redis.
Slanger::Redis.hset(channel_id, key, value)
end
def roster_remove(key)
# Remove subscription info from Redis.
Slanger::Redis.hdel(channel_id, key)
end
def publish_connection_notification(payload, retry_count=0)
# Send a subscription notification to the global slanger:connection_notification
# channel.
Slanger::Redis.publish('slanger:connection_notification', payload.to_json).
tap { |r| r.errback { publish_connection_notification payload, retry_count.succ unless retry_count == 5 } }
end
View
@@ -1,10 +1,14 @@
# Redis class.
# Interface with Redis.
require 'forwardable'
module Slanger
module Redis
extend Forwardable
def self.extended base
# Dispatch messages received from Redis to their destination channel.
base.on(:message) do |channel, message|
message = JSON.parse message
const = message['channel'] =~ /^presence-/ ? 'PresenceChannel' : 'Channel'

0 comments on commit ce327d7

Please sign in to comment.