Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer doesn't get message, sent before instantiation #47

Closed
theosem opened this issue Jan 16, 2013 · 3 comments
Closed

consumer doesn't get message, sent before instantiation #47

theosem opened this issue Jan 16, 2013 · 3 comments

Comments

@theosem
Copy link

theosem commented Jan 16, 2013

I am evaluating a SimpleAmqpClient based cosumer example program (found from a previous issue).
The problem is that when I first send the message and then run the consumer, the consumer is not receiving anything. However, if I have the consumer running, then a new message sent to the exchange appears in the consumer.

following is the actual code:

#include <SimpleAmqpClient/SimpleAmqpClient.h>

#include <iostream>
#include <fstream>

using namespace AmqpClient;

class ConnectionData{
public:
    std::string host;
    int port;
    std::string user;
    std::string password;
    std::string vHost;
    std::string routingKey;
    std::string exchange;

}theConnectionData;

void main()
{

    theConnectionData.host = "127.0.0.1";
    theConnectionData.port = 5672;
    theConnectionData.user = "guest";
    theConnectionData.password = "guest";
    theConnectionData.vHost = "/";
    theConnectionData.routingKey = "";
    theConnectionData.exchange = "indexer";


  std::ofstream file( "Log.txt" );
  file << "Listener" << std::endl;
  file << "\t" << "host: " << theConnectionData.host << std::endl;
  file << "\t" << "port: " << theConnectionData.port << std::endl;
  file << "\t" << "user: " << theConnectionData.user << std::endl;
  file << "\t" << "password: " << theConnectionData.password << std::endl;
  file << "\t" << "vHost: " << theConnectionData.vHost << std::endl;
  file << "\t" << "routingKey: " << theConnectionData.routingKey << std::endl;
  file.flush();

  try
  {
   file << "AmqpClient::Channel::Create" << std::endl;
   file.flush();
   AmqpClient::Channel::ptr_t channelPtr = AmqpClient::Channel::Create();// theConnectionData.host, theConnectionData.port, theConnectionData.user, theConnectionData.password, theConnectionData.vHost);

    file << "DeclareQueue" << std::endl;
    file.flush();
    std::string queueName = channelPtr->DeclareQueue("");
    file << "\t" << "queue: " << queueName << std::endl;
    file.flush();

    file << "BindQueue" << std::endl;
    file.flush();
    channelPtr->BindQueue(queueName, theConnectionData.exchange, theConnectionData.routingKey );

    file << "BasicConsume" << std::endl;
    file.flush();
    std::string consumer_tag  =   channelPtr->BasicConsume( queueName, "" );
    file << "\t" << "tag: " << consumer_tag << std::endl;
    file.flush();

    /**
    **Fair dispatch**
    You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.
    This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
    In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
*/
    //int prefetchCount = 1;
    //channelPtr->BasicQos(consumer_tag,prefetchCount);

    file << "BasicConsumeMessage" << std::endl;
    file.flush();
    AmqpClient::Envelope::ptr_t envelopePtr =  channelPtr->BasicConsumeMessage( consumer_tag );
    file << "\t" << "body: " << envelopePtr->Message()->Body() << std::endl;
    file.flush();

    file << "BasicCancel" << std::endl;
    file.flush();
    channelPtr->BasicCancel( consumer_tag );
  }
  catch ( AmqpClient::AmqpException const& exception )
  {
    file << "Exception: " << typeid( exception ).name() << std::endl;
    file << "\t" << "what: " << exception.what() << std::endl;
    file << "\t" << "is_soft_error: " << exception.is_soft_error() << std::endl;
    file << "\t" << "reply_code: " << exception.reply_code() << std::endl;
    file << "\t" << "class_id: " << exception.class_id() << std::endl;
    file << "\t" << "method_id: " << exception.method_id() << std::endl;
    file << "\t" << "reply_text: " << exception.reply_text() << std::endl;
  }
  catch ( std::exception const& exception )
  {
    file << "Exception: " << typeid( exception ).name() << std::endl;
    file << "\t" << "what: " << exception.what() << std::endl;
  }
  catch ( ... )
  {
    file << "Exception: ..." << std::endl;
  }
}
@theosem
Copy link
Author

theosem commented Jan 16, 2013

As a producer for the messages I use this python producer https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python/emit_log.py with the correct exchange name

@alanxz
Copy link
Owner

alanxz commented Jan 16, 2013

When you publish a message to an exchange, the broker tries to route the message to a queue. If message can't be routed to a queue, the message is simply dropped.

In your case, when your consumer program isn't running, there is no queue declared to store the message, so the broker simply drops the message.

If you haven't already, I'd suggest working through some of the tutorials here:
http://www.rabbitmq.com/getstarted.html

@alanxz alanxz closed this as completed Jan 16, 2013
@theosem
Copy link
Author

theosem commented Jan 17, 2013

OK, sorry for this. I thought that the messages remain in the broker until a consumer gets them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants