Permalink
Browse files

Port socket-connection-caching logic from JavaScript to Ruby. The cli…

…ent will eagerly try to make a WebSocket connection before it knows whether the server supports it through /meta/handshake, and will reuse the connection if it turns out to be usable.
  • Loading branch information...
1 parent 3dc1f46 commit aac47ca80c03078cd95496e725cce3db7df04298 @jcoglan jcoglan committed Sep 23, 2012
@@ -18,14 +18,15 @@ class Client
CONNECTION_TIMEOUT = 60.0
DEFAULT_RETRY = 5.0
- attr_reader :endpoint, :endpoints, :client_id, :retry
+ attr_reader :client_id, :endpoint, :endpoints, :retry, :transports
def initialize(endpoint = nil, options = {})
info('New client created for ?', endpoint)
@options = options
@endpoint = endpoint || RackAdapter::DEFAULT_ENDPOINT
@endpoints = @options[:endpoints] || {}
+ @transports = {}
@cookies = CookieJar::Jar.new
@headers = {}
@disabled = []
@@ -1,7 +1,7 @@
module Faye
class Transport::Http < Transport
- def self.usable?(endpoint, &callback)
+ def self.usable?(client, endpoint, &callback)
callback.call(endpoint.is_a?(String))
end
@@ -1,7 +1,7 @@
module Faye
class Transport::Local < Transport
- def self.usable?(endpoint, &callback)
+ def self.usable?(client, endpoint, &callback)
callback.call(endpoint.is_a?(Server))
end
@@ -33,7 +33,9 @@ def send(message, timeout)
@outbox << message
@timeout = timeout
- return flush if message['channel'] == Channel::HANDSHAKE
+ if message['channel'] == Channel::HANDSHAKE
+ return add_timeout(:publish, 0.01) { flush }
+ end
if message['channel'] == Channel::CONNECT
@connection_message = message
@@ -77,16 +79,15 @@ def get(client, connection_types = nil, &callback)
select = lambda do |(conn_type, klass), resume|
conn_endpoint = client.endpoints[conn_type] || endpoint
- if connection_types.include?(conn_type)
- klass.usable?(conn_endpoint) do |is_usable|
- if is_usable
- callback.call(klass.new(client, conn_endpoint))
- else
- resume.call
- end
- end
- else
- resume.call
+ unless connection_types.include?(conn_type)
+ klass.usable?(client, conn_endpoint) { |u| }
+ next resume.call
+ end
+
+ klass.usable?(client, conn_endpoint) do |is_usable|
+ next resume.call unless is_usable
+ transport = klass.respond_to?(:create) ? klass.create(client, conn_endpoint) : klass.new(client, conn_endpoint)
+ callback.call(transport)
end
end
@@ -1,50 +1,36 @@
module Faye
class Transport::WebSocket < Transport
- WEBSOCKET_TIMEOUT = 1
-
UNCONNECTED = 1
CONNECTING = 2
CONNECTED = 3
include EventMachine::Deferrable
- def self.usable?(endpoint, &callback)
- connected = false
- called = false
- socket_url = endpoint.gsub(/^http(s?):/, 'ws\1:')
- socket = Faye::WebSocket::Client.new(socket_url)
-
- socket.onopen = lambda do |event|
- connected = true
- socket.close
- callback.call(true)
- called = true
- socket = nil
- end
-
- notconnected = lambda do |*args|
- callback.call(false) unless called or connected
- called = true
- end
-
- socket.onclose = socket.onerror = notconnected
- EventMachine.add_timer(WEBSOCKET_TIMEOUT, &notconnected)
+ def self.usable?(client, endpoint, &callback)
+ create(client, endpoint).usable?(&callback)
+ end
+
+ def self.create(client, endpoint)
+ sockets = client.transports[:websocket] ||= {}
+ sockets[endpoint] ||= new(client, endpoint)
end
def batching?
false
end
+ def usable?(&callback)
+ self.callback { callback.call(true) }
+ self.errback { callback.call(false) }
+ connect
+ end
+
def request(messages, timeout = nil)
return if messages.empty?
@messages ||= {}
messages.each { |message| @messages[message['id']] = message }
- with_socket { |socket| socket.send(Faye.to_json(messages)) }
- end
-
- def with_socket(&resume)
- callback(&resume)
+ callback { |socket| socket.send(Faye.to_json(messages)) }
connect
end
@@ -66,6 +52,7 @@ def connect
@socket.onopen = lambda do |*args|
@state = CONNECTED
+ @ever_connected = true
set_deferred_status(:succeeded, @socket)
trigger(:up)
end
@@ -83,13 +70,15 @@ def connect
@socket = nil
next resend if was_connected
+ next set_deferred_status(:failed) unless @ever_connected
EventMachine.add_timer(@client.retry) { connect }
trigger(:down)
end
end
def resend
+ return unless @messages
request(@messages.values)
end
end
@@ -1,11 +1,12 @@
require "spec_helper"
describe Faye::Transport do
+ before do
+ Faye.ensure_reactor_running!
+ end
+
let :client do
- client = mock("client")
- client.stub(:endpoint).and_return("http://example.com/")
- client.stub(:endpoints).and_return({})
- client
+ mock("client", :endpoint => "http://example.com/", :endpoints => {}, :transports => {})
end
describe :get do

0 comments on commit aac47ca

Please sign in to comment.