Permalink
Browse files

Fix push for the IJOD changes

  • Loading branch information...
temas committed Mar 16, 2012
1 parent 2ca8e03 commit 9bcf158f782c6ed316ab83c393d9084debc4d750
Showing with 25 additions and 15 deletions.
  1. +4 −3 Common/node/ijod.js
  2. +10 −5 Common/node/lpushmanager.js
  3. +3 −1 Common/node/lsyncmanager.js
  4. +1 −0 Ops/webservice-push.js
  5. +1 −1 package.json
  6. +6 −5 tests/lpushmanager-test-local.js
View
@@ -231,7 +231,7 @@ IJOD.prototype.batchSmartAdd = function(entries, callback) {
}
function handleError(msg) {
- console.error("Error: %s", msg);
+ console.error("Batch smart add error: %s", msg);
self.abortAddTransaction(function() {
setTimeout(function() {
self.batchSmartAdd(entries, callback);
@@ -246,14 +246,15 @@ IJOD.prototype.batchSmartAdd = function(entries, callback) {
async.forEachSeries(entries, function(entry, cb) {
if (!entry) return cb();
stmt.bind(1, entry.id, function(err) {
- if (err) return handleError(err);
+ if (err) return cb(err);
stmt.step(function(error, row) {
- if (error) return handleError(error);
+ if (error) return cb(error);
stmt.reset();
cb();
});
});
}, function(error) {
+ if (error) return handleError(error);
self.db.execute("COMMIT TRANSACTION", function(error, rows) {
if (error) return handleError(error);
stmt.finalize(function(error) {
@@ -52,7 +52,9 @@ function getIJOD(dataset, create, callback) {
// only load if one exists or create flag is set
fs.stat(name+".db", function(err, stat){
if(!stat && !create) return callback();
- config.ijods[dataset] = new IJOD({name:name}, function(err, ij){
+ var ij = new IJOD({name:name})
+ config.ijods[dataset] = ij;
+ ij.open(function(err) {
if(err) logger.error(err);
return callback(ij);
});
@@ -83,6 +85,7 @@ function processData (deleteIDs, data, dataset, callback) {
lutil.atomicWriteFileSync(path.join(lconfig.lockerDir, lconfig.me, "push", 'push_config.json'),
JSON.stringify(config, null, 4));
}
+ // TODO: Explicitly close
getIJOD(dataset, true, function(ijod){
if (deleteIDs && deleteIDs.length > 0 && data) {
addData(dataset, data, ijod, function(err) {
@@ -136,13 +139,15 @@ function addData (dataset, data, ijod, callback) {
} else {
cb();
}
- }, 5);
- data.forEach(function(d){ q.push(d, errs.push); }); // hehe fun
+ }, 1);
+ ijod.startAddTransaction(function(err) {
+ data.forEach(function(d){ q.push(d, errs.push); }); // hehe fun
+ });
q.drain = function() {
if (errs.length > 0) {
- callback(errs);
+ ijod.abortAddTransaction(function() { callback(errs); });
} else {
- callback();
+ ijod.commitAddTransaction(callback);
}
};
}
@@ -421,7 +421,7 @@ function processResponse(deleteIDs, info, synclet, response, callback) {
stats.increment('synclet.' + info.id + '.' + synclet.name + '.added', synclet.added);
stats.increment('synclet.' + info.id + '.' + synclet.name + '.updated', synclet.updated);
stats.increment('synclet.' + info.id + '.' + synclet.name + '.deleted', synclet.deleted);
- stats.increment('synclet.' + info.id + '.' + synclet.name + '.length', dataKeys.reduce(function(prev, cur, idx, arr) { console.log(cur); return prev + response.data[cur].length; }, 0));
+ stats.increment('synclet.' + info.id + '.' + synclet.name + '.length', dataKeys.reduce(function(prev, cur, idx, arr) { return prev + response.data[cur].length; }, 0));
logger.info("total of "+synclet.added+"+"+synclet.updated+"+"+synclet.deleted+" and threshold "+threshold+" so setting tolerance to "+synclet.tolMax);
callback(err);
});
@@ -430,6 +430,7 @@ function processResponse(deleteIDs, info, synclet, response, callback) {
// simple async friendly wrapper
function getIJOD(id, key, create, callback) {
var name = path.join(lconfig.lockerDir, lconfig.me, id, key);
+ console.log("Open IJOD %s", name);
if(synclets.ijods[name]) return callback(synclets.ijods[name]);
// only load if one exists or create flag is set
fs.stat(name+".db", function(err, stat){
@@ -446,6 +447,7 @@ exports.getIJOD = getIJOD;
function closeIJOD(id, key, callback) {
var name = path.join(lconfig.lockerDir, lconfig.me, id, key);
+ console.log("Close IJOD %s", name);
if (synclets.ijods[name]) {
synclets.ijods[name].close(callback);
} else {
View
@@ -27,6 +27,7 @@ module.exports = function(locker) {
});
locker.get('/push/:dataset/getCurrent', function(req, res) {
+ console.log("push current for %s", req.params.dataset);
pushManager.getIJOD(req.params.dataset, false, function(ijod) {
if(!ijod) return res.send("not found",404);
ijod.reqCurrent(req, res);
View
@@ -43,7 +43,7 @@
"compress-buffer": "=0.5.1",
"mongodb": "=0.9.8-1",
"winston": "=0.5.2",
- "sqlite-fts": "=0.0.8",
+ "sqlite-fts": "=0.0.9",
"socket.io": "=0.8.5",
"socket.io-client": "=0.8.4",
"jade": "= 0.20.0",
@@ -84,13 +84,14 @@ vows.describe("Push Manager").addBatch({
}).addBatch({
"Querying the data API returns the data" : {
topic: function() {
- request.get({uri : "http://localhost:8043/push/testing/getCurrent"}, this.callback)
+ request.get({uri : "http://localhost:8043/push/testing/getCurrent?stream=true"}, this.callback)
},
"from testSync" : function(err, resp, body) {
- var data = JSON.parse(body);
- obj = data[0];
- assert.equal(data[0].id, 500);
- assert.equal(data[0].someData, 'BAM');
+ var parts = body.split("\n");
+ var data = JSON.parse(parts[0]);
+ obj = data;
+ assert.equal(data.id, 500);
+ assert.equal(data.someData, 'BAM');
}
}
}).addBatch({

0 comments on commit 9bcf158

Please sign in to comment.