Skip to content

Commit

Permalink
Add support for headers exchange type.
Browse files Browse the repository at this point in the history
Signed-off-by: Aman Gupta <aman@tmm1.net>
  • Loading branch information
careo authored and tmm1 committed Jan 17, 2009
1 parent adcd681 commit b28b215
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 1 deletion.
34 changes: 34 additions & 0 deletions examples/mq/multiformat_clock.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
$:.unshift File.dirname(__FILE__) + '/../../lib'
require 'mq'
require 'time'
EM.run{

def log *args
p args
end

#AMQP.logging = true

clock = MQ.new.headers('multiformat_clock')
EM.add_periodic_timer(1){
puts

time = Time.new
["iso8601","rfc2822"].each do |format|
formatted_time = time.send(format)
log :publish, format, formatted_time
clock.publish "#{formatted_time}", :headers => {"format" => format}
end
}

["iso8601","rfc2822"].each do |format|
amq = MQ.new
amq.queue(format.to_s).bind(amq.headers('multiformat_clock'), :arguments => {"format" => format}).subscribe{ |time|
log "received #{format}", time
}
end


}

__END__
82 changes: 81 additions & 1 deletion lib/mq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,87 @@ def fanout name = 'amq.fanout', opts = {}
def topic name = 'amq.topic', opts = {}
exchanges[name] ||= Exchange.new(self, :topic, name, opts)
end


# Defines, intializes and returns an Exchange to act as an ingress
# point for all published messages.
#
# == Headers
# A headers exchange allows for messages to be published to an exchange
#
# Any published message, regardless of its persistence setting, is thrown
# away by the exchange when there are no queues bound to it.
#
# As part of the AMQP standard, each server _should_ predeclare a headers
# exchange called 'amq.match' (this is not required by the standard).
# Allocating this exchange without a name _or_ with the empty string
# will use the internal 'amq.match' exchange.
#
# TODO: The classic example is ...
#
# When publishing data to the exchange, bound queues subscribing to the
# exchange indicate which data interests them by passing arguments
# for matching against the headers in published messages. The
# form of the matching can be controlled by the 'x-match' argument, which
# may be 'any' or 'all'. If unspecified (in RabbitMQ at least), it defaults
# to "all".
#
# A value of 'all' for 'x-match' implies that all values must match (i.e.
# it does an AND of the headers ), while a value of 'any' implies that
# at least one should match (ie. it does an OR).
#
# TODO: document behavior when either the binding or the message is missing
# a header present in the other
#
# TODO: insert example
#
# == Options
# * :passive => true | false (default false)
# If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
#
# * :durable => true | false (default false)
# If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges remain active when a server restarts.
# Non-durable exchanges (transient exchanges) are purged if/when a
# server restarts.
#
# A transient exchange (the default) is stored in memory-only. The
# exchange and all bindings will be lost on a server restart.
# It makes no sense to publish a persistent message to a transient
# exchange.
#
# Durable exchanges and their bindings are recreated upon a server
# restart. Any published messages not routed to a bound queue are lost.
#
# * :auto_delete => true | false (default false)
# If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
# determining the exchange is unused to give time to the client code
# to bind a queue to it.
#
# If the exchange has been previously declared, this option is ignored
# on subsequent declarations.
#
# * :internal => true | false (default false)
# If set, the exchange may not be used directly by publishers, but
# only when bound to other exchanges. Internal exchanges are used to
# construct wiring that is not visible to applications.
#
# * :nowait => true | false (default true)
# If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
# == Exceptions
# Doing any of these activities are illegal and will raise MQ:Error.
# * redeclare an already-declared exchange to a different type
# * :passive => true and the exchange does not exist (NOT_FOUND)
# * using a value other than "any" or "all" for "x-match"
def headers name = 'amq.match', opts = {}
exchanges[name] ||= Exchange.new(self, :headers, name, opts)
end

# Queues store and forward messages. Queues can be configured in the server
# or created at runtime. Queues must be attached to at least one exchange
# in order to receive messages from publishers.
Expand Down

0 comments on commit b28b215

Please sign in to comment.