Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

First stab at implementing services. The API still needs a lot more w…

…ork to make it handle what's in the spec easily.
  • Loading branch information...
commit edc00d145b557d805e72fd0fc26002304638c45d 1 parent 6e332d9
@jcoglan jcoglan authored
View
1  lib/faye.rb
@@ -29,6 +29,7 @@ module Faye
protocol/subscription
protocol/client
protocol/server
+ protocol/session
transport/transport
transport/local
transport/web_socket
View
16 lib/faye/engines/memory.rb
@@ -85,16 +85,18 @@ def publish(message)
subs.each(&clients.method(:add))
end
- clients.each do |client_id|
- debug 'Queueing for client ?: ?', client_id, message
- @messages[client_id] ||= []
- @messages[client_id] << Faye.copy_object(message)
- empty_queue(client_id)
- end
-
+ clients.each { |client_id| deliver(client_id, message) }
+
trigger(:publish, message['clientId'], message['channel'], message['data'])
end
+ def deliver(client_id, message)
+ debug 'Queueing for client ?: ?', client_id, message
+ @messages[client_id] ||= []
+ @messages[client_id] << Faye.copy_object(message)
+ empty_queue(client_id)
+ end
+
private
def empty_queue(client_id)
View
17 lib/faye/engines/redis.rb
@@ -131,21 +131,22 @@ def publish(message)
init
debug 'Publishing message ?', message
- json_message = JSON.dump(message)
- channels = Channel.expand(message['channel'])
- keys = channels.map { |c| @ns + "/channels#{c}" }
+ channels = Channel.expand(message['channel'])
+ keys = channels.map { |c| @ns + "/channels#{c}" }
@redis.sunion(*keys) do |clients|
- clients.each do |client_id|
- debug 'Queueing for client ?: ?', client_id, message
- @redis.rpush(@ns + "/clients/#{client_id}/messages", json_message)
- @redis.publish(@ns + '/notifications', client_id)
- end
+ clients.each { |client_id| deliver(client_id, message) }
end
trigger(:publish, message['clientId'], message['channel'], message['data'])
end
+ def deliver(client_id, message)
+ debug 'Queueing for client ?: ?', client_id, message
+ @redis.rpush(@ns + "/clients/#{client_id}/messages", JSON.dump(message))
+ @redis.publish(@ns + '/notifications', client_id)
+ end
+
private
def get_current_time
View
10 lib/faye/protocol/channel.rb
@@ -67,6 +67,10 @@ def service?(name)
segments ? (segments.first == SERVICE) : nil
end
+ def publishable?(name)
+ subscribable?(name) and Grammar::CHANNEL_NAME =~ name
+ end
+
def subscribable?(name)
return nil unless valid?(name)
not meta?(name) and not service?(name)
@@ -110,11 +114,13 @@ def unsubscribe(name, callback)
end
end
- def distribute_message(message)
+ def distribute_message(message, *args)
+ return unless data = message['data']
+
channels = Channel.expand(message['channel'])
channels.each do |name|
channel = @channels[name]
- channel.trigger(:message, message['data']) if channel
+ channel.trigger(:message, data, *args) if channel
end
end
end
View
15 lib/faye/protocol/server.rb
@@ -5,16 +5,21 @@ class Server
include Extensible
attr_reader :engine
-
+
def initialize(options = {})
@options = options || {}
engine_opts = @options[:engine] || {}
engine_opts[:timeout] = @options[:timeout]
- @engine = Faye::Engine.get(engine_opts)
+ @engine = Engine.get(engine_opts)
+ @services = Channel::Set.new
info 'Created new server: ?', @options
end
+ def service(channels, &listener)
+ @services.subscribe([channels].flatten, listener)
+ end
+
def flush_connection(messages)
[messages].flatten.each do |message|
client_id = message["clientId"]
@@ -78,7 +83,11 @@ def handle(message, local = false, &callback)
return handle_meta(message, local, &callback) if Channel.meta?(channel_name)
- @engine.publish(message) unless message['error'] or Grammar::CHANNEL_NAME !~ channel_name
+ if Channel.publishable?(channel_name) and not message['error']
+ @engine.publish(message)
+ end
+
+ @services.distribute_message(message, Session.new(@engine, message['clientId']))
if message['clientId']
response = make_response(message)
View
15 lib/faye/protocol/session.rb
@@ -0,0 +1,15 @@
+module Faye
+ class Session
+ attr_reader :client_id
+
+ def initialize(engine, client_id)
+ @engine = engine
+ @client_id = client_id
+ end
+
+ def deliver(channel, data)
+ @engine.deliver(@client_id, {'channel' => channel, 'data' => data})
+ end
+ end
+end
+
View
21 spec/ruby/engine_spec.rb
@@ -63,6 +63,15 @@ def publish(messages, &resume)
EM.add_timer(0.01, &resume)
end
+ def deliver(name, messages, &resume)
+ messages = [messages].flatten
+ messages.each do |message|
+ message = {"id" => Faye.random}.merge(message)
+ engine.deliver(@clients[name], message)
+ end
+ EM.add_timer(0.01, &resume)
+ end
+
def publish_by(name, message, &resume)
message = {"clientId" => @clients[name], "id" => Faye.random}.merge(message)
engine.publish(message)
@@ -377,6 +386,18 @@ def create_engine
end
end
end
+
+ describe :deliver do
+ before do
+ @message = {"channel" => "/messages/foo", "data" => "ok"}
+ connect :bob, engine
+ end
+
+ it "delivers a message directly to one client" do
+ deliver :bob, @message
+ expect_message :bob, [@message]
+ end
+ end
end
shared_examples_for "distributed engine" do
View
37 spec/ruby/server/service_spec.rb
@@ -0,0 +1,37 @@
+require "spec_helper"
+
+describe "server handshake" do
+ let(:engine) { mock "engine" }
+ let(:server) { Faye::Server.new }
+
+ before do
+ Faye::Engine.stub(:get).and_return engine
+ end
+
+ describe "with a service" do
+ before do
+ server.service "/service/foo" do |message, client|
+ client.deliver("/foo", "hello" => "world")
+ end
+ end
+
+ it "invokes the service when a matching message is published" do
+ engine.should_receive(:deliver).with("abc123", {"channel" => "/foo", "data" => {"hello" => "world"}})
+ server.process({"clientId" => "abc123", "channel" => "/service/foo", "data" => {}}, false) {}
+ end
+ end
+
+ describe "with no matching service" do
+ before do
+ server.service "/service/bar" do |message, client|
+ client.deliver("/foo", "hello" => "world")
+ end
+ end
+
+ it "invokes the service when a matching message is published" do
+ engine.should_not_receive(:deliver)
+ server.process({"clientId" => "abc123", "channel" => "/service/foo", "data" => {}}, false) {}
+ end
+ end
+end
+
View
22 spec/ruby/server/subscribe_spec.rb
@@ -179,9 +179,9 @@
end
end
- describe "with a /meta/* channel" do
+ shared_examples_for "forbidden channel" do
before do
- message["subscription"] = "/meta/foo"
+ message["subscription"] = channel
engine.should_receive(:client_exists).with(client_id).and_yield true
end
@@ -195,15 +195,15 @@
response.should == {
"channel" => "/meta/subscribe",
"successful" => false,
- "error" => "403:/meta/foo:Forbidden channel",
+ "error" => "403:#{channel}:Forbidden channel",
"clientId" => client_id,
- "subscription" => "/meta/foo"
+ "subscription" => channel
}
end
end
it "subscribes local clients to the channel" do
- engine.should_receive(:subscribe).with(client_id, "/meta/foo")
+ engine.should_receive(:subscribe).with(client_id, channel)
server.subscribe(message, true) {}
end
@@ -214,12 +214,22 @@
"channel" => "/meta/subscribe",
"successful" => true,
"clientId" => client_id,
- "subscription" => "/meta/foo"
+ "subscription" => channel
}
end
end
end
+ describe "with a /meta/* channel" do
+ let(:channel) { "/meta/foo" }
+ it_should_behave_like "forbidden channel"
+ end
+
+ describe "with a /service/* channel" do
+ let(:channel) { "/service/foo" }
+ it_should_behave_like "forbidden channel"
+ end
+
describe "with an error" do
before do
message["error"] = "invalid"
Please sign in to comment.
Something went wrong with that request. Please try again.