
Rabbit MQ

Kota edited this page Sep 21, 2018 · 3 revisions
                                            INTRODUCTION

Messaging: Integrating multiple applications so that they can work together and exchange information

Characteristics:

  • Asynchronous
  • Can be reliable
  • Can be durable
  • Routing
  • Many message formats
  • Recipient pulls message from Queue

History:

  • 1986 - The Information Bus (TIB) developed
  • 1993 - IBM MQSeries released
  • 1997 - Microsoft MSMQ released
  • 2001 - Java Message Service (JMS) released

AMQP: Advanced Message Queuing Protocol

  • Developed by JP Morgan Chase and iMatix
  • Open standard protocol for messaging
  • Wire level protocol like HTTP
  • Vendor agnostic protocol for messaging systems
  • Website: http://www.amqp.org

AMQP History:

  • 2003 - AMQP starts development at JP Morgan
  • 2005 - AMQP working group formed
  • 2006 - June: AMQP 0-8 released. December: AMQP 0-9 released
  • 2006 - Rabbit Technologies formed. RabbitMQ first release
  • 2011 - AMQP 1.0 released by AMQP working group
  • 2012 - October: AMQP 1.0 approved as an OASIS standard

Key Concepts:

  • Message Broker
  • Exchanges - Direct, Fanout, Topic, Headers
  • Queues
  • Bindings

AMQP message brokers (noting the protocol version each one supports is important):

  • RabbitMQ
  • Windows Azure Service Bus
  • Apache Qpid
  • Apache ActiveMQ
  • SwiftMQ
  • StormMQ
  • WSO2 Message Broker
  • OpenAMQ

RabbitMQ:

  • Open source, built on the Erlang language
  • Message Broker: AMQP 0-8 & 0-9
  • Client Libraries & Community Libraries: .NET/WCF, Java, Python, Ruby, Spring
  • Protocol Gateways: XMPP, STOMP, SMTP, HTTP

Features:

  • High Availability - through mirroring
  • Clustering - for scale out
  • Management - plugin, Web UI, command line tool
  • Security - virtual hosts, authentication & permissions

Key API Classes:

  • IConnection - Connection to RabbitMQ using AMQP
  • IModel - Channel to the AMQP server
  • ConnectionFactory - Constructs IConnection objects
  • QueueingBasicConsumer - Receives messages from a Queue
  • Protocols - Version of AMQP

A message is sent to an Exchange, which in turn redirects it to the Queues that are bound to that Exchange.

  • Sender sends message to Exchange
  • Queues are configured and bound to the Exchange
  • Receiver listens for messages from Queues

Exchange & Queue Creation:

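  // Connect to the broker and open a channel (IModel)
  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
  // Declare a direct exchange and a durable, non-exclusive, non-auto-delete queue without extra arguments
  _model.ExchangeDeclare("MyExchange", ExchangeType.Direct);
  _model.QueueDeclare("MyQueue", true, false, false, null);
  // Bind the queue to the exchange with an empty routing key
  _model.QueueBind("MyQueue", "MyExchange", "", null);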

Sending a message:

  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
  var properties = _model.CreateBasicProperties();
  properties.Persistent = true; // ask the broker to store the message on disk
  var messageBuffer = Encoding.Default.GetBytes("Hello World!");
  // Publish to "MyExchange" with an empty routing key
  _model.BasicPublish("MyExchange", "", properties, messageBuffer);

Consuming a message:

  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
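  _model.BasicQos(0, 1, false); // prefetch at most one unacknowledged message per consumer
  var consumer = new QueueingBasicConsumer(_model); // deprecated in later client versions in favor of EventingBasicConsumer
  _model.BasicConsume(_queueName, false, consumer); // false = messages must be acknowledged manually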
  while (Enabled)
  {
      var deliveryArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
      var message = Encoding.Default.GetString(deliveryArgs.Body);
      _model.BasicAck(deliveryArgs.DeliveryTag, false);
  }

                                            EXCHANGE PATTERNS

One Way messaging - Sender sends message, Receiver gets the message

Worker Queues - Sender sends message, multiple workers (threads) pull messages from the same Queue, which acts as a load balancer

Publish Subscribe - Sender sends message, multiple receivers each get the message

[RPC] Remote Procedure Call - Sender sends message and waits for the response from Receiver

                                            ADVANCED EXCHANGE PATTERNS

Routing:

  • Message is sent to an Exchange along with a Routing Key
  • A copy of the message is sent to every Queue whose binding key exactly matches the routing key
  • Each Queue will have a receiver processing messages
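
The exact-match rule above can be sketched without a broker. The class below is a hypothetical in-memory stand-in for a direct exchange (the name DirectExchangeModel and its methods are assumptions made for illustration, not part of RabbitMQ.Client). It only models which queues would receive a copy of a message.

```csharp
using System.Collections.Generic;

// In-memory stand-in for a direct exchange; it models routing only, not delivery.
public class DirectExchangeModel
{
    // binding key => names of the queues bound with that key
    private readonly Dictionary<string, List<string>> _bindings =
        new Dictionary<string, List<string>>();

    // Plays the role of QueueBind(queue, exchange, bindingKey).
    public void Bind(string queue, string bindingKey)
    {
        if (!_bindings.TryGetValue(bindingKey, out var queues))
            _bindings[bindingKey] = queues = new List<string>();
        if (!queues.Contains(queue)) queues.Add(queue);
    }

    // Returns the queues that would receive a copy of a message
    // published with the given routing key (exact match only).
    public IReadOnlyList<string> Route(string routingKey) =>
        _bindings.TryGetValue(routingKey, out var queues)
            ? queues
            : new List<string>();
}
```

If "errors" and "audit" are both bound with the key "error", Route("error") returns both queues, so each gets its own copy. A routing key with no binding returns an empty list, which corresponds to the routing failure case covered later on this page.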

Topics

  • Message sent to an exchange along with a routing key
  • A copy of the message is sent to queues whose binding expressions match the routing key (words in the key are separated by dots)
    • * can replace exactly one word
    • # can replace zero or more words
  • Each Queue will be processed by receivers
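
The wildcard rules can be expressed as a small matching function. This is a minimal sketch of the matching semantics only, not the broker's actual implementation. TopicMatcher is a hypothetical helper name.

```csharp
public static class TopicMatcher
{
    // Returns true when a binding pattern matches a routing key.
    // Words are separated by dots; '*' stands for exactly one word,
    // '#' stands for zero or more words.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern used up: it is a match only if the key is used up too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' either absorbs nothing, or absorbs one word and stays active.
            return Match(p, pi + 1, k, ki)
                || (ki < k.Length && Match(p, pi, k, ki + 1));
        }

        // A literal word or '*' needs a word in the key to consume.
        if (ki == k.Length) return false;
        return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
    }
}
```

With this sketch, "order.*" matches "order.created" but not "order.created.eu", while "order.#" matches "order", "order.created" and "order.created.eu".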

Headers

  • Message sent to an exchange including some headers
  • A copy of the message is sent to queues which match the headers
  • Each queue is then processed by receivers
  • Match Type indicates if all or any header must match
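
The all/any rule can be sketched as a plain function. This is a simplified model that assumes header values can be compared with Equals, and HeadersMatcher is a hypothetical helper name. In RabbitMQ the match type is passed as the "x-match" binding argument, with "all" as the default.

```csharp
using System;
using System.Collections.Generic;

public static class HeadersMatcher
{
    // bindingArgs: arguments supplied when the queue was bound, including "x-match".
    // messageHeaders: headers carried by the published message.
    public static bool Matches(IDictionary<string, object> bindingArgs,
                               IDictionary<string, object> messageHeaders)
    {
        // "all" is assumed when x-match is absent.
        bool matchAny = bindingArgs.TryGetValue("x-match", out var mode)
                        && object.Equals(mode, "any");

        int compared = 0, matched = 0;
        foreach (var pair in bindingArgs)
        {
            // Arguments starting with "x-" are settings, not headers to compare.
            if (pair.Key.StartsWith("x-", StringComparison.Ordinal)) continue;
            compared++;
            if (messageHeaders.TryGetValue(pair.Key, out var value)
                && object.Equals(value, pair.Value))
                matched++;
        }
        return matchAny ? matched > 0 : matched == compared;
    }
}
```

A binding with format=pdf and type=report matches a message with format=pdf and type=log only when x-match is "any". Extra headers on the message do not prevent a match.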

Scatter Gather

  • Sender will start polling a response Queue

  • Sender will send a request message to an exchange

  • Message will be copied to queues with a matching binding

  • Receivers will process messages and send responses to the response queue

  • Sender will get its collection of responses

                                              MESSAGES AND DATA
    

Serialization

  • Object => byte Array & vice-versa

Message Format

  • Format the object has been serialized to: JSON, XML, binary, etc.
  • Set the content type on the message properties: properties.ContentType = contentType;

Message Identification

  • Indicates the message type, for example the .NET fully qualified type name
  • Set the type on the message properties: properties.Type = messageType;
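
A minimal sketch of how serialization, content type and message type could fit together, assuming JSON via System.Text.Json (the notes do not say which serializer was used). OrderCreated and MessageSerializer are hypothetical names introduced for illustration.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message type used only for this illustration.
public class OrderCreated
{
    public int OrderId { get; set; }
    public string Customer { get; set; }
}

public static class MessageSerializer
{
    // Value a sender would assign to properties.ContentType.
    public const string ContentType = "application/json";

    // Object => byte array, ready to be passed as the message body.
    public static byte[] ToBytes<T>(T message) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));

    // Byte array => object, using the type name carried in properties.Type.
    public static object FromBytes(byte[] body, string messageType)
    {
        var type = Type.GetType(messageType, throwOnError: true);
        return JsonSerializer.Deserialize(Encoding.UTF8.GetString(body), type);
    }
}
```

A sender would set properties.ContentType = MessageSerializer.ContentType and properties.Type = typeof(OrderCreated).AssemblyQualifiedName. The receiver passes the body and properties.Type to FromBytes. The assembly-qualified name is used here because Type.GetType resolves a plain full name only for types in the calling assembly or the core library. Tying messages to .NET type names also works against the interoperability goal listed next, so a technology-neutral type label may be preferable across platforms.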

Interoperable

  • Should be technology agnostic

  • Objects should be created based on XSD ?

                                              LARGE MESSAGES
    

To send large messages from one application to another:

  • General condition => small messages
  • AMQP supports very large messages
  • RabbitMQ also supports use of large messages
  • Consideration: performance impact
    • Memory utilization
    • I/O

Buffered Demo: Small/Medium files

  • Send the message in a single call

Chunked Demo: Large files

  • Send the message in chunks of data through several calls

  • Ensure that the link between the chunks is established. This can be done using header properties
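
The chunked approach can be sketched as a split and a join step. The fields below (TransferId, Sequence, IsLast) are assumptions about what could travel in the message headers to link the chunks. They are not a RabbitMQ convention, and Chunk and Chunker are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One piece of a large payload plus the values that would travel as message headers.
public class Chunk
{
    public string TransferId;   // identical for all chunks of one payload
    public int Sequence;        // 0-based position of this chunk
    public bool IsLast;         // true on the final chunk
    public byte[] Data;
}

public static class Chunker
{
    // Lazily splits a payload into chunks of at most chunkSize bytes.
    public static IEnumerable<Chunk> Split(byte[] payload, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        var transferId = Guid.NewGuid().ToString();
        // An empty payload still yields one (empty) chunk so the receiver sees an end marker.
        int count = Math.Max(1, (payload.Length + chunkSize - 1) / chunkSize);
        for (int i = 0; i < count; i++)
        {
            int offset = i * chunkSize;
            int length = Math.Min(chunkSize, payload.Length - offset);
            var data = new byte[length];
            Array.Copy(payload, offset, data, 0, length);
            yield return new Chunk
            {
                TransferId = transferId, Sequence = i, IsLast = i == count - 1, Data = data
            };
        }
    }

    // Reassembles the chunks of a single transfer, whatever order they arrived in.
    public static byte[] Join(IEnumerable<Chunk> chunks) =>
        chunks.OrderBy(c => c.Sequence).SelectMany(c => c.Data).ToArray();
}
```

Each chunk would be published as its own message, with the three link values copied into properties.Headers. The receiver collects chunks with the same TransferId until it has seen the one marked IsLast and every sequence number before it, then calls Join.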

                                              ERRORS
    

Errors & Retries:

Basic Errors & Retry

  • Error Scenarios
    • Exception while processing a message
    • Poison message
    • A message we don't understand
  • Error Handling (the second argument of BasicReject is the requeue flag)
    • Reject and Requeue (Retry): _model.BasicReject(deliveryArgs.DeliveryTag, true);
    • Reject and Discard (Delete): _model.BasicReject(deliveryArgs.DeliveryTag, false);

Advanced Errors & Retry

  • We can't retry forever
  • We should retry (requeue) 'n' times and then reject (discard)
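
One way to count retries is to carry a counter in the message headers. The sketch below assumes a hypothetical "retry-count" header that the consumer maintains itself. RabbitMQ does not set or update such a header, and RetryPolicy is an illustrative name.

```csharp
using System;
using System.Collections.Generic;

public enum FailureAction { Retry, Discard }

public static class RetryPolicy
{
    // Hypothetical header name maintained by the application, not by the broker.
    public const string RetryHeader = "retry-count";

    // Number of retries already made; 0 when the header is missing.
    public static int GetRetryCount(IDictionary<string, object> headers)
    {
        if (headers != null && headers.TryGetValue(RetryHeader, out var value))
            return Convert.ToInt32(value);
        return 0;
    }

    // Decides what to do with a message whose processing has just failed.
    public static FailureAction Decide(IDictionary<string, object> headers, int maxRetries)
    {
        return GetRetryCount(headers) < maxRetries
            ? FailureAction.Retry
            : FailureAction.Discard;
    }

    // Headers for the republished copy, with the counter incremented.
    public static IDictionary<string, object> NextAttempt(IDictionary<string, object> headers)
    {
        var next = headers == null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(headers);
        next[RetryHeader] = GetRetryCount(headers) + 1;
        return next;
    }
}
```

A requeued message comes back with its headers unchanged, so in this sketch a retry means publishing a new copy with the headers from NextAttempt and acknowledging the original. On Discard the consumer calls BasicReject with requeue set to false.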

Dead Letter Queue:

  • We reject messages but we don't just want to throw them away
  • Store errors for review by Ops
  • Advanced Error handling?
  • Repair & Resubmit

In case of failures, route the messages to a Dead-Letter (error) Queue so that failed messages can be tracked. Set the Dead-Letter-Exchange on the actual Queue by passing the argument below when declaring it:

  var parameters = new Dictionary<string, object> {{"x-dead-letter-exchange", _deadLetterExchangeName}};
  _model.QueueDeclare(queueName, _isDurable, false, false, parameters);

Routing Failures:

  • Sometimes a published message matches no binding, so no Queue is subscribed to it
  • Routing Failure
    • Could be an error
    • Could be a malicious message
    • Could be fixable

Route messages that are not mapped to any Queue to a different Exchange so that unaddressed messages can be tracked. To do this, set the alternate-exchange argument when declaring the Exchange:

  var parameters = new Dictionary<string, object> { { "alternate-exchange", _routingFailureExchangeName } };
  _model.ExchangeDeclare(_exchangeName, _exchangeType, false, false, parameters);


                                            ADVANCED TOPICS

Scheduled Delivery: How can I publish a message but delay its processing? Have two exchanges: one that receives the message and routes it to a Holding Queue, and another that, after a certain time, delivers it to the Receiver Queue.

This can be done by setting a Dead-Letter-Exchange on the Holding Queue and setting the Expiration property (in milliseconds) on each message sent to the first exchange. When the message expires, it is routed to the Dead-Letter-Exchange.

The idea is that the Holding Queue has no receiver, so a message waits there until the Expiration timeout set on the message has elapsed and is then redirected to the Dead-Letter-Exchange.

  // Holding Queue: expired messages are forwarded to the Dead-Letter-Exchange
  var parameters = new Dictionary<string, object> { { "x-dead-letter-exchange", _deadLetterExchangeName } };
  _model.QueueDeclare(queueName, _isDurable, false, false, parameters);
  var properties = _model.CreateBasicProperties();
  properties.Persistent = true;
  properties.Expiration = "5000"; // milliseconds, passed as a string
  var messageBuffer = Encoding.Default.GetBytes("Hello World!");
  _model.BasicPublish("MyExchange", "", properties, messageBuffer);

Virtual Hosts: Can I separate the messages in RabbitMQ into different sandboxes? Reasons:

  • Application Isolation
  • Security: Data Isolation
  • Same Queue name for both Consumers, but the data is separate for each Consumer

How:

  • Create different users for each Consumer
  • Create a virtual host for each of the users created above
  • Each Virtual Host can enable/disable consumer access (different user permissions)

Include the Virtual Host when creating the Connection Factory:

  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, 
                                                  Password = _password, VirtualHost = _virtualHost };

The user and Virtual Host combination ensures that messages are segregated even when the Queue name is the same.
