Permalink
Browse files

Fix a call stack explosion in ijod

  • Loading branch information...
1 parent 6b46fec commit 7cfb0cefd4f89b5f3b9b81de44a740358fc17620 @temas temas committed Nov 1, 2012
Showing with 81 additions and 84 deletions.
  1. +81 −84 lib/ijod.js
View
@@ -295,8 +295,8 @@ function IJOD(basePath) {
exports.IJOD = IJOD;
-IJOD.prototype.startAddTransaction = function(cbDone) {
- if (this.transactionItems) return cbDone();
+IJOD.prototype.startAddTransaction = function() {
+ if (this.transactionItems) return;
this.transactionItems = [];
this.transactionQueries = [];
/*
@@ -305,7 +305,6 @@ IJOD.prototype.startAddTransaction = function(cbDone) {
}
this.db.query("BEGIN", function(error, rows) { cbDone(); });
*/
- cbDone();
};
IJOD.prototype.commitAddTransaction = function(cbDone) {
@@ -403,31 +402,59 @@ IJOD.prototype.addData = function(arg, callback) {
qsql += ', ?';
}
var self = this;
- this.startAddTransaction(function() {
- var tmpJson = JSON.stringify(arg);
- var gzdata = zlib.compress(new Buffer(tmpJson+"\n"));
- self.transactionItems.push(gzdata);
- var offset = self.len;
- self.len += gzdata.length;
- arg.saved = Date.now(); // for pumps in the pipeline after ijod to know if it was saved
-
- memcache.replace(idr.hash(arg.idr), tmpJson, function(error, result) {
- // TODO, also replace idr2 in types?
- });
+ this.startAddTransaction();
+ var tmpJson = JSON.stringify(arg);
+ var gzdata = zlib.compress(new Buffer(tmpJson+"\n"));
+ self.transactionItems.push(gzdata);
+ var offset = self.len;
+ self.len += gzdata.length;
+ arg.saved = Date.now(); // for pumps in the pipeline after ijod to know if it was saved
+
+ memcache.replace(idr.hash(arg.idr), tmpJson, function(error, result) {
+ // TODO, also replace idr2 in types?
+ });
- var sql = "INSERT INTO Entries (base, idr, path, hash, offset, len, lat, " +
- "lng, q0, q1, q2, q3, par) VALUES (unhex(concat(rpad(?,32,'0'), " +
- "lpad(hex(?),12,'0'), substr(?,1,16))), unhex(?), ?, ?, ?, ?, ?, ?, " +
- qsql + ") ON DUPLICATE KEY UPDATE base=VALUES(base), " +
- "path=VALUES(path), hash=VALUES(hash), offset=VALUES(offset), " +
- "len=VALUES(len), lat=VALUES(lat), lng=VALUES(lng), q0=VALUES(q0), " +
- "q1=VALUES(q1), q2=VALUES(q2), q3=VALUES(q3), par=VALUES(par)";
+ var sql = "INSERT INTO Entries (base, idr, path, hash, offset, len, lat, " +
+ "lng, q0, q1, q2, q3, par) VALUES (unhex(concat(rpad(?,32,'0'), " +
+ "lpad(hex(?),12,'0'), substr(?,1,16))), unhex(?), ?, ?, ?, ?, ?, ?, " +
+ qsql + ") ON DUPLICATE KEY UPDATE base=VALUES(base), " +
+ "path=VALUES(path), hash=VALUES(hash), offset=VALUES(offset), " +
+ "len=VALUES(len), lat=VALUES(lat), lng=VALUES(lng), q0=VALUES(q0), " +
+ "q1=VALUES(q1), q2=VALUES(q2), q3=VALUES(q3), par=VALUES(par)";
+
+ self.transactionQueries.push(dal.sqlize(sql, [
+ idr.baseHash(arg.idr),
+ arg.at,
+ idr.hash(arg.idr),
+ idr.hash(arg.idr),
+ self.path,
+ hash,
+ offset,
+ (self.len - offset),
+ ll[0], ll[1],
+ qx[0], qx[1], qx[2], qx[3],
+ par
+ ]));
+
+ // if there's types, insert each of them too for filtering
+ if (!arg.data || !arg.types) return callback();
@smurthas

smurthas Nov 1, 2012

Contributor

I believe this should be a process.nextTick

+
+ // TODO: This doesn't call any async code and can be converted
+ async.forEachSeries(Object.keys(arg.types), function(type, cb) {
@smurthas

smurthas Nov 1, 2012

Contributor

Want to convert this while you're in there?

+ var i2 = idr.clone(arg.idr);
+ i2.protocol = type;
+ instruments.increment("data.types." + type).send();
+
+ if (typeof arg.types[type] === 'object' && arg.types[type].auth) {
+ // also index this with a different auth!
+ i2.auth = arg.types[type].auth;
+ }
self.transactionQueries.push(dal.sqlize(sql, [
- idr.baseHash(arg.idr),
+ idr.baseHash(i2),
arg.at,
- idr.hash(arg.idr),
- idr.hash(arg.idr),
+ idr.hash(i2),
+ idr.hash(i2),
self.path,
hash,
offset,
@@ -437,37 +464,8 @@ IJOD.prototype.addData = function(arg, callback) {
par
]));
- // if there's types, insert each of them too for filtering
- if (!arg.data || !arg.types) return callback();
-
- // TODO: This doesn't call any async code and can be converted
- async.forEachSeries(Object.keys(arg.types), function(type, cb) {
- var i2 = idr.clone(arg.idr);
- i2.protocol = type;
- instruments.increment("data.types." + type).send();
-
- if (typeof arg.types[type] === 'object' && arg.types[type].auth) {
- // also index this with a different auth!
- i2.auth = arg.types[type].auth;
- }
-
- self.transactionQueries.push(dal.sqlize(sql, [
- idr.baseHash(i2),
- arg.at,
- idr.hash(i2),
- idr.hash(i2),
- self.path,
- hash,
- offset,
- (self.len - offset),
- ll[0], ll[1],
- qx[0], qx[1], qx[2], qx[3],
- par
- ]));
-
- process.nextTick(cb);
- }, callback);
- });
+ process.nextTick(cb);
+ }, callback);
};
/// Get a single entry from an IJOD, requested by specific IDR
@@ -686,40 +684,39 @@ exports.batchSmartAdd = function(entries, callback) {
rows.forEach(function(row) {
knownIds[row.idr.toLowerCase()] = row.hash;
});
- ij.startAddTransaction(function() {
- async.forEachSeries(entries, function(entry, cb) {
- if (!entry) {
+ ij.startAddTransaction();
+ async.forEachSeries(entries, function(entry, cb) {
+ if (!entry) {
+ return process.nextTick(cb);
+ }
+ var entryIdrHash = idr.hash(entry.idr);
+ if (knownIds[entryIdrHash]) {
+ // See if we need to update
+ var hash = mmh.murmur128HexSync(JSON.stringify(entry));
+ // If the id and hashes match it's the same!
+ if (hash === knownIds[entryIdrHash]) {
return process.nextTick(cb);
+ } else {
+ entry.hash = hash;
}
- var entryIdrHash = idr.hash(entry.idr);
- if (knownIds[entryIdrHash]) {
- // See if we need to update
- var hash = mmh.murmur128HexSync(JSON.stringify(entry));
- // If the id and hashes match it's the same!
- if (hash === knownIds[entryIdrHash]) {
- return process.nextTick(cb);
- } else {
- entry.hash = hash;
- }
- }
- ij.addData(entry, cb);
- }, function(error) {
- if (error) {
- ij.abortAddTransaction(function() {
- handleError(error);
- });
+ }
+ ij.addData(entry, cb);
+ }, function(error) {
+ if (error) {
+ ij.abortAddTransaction(function() {
+ handleError(error);
+ });
- // This is OK because handleError calls callback()
- return;
- }
+ // This is OK because handleError calls callback()
+ return;
+ }
- ij.commitAddTransaction(function(error) {
- callback(error);
- });
+ ij.commitAddTransaction(function(error) {
+ callback(error);
+ });
- //console.log("Batch done: %d", (Date.now() - t));
- }); // forEachSeries(entries)
- }); // startAddTransaction
+ //console.log("Batch done: %d", (Date.now() - t));
+ }); // forEachSeries(entries)
});
};

0 comments on commit 7cfb0ce

Please sign in to comment.