Skip to content

Commit

Permalink
Merge pull request #95 from BallAerospace/json-drb-retry
Browse files Browse the repository at this point in the history
Json drb retry
  • Loading branch information
ryanmelt committed Mar 23, 2015
2 parents f71919f + 9f24eb6 commit 5f91f6c
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 71 deletions.
44 changes: 38 additions & 6 deletions lib/cosmos/io/json_drb.rb
Expand Up @@ -30,8 +30,6 @@ class JsonDRb

# @return [Integer] The number of JSON-RPC requests processed
attr_accessor :request_count
# @return [Integer] The number of clients currently connected to the server
attr_accessor :num_clients
# @return [Array<String>] List of methods that should be allowed
attr_accessor :method_whitelist
# @return [ACL] The access control list
Expand All @@ -47,16 +45,42 @@ def initialize
@request_times = []
@request_times_index = 0
@request_mutex = Mutex.new
@num_clients = 0
@client_sockets = []
@client_threads = []
@client_mutex = Mutex.new
@thread_reader, @thread_writer = IO.pipe
end

# Reports how many client threads are currently tracked by this server.
# The count is taken from the list of client threads, so it reflects the
# threads that have registered themselves and not yet removed themselves.
#
# @return [Integer] The number of connected clients
def num_clients
  @client_threads.size
end

# Stops the DRb service: kills the processing thread, closes the listen
# socket, closes every client socket, kills every client thread and finally
# empties the client bookkeeping lists.
def stop_service
  Cosmos.kill_thread(self, @thread)
  @thread = nil
  Cosmos.close_socket(@listen_socket)
  @listen_socket = nil

  # Close the client sockets and take a snapshot of the client threads while
  # holding the lock so the lists cannot change underneath us.
  threads_to_kill = @client_mutex.synchronize do
    @client_sockets.each { |sock| Cosmos.close_socket(sock) }
    @client_threads.clone
  end

  # The kills must happen without holding client_mutex: the client threads
  # take that same mutex while shutting down and would otherwise stick on it.
  threads_to_kill.each { |thr| Cosmos.kill_thread(self, thr) }

  @client_mutex.synchronize do
    @client_threads.clear
    @client_sockets.clear
  end
end

# Gracefully kill the thread
Expand Down Expand Up @@ -254,7 +278,11 @@ def create_client_thread(socket)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)

Thread.new(socket) do |my_socket|
@num_clients += 1
@client_mutex.synchronize do
@client_sockets << my_socket
@client_threads << Thread.current
end

data = ''
begin
while true
Expand All @@ -270,14 +298,18 @@ def create_client_thread(socket)
break unless process_request(request_data, my_socket, start_time)
else
# Socket was closed by client
Cosmos.close_socket(my_socket)
break
end
end
rescue Exception => error
Logger.error "JsonDrb client thread unexpectedly died.\n#{error.formatted}"
end
@num_clients -= 1

@client_mutex.synchronize do
Cosmos.close_socket(my_socket)
@client_sockets.delete(my_socket)
@client_threads.delete(Thread.current)
end
end
end

Expand Down
163 changes: 98 additions & 65 deletions lib/cosmos/io/json_drb_object.rb
Expand Up @@ -58,6 +58,10 @@ def initialize(hostname, port, connect_timeout = 1.0)
# Disconnects from the JSON server by closing the socket.
#
# Two things are deliberately NOT done here:
# * @socket is not set to nil. This method can be called by other threads,
#   and a nil @socket would cause unexpected errors in method_missing.
#   Only method_missing is allowed to set @socket to nil.
# * The mutex is not taken, so that a call to this method can interrupt
#   method_missing if necessary.
def disconnect
  Cosmos.close_socket(@socket)
end

# Permanently disconnects from the JSON server
Expand All @@ -75,91 +79,120 @@ def shutdown
# protocol a DRb::DRbConnError exception is raised.
def method_missing(method_name, *method_params)
@mutex.synchronize do
raise DRb::DRbConnError, "Shutdown" if @shutdown
if !@socket or @socket.closed? or @request_in_progress
if @request_in_progress
# This flag and loop are used to automatically reconnect and retry if something goes
# wrong on the first attempt writing to the socket. Sockets can become disconnected
# between function calls, but as long as the remote server is back up and running the
# call should succeed even when it discovers a broken socket on the first attempt.
first_try = true
loop do
raise DRb::DRbConnError, "Shutdown" if @shutdown
connect() if !@socket or @socket.closed? or @request_in_progress

response = make_request(method_name, method_params, first_try)
unless response
disconnect()
@socket = nil
@request_in_progress = false
was_first_try = first_try
first_try = false
next if was_first_try
end
return handle_response(response)
end # loop
end # @mutex.synchronize
end # def method_missing

private

def connect
if @request_in_progress
disconnect()
@socket = nil
@request_in_progress = false
end
begin
addr = Socket.pack_sockaddr_in(@port, @hostname)
@socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
begin
@socket.connect_nonblock(addr)
rescue IO::WaitWritable
begin
addr = Socket.pack_sockaddr_in(@port, @hostname)
@socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
_, sockets, _ = IO.select(nil, [@socket], nil, @connect_timeout) # wait 3-way handshake completion
rescue IOError, Errno::ENOTSOCK
disconnect()
@socket = nil
raise "Connect canceled"
end
if sockets and !sockets.empty?
begin
@socket.connect_nonblock(addr)
rescue IO::WaitWritable
begin
_, sockets, _ = IO.select(nil, [@socket], nil, @connect_timeout) # wait 3-way handshake completion
rescue IOError, Errno::ENOTSOCK
disconnect()
@socket = nil
raise "Connect canceled"
end
if sockets and !sockets.empty?
begin
@socket.connect_nonblock(addr) # check connection failure
rescue IOError, Errno::ENOTSOCK
disconnect()
@socket = nil
raise "Connect canceled"
rescue Errno::EINPROGRESS
retry
rescue Errno::EISCONN, Errno::EALREADY
end
else
disconnect()
@socket = nil
raise "Connect timeout"
end
@socket.connect_nonblock(addr) # check connection failure
rescue IOError, Errno::ENOTSOCK
disconnect()
@socket = nil
raise "Connect canceled"
rescue Errno::EINPROGRESS
retry
rescue Errno::EISCONN, Errno::EALREADY
end
rescue => e
raise DRb::DRbConnError, e.message
else
disconnect()
@socket = nil
raise "Connect timeout"
end
end

request = JsonRpcRequest.new(method_name, method_params, @id)
@id += 1

request_data = request.to_json(:allow_nan => true)
begin
@request_in_progress = true
STDOUT.puts "Request:\n" if JsonDRb.debug?
STDOUT.puts request_data if JsonDRb.debug?
JsonDRb.send_data(@socket, request_data)
response_data = JsonDRb.receive_message(@socket, '')
STDOUT.puts "\nResponse:\n" if JsonDRb.debug?
STDOUT.puts response_data if JsonDRb.debug?
@request_in_progress = false
rescue => e
rescue IOError, Errno::ENOTSOCK
disconnect()
@socket = nil
raise DRb::DRbConnError, e.message, e.backtrace
raise "Connect canceled"
end
rescue => e
raise DRb::DRbConnError, e.message
end
end

if response_data
response = JsonRpcResponse.from_json(response_data)
if JsonRpcErrorResponse === response
if response.error.data
raise Exception.from_hash(response.error.data)
else
raise "JsonDRb Error (#{response.error.code}): #{response.error.message}"
end
# Builds a JSON-RPC request for the given method, sends it over @socket and
# waits for the reply.
#
# @request_in_progress is set to true just before sending and back to false
# once a reply has been received; if sending or receiving fails it is left
# as true.
#
# @param method_name [Symbol, String] Name of the remote method
# @param method_params [Array] Parameters for the remote method
# @param first_try [Boolean] Whether this is the first attempt at the call
# @return [String, nil, false] Whatever JsonDRb.receive_message returned, or
#   false if the exchange failed on the first try
# @raise [DRb::DRbConnError] If the exchange failed and this was not the
#   first try
def make_request(method_name, method_params, first_try)
  rpc_request = JsonRpcRequest.new(method_name, method_params, @id)
  @id += 1
  payload = rpc_request.to_json(:allow_nan => true)

  begin
    if JsonDRb.debug?
      STDOUT.puts "Request:\n"
      STDOUT.puts payload
    end
    @request_in_progress = true
    JsonDRb.send_data(@socket, payload)
    reply = JsonDRb.receive_message(@socket, '')
    @request_in_progress = false
    if JsonDRb.debug?
      STDOUT.puts "\nResponse:\n"
      STDOUT.puts reply
    end
    reply
  rescue => error
    # Drop the broken socket; a first attempt reports failure quietly so the
    # caller can reconnect and retry, later attempts surface the error.
    disconnect()
    @socket = nil
    raise DRb::DRbConnError, error.message, error.backtrace unless first_try
    false
  end
end

# Turns the raw reply of a request into either a return value or an
# exception. This method always either returns or raises, which is what
# breaks the caller out of its retry loop.
#
# @param response_data [String, nil, false] Raw JSON reply, or a falsey value
#   when no reply was received
# @return [Object] The result carried by a successful response
# @raise [DRb::DRbConnError] If there is no reply (socket closed by server)
# @raise [Exception] The exception rebuilt from the error data when the
#   error response carries data
# @raise [RuntimeError] With the error code and message when the error
#   response carries no data
def handle_response(response_data)
  unless response_data
    # Socket was closed by server
    disconnect()
    @socket = nil
    raise DRb::DRbConnError, "Socket closed by server"
  end

  response = JsonRpcResponse.from_json(response_data)

  # A normal (non-error) response simply yields its result
  return response.result unless JsonRpcErrorResponse === response

  # Error responses must always raise, never return a result
  raise Exception.from_hash(response.error.data) if response.error.data
  raise "JsonDRb Error (#{response.error.code}): #{response.error.message}"
end
end

end # class JsonDRbObject

end # module Cosmos
20 changes: 20 additions & 0 deletions spec/io/json_drb_object_spec.rb
Expand Up @@ -93,6 +93,26 @@ def my_method(param)
sleep(0.1)
end

it "handles the remote going away and coming back" do
  class JsonDRbObjectServer
    def my_method(param)
      param * 2
    end
  end

  # Start a server and make a first successful call through the client
  server = JsonDRb.new
  server.start_service('127.0.0.1', 7777, JsonDRbObjectServer.new)
  client = JsonDRbObject.new("localhost", 7777)
  expect(client.my_method(10)).to eql 20

  # Take the server away and bring a fresh one up on the same port
  server.stop_service
  server = JsonDRb.new
  server.start_service('127.0.0.1', 7777, JsonDRbObjectServer.new)

  # The same client object is expected to still get the right answer
  expect(client.my_method(10)).to eql 20

  client.disconnect
  server.stop_service
  sleep(0.1)
end

end
end
end
Expand Down

0 comments on commit 5f91f6c

Please sign in to comment.