Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Implement #publish and #unsubscribe for the Ruby client.

  • Loading branch information...
commit 55cf35e7d163fa89641a51e1cf01c1d14553fa74 1 parent 794811b
@jcoglan jcoglan authored
View
11 examples/rack/client.rb
@@ -12,10 +12,15 @@
EM.run do
client = Faye::Client.new('http://localhost:9292/comet')
- client.connect do
+ client.connect {
client.subscribe '/from/*' do |message|
- puts "[#{ message['user'] }]: #{ message['message'] }"
+ user = message['user']
+ puts "[#{ user }]: #{ message['message'] }"
+ client.publish("/mentioning/#{ user }", {
+ "user" => "logger",
+ "message" => "Got your message, #{ user }!"
+ })
end
- end
+ }
end
View
8 javascript/client.js
@@ -207,7 +207,7 @@ Faye.Client = Faye.Class({
if (this._state !== this.CONNECTED) return;
this._validateChannels([channel]);
- this.enqueue({
+ this._enqueue({
channel: channel,
data: data,
clientId: this._clientId
@@ -218,15 +218,15 @@ Faye.Client = Faye.Class({
this._timeout = setTimeout(function() {
delete self._timeout;
- self.flush();
+ self._flush();
}, this.MAX_DELAY * 1000);
},
- enqueue: function(message) {
+ _enqueue: function(message) {
this._outbox.push(message);
},
- flush: function() {
+ _flush: function() {
this._transport.send(this._outbox);
this._outbox = [];
},
View
76 lib/faye/client.rb
@@ -157,6 +157,58 @@ def subscribe(channels, &block)
end
end
+ # Request Response
+ # MUST include: * channel MUST include: * channel
+ # * clientId * successful
+ # * subscription * clientId
+ # MAY include: * ext * subscription
+ # * id MAY include: * error
+ # * advice
+ # * ext
+ # * id
+ # * timestamp
+ def unsubscribe(channels, &block)
+ return unless @state == CONNECTED
+
+ channels = [channels].flatten
+ validate_channels(channels)
+
+ @transport.send({
+ 'channel' => Channel::UNSUBSCRIBE,
+ 'clientId' => @client_id,
+ 'subscription' => channels
+
+ }) do |response|
+ if response['successful']
+ channels = [response['subscription']].flatten
+ channels.each { |channel| @channels[channel] = nil }
+ end
+ end
+ end
+
+ # Request Response
+ # MUST include: * channel MUST include: * channel
+ # * data * successful
+ # MAY include: * clientId MAY include: * id
+ # * id * error
+ # * ext * ext
+ def publish(channel, data)
+ return unless @state == CONNECTED
+ validate_channels([channel])
+
+ enqueue({
+ 'channel' => channel,
+ 'data' => data,
+ 'clientId' => @client_id
+ })
+
+ return if @timeout
+ @timeout = add_timer(Connection::MAX_DELAY) do
+ @timeout = nil
+ flush!
+ end
+ end
+
def handle_advice(advice)
@advice.update(advice)
@client_id = nil if @advice['reconnect'] == HANDSHAKE
@@ -169,27 +221,21 @@ def send_to_subscribers(message)
private
+ def enqueue(message)
+ @outbox << message
+ end
+
+ def flush!
+ @transport.send(@outbox)
+ @outbox = []
+ end
+
def validate_channels(channels)
channels.each do |channel|
raise "'#{ channel }' is not a valid channel name" unless Channel.valid?(channel)
raise "Clients may not subscribe to channel '#{ channel }'" unless Channel.subscribable?(channel)
end
end
-
-=begin
- _handleAdvice: function(advice) {
- Faye.extend(this._advice, advice);
- if (this._advice.reconnect === this.HANDSHAKE) this._clientId = null;
- },
-
- _sendToSubscribers: function(message) {
- var channels = this._channels.glob(message.channel);
- Faye.each(channels, function(callback) {
- if (!callback) return;
- callback[0].call(callback[1], message.data);
- });
- }
-=end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.