Skip to content
This repository has been archived by the owner on Jan 7, 2018. It is now read-only.

Commit

Permalink
Rework log indexing to use ElasticSearch bulk indexing.
Browse files Browse the repository at this point in the history
This allows the elasticsearch indexing to happen much more quickly and
more efficiently.

I'd still like to revisit a lot of this logging stuff, but for now, this
seems like the quickest way to improve the logging so it can recover
more quickly from a backlog, while still keeping our basic logging
the same.
  • Loading branch information
GUI committed May 23, 2015
1 parent 5c63c3d commit 9a1a372
Showing 1 changed file with 117 additions and 40 deletions.
157 changes: 117 additions & 40 deletions lib/log_processor/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ _.extend(Worker.prototype, {
this.connectRedis.bind(this),
this.connectElasticsearch.bind(this),
this.connectBeanstalk.bind(this),
this.connectBeanstalkDestroyer.bind(this),
], this.handleConnections.bind(this));
},

Expand Down Expand Up @@ -72,36 +73,66 @@ _.extend(Worker.prototype, {
}.bind(this));
},

connectBeanstalkDestroyer: function(asyncReadyCallback) {
// Establish a separate connection for running our destroy commands, since
// they run separately from the normal reserve flow (they are only
// destroyed after indexing, which happens asynchronously so we can bulk
// index items).
beanstalkConnect(function(error, client) {
if(!error) {
this.beanstalkDestroyer = client;
}

asyncReadyCallback(error);
}.bind(this));
},

handleConnections: function(error) {
  // If any backend connection failed, there's nothing useful this worker
  // can do - log and exit so the process supervisor can restart it.
  if(error) {
    logger.error({ err: error }, 'Log processor worker connections error');
    process.exit(1);
    return false;
  }

  var batchSize = 250;
  var flushIntervalMs = 5000;

  // Batch records locally so ElasticSearch's bulk API can index up to
  // `batchSize` documents per request, rather than one request per record.
  this.cargo = async.cargo(this.processCargoTasks.bind(this), batchSize);

  // Also flush on a timer, so a partial batch never sits around
  // indefinitely waiting to reach the full batch size.
  setInterval(this.cargo.process.bind(this.cargo), flushIntervalMs);

  this.reserveJob();
  this.emit('ready');
},

reserveJob: function() {
this.beanstalk.reserve(function(error, jobId, payload) {
if(error) {
logger.error({ err: error }, 'beanstalk reserve error');
setImmediate(this.reserveJob.bind(this));
return false;
}

var logId = payload.toString();
this.processLog(jobId, logId, function(error) {
// Don't queue up any more local jobs if there's already 1000 items in the
// local cargo queue.
async.until(function() {
return this.cargo.tasks.length < 1000;
}.bind(this), function(callback) {
setTimeout(callback, 500);
}, function() {
this.beanstalk.reserve(function(error, jobId, payload) {
if(error) {
this.rescheduleFailedJob(error, jobId, logId, function() {
this.reserveJob();
}.bind(this));
} else {
this.reserveJob();
logger.error({ err: error }, 'beanstalk reserve error');
setImmediate(this.reserveJob.bind(this));
return false;
}
}.bind(this));

var logId = payload.toString();
this.processLog(jobId, logId, function(error) {
if(error) {
this.rescheduleFailedJob(error, jobId, logId, function() {
this.reserveJob();
}.bind(this));
} else {
this.reserveJob();
}
}.bind(this));
}.bind(this));
}.bind(this));
},

Expand All @@ -112,9 +143,8 @@ _.extend(Worker.prototype, {
this.parseLogData.bind(this, logId),
this.checkForIncompleteLogs.bind(this, jobId, logId),
this.cleanLogData.bind(this, logId),
this.indexLog.bind(this, logId),
this.deleteLogData.bind(this, logId),
this.deleteJob.bind(this, jobId),
this.cargoQueueData.bind(this, jobId, logId),
this.finishJob.bind(this, jobId),
], callback);
},

Expand Down Expand Up @@ -319,37 +349,84 @@ _.extend(Worker.prototype, {
}
},

indexLog: function(id, log, callback) {
logger.debug({ id: id }, 'Log Processor: indexLog');
var index = 'api-umbrella-logs-write-' + moment(log.request_at).utc().format('YYYY-MM');
this.elasticSearch.index({
index: index,
type: 'log',
id: id,
body: log,
}, function(error) {
callback(error);
});
},

deleteLogData: function(id, callback) {
logger.debug({ id: id }, 'Log Processor: deleteLogData');
this.redis.del('log:' + id, function(error) {
callback(error);
});
cargoQueueData: function(jobId, id, log, callback) {
this.cargo.push({ jobId: jobId, id: id, log: log });
callback();
},

deleteJob: function(jobId, callback) {
logger.debug({ jobId: jobId }, 'Log Processor: deleteJob');
this.beanstalk.destroy(jobId, function(error) {
finishJob: function(jobId, callback) {
// After queueing up a local cargo job (for bulk indexing), release this
// job so that it can be deleted once the indexing completes (in beanstalk
// jobs appear like they have to be released in order to delete). Schedule
// with a 10 minute delay, so this job should usually be deleted after the
// bulk indexing completes, but if anything goes wrong in indexing, it will
// be retried again.
logger.debug({ jobId: jobId }, 'Log Processor: finishJob');
var priority = 0;
var delay = 600;
this.beanstalk.release(jobId, priority, delay, function(error) {
if(error) {
logger.error({ err: error }, 'beanstalk destroy error');
logger.error({ err: error }, 'beanstalk release error');
}

callback();
});
},

processCargoTasks: function(tasks, callback) {
var bulkInserts = [];
var ids = [];
var multi = this.redis.multi();
for(var i = 0, len = tasks.length; i < len; i++) {
var task = tasks[i];
var id = task.id;
var log = task.log;
var index = 'api-umbrella-logs-write-' + moment(log.request_at).utc().format('YYYY-MM');

ids.push('log:' + id);

bulkInserts.push({
index: {
_index: index,
_type: 'log',
_id: id,
},
});
bulkInserts.push(log);

multi.del('log:' + id);
}

if(bulkInserts.length > 0) {
logger.debug({ ids: ids }, 'Log Processor: Bulk inserting');
this.elasticSearch.bulk({ body: bulkInserts }, function(error) {
if(error) {
logger.error({ err: error }, 'elasticsearch bulk index error');
return callback();
}

logger.debug({ ids: ids }, 'Log Processor: Bulk deleting');
multi.exec(function(error) {
if(error) {
logger.error({ err: error }, 'redis bulk delete error');
return callback();
}

logger.debug({ ids: ids }, 'Log Processor: Deleting beanstalk jobs');
async.eachSeries(tasks, function(task, eachCallback) {
this.beanstalkDestroyer.destroy(task.jobId, function(error) {
if(error) {
logger.error({ err: error, id: task.id, jobId: task.jobId }, 'beanstalk destroy error');
}

eachCallback();
}.bind(this));
}.bind(this), callback);
}.bind(this));
}.bind(this));
}
},

getFailedJobAttempts: function(jobId, logId, callback) {
this.beanstalk.stats_job(jobId, function(error, stats) {
if(error || !stats || (typeof stats.releases) !== 'number') {
Expand Down

0 comments on commit 9a1a372

Please sign in to comment.