Skip to content

Commit

Permalink
Cast with options, better debug messages
Browse files Browse the repository at this point in the history
client.cast() takes options as an argument
more consistent, verbose, debug messages
  • Loading branch information
Brendan Myers committed Feb 23, 2017
1 parent 4e1f14b commit 7a6bac0
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 56 deletions.
10 changes: 6 additions & 4 deletions src/application.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import Worker from './worker';

export default class Application {
constructor(options) {
debug(`.constructor() options=${JSON.stringify(options)}`);

this._client = new Client(options ? options.client : null);
this._worker = new Worker(options ? options.worker : null);
}


connect(url) {
debug(`Connecting ${url}`);
debug(`.connect() url=${url}`);

return amqp.connect(url).then(conn => {
this._conn = conn;
Expand All @@ -29,10 +31,10 @@ export default class Application {


disconnect() {
debug(`Disconnecting`);
debug('.disconnect()');

if (!this._conn) {
debug('Disconnecting: Not connected');
debug(' error: Not connected');
throw 'Disconnecting: Not connected';
}

Expand All @@ -45,7 +47,7 @@ export default class Application {
return this._conn.close();
}).then(() => {
delete this._conn;
debug('Disconnected');
debug(' success: disconnected');
});
}

Expand Down
74 changes: 42 additions & 32 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const defaultOptions = {
export default class Client {
constructor(options) {
this._options = Object.assign(defaultOptions, options);

debug(`.constructor() options=${JSON.stringify(options)}`);
debug(` _options=${JSON.stringify(this._options)}`);
}


Expand All @@ -29,79 +32,79 @@ export default class Client {


connect(url) {
debug(`Connecting ${url}`);
debug(`.connect() url=${url}`);

return amqp.connect(url).then(conn => {
this._conn = conn;

this._conn.on('close', () =>
debug('Connection closed')
debug('event: connection closed')
);
this._conn.on('error', error =>
debug('Connection error:', error)
debug('event: connection error:', error)
);

return this._createChannel(this._conn);
});
};
}


disconnect() {
debug(`Disconnecting`);
debug('.disconnect()');

if (!this._conn) {
debug('Disconnecting: Not connected');
debug(' error: Not connected');
throw 'Disconnecting: Not connected';
}

this._closeChannel().then(() => {
return this._conn.close();
}).then(() => {
delete this._conn;
debug('Disconnected')
}).catch(error =>
debug('Discconect error:', error.message)
);
debug(' success: disconnected');
}).catch(error => {
debug(` error: ${error.message}`)
});
}


cast(endpoint, payload) {
cast(route, payload, options) {
debug(`.cast() route=${route} payload=${JSON.stringify(payload)} options=${options}`);

if (!this._conn) {
debug(`Casting ${endpoint}: Not connected`);
throw `Casting ${endpoint}: Not connected`;
debug(` error: Not connected`);
throw `Casting ${route}: Not connected`;
}

debug(`Casting ${endpoint}(${payload})`);

this._send(endpoint, payload);
this._send(route, payload, options);
}


call(endpoint, payload, timeout) {
call(route, payload, timeout) {
debug(`.call() route=${route} payload=${JSON.stringify(payload)} timeout=${timeout}`);

if (!this._conn) {
debug(`Calling ${endpoint}: Not connected`);
throw `Calling ${endpoint}: Not connected`;
debug(` error: Not connected`);
throw `Calling ${route}: Not connected`;
}

const id = Util.generateUuid();

return new Promise((resolve, reject) => {
debug(`Calling ${endpoint}(${payload})`);

const timer = setTimeout(() => {
debug(`[${endpoint}] Timed out`, timeout || this._options.timeout);
debug(`error: call to ${route} timed out`, timeout || this._options.timeout);
this._ch.emitter.removeAllListeners(id);
reject('Timed out');
reject(`error: call to ${route} timed out`);
}, timeout || this._options.timeout);

const cb = (msg) => {
if (msg && msg.properties.correlationId == id) {
clearTimeout(timer);

debug(`[${endpoint}] Received`);
debug(`[${endpoint}] fields: ${JSON.stringify(msg.fields)}`);
debug(`[${endpoint}] properties: ${JSON.stringify(msg.properties)}`);
debug(`[${endpoint}] content: ${msg.content.toString()}`);
debug(`.callback() route=${route}`);
debug(` fields=${JSON.stringify(msg.fields)}`);
debug(` properties=${JSON.stringify(msg.properties)}`);
debug(` content=${msg.content.toString()}`);

const response = new Content(msg.content);

Expand All @@ -112,7 +115,7 @@ export default class Client {
this._ch.emitter.once(id, cb);

this._send(
endpoint,
route,
payload,
{
correlationId: id,
Expand All @@ -125,6 +128,8 @@ export default class Client {


_createChannel(connection) {
debug('._createChannel()');

// Yuck, but amqplib-mock doesn't return a promise from createChannel()
return new Promise((resolve, reject) => {
resolve(connection.createChannel());
Expand All @@ -140,10 +145,10 @@ export default class Client {
);

this._ch.on('close', () =>
debug('Channel closed')
debug('event: channel closed')
);
this._ch.on('error', error =>
debug('Channel error:', error)
debug('event: channel error:', error)
);

return Promise.resolve();
Expand All @@ -152,15 +157,20 @@ export default class Client {


_closeChannel() {
debug('._closeChannel()');
debug(` tag=${this._options.consume.consumerTag}`);

return this._ch.cancel(this._options.consume.consumerTag).then(() => {
return this._ch.close();
});
}


_send(endpoint, payload, options={}) {
_send(route, payload, options={}) {
debug(`._send() route=${route} payload=${JSON.stringify(payload)} options=${options}`);

return this._ch.sendToQueue(
endpoint,
route,
Util.prepareBuffer(payload),
options
);
Expand Down
11 changes: 10 additions & 1 deletion src/content.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
const debug = require('debug')('tibbar:content');

export default class Content {
constructor(buffer) {
debug(`.constructor() buffer=${buffer}`);
this._buffer = buffer;
}

asString() {
debug('.asString()');
debug(` returning=${this._buffer.toString()}`);
return this._buffer.toString();
}

asInt() {
return this._buffer.readInt32BE();
debug('.asInt()');
debug(` returning=${this._buffer.readDoubleBE()}`);
return this._buffer.readDoubleBE();
}

asJSON() {
debug('.asJSON()');
debug(` returning=${JSON.asString(this.asString())}`);
return JSON.parse(this.asString());
}
}
2 changes: 2 additions & 0 deletions src/request.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
const debug = require('debug')('tibbar:request');
import Content from './content';
import * as Util from './util';

export default class Request {
constructor(msg) {
debug(`.constructor() msg=${JSON.stringify(msg)}`);
this._msg = msg;
}

Expand Down
10 changes: 10 additions & 0 deletions src/response.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
const debug = require('debug')('tibbar:response');
import * as Util from './util';

export default class Response {
constructor(channel, msg) {
debug(`.constructor() msg=${JSON.stringify(msg)}`);

this._ch = channel;
this._msg = msg;
}

ack() {
debug('.ack()');
debug(` msg=${JSON.stringify(this._msg)}`);
this._ch.ack(this._msg);

return this;
}

send(payload) {
debug(`.send() payload=${payload}`);
debug(` queue=${this._msg.properties.replyTo}`);
debug(` payload=${JSON.stringify(Util.prepareBuffer(payload))}`);
debug(` options={ correlationId: ${this._msg.properties.correlationId} }`);

this._ch.sendToQueue(
/* queue */ this._msg.properties.replyTo,
/* payload */ Util.prepareBuffer(payload),
Expand Down
18 changes: 15 additions & 3 deletions src/util.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
const debug = require('debug')('tibbar:util');
import os from 'os';

export function generateUuid() {
return os.hostname() +
debug('.generateUuid()');

const uuid = os.hostname() +
Math.random().toString() +
Math.random().toString();

debug(` uuid=${uuid}`);

return uuid;
}


export function prepareBuffer(payload) {
debug(`.prepareBuffer() payload=${payload}`);

if (Buffer.isBuffer(payload)) {
debug(' type=Buffer');
return payload;
}

let buffer = null;

debug(` type=${typeof payload}`);

switch (typeof payload) {
case 'undefined':
buffer = Buffer.alloc(0);
Expand All @@ -24,8 +36,8 @@ export function prepareBuffer(payload) {
break;

case 'number':
buffer = Buffer.alloc(4);
buffer.writeInt32BE(payload, 0);
buffer = Buffer.alloc(8);
buffer.writeDoubleBE(payload, 0);
break;

case 'object':
Expand Down

0 comments on commit 7a6bac0

Please sign in to comment.