Using direct reply-to for RPC calls #259

Closed
facundoolano opened this issue Jun 24, 2016 · 19 comments

Comments

@facundoolano

I’ve been trying to implement RPC in a project without having to create a new connection, channel and reply queue per RPC request (since that obviously performed poorly). Most examples out there show one-off calls, so there isn’t much guidance on how to do it.

Reusing the connection is easy, but I attempted several ways of reusing the channel and the reply-to queue (relying on a correlationId to distinguish consumers) and hit different issues every time.

After reading about the direct reply-to feature, which uses a pseudo-queue, I settled on that since it proved to have the best performance. To make it work with this lib I still had to use a new channel per request; otherwise I saw the error PRECONDITION_FAILED - reply consumer already set when trying to consume from the amq.rabbitmq.reply-to pseudo-queue.

I wonder whether I see that error because the proper way to use direct reply-to really is a separate channel per request, or because this lib is not prepared to handle that feature (or maybe I’m just using it wrong).

Thanks!

@michaelklishin

There is no need for client library support for direct reply-to. Please post your code, it's impossible to suggest much without it.

@facundoolano
Author

Ok, here's the client code. I call createClient once per process to create the connection, and then use it in sendRPCMessage for every RPC request.

This is the version of the code that's working, where I create a new channel per request:

const amqp = require('amqplib');
const uuid = require('uuid');

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions);

// client is the connection returned by createClient; a new channel is opened per request
const sendRPCMessage = (client, message, rpcQueue) => client.createChannel()
  .then((channel) => new Promise((resolve, reject) => {
    const replyToQueue = 'amq.rabbitmq.reply-to';
    // close the channel if no reply arrives within 10 seconds
    const timeout = setTimeout(() => channel.close(), 10000);

    const correlationId = uuid.v4();
    const msgProperties = {
      correlationId,
      replyTo: replyToQueue
    };

    function consumeAndReply (msg) {
      // amqplib passes null when the broker cancels the consumer
      if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

      if (msg.properties.correlationId === correlationId) {
        resolve(msg.content);
        clearTimeout(timeout);
        channel.close();
      }
    }

    channel.consume(replyToQueue, consumeAndReply, {noAck: true})
      .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
      .catch(reject);
  }));

The version of the code that's NOT working is when I try to use a single channel per process (the call fails with PRECONDITION_FAILED - reply consumer already set):

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions)
  .then((conn) => conn.createChannel());

const sendRPCMessage = (channel, message, rpcQueue) => new Promise((resolve, reject) => {
  const replyToQueue = 'amq.rabbitmq.reply-to';
  const correlationId = uuid.v4();
  const msgProperties = {
    correlationId,
    replyTo: replyToQueue
  };

  function consumeAndReply (msg) {
    // amqplib passes null when the broker cancels the consumer
    if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

    if (msg.properties.correlationId === correlationId) {
      resolve(msg.content);
    }
  }

  // registers another consumer on the shared channel for every request
  channel.consume(replyToQueue, consumeAndReply, {noAck: true})
    .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
    .catch(reject);
});

As I said, I'm not sure whether that second one is the wrong way to do direct reply-to (or RPC altogether); I just wanted to check before settling on a bunch of channels just because it worked.

@squaremo
Collaborator

squaremo commented Jul 3, 2016

Reading the description of the RabbitMQ feature, I don't think there's any reason it won't work with amqplib. I don't think there's any need to create a new channel each time.

Looking at the second example of code, I think the mistake is to consume each time you send an RPC.

What you want to do is consume once, and simply send a message for each RPC. If you are concerned that replies can come out of order, you should only send one RPC at a time on a channel -- or you can keep a queue of correlation IDs, and reorder when messages come in.
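
The "queue of correlation IDs" idea can be sketched roughly as follows. This is only an illustration, not part of amqplib: createReorderBuffer, expect, receive and deliver are hypothetical names. Call expect(id) when a request is sent and receive(id, content) from the single reply consumer; replies are handed to deliver in the order the requests went out.

```javascript
// Minimal sketch (hypothetical helper, not an amqplib API):
// releases replies in the same order the requests were sent.
function createReorderBuffer(deliver) {
  const order = [];          // correlation IDs in send order
  const arrived = new Map(); // correlationId -> reply content that arrived early

  return {
    // call when a request with this correlation ID is sent
    expect(id) {
      order.push(id);
    },
    // call when a reply arrives; flushes every reply that is now in order
    receive(id, content) {
      arrived.set(id, content);
      while (order.length > 0 && arrived.has(order[0])) {
        const head = order.shift();
        deliver(head, arrived.get(head));
        arrived.delete(head);
      }
    },
  };
}
```

If ordering between replies does not matter to the caller, this buffer is unnecessary and routing by correlation ID alone is enough.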

@facundoolano
Author

Sending a single RPC at a time doesn't sound like an option for me, at least not with a single channel per client (I need the client to be able to handle multiple concurrent messages, otherwise I'd be introducing a bottleneck in my platform).

If I understand you correctly, to consume only once I'd need to introduce some sort of structure or maybe an event emitter, to be able to route each RPC response to the specific promise resolve that's expecting it. I'll give it some thought but I'll probably defer the added complexity until I have evidence that the current one channel per request approach is not good enough (btw please let me know if creating so many channels is a terrible idea for some reason).

@squaremo
Collaborator

squaremo commented Jul 3, 2016

one channel per request approach is not good enough (btw please let me know if creating so many channels is a terrible idea for some reason).

Opening a channel per request is kind of an anti-pattern. You'd be better off having a channel per requesting thread-of-control -- i.e., for anything that needs answers to proceed -- or using a pool of channels. Extra complexity, I know.
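
For reference, a pool of channels could look something like the sketch below. It assumes conn is an open amqplib connection; createChannelPool, acquire and the default size of 4 are made-up names and values for illustration. Channels are opened once and handed out round-robin rather than being created per request.

```javascript
// Minimal sketch (hypothetical helper): a fixed-size, round-robin channel pool.
// `conn` is assumed to be an amqplib connection exposing createChannel().
async function createChannelPool(conn, size = 4) {
  // open all channels up front, once per process
  const channels = await Promise.all(
    Array.from({ length: size }, () => conn.createChannel())
  );
  let next = 0;

  return {
    // returns the next channel in rotation; channels are shared, never closed per request
    acquire() {
      const channel = channels[next];
      next = (next + 1) % channels.length;
      return channel;
    },
    // closes every channel in the pool
    close() {
      return Promise.all(channels.map((channel) => channel.close()));
    },
  };
}
```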

@facundoolano
Author

I understand, thanks for your answers. I actually like how it's looking with a single consumer and an event emitter to route responses:

const amqp = require('amqplib');
const uuid = require('uuid');
const EventEmitter = require('events');

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions)
  .then((conn) => conn.createChannel())
  .then((channel) => {
    // create an event emitter where rpc responses will be published by correlationId
    channel.responseEmitter = new EventEmitter();
    channel.responseEmitter.setMaxListeners(0);
    // consume from the pseudo-queue once per channel; msg is null if the consumer is cancelled
    channel.consume(REPLY_QUEUE,
      (msg) => msg && channel.responseEmitter.emit(msg.properties.correlationId, msg.content),
      {noAck: true});

    return channel;
  });

const sendRPCMessage = (channel, message, rpcQueue) => new Promise((resolve) => {
  const correlationId = uuid.v4();
  // listen for the content emitted on the correlationId event
  channel.responseEmitter.once(correlationId, resolve);
  channel.sendToQueue(rpcQueue, Buffer.from(message), { correlationId, replyTo: REPLY_QUEUE });
});

It seems to be working fine, I'll test it for a while and see how it goes.

@squaremo
Collaborator

squaremo commented Jul 3, 2016

Cool. For the sake of completeness: another way, since you're already using promises, would be to keep a map of correlationID->promise.
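
A rough sketch of that map-based variant, under a few assumptions: channel is an already-open amqplib channel, createRpcClient and timeoutMs are hypothetical names, and Node's built-in crypto.randomUUID (Node 14.17+) stands in for uuid.v4 so the snippet has no extra dependencies. The map stores each pending promise's resolve and reject callbacks plus a timer, so a request that never gets a reply is rejected instead of waiting forever.

```javascript
const { randomUUID } = require('crypto');

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

// Minimal sketch (hypothetical helper): wraps an open amqplib channel and
// returns a function that sends one RPC request and resolves with the reply.
async function createRpcClient(channel, timeoutMs = 10000) {
  // correlationId -> { resolve, reject, timer } for every request still in flight
  const pending = new Map();

  // consume from the pseudo-queue exactly once for this channel
  await channel.consume(REPLY_QUEUE, (msg) => {
    if (!msg) return; // consumer cancelled by the broker
    const entry = pending.get(msg.properties.correlationId);
    if (!entry) return; // unknown reply, or one that already timed out
    clearTimeout(entry.timer);
    pending.delete(msg.properties.correlationId);
    entry.resolve(msg.content);
  }, { noAck: true });

  return function sendRPCMessage(message, rpcQueue) {
    return new Promise((resolve, reject) => {
      const correlationId = randomUUID();
      // reject and forget the request if no reply arrives in time
      const timer = setTimeout(() => {
        pending.delete(correlationId);
        reject(new Error('rpc request timed out'));
      }, timeoutMs);
      pending.set(correlationId, { resolve, reject, timer });
      channel.sendToQueue(rpcQueue, Buffer.from(message), { correlationId, replyTo: REPLY_QUEUE });
    });
  };
}
```

Compared with the event emitter version, nothing is left registered after a timeout, which may matter for long-running clients.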

@eltoro

eltoro commented Aug 25, 2016

Sorry if I'm not understanding correctly but is this async? If I'm sending multiple calls will this block the client?

@squaremo
Collaborator

@eltoro Yes, it's async; the responses will queue up for a consumer to collect.

@Voles

Voles commented Nov 12, 2016

@facundoolano thanks for your example code!

Am I right that the worker still needs to send a response to the queue (amq.rabbitmq.reply-to)? Eg.

server.js

channel.sendToQueue(
  'amq.rabbitmq.reply-to',
  Buffer.from(''),
  {correlationId: message.properties.correlationId}
);

@facundoolano
Author

@Voles that's right. But I think it's better to let the client tell the worker where to reply, that's why the initial message includes a replyTo: REPLY_QUEUE along with the correlationId.
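
A worker that replies to whatever the client put in replyTo might look like the sketch below. It assumes channel is an open amqplib channel and that rpcQueue already exists; startRpcWorker and handle are hypothetical names, and handle is expected to return the reply body as a string or Buffer. Error handling is left out to keep it short.

```javascript
// Minimal sketch (hypothetical helper): consumes requests from rpcQueue and
// sends each result back to the address given in the request's replyTo property.
async function startRpcWorker(channel, rpcQueue, handle) {
  return channel.consume(rpcQueue, async (msg) => {
    if (!msg) return; // consumer cancelled by the broker
    const { replyTo, correlationId } = msg.properties;
    const result = await handle(msg.content);
    // sendToQueue publishes through the default exchange with replyTo as the routing key;
    // echoing correlationId lets the client match the reply to its request
    channel.sendToQueue(replyTo, Buffer.from(result), { correlationId });
    channel.ack(msg);
  });
}
```

With direct reply-to the worker never hard-codes the reply address, so the same worker also serves clients that use an ordinary reply queue.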

@Voles

Voles commented Nov 12, 2016

@facundoolano I agree. Thanks for your very quick response!

@RP-3

RP-3 commented Mar 31, 2017

@facundoolano If I had multiple clients, would the direct reply-to queue guarantee that the response will be sent to the client that sent the message in the first place? BTW I really like your event-emitter solution!

@facundoolano
Author

@facundoolano If I had multiple clients, would the direct reply-to queue guarantee that the response will be sent to the client that sent the message in the first place?

Yes, that's what direct reply-to does.

BTW I'm closing this issue since it was solved by following @squaremo's suggestions.

@anhcao142

anhcao142 commented Nov 18, 2017

UPDATE: I checked, and it seems the problem was caused by my RabbitMQ version being 3.2.4. Does this method only work in version 3.4 or later?

Hi, I just implemented the code like the one @facundoolano suggested and encountered this problem: NOT_FOUND - no queue 'amq.rabbitmq.reply-to' in vhost '/'

When I tried to create the queue amq.rabbitmq.reply-to manually using the management site, it told me that the queue's name contains the reserved prefix amq. Do you guys have any idea why this happens? It seems like the server treats this queue like a normal queue.

The code is exactly like this

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions);

const sendRPCMessage = (client, message, rpcQueue) => client.createChannel()
  .then((channel) => new Promise((resolve, reject) => {
    const replyToQueue = 'amq.rabbitmq.reply-to';
    const timeout = setTimeout(() => channel.close(), 10000);

    const correlationId = uuid.v4();
    const msgProperties = {
      correlationId,
      replyTo: replyToQueue
    };

    function consumeAndReply (msg) {
      if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

      if (msg.properties.correlationId === correlationId) {
        resolve(msg.content);
        clearTimeout(timeout);
        channel.close();
      }
    }

    channel.consume(replyToQueue, consumeAndReply, {noAck: true})
      .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
      .catch(reject);
  }));

@cressie176
Collaborator

cressie176 commented Nov 19, 2017

The following should answer your questions...

Queue names starting with "amq." are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 (ACCESS_REFUSED).

https://www.rabbitmq.com/tutorials/amqp-concepts.html

To use direct reply-to, an RPC client should:

Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first, although the client can do so if it wants.
Set the reply-to property in their request message to amq.rabbitmq.reply-to.
The RPC server will then see a reply-to property with a generated name. It should publish to the default exchange ("") with the routing key set to this value (i.e. just as if it were sending to a reply queue as usual). The message will then be sent straight to the client consumer.

https://www.rabbitmq.com/direct-reply-to.html

3.4.0 (21 Oct 2014): Fast RPC replies, Live plugin activation, Reconnecting .net client

https://www.rabbitmq.com/changelog.html

@mattqs

mattqs commented Jul 28, 2018

I wrote an npm package amq.rabbitmq.reply-to.js that:

Usage:

const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');

const serverCallbackTimesTen = (message, rpcServer) => {
    const n = parseInt(message);
    return Promise.resolve(`${n * 10}`);
};

let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
    const serverOptions = new rabbitmqreplyto.RpcServerOptions(
    /* url */ undefined, 
    /* serverId */ undefined, 
    /* callback */ serverCallbackTimesTen);

    return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
    rpcServer = rpcServerP;
    return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
    rpcClient = rpcClientP;
    const promises = [];
    for (let i = 1; i <= 20; i++) {
        promises.push(rpcClient.sendRPCMessage(`${i}`));
    }
    return Promise.all(promises);
}).then((replies) => {
    console.log(replies);
    return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});

//['10',
//  '20',
//  '30',
//  '40',
//  '50',
//  '60',
//  '70',
//  '80',
//  '90',
//  '100',
//  '110',
//  '120',
//  '130',
//  '140',
//  '150',
//  '160',
//  '170',
//  '180',
//  '190',
//  '200']

@Igor-lkm

...sorry about writing a comment into a closed issue.

I made an example based on #259 (comment) which would work out of the box without extra libraries, including server and client code:

https://github.com/Igor-lkm/node-rabbitmq-rpc-direct-reply-to - it might be helpful to someone getting started with direct reply-to.

@squaremo
Collaborator

@Igor-lkm No worries -- people still look through closed issues, and it's helpful to have more worked examples. Thank you!


10 participants