Skip to content

Commit

Permalink
TCPSocket compatibility updates
Browse files Browse the repository at this point in the history
- recv and send now behave differently
- methods raise the right exceptions at the right time
- specs added
  • Loading branch information
eliaslevy committed Jun 9, 2012
1 parent de0efbb commit e7f2b92
Show file tree
Hide file tree
Showing 2 changed files with 651 additions and 47 deletions.
185 changes: 157 additions & 28 deletions lib/em-synchrony/tcpsocket.rb
Expand Up @@ -7,8 +7,22 @@ def new(*args)
if args.size == 1
_old_new(*args)
else
socket = EventMachine::connect(*args[0..1], self)
raise SocketError unless socket.sync(:in) # wait for connection
# In TCPSocket, new against an unknown hostname raises SocketError with
# a message "getaddrinfo: nodename nor servname provided, or not known".
# In EM, connect against an unknown hostname raises EM::ConnectionError
# with a message of "unable to resolve server address"
begin
socket = EventMachine::connect(*args[0..1], self)
rescue EventMachine::ConnectionError => e
raise SocketError, e.message
end
# In TCPSocket, new against a closed port raises Errno::ECONNREFUSED.
# In EM, connect against a closed port result in a call to unbind with
# a reason param of Errno::ECONNREFUSED as a class, not an instance.
unless socket.sync(:in) # wait for connection
raise socket.unbind_reason.new if socket.unbind_reason.is_a? Class
raise SocketError
end
socket
end
end
Expand All @@ -17,77 +31,186 @@ def new(*args)

def post_init
@in_buff, @out_buff = '', ''
@in_req = @out_req = nil
@in_req = @out_req = @unbind_reason = @read_type = nil
@opening = true
@closed = @remote_closed = false
end

def closed?
@in_req.nil? && @out_req.nil?
# In TCPSocket,
# closed? on a remotely closed socket, when we've not yet read EOF, returns false
# closed? on a remotely closed socket, when we've read EOF, returns false
# closed? on a socket after #close, returns true
# Therefore, we set @close to true when #close is called, but not when unbind is.
@closed
end

# direction must be one of :in or :out
def sync(direction)
req = self.instance_variable_set "@#{direction.to_s}_req", EventMachine::DefaultDeferrable.new
EventMachine::Synchrony.sync req
ensure
self.instance_variable_set "@#{direction.to_s}_req", nil
end

# TCPSocket interface
def setsockopt(level, name, value); end

def send(msg, flags = 0)
def send(msg, flags)
raise "Unknown flags in send(): #{flags}" if flags.nonzero?
# write(X) on a socket after #close, raises IOError with message "closed stream"
# send(X,0) on a socket after #close, raises IOError with message "closed stream"
raise IOError, "closed stream" if @closed
# the first write(X) on a remotely closed socket, <= than some buffer size, generates no error
# the first write(X) on a remotely closed socket, > than some buffer size, generates no error
# (on my box this buffer appears to be 80KB)
# further write(X) on a remotely closed socket, raises Errno::EPIPE
# the first send(X,0) on a remotely closed socket, <= than some buffer size, generates no error
# the first send(X,0) on a remotely closed socket, > than some buffer size, generates no error
# (on my box this buffer appears to be 80KB)
# further send(X,0) on a remotely closed socket, raises Errno::EPIPE
raise Errno::EPIPE if @remote_closed

len = msg.bytesize
write_data(msg) or sync(:out) or raise(IOError)
# write(X) on an open socket, where the remote end closes during the write, raises Errno::EPIPE
# send(X,0) on an open socket, where the remote end closes during the write, raises Errno::EPIPE
write_data(msg) or sync(:out) or raise(Errno::EPIPE)
len
end
alias_method :write, :send

def write(msg)
send(msg,0)
end

def read(num_bytes = 16*1024, dest = nil)
read_data(num_bytes, dest) or sync(:in) or raise(IOError)
def read(num_bytes = nil, dest = nil)
handle_read(:read, num_bytes, dest)
end
alias_method :read_nonblock, :read
alias_method :recv, :read

def recv(num_bytes, flags = 0)
raise "Unknown flags in recv(): #{flags}" if flags.nonzero?
handle_read(:recv, num_bytes)
end

def close
# close on a closed socket raises IOError with a message of "closed stream"
raise IOError, "closed stream" if @closed
@closed = true
close_connection true
@in_req = @out_req = nil
# close on an open socket returns nil
nil
end

# EventMachine interface
def connection_completed
@opening = false
@in_req.succeed self
end

def unbind
@in_req.fail nil if @in_req

attr_reader :unbind_reason

# Can't set a default value for reason (e.g. reason=nil), as in that case
# EM fails to pass in the reason argument and you'll always get the default
# value.
def unbind(reason)
@unbind_reason = reason
@remote_closed = true unless @closed
if @opening
@in_req.fail nil if @in_req
else
@in_req.succeed read_data if @in_req
end
@out_req.fail nil if @out_req
close
@in_req = @out_req = nil
end

def receive_data(data)
@in_buff << data
if @in_req && (data = read_data)
@in_req.succeed data
@in_req.succeed data unless data == :block
end
end

protected
def read_data(num_bytes = nil, dest = nil)
@read_bytes = num_bytes if num_bytes
@read_dest = dest if dest
if @in_buff.size > 0
data = @in_buff.slice!(0, @read_bytes)
@read_bytes = 0
def handle_read(type, num_bytes, dest=nil)
# read(-n) always raises ArgumentError
# recv(-n) always raises ArgumentError
raise ArgumentError, "negative length #{num_bytes} given" if num_bytes != nil and num_bytes < 0
# read(n) on a socket after #close, raises IOError with message "closed stream"
# read(0) on a socket after #close, raises IOError with message "closed stream"
# read() on a socket after #close, raises IOError with message "closed stream"
# recv(n) on a socket after #close, raises IOError with message "closed stream"
# recv(0) on a socket after #close, raises IOError with message "closed stream"
raise IOError, "closed stream" if @closed
# read(0) on an open socket, return ""
# read(0) on a remotely closed socket, with buffered data, returns ""
# read(0) on a remotely closed socket, with no buffered data, returns ""
# recv(0) on an open socket, return ""
# recv(0) on a remotely closed socket, with buffered data, returns ""
# recv(0) on a remotely closed socket, with no buffered data, returns ""
return "" if num_bytes == 0

if @read_dest
@read_dest.replace data
@read_dest = nil
@read_type = type
@read_bytes = num_bytes
@read_dest = dest if dest

(data = read_data) != :block ? data : sync(:in)
end

def try_read_data
if @read_type == :read
unless @remote_closed
if @read_bytes
# read(n) on an open socket, with >= than n buffered data, returns n data
if @in_buff.size >= @read_bytes then @in_buff.slice!(0, @read_bytes)
# read(n) on an open socket, with < than n buffered data, blocks
else :block end
else
# read() on an open socket, blocks until a remote close and returns all the data sent
:block
end
else
if @read_bytes
# read(n) on a remotely closed socket, with no buffered data, returns nil
if @in_buff.empty? then nil
# read(n) on a remotely closed socket, with buffered data, returns the buffered data up to n
else @in_buff.slice!(0, @read_bytes) end
else
# read() on a remotely closed socket, with no buffered data, returns ""
if @in_buff.empty? then ""
# read() on a remotely closed socket, with buffered data, returns the buffered data
else @in_buff.slice!(0, @in_buff.size) end
end
end
else #recv
unless @remote_closed
# recv(n) on an open socket, with no buffered data, blocks
if @in_buff.empty? then :block
# recv(n) on an open socket, with < than n buffered data, return the buffered data
# recv(n) on an open socket, with >= than n buffered data, returns n data
else @in_buff.slice!(0, @read_bytes) end
else
# recv(n) on a remotely closed socket, with no buffered data, returns ""
if @in_buff.empty? then ""
# recv(n) on a remotely closed socket, with < than n buffered data, return the buffered data
# recv(n) on a remotely closed socket, with >= than n buffered data, returns n data
else @in_buff.slice!(0, @read_bytes) end
end
data
else
nil
end
end


def read_data
data = try_read_data
unless data == :block
@read_bytes = 0
# read(n,buffer) returns the buffer when it does not return nil or raise an exception
data = @read_dest.replace(data) if @read_dest and not data.nil?
@read_dest = nil
end
data
end

def write_data(data = nil)
@out_buff += data if data

Expand All @@ -98,9 +221,15 @@ def write_data(data = nil)
end

if self.get_outbound_data_size > EventMachine::FileStreamer::BackpressureLevel
# write(X) on an open socket, where the remote end is not reading, > than some buffer size, blocks
# send(X,0) on an open socket, where the remote end is not reading, > than some buffer size, blocks
# where that buffer size is EventMachine::FileStreamer::BackpressureLevel, returning false will
# cause write/send to block
EventMachine::next_tick { write_data }
return false
else
# write(X) on an open socket, where the remote end is not reading, <= than some buffer size, sends and returns
# send(X,0) on an open socket, where the remote end is not reading, <= than some buffer size, sends returns
len = [@out_buff.bytesize, EventMachine::FileStreamer::ChunkSize].min
self.send_data @out_buff.slice!( 0, len )
end
Expand Down

1 comment on commit e7f2b92

@nel
Copy link

@nel nel commented on e7f2b92 Jun 13, 2012

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am testing this pull request to try and pass Mongoid 3 moped driver and their is definitly an improvement on #read and #closed? when it comed to mimicking the TCPSocket behavior.

Maybe it would be a good idea to also add an empty TCPSocket#set_encoding when runned under Ruby 1.9 as this is in the official API

Please sign in to comment.