Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/helpers/child-process.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"use strict";
var async = require("async");

process.on('message', function(task) {
var hydrate = require(task.functionPath);
async.waterfall([
function startHydration(cb) {
cb.urlCallback = task.options.urlCallback;
cb.apiUrl = task.options.apiUrl;
hydrate(task.path, task.document, task.changes, cb);
}
],
function(err, changes) {
process.send({
err: err,
changes: changes
});
});
});
134 changes: 76 additions & 58 deletions lib/helpers/hydrater.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/

var async = require('async');
var nodeDomain = require('domain');
var shellFork = require('child_process').fork;
var request = require('supertest');
var crypto = require('crypto');
var restify = require('restify');
Expand Down Expand Up @@ -44,38 +44,35 @@ module.exports = function(hydraterFunction, logger, errLogger) {
/**
* Download the file from task.file_path, store it in a temporary file if there is file_path
*/
function initHydration(cb) {
function downloadFile(cb) {
if(task.file_path) {
// Download the file
var stream = fs.createWriteStream(path);

// Store error if statusCode !== 200
var err;
stream.on("finish", function() {
cb(err);
});

var apiUrl = url.parse(task.file_path, false, true);
var req = request(apiUrl.protocol + "//" + apiUrl.host)
.get(apiUrl.path);

req.end().req.once('response', function(res) {
if(res.statusCode !== 200) {
err = new restify.BadGatewayError('Error when downloading file ' + task.file_path + ': ' + res.statusCode);
stream.end();
this.abort();
}
});

req.pipe(stream);
request(apiUrl.protocol + "//" + apiUrl.host)
.get(apiUrl.path)
.expect(200)
.end(function(err, res) {
if(err) {
err = new restify.BadGatewayError('Error when downloading file ' + task.file_path + ': ' + err);
}
cb(err, res && res.text);
});
}
else {
cb(null);
}
},
function saveFile(res, cb) {
if(res) {
fs.writeFile(path, res, cb);
}
else {
cb();
}
},
function performHydration(cb) {
var domain = nodeDomain.create();

var child = shellFork(__dirname + '/child-process.js', {silent: true});
var stderr = "";
var stdout = "";
var timeout;
/**
* Function to call, either on domain error, on hydration error or successful hydration.
Expand All @@ -84,53 +81,73 @@ module.exports = function(hydraterFunction, logger, errLogger) {
var cleaner = function(err, changes) {
if(!cleaner.called) {
cleaner.called = true;
domain.exit();
domain.dispose();
cb(err, changes);
}
clearTimeout(timeout);
};
cleaner.called = false;

domain.on('error', cleaner);

// Run in a domain to prevent memory leak on crash
domain.run(function() {
async.waterfall([
function callHydrationFunction(cb) {
// Give user access to the final URL callback and the API url (which can be staging, prod or anything)
// In case he wants to bypass us and send the changes himself
cb.urlCallback = task.callback;
cb.apiUrl = '';
if(task.callback) {
var parsed = url.parse(task.callback);
cb.apiUrl = parsed.protocol + "//" + parsed.host;
}
child.on('error', function(exitCode) {
cleaner(new HydrationError("Wild error appeared while spawning child. Exit code:" + exitCode));
});

// Call the real function for hydration.
hydraterFunction((task.file_path) ? path : null, task.document, lib.defaultChanges(), cb);
}
], function cleanHydration(err, changes) {
// If the function replied with an "HydrationError", we'll wrap this in a nicely formatted document
// and stop the error from bubbling up.
if(err instanceof HydrationError) {
changes = {};
changes.hydration_errored = true;
changes.hydration_error = err.message;
err = null;
}
child.stderr.on('readable', function() {
var chunk;
while (null !== (chunk = child.stderr.read())) {
stderr += chunk;
}
});

child.stdout.on('readable', function() {
var chunk;
while (null !== (chunk = child.stdout.read())) {
stdout += chunk;
}
});

child.on('exit', function(errCode) {
if(errCode !== 0) {
cleaner(new HydrationError("Child exiting with err code: " + errCode + stdout + stderr));
}
});

// Build objects to send to child
var options = {};
options.urlCallback = task.callback;
options.apiUrl = '';
if(task.callback) {
var parsed = url.parse(task.callback);
options.apiUrl = parsed.protocol + "//" + parsed.host;
}

child.send({
functionPath: hydraterFunction,
path: (task.file_path) ? path : null,
document: task.document,
changes: lib.defaultChanges(),
options: options,
});

child.on('message', function(res) {
var err = res.err
// If the function replied with an "HydrationError", we'll wrap this in a nicely formatted document
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var err = res.err;
var changes = res.changes

// and stop the error from bubbling up.
if(err && err._hydrationError) {
res.changes = {};
res.changes.hydration_errored = true;
res.changes.hydration_error = res.err.message;
err = null;
}
cleaner(err, res.changes);

// Wait for nexttick, to end this function and be able to properly GC it on domain.dispose().
process.nextTick(function() {
cleaner(err, changes);
});
});
});

timeout = setTimeout(function() {
if(!cleaner.called) {
var changes = {};
changes.hydration_errored = true;
changes.hydration_error = "Task took too long.";
child.kill('SIGKILL');
cleaner(null, changes);
}
}, process.env.TIMEOUT || 60 * 1000);
Expand Down Expand Up @@ -183,6 +200,7 @@ module.exports = function(hydraterFunction, logger, errLogger) {
}

var apiUrl = url.parse(task.callback, false, true);

request(apiUrl.protocol + "//" + apiUrl.host)
.patch(apiUrl.path)
.send(changes)
Expand Down
1 change: 1 addition & 0 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ module.exports.defaultChanges = function() {
module.exports.HydrationError = function(message) {
this.name = 'HydrationError';
this.message = (message || "").toString();
this._hydrationError = true;
};
util.inherits(module.exports.HydrationError, Error);
Loading