Skip to content

Commit

Permalink
closes #168, closes #80 Fix select blocking and implement IO without …
Browse files Browse the repository at this point in the history
…exceptions
  • Loading branch information
ryanmelt committed Jul 27, 2015
1 parent e839fe6 commit 305d0fe
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 62 deletions.
53 changes: 31 additions & 22 deletions lib/cosmos/io/json_drb.rb
Expand Up @@ -25,6 +25,7 @@ module Cosmos
# methods.
class JsonDRb
MINIMUM_REQUEST_TIME = 0.0001
FAST_READ = (RUBY_VERSION > "2.1")

@@debug = false

Expand All @@ -47,6 +48,7 @@ def initialize
@request_mutex = Mutex.new
@client_sockets = []
@client_threads = []
@client_pipe_writers = []
@client_mutex = Mutex.new
@thread_reader, @thread_writer = IO.pipe
end
Expand All @@ -68,6 +70,9 @@ def stop_service
@client_sockets.each do |client_socket|
Cosmos.close_socket(client_socket)
end
@client_pipe_writers.each do |client_pipe_writer|
client_pipe_writer.write('.')
end
client_threads = @client_threads.clone
end

Expand All @@ -80,6 +85,7 @@ def stop_service
@client_mutex.synchronize do
@client_threads.clear
@client_sockets.clear
@client_pipe_writers.clear
end
end

Expand All @@ -95,6 +101,7 @@ def graceful_kill
# CmdTlmServer.
def start_service(hostname = nil, port = nil, object = nil)
if hostname and port and object
@thread_reader, @thread_writer = IO.pipe
@object = object
hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze)

Expand Down Expand Up @@ -125,14 +132,7 @@ def start_service(hostname = nil, port = nil, object = nil)
rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK
read_ready, _ = IO.select([@listen_socket, @thread_reader])
if read_ready and read_ready.include?(@thread_reader)
begin
# Thread should be killed - Cleanout thread_reader first
# Don't let this break anything else though
@thread_reader.read(1)
rescue Exception
# Oh well - create a clean pipe in case we need one
@thread_reader, @thread_writer = IO.pipe
end
# Thread should be killed
break
else
retry
Expand Down Expand Up @@ -191,17 +191,18 @@ def average_request_time
# @param socket [Socket] The socket to the client
# @param data [String] Binary data which has already been read from the
# socket.
# @param pipe_reader [IO.pipe] Used to break out of select
# @return [String] The request message
def self.receive_message(socket, data)
self.get_at_least_x_bytes_of_data(socket, data, 4)
def self.receive_message(socket, data, pipe_reader)
self.get_at_least_x_bytes_of_data(socket, data, 4, pipe_reader)
if data.length >= 4
length = data[0..3].unpack('N'.freeze)[0]
data.replace(data[4..-1])
else
return nil
end

self.get_at_least_x_bytes_of_data(socket, data, length)
self.get_at_least_x_bytes_of_data(socket, data, length, pipe_reader)
if data.length >= length
message = data[0..(length - 1)]
data.replace(data[length..-1])
Expand All @@ -214,19 +215,23 @@ def self.receive_message(socket, data)
# @param socket [Socket] The socket to the client
# @param current_data [String] Binary data read from the socket
# @param required_num_bytes [Integer] The minimum number of bytes to read
# @param pipe_reader [IO.pipe] Used to break out of select
# before returning
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes)
def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes, pipe_reader)
while (current_data.length < required_num_bytes)
begin
data = socket.recv_nonblock(65535)
if data.length == 0
current_data.replace('')
return
if FAST_READ
data = socket.read_nonblock(65535, exception: false)
if data == :wait_readable
IO.fast_select([socket, pipe_reader], nil, nil, nil)
else
current_data << data
end
else
begin
current_data << socket.read_nonblock(65535)
rescue IO::WaitReadable
IO.fast_select([socket, pipe_reader], nil, nil, nil)
end
current_data << data
rescue IO::WaitReadable
IO.fast_select([socket], nil, nil, nil)
retry
end
end
end
Expand Down Expand Up @@ -278,16 +283,19 @@ def create_client_thread(socket)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)

Thread.new(socket) do |my_socket|
pipe_reader, pipe_writer = IO.pipe
@client_mutex.synchronize do
@client_sockets << my_socket
@client_threads << Thread.current
@client_pipe_writers << pipe_writer
end

data = ''

begin
while true
begin
request_data = JsonDRb.receive_message(my_socket, data)
request_data = JsonDRb.receive_message(my_socket, data, pipe_reader)
start_time = Time.now
@request_count += 1
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ENOTSOCK
Expand All @@ -309,6 +317,7 @@ def create_client_thread(socket)
Cosmos.close_socket(my_socket)
@client_sockets.delete(my_socket)
@client_threads.delete(Thread.current)
@client_pipe_writers.delete(pipe_writer)
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/cosmos/io/json_drb_object.rb
Expand Up @@ -48,6 +48,7 @@ def initialize(hostname, port, connect_timeout = 1.0)
@port = port
@mutex = Mutex.new
@socket = nil
@pipe_reader, @pipe_writer = IO.pipe
@id = 0
@request_in_progress = false
@connect_timeout = connect_timeout
Expand All @@ -58,6 +59,7 @@ def initialize(hostname, port, connect_timeout = 1.0)
# Disconnects from the JSON server
def disconnect
Cosmos.close_socket(@socket)
@pipe_writer.write('.')
# Cannot set @socket to nil here because this method can be called by
# other threads and @socket being nil would cause unexpected errors in method_missing
# Also don't want to take the mutex so that we can interrupt method_missing if necessary
Expand Down Expand Up @@ -113,6 +115,7 @@ def connect
addr = Socket.pack_sockaddr_in(@port, @hostname)
@socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
@pipe_reader, @pipe_writer = IO.pipe
begin
@socket.connect_nonblock(addr)
rescue IO::WaitWritable
Expand Down Expand Up @@ -159,7 +162,7 @@ def make_request(method_name, method_params, first_try)
STDOUT.puts request_data if JsonDRb.debug?
@request_in_progress = true
JsonDRb.send_data(@socket, request_data)
response_data = JsonDRb.receive_message(@socket, '')
response_data = JsonDRb.receive_message(@socket, '', @pipe_reader)
@request_in_progress = false
STDOUT.puts "\nResponse:\n" if JsonDRb.debug?
STDOUT.puts response_data if JsonDRb.debug?
Expand Down
88 changes: 70 additions & 18 deletions lib/cosmos/streams/tcpip_socket_stream.rb
Expand Up @@ -20,6 +20,8 @@ module Cosmos
class TcpipSocketStream < Stream
attr_reader :write_socket

FAST_READ = (RUBY_VERSION > "2.1")

# @param write_socket [Socket] Socket to write
# @param read_socket [Socket] Socket to read
# @param write_timeout [Float|nil] Number of seconds to wait for the write
Expand All @@ -39,6 +41,7 @@ def initialize(write_socket, read_socket, write_timeout, read_timeout)
# Mutex on write is needed to protect from commands coming in from more
# than one tool
@write_mutex = Mutex.new
@pipe_reader, @pipe_writer = IO.pipe
@connected = false
end

Expand All @@ -48,27 +51,66 @@ def read

# No read mutex is needed because there is only one stream procesor
# reading
begin
data = @read_socket.recv_nonblock(65535)
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
rescue IO::WaitReadable
# Wait for the socket to be ready for reading or for the timeout
if FAST_READ
begin
result = IO.fast_select([@read_socket], nil, nil, @read_timeout)
while true # Loop until we get some data
data = @read_socket.read_nonblock(65535, exception: false)
if data == :wait_readable
# Wait for the socket to be ready for reading or for the timeout
begin
result = IO.fast_select([@read_socket, @pipe_reader], nil, nil, @read_timeout)

# If select returns something it means the socket is now available for
# reading so retry the read. If it returns nil it means we timed out.
# If the pipe is present that means we closed the socket
if result
if result.include?(@pipe_reader)
raise IOError
else
next
end
else
raise Timeout::Error, "Read Timeout"
end
rescue IOError, Errno::ENOTSOCK
# These can happen with the socket being closed while waiting on select
data = ''
end
end
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
break
end
rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError, Errno::ENOTSOCK
data = ''
end
else
begin
data = @read_socket.read_nonblock(65535)
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
rescue IO::WaitReadable
# Wait for the socket to be ready for reading or for the timeout
begin
result = IO.fast_select([@read_socket, @pipe_reader], nil, nil, @read_timeout)

# If select returns something it means the socket is now available for
# reading so retry the read. If it returns nil it means we timed out.
if result
retry
else
raise Timeout::Error, "Read Timeout"
# If select returns something it means the socket is now available for
# reading so retry the read. If it returns nil it means we timed out.
# If the pipe is present that means we closed the socket
if result
if result.include?(@pipe_reader)
raise IOError
else
retry
end
else
raise Timeout::Error, "Read Timeout"
end
rescue IOError, Errno::ENOTSOCK
# These can happen with the socket being closed while waiting on select
data = ''
end
rescue IOError, Errno::ENOTSOCK
# These can happen with the socket being closed while waiting on select
rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError, Errno::ENOTSOCK
data = ''
end
rescue Errno::ECONNRESET, Errno::ECONNABORTED, IOError, Errno::ENOTSOCK
data = ''
end

data
Expand All @@ -79,8 +121,17 @@ def read_nonblock
# No read mutex is needed because there is only one stream procesor
# reading
begin
data = @read_socket.recv_nonblock(65535)
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
if FAST_READ
data = @read_socket.read_nonblock(65535, exception: false)
if data == :wait_readable
data = ''
else
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
end
else
data = @read_socket.read_nonblock(65535)
@raw_logger_pair.read_logger.write(data) if @raw_logger_pair
end
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNRESET, Errno::ECONNABORTED
data = ''
end
Expand Down Expand Up @@ -135,6 +186,7 @@ def connected?
def disconnect
Cosmos.close_socket(@write_socket)
Cosmos.close_socket(@read_socket)
@pipe_writer.write('.')
@connected = false
end

Expand Down
28 changes: 7 additions & 21 deletions spec/io/json_drb_spec.rb
Expand Up @@ -17,6 +17,7 @@ module Cosmos
describe JsonDRb do
before(:each) do
@json = JsonDRb.new
@pipe_reader, @pipe_writer = IO.pipe
end

describe "initialize" do
Expand Down Expand Up @@ -122,21 +123,6 @@ module Cosmos
end

describe "receive_message" do
it "returns nil if 4 bytes of data aren't available" do
@json.start_service('127.0.0.1', 7777, self)
socket = TCPSocket.open('127.0.0.1',7777)
# Stub recv_nonblock so it returns nothing
allow(socket).to receive(:recv_nonblock) { "" }
sleep 0.1
JsonDRb.send_data(socket, "\x00")
response_data = JsonDRb.receive_message(socket, '')
expect(response_data).to be_nil
socket.close
sleep 0.1
@json.stop_service
sleep(0.1)
end

it "processes success requests" do
class MyServer1
def my_method(param)
Expand All @@ -148,7 +134,7 @@ def my_method(param)
sleep 0.1
request = JsonRpcRequest.new('my_method', 'param', 1).to_json
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcSuccessResponse)
socket.close
Expand All @@ -166,7 +152,7 @@ class MyServer2
sleep 0.1
request = JsonRpcRequest.new('my_method', 'param', 1).to_json
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcErrorResponse)
expect(response.error.code).to eql -32601
Expand All @@ -188,7 +174,7 @@ def my_method(param1, param2)
sleep 0.1
request = JsonRpcRequest.new('my_method', 'param1', 1).to_json
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcErrorResponse)
expect(response.error.code).to eql -32602
Expand All @@ -211,7 +197,7 @@ def my_method(param)
sleep 0.1
request = JsonRpcRequest.new('my_method', 'param', 1).to_json
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcErrorResponse)
expect(response.error.code).to eql -1
Expand All @@ -228,7 +214,7 @@ def my_method(param)
sleep 0.1
request = JsonRpcRequest.new('send', 'param', 1).to_json
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcErrorResponse)
expect(response.error.code).to eql -1
Expand All @@ -247,7 +233,7 @@ def my_method(param)
request.gsub!("jsonrpc","version")
request.gsub!("2.0","1.1")
JsonDRb.send_data(socket, request)
response_data = JsonDRb.receive_message(socket, '')
response_data = JsonDRb.receive_message(socket, '', @pipe_reader)
response = JsonRpcResponse.from_json(response_data)
expect(response).to be_a(JsonRpcErrorResponse)
expect(response.error.code).to eql -32600
Expand Down

0 comments on commit 305d0fe

Please sign in to comment.