Navigation Menu

Skip to content

Commit

Permalink
Output logs with formatted prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 29, 2015
1 parent 530c5a6 commit 5aab584
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 70 deletions.
4 changes: 3 additions & 1 deletion index.js
Expand Up @@ -5,6 +5,8 @@ var socketIoAdapter = require('./lib/adapter/socket.io');
var dashboardUI = require('./lib/ui/dashboard');
var ConsoleLogger = require('./lib/console-logger').ConsoleLogger;

var LOG_PREFIX = '[global] ';

function droonga(application, params) {
params = params || {};

Expand All @@ -21,7 +23,7 @@ function droonga(application, params) {
if (params.server) {
socketIoAdapter.register(application, params.server, params);
params.server.on('error', function(error) {
params.logger.error(error);
params.logger.error(LOG_PREFIX + 'Unhandled Error', error);
if (typeof connectionPool.shutdown == 'function')
connectionPool.shutdown();
});
Expand Down
12 changes: 7 additions & 5 deletions lib/adapter/http.js
Expand Up @@ -9,6 +9,8 @@ var MAX_RETRY_COUNT = 60;
var ERROR_INTERNAL = 500;
var ERROR_GATEWAY_TIMEOUT = 504;

var LOG_PREFIX = '[http-adapter] ';

function createRequestResponseHandler(params) {
params = params || {};
var connectionPool = params.connectionPool;
Expand All @@ -17,7 +19,7 @@ function createRequestResponseHandler(params) {
var logger = definition.logger;

return (function(request, response) {
logger.trace('adapter.http.createRequestResponseHandler.handle');
logger.trace(LOG_PREFIX + 'createRequestResponseHandler.handle');

var options = {};
if (typeof definition.dataset != 'undefined')
Expand All @@ -27,13 +29,13 @@ function createRequestResponseHandler(params) {
options.timeout = timeout;

var callback = function(error, message) {
logger.trace('adapter.http.createRequestResponseHandler.handle.response');
logger.trace(LOG_PREFIX + 'createRequestResponseHandler.handle.response');
if (error) {
logger.trace('adapter.http.createRequestResponseHandler.handle.response.error:', error);
logger.trace(LOG_PREFIX + 'createRequestResponseHandler.handle.response.error:', error);
var body = message && message.body || null;
response.status(error).jsonp(body);
} else {
logger.trace('adapter.http.createRequestResponseHandler.handle.response.success');
logger.trace(LOG_PREFIX + 'createRequestResponseHandler.handle.response.success');
var body = message.body;
if (definition.onResponse) {
definition.onResponse(body, response);
Expand Down Expand Up @@ -96,7 +98,7 @@ function createGenericHandler(params) {
var logger = definition.logger;

return (function(request, response) {
logger.trace('adapter.http.createGenericHandler.handle');
logger.trace(LOG_PREFIX + 'createGenericHandler.handle');

var options = {};
if (typeof definition.dataset != 'undefined')
Expand Down
4 changes: 3 additions & 1 deletion lib/adapter/wrapper.js
@@ -1,3 +1,5 @@
var LOG_PREFIX = '[connection-wrapper] ';

function DroongaProtocolConnectionWrapper(connection, callback, options) { // or (connection, options)
this._connection = connection;
if (typeof callback == 'function') {
Expand Down Expand Up @@ -25,7 +27,7 @@ DroongaProtocolConnectionWrapper.prototype = {
},
emit: function(event, data, callback, options) {
if (this._connection.closed) {
this._logger.warn('connection is already closed.');
this._logger.warn(LOG_PREFIX + 'connection is already closed.');
return;
}

Expand Down
55 changes: 29 additions & 26 deletions lib/droonga-protocol/connection-pool.js
Expand Up @@ -17,6 +17,8 @@ var Connection = require('./connection').Connection;
var ConsoleLogger = require('../console-logger').ConsoleLogger;
var SerfAgent = require('../serf/agent');

var LOG_PREFIX = '[connection-pool] ';

function ConnectionPool(params) {
this._params = params || {};

Expand Down Expand Up @@ -126,16 +128,16 @@ ConnectionPool.prototype = {
},

getEnginesFromCluster: function(retryCount) {
this._logger.info('Getting engine names from cluster.');
this._logger.info(LOG_PREFIX + 'Getting engine names from cluster.');
if (this._watching)
return this.getEnginesFromClusterMember(this._serf.rpcAddress);

this._logger.info('Not watching: getting engine names from predetected member.');
this._logger.info(LOG_PREFIX + 'Not watching: getting engine names from predetected member.');
retryCount = retryCount || 0;
var hostName = this._hostNames[retryCount];
if (!hostName) {
var error = new Error('all cluster members are unaccessible.');
this._logger.error(error);
this._logger.error(LOG_PREFIX, error);
return Q.Promise.reject(error);
}

Expand All @@ -144,13 +146,14 @@ ConnectionPool.prototype = {
return engines;
})
.catch((function(error) {
this._logger.error('Failed to get the list of droonga-engine hosts from the cluster member ' + hostName + '. Retrying...');
this._logger.error(error);
this._logger.error(LOG_PREFIX + 'Failed to get the list of droonga-engine hosts ' +
'from the cluster member ' + hostName + '. Retrying...',
error);
return this.getEnginesFromCluster(retryCount + 1);
}).bind(this));
},
getEnginesFromClusterMember: function(rpcAddress) {
this._logger.info('Getting engine names from a cluster member ' + rpcAddress + '.');
this._logger.info(LOG_PREFIX + 'Getting engine names from a cluster member ' + rpcAddress + '.');
return Q.Promise((function(resolve, reject, notify) {
if (!rpcAddress)
reject(new Error('no RPC address is given'));
Expand All @@ -168,13 +171,13 @@ ConnectionPool.prototype = {
].join(' ');
exec(commandLine, (function(error, stdin, stdout) {
if (error) {
this._logger.error('express-droonga-report-live-engine-hosts:');
this._logger.error(stdin.trim());
this._logger.error(LOG_PREFIX + 'express-droonga-report-live-engine-hosts:',
stdin.trim());
return reject(error);
}
var result = JSON.parse(stdin.trim());
this._logger.trace('express-droonga-report-live-engine-hosts:');
this._logger.trace(result);
this._logger.trace(LOG_PREFIX + 'express-droonga-report-live-engine-hosts:',
result);
resolve(result.liveEngineNodes);
}).bind(this));
}).bind(this));
Expand All @@ -194,8 +197,8 @@ ConnectionPool.prototype = {
].join(' ');
exec(commandLine, (function(error, stdin, stdout) {
if (error) {
this._logger.error('express-droonga-join-to-cluster:');
this._logger.error(stdin.trim());
this._logger.error(LOG_PREFIX + 'express-droonga-join-to-cluster:',
stdin.trim());
return reject(error);
}
resolve();
Expand All @@ -207,7 +210,7 @@ ConnectionPool.prototype = {
if (this.updating)
return Q.Promise.resolve();

this._logger.info('Starting to update host names.');
this._logger.info(LOG_PREFIX + 'Starting to update host names.');
this.updating = true;
return this.getEnginesFromCluster()
.then((function(engines) {
Expand All @@ -219,15 +222,15 @@ ConnectionPool.prototype = {
// Otherwise, old droonga-engine processes become zombie.
this.closeAll();
this.hostNames = hostNames;
this._logger.info('List of droonga-engine hosts is successfully initialized from the cluster.');
this._logger.info('cluster id: '+this.clusterId);
this._logger.info(hostNames);
this._logger.info(LOG_PREFIX + 'List of droonga-engine hosts is successfully initialized from the cluster.');
this._logger.info(LOG_PREFIX + 'cluster id: '+this.clusterId);
this._logger.info(LOG_PREFIX + JSON.stringify(hostNames));
this.updating = false;
return hostNames;
}).bind(this))
.catch(function(error) {
this._logger.error('Failed to initialize the list of droonga-engine hosts from the cluster.');
this._logger.error(error);
this._logger.error(LOG_PREFIX + 'Failed to initialize the list of droonga-engine hosts from the cluster.',
error);
this.updating = false;
});
},
Expand All @@ -244,10 +247,10 @@ ConnectionPool.prototype = {
});
this._serf.start()
.then((function() {
this._logger.info('Start to watch changes in the cluster.');
this._logger.info(LOG_PREFIX + 'Start to watch changes in the cluster.');
this._watching = true;
this._serf.on('member-change', (function() {
this._logger.info('Serf cluster member is changed.');
this._logger.info(LOG_PREFIX + 'Serf cluster member is changed.');
if (this._updateHostNamesTimer)
return;
this._updateHostNamesTimer = setTimeout((function() {
Expand All @@ -263,8 +266,8 @@ ConnectionPool.prototype = {
resolve();
}).bind(this))
.catch(function(error) {
this._logger.error('Failed to start watching of changes in the cluster.');
this._logger.error(error);
this._logger.error(LOG_PREFIX + 'Failed to start watching of changes in the cluster.',
error);
reject(error);
});
}).bind(this));
Expand Down Expand Up @@ -295,13 +298,13 @@ ConnectionPool.prototype = {
},

shutdown: function() {
this._logger.info('closing all connections...');
this._logger.info(LOG_PREFIX + 'closing all connections...');
this.closeAll();
this._logger.info('done.');
this._logger.info(LOG_PREFIX + 'done.');

this._logger.info('stopping serf agent...');
this._logger.info(LOG_PREFIX + 'stopping serf agent...');
this.stopSyncHostNamesFromCluster();
this._logger.info('done.');
this._logger.info(LOG_PREFIX + 'done.');
}
};

Expand Down
28 changes: 15 additions & 13 deletions lib/droonga-protocol/connection.js
Expand Up @@ -17,6 +17,8 @@ var FluentReceiver = require('./receiver').FluentReceiver;
var ConsoleLogger = require('../console-logger').ConsoleLogger;
var NodeRole = require('../node-role');

var LOG_PREFIX = '[connection] ';

var DEFAULT_FLUENT_TAG =
Connection.DEFAULT_FLUENT_TAG =
'droonga';
Expand Down Expand Up @@ -89,7 +91,7 @@ Connection.prototype._initSender = function(wait) {
'[' + error.name + '] ' + error.message;
var ids = Object.keys(this._sendingMessages);
if (ids.length == 0) {
this._logger.error(errorMessage, error);
this._logger.error(LOG_PREFIX + errorMessage, error);
} else {
ids.forEach(function(id) {
var sendingMessage = this._sendingMessages[id];
Expand Down Expand Up @@ -131,7 +133,7 @@ Connection.prototype._initReceiver = function() {
}).bind(this));

var tag = this.tag + '.message';
this._logger.trace('Connection._initReceiver %d (%s): %d %d:',
this._logger.trace(LOG_PREFIX + 'Connection._initReceiver %d (%s): %d %d:',
this._id, this.hostAndPort,
receiver._id, this.receivePort, tag);
this._receiver = receiver;
Expand All @@ -147,14 +149,14 @@ Connection.prototype._handleMessage = function(envelope) {
var inReplyTo = envelope.inReplyTo;
if (inReplyTo) {
delete this._sendingMessages[inReplyTo];
this._logger.trace('Connection._handleMessage.reply %d (%s):',
this._logger.trace(LOG_PREFIX + 'Connection._handleMessage.reply %d (%s):',
this._id, this.hostAndPort, envelope);
var errorCode = envelope.statusCode;
if (!errorCode || isSuccess(errorCode))
errorCode = null;
this.emit('reply:' + inReplyTo, errorCode, envelope);
} else {
this._logger.trace('Connection._handleMessage.message %d (%s):',
this._logger.trace(LOG_PREFIX + 'Connection._handleMessage.message %d (%s):',
this._id, this.hostAndPort, envelope);
this.emit(envelope.type, envelope);
}
Expand Down Expand Up @@ -218,7 +220,7 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
}

if (!this.receivePort) {
this._logger.trace('Connection.emitMessage %d (%s): ' +
this._logger.trace(LOG_PREFIX + 'Connection.emitMessage %d (%s): ' +
'Receiver is not initialized yet. ' +
'Given message will be sent later.',
this._id, this.hostAndPort);
Expand All @@ -227,7 +229,7 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
}

var id = createId();
this._logger.trace('Connection.emitMessage %d (%s):',
this._logger.trace(LOG_PREFIX + 'Connection.emitMessage %d (%s):',
this._id, this.hostAndPort);
var from = this.getRouteToSelf(options);
var envelope = {
Expand All @@ -248,7 +250,7 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
if (typeof envelope.timeout == 'number')
envelope.timeout = toFloat(envelope.timeout);

this._logger.trace('emitMessage: trying to send message: ',
this._logger.trace(LOG_PREFIX + 'emitMessage: trying to send message: ',
envelope);

var sendingMessages = { type: type };
Expand All @@ -259,7 +261,7 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
var event = 'reply:' + id;
var timeoutId;
this.once(event, (function(errorCode, response) {
this._logger.trace('Connection.emitMessage.reply %d (%s):',
this._logger.trace(LOG_PREFIX + 'Connection.emitMessage.reply %d (%s):',
this._id, this.hostAndPort, errorCode);
if (timeoutId)
clearTimeout(timeoutId);
Expand All @@ -272,14 +274,14 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
callback = null;
}
catch(error) {
this._logger.error(error);
this._logger.error(LOG_PREFIX, error);
}
}).bind(this));

if (envelope.timeout > -1) {
var timeoutMilliseconds = envelope.timeout * ONE_SECOND_IN_MILLISECONDS;
timeoutId = setTimeout((function() {
this._logger.trace('Connection timed out (message id: '+id+')');
this._logger.trace(LOG_PREFIX + 'Connection timed out (message id: '+id+')');
this.removeAllListeners(event);
if (sendingMessages)
sendingMessages.callback = null;
Expand All @@ -290,7 +292,7 @@ Connection.prototype.emitMessage = function(type, body, callback, options) {
callback = null;
}
catch(error) {
this._logger.error(error);
this._logger.error(LOG_PREFIX, error);
}
}).bind(this), timeoutMilliseconds);
}
Expand All @@ -309,7 +311,7 @@ Connection.prototype.thenableEmitMessage = function(type, body, options) {
};

Connection.prototype._sendPendingMessages = function() {
this._logger.trace('Connection._sendPendingMessages %d (%s): ' +
this._logger.trace(LOG_PREFIX + 'Connection._sendPendingMessages %d (%s): ' +
'Send %d pending message(s).',
this._id, this.hostAndPort,
this._pendingMessages.length);
Expand All @@ -333,7 +335,7 @@ Connection.prototype.close = function() {
message.callback = callback = null;
}
catch(error) {
this._logger.error(error)
this._logger.error(LOG_PREFIX, error)
}
}
}, this);
Expand Down
15 changes: 9 additions & 6 deletions lib/droonga-protocol/receiver.js
Expand Up @@ -5,6 +5,9 @@ var msgpack = require('msgpack');
var Q = require('q');
var ConsoleLogger = require('../console-logger').ConsoleLogger;

var MSGPACK_PREFIX = '[msgpack-receiver] ';
var FLUENT_PREFIX = '[fluent-receiver] ';

function MsgPackReceiver(port, options) {
EventEmitter.call(this);
this.port = port || undefined;
Expand Down Expand Up @@ -38,9 +41,9 @@ MsgPackReceiver.prototype._onConnect = function(socket) {
};

MsgPackReceiver.prototype._onMessageReceive = function(data) {
this._logger.trace('MsgPackReceiver._onMessageReceive %d: start', this._id);
this._logger.trace(MSGPACK_PREFIX + '_onMessageReceive %d: start', this._id);
this.emit('receive', data);
this._logger.trace('MsgPackReceiver._onMessageReceive %d: done', this._id);
this._logger.trace(MSGPACK_PREFIX + '_onMessageReceive %d: done', this._id);
};

MsgPackReceiver.prototype.listen = function(callback) {
Expand Down Expand Up @@ -134,21 +137,21 @@ function FluentReceiver(port, options) {
util.inherits(FluentReceiver, MsgPackReceiver);

FluentReceiver.prototype._onMessageReceive = function(packet) {
this._logger.trace('FluentReceiver._onMessageReceive %d: start', this._id);
this._logger.trace(FLUENT_PREFIX + '_onMessageReceive %d: start', this._id);
MsgPackReceiver.prototype._onMessageReceive.call(this, packet);
if (packet.length == 3) { // Message type
var tag = packet[0];
var response = packet[2];
this._logger.trace('FluentReceiver._onMessageReceive.message %d', this._id, tag);
this._logger.trace(FLUENT_PREFIX + '_onMessageReceive.message %d', this._id, tag);
this.emit(tag, response);
}
else { // Forward type
this._logger.trace('FluentReceiver._onMessageReceive.forward %d', this._id, packet);
this._logger.trace(FLUENT_PREFIX + '_onMessageReceive.forward %d', this._id, packet);
packet[1].forEach(function(entry) {
this.emit(packet[0], entry[1]);
}, this);
}
this._logger.trace('FluentReceiver._onMessageReceive %d: done', this._id);
this._logger.trace(FLUENT_PREFIX + '_onMessageReceive %d: done', this._id);
};

exports.FluentReceiver = FluentReceiver;

0 comments on commit 5aab584

Please sign in to comment.