Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

adding examples and readme

  • Loading branch information...
commit 07ef0284b1de72fc052db2127cc122cc161f7550 1 parent add3c56
Amos Elliston authored
View
2  LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2008 Amos Elliston
+Copyright (c) 2009 Amos Elliston
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
View
9 README
@@ -1,9 +0,0 @@
-carrot
-======
-
-Description goes here.
-
-COPYRIGHT
-=========
-
-Copyright (c) 2008 Amos Elliston. See LICENSE for details.
View
24 README.markdown
@@ -0,0 +1,24 @@
+# Carrot
+
+A synchronous amqp client. Based on Aman's amqp client:
+
+http://github.com/tmm1/amqp/tree/master
+
+## Example
+
+q = Carrot.queue('name', :durable => true, :host => 'q1.rabbitmq.com')
+100.times do
+ q.publish('foo')
+end
+
+pp :count, q.message_count
+
+while msg = q.pop(:ack => true)
+ puts msg
+ q.ack
+end
+Carrot.stop
+
+# LICENSE
+
+Copyright (c) 2009 Amos Elliston, Geni.com; Published under The MIT License, see License
View
2  Rakefile
@@ -37,4 +37,4 @@ Rcov::RcovTask.new do |t|
t.verbose = true
end
-task :default => :rcov
+task :default => :test
View
12 lib/amqp/exchange.rb
@@ -1,12 +1,13 @@
module AMQP
class Exchange
+ attr_accessor :server, :type, :name, :opts, :key
def initialize(server, type, name, opts = {})
@server, @type, @name, @opts = server, type, name, opts
@key = opts[:key]
unless name == "amq.#{type}" or name == ''
- @server.send_frame(
+ server.send_frame(
Protocol::Exchange::Declare.new(
{ :exchange => name, :type => type, :nowait => true }.merge(opts)
)
@@ -19,7 +20,7 @@ def publish(data, opts = {})
out = []
out << Protocol::Basic::Publish.new(
- { :exchange => name, :routing_key => opts.delete(:key) || @key }.merge(opts)
+ { :exchange => name, :routing_key => opts.delete(:key) || key }.merge(opts)
)
data = data.to_s
out << Protocol::Header.new(
@@ -32,16 +33,15 @@ def publish(data, opts = {})
)
out << Frame::Body.new(data)
- @server.send_frame(*out)
+ server.send_frame(*out)
end
def delete(opts = {})
- @server.send_frame(Protocol::Exchange::Delete.new({ :exchange => name, :nowait => true }.merge(opts)))
+ server.send_frame(Protocol::Exchange::Delete.new({ :exchange => name, :nowait => true }.merge(opts)))
end
def reset
- @deferred_status = nil
- initialize(@server, @type, @name, @opts)
+ initialize(server, type, name, opts)
end
end
end
View
1  lib/amqp/frame.rb
@@ -71,6 +71,7 @@ def self.get(server)
end
if $0 =~ /bacon/ or $0 == __FILE__
+ require 'rubygems'
require 'bacon'
include AMQP
View
18 lib/amqp/queue.rb
@@ -7,7 +7,7 @@ def initialize(server, name, opts = {})
@server = server
@opts = opts
@name = name
- @server.send_frame(
+ server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :nowait => true }.merge(opts))
)
end
@@ -16,6 +16,7 @@ def delete(opts = {})
server.send_frame(
Protocol::Queue::Delete.new({ :queue => name, :nowait => true }.merge(opts))
)
+ pp server.next_method
end
def pop(opts = {})
@@ -23,17 +24,15 @@ def pop(opts = {})
server.send_frame(
Protocol::Basic::Get.new({ :queue => name, :consumer_tag => name, :no_ack => !opts.delete(:ack), :nowait => true }.merge(opts))
)
- frame = server.next_frame
- return if frame.is_a?(Frame::Method) and frame.payload.is_a?(Protocol::Basic::GetEmpty)
+ method = server.next_method
+ return if method.is_a?(Protocol::Basic::GetEmpty)
- method = frame.payload
self.delivery_tag = method.delivery_tag
- frame = server.next_frame
- header = frame.payload
- frame = server.next_frame
- msg = frame.payload
+ header = server.next_payload
+ msg = server.next_payload
raise 'unexpected length' if msg.length < header.size
+
msg
end
@@ -59,8 +58,7 @@ def status(opts = {}, &blk)
server.send_frame(
Protocol::Queue::Declare.new({ :queue => name, :passive => true }.merge(opts))
)
- frame = @server.next_frame
- method = frame.payload
+ method = server.next_method
[method.message_count, method.consumer_count]
end
View
184 lib/amqp/server.rb
@@ -11,11 +11,11 @@ class Server
attr_reader :host, :port, :status
attr_accessor :retry_at, :channel, :ticket
- class Error < StandardError; end
- class ConnectionError < Error; end
- class ServerError < Error; end
- class ClientError < Error; end
- class ServerDown < Error; end
+ class ConnectionError < StandardError; end
+ class ServerError < StandardError; end
+ class ClientError < StandardError; end
+ class ServerDown < StandardError; end
+ class ProtocolError < StandardError; end
def initialize(opts = {})
@host = opts[:host] || 'localhost'
@@ -24,22 +24,48 @@ def initialize(opts = {})
@pass = opts[:pass] || 'guest'
@vhost = opts[:vhost] || '/'
@insist = opts[:insist]
- @channel= 0
@status = 'NOT CONNECTED'
@multithread = opts[:multithread]
+ start_session
+ end
+ def start_session
+ @channel = 0
write(HEADER)
write([1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4'))
- receive_frame
- end
+ raise ProtocolError, 'bad start connection' unless next_method.is_a?(Protocol::Connection::Start)
- def multithread?
- @multithread
- end
+ send_frame(
+ Protocol::Connection::StartOk.new(
+ {:platform => 'Ruby', :product => 'Carrot', :information => 'http://github.com/famosagle/carrot', :version => VERSION},
+ 'AMQPLAIN',
+ {:LOGIN => @user, :PASSWORD => @pass},
+ 'en_US'
+ )
+ )
- def retry?
- @retry_at.nil? or @retry_at < Time.now
+ if next_method.is_a?(Protocol::Connection::Tune)
+ send_frame(
+ Protocol::Connection::TuneOk.new( :channel_max => 0, :frame_max => 131072, :heartbeat => 0)
+ )
+ end
+
+ send_frame(
+ Protocol::Connection::Open.new(:virtual_host => @vhost, :capabilities => '', :insist => @insist)
+ )
+ raise ProtocolError, 'bad open connection' unless next_method.is_a?(Protocol::Connection::OpenOk)
+
+ @channel = 1
+ send_frame(Protocol::Channel::Open.new)
+ raise ProtocolError, "cannot open channel #{channel}" unless next_method.is_a?(Protocol::Channel::OpenOk)
+
+ send_frame(
+ Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true, :passive => true)
+ )
+ method = next_method
+ raise ProtocolError, 'access denied' unless method.is_a?(Protocol::Access::RequestOk)
+ self.ticket = method.ticket
end
def send_frame(*args)
@@ -53,19 +79,50 @@ def send_frame(*args)
end
end
+ def next_frame
+ frame = Frame.get(self)
+ log :received, frame
+ frame
+ end
+
+ def next_method
+ next_payload
+ end
+
+ def next_payload
+ next_frame.payload
+ end
+
+ def close
+ send_frame(
+ Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
+ )
+ puts "Error closing channel #{channel}" unless next_method.is_a?(Protocol::Channel::CloseOk)
+
+ self.channel = 0
+ send_frame(
+ Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
+ )
+ puts "Error closing connection" unless next_method.is_a?(Protocol::Connection::CloseOk)
+
+ close_socket
+ end
+
def read(*args)
- with_socket do |socket|
+ with_socket_management do |socket|
socket.read(*args)
end
end
def write(*args)
- with_socket do |socket|
+ with_socket_management do |socket|
socket.write(*args)
end
end
- def with_socket(&block)
+ private
+
+ def with_socket_management(&block)
retried = false
begin
mutex.lock if multithread?
@@ -75,6 +132,7 @@ def with_socket(&block)
if not retried
# Close the socket and retry once.
close_socket
+ #start_session
retried = true
retry
else
@@ -91,92 +149,6 @@ def with_socket(&block)
end
end
- def next_frame
- frame = Frame.get(self)
- log :received, frame
- frame
- end
-
- def receive_frame
- frame = next_frame
- return unless frame
-
- case frame
- when Frame::Header
- @header = frame.payload
- @body = ''
- receive_frame
-
- when Frame::Body
- @body << frame.payload
- if @body.length >= @header.size
- @header.properties.update(@method.arguments)
- @body = @header = @consumer = @method = nil
- end
-
- when Frame::Method
- case method = frame.payload
- when Protocol::Connection::Start
- send_frame(
- Protocol::Connection::StartOk.new(
- {:platform => 'Ruby', :product => 'Carrot', :information => 'http://github.com/famosagle/carrot', :version => VERSION},
- 'AMQPLAIN',
- {:LOGIN => @user, :PASSWORD => @pass},
- 'en_US'
- )
- )
- receive_frame
-
- when Protocol::Connection::Tune
- send_frame(
- Protocol::Connection::TuneOk.new( :channel_max => 0, :frame_max => 131072, :heartbeat => 0)
- )
- send_frame(
- Protocol::Connection::Open.new(:virtual_host => @vhost, :capabilities => '', :insist => @insist)
- )
- receive_frame
-
- when Protocol::Connection::Close
- STDERR.puts "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
-
- when Protocol::Connection::OpenOk
- self.channel = 1
- send_frame(Protocol::Channel::Open.new)
- receive_frame
-
- when Protocol::Channel::OpenOk
- send_frame(
- Protocol::Access::Request.new(:realm => '/data', :read => true, :write => true, :active => true, :passive => true)
- )
- receive_frame
-
- when Protocol::Access::RequestOk
- self.ticket = method.ticket
-
- when Protocol::Basic::CancelOk, Protocol::Queue::DeclareOk
-
- when Protocol::Channel::Close
- raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
-
- end
- end
- end
-
- def close
- send_frame(
- Protocol::Channel::Close.new(:reply_code => 200, :reply_text => 'bye', :method_id => 0, :class_id => 0)
- )
- next_frame
- self.channel = 0
- send_frame(
- Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)
- )
- next_frame
- close_socket
- end
-
- private
-
def socket
return @socket if @socket and not @socket.closed?
raise ServerDown, "will retry at #{retry_at}" unless retry?
@@ -203,6 +175,14 @@ def socket
@socket
end
+ def multithread?
+ @multithread
+ end
+
+ def retry?
+ @retry_at.nil? or @retry_at < Time.now
+ end
+
def unexpected_eof!
raise ConnectionError, 'unexpected end of file'
end
View
12 lib/carrot.rb
@@ -18,6 +18,18 @@ def self.logging?
end
class Error < StandardError; end
+ def self.queue(name, opts = {})
+ instance(opts).queue(name, opts)
+ end
+
+ def self.stop
+ instance.stop
+ end
+
+ def self.instance(opts = {})
+ @instance ||= new(opts)
+ end
+
def initialize(opts = {})
@server = AMQP::Server.new(opts)
end
View
13 lib/examples/simple_pop.rb
@@ -0,0 +1,13 @@
+require File.dirname(File.expand_path(__FILE__)) + '/../carrot'
+
+#Carrot.logging = true
+q = Carrot.queue('carrot', :durable => true)
+100.times do
+ q.publish('foo', :persistent => true)
+end
+puts "count: #{q.message_count}"
+while msg = q.pop(:ack => true)
+ puts msg
+ q.ack
+end
+Carrot.stop
Please sign in to comment.
Something went wrong with that request. Please try again.