diff --git a/config/defaults.queue.js b/config/defaults.queue.js new file mode 100644 index 0000000..c18df51 --- /dev/null +++ b/config/defaults.queue.js @@ -0,0 +1,30 @@ +module.exports = { + defaultType: 'rabbitmq', + rabbitmq: { + instance: { + type: 'rabbitmq', + appId: Object.defineProperty({}, 'prop', {get: () => process.title}).prop, + logger: 'message', + client: { + host: 'localhost', + user: 'guest', + pass: 'guest', + heartbeat: 30 + }, + channels: {} + }, + channel: { + cache: { + type: 'memory' + }, + queue: { + durable: true, + messageTtl: 24 * 60 * 60 * 1000 // 24h + }, + message: { + persistent: true, + mandatory: true + } + } + } +}; diff --git a/config/queue.js b/config/queue.js index 1e94b89..f053ebf 100644 --- a/config/queue.js +++ b/config/queue.js @@ -1,28 +1 @@ -module.exports = { - _default_: { - type: 'rabbitmq', - appId: Object.defineProperty({}, 'prop', { get: () => process.title }).prop, - logger: 'message', - client: { - host: 'localhost', - user: 'guest', - pass: 'guest', - heartbeat: 30 - }, - channels: { - _default_: { - cache: { - type: 'memory' - }, - queue: { - durable: true, - messageTtl: 24 * 60 * 60 * 1000 // 24h - }, - message: { - persistent: true, - mandatory: true - } - } - } - } -}; +module.exports = {}; diff --git a/index.js b/index.js index 34eae56..9543fdf 100644 --- a/index.js +++ b/index.js @@ -1,4 +1,5 @@ const _ = require('lodash'); +const config = require('cheevr-config'); const globalConfig = require('cheevr-config').addDefaultConfig(__dirname, 'config'); const EventEmitter = require('events').EventEmitter; @@ -81,16 +82,16 @@ class Manager extends EventEmitter { /** * Returns the instance object that holds information about all the channels it has. - * @param {string} name The name of instance to return. + * @param {string} [name=_default_] The name of instance to return. * @returns {Instance} */ instance(name = '_default_') { let opts = this._config[name]; if (!opts) { - throw new Error('Missing configuration for message queue server named ' + name); + opts = config.defaults.queue[config.defaults.queue.defaultType].instance; } - name != '_default_' && (opts = _.defaultsDeep({}, opts, this._config._default_)); - this._instances[name] = this._instances[name] || new (require('./' + opts.type))(name, opts); + let type = (opts && opts.type) || config.defaults.queue.defaultType; + this._instances[name] = this._instances[name] || new (require('./' + type))(name, opts); this._instances[name].on('error', err => this.emit('error', err)); return this._instances[name]; } @@ -101,7 +102,7 @@ class Manager extends EventEmitter { * @param {string} [instanceName=_default_] The name name of the message queue instance * @returns {Channel} */ - queue(name, instanceName = '_default_') { + queue(name, instanceName) { return this.instance(instanceName).channel(name); } @@ -127,7 +128,7 @@ class Manager extends EventEmitter { middleware() { let defaultInstance; for (let instanceName in this._config) { - if (instanceName != '_default_' && (this._config[instanceName].default || !defaultInstance)) { + if (this._config[instanceName].default || !defaultInstance) { defaultInstance = this.instance(instanceName); } } @@ -153,7 +154,7 @@ class Manager extends EventEmitter { if (!(instance instanceof String)) { cb = msg; msg = instance; - instance = '_default_'; + instance = undefined; } return this.queue(queue, instance).send(msg, cb); } @@ -167,7 +168,7 @@ class Manager extends EventEmitter { receive(queue, instance, cb) { if (!(instance instanceof String)) { cb = instance; - instance = '_default_'; + instance = undefined; } return this.queue(queue, instance).receive(cb); } @@ -182,7 +183,7 @@ class Manager extends EventEmitter { listen(queue, instance, cb) { if (!(instance instanceof String)) { cb = instance; - instance = '_default_'; + instance = undefined; } return this.queue(queue, instance).listen(cb); } @@ -199,7 +200,7 @@ class Manager extends EventEmitter { if (!(instance instanceof String)) { cb = id; id = instance; - instance = '_default_'; + instance = undefined; } return this.queue(queue, instance).unlisten(id, cb); } diff --git a/package.json b/package.json index 301fe1a..9d9c6e4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "cheevr-message", - "version": "1.1.3", + "version": "1.1.4", "description": "A message queue library", "private": true, "main": "index.js", diff --git a/rabbitmq/channel.js b/rabbitmq/channel.js index 481f226..71f7e9d 100644 --- a/rabbitmq/channel.js +++ b/rabbitmq/channel.js @@ -1,4 +1,6 @@ +const _ = require('lodash'); const Cache = require('../cache'); +const config = require('cheevr-config'); const EventEmitter = require('events').EventEmitter; const shortId = require('shortid'); @@ -7,15 +9,15 @@ class Channel extends EventEmitter { /** * @param {string} name The name of this channel * @param {Instance} host The host instance in case we need to access host operations - * @param {RabbitChannelConfig} config The configuration for this channel + * @param {RabbitChannelConfig} channelConfig The configuration for this channel */ - constructor(name, host, config) { + constructor(name, host, channelConfig) { super(); this._name = name; - this._config = config; + this._config = _.defaultsDeep({}, channelConfig, config.defaults.queue.rabbitmq.channel); this._host = host; this._log = host._log; - this._cache = Cache.get(config.cache, this._log); + this._cache = Cache.get(this._config.cache, this._log); this._host.on('reconnected', () => { this._cache.get(this.name, (err, entries) => { err && this._log.error('Unable to restore cached messages for channel %s:', this.name, err); diff --git a/rabbitmq/index.js b/rabbitmq/index.js index 2b9c127..1077ae5 100644 --- a/rabbitmq/index.js +++ b/rabbitmq/index.js @@ -1,5 +1,7 @@ +const _ = require('lodash'); const amqp = require('amqplib/callback_api'); const Channel = require('./channel'); +const config = require('cheevr-config'); const EventEmitter = require('events').EventEmitter; const Logging = require('cheevr-logging'); @@ -44,15 +46,15 @@ class Instance extends EventEmitter { /** * * @param {string} name - * @param {RabbitInstanceConfig} config + * @param {RabbitInstanceConfig} instanceConfig */ - constructor(name, config) { + constructor(name, instanceConfig) { super(); this._channels = {}; this._name = name; - this._config = config; - this._host = config.client.host; - this._log = Logging[config.logger]; + this._config = _.defaultsDeep({}, instanceConfig, config.defaults.queue.rabbitmq.instance); + this._host = this._config.client.host; + this._log = Logging[this._config.logger]; this._interrupted = false; this.connect(); this.on('interrupted', () => setTimeout(this.connect.bind(this)), 100); @@ -117,14 +119,9 @@ class Instance extends EventEmitter { * @private */ _setUp() { - let defaultConf = this._config.channels._default_; if (this._config.channels) { for (let name in this._config.channels) { - if (name == '_default_') { - continue; - } - let channelConfig = Object.assign({}, defaultConf, this._config.channels[name]); - let channel = this._channels[name] = new Channel(this.name + '-' + name, this, channelConfig); + let channel = this._channels[name] = new Channel(this.name + '-' + name, this, this._config.channels[name]); channel.on('error', err => this.emit('error', err, this, channel)); } } @@ -161,8 +158,7 @@ class Instance extends EventEmitter { */ channel(name) { if (!this._channels[name]) { - let channelConfig = Object.assign({}, this._config.channels._default_, this._config.channels[name]); - let channel = this._channels[name] = new Channel(this.name + '-' + name, this, channelConfig); + let channel = this._channels[name] = new Channel(this.name + '-' + name, this, this._config.channels[name]); channel.on('error', err => this.emit('error', err, this, channel)); } return this._channels[name];