Permalink
Switch branches/tags
Nothing to show
Find file
Fetching contributors…
Cannot retrieve contributors at this time
106 lines (75 sloc) 3.26 KB
###
http://stackoverflow.com/questions/8261654/messaging-confusion-pub-sub-vs-multicast-vs-fan-out
SIMPLE EXPLANATION
a message is routed by an exchange to every queue that is bound with that routing key.
- multiple queues all get the message if they are bound with the same key
- multiple programs share (load-balance) the messages if they subscribe to the same queue
EXCHANGES
are the first step in the routing process. An exchange decides which queues to put the message in
EXCHANGE TYPE
fanout: ignores binding ids and routing keys. Everyone bound to the exchange gets the message
topic: can do everything, but a little slower
- unique queue name: everyone bound to it will receive it
- common queue name: messages are load-balanced between the programs subscribed to it
- routing keys: will match based on a pattern
direct: like topic, but doesn't do any * or #
QUEUES
exist outside the scope of the program. If you declare and bind a queue in your program, it will remain bound after the program exits
if you rebind a queue of the same name, it will receive messages for BOTH patterns
QUEUE NAME
if unique (empty string), it acts like a fanout, because the queue name is unique, and multiple unique queues are bound to the same exchange/id pattern.
###
amqp = require 'amqp'
events = require 'events'
# would you ever have a publisher and a subscriber in the same program?
# sure... why not?
# EXCHANGE TYPE
# Create a pub/sub + job-queue hub backed by one AMQP 'direct' exchange.
#
# optionsOrUrl - either a string (a full "amqp://..." / "amqps://..." URL, or
#                a bare host that gets "amqp://" prepended) or an options
#                object passed unchanged to amqp.createConnection.
# channel      - name of the exchange to open.
# cb           - optional callback, invoked as cb(null, hub) once the exchange
#                has been opened.
#
# Returns the hub object ({emit, on, job, worker}). Its methods may be called
# before the connection is ready; they wait for the exchange internally.
# Throws (from the connection's 'error' handler) if the connection reports an
# error.
module.exports = hub = (optionsOrUrl, channel, cb) ->
  options = if optionsOrUrl.length?
    url = optionsOrUrl
    # Only a leading scheme counts: a bare host such as "amqp.example.com"
    # contains "amqp" but still needs the prefix.
    url = if /^amqps?:\/\//.test url then url else "amqp://" + url
    {url}
  else optionsOrUrl

  cb ?= ->

  # Human-readable description of the server, used only in the error message.
  target = options?.url ? options?.host ? 'unknown host'

  exchange = null
  connection = amqp.createConnection options

  connection.on 'error', (err) ->
    if err?
      throw new Error "Could not connect to RabbitMQ server at #{target}: #{err.message ? err}"

  connection.on 'ready', ->
    connection.exchange channel, {type: 'direct'}, (exc) ->
      exchange = exc
      # Wake up every caller that asked for the exchange before it was open.
      connection.emit 'exchangeReady', exchange
      cb null, newhub

  # Hand the exchange to `done` now if it is already open, otherwise as soon
  # as it opens. `once` ensures a queued callback runs a single time even if
  # 'exchangeReady' is emitted again later.
  getExchange = (done) ->
    if exchange? then return done exchange
    connection.once 'exchangeReady', done

  newhub =
    ## PUB/SUB events

    # Publish `data` to the exchange with routing key `id`.
    emit: (id, data) ->
      getExchange (exc) ->
        exc.publish id, data

    # Register one handler for `id`. Each call declares its own queue with an
    # empty name (see the QUEUE NAME note in the file header) and binds it to
    # `id`, so every handler receives its own copy of each message.
    on: (id, handler) ->
      getExchange (exc) ->
        connection.queue '', (q) ->
          q.bind exc, id
          q.subscribe (data) ->
            handler data

    ## JOBS

    # Publish a job with deliveryMode 2 (persistent in AMQP).
    job: (id, data) ->
      getExchange (exc) ->
        exc.publish id, data, {deliveryMode: 2}

    # Handle jobs for `id`. All workers use the same durable queue named `id`.
    # `handler` is called as handler(data, done); it must call done() when
    # finished, which calls q.shift() on the queue.
    # NOTE(review): with {ack: true, prefetchCount: 1}, q.shift() is presumably
    # what acknowledges the message and releases the next one. Confirm against
    # the amqp library docs.
    worker: (id, handler) ->
      getExchange (exc) ->
        # Named distinctly so the hub's connection `options` is not reassigned.
        queueOptions = {autoDelete: false, durable: true}
        # creating a queue takes a bit longer if it isn't already created
        connection.queue id, queueOptions, (q) ->
          q.bind exc, id
          q.subscribe {ack: true, prefetchCount: 1}, (data) ->
            handler data, -> q.shift()