Skip to content

Commit

Permalink
simplify send/recv implementation according to XCB cookie approach:
Browse files Browse the repository at this point in the history
  • Loading branch information
sunaku committed Nov 17, 2009
1 parent 132095f commit 9519835
Showing 1 changed file with 64 additions and 30 deletions.
94 changes: 64 additions & 30 deletions lib/rumai/ixp/transport.rb
Expand Up @@ -10,25 +10,20 @@
module Rumai
module IXP
##
# A thread-safe proxy that multiplexes many
# A thread-safe channel that multiplexes many
# threads onto a single 9P2000 connection.
#
# The send/recv implementation is based on the XCB cookie approach:
# http://www.x.org/releases/X11R7.5/doc/libxcb/tutorial/#requestsreplies
#
class Agent
attr_reader :msize

def initialize stream
@stream = stream
@send_lock = Mutex.new
@recv_bays = Hash.new {|h,k| h[k] = Queue.new } # tag => Queue(message)

# background thread which continuously receives
# and dispatches messages from the 9P2000 server
Thread.new do
while true
msg = Fcall.from_9p @stream
@recv_bays[msg.tag] << msg
end
end.priority = -1
@stream = stream

@recv_buf = {} # tag => message
@recv_lock = Mutex.new

@tag_pool = RangedPool.new(0...BYTE2_MASK)
@fid_pool = RangedPool.new(0...BYTE4_MASK)
Expand Down Expand Up @@ -102,30 +97,69 @@ def release member
end

##
# Sends the given message (Rumai::IXP::Fcall) and returns its response.
#
# This method allows you to perform a 9P2000 transaction without
# worrying about the details of tag collisions and thread safety.
# Sends the given request (Rumai::IXP::Fcall) and returns
# a ticket that you can use later to receive the reply.
#
def talk request
# send the request
def send request
tag = @tag_pool.obtain
bay = @recv_bays[tag]

request.tag = tag
output = request.to_9p
@send_lock.synchronize do
@stream << output
@stream << request.to_9p

tag
end

##
# Returns the reply for the given ticket, which was previously given
# to you when you sent the corresponding request (Rumai::IXP::Fcall).
#
def recv tag
loop do
reply = @recv_lock.synchronize do
if @recv_buf.key? tag
@recv_buf.delete tag
else
# reply was not in the receive buffer, so wait
# for the next reply... hoping that it is ours
msg = Fcall.from_9p(@stream)

if msg.tag == tag
msg
else
# we got someone else's reply, so buffer
# it (for them to receive) and try again
@recv_buf[msg.tag] = msg
nil
end
end
end

if reply
@tag_pool.release tag

if reply.is_a? Rerror
raise Error, reply.ename
end

return reply
else
# give other threads a chance to receive
Thread.pass
end
end
end

# receive the response
response = bay.shift
@tag_pool.release tag
##
# Sends the given request (Rumai::IXP::Fcall) and returns its reply.
#
def talk request
tag = send(request)

if response.is_a? Rerror
raise Error, "#{response.ename.inspect} in response to #{request.inspect}"
else
return response
begin
recv tag
rescue Error => e
e.message << " -- in reply to #{request.inspect}"
raise
end
end

Expand Down

0 comments on commit 9519835

Please sign in to comment.