Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

refactoring for clarity; restoring timeout to 20

  • Loading branch information...
commit 3021d3db562cab9edde97dbdf4e6a44c70e08264 1 parent fbc3fc4
Doug Bradbury authored
Showing with 50 additions and 22 deletions.
  1. +50 −22 lib/drb/drbfire.rb
View
72 lib/drb/drbfire.rb
@@ -85,7 +85,7 @@
module DRbFire
# The current version.
- VERSION = [0, 1, 1]
+ VERSION = [0, 1, 0]
# The role configuration key.
ROLE = "#{self}::ROLE"
@@ -114,6 +114,7 @@ def initialize(uri, config)
@uri = uri
@config = config
@connection = Protocol.open(uri, config, SIGNAL_CONN)
+ Protocol.set_sockopt(@connection)
@signal_id = @connection.read_signal_id
end
@@ -121,40 +122,60 @@ def uri
"#{@uri}?#{@signal_id}"
end
- def accept
+ def wait_for_signal
@connection.stream.read(1)
+ end
+
+ def open_outgoing_connection
connection = Protocol.open(@uri, @config, OUTGOING_CONN)
connection.stream.write([@signal_id].pack(ID_FORMAT))
connection
end
+ def accept
+ wait_for_signal
+ return open_outgoing_connection
+ end
+
def close
@connection.close
end
end
class ClientServerProxy
- def initialize(connection, id)
+ OUTGOING_CONNECTION_TIMEOUT = 20
+
+ def initialize(connection, signal_id)
@connection = connection
- @id = id
+ @signal_id = signal_id
@queue = Queue.new
end
def write_signal_id
- @connection.stream.write([@id].pack(ID_FORMAT))
+ @connection.stream.write([@signal_id].pack(ID_FORMAT))
end
def push(connection)
@queue.push(connection)
end
- def open
+ def signal_client_to_open_outgoing_connection
@connection.stream.write("0")
- timeout(2) do
- @queue.pop
+ end
+
+ def wait_for_outgoing_connection
+ timeout(OUTGOING_CONNECTION_TIMEOUT) do
+ connection = @queue.pop
+ # log.debug("#{@signal_id} Popped connection from queue")
+ return connection
end
+ end
+
+ def open
+ signal_client_to_open_outgoing_connection
+ return wait_for_outgoing_connection
rescue TimeoutError
- raise DRb::DRbConnError, "Unable to get a client connection."
+ raise DRb::DRbConnError, "#{@signal_id} Unable to get a client connection."
end
end
@@ -164,7 +185,7 @@ def open_server(uri, config)
parse_uri(uri)
if (server?(config))
- @client_server_proxies ||= {}
+ @client_servers ||= {}
sock = delegate(config).open_server(uri, config)
@@ -186,19 +207,20 @@ def open(uri, config, type=INCOMING_CONN)
connection.stream.write(type)
connection
else
- @client_server_proxies[parse_uri(uri).last.to_i].open
+ @client_servers[parse_uri(uri).last.to_i].open
end
end
- def add_client_connection(id, connection)
- if ((client_server_proxy = @client_server_proxies[id]))
- client_server_proxy.push(connection)
+ def add_client_connection(signal_id, connection)
+ if ((c = @client_servers[signal_id]))
+ c.push(connection)
+ # log.debug("#{signal_id} Adding client connection")
else
end
end
- def add_client_server_proxies(id, server)
- @client_server_proxies[id] = server
+ def add_client_server(id, server)
+ @client_servers[id] = server
end
def parse_uri(uri)
@@ -252,17 +274,22 @@ def initialize(uri, delegate)
@id_mutex = Mutex.new
end
+ def close
+ # log.info("#{@signal_id} Closing Connection")
+ __getobj__.close
+ end
+
def self.set_sockopt(connection)
begin
connection.stream.setsockopt(Socket::SOL_TCP, Socket::SO_KEEPALIVE, true)
connection.stream.setsockopt(Socket::SOL_TCP, Socket::SO_SNDTIMEO, 2000)
rescue Exception => e
- # log.error "Failed on sockopts: #{e}"
+ # log.warn "Failed to set socket options: #{e}"
end
end
def accept
- while (__getobj__.instance_eval{@socket})
+ while (__getobj__.instance_eval { @socket })
begin
delegate_accept_result = __getobj__.accept
connection = self.class.new(nil, delegate_accept_result)
@@ -286,9 +313,9 @@ def accept
@id_mutex.synchronize do
new_id = (@id += 1)
end
- client_server_proxy = ClientServerProxy.new(connection, new_id)
- self.class.add_client_server_proxies(new_id, client_server_proxy)
- client_server_proxy.write_signal_id
+ client_server = ClientServerProxy.new(connection, new_id)
+ self.class.add_client_server(new_id, client_server)
+ client_server.write_signal_id
next
else
# log.warn "Invalid drbfire socket type #{type.inspect} delegating to plain old drb"
@@ -299,7 +326,8 @@ def accept
end
def read_signal_id
- stream.read(4).unpack(ID_FORMAT).first
+ @signal_id = stream.read(4).unpack(ID_FORMAT).first
+ return @signal_id
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.