Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
tree: 07ef0284b1
Fetching contributors…

Cannot retrieve contributors at this time

214 lines (177 sloc) 5.723 kB
require 'socket'
require 'thread'
require 'timeout'
module AMQP
  # Client-side connection to an AMQP broker over a plain TCP socket.
  #
  # Constructing a Server immediately performs the protocol handshake
  # (see #start_session). Frames are written with #send_frame and read back
  # with #next_frame / #next_method. All socket I/O goes through
  # #with_socket_management, which transparently reconnects and retries once
  # before giving up with a ConnectionError.
  class Server
    # Seconds allowed for the TCP connect to complete.
    CONNECT_TIMEOUT = 1.0
    # Not referenced inside this class; kept because it is part of the
    # public constant set.
    RETRY_DELAY = 10.0
    DEFAULT_PORT = 5672

    attr_reader :host, :port, :status
    attr_accessor :retry_at, :channel, :ticket

    class ConnectionError < StandardError; end
    class ServerError < StandardError; end
    class ClientError < StandardError; end
    class ServerDown < StandardError; end
    class ProtocolError < StandardError; end

    # Builds the connection and runs the opening handshake.
    #
    # Recognised keys in +opts+ (all optional):
    # :host (default 'localhost'), :port (default DEFAULT_PORT),
    # :user / :pass (default 'guest'), :vhost (default '/'),
    # :insist, and :multithread (guard socket access with a mutex).
    #
    # Raises ProtocolError if the broker answers the handshake unexpectedly
    # and ServerDown if the TCP connection cannot be established.
    def initialize(opts = {})
      @host = opts[:host] || 'localhost'
      @port = opts[:port] || DEFAULT_PORT
      @user = opts[:user] || 'guest'
      @pass = opts[:pass] || 'guest'
      @vhost = opts[:vhost] || '/'
      @insist = opts[:insist]
      @status = 'NOT CONNECTED'
      @multithread = opts[:multithread]
      # Create the lock up front so threads never race on its lazy creation.
      @mutex = Mutex.new
      start_session
    end

    # Performs the opening handshake on channel 0, then opens channel 1 and
    # requests access to the '/data' realm, storing the granted ticket.
    #
    # Raises ProtocolError when the broker's reply to Start, Open,
    # Channel::Open or Access::Request is not the expected method.
    def start_session
      @channel = 0
      write(HEADER)
      write([1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4'))
      raise ProtocolError, 'bad start connection' unless next_method.is_a?(Protocol::Connection::Start)

      send_frame(
        Protocol::Connection::StartOk.new(
          {:platform => 'Ruby', :product => 'Carrot', :information => 'http://github.com/famosagle/carrot', :version => VERSION},
          'AMQPLAIN',
          {:LOGIN => @user, :PASSWORD => @pass},
          'en_US'
        )
      )

      # The next method is consumed either way; TuneOk is only sent when it
      # actually was a Tune.
      if next_method.is_a?(Protocol::Connection::Tune)
        send_frame(
          Protocol::Connection::TuneOk.new(:channel_max => 0, :frame_max => 131072, :heartbeat => 0)
        )
      end

      send_frame(
        Protocol::Connection::Open.new(:virtual_host => @vhost, :capabilities => '', :insist => @insist)
      )
      raise ProtocolError, 'bad open connection' unless next_method.is_a?(Protocol::Connection::OpenOk)

      @channel = 1
      send_frame(Protocol::Channel::Open.new)
      raise ProtocolError, "cannot open channel #{channel}" unless next_method.is_a?(Protocol::Channel::OpenOk)

      send_frame(
        Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true, :passive => true)
      )
      method = next_method
      raise ProtocolError, 'access denied' unless method.is_a?(Protocol::Access::RequestOk)

      self.ticket = method.ticket
    end

    # Writes each argument to the socket on the current channel.
    #
    # Arguments that are not already Frame instances are converted with
    # +to_frame(channel)+. When a ticket is known it is assigned to every
    # argument that responds to +ticket=+ before conversion.
    def send_frame(*args)
      args.each do |data|
        data.ticket = ticket if ticket && data.respond_to?(:ticket=)
        frame = data.is_a?(Frame) ? data : data.to_frame(channel)
        frame.channel = channel
        log :send, frame
        write(frame.to_s)
      end
    end

    # Reads the next frame via Frame.get, logs it and returns it.
    def next_frame
      frame = Frame.get(self)
      log :received, frame
      frame
    end

    # Returns the payload of the next frame (same as #next_payload).
    def next_method
      next_payload
    end

    # Returns the payload of the next frame.
    def next_payload
      next_frame.payload
    end

    # Closes the current channel, then the connection (on channel 0), then
    # the socket. An unexpected reply to either Close is reported on stdout
    # and does not abort the shutdown.
    def close
      send_frame(
        Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
      )
      puts "Error closing channel #{channel}" unless next_method.is_a?(Protocol::Channel::CloseOk)

      self.channel = 0
      send_frame(
        Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
      )
      puts "Error closing connection" unless next_method.is_a?(Protocol::Connection::CloseOk)

      close_socket
    end

    # Forwards to the socket's +read+ under socket management.
    def read(*args)
      with_socket_management do |socket|
        socket.read(*args)
      end
    end

    # Forwards to the socket's +write+ under socket management.
    def write(*args)
      with_socket_management do |socket|
        socket.write(*args)
      end
    end

    private

    # Yields the (possibly freshly connected) socket to the block.
    #
    # On ClientError, ServerError, SocketError, SystemCallError or IOError the
    # socket is closed and the block is retried exactly once with a new
    # connection. A second failure closes the socket again and is re-raised
    # as ConnectionError carrying the original class, message and backtrace.
    # ServerDown raised while connecting propagates unchanged.
    def with_socket_management
      retried = false
      synchronize do
        begin
          yield socket
        rescue ClientError, ServerError, SocketError, SystemCallError, IOError => error
          # Drop the broken socket in both cases. The public #close must not
          # be used here: it takes no arguments and would try to talk AMQP
          # over the very socket that just failed.
          close_socket(error.message)
          unless retried
            retried = true
            retry
          end

          new_error = ConnectionError.new("#{error.class}: #{error.message}")
          new_error.set_backtrace(error.backtrace)
          raise new_error
        end
      end
    end

    # Returns the open socket, connecting first if there is none.
    #
    # Raises ServerDown when a retry time is set and has not passed yet, or
    # when the connection attempt fails or exceeds CONNECT_TIMEOUT.
    def socket
      return @socket if @socket && !@socket.closed?
      raise ServerDown, "will retry at #{retry_at}" unless retry?

      synchronize do
        begin
          # Timeout.timeout replaces the bare Kernel#timeout call, which is
          # no longer available on current Rubies.
          @socket = Timeout.timeout(CONNECT_TIMEOUT) do
            TCPSocket.new(host, port)
          end
          # Module#constants returns Symbols, so a String membership test
          # never matches; ask for the constant directly instead.
          if Socket.const_defined?(:TCP_NODELAY)
            @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
          end
          @retry_at = nil
          @status = 'CONNECTED'
        rescue SocketError, SystemCallError, IOError, Timeout::Error => e
          close_socket
          raise ServerDown, e.message
        end
      end
      @socket
    end

    # True when the instance was created with the :multithread option.
    def multithread?
      @multithread
    end

    # True when no retry time is set or the retry time has passed.
    def retry?
      @retry_at.nil? || @retry_at < Time.now
    end

    # Raises ConnectionError for a truncated read.
    def unexpected_eof!
      raise ConnectionError, 'unexpected end of file'
    end

    # Closes the socket (if open) and resets connection state. The server is
    # not considered dead: the retry time is cleared.
    #
    # +reason+ is accepted for callers that pass one; it is currently unused.
    def close_socket(reason = nil)
      synchronize do
        @socket.close if @socket && !@socket.closed?
        @socket = nil
        @retry_at = nil
        @status = "NOT CONNECTED"
      end
    end

    # Lazily created lock guarding socket state in multithread mode.
    def mutex
      @mutex ||= Mutex.new
    end

    # Runs the block while holding the mutex when multithread mode is on.
    #
    # Mutex is not reentrant, and the socket helpers call each other while
    # the lock is held, so the block runs directly when the current thread
    # already owns the lock (or when multithread mode is off).
    def synchronize
      return yield if !multithread? || mutex.owned?

      mutex.synchronize { yield }
    end

    # Pretty-prints +args+ followed by a blank line when Carrot logging is
    # enabled; otherwise does nothing.
    def log(*args)
      return unless Carrot.logging?

      require 'pp' # loaded lazily: only needed when logging is on
      pp args
      puts
    end
  end
end
Jump to Line
Something went wrong with that request. Please try again.