Permalink
Browse files

Make sure messages pushed to the client over a socket pass through th…

…e outgoing extensions.
  • Loading branch information...
1 parent be37b07 commit d5da96b390816cee60ccd78c95940ef0d9c220ec @jcoglan jcoglan committed Oct 7, 2012
View
@@ -38,6 +38,7 @@ packages:
- engines/connection
- engines/memory
- protocol/server
+ - protocol/socket
- transport/node_local
- transport/web_socket
- transport/node_http
@@ -7,7 +7,7 @@ Faye.Engine.Connection = Faye.Class({
},
deliver: function(message) {
- if (this.socket) return this.socket.send(JSON.stringify([message]));
+ if (this.socket) return this.socket.send(message);
this._inbox.push(message);
this._beginDeliveryTimeout();
},
@@ -15,7 +15,7 @@ Faye.Server = Faye.Class({
},
openSocket: function(clientId, socket) {
- this._engine.openSocket(clientId, socket);
+ this._engine.openSocket(clientId, new Faye.Server.Socket(this, socket));
},
closeSocket: function(clientId) {
@@ -0,0 +1,17 @@
+Faye.Server.Socket = Faye.Class({
+ initialize: function(server, socket) {
+ this._server = server;
+ this._socket = socket;
+ },
+
+ send: function(message) {
+ this._server.pipeThroughExtensions('outgoing', message, function(pipedMessage) {
+ this._socket.send(JSON.stringify([pipedMessage]));
+ }, this);
+ },
+
+ close: function() {
+ this._socket.close();
+ }
+});
+
@@ -15,7 +15,7 @@ def initialize(engine, id, options = {})
end
def deliver(message)
- return socket.send(Faye.to_json([message])) if socket
+ return socket.send(message) if socket
return unless @inbox.add?(message)
begin_delivery_timeout
end
@@ -1,6 +1,8 @@
module Faye
class Server
+ autoload :Socket, File.join(ROOT, 'faye', 'protocol', 'socket')
+
include Logging
include Extensible
@@ -22,7 +24,7 @@ def flush_connection(messages)
end
def open_socket(client_id, socket)
- @engine.open_socket(client_id, socket)
+ @engine.open_socket(client_id, Socket.new(self, socket))
end
def close_socket(client_id)
@@ -0,0 +1,23 @@
+module Faye
+ class Server
+
+ class Socket
+ def initialize(server, socket)
+ @server = server
+ @socket = socket
+ end
+
+ def send(message)
+ @server.pipe_through_extensions(:outgoing, message) do |piped_message|
+ @socket.send(Faye.to_json([piped_message]))
+ end
+ end
+
+ def close
+ @socket.close
+ end
+ end
+
+ end
+end
+
@@ -7,6 +7,13 @@ JS.ENV.IntegrationSteps = JS.Test.asyncSteps({
: null
this._adapter = new Faye.NodeAdapter({mount: "/bayeux", timeout: 2})
+ this._adapter.addExtension({
+ outgoing: function(message, callback) {
+ if (message.data) message.data.tagged = true
+ callback(message)
+ }
+ })
+
this._port = port
this._secure = ssl
this._adapter.listen(port, options, callback)
@@ -67,7 +74,7 @@ JS.ENV.Server.IntegrationSpec = JS.Test.describe("Server integration", function(
it("delivers a message between clients", function() { with(this) {
publish("alice", "/foo", {hello: "world", extra: null})
- check_inbox("bob", "/foo", [{hello: "world", extra: null}])
+ check_inbox("bob", "/foo", [{hello: "world", extra: null, tagged: true}])
}})
it("does not deliver messages for unsubscribed channels", function() { with(this) {
@@ -78,12 +85,12 @@ JS.ENV.Server.IntegrationSpec = JS.Test.describe("Server integration", function(
it("delivers multiple messages", function() { with(this) {
publish("alice", "/foo", {hello: "world"})
publish("alice", "/foo", {hello: "world"})
- check_inbox("bob", "/foo", [{hello: "world"},{hello: "world"}])
+ check_inbox("bob", "/foo", [{hello: "world", tagged: true}, {hello: "world", tagged: true}])
}})
it("delivers multibyte strings", function() { with(this) {
publish("alice", "/foo", {hello: "Apple = "})
- check_inbox("bob", "/foo", [{hello: "Apple = "}])
+ check_inbox("bob", "/foo", [{hello: "Apple = ", tagged: true}])
}})
}})
@@ -6,6 +6,13 @@
Thin::Logging.silent = true
IntegrationSteps = EM::RSpec.async_steps do
+ class Tagger
+ def outgoing(message, callback)
+ message["data"]["tagged"] = true if message["data"]
+ callback.call(message)
+ end
+ end
+
def server(port, ssl, &callback)
shared = File.dirname(__FILE__) + '/../../../examples/shared'
@@ -14,7 +21,9 @@ def server(port, ssl, &callback)
nil
@adapter = Faye::RackAdapter.new(:mount => "/bayeux", :timeout => 25)
+ @adapter.add_extension(Tagger.new)
@adapter.listen(port, options)
+
@port = port
@secure = ssl
EM.next_tick(&callback)
@@ -76,7 +85,7 @@ def check_inbox(name, channel, messages, &callback)
shared_examples_for "message bus" do
it "delivers a message between clients" do
publish :alice, "/foo", {"hello" => "world", "extra" => nil}
- check_inbox :bob, "/foo", [{"hello" => "world", "extra" => nil}]
+ check_inbox :bob, "/foo", [{"hello" => "world", "extra" => nil, "tagged" => true}]
end
it "does not deliver messages for unsubscribed channels" do
@@ -87,12 +96,12 @@ def check_inbox(name, channel, messages, &callback)
it "delivers multiple messages" do
publish :alice, "/foo", {"hello" => "world"}
publish :alice, "/foo", {"hello" => "world"}
- check_inbox :bob, "/foo", [{"hello" => "world"}, {"hello" => "world"}]
+ check_inbox :bob, "/foo", [{"hello" => "world", "tagged" => true}, {"hello" => "world", "tagged" => true}]
end
it "delivers multibyte strings" do
- publish :alice, "/foo", {"hello" => encode("Apple = ")}
- check_inbox :bob, "/foo", [{"hello" => encode("Apple = ")}]
+ publish :alice, "/foo", {"hello" => encode("Apple = "), "tagged" => true}
+ check_inbox :bob, "/foo", [{"hello" => encode("Apple = "), "tagged" => true}]
end
end
@@ -107,7 +116,7 @@ def check_inbox(name, channel, messages, &callback)
describe "with WebSocket transport" do
before do
- Faye::Transport::WebSocket.stub(:usable?).and_yield(false)
+ Faye::Transport::WebSocket.stub(:usable?).and_yield(true)
end
it_should_behave_like "message bus"

0 comments on commit d5da96b

Please sign in to comment.