RabbitMQ

Sandesh Kota edited this page Mar 21, 2019 · 3 revisions
                                            INTRODUCTION

Messaging: Integrating multiple applications so that they can work together and exchange information

Characteristics:

  • Asynchronous
  • Can be reliable
  • Can be durable
  • Routing
  • Many message Formats
  • Recipient pulls message from Queue

History

  • 1986 - The Information Bus (TIB) developed
  • 1993 - IBM MQSeries released
  • 1997 - Microsoft MSMQ released
  • 2001 - Java Messaging Service released

AMQP: Advanced Message Queuing Protocol

  • Developed by JP Morgan Chase and iMatix
  • Open standard protocol for messaging
  • Wire level protocol like HTTP
  • Vendor agnostic protocol for messaging systems
    http://www.amqp.org

AMQP History:

  • 2003 - AMQP starts development at JP Morgan
  • 2005 - AMQP working group formed
  • 2006 - June AMQP 0-8 released. December AMQP 0-9 released.
  • 2007 - Rabbit Technologies formed. RabbitMQ first release
  • 2011 - AMQP 1.0 released by AMQP working group
  • 2012 - October, AMQP 1.0 approved as an OASIS standard

Key Concepts:

  • Message Broker
  • Exchanges - Direct, Fan-Out, Topic, Headers
  • Queues
  • Bindings

AMQP message brokers (check which protocol version each one supports):

  • RabbitMQ
  • Windows Azure Service Bus
  • Apache Qpid
  • Apache ActiveMQ
  • SwiftMQ
  • StormMQ
  • WSO2 message broker
  • OpenAMQ

RabbitMQ:

  • Open source, written in Erlang
  • Message Broker: AMQP 0-8 & 0-9
  • Client Libraries & Community Libraries: .NET/WCF, Java, Python, Ruby, Spring
  • Protocol Gateways: XMPP, STOMP, SMTP, HTTP

Features:

  • High Availability - through Mirroring
  • Clustering - For Scale out
  • Management - Plugin, WebUI, Command line tool
  • Security - Virtual Hosts, Authentication & Permissions

Key API Classes

  • IConnection - Connection to RabbitMQ using AMQP
  • IModel - Channel to AMQP Server
  • ConnectionFactory - Construct IConnection object
  • QueueingBasicConsumer - To receive messages from Queue
  • Protocols - Version of AMQP

A message is sent to an Exchange, which in turn routes it to the Queues bound to that Exchange.

  • Sender sends message to Exchange
  • Queues are configured and bound to the Exchange
  • Receiver listens for messages on the Queues

Exchange & Queue Creation:

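  // Requires: using RabbitMQ.Client;
  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
  // Declare a direct exchange.
  _model.ExchangeDeclare("MyExchange", ExchangeType.Direct);
  // Arguments: queue name, durable, exclusive, autoDelete, optional arguments (none here).
  _model.QueueDeclare("MyQueue", true, false, false, null);
  // Bind with an empty routing key, matching the empty key used when publishing.
  _model.QueueBind("MyQueue", "MyExchange", "", null);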

Sending a message:

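  // Requires: using System.Text; using RabbitMQ.Client;
  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
  var properties = _model.CreateBasicProperties();
  // Mark the message as persistent so the broker writes it to disk.
  properties.Persistent = true;
  // UTF-8 gives the same bytes on every machine; Encoding.Default depends on the platform.
  var messageBuffer = Encoding.UTF8.GetBytes("Hello World!");
  // Arguments: exchange, routing key, properties, body.
  _model.BasicPublish("MyExchange", "", properties, messageBuffer);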

Consuming a message:

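  // Requires: using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
  var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, Password = _password };
  var connection = connectionFactory.CreateConnection();
  var _model = connection.CreateModel();
  // Prefetch: deliver at most one unacknowledged message to this consumer at a time.
  _model.BasicQos(0,1,false);
  // Note: QueueingBasicConsumer is deprecated in later client versions in favour of EventingBasicConsumer.
  var consumer = new QueueingBasicConsumer(_model);
  // The second argument (false) turns off automatic acknowledgement, so we ack manually below.
  _model.BasicConsume(_queueName, false, consumer);
  while (Enabled)
  {
      // Blocks until a message arrives.
      var deliveryArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
      var message = Encoding.UTF8.GetString(deliveryArgs.Body);
      // Acknowledge only this delivery (multiple = false).
      _model.BasicAck(deliveryArgs.DeliveryTag, false);
  }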

EXCHANGE PATTERNS

  • One Way messaging - Sender sends message, Receiver gets the message

  • Worker Queues - Sender sends message, multiple threads (workers) pull messages from the same Queue, which balances the load

  • Publish Subscribe - Sender sends message, multiple receivers each get a copy of the message

  • [RPC] Remote Procedure Call - Sender sends message and waits for the response from Receiver

                                              ADVANCED EXCHANGE PATTERNS
    

Routing:

  • Message is sent to an Exchange along with a Routing Key
  • A copy of the message is sent to every Queue whose binding key exactly matches the Routing Key
  • Each Queue will have a receiver processing messages
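
The exact-match rule above can be illustrated without a broker. The sketch below is an in-memory model only; DirectExchange, Bind and Route are hypothetical names made up for this page and are not part of the RabbitMQ client library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory model of direct routing (illustration only, not the RabbitMQ client API).
public sealed class DirectExchange
{
    // Binding key -> names of the queues bound with that key.
    private readonly Dictionary<string, HashSet<string>> _bindings =
        new Dictionary<string, HashSet<string>>();

    // Record that a queue is bound to this exchange with the given key.
    public void Bind(string queue, string bindingKey)
    {
        if (!_bindings.TryGetValue(bindingKey, out var queues))
        {
            queues = new HashSet<string>();
            _bindings[bindingKey] = queues;
        }
        queues.Add(queue);
    }

    // Return every queue that would receive a copy of a message
    // published with this routing key (exact match only).
    public List<string> Route(string routingKey)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues))
            return new List<string>();
        return queues.OrderBy(q => q, StringComparer.Ordinal).ToList();
    }
}
```

With two queues bound under the key "error" and one under "info", Route("error") returns both error queues, and Route("warning") returns an empty list because nothing is bound with that key.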

Topics

  • Message sent to an exchange along with a routing key
  • A copy of the message is sent to every queue whose binding pattern matches the routing key
  • Each Queue will be processed by receivers
  • Routing keys and patterns are words separated by dots
    • * (star) can replace exactly one word
    • # (hash) can replace zero or more words
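
The wildcard rules for topics can be sketched in plain C# without a broker. TopicMatcher is a hypothetical helper written for this page, not part of the RabbitMQ client, and the broker's own matching may differ in edge cases.

```csharp
using System;

// Illustration of topic pattern matching (not the broker's actual implementation).
public static class TopicMatcher
{
    // Returns true when the dot-separated routing key satisfies the binding pattern.
    public static bool IsMatch(string pattern, string routingKey)
    {
        var p = pattern.Length == 0 ? Array.Empty<string>() : pattern.Split('.');
        var k = routingKey.Length == 0 ? Array.Empty<string>() : routingKey.Split('.');
        return Match(p, 0, k, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words, so try every possible split.
            for (var next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == k.Length) return false;

        // '*' matches exactly one word; otherwise the words must be equal.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

For example, the pattern stock.* matches stock.nyse but not stock.nyse.ibm, while stock.# matches both of those and also the bare key stock.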

Headers

  • Message sent to an exchange along with some headers
  • A copy of the message is sent to queues whose binding headers match the message headers
  • Each queue is then processed by receivers
  • Match Type (x-match) indicates whether all or any of the headers must match
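
The all/any rule can be sketched as a small broker-free function. HeadersMatcher is a hypothetical helper for illustration. It assumes the match type is carried in the binding arguments under the key x-match, that "all" applies when it is absent, and that binding keys starting with x- are not compared.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration of headers-exchange matching (not the broker's actual implementation).
public static class HeadersMatcher
{
    public static bool IsMatch(
        IDictionary<string, object> bindingArguments,
        IDictionary<string, object> messageHeaders)
    {
        // Match type: "any" or "all" (assumed default: "all").
        var matchAny = bindingArguments.TryGetValue("x-match", out var mode)
                       && object.Equals(mode, "any");

        // Keys starting with "x-" are settings, not headers to compare.
        var required = bindingArguments
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        // A binding header is satisfied when the message carries the same key and value.
        Func<KeyValuePair<string, object>, bool> satisfied = kv =>
            messageHeaders.TryGetValue(kv.Key, out var value) && object.Equals(value, kv.Value);

        return matchAny ? required.Any(satisfied) : required.All(satisfied);
    }
}
```

A binding that asks for format=pdf and type=report with match type all rejects a message whose type is log, while the same binding with match type any accepts it because format still matches.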

Scatter Gather

  • Sender will start polling a response Queue

  • Sender will send a request message to an exchange

  • Message will be copied to queues with a matching binding

  • Receivers will process messages and send responses to the response queue

  • Sender will get its collection of responses
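
The gathering step can be sketched as a small collector on the sender side. ResponseGatherer is a hypothetical class for illustration. It assumes each request carries a correlation id that receivers copy into their responses, and that the sender knows how many responses to wait for; neither detail is stated in the steps above, and a real sender would usually also apply a timeout.

```csharp
using System.Collections.Generic;

// Collects the responses that belong to one scatter-gather request (illustration only).
public sealed class ResponseGatherer
{
    private readonly string _correlationId;
    private readonly int _expectedResponses;
    private readonly List<string> _responses = new List<string>();

    public ResponseGatherer(string correlationId, int expectedResponses)
    {
        _correlationId = correlationId;
        _expectedResponses = expectedResponses;
    }

    // True once the expected number of responses has been collected.
    public bool IsComplete
    {
        get { return _responses.Count >= _expectedResponses; }
    }

    public IReadOnlyList<string> Responses
    {
        get { return _responses; }
    }

    // Called for each message polled from the response queue.
    // Returns true if the response belonged to this request and was stored.
    public bool Offer(string correlationId, string body)
    {
        if (correlationId != _correlationId || IsComplete) return false;
        _responses.Add(body);
        return true;
    }
}
```

The sender would call Offer for every message it polls from the response queue and stop polling once IsComplete is true.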

                                              MESSAGES AND DATA
    

Serialization

  • Object => byte Array & vice-versa

Message Format

  • Format the object was serialized to: JSON, XML, binary, etc.
  • Set the content type on the message properties: properties.ContentType = contentType;

Message Identification

  • Indicates the message type, for example the .NET fully qualified type name
  • Set the type on the message properties: properties.Type = messageType;
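
Serialization, message format and message identification fit together roughly as sketched below. This is a broker-free illustration: OrderPlaced, Envelope and MessageSerializer are hypothetical names, JSON is picked as one possible format, and System.Text.Json is assumed to be available (it ships with .NET Core 3.0 and later). In real code the Envelope values would be copied to the message body, properties.ContentType and properties.Type.

```csharp
using System;
using System.Text.Json;

// Hypothetical message type used only for this example.
public sealed class OrderPlaced
{
    public int OrderId { get; set; }
    public string Customer { get; set; }
}

// Stand-in for the message body plus the two properties discussed above.
public sealed class Envelope
{
    public byte[] Body { get; set; }
    public string ContentType { get; set; }
    public string Type { get; set; }
}

public static class MessageSerializer
{
    // Object => byte array, recording the format and the .NET type name.
    public static Envelope ToEnvelope<T>(T message)
    {
        return new Envelope
        {
            Body = JsonSerializer.SerializeToUtf8Bytes(message),
            ContentType = "application/json",
            Type = typeof(T).FullName
        };
    }

    // Byte array => object, refusing formats this sketch does not understand.
    public static T FromEnvelope<T>(Envelope envelope)
    {
        if (envelope.ContentType != "application/json")
            throw new NotSupportedException("Unsupported content type: " + envelope.ContentType);
        return JsonSerializer.Deserialize<T>(envelope.Body);
    }
}
```

A receiver can inspect ContentType to pick a deserializer and Type to pick the target class before it touches the body.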

Interoperable

  • Should be technology agnostic

  • Objects should be created based on XSD?

                                              LARGE MESSAGES
    
  • To send large messages from one application to another application

  • Typical case: messages are small

  • AMQP supports very large messages

  • RabbitMQ also supports use of large messages

    • Consideration: Performance Impact
      • Memory utilization
      • I/O

Buffered Demo: Small/Medium files

  • Send the message in a single call

Chunked Demo: Large files

  • Send the message in chunks of data through several calls

  • Ensure that the chunks can be linked back together - this can be done using header properties
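
One way to link the chunks is to give each one headers that identify the transfer and its position. The sketch below is broker-free; Chunk and Chunker are hypothetical names, and the header keys transfer-id, sequence and is-last are made up for this example. In real code each Chunk would be published as its own message, with Headers copied to properties.Headers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One piece of a large payload plus the headers that link it to its siblings.
public sealed class Chunk
{
    public byte[] Body { get; set; }
    public Dictionary<string, object> Headers { get; set; }
}

public static class Chunker
{
    // Split the payload into pieces of at most chunkSize bytes.
    public static List<Chunk> Split(byte[] payload, int chunkSize, string transferId)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var total = (payload.Length + chunkSize - 1) / chunkSize; // ceiling division
        var chunks = new List<Chunk>(total);
        for (var i = 0; i < total; i++)
        {
            var offset = i * chunkSize;
            var length = Math.Min(chunkSize, payload.Length - offset);
            var body = new byte[length];
            Array.Copy(payload, offset, body, 0, length);

            chunks.Add(new Chunk
            {
                Body = body,
                Headers = new Dictionary<string, object>
                {
                    { "transfer-id", transferId },   // same for every chunk of one payload
                    { "sequence", i },               // position of this chunk
                    { "is-last", i == total - 1 }    // tells the receiver when to stop
                }
            });
        }
        return chunks;
    }

    // Reassemble the payload, ordering by sequence in case chunks arrived out of order.
    public static byte[] Join(IEnumerable<Chunk> chunks)
    {
        var ordered = chunks.OrderBy(c => (int)c.Headers["sequence"]).ToList();
        var result = new byte[ordered.Sum(c => c.Body.Length)];
        var offset = 0;
        foreach (var chunk in ordered)
        {
            Array.Copy(chunk.Body, 0, result, offset, chunk.Body.Length);
            offset += chunk.Body.Length;
        }
        return result;
    }
}
```

Splitting a 10-byte payload with a chunk size of 4 yields three chunks of 4, 4 and 2 bytes, and Join restores the original bytes even if the chunks are handed over in reverse order.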

                                              ERRORS
    

Basic Errors & Retry

  • Error Scenarios
    • Exception while processing a message
    • Poison message
    • A message we don't understand
  • Error Handling (the second argument of BasicReject is the requeue flag)
    • Reject and Requeue (Retry): _model.BasicReject(deliveryArgs.DeliveryTag, true);
    • Reject and Discard (Delete): _model.BasicReject(deliveryArgs.DeliveryTag, false);

Advanced Errors & Retry

  • We can't retry forever
  • We should retry (requeue) 'n' times and then reject (discard)
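
The "retry n times, then discard" rule needs a counter that travels with the message. The sketch below keeps that counter in a message header. RetryPolicy, FailureAction and the header name retry-count are hypothetical and chosen only for this example.

```csharp
using System.Collections.Generic;

public enum FailureAction { Retry, Discard }

// Decides what to do with a failed message based on a retry counter header (illustration only).
public static class RetryPolicy
{
    public const string RetryHeader = "retry-count"; // hypothetical header name

    // Number of retries already attempted; 0 when the header is missing.
    public static int GetRetryCount(IDictionary<string, object> headers)
    {
        if (headers != null
            && headers.TryGetValue(RetryHeader, out var value)
            && value is int count)
        {
            return count;
        }
        return 0;
    }

    // Retry while fewer than maxRetries attempts have been made, then discard.
    public static FailureAction Decide(IDictionary<string, object> headers, int maxRetries)
    {
        return GetRetryCount(headers) < maxRetries ? FailureAction.Retry : FailureAction.Discard;
    }

    // Headers for the next attempt: a copy with the counter incremented.
    public static Dictionary<string, object> NextAttemptHeaders(IDictionary<string, object> headers)
    {
        var copy = headers == null
            ? new Dictionary<string, object>()
            : new Dictionary<string, object>(headers);
        copy[RetryHeader] = GetRetryCount(headers) + 1;
        return copy;
    }
}
```

A plain reject-and-requeue puts the message back unchanged, so the counter cannot be incremented that way. One option, assumed here, is to publish a copy of the message with NextAttemptHeaders and acknowledge the original.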

Dead Letter Queue:

  • We reject messages but we don't just want to throw them away
  • Store errors for review by Ops
  • Advanced Error handling?
  • Repair & Resubmit

When processing fails, route the message to a dead-letter (error) queue so that failed messages can be tracked. To do this, set the Dead-Letter-Exchange on the working queue by passing the argument below when declaring the queue:

      var parameters = new Dictionary<string, object> {{"x-dead-letter-exchange", _deadLetterExchangeName}};
      _model.QueueDeclare(queueName, _isDurable, false, false, parameters);

Routing Failures:

  • Sometimes a published message matches no binding, so no queue receives it
  • Routing Failure
    • Could be an error
    • Could be a malicious message
    • Could be fixable

Route messages that are not mapped to any queue to a different exchange so that unaddressed messages can be tracked. To do this, set the alternate-exchange argument when declaring the exchange:

      var parameters = new Dictionary<string, object> { { "alternate-exchange", _routingFailureExchangeName } };
      _model.ExchangeDeclare(_exchangeName, _exchangeType, false, false, parameters);

                                            ADVANCED TOPICS

Scheduled Delivery:

  • How can I publish a message but delay the processing of the message?

  • Have two exchanges: the first routes the message to a Holding Queue, and after a set time the message is routed to a second exchange, which delivers it to the Receiver Queue.

  • This can be done by setting a Dead-Letter-Exchange on the Holding Queue and setting the Expiration property on each message when it is published. When the expiration time passes, the message is routed to the Dead-Letter-Exchange.

  • The Holding Queue has no receiver, so a message stays there until its Expiration timeout elapses and is then redirected to the Dead-Letter-Exchange.

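      // Holding queue: no consumer is attached, so messages wait here until they expire.
      // _holdingQueueName and _destinationExchangeName are placeholder names.
      var parameters = new Dictionary<string, object> { { "x-dead-letter-exchange", _destinationExchangeName } };
      _model.QueueDeclare(_holdingQueueName, true, false, false, parameters);
      _model.QueueBind(_holdingQueueName, "MyExchange", "", null);
      var properties = _model.CreateBasicProperties();
      properties.Persistent = true;
      // Per-message expiration in milliseconds, passed as a string.
      properties.Expiration = "5000";
      var messageBuffer = Encoding.UTF8.GetBytes("Hello World!");
      _model.BasicPublish("MyExchange", "", properties, messageBuffer);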

Virtual Hosts:

  • Can I separate the messages in RabbitMQ into different sandboxes? Reasons:
    • Application Isolation
    • Security: Data Isolation
    • Same Queue name for both Consumers, but the data is separate for each Consumer

How:

  • Create a different user for each Consumer
  • Create a virtual host for each user created above
  • Grant or deny each user access to each Virtual Host (permissions are set per user, per Virtual Host)

Include the Virtual Host when creating the Connection Factory:

      var connectionFactory = new ConnectionFactory { HostName = _hostName, UserName = _userName, 
                                                      Password = _password, VirtualHost = _virtualHost };

The combination of user and Virtual Host keeps the messages segregated even when the Queue name is the same.
