Skip to content

Commit

Permalink
Changed cache api to more standard format
Browse files Browse the repository at this point in the history
  • Loading branch information
mallocator committed Mar 17, 2017
1 parent 934c805 commit a9f9353
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
12 changes: 5 additions & 7 deletions cache/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ class MemoryCache {

/**
* Stores the payload in cache.
* @param {string} queue The id/name of the queue to cache for
* @param {string} queue The name of the queue to cache for
* @param {string} id The id of the document to store
* @param {object} payload The data to cache
* @param {string} payload.id The id that is being used to reference the message later on
* @param {Callback} [cb] Callback to be notified on async store operations
*/
store(queue, payload, cb) {
if (!payload.id) {
throw new Error('Unable to cache message queue payload because of missing id');
}
this._queues[queue] = this._queues[queue] || {}
this._queues[queue][payload.id] = payload;
store(queue, id, payload, cb) {
this._queues[queue] = this._queues[queue] || {};
this._queues[queue][id] = payload;
cb && cb();
}

Expand Down
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const EventEmitter = require('events').EventEmitter;
/**
* The standard callback definition that can be used anywhere where standards are followed.
* @typedef {function} Callback
* @param {Error|string} [err] Will contain error information if there's has been one
* @param {Error|string|null} [err] Will contain error information if there's has been one
* @param {*} [...args]
*/

Expand Down
9 changes: 5 additions & 4 deletions rabbitmq/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Channel extends EventEmitter {
this._channel.deleteQueue(this.name, err => {
err && this._log.warn('There was an error trying to delete queue %s', this.name, err);
this._channel.removeAllListeners();
this._cache.clear();
this._cache.clear(this._name);
delete this._channel;
});
}
Expand All @@ -89,7 +89,7 @@ class Channel extends EventEmitter {
* @returns {object} The id that was used for this message
*/
send(msg, cb, id = shortId.generate()) {
this._cache.store(this.name, {id, payload:msg, callback:cb});
this._cache.store(this.name, id, {payload:msg, callback:cb});
this._setUp(err => {
this._cache.remove(this.name, id);
if (err) {
Expand All @@ -114,7 +114,7 @@ class Channel extends EventEmitter {
if (err) {
return cb && cb(err);
}
this._cache.store(this.name, { id, listener: cb, noAck });
this._cache.store(this.name, id, { listener: cb, noAck });
this._channel.consume(this._name, msg => {
cb && cb(null, JSON.parse(msg.content.toString('utf8')), ( ack = true ) => {
noAck || (ack ? this._channel.ack(msg) : this._channel.nack(msg));
Expand All @@ -131,6 +131,7 @@ class Channel extends EventEmitter {
*/
unlisten(id, cb) {
this._channel.cancel(id, cb);
this._cache.remove(this.name, id);
}

/**
Expand All @@ -145,7 +146,7 @@ class Channel extends EventEmitter {
if (err) {
return cb && cb(err);
}
this._cache.store(this.name, { id, receiver:cb, noAck });
this._cache.store(this.name, id, { receiver:cb, noAck });
this._channel.consume(this.name, msg => {
this._cache.remove(this.name, id);
noAck && this._channel.cancel(id);
Expand Down

0 comments on commit a9f9353

Please sign in to comment.