diff --git a/src/application.js b/src/application.js index 7414a17..4204833 100644 --- a/src/application.js +++ b/src/application.js @@ -5,13 +5,15 @@ import Worker from './worker'; export default class Application { constructor(options) { + debug(`.constructor() options=${JSON.stringify(options)}`); + this._client = new Client(options ? options.client : null); this._worker = new Worker(options ? options.worker : null); } connect(url) { - debug(`Connecting ${url}`); + debug(`.connect() url=${url}`); return amqp.connect(url).then(conn => { this._conn = conn; @@ -29,10 +31,10 @@ export default class Application { disconnect() { - debug(`Disconnecting`); + debug('.disconnect()'); if (!this._conn) { - debug('Disconnecting: Not connected'); + debug(' error: Not connected'); throw 'Disconnecting: Not connected'; } @@ -45,7 +47,7 @@ export default class Application { return this._conn.close(); }).then(() => { delete this._conn; - debug('Disconnected'); + debug(' success: disconnected'); }); } diff --git a/src/client.js b/src/client.js index 1c51f51..d80ef1e 100644 --- a/src/client.js +++ b/src/client.js @@ -20,6 +20,9 @@ const defaultOptions = { export default class Client { constructor(options) { this._options = Object.assign(defaultOptions, options); + + debug(`.constructor() options=${JSON.stringify(options)}`); + debug(` _options=${JSON.stringify(this._options)}`); } @@ -29,28 +32,28 @@ export default class Client { connect(url) { - debug(`Connecting ${url}`); + debug(`.connect() url=${url}`); return amqp.connect(url).then(conn => { this._conn = conn; this._conn.on('close', () => - debug('Connection closed') + debug('event: connection closed') ); this._conn.on('error', error => - debug('Connection error:', error) + debug('event: connection error:', error) ); return this._createChannel(this._conn); }); - }; + } disconnect() { - debug(`Disconnecting`); + debug('.disconnect()'); if (!this._conn) { - debug('Disconnecting: Not connected'); + debug(' error: Not connected'); throw 'Disconnecting: Not connected'; } @@ -58,50 +61,50 @@ export default class Client { return this._conn.close(); }).then(() => { delete this._conn; - debug('Disconnected') - }).catch(error => - debug('Discconect error:', error.message) - ); + debug(' success: disconnected'); + }).catch(error => { + debug(` error: ${error.message}`) + }); } - cast(endpoint, payload) { + cast(route, payload, options) { + debug(`.cast() route=${route} payload=${JSON.stringify(payload)} options=${options}`); + if (!this._conn) { - debug(`Casting ${endpoint}: Not connected`); - throw `Casting ${endpoint}: Not connected`; + debug(` error: Not connected`); + throw `Casting ${route}: Not connected`; } - debug(`Casting ${endpoint}(${payload})`); - - this._send(endpoint, payload); + this._send(route, payload, options); } - call(endpoint, payload, timeout) { + call(route, payload, timeout) { + debug(`.call() route=${route} payload=${JSON.stringify(payload)} timeout=${timeout}`); + if (!this._conn) { - debug(`Calling ${endpoint}: Not connected`); - throw `Calling ${endpoint}: Not connected`; + debug(` error: Not connected`); + throw `Calling ${route}: Not connected`; } const id = Util.generateUuid(); return new Promise((resolve, reject) => { - debug(`Calling ${endpoint}(${payload})`); - const timer = setTimeout(() => { - debug(`[${endpoint}] Timed out`, timeout || this._options.timeout); + debug(`error: call to ${route} timed out`, timeout || this._options.timeout); this._ch.emitter.removeAllListeners(id); - reject('Timed out'); + reject(`error: call to ${route} timed out`); }, timeout || this._options.timeout); const cb = (msg) => { if (msg && msg.properties.correlationId == id) { clearTimeout(timer); - debug(`[${endpoint}] Received`); - debug(`[${endpoint}] fields: ${JSON.stringify(msg.fields)}`); - debug(`[${endpoint}] properties: ${JSON.stringify(msg.properties)}`); - debug(`[${endpoint}] content: ${msg.content.toString()}`); + debug(`.callback() route=${route}`); + debug(` fields=${JSON.stringify(msg.fields)}`); + debug(` properties=${JSON.stringify(msg.properties)}`); + debug(` content=${msg.content.toString()}`); const response = new Content(msg.content); @@ -112,7 +115,7 @@ export default class Client { this._ch.emitter.once(id, cb); this._send( - endpoint, + route, payload, { correlationId: id, @@ -125,6 +128,8 @@ export default class Client { _createChannel(connection) { + debug('._createChannel()'); + // Yuck, but amqplib-mock doesn't return a promise from createChannel() return new Promise((resolve, reject) => { resolve(connection.createChannel()); @@ -140,10 +145,10 @@ export default class Client { ); this._ch.on('close', () => - debug('Channel closed') + debug('event: channel closed') ); this._ch.on('error', error => - debug('Channel error:', error) + debug('event: channel error:', error) ); return Promise.resolve(); @@ -152,15 +157,20 @@ export default class Client { _closeChannel() { + debug('._closeChannel()'); + debug(` tag=${this._options.consume.consumerTag}`); + return this._ch.cancel(this._options.consume.consumerTag).then(() => { return this._ch.close(); }); } - _send(endpoint, payload, options={}) { + _send(route, payload, options={}) { + debug(`._send() route=${route} payload=${JSON.stringify(payload)} options=${options}`); + return this._ch.sendToQueue( - endpoint, + route, Util.prepareBuffer(payload), options ); diff --git a/src/content.js b/src/content.js index f6efb7a..3ae77eb 100644 --- a/src/content.js +++ b/src/content.js @@ -1,17 +1,26 @@ +const debug = require('debug')('tibbar:content'); + export default class Content { constructor(buffer) { + debug(`.constructor() buffer=${buffer}`); this._buffer = buffer; } asString() { + debug('.asString()'); + debug(` returning=${this._buffer.toString()}`); return this._buffer.toString(); } asInt() { - return this._buffer.readInt32BE(); + debug('.asInt()'); + debug(` returning=${this._buffer.readDoubleBE()}`); + return this._buffer.readDoubleBE(); } asJSON() { + debug('.asJSON()'); + debug(` returning=${JSON.asString(this.asString())}`); return JSON.parse(this.asString()); } } \ No newline at end of file diff --git a/src/request.js b/src/request.js index 64ce91d..c1256f9 100644 --- a/src/request.js +++ b/src/request.js @@ -1,8 +1,10 @@ +const debug = require('debug')('tibbar:request'); import Content from './content'; import * as Util from './util'; export default class Request { constructor(msg) { + debug(`.constructor() msg=${JSON.stringify(msg)}`); this._msg = msg; } diff --git a/src/response.js b/src/response.js index 6bf8612..db66211 100644 --- a/src/response.js +++ b/src/response.js @@ -1,18 +1,28 @@ +const debug = require('debug')('tibbar:response'); import * as Util from './util'; export default class Response { constructor(channel, msg) { + debug(`.constructor() msg=${JSON.stringify(msg)}`); + this._ch = channel; this._msg = msg; } ack() { + debug('.ack()'); + debug(` msg=${JSON.stringify(this._msg)}`); this._ch.ack(this._msg); return this; } send(payload) { + debug(`.send() payload=${payload}`); + debug(` queue=${this._msg.properties.replyTo}`); + debug(` payload=${JSON.stringify(Util.prepareBuffer(payload))}`); + debug(` options={ correlationId: ${this._msg.properties.correlationId} }`); + this._ch.sendToQueue( /* queue */ this._msg.properties.replyTo, /* payload */ Util.prepareBuffer(payload), diff --git a/src/util.js b/src/util.js index 9421598..3f204da 100644 --- a/src/util.js +++ b/src/util.js @@ -1,19 +1,31 @@ +const debug = require('debug')('tibbar:util'); import os from 'os'; export function generateUuid() { - return os.hostname() + + debug('.generateUuid()'); + + const uuid = os.hostname() + Math.random().toString() + Math.random().toString(); + + debug(` uuid=${uuid}`); + + return uuid; } export function prepareBuffer(payload) { + debug(`.prepareBuffer() payload=${payload}`); + if (Buffer.isBuffer(payload)) { + debug(' type=Buffer'); return payload; } let buffer = null; + debug(` type=${typeof payload}`); + switch (typeof payload) { case 'undefined': buffer = Buffer.alloc(0); @@ -24,8 +36,8 @@ export function prepareBuffer(payload) { break; case 'number': - buffer = Buffer.alloc(4); - buffer.writeInt32BE(payload, 0); + buffer = Buffer.alloc(8); + buffer.writeDoubleBE(payload, 0); break; case 'object': diff --git a/src/worker.js b/src/worker.js index f2a6d8d..35ac93a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -19,6 +19,9 @@ export default class Worker { this._queues = {}; this._middlewares = []; this._options = Object.assign(defaultOptions, options); + + debug(`.constructor() options=${JSON.stringify(options)}`); + debug(` _options=${JSON.stringify(this._options)}`); } @@ -28,16 +31,16 @@ export default class Worker { connect(url) { - debug(`Connecting ${url}`); + debug(`.connect() url=${url}`); return amqp.connect(url).then(conn => { this._conn = conn; this._conn.on('close', () => - debug('Connection closed') + debug('event: connection closed') ); this._conn.on('error', error => - debug('Connection error:', error) + debug('event: connection error:', error) ); return this._createChannel(this._conn); @@ -46,10 +49,10 @@ export default class Worker { disconnect() { - debug(`Disconnecting`); + debug('.disconnect()'); if (!this._conn) { - debug('Disconnecting: Not connected'); + debug(' error: Not connected'); throw 'Disconnecting: Not connected'; } @@ -57,15 +60,15 @@ export default class Worker { return this._conn.close(); }).then(() => { delete this._conn; - debug('Disconnected'); + debug(' success: disconnected'); }); } accept(name, callback) { - assert(!this._queues[name], `'${name}'' already exists`); + debug(`.accept() name='${name}', callback='${name}'`); - debug(`Adding queue '${name}'`); + assert(!this._queues[name], `'${name}'' already exists`); this._queues[name] = {}; this._queues[name].callback = callback; @@ -77,11 +80,15 @@ export default class Worker { use(middleware) { + debug(`.use() middleware=${middleware}`); + return this._middlewares.push(middleware); } _createChannel(connection) { + debug('_createChannel()'); + // Yuck, but amqplib-mock doesn't return a promise from createChannel() return new Promise((resolve, reject) => { return resolve(connection.createChannel()); @@ -89,10 +96,10 @@ export default class Worker { this._ch = ch; this._ch.on('close', () => - debug('Channel closed') + debug('event: channel closed') ); this._ch.on('error', error => - debug('Channel error:', error) + debug('event: channel error:', error) ); const promises = []; @@ -107,6 +114,8 @@ export default class Worker { _closeChannel() { + debug('_closeChannel()'); + const promises = []; for (let q in this._queues) { @@ -120,7 +129,7 @@ export default class Worker { _openQueue(q) { - debug(`[${q}] Opening`); + debug(`._openQueue() q=${q}`); const queue = this._queues[q]; @@ -132,10 +141,10 @@ export default class Worker { return; } - debug(`[${q}] Received`); - debug(`[${q}] fields: ${JSON.stringify(msg.fields)}`); - debug(`[${q}] properties: ${JSON.stringify(msg.properties)}`); - debug(`[${q}] content: ${msg.content.toString()}`); + debug(`.callback() q=${q}`); + debug(` fields=${JSON.stringify(msg.fields)}`); + debug(` properties=${JSON.stringify(msg.properties)}`); + debug(` content=${msg.content.toString()}`); const request = new Request(msg); const response = new Response(this._ch, msg); @@ -148,7 +157,7 @@ export default class Worker { _closeQueue(q) { - debug(`[${q}] Closing`); + debug(`._closeQueue() q=${q}`); const promise = this._ch.deleteQueue(q); delete this._queues[q]; @@ -157,10 +166,14 @@ export default class Worker { _execCallback(i, request, response, last) { + debug(`_execCallback() i=${i}`); + if (i === this._middlewares.length) { + debug(' executing callback'); return last(request, response); } + debug(' executing middleware'); return this._middlewares[i]( request, response,