bangbang93/carrotmq

carrotmq

A much easier way to use RabbitMQ

Chinese documentation

APIDOC

documentation

usage

const {CarrotMQ} = require('carrotmq');

const mq = new CarrotMQ('amqp://localhost');
await mq.connect()

const publisher = new CarrotMQ('amqp://localhost'); // can also be used without a schema
await publisher.connect()

mq.queue('fooQueue', async (data, ctx) => {
    console.log(data);
    ctx.ack();
    //ctx.nack();
    //ctx.reject();
    //ctx.cancel(); cancel this consumer;
    ctx.reply({date: new Date()}); // reply to message.properties.replyTo
    ctx.carrotmq; // carrotmq instance
    ctx.channel;  // current channel
    return Promise.reject(); // or `throw new Error('something happened')`; either one calls `ctx.reject()` if the message has not been acked yet
});

mq.sendToQueue('queue', {msg: 'message'});
mq.publish('exchange', 'foo.bar.key', {msg: 'hello world!'});
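
The comment on the handler's return value describes a rule: when the handler throws or returns a rejected promise, the message is rejected unless it was already acknowledged. A minimal sketch of that rule follows. It is not carrotmq's actual implementation; `makeContext` and `runHandler` are hypothetical names, and the context only records calls in an array instead of talking to a broker.

```javascript
// Hypothetical stand-in for the handler context: records what was called.
function makeContext(log) {
  let settled = false;
  return {
    get settled() { return settled; },
    ack() { settled = true; log.push('ack'); },
    reject() { settled = true; log.push('reject'); },
  };
}

// Runs a handler and rejects the message if the handler fails
// before the message was acked or rejected.
async function runHandler(handler, data, ctx) {
  try {
    await handler(data, ctx);
  } catch (err) {
    if (!ctx.settled) ctx.reject();
  }
}
```

A handler that acks first and then throws leaves the message acknowledged; a handler that throws without acking ends with a reject.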

RPC

mq.rpc('queue', {data: new Date})
.then((reply)=>{
  reply.ack();
  console.log(reply.data); //some reply result
});

If you prefer a named queue over a temporary queue for replies, set it in the config:

const mq = new CarrotMQ('amqp://localhost', {
  callbackQueue: {
    queue: 'carrotmq.rpc.callback'
  }
})

Or pass the callback queue name as the third argument:

mq.rpc('carrotmq.rpc', {data: 'foo'}, 'carrotmq.rpc.callback') 

RPC Over Exchange

mq.queue('rpcQueue', async (data, ctx) => {
  await ctx.reply(data);
  await ctx.ack();
});

const time = new Date();
mq.rpcExchange('exchange0', 'rpc.rpc', {time})
.then(function (reply) {
  reply.ack();
  console.log(reply.data); // {time: time}
}); // if the target exchange is a topic or fanout exchange, only the first reply is accepted
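
With a topic or fanout exchange, several consumers may answer the same request. One common way to accept only the first reply is to track pending calls by correlation id and drop the entry as soon as a reply arrives. The sketch below illustrates that idea under stated assumptions; `PendingReplies` is a hypothetical class and is not taken from carrotmq's source.

```javascript
// Hypothetical sketch: tracks pending RPC calls by correlation id.
class PendingReplies {
  constructor() {
    this.pending = new Map();
  }

  // Remember the callback to run when the first reply arrives.
  register(correlationId, onReply) {
    this.pending.set(correlationId, onReply);
  }

  // Returns true if the reply was accepted, false if it was ignored.
  deliver(correlationId, data) {
    const onReply = this.pending.get(correlationId);
    if (!onReply) return false; // unknown id, or already answered
    this.pending.delete(correlationId); // later replies are dropped
    onReply(data);
    return true;
  }
}
```

Calling `deliver` twice with the same id runs the callback once; the second call returns `false`.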

events

ready

Emitted after the connection is established.

mq.on('ready', function(){});

error

Emitted when an error occurs.

mq.on('error', function (err){});

message

Emitted when a message arrives.

mq.on('message', function (data){
  data.channel; //channel object
  data.queue   //queue name
  data.message  //message object
})

close

Emitted when the connection closes.

mq.on('close', () => setTimeout(() => mq.connect(), 1000)); // pass a function so connect() runs after the delay
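
If the broker stays down for a while, a single delayed reconnect may fail as well. A minimal sketch of retrying with exponential backoff is shown below. `backoffDelay` and `reconnect` are hypothetical helpers, not part of carrotmq; `connect` can be any function that returns a promise.

```javascript
// Hypothetical helper: delay before the given retry attempt (0-based),
// doubling each time and capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Calls connect() until it succeeds or maxAttempts is exhausted.
// The last failure is rethrown to the caller.
async function reconnect(connect, maxAttempts = 5, baseMs = 1000) {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await connect();
    } catch (err) {
      if (attempt === maxAttempts - 1) throw err;
      const delay = backoffDelay(attempt, baseMs);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}

// Possible usage with the close event (assumes `mq` from the usage section):
// mq.on('close', () => reconnect(() => mq.connect()).catch(console.error));
```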

upgrade

V4 to V5

Because the library was rewritten in TypeScript, the export has changed. Before:

const CarrotMQ = require('carrotmq')

after:

const {CarrotMQ} = require('carrotmq')
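
Code that has to load under either major version could normalize the two export shapes. This is only a sketch; `resolveCarrotMQ` is a hypothetical helper and assumes V4 exports the class itself while V5 exports an object with a `CarrotMQ` property, as the two snippets above suggest.

```javascript
// Hypothetical helper: accepts the value returned by require('carrotmq')
// and returns the CarrotMQ class for either export shape.
function resolveCarrotMQ(mod) {
  // V5 shape: { CarrotMQ: class }. V4 shape: the class itself.
  return mod.CarrotMQ || mod;
}

// Possible usage:
// const CarrotMQ = resolveCarrotMQ(require('carrotmq'));
```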

V2 to V3

breaking change

  • mq.rpc() and mq.rpcExchange() no longer take the 4th consumer argument. They now return a Promise that resolves with the reply.

Previously:

  mq.rpc('someQueue', {data}, function(data) {
    const that = this;
    // or some data async logic
    doSomeThingAsync(data)
    .then(() => that.ack())
    .catch(() => that.nack());
    return data;
  }).then((data) => console.log(data));

This can now be replaced by:

    let reply = await mq.rpc('someQueue', {data});
    try {
      await doSomeThingAsync(reply.data);
      reply.ack();
    } catch (e) {
      reply.nack();
    }