Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ process.on('SIGHUP', function() {
global.logger = global.log4js.getLogger();
console.log('Log files reloaded');
});

server.batch.logger.reopenFileStreams();
});

process.on('SIGTERM', function () {
Expand Down
2 changes: 1 addition & 1 deletion app/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ function App() {
var isBatchProcess = process.argv.indexOf('--no-batch') === -1;

if (global.settings.environment !== 'test' && isBatchProcess) {
app.batch = batchFactory(metadataBackend, redisConfig, statsd_client);
app.batch = batchFactory(metadataBackend, redisConfig, statsd_client, global.settings.batch_log_filename);
app.batch.start();
}

Expand Down
25 changes: 25 additions & 0 deletions batch/batch-logger.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict';

var bunyan = require('bunyan');
var fs = require('fs');

function BatchLogger (path) {
this.path = path;
this.logger = bunyan.createLogger({
name: 'batch-queries',
streams: [{
level: 'info',
stream: path ? fs.createWriteStream(path, { flags: 'a', encoding: 'utf8' }) : process.stdout
}]
});
}

module.exports = BatchLogger;

BatchLogger.prototype.log = function (job) {
return job.log(this.logger);
};

BatchLogger.prototype.reopenFileStreams = function () {
this.logger.reopenFileStreams();
};
5 changes: 4 additions & 1 deletion batch/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ var forever = require('./util/forever');
var queue = require('queue-async');
var jobStatus = require('./job_status');

function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService) {
function Batch(jobSubscriber, jobQueuePool, jobRunner, jobService, logger) {
EventEmitter.call(this);
this.jobSubscriber = jobSubscriber;
this.jobQueuePool = jobQueuePool;
this.jobRunner = jobRunner;
this.jobService = jobService;
this.logger = logger;
}
util.inherits(Batch, EventEmitter);

Expand Down Expand Up @@ -90,6 +91,8 @@ Batch.prototype._consumeJobs = function (host, queue, callback) {
debug('Job %s %s in %s', job_id, job.data.status, host);
}

self.logger.log(job);

self.emit('job:' + job.data.status, job_id);

callback();
Expand Down
7 changes: 4 additions & 3 deletions batch/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
'use strict';


var RedisPool = require('redis-mpool');
var _ = require('underscore');
var JobRunner = require('./job_runner');
Expand All @@ -14,9 +13,10 @@ var JobPublisher = require('./job_publisher');
var JobQueue = require('./job_queue');
var JobBackend = require('./job_backend');
var JobService = require('./job_service');
var BatchLogger = require('./batch-logger');
var Batch = require('./batch');

module.exports = function batchFactory (metadataBackend, redisConfig, statsdClient) {
module.exports = function batchFactory (metadataBackend, redisConfig, statsdClient, loggerPath) {
var redisPoolSubscriber = new RedisPool(_.extend(redisConfig, { name: 'batch-subscriber'}));
var redisPoolPublisher = new RedisPool(_.extend(redisConfig, { name: 'batch-publisher'}));
var queueSeeker = new QueueSeeker(metadataBackend);
Expand All @@ -30,6 +30,7 @@ module.exports = function batchFactory (metadataBackend, redisConfig, statsdClie
var jobCanceller = new JobCanceller(userDatabaseMetadataService);
var jobService = new JobService(jobBackend, jobCanceller);
var jobRunner = new JobRunner(jobService, jobQueue, queryRunner, statsdClient);
var logger = new BatchLogger(loggerPath);

return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService);
return new Batch(jobSubscriber, jobQueuePool, jobRunner, jobService, logger);
};
4 changes: 4 additions & 0 deletions batch/models/job_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,7 @@ JobBase.prototype.serialize = function () {

return data;
};

JobBase.prototype.log = function(/*logger*/) {
return false;
};
68 changes: 68 additions & 0 deletions batch/models/job_fallback.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ var QueryFallback = require('./query/query_fallback');
var MainFallback = require('./query/main_fallback');
var QueryFactory = require('./query/query_factory');

var JobUtils = require('./job_state_machine');
var jobUtils = new JobUtils();

function JobFallback(jobDefinition) {
JobBase.call(this, jobDefinition);

Expand Down Expand Up @@ -206,3 +209,68 @@ JobFallback.prototype.getLastFinishedStatus = function () {
return this.isFinalStatus(status) ? status : lastFinished;
}.bind(this), jobStatus.DONE);
};

JobFallback.prototype.log = function(logger) {
if (!isFinished(this)) {
return false;
}

var queries = this.data.query.query;

for (var i = 0; i < queries.length; i++) {
var query = queries[i];

var output = {
time: query.started_at,
endtime: query.ended_at,
username: this.data.user,
job: this.data.job_id,
elapsed: elapsedTime(query.started_at, query.ended_at)
};

var queryId = query.id;

if (queryId) {
output.query_id = queryId;

var node = parseQueryId(queryId);
if (node) {
output.analysis = node.analysisId;
output.node = node.nodeId;
output.type = node.nodeType;
}
}

logger.info(output);
}

return true;
};

function isFinished (job) {
return jobUtils.isFinalStatus(job.data.status) &&
(!job.data.fallback_status || jobUtils.isFinalStatus(job.data.fallback_status));
}

function parseQueryId (queryId) {
var data = queryId.split(':');

if (data.length === 3) {
return {
analysisId: data[0],
nodeId: data[1],
nodeType: data[2]
};
}
return null;
}

function elapsedTime (started_at, ended_at) {
if (!started_at || !ended_at) {
return;
}

var start = new Date(started_at);
var end = new Date(ended_at);
return end.getTime() - start.getTime();
}
1 change: 1 addition & 0 deletions config/environments/development.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '5432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses
Expand Down
1 change: 1 addition & 0 deletions config/environments/production.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '6432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.i
// NOTE: not used by OGR-mediated accesses
Expand Down
1 change: 1 addition & 0 deletions config/environments/staging.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '6432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses
Expand Down
1 change: 1 addition & 0 deletions config/environments/test.js.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module.exports.db_host = 'localhost';
module.exports.db_port = '5432';
module.exports.db_batch_port = '5432';
module.exports.finished_jobs_ttl_in_seconds = 2 * 3600; // 2 hours
module.exports.batch_log_filename = 'logs/batch-queries.log';
// Max database connections in the pool
// Subsequent connections will wait for a free slot.
// NOTE: not used by OGR-mediated accesses
Expand Down
135 changes: 130 additions & 5 deletions npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"Sandro Santilli <strk@vizzuality.com>"
],
"dependencies": {
"bunyan": "1.8.1",
"cartodb-psql": "~0.6.0",
"cartodb-query-tables": "0.2.0",
"cartodb-redis": "0.13.1",
Expand Down