Permalink
Browse files

Implement the in-memory pubsub engine in JavaScript version.

  • Loading branch information...
1 parent 8b3c0ee commit 2ce3de45ae6017e42c7733f4480806e80296927d @jcoglan jcoglan committed Dec 19, 2010
@@ -1,12 +1,65 @@
Faye.Engine.Memory = Faye.Class(Faye.Engine.Base, {
initialize: function(options) {
+ this._clients = {};
+ this._channels = {};
this._namespace = new Faye.Namespace();
+
Faye.Engine.Base.prototype.initialize.call(this, options);
},
createClientId: function(callback, scope) {
var clientId = this._namespace.generate();
+ this._clients[clientId] = new Faye.Set();
+ this.ping(clientId);
callback.call(scope, clientId);
+ },
+
+ clientExists: function(clientId, callback, scope) {
+ callback.call(scope, this._clients.hasOwnProperty(clientId));
+ },
+
+ ping: function(clientId) {
+ var timeout = this._options.timeout;
+ if (typeof timeout !== 'number') return;
+ this.removeTimeout(clientId);
+ this.addTimeout(clientId, 2 * timeout, function() {
+ this.disconnect(clientId);
+ }, this);
+ },
+
+ subscribe: function(clientId, channel) {
+ var clients = this._clients, channels = this._channels;
+ clients[clientId] = clients[clientId] || new Faye.Set();
+ channels[channel] = channels[channel] || new Faye.Set();
+ clients[clientId].add(channel);
+ channels[channel].add(clientId);
+ },
+
+ unsubscribe: function(clientId, channel) {
+ var clients = this._clients, channels = this._channels;
+ if (clients.hasOwnProperty(clientId)) clients[clientId].remove(channel);
+ if (channels.hasOwnProperty(channel)) channels[channel].remove(clientId);
+ },
+
+ distribute: function(message) {
+ if (message.error) return;
+ var channels = Faye.Channel.expand(message.channel);
+ Faye.each(channels, function(channel) {
+ var clients = this._channels[channel];
+ if (!clients) return;
+ clients.forEach(function(clientId) {
+ this.announce(clientId, message);
+ }, this);
+ }, this);
+ },
+
+ disconnect: function(clientId) {
+ var clients = this._clients;
+ if (!clients.hasOwnProperty(clientId)) return;
+ clients[clientId].forEach(function(channel) {
+ this.unsubscribe(clientId, channel);
+ }, this);
+ delete clients[clientId];
}
});
@@ -24,6 +24,23 @@ Faye.extend(Faye.Channel, {
META: '<%= Faye::Channel::META %>',
SERVICE: '<%= Faye::Channel::SERVICE %>',
+ expand: function(name) {
+ var segments = this.parse(name),
+ channels = ['/**', name];
+
+ var copy = segments.slice();
+ copy[copy.length - 1] = '*';
+ channels.push(this.unparse(copy));
+
+ for (var i = 1, n = segments.length; i < n; i++) {
+ copy = segments.slice(0, i);
+ copy.push('**');
+ channels.push(this.unparse(copy));
+ }
+
+ return channels;
+ },
+
isValid: function(name) {
return Faye.Grammar.CHANNEL_NAME.test(name) ||
Faye.Grammar.CHANNEL_PATTERN.test(name);
@@ -34,6 +51,10 @@ Faye.extend(Faye.Channel, {
return name.split('/').slice(1);
},
+ unparse: function(segments) {
+ return '/' + segments.join('/');
+ },
+
isMeta: function(name) {
var segments = this.parse(name);
return segments ? (segments[0] === this.META) : null;
@@ -0,0 +1,10 @@
+ChannelSpec = JS.Test.describe("Faye.Channel", function() {
+ describe("expand", function() {
+ it("returns all patterns that match a channel", function() {
+ assertEqual( ["/**", "/foo", "/*"], Faye.Channel.expand("/foo") )
+ assertEqual( ["/**", "/foo/bar", "/foo/*", "/foo/**"], Faye.Channel.expand("/foo/bar") )
+ assertEqual( ["/**", "/foo/bar/qux", "/foo/bar/*", "/foo/**", "/foo/bar/**"], Faye.Channel.expand("/foo/bar/qux") )
+ })
+ })
+})
+
@@ -8,12 +8,12 @@ EngineSpec = JS.Test.describe("Pub/sub engines", function() {
return clientId
})
+ define("options", function() { return {} })
+
before(function() {
this.alice = makeClientId()
this.bob = makeClientId()
this.cecil = makeClientId()
-
- this.options = {timeout: 60}
})
describe("createClientId", function() {
@@ -30,10 +30,154 @@ EngineSpec = JS.Test.describe("Pub/sub engines", function() {
assertEqual( 10, ids.count() )
})
})
+
+ describe("clientExists", function() {
+ it("returns true if the client id exists", function() {
+ engine.clientExists(alice, assertYield(true))
+ })
+
+ it("returns false if the client id does not exist", function() {
+ engine.clientExists("anything", assertYield(false))
+ })
+ })
+
+ describe("ping", function() {
+ define("options", function() { return {timeout: 1} })
+
+ it("removes a client if it does not ping often enough", function(resume) {
+ engine.clientExists(alice, assertYield(true))
+ setTimeout(function() {
+ resume(function() { engine.clientExists(alice, assertYield(false)) })
+ }, 2500)
+ })
+
+ it("prolongs the life of a client", function(resume) {
+ engine.clientExists(alice, assertYield(true))
+ setTimeout(function() {
+ engine.ping(alice)
+ setTimeout(function() {
+ resume(function() { engine.clientExists(alice, assertYield(true)) })
+ }, 1000)
+ }, 1500)
+ })
+ })
+
+ describe("disconnect", function() {
+ it("removes the given client", function() {
+ engine.disconnect(alice)
+ engine.clientExists(alice, assertYield(false))
+ })
+
+ describe("when the client has subscriptions", function() {
+ before(function() {
+ this.inbox = {}
+ this.message = {'channel': '/messages/foo', 'data': 'ok'}
+
+ engine.onMessage(function(clientId, message) {
+ inbox[clientId] = inbox[clientId] || []
+ inbox[clientId].push(message)
+ })
+ engine.subscribe(alice, "/messages/foo")
+ })
+
+ it("stops the client receiving messages", function() {
+ engine.disconnect(alice)
+ engine.distribute(message)
+ assertEqual( {}, inbox )
+ })
+ })
+ })
+
+ describe("distribute", function() {
+ before(function() {
+ this.inbox = {}
+ this.message = {'channel': '/messages/foo', 'data': 'ok'}
+
+ engine.onMessage(function(clientId, message) {
+ inbox[clientId] = inbox[clientId] || []
+ inbox[clientId].push(message)
+ })
+ })
+
+ describe("with no subscriptions", function() {
+ it("delivers no messages", function() {
+ engine.distribute(message)
+ assertEqual( {}, inbox )
+ })
+ })
+
+ describe("with a subscriber", function() {
+ before(function() {
+ engine.subscribe(alice, "/messages/foo")
+ })
+
+ it("delivers messages to the subscribed client", function() {
+ engine.distribute(message)
+ assertEqual( [message], inbox[alice] )
+ })
+ })
+
+ describe("with a subscriber that is removed", function() {
+ before(function() {
+ engine.subscribe(alice, "/messages/foo")
+ engine.unsubscribe(alice, "/messages/foo")
+ })
+
+ it("does not deliver messages to unsubscribed clients", function() {
+ engine.distribute(message)
+ assertEqual( {}, inbox )
+ })
+ })
+
+ describe("with multiple subscribers", function() {
+ before(function() {
+ engine.subscribe(alice, "/messages/foo")
+ engine.subscribe(bob, "/messages/bar")
+ engine.subscribe(cecil, "/messages/foo")
+ })
+
+ it("delivers messages to the subscribed clients", function() {
+ engine.distribute(message)
+ assertEqual( [message], inbox[alice] )
+ assertEqual( undefined, inbox[bob] )
+ assertEqual( [message], inbox[cecil] )
+ })
+ })
+
+ describe("with a single wildcard", function() {
+ before(function() {
+ engine.subscribe(alice, "/messages/*")
+ engine.subscribe(bob, "/messages/bar")
+ engine.subscribe(cecil, "/*")
+ })
+
+ it("delivers messages to matching subscriptions", function() {
+ engine.distribute(message)
+ assertEqual( [message], inbox[alice] )
+ assertEqual( undefined, inbox[bob] )
+ assertEqual( undefined, inbox[cecil] )
+ })
+ })
+
+ describe("with a double wildcard", function() {
+ before(function() {
+ engine.subscribe(alice, "/messages/**")
+ engine.subscribe(bob, "/messages/bar")
+ engine.subscribe(cecil, "/**")
+ })
+
+ it("delivers messages to matching subscriptions", function() {
+ engine.distribute(message)
+ assertEqual( [message], inbox[alice] )
+ assertEqual( undefined, inbox[bob] )
+ assertEqual( [message], inbox[cecil] )
+ })
+ })
+ })
})
describe("Faye.Engine.Memory", function() {
- before(function() { this.engine = new Faye.Engine.Memory(this.options) })
+ before(function() { this.engine = new Faye.Engine.Memory(this.options()) })
itShouldBehaveLike("faye engine")
})
})
@@ -42,7 +42,7 @@ def make_client_id
engine.client_exists? alice, &should_yield(false)
end
- it "removes a client if it does not ping often enough" do
+ it "prolongs the life of a client" do
engine.client_exists? alice, &should_yield(true)
sleep 1.5
engine.ping(alice)
@@ -59,7 +59,7 @@ def make_client_id
describe "when the client has subscriptions" do
let(:inbox) { Hash.new { |h,k| h[k] = [] } }
- let(:message) { {'channel' => '/messages/foo', 'data' => 'ok'} }
+ let(:message) { {'channel' => '/messages/foo', 'data' => 'ok'} }
before do
engine.on_message do |client_id, message|
@@ -78,7 +78,7 @@ def make_client_id
describe :distribute do
let(:inbox) { Hash.new { |h,k| h[k] = [] } }
- let(:message) { {'channel' => '/messages/foo', 'data' => 'ok'} }
+ let(:message) { {'channel' => '/messages/foo', 'data' => 'ok'} }
before do
engine.on_message do |client_id, message|
View
@@ -9,7 +9,18 @@
}})
JS.require('JS.Set', 'JS.Range', 'JS.Test', function() {
- JS.require('EngineSpec', JS.Test.method('autorun'))
+
+ JS.Test.Unit.Assertions.include({
+ assertYield: function(expectedValue) {
+ var self = this
+ return function(actualValue) { self.assertEqual(expectedValue, actualValue) }
+ }
+ })
+
+ JS.require( 'ChannelSpec',
+ 'EngineSpec',
+
+ JS.Test.method('autorun'))
})
})()

0 comments on commit 2ce3de4

Please sign in to comment.