Skip to content

Commit

Permalink
Port multiple subscription features to Ruby.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcoglan committed May 31, 2010
1 parent 2d38ba2 commit 714abfe
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 27 deletions.
1 change: 1 addition & 0 deletions Manifest.txt
Expand Up @@ -34,6 +34,7 @@ lib/faye/namespace.rb
lib/faye/publisher.rb
lib/faye/rack_adapter.rb
lib/faye/server.rb
lib/faye/subscription.rb
lib/faye/timeouts.rb
lib/faye/transport.rb
test/scenario.js
Expand Down
4 changes: 2 additions & 2 deletions lib/faye.rb
Expand Up @@ -15,8 +15,8 @@ module Faye
CONNECTION_TYPES = %w[long-polling callback-polling]

%w[ publisher timeouts logging grammar
namespace server channel connection
error client transport
namespace server channel subscription
connection error client transport

].each do |lib|
require File.join(ROOT, 'faye', lib)
Expand Down
27 changes: 27 additions & 0 deletions lib/faye/channel.rb
Expand Up @@ -119,6 +119,33 @@ def glob(path = [])
list << @children[:**].value if @children[:**]
list.flatten
end

def subscribe(names, callback)
return unless callback
names.each do |name|
channel = self[name] ||= Channel.new(name)
channel.add_subscriber(:message, callback)
end
end

def unsubscribe(names, callback)
dead_channels = []

names.each do |name|
channel = self[name]
next unless channel
channel.remove_subscriber(:message, callback)
dead_channels.push(name) if channel.count_subscribers(:message).zero?
end

dead_channels
end

def distribute_message(message)
glob(message['channel']).each do |channel|
channel.publish_event(:message, message['data'])
end
end
end

end
Expand Down
34 changes: 16 additions & 18 deletions lib/faye/client.rb
Expand Up @@ -165,10 +165,10 @@ def disconnect
# * id
# * timestamp
def subscribe(channels, &block)
channels = [channels].flatten
validate_channels(channels)

connect {
channels = [channels].flatten
validate_channels(channels)

info('Client ? attempting to subscribe to ?', @client_id, channels)

@transport.send({
Expand All @@ -177,15 +177,15 @@ def subscribe(channels, &block)
'subscription' => channels

}, &verify_client_id { |response|
if response['successful'] and block

info('Subscription acknowledged for ? to ?', @client_id, channels)
if response['successful']

channels = [response['subscription']].flatten
channels.each { |channel| @channels[channel] = block }
info('Subscription acknowledged for ? to ?', @client_id, channels)
@channels.subscribe(channels, block)
end
})
}
Subscription.new(self, channels, block)
end

# Request Response
Expand All @@ -199,24 +199,24 @@ def subscribe(channels, &block)
# * id
# * timestamp
def unsubscribe(channels, &block)
channels = [channels].flatten
validate_channels(channels)

dead_channels = @channels.unsubscribe(channels, block)

connect {
channels = [channels].flatten
validate_channels(channels)

info('Client ? attempting to unsubscribe from ?', @client_id, channels)
info('Client ? attempting to unsubscribe from ?', @client_id, dead_channels)

@transport.send({
'channel' => Channel::UNSUBSCRIBE,
'clientId' => @client_id,
'subscription' => channels
'subscription' => dead_channels

}, &verify_client_id { |response|
if response['successful']

info('Unsubscription acknowledged for ? from ?', @client_id, channels)

channels = [response['subscription']].flatten
channels.each { |channel| @channels[channel] = nil }
info('Unsubscription acknowledged for ? from ?', @client_id, channels)
end
})
}
Expand Down Expand Up @@ -252,9 +252,7 @@ def handle_advice(advice)
def deliver_messages(messages)
messages.each do |message|
info('Client ? calling listeners for ? with ?', @client_id, message['channel'], message['data'])

channels = @channels.glob(message['channel'])
channels.each { |callback| callback.call(message['data']) }
@channels.distribute_message(message)
end
end

Expand Down
5 changes: 5 additions & 0 deletions lib/faye/publisher.rb
@@ -1,6 +1,11 @@
module Faye
module Publisher

def count_subscribers(event_type)
return 0 unless @subscribers and @subscribers[event_type]
@subscribers[event_type].size
end

def add_subscriber(event_type, listener)
@subscribers ||= {}
list = @subscribers[event_type] ||= []
Expand Down
23 changes: 23 additions & 0 deletions lib/faye/subscription.rb
@@ -0,0 +1,23 @@
module Faye
class Subscription

def initialize(client, channels, callback)
@client = client
@channels = channels
@callback = callback
@cancelled = false
end

def cancel
return if @cancelled
@client.unsubscribe(@channels, &@callback)
@cancelled = true
end

def unsubscribe
cancel
end

end
end

26 changes: 19 additions & 7 deletions test/scenario.rb
Expand Up @@ -85,19 +85,31 @@ def local_client(name, channels, &block)
end

def setup_client(client, name, channels, &block)
channels.each do |channel|
client.subscribe(channel) do |message|
box = @inbox[name]
box[channel] ||= []
box[channel] << message
end
end
@clients[name] = client
@inbox[name] = {}
@pool += 1

channels.each { |channel| subscribe(name, channel) }
EM.add_timer(0.5 * channels.size, &block)
end

def subscribe(name, channel, &block)
client = @clients[name]

@last_sub = client.subscribe(channel) do |message|
box = @inbox[name]
box[channel] ||= []
box[channel] << message
end

EM.add_timer(0.5, &block)
end

def cancel_last_subscription(&block)
@last_sub.cancel
EM.add_timer(0.5, &block)
end

def publish(from, channel, message, &block)
@clients[from].publish(channel, message)
EM.add_timer(2, &block)
Expand Down
29 changes: 29 additions & 0 deletions test/test_clients.rb
Expand Up @@ -44,6 +44,35 @@ class TestClients < Test::Unit::TestCase
)
end

scenario "Two HTTP clients, two subscriptions on the same channel" do
server 8000
http_client :A, ['/channels/a']
http_client :B, []
subscribe :A, '/channels/a'
publish :B, '/channels/a', 'hello' => 'world'
check_inbox(
:A => {
'/channels/a' => [{'hello' => 'world'}, {'hello' => 'world'}]
},
:B => {}
)
end

scenario "Two HTTP clients, two subscriptions and one unsubscription" do
server 8000
http_client :A, ['/channels/a']
http_client :B, []
subscribe :A, '/channels/a'
cancel_last_subscription
publish :B, '/channels/a', 'another' => 'message'
check_inbox(
:A => {
'/channels/a' => [{'another' => 'message'}]
},
:B => {}
)
end

scenario "Three HTTP clients, single receiver" do
server 8000
http_client :A, ['/channels/a']
Expand Down

0 comments on commit 714abfe

Please sign in to comment.