diff --git a/cache/memory.js b/cache/memory.js index 4925eec..4085081 100644 --- a/cache/memory.js +++ b/cache/memory.js @@ -5,17 +5,15 @@ class MemoryCache { /** * Stores the payload in cache. - * @param {string} queue The id/name of the queue to cache for + * @param {string} queue The name of the queue to cache for + * @param {string} id The id of the document to store * @param {object} payload The data to cache * @param {string} payload.id The id that is being used to reference the message later on * @param {Callback} [cb] Callback to be notified on async store operations */ - store(queue, payload, cb) { - if (!payload.id) { - throw new Error('Unable to cache message queue payload because of missing id'); - } - this._queues[queue] = this._queues[queue] || {} - this._queues[queue][payload.id] = payload; + store(queue, id, payload, cb) { + this._queues[queue] = this._queues[queue] || {}; + this._queues[queue][id] = payload; cb && cb(); } diff --git a/index.js b/index.js index 9543fdf..c86d28b 100644 --- a/index.js +++ b/index.js @@ -6,7 +6,7 @@ const EventEmitter = require('events').EventEmitter; /** * The standard callback definition that can be used anywhere where standards are followed. * @typedef {function} Callback - * @param {Error|string} [err] Will contain error information if there's has been one + * @param {Error|string|null} [err] Will contain error information if there's has been one * @param {*} [...args] */ diff --git a/rabbitmq/channel.js b/rabbitmq/channel.js index 71f7e9d..d3b6899 100644 --- a/rabbitmq/channel.js +++ b/rabbitmq/channel.js @@ -72,7 +72,7 @@ class Channel extends EventEmitter { this._channel.deleteQueue(this.name, err => { err && this._log.warn('There was an error trying to delete queue %s', this.name, err); this._channel.removeAllListeners(); - this._cache.clear(); + this._cache.clear(this._name); delete this._channel; }); } @@ -89,7 +89,7 @@ class Channel extends EventEmitter { * @returns {object} The id that was used for this message */ send(msg, cb, id = shortId.generate()) { - this._cache.store(this.name, {id, payload:msg, callback:cb}); + this._cache.store(this.name, id, {payload:msg, callback:cb}); this._setUp(err => { this._cache.remove(this.name, id); if (err) { @@ -114,7 +114,7 @@ class Channel extends EventEmitter { if (err) { return cb && cb(err); } - this._cache.store(this.name, { id, listener: cb, noAck }); + this._cache.store(this.name, id, { listener: cb, noAck }); this._channel.consume(this._name, msg => { cb && cb(null, JSON.parse(msg.content.toString('utf8')), ( ack = true ) => { noAck || (ack ? this._channel.ack(msg) : this._channel.nack(msg)); @@ -131,6 +131,7 @@ class Channel extends EventEmitter { */ unlisten(id, cb) { this._channel.cancel(id, cb); + this._cache.remove(this.name, id); } /** @@ -145,7 +146,7 @@ class Channel extends EventEmitter { if (err) { return cb && cb(err); } - this._cache.store(this.name, { id, receiver:cb, noAck }); + this._cache.store(this.name, id, { receiver:cb, noAck }); this._channel.consume(this.name, msg => { this._cache.remove(this.name, id); noAck && this._channel.cancel(id);