Skip to content

Commit

Permalink
better solution for skipping notifications
Browse files Browse the repository at this point in the history
The previous solution of cloning was a bad one. Apparently cloning
wastes CPU like crazy. For big objects it takes seconds!
The new solution keeps track of files hashes and skips notifications if
file content did not change.
There are also no need for notifications from the publishing path. Those
are called only upon file load.
The tests are also had a flaw. Rebus instances should be different
(disable signleton mode) to check how changes are go from one rebus
instance to another.
  • Loading branch information
yosefd committed Jul 30, 2013
1 parent 60ed7df commit 4e09f4f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 43 deletions.
68 changes: 34 additions & 34 deletions lib/rebus.js
@@ -1,8 +1,7 @@
var path = require('path');
var fs = require('fs');
var async = require('async');
var deepEqual = require('deep-equal');
var clone = require('clone');
var crypto = require('crypto');
var syncasyncFacade = require('ypatterns').syncasyncFacade;

// Suffix for all published objects.
Expand Down Expand Up @@ -80,6 +79,8 @@ module.exports = function (folder, options, callback) {
var closed = false;
// The state of the loder. Keeps files that are not loaded successfully.
var loader = { errors: {} };
// Files hashes
var hashes = [];

// Create facade for rebus factory, which creates and initializes rebus instance.
var instance = syncasyncFacade({ create: createInstance, initializeAsync: initializeAsync, initializeSync: initializeSync }, callback);
Expand All @@ -92,7 +93,7 @@ module.exports = function (folder, options, callback) {

function createInstance() {
var instance = { publish: publish, subscribe: subscribe, close: close };
instance.__defineGetter__("value", function () { return clone(shared); });
instance.__defineGetter__("value", function () { return shared; });
return instance;
}

Expand Down Expand Up @@ -148,20 +149,23 @@ module.exports = function (folder, options, callback) {
if (!prop || typeof prop !== 'string' || prop.length < 1) {
throw new Error('invalid property path');
}
if (_checkEqual(prop, obj)) {
// There is no change. Skip this publish.
return callback();
}
// Write the object to the separate file.
var shortname = prop + '.json';
var fullname = path.join(folder, shortname);
fs.writeFile(fullname, JSON.stringify(obj), function (err) {
var data = JSON.stringify(obj);
if (!_checkNewHash(fullname, data)) {
// No need to publish is the data is the same.
return callback();
}
fs.writeFile(fullname, data, function (err) {
if (err) {
console.error('Failed to write file ' + fullname + ' err:', err);
}
else {
// Update the publisher instance before completion.
_loadObject(shortname, obj);
// Notification are not called from this update, but from the trigger of the chagned
// file
_loadObject(shortname, obj, { notify: false });
}
callback(err);
});
Expand Down Expand Up @@ -210,21 +214,15 @@ module.exports = function (folder, options, callback) {
// Private functions.
*/

// Check if the object inserted is equal to the exisitng object.
function _checkEqual(prop, obj) {
var props = prop.split('.');
var existing = shared;
props.forEach(function(p) {
if (existing) {
existing = existing[p];
}
});
if (existing) {
if (deepEqual(existing, obj)) {
return true;
}
// Check if file is differrent. Return the new hash if it is.
function _checkNewHash(filename, data) {
var shasum = crypto.createHash('sha1');
shasum.update(data);
var hash = shasum.digest('hex');
if (hash === hashes[filename]) {
return null;
}
return false;
return hash;
}

// Store the instance of rebus per process to be
Expand Down Expand Up @@ -315,15 +313,21 @@ module.exports = function (folder, options, callback) {
}

function _loadData(filename, data) {
var hash = _checkNewHash(filename, data);
if (!hash) {
// Skip loading if the file did not change.
return;
}
var obj = JSON.parse(data);
hashes[filename] = hash;
_loadObject(filename, obj);
}

function _loadObject(filename, obj) {
function _loadObject(filename, obj, options) {
var props = filename.split('.');
// Don't count suffix (.json).
props.pop();
_traverse(props, obj, null);
_traverse(props, obj, null, options);
}

// Traverse the shared object according to property path.
Expand All @@ -334,8 +338,9 @@ module.exports = function (folder, options, callback) {
// notification - if defined, pin the notification at the end of the specified path.
// Returns - if called with notification, returns the handler with information where the notification was pinned, so can be
// unpinned later.
function _traverse(props, obj, notification) {
function _traverse(props, obj, notification, options) {

options = options || { notify: true };
var length = props.length;
var refobj = shared;
var refmeta = meta;
Expand All @@ -361,11 +366,6 @@ module.exports = function (folder, options, callback) {
if (i === (length - 1)) {
// The end of the path.
if (obj) {
// Skip object update if nothing changed.
if (deepEqual(currentobj, obj)) {
// No need to call all the notifications along the path.
return handler;
}
// Pin the object here.
refobj[prop] = obj;
// Since object changed, append all notifications in the subtree.
Expand All @@ -382,7 +382,7 @@ module.exports = function (folder, options, callback) {
// Call notification in the next tick, so that return value from subsribtion
// will be available.
process.nextTick(function () {
notification(clone(currentobj));
notification(currentobj);
});
}
}
Expand All @@ -396,7 +396,7 @@ module.exports = function (folder, options, callback) {
refmeta = currentmeta;
}

if (obj) {
if (obj && options.notify) {
// Call all notifications.
async.parallel(fns);
}
Expand Down Expand Up @@ -424,7 +424,7 @@ module.exports = function (folder, options, callback) {
for (var id in meta[nfs]) {
fns.push(function (i) {
return function () {
meta[nfs][i](clone(obj));
meta[nfs][i](obj);
}
} (id));
}
Expand Down
4 changes: 1 addition & 3 deletions package.json
Expand Up @@ -4,7 +4,7 @@
"main": "./main",
"bin": {},
"author": "yosefd <yosefd@microsoft.com>",
"version": "0.4.6",
"version": "0.4.7",
"license": "MIT",
"contributors": [
"Yosef Dinerstein <yosefd@microsoft.com>",
Expand All @@ -26,8 +26,6 @@
},
"dependencies": {
"async": "0.1.x",
"deep-equal": "0.0.x",
"clone": "0.1.x",
"ypatterns": "0.2.x"
},
"devDependencies": {
Expand Down
12 changes: 6 additions & 6 deletions test/test.js
Expand Up @@ -214,7 +214,7 @@ module.exports = testCase({

publishWithoutChange: function (test) {
var self = this;
var rebus1 = rebus(self.folder, function (err) {
var rebus1 = rebus(self.folder, { singletons: false }, function (err) {
test.ok(!err, 'failed to start empty instance');
test.ok(rebus1, 'got the 1st rebus instance');
var count1 = 0;
Expand All @@ -229,7 +229,7 @@ module.exports = testCase({
count1++;
}
});
var rebus2 = rebus(self.folder, function (err) {
var rebus2 = rebus(self.folder, { singletons: false }, function (err) {
test.ok(!err, 'failed to start empty instance');
test.ok(rebus2, 'got the 2nd rebus instance');
rebus2.subscribe('a.c', function (obj) {
Expand All @@ -238,7 +238,7 @@ module.exports = testCase({
}
});
});
var rebus3 = rebus(self.folder, function (err) {
var rebus3 = rebus(self.folder, { singletons: false }, function (err) {
test.ok(!err, 'failed to start empty instance');
test.ok(rebus3, 'got the 3rd rebus instance');
rebus3.subscribe('a', function (obj) {
Expand All @@ -263,9 +263,9 @@ module.exports = testCase({
});
},

modifyRebusObject: function (test) {
modifyObject: function (test) {
var self = this;
var rebus1 = rebus(self.folder, function (err) {
var rebus1 = rebus(self.folder, { singletons: false }, function (err) {
test.ok(!err, 'failed to start empty instance');
test.ok(rebus1, 'got the 1st rebus instance');
var r1gotb = false;
Expand Down Expand Up @@ -298,7 +298,7 @@ module.exports = testCase({
}
});

var rebus2 = rebus(self.folder, function (err) {
var rebus2 = rebus(self.folder, { singletons: false }, function (err) {
test.ok(!err, 'failed to start empty instance');
test.ok(rebus2, 'got the 2nd rebus instance');
rebus2.subscribe('a.c', function (obj) {
Expand Down

0 comments on commit 4e09f4f

Please sign in to comment.