Skip to content

Commit

Permalink
API reference docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Klishin committed Jan 18, 2013
1 parent 2512d84 commit 7cd8c7f
Showing 1 changed file with 99 additions and 3 deletions.
102 changes: 99 additions & 3 deletions lib/bunny/channel.rb
Expand Up @@ -184,6 +184,7 @@ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))

# Opens the channel and resets its internal state
# @return [Bunny::Channel] Self
# @api public
def open
@threads_waiting_on_continuations = Set.new
@threads_waiting_on_confirms_continuations = Set.new
Expand All @@ -200,17 +201,20 @@ def open

# Closes the channel. Closed channels can no longer be used (this includes associated
# {Bunny::Queue}, {Bunny::Exchange} and {Bunny::Consumer} instances.
# @api public
def close
@connection.close_channel(self)
closed!
end

# @return [Boolean] true if this channel is open, false otherwise
# @api public
def open?
@status == :open
end

# @return [Boolean] true if this channel is closed (manually or because of an exception), false otherwise
# @api public
def closed?
@status == :closed
end
Expand Down Expand Up @@ -262,6 +266,7 @@ def frame_size
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def fanout(name, opts = {})
Exchange.new(self, :fanout, name, opts)
end
Expand All @@ -279,6 +284,7 @@ def fanout(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def direct(name, opts = {})
Exchange.new(self, :direct, name, opts)
end
Expand All @@ -296,6 +302,7 @@ def direct(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def topic(name, opts = {})
Exchange.new(self, :topic, name, opts)
end
Expand All @@ -313,12 +320,14 @@ def topic(name, opts = {})
# @return [Bunny::Exchange] Exchange instance
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @api public
def headers(name, opts = {})
Exchange.new(self, :headers, name, opts)
end

# Provides access to the default exchange
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions to AMQP 0.9.1 guide
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @api public
def default_exchange
self.direct(AMQ::Protocol::EMPTY_STRING, :no_declare => true)
end
Expand Down Expand Up @@ -357,6 +366,7 @@ def exchange(name, opts = {})
#
# @return [Bunny::Queue] Queue that was declared or looked up in the cache
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
q = find_queue(name) || Bunny::Queue.new(self, name, opts)

Expand All @@ -374,6 +384,7 @@ def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {})
# @param [Integer] prefetch_count Prefetch (QoS setting) for this channel
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def prefetch(prefetch_count)
self.basic_qos(prefetch_count, false)
end
Expand All @@ -382,11 +393,13 @@ def prefetch(prefetch_count)
# channel.
#
# @param [Boolean] active Should messages to consumers on this channel be delivered?
# @api public
def flow(active)
channel_flow(active)
end

# Tells RabbitMQ to redeliver unacknowledged messages
# @api public
def recover(ignored = true)
# RabbitMQ only supports basic.recover with requeue = true
basic_recover(true)
Expand All @@ -406,6 +419,7 @@ def recover(ignored = true)
# @see Bunny::Channel#ack
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def reject(delivery_tag, requeue = false)
basic_reject(delivery_tag, requeue)
end
Expand All @@ -416,6 +430,7 @@ def reject(delivery_tag, requeue = false)
# @param [Boolean] multiple (false) Should all unacknowledged messages up to this be acknowledged as well?
# @see Bunny::Channel#nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def ack(delivery_tag, multiple = false)
basic_ack(delivery_tag, multiple)
end
Expand Down Expand Up @@ -470,6 +485,7 @@ def on_error(&block)
# @option opts [String] :app_id Optional application ID
#
# @return [Bunny::Channel] Self
# @api public
def basic_publish(payload, exchange, routing_key, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -524,6 +540,7 @@ def basic_publish(payload, exchange, routing_key, opts = {})
# ch.acknowledge(delivery_info.delivery_tag)
# @see Bunny::Queue#pop
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_get(queue, opts = {:ack => true})
raise_if_no_longer_open!

Expand Down Expand Up @@ -553,6 +570,7 @@ def basic_get(queue, opts = {:ack => true})
# @param [AMQ::Protocol::Basic::QosOk] basic.qos-ok response
# @see Bunny::Channel#prefetch
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_qos(prefetch_count, global = false)
raise ArgumentError.new("prefetch count must be a positive integer, given: #{prefetch_count}") if prefetch_count < 0
raise_if_no_longer_open!
Expand All @@ -573,6 +591,7 @@ def basic_qos(prefetch_count, global = false)
#
# @param [Boolean] requeue Should messages be requeued?
# @return [AMQ::Protocol::Basic::RecoverOk] RabbitMQ response
# @api public
def basic_recover(requeue)
raise_if_no_longer_open!

Expand Down Expand Up @@ -622,6 +641,7 @@ def basic_recover(requeue)
#
# @see Bunny::Channel#basic_nack
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_reject(delivery_tag, requeue)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Reject.encode(@id, delivery_tag, requeue))
Expand Down Expand Up @@ -667,6 +687,7 @@ def basic_reject(delivery_tag, requeue)
# ch.basic_ack(delivery_info.delivery_tag, true)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_ack(delivery_tag, multiple)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Ack.encode(@id, delivery_tag, multiple))
Expand Down Expand Up @@ -726,6 +747,7 @@ def basic_ack(delivery_tag, multiple)
#
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def basic_nack(delivery_tag, requeue, multiple = false)
raise_if_no_longer_open!
@connection.send_frame(AMQ::Protocol::Basic::Nack.encode(@id,
Expand All @@ -748,6 +770,7 @@ def basic_nack(delivery_tag, requeue, multiple = false)
#
# @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, exclusive = false, arguments = nil, &block)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
Expand Down Expand Up @@ -788,6 +811,7 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
#
# @return [AMQ::Protocol::Basic::ConsumeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_consume_with(consumer)
raise_if_no_longer_open!
maybe_start_consumer_work_pool!
Expand Down Expand Up @@ -825,6 +849,7 @@ def basic_consume_with(consumer)
#
# @return [AMQ::Protocol::Basic::CancelOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

Expand Down Expand Up @@ -856,6 +881,7 @@ def basic_cancel(consumer_tag)
#
# @return [AMQ::Protocol::Queue::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_declare(name, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -884,6 +910,7 @@ def queue_declare(name, opts = {})
#
# @return [AMQ::Protocol::Queue::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_delete(name, opts = {})
raise_if_no_longer_open!

Expand All @@ -906,6 +933,7 @@ def queue_delete(name, opts = {})
#
# @return [AMQ::Protocol::Queue::PurgeOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def queue_purge(name, opts = {})
raise_if_no_longer_open!

Expand All @@ -931,6 +959,7 @@ def queue_purge(name, opts = {})
# @return [AMQ::Protocol::Queue::BindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @api public
def queue_bind(name, exchange, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -966,6 +995,7 @@ def queue_bind(name, exchange, opts = {})
# @return [AMQ::Protocol::Queue::UnbindOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @api public
def queue_unbind(name, exchange, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -1006,6 +1036,7 @@ def queue_unbind(name, exchange, opts = {})
#
# @return [AMQ::Protocol::Exchange::DeclareOk] RabbitMQ response
# @see http://rubybunny.info/articles/echanges.html Exchanges and Publishing guide
# @api public
def exchange_declare(name, type, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -1035,6 +1066,7 @@ def exchange_declare(name, type, opts = {})
#
# @return [AMQ::Protocol::Exchange::DeleteOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @api public
def exchange_delete(name, opts = {})
raise_if_no_longer_open!

Expand All @@ -1050,6 +1082,21 @@ def exchange_delete(name, opts = {})
@last_exchange_delete_ok
end

# Binds an exchange to another exchange using exchange.bind AMQP 0.9.1 extension
# that RabbitMQ provides.
#
# @param [String] source Source exchange name
# @param [String] destination Destination exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [AMQ::Protocol::Exchange::BindOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def exchange_bind(source, destination, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -1079,6 +1126,21 @@ def exchange_bind(source, destination, opts = {})
@last_exchange_bind_ok
end

# Unbinds an exchange from another exchange using exchange.unbind AMQP 0.9.1 extension
# that RabbitMQ provides.
#
# @param [String] source Source exchange name
# @param [String] destination Destination exchange name
# @param [Hash] opts Options
#
# @option opts [String] routing_key (nil) Routing key used for binding
# @option opts [Hash] arguments ({}) Optional arguments
#
# @return [AMQ::Protocol::Exchange::UnbindOk] RabbitMQ response
# @see http://rubybunny.info/articles/exchanges.html Exchanges and Publishing guide
# @see http://rubybunny.info/articles/bindings.html Bindings guide
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def exchange_unbind(source, destination, opts = {})
raise_if_no_longer_open!

Expand Down Expand Up @@ -1114,6 +1176,16 @@ def exchange_unbind(source, destination, opts = {})

# @group Flow control (channel.*)

# Enables or disables message flow for the channel. When message flow is disabled,
# no new messages will be delivered to consumers on this channel. This is typically
# used by consumers that cannot keep up with the influx of messages.
#
# @note Recent (e.g. 2.8.x., 3.x) RabbitMQ will employ TCP/IP-level back pressure on publishers if it detects
# that consumers do not keep up with them.
#
# @return [AMQ::Protocol::Channel::FlowOk] RabbitMQ response
# @see http://rubybunny.info/articles/queues.html Queues and Consumers guide
# @api public
def channel_flow(active)
raise_if_no_longer_open!

Expand All @@ -1133,6 +1205,8 @@ def channel_flow(active)
# @group Transactions (tx.*)

# Puts the channel into transaction mode (starts a transaction)
# @return [AMQ::Protocol::Tx::SelectOk] RabbitMQ response
# @api public
def tx_select
raise_if_no_longer_open!

Expand All @@ -1146,6 +1220,8 @@ def tx_select
end

# Commits current transaction
# @return [AMQ::Protocol::Tx::CommitOk] RabbitMQ response
# @api public
def tx_commit
raise_if_no_longer_open!

Expand All @@ -1159,6 +1235,8 @@ def tx_commit
end

# Rolls back current transaction
# @return [AMQ::Protocol::Tx::RollbackOk] RabbitMQ response
# @api public
def tx_rollback
raise_if_no_longer_open!

Expand All @@ -1178,11 +1256,18 @@ def tx_rollback
# @group Publisher Confirms (confirm.*)

# @return [Boolean] true if this channel has Publisher Confirms enabled, false otherwise
# @api public
def using_publisher_confirmations?
@next_publish_seq_no > 0
end

# Enables publisher confirms
# Enables publisher confirms for the channel.
# @return [AMQ::Protocol::Confirm::SelectOk] RabbitMQ response
# @see #wait_for_confirms
# @see #unconfirmed_set
# @see #nacked_set
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def confirm_select(callback=nil)
raise_if_no_longer_open!

Expand All @@ -1204,7 +1289,14 @@ def confirm_select(callback=nil)
end

# Blocks calling thread until confirms are received for all
# currently unacknowledged published messages
# currently unacknowledged published messages.
#
# @return [Boolean] true if all messages were acknowledged positively, false otherwise
# @see #confirm_select
# @see #unconfirmed_set
# @see #nacked_set
# @see http://rubybunny.info/articles/extensions.html RabbitMQ Extensions guide
# @api public
def wait_for_confirms

wait_on_confirms_continuations
Expand Down Expand Up @@ -1238,6 +1330,8 @@ def generate_consumer_tag(name = "bunny")
# Recovery
#

# @group Network Failure Recovery

# Recovers basic.qos setting, exchanges, queues and consumers. Used by the Automatic Network Failure
# Recovery feature.
#
Expand Down Expand Up @@ -1296,6 +1390,8 @@ def recover_consumers
end
end

# @endgroup


#
# Implementation
Expand Down

0 comments on commit 7cd8c7f

Please sign in to comment.