-
Notifications
You must be signed in to change notification settings - Fork 0
Rabbit MQ
INTRODUCTION
Messaging: Integrating multiple applications so that they can work together and exchange information
Characteristics:
- Asynchronous
- Can be reliable
- Can be durable
- Routing
- Many message Formats
- Recipient pulls message from Queue
History
- 1986 - The Information Bus (TIB) developed
- 1993 - IBM MQSeries released
- 1997 - Microsoft MSMQ released
- 2001 - Java Messaging Service released
AMQP: Advanced Message Queue Prorocol
- Developed by JP Morgan Chase and iMatrix
- Open standard protocol for messaging
- Wire level protocol like HTTP
- Vendor agnostic protocol for messaging systems
http://www.amqp.org
AMQP History:
- 2003 - AMQP starts development at JP Morgan
- 2005 - AMQP working group formed
- 2006 - June AMQP 0-8 released. December AMQP 0-9 released.
- 2006 - Rabbit technologies formed. RabbitMQ first release
- 2011 - AMQP 1.0 released by AMQP working group
- 2012 - October, AMQP 1.0 approved as an OASIS standard
Key Concepts:
- Message Broker
- Exchanges - Direct, Fan-Out, Topic, Headers
- Queues
- Bindings
AMQP message brokers: noting protocol version is important
- RabbitMQ
- Windoes Azure Service Bus
- Apache Qpid
- Apache ActiveMQ
- SwiftMQ
- StormMQ
- WSO2 message broker
- OpenAMQ
RabbitMQ:
- Open source. Built on Erlang language
- Message Broker: AMQP 0.8 & 0.9
- Client Libraries & Community Libraries: .net/WCF, Java, Python, Ruby, Spring
- Protocol Gateways: XMPP, Stomp, Smtp, Http
Features:
- High Availability - through Mirroring
- Clustering - For Scale out
- Management - Plugin, WebUI, Command line tool
- Security - Virtual Hosts, Authentication & Permissions
Key API Classes
- IConnection - Connection to RabbitMQ using AMQP
- IModel - Channel to AMQP Server
- ConnectionFactory - Construct IConnection object
- QueueingBasicConsumer - To receive messages from Queue
- Protocols - Version of AMQP
Message is sent to an Exchange. And inturn those messages are redirected to the Queues which are bound to the Exchange.
- Sender sends message to Exchange
- Queues are configured and bound to the Exchange
- Receiver Listens message from Queues
Exchange & Queue Creation:
var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
var connection = connectionFactory.CreateConnection();
var _model = connection.CreateModel();
_model.ExchangeDeclare("MyExchange", ExchangeType.Direct);
_model.QueueDeclare("MyQueue", true, false, false, parameters);
_model.QueueBind("MyQueue", "MyExchange", "", null);
Sending a message:
var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
var connection = connectionFactory.CreateConnection();
var _model = connection.CreateModel();
var properties = _model.CreateBasicProperties();
properties.Persistent = true;
var messageBuffer = Encoding.Default.GetBytes("Hello World!");
_model.BasicPublish("MyExchange", "", properties, messageBuffer);
Consuming a message:
var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
var connection = connectionFactory.CreateConnection();
var _model = connection.CreateModel();
_model.BasicQos(0,1,false);
var consumer = new QueueingBasicConsumer(_model);
_model.BasicConsume(_queueName, false, consumer);
while (Enabled)
{
var deliveryArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var message = Encoding.Default.GetString(deliveryArgs.Body);
_model.BasicAck(deliveryArgs.DeliveryTag, false);
}
.
EXCHANGE PATTERNS
-
One Way messaging - Sender sends message, Receiver gets the message
-
Worker Queues - Sender sends message, multiple threads pull message from Queue - Load Balancer
-
Publish Subscribe - Sender sends message, multiple receiever will get messages
-
[RPC] Remote Procedure Call - Sender sends message and waits for the response from Receiver
ADVANCED EXCHANGE PATTERNS
Routing:
- Message is sent to an Exchange along with a Routing Key
- A copy of message sent to Queues which exactly match the routing key
- Each Queue will have a receiver processing messages
Topics
- Message sent to an exchange along with a routing key
- A copy of message sent to queues which match expressions against the routing key
- Each Queue will be processed by receivers
- '*' can replace one word
- '#' can replace zero or more words
Headers
- Message sent to an exchange alongincluding some headers
- A copy of the message is sent to queues which match the headers
- Each queue is then processed by receivers
- Match Type indicates if all or any header must match
Scatter Gather
-
Sender will start polling a response Queue
-
Sender will send a request message to an exchange
-
Message will be copied to queues with a matching binding
-
Receivers will be process messages and send responses to the response queue
-
Sender will get its collection of responses
MESSAGES AND DATA
Serialization
- Object => byte Array & vice-versa
Message Format
- Type of object that has been serialized: Json, XML, Binary etc..
- set the content type in the parameter properties.ContentType = contentType;
Message Identification
- Indicates the Message type, .net Fully Qualified name
- set the type in the parameter
properties.Type = messageType;
Interoperable
-
Should be technology agnostic
-
Objects should be created based on XSD ?
LARGE MESSAGES -
To send large messages from one application to another application
-
General Condition => Small Messages
-
AMQP supports very large messages
-
RabbitMQ also supports use of large messages
- Consideration: Performance Impact
- Memory utilization
- I/O
- Consideration: Performance Impact
Buffered Demo: Small/Medium files
- Send the message in a single call
Chunked Demo: Large files
-
Send the message in chunks of data through several call
-
Ensure that the link between the chunks are established - Can be done using Header properties
ERRORS
Basic Errors & Retry
- Error Scenarios - Exception processing a message - Poison message - A message we don't understand
- Error Handling - Reject and Requeue (Retry) _model.BasicReject(deliveryArgs.DeliveryTag, false); - Reject and Discard (Delete) _model.BasicReject(deliveryArgs.DeliveryTag, true);
Advanced Errors & Retry
- We can't retry forever
- We should retry (requeue) 'n' times and then reject (discard)
Dead Letter Queue:
- We reject messages but we don't just want to throw them away
- Store errors for review by Ops
- Advanced Error handling?
- Repair & Resubmit
In case of failures, route the messages to a Dead-Letter-Queue (Error) Queue. To track the error messages. Set the Dead-Letter-Exchange for the Actual Exchange by setting the parameters below:
var parameters = new Dictionary<string, object> {{"x-dead-letter-exchange", _deadLetterExchangeName}};
_model.QueueDeclare(queueName, _isDurable, false, false, parameters);
Routing Failures:
- When processing messages sometimes a message will not be subscribed to
- Routing Failure - Could be error - Could be a malicious message - Could be fixable
Route the messages which are not mapped to any Queues to a different Exchange. To track the unadressed messages. Alternate Exchange - Set the alternate Exchange property.
var parameters = new Dictionary<string, object> { { "alternate-exchange", _routingFailureExchangeName } };
_model.ExchangeDeclare(_exchangeName, _exchangeType, false, false, parameters);
ADVANCED TOPICS
Scheduled Delivery:
-
How can I publish a message but delay the processing of the message?.
-
Have two exchange, one for recieving the message (Holding Queue), after certain time route it to another exchange which will send to the Receiver Queue.
-
This can be done by having Dead-Letter-Exchange for the Holding Queue. And when a message is sent to the exchange, set the Expiration time property. Based on which the message is routed to another exchange.
-
The idea is that there will not be any receiver for the Holding Queue and so the message will be redirected to the Dead-Letter-Exchange but that is after the Expiration timeout that has been set in the message.
var parameters = new Dictionary<string, object> { { "alternate-exchange", _routingFailureExchangeName } };
_model.ExchangeDeclare(_exchangeName, _exchangeType, false, false, parameters);
var properties = _model.CreateBasicProperties();
properties.Persistent = true;
properties.Expiration = "5000";
var messageBuffer = Encoding.Default.GetBytes("Hello World!");
_model.BasicPublish("MyExchange", "", properties, messageBuffer);
Virtual Hosts:
- Can I seperate the messages in RabbitMQ into different SandBoxes. Reason:
- Application Isolation
- Security: Data Isolation
- Same Queue for both Consumers: But the data is seperate for each Consumer
How:
- Create different users for each Consumer
- Create virtual hosts for each Users created above
- Each Virtual Host can enable/disable the consumer access (Different User Permission)
Include Virtual Host when creating the Connection Factory
var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName,
Password = _password, VirtualHost = _virtualHost };
And the user & Host combination will ensure that the messages are segregated for the same Queue.