Permalink
Browse files

a working version with ack

  • Loading branch information...
1 parent 3008d32 commit add3c567bfa31e0afb502f470632150d07da463c Amos Elliston committed Apr 8, 2009
Showing with 54 additions and 118 deletions.
  1. +3 −5 lib/amqp/exchange.rb
  2. +32 −84 lib/amqp/queue.rb
  3. +19 −29 lib/amqp/server.rb
View
8 lib/amqp/exchange.rb
@@ -6,7 +6,7 @@ def initialize(server, type, name, opts = {})
@key = opts[:key]
unless name == "amq.#{type}" or name == ''
- @server.send_command(
+ @server.send_frame(
Protocol::Exchange::Declare.new(
{ :exchange => name, :type => type, :nowait => true }.merge(opts)
)
@@ -32,13 +32,11 @@ def publish(data, opts = {})
)
out << Frame::Body.new(data)
- @server.send_command(*out)
- self
+ @server.send_frame(*out)
end
def delete(opts = {})
- @server.send_command(Protocol::Exchange::Delete.new({ :exchange => name, :nowait => true }.merge(opts)))
- nil
+ @server.send_frame(Protocol::Exchange::Delete.new({ :exchange => name, :nowait => true }.merge(opts)))
end
def reset
View
116 lib/amqp/queue.rb
@@ -1,27 +1,46 @@
module AMQP
class Queue
- attr_reader :name
+ attr_reader :name, :server
+ attr_accessor :delivery_tag
def initialize(server, name, opts = {})
@server = server
@opts = opts
@name = name
- @server.send_command(
+ @server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts))
)
end
def delete(opts = {})
- @server.send_command(
+ server.send_frame(
Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts))
)
end
- def pop(opts = {}, &blk)
- @server.send_command(
+ def pop(opts = {})
+ self.delivery_tag = nil
+ server.send_frame(
Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts))
)
- @server.receive_frame(&blk)
+ frame = server.next_frame
+ return if frame.is_a?(Frame::Method) and frame.payload.is_a?(Protocol::Basic::GetEmpty)
+
+ method = frame.payload
+ self.delivery_tag = method.delivery_tag
+
+ frame = server.next_frame
+ header = frame.payload
+ frame = server.next_frame
+ msg = frame.payload
+ raise 'unexpected length' if msg.length < header.size
+ msg
+ end
+
+ def ack
+ server.send_frame(
+ Protocol::Basic::Ack.new(:delivery_tag => delivery_tag)
+ )
end
def publish(data, opts = {})
@@ -37,88 +56,17 @@ def consumer_count
end
def status(opts = {}, &blk)
- message_count, consumer_count = 0
- @server.send_command(Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts))) do |status|
- message_count = status.message_count
- consumer_count = status.consumer_count
- end
- [message_count, consumer_count]
+ server.send_frame(
+ Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts))
+ )
+ frame = @server.next_frame
+ method = frame.payload
+ [method.message_count, method.consumer_count]
end
- #--------------------------------------------------
- # def bind(exchange, opts = {})
- # exchange = exchange.respond_to?(:name) ? exchange.name : exchange
- # @bindings[exchange] = opts
- #
- # @server.send(
- # Protocol::Queue::Bind.new(
- # { :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)
- # )
- # )
- # self
- # end
- #
- # def unbind(exchange, opts = {})
- # exchange = exchange.respond_to?(:name) ? exchange.name : exchange
- # @bindings.delete(exchange)
- # @server.send(
- # Protocol::Queue::Unbind.new(
- # { :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => true }.merge(opts)
- # )
- # )
- # self
- # end
- #
- # def reset
- # @deferred_status = nil
- # initialize @server, @name, @opts
- #
- # binds = @bindings
- # @bindings = {}
- # binds.each{|ex,opts| bind(ex, opts) }
- #
- # if blk = @on_msg
- # @on_msg = nil
- # subscribe(@on_msg_opts, &blk)
- # end
- #
- # if @on_pop
- # pop(@on_pop_opts, &@on_pop)
- # end
- # end
- #
- # def subscribed?
- # !!@on_msg
- # end
- #
- # def subscribe(opts = {}, &blk)
- # @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
- # @server.consumers[@consumer_tag] = self
- #
- # raise Error, 'already subscribed to the queue' if subscribed?
- #
- # @on_msg = blk
- # @on_msg_opts = opts
- #
- # @server.send(
- # Protocol::Basic::Consume.new(
- # {:queue => name, :consumer_tag => @consumer_tag, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts)
- # )
- # )
- # self
- # end
- #
- # def unsubscribe(opts = {}, &blk)
- # @on_msg = nil
- # @on_cancel = blk
- # @server.send(Protocol::Basic::Cancel.new({ :consumer_tag => @consumer_tag }.merge(opts)))
- # self
- # end
- #--------------------------------------------------
-
private
def exchange
- @exchange ||= Exchange.new(@server, :direct, '', :key => name)
+ @exchange ||= Exchange.new(server, :direct, '', :key => name)
end
end
end
View
48 lib/amqp/server.rb
@@ -42,7 +42,7 @@ def retry?
@retry_at.nil? or @retry_at < Time.now
end
- def send_command(*args)
+ def send_frame(*args)
args.each do |data|
data.ticket = ticket if ticket and data.respond_to?(:ticket=)
data = data.to_frame(channel) unless data.is_a?(Frame)
@@ -91,30 +91,33 @@ def with_socket(&block)
end
end
- def receive_frame(&block)
+ def next_frame
frame = Frame.get(self)
- return unless frame
-
log :received, frame
+ frame
+ end
+
+ def receive_frame
+ frame = next_frame
+ return unless frame
case frame
when Frame::Header
@header = frame.payload
@body = ''
- receive_frame(&block)
+ receive_frame
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
@header.properties.update(@method.arguments)
- block.call(@header, @body) if block
@body = @header = @consumer = @method = nil
end
when Frame::Method
case method = frame.payload
when Protocol::Connection::Start
- send_command(
+ send_frame(
Protocol::Connection::StartOk.new(
{:platform => 'Ruby', :product => 'Carrot', :information => 'http://github.com/famosagle/carrot', :version => VERSION},
'AMQPLAIN',
@@ -125,10 +128,10 @@ def receive_frame(&block)
receive_frame
when Protocol::Connection::Tune
- send_command(
+ send_frame(
Protocol::Connection::TuneOk.new( :channel_max => 0, :frame_max => 131072, :heartbeat => 0)
)
- send_command(
+ send_frame(
Protocol::Connection::Open.new(:virtual_host => @vhost, :capabilities => '', :insist => @insist)
)
receive_frame
@@ -138,32 +141,19 @@ def receive_frame(&block)
when Protocol::Connection::OpenOk
self.channel = 1
- send_command(Protocol::Channel::Open.new)
+ send_frame(Protocol::Channel::Open.new)
receive_frame
when Protocol::Channel::OpenOk
- send_command(
+ send_frame(
Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true, :passive => true)
)
receive_frame
when Protocol::Access::RequestOk
self.ticket = method.ticket
- block.call(method) if block
- when Protocol::Queue::DeclareOk
- block.call(method) if block
-
- when Protocol::Basic::CancelOk, Protocol::Connection::CloseOk, Protocol::Channel::CloseOk
-
- when Protocol::Basic::Deliver, Protocol::Basic::GetOk
- @method = method
- @header = nil
- @body = ''
- receive_frame(&block)
-
- when Protocol::Basic::GetEmpty
- block.call(nil) if block
+ when Protocol::Basic::CancelOk, Protocol::Queue::DeclareOk
when Protocol::Channel::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
@@ -173,15 +163,15 @@ def receive_frame(&block)
end
def close
- send_command(
+ send_frame(
Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
)
- receive_frame
+ next_frame
self.channel = 0
- send_command(
+ send_frame(
Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
)
- receive_frame
+ next_frame
close_socket
end

0 comments on commit add3c56

Please sign in to comment.