Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

what about createChannel ? #19

Open
empitegayan opened this issue Jan 27, 2017 · 11 comments
Open

what about createChannel ? #19

empitegayan opened this issue Jan 27, 2017 · 11 comments

Comments

@empitegayan
Copy link

i'm trying to convert native this code using amqp-ts

amqp.connect(configuration.amqpUrl, function (err: any, conn: any) {
        conn.createChannel(function (err: any, ch: any) {
            ch.assertQueue('', { exclusive: true }, function (err: any, q: any) {
                let corr = uuid.v4();
                ch.consume(q.queue, function (msg: any) {
                    if (msg.properties.correlationId == corr && msg.content != null) {
                        let resData = JSON.parse(msg.content);
                        res.json(resData);
                    }
                }, { noAck: true });

                ch.sendToQueue(queue, new Buffer(JSON.stringify(req)), { correlationId: corr, replyTo: q.queue });
            });
        });
    });

but createChannel , assertQueue not implemented or not there ?
http://www.squaremobius.net/amqp.node/channel_api.html#model_createChannel

@abreits
Copy link
Owner

abreits commented Jan 27, 2017

amqp-ts was designed to make it easy to connect to amqp, without the need to create channels or wait for callbacks or promises to resolve before continuing. It uses the library you mention as a base.

The code you give could be written as follows with amqp.ts in typescript:

import * as Amqp from "amqp-ts";

// connect to the amqp server
var connection = new Amqp.Connection(configuration.amqpUrl);

// define a new queue
var queue = connection.declareQueue("", {exclusive: true});

// define the consumer for the queue
queue.activateConsumer( message => {
  let msg = message.getContent();
  if (msg.properties.correlationId == corr && msg.content != null) {
    let resData = JSON.parse(msg.content);
    res.json(resData);
  }
}, {noAck: true});

// send a message to the queue
queue.send(new Message({ correlationId: corr, replyTo: q.queue });

// or, if you want to absolutely make sure that the queue is created before you send something
connection.completeConfiguration().then(() => { 
  queue.send(new Message({ correlationId: corr, replyTo: q.queue });
}).catch(err => {
  console.log("An error occurred: " + err.message);
});

@abreits
Copy link
Owner

abreits commented Jan 27, 2017

Oops, just noticed that I messed up.
This should be better:

import * as Amqp from "amqp-ts";

// connect to the amqp server
var connection = new Amqp.Connection(configuration.amqpUrl);

// define a new queue
var queue = connection.declareQueue("", {exclusive: true});

let corr = uuid.v4();

// define the consumer for the queue
queue.activateConsumer( msg => {
  let content = msg.getContent();
  if (msg.properties.correlationId == corr && content != null) {
    res.json(content);  // I don't know what this line does
  }
}, {noAck: true});

// send a message to the queue
queue.send(new Message(req, { correlationId: corr, replyTo: q.queue });

// or, if you want to absolutely make sure that the everything is initialized before you send something
connection.completeConfiguration().then(() => { 
  queue.send(new Message(req, { correlationId: corr, replyTo: q.queue });
}).catch(err => {
  console.log("An error occurred: " + err.message);
});

@empitegayan
Copy link
Author

Hello @abreits thanks for the help i'm referring RPC call on my implementation. i have microservice to consume messages on queue. but above implementation api request it self consume the request from queue. but i need something like this

may be using this code it will possible

queue.rpc(num).then(function(result) {
  console.log(' [.] Got ', result.getContent());
});

but problem was we cannot pass Message to rpc method (cannot pass correlationId, replyTo as properties) or i might understood this in wrong way. could you please help me to undestand.

@abreits
Copy link
Owner

abreits commented Jan 31, 2017

Do you mean that you have an existing rpc server and you need to write the client, or that you can both write the server and client (if the latter, then you can use the code in tutorial 6:

https://github.com/abreits/amqp-ts/tree/master/tutorials/6_rpc

If however you have an already existing server implementation then you need to do the following:

  • connect to the reply_to queue
  • connect the consumer that processes the rpc response to the reply_to queue
  • connect to the rpc queue
  • send the message to the rpc queue (with the correct parameters)

something like this:

// connect to the reply_to queue
// amqp-ts does not work well with autogenerated queue names, so we generate a name
let responseQueueName = "rpc-response-" + uuid.v4(); 
let rpcResponseQueue = connection.declareQueue(responseQueueName, {exclusive: true});

//create a correlation id (if needed)
let corr = uuid.v4();

// define the consumer function (can also be done inline)
function rpcConsumer (msg: Amqp.Message) {
  rpcResponse = msg.getContent();
    // process rpc response, e.g.
    if (msg.properties.correlationId == corr && rpcResponse != null) {
      res.json(rpcResponse);  
    }
  }
}

// connect to consumer to the queue
rpcResponseQueue.activateConsumer(rpcConsumer, {noAck: true});

// connect to the rpc queue
let rpcQueue = connection.declareQueue(rpcQueueName);

// send a message to the rpc queue
let rpcRequest = new Amqp.Message(rpcRequestContent, {correlation_id: corr, reply_to: responseQueueName});
rpcQueue.send(rpcRequest);

@empitegayan
Copy link
Author

Thanks you very much!! That did the job!! thanks again..

@empitegayan
Copy link
Author

Sorry for the bothering is there any specific reason to use exclusive = true because server cannot access to exclusive queue to give the response

@abreits
Copy link
Owner

abreits commented Feb 1, 2017

No reason, I just added it because it was present in your original example. If it causes problems just remove it.

@empitegayan
Copy link
Author

when consuming message i'm getting error

error: Queue.onMessage consumer function returned error: content is not a buffer module=amqp-ts

i thought it because of queue option but something else

@abreits
Copy link
Owner

abreits commented Feb 1, 2017

According to the error something went wrong in your consumer function, try to debug it there.

@empitegayan
Copy link
Author

i have dig in to source code and realized i'm calling this function to initialize consumer

var connection = new amqp.Connection(configuration["amqpUrl"]);
        var queueReq = connection.declareQueue(queue, { durable: false });
        queueReq.activateConsumer((message: amqp.Message) => {
            /* some logic go there */

        }, { noAck: true })

but when i call queueReq.activateConsumer as a parameter message typeof Message getting

but in source this line

it again try to create message var message = new Message(msg.content, msg.properties);
but msg.content is a already Buffer

@abreits
Copy link
Owner

abreits commented Feb 1, 2017

That is not a problem, a new Message can be created with a Buffer, it will then be used as is.

see how the content of a new Message object is created:
https://github.com/abreits/amqp-ts/blob/master/src/amqp-ts.ts#L327

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants