Skip to content

Commit

Permalink
Merge branch 'pipelining'
Browse files Browse the repository at this point in the history
  • Loading branch information
Swizec committed Dec 19, 2010
2 parents 7183daa + 388aaa0 commit 1ce2ca8
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 50 deletions.
2 changes: 1 addition & 1 deletion boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ exports.push = function (task, queue) {

redis.rpush("rapid.queue:"+queue, JSON.stringify(task), function () {
redis.llen("rapid.queue:"+queue, function (err, len) {
if (len < 5)
//if (len < 5)
notify.publish("rapid.queue:"+queue+":pub", "task!");
});
});
Expand Down
2 changes: 1 addition & 1 deletion settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ exports.lockfile = '/tmp/rapid.queue.foreman.lock';
exports.path = '/home/swizec/Documents/preona-code/Plateboiler/rapid.queue';

exports.workers = [
{n: 1, queue: 'scraping'}
{n: 5, queue: 'scraping'}
]

exports.worker_mapping = {
Expand Down
119 changes: 71 additions & 48 deletions worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ var redis = require("redis").createClient(),
settings = require('./settings'),
logging = require('./logging'),
request = require('request'),
urllib = require('url');
urllib = require('url'),
async = require('async');

exports.listen = function (queue, worker, callback) {
var BUSY = false;
Expand All @@ -17,7 +18,11 @@ exports.listen = function (queue, worker, callback) {
var recurse = function () {
var inner_recurse = function () {
BUSY = false;
process.nextTick(inner_worker);
redis.llen("rapid.queue:"+queue, function (err, len) {
if (len > 0) {
process.nextTick(inner_worker);
}
});
}

if (redis.command_queue.length > 0) {
Expand All @@ -27,56 +32,74 @@ exports.listen = function (queue, worker, callback) {
}
};

redis.lpop("rapid.queue:"+queue, function (err, task) {
if (!err && task) {
task = JSON.parse(task+"");

logging.info("Started task "+task.id);

var notify_client = function (result) {
task.result = result;
try {
var url = urllib.parse(task.callback);
var body = JSON.stringify(task);

var client = http.createClient(url.port || 80, url.hostname);
var request = client.request('POST',
url.pathname || '/',
{'host': url.hostname,
'Content-Length': body.length});
request.write(body);
request.end();
request.on("response", function (response) {
if (response.statusCode != 200) {
logging.warning("Client responded with error "+task.id);
}else{
logging.info("Served task "+task.id);
}
recurse();
});
}catch (e) {
logging.warning("Client callback unreachable "+task.id);
recurse();
}
};

var execute = function (tasks, callback) {
var notify_client = function (task, callback) {
try {
worker(task, notify_client);
var url = urllib.parse(task.callback);
var body = JSON.stringify(task);

var client = http.createClient(url.port || 80, url.hostname);
var request = client.request('POST',
url.pathname || '/',
{'host': url.hostname,
'Content-Length': body.length});
request.write(body);
request.end();
request.on("response", function (response) {
if (response.statusCode != 200) {
logging.warning("Client responded with error "+task.id);
}else{
logging.info("Served task "+task.id);
}

callback(null, 'meow');
});
}catch (e) {
if (settings.notify_errors) {
var mail = require('mail').Mail(settings.mail.smtp);
mail.message(settings.mail.message)
.body("Task: "+JSON.stringify(task)+"\n\n\n"+e.message+"\n\n"+e.stack)
.send(function(err) {});
logging.warning("Client callback unreachable "+task.id);
callback(null, 'meow');
}
};

var callbacks = 0;
worker(tasks, function (result) {
callbacks++;
notify_client(result, function () {
if (callbacks >= tasks.length) {
callback();
}
});
});
}

logging.warning("Error serving task "+task.id);
notify_client("ERROR: Big fail\n\nMessage: "+e.message+"\nStack: "+e.stack);
}
}else{
setTimeout(recurse, 500);
}
});
async.mapSeries([1,2,3,4,5],
function (bla, callback) {
redis.lpop("rapid.queue:"+queue, function (err, task) {
if (task) {
task = JSON.parse(task+"");

logging.info("Started task "+task.id);

callback(null, task);
}else{
callback(null, null);
}
});
},
function (err, tasks) {
if (!err) {
var tmp = [], i=0;
for (; i < tasks.length; i++) {
if (tasks[i]) tmp[i] = tasks[i];
}
if (tmp.length > 0) {
execute(tmp, function () {
recurse();
})
}else {
recurse();
}
}
});
}

listener.subscribe("rapid.queue:"+queue+":pub");
Expand Down

0 comments on commit 1ce2ca8

Please sign in to comment.