Permalink
Browse files

1. Fix to really reuse already created singleton.

2. Change default to set persistent listeners that require explicit cleanup calls.
3. Handle errors while initial loading. Wait for successfull load of all files.
4. Test for incomplete files.
  • Loading branch information...
1 parent c9c852a commit 573097d1c28a1d915bedb3a6d9e46d4b69acc190 @yosefd yosefd committed Apr 2, 2012
Showing with 81 additions and 13 deletions.
  1. +9 −2 README.md
  2. +38 −8 lib/rebus.js
  3. +1 −1 package.json
  4. +33 −2 test/test.js
View
@@ -30,7 +30,7 @@ var bus = rebus(directoryName, function(err) {
// The bus is initialized and includes current shared state.
console.log('the entire shared state is:', bus.value);
// Can start listening on changes for a particular object.
- bus.subscribe('some.name.space.x.y.z', function(obj) {
+ var notification = bus.subscribe('some.name.space.x.y.z', function(obj) {
// Got notification about object being changed by some publisher.
console.log('some.name.space.x.y.z changed and its value is:', obj);
console.log(
@@ -47,6 +47,9 @@ var bus = rebus(directoryName, function(err) {
'published some other object and its value now:',
bus.value.some.other.name.space);
});
+ // Cleanup
+ notification.close();
+ bus.close();
});
```
Rebus can be instantiated and used synchronously:
@@ -59,7 +62,11 @@ console.log('the entire shared state is:', bus.value);
// Write.
bus.publish('x', { ... });
// bus.value.x is not necessary the one assigned as not used in publish
-// completion. However, it is still can be used and it includes some value.
+// completion. However, it is still can be used and it includes some value
+// that was in x, before or after assignment.
+console.log('the value of x now:', bus.value.x);
+// Cleanup
+bus.close();
```
Note: using rebus value or function before instantiation completion works,
View
@@ -10,7 +10,7 @@ var nfs = 'yosefsaysthiscannotbepropertyname';
// Rebus instances created for folders.
// Keep them global to have only one instance per process.
-process.rebusinstances = {};
+process.rebusinstances = process.rebusinstances || {};
// Used cached on process rebus instances only if they bare the same version.
var rebusversion = 4;
@@ -32,7 +32,7 @@ module.exports = function (folder, options, callback) {
options = null;
}
callback = callback || function () { }
- options = options || { persistent: false, singletons: true }
+ options = options || { persistent: true, singletons: true }
if (typeof callback !== 'function') {
throw new Error('invalid callback');
}
@@ -45,6 +45,7 @@ module.exports = function (folder, options, callback) {
if (options.singletons) {
var singleton = process.rebusinstances[folder];
if (singleton) {
+ console.log('This process already has rebus instance on folder ' + folder);
// If not the same vesion, go and create new rebus instance.
if (singleton.version === rebusversion) {
// Call completion after return value is available.
@@ -75,6 +76,8 @@ module.exports = function (folder, options, callback) {
var freeId = 0;
// Flag used to close rebus instance only once.
var closed = false;
+ // The state of the loder. Keeps files that are not loaded successfully.
+ var loader = { errors: {} };
// Create facade for rebus factory, which creates and initializes rebus instance.
var instance = syncasyncFacade({ create: createInstance, initializeAsync: initializeAsync, initializeSync: initializeSync }, callback);
@@ -100,12 +103,19 @@ module.exports = function (folder, options, callback) {
return;
}
- async.forEach(files, _loadFile, function (err) {
- if (err) {
- console.warn('Loading all files was not smooth, err:', err);
- }
- // Regardless errors, start watching the directory.
+ async.forEach(files, _loadFile, function () {
_startWatchdog();
+ var countErrors = Object.keys(loader.errors).length;
+ if (countErrors > 0) {
+ // Several files failed to load. As they might be updated by other process, wait for change events
+ // until all files are loaded.
+ console.warn('Loading ' + countErrors + ' files was not smooth, waiting for updates');
+ // Store callback to call once all files are loaded.
+ loader.callback = callback;
+ return;
+ }
+ // No errors during loading, all files are loaded.
+ loader = null;
_updateSingleton();
// Asynchronous initialization is completed.
callback();
@@ -247,9 +257,29 @@ module.exports = function (folder, options, callback) {
console.info('Object ' + filename + ' was not yet fully written, exception:', e);
// There will be another notification of change when the last write to file is completed.
// Meanwhile leave the previous value.
- callback(e);
+ if (loader) {
+ // Store this error to wait until file will be successfully loaded for the 1st time.
+ loader.errors[filename] = e;
+ }
+ // Don't return error to continue asynchronous loading of other files. Errors are assembled on loader.
+ callback();
return;
}
+ console.log('Loaded ' + filename);
+ if (loader) {
+ if (loader.errors[filename]) {
+ // File that previously failed to load, now is loaded.
+ delete loader.errors[filename];
+ var countErrors = Object.keys(loader.errors).length;
+ if (countErrors === 0) {
+ // All errors are fixed. This is the time to complete loading.
+ var initcb = loader.callback;
+ loader = null;
+ _updateSingleton();
+ initcb();
+ }
+ }
+ }
callback();
});
}
View
@@ -4,7 +4,7 @@
"main": "./main",
"bin": {},
"author": "anode <anode@microsoft.com>",
- "version": "0.4.3",
+ "version": "0.4.4",
"license": "MIT",
"contributors": [
"Yosef Dinerstein <yosefd@microsoft.com>",
View
@@ -2,6 +2,7 @@
var path = require('path');
var rimraf = require('rimraf');
var mkdirp = require('mkdirp');
+var fs = require('fs');
var rebus = require('../lib/rebus');
module.exports = testCase({
@@ -19,7 +20,7 @@ module.exports = testCase({
},
missingfolder: function (test) {
- var rebusNoFolder = rebus('nosuchfoler', function (err) {
+ var rebusNoFolder = rebus('nosuchfolder', function (err) {
test.ok(err);
test.done();
});
@@ -261,7 +262,7 @@ module.exports = testCase({
var self = this;
var folder = path.join(__dirname, 'testRebus');
var rebus1 = rebus(folder, function (err) {
- test.ok(!err, 'shold get rebus instance');
+ test.ok(!err, 'should get rebus instance');
var rebus2 = rebus(folder);
test.ok(rebus1 === rebus2, 'should be the same instance for the same folder');
test.deepEqual(rebus2.value, { a: { b: { c1: 'x', c2: 'y'} }, c: { d: {}} });
@@ -301,5 +302,35 @@ module.exports = testCase({
test.done();
});
rebus1.publish('a.b', { c1: 'x', c2: 'y' });
+ },
+
+ loaderIncompleteFile: function (test) {
+ var self = this;
+ var value = '{ "a": 1';
+ var loaded = false;
+ var completed = 0;
+ function complete() {
+ if (++completed === 2) {
+ test.done();
+ }
+ }
+ fs.writeFile(path.join(self.folder, 'b.json'), value, function (err) {
+ test.ok(!err, 'should write partial file');
+ var rebusT = rebus(self.folder, function (err) {
+ test.ok(!err, 'should get rebus instance');
+ loaded = true;
+ test.deepEqual(rebusT.value, { b: { a: 2} });
+ rebusT.close();
+ complete();
+ });
+ setTimeout(function () {
+ test.ok(!loaded, 'the file is not complete, should be waiting for correct file');
+ value = '{ "a": 2 }';
+ fs.writeFile(path.join(self.folder, 'b.json'), value, function (err) {
+ test.ok(!err, 'should write full file');
+ complete();
+ });
+ }, 300);
+ });
}
});

0 comments on commit 573097d

Please sign in to comment.