Permalink
Browse files

Merge pull request #7 from anodejs/yosefd1

Support loading of incomplete files
  • Loading branch information...
2 parents 1af569f + 573097d commit 4ed25a474235976552cfc33a9775327f49e12cac @yosefd yosefd committed Jul 14, 2012
Showing with 81 additions and 13 deletions.
  1. +9 −2 README.md
  2. +38 −8 lib/rebus.js
  3. +1 −1 package.json
  4. +33 −2 test/test.js
View
@@ -30,7 +30,7 @@ var bus = rebus(directoryName, function(err) {
// The bus is initialized and includes current shared state.
console.log('the entire shared state is:', bus.value);
// Can start listening on changes for a particular object.
- bus.subscribe('some.name.space.x.y.z', function(obj) {
+ var notification = bus.subscribe('some.name.space.x.y.z', function(obj) {
// Got notification about object being changed by some publisher.
console.log('some.name.space.x.y.z changed and its value is:', obj);
console.log(
@@ -47,6 +47,9 @@ var bus = rebus(directoryName, function(err) {
'published some other object and its value now:',
bus.value.some.other.name.space);
});
+ // Cleanup
+ notification.close();
+ bus.close();
});
```
Rebus can be instantiated and used synchronously:
@@ -59,7 +62,11 @@ console.log('the entire shared state is:', bus.value);
// Write.
bus.publish('x', { ... });
// bus.value.x is not necessary the one assigned as not used in publish
-// completion. However, it is still can be used and it includes some value.
+// completion. However, it is still can be used and it includes some value
+// that was in x, before or after assignment.
+console.log('the value of x now:', bus.value.x);
+// Cleanup
+bus.close();
```
Note: using rebus value or function before instantiation completion works,
View
@@ -10,7 +10,7 @@ var nfs = 'yosefsaysthiscannotbepropertyname';
// Rebus instances created for folders.
// Keep them global to have only one instance per process.
-process.rebusinstances = {};
+process.rebusinstances = process.rebusinstances || {};
// Used cached on process rebus instances only if they bare the same version.
var rebusversion = 4;
@@ -32,7 +32,7 @@ module.exports = function (folder, options, callback) {
options = null;
}
callback = callback || function () { }
- options = options || { persistent: false, singletons: true }
+ options = options || { persistent: true, singletons: true }
if (typeof callback !== 'function') {
throw new Error('invalid callback');
}
@@ -45,6 +45,7 @@ module.exports = function (folder, options, callback) {
if (options.singletons) {
var singleton = process.rebusinstances[folder];
if (singleton) {
+ console.log('This process already has rebus instance on folder ' + folder);
// If not the same vesion, go and create new rebus instance.
if (singleton.version === rebusversion) {
// Call completion after return value is available.
@@ -75,6 +76,8 @@ module.exports = function (folder, options, callback) {
var freeId = 0;
// Flag used to close rebus instance only once.
var closed = false;
+ // The state of the loder. Keeps files that are not loaded successfully.
+ var loader = { errors: {} };
// Create facade for rebus factory, which creates and initializes rebus instance.
var instance = syncasyncFacade({ create: createInstance, initializeAsync: initializeAsync, initializeSync: initializeSync }, callback);
@@ -100,12 +103,19 @@ module.exports = function (folder, options, callback) {
return;
}
- async.forEach(files, _loadFile, function (err) {
- if (err) {
- console.warn('Loading all files was not smooth, err:', err);
- }
- // Regardless errors, start watching the directory.
+ async.forEach(files, _loadFile, function () {
_startWatchdog();
+ var countErrors = Object.keys(loader.errors).length;
+ if (countErrors > 0) {
+ // Several files failed to load. As they might be updated by other process, wait for change events
+ // until all files are loaded.
+ console.warn('Loading ' + countErrors + ' files was not smooth, waiting for updates');
+ // Store callback to call once all files are loaded.
+ loader.callback = callback;
+ return;
+ }
+ // No errors during loading, all files are loaded.
+ loader = null;
_updateSingleton();
// Asynchronous initialization is completed.
callback();
@@ -247,9 +257,29 @@ module.exports = function (folder, options, callback) {
console.info('Object ' + filename + ' was not yet fully written, exception:', e);
// There will be another notification of change when the last write to file is completed.
// Meanwhile leave the previous value.
- callback(e);
+ if (loader) {
+ // Store this error to wait until file will be successfully loaded for the 1st time.
+ loader.errors[filename] = e;
+ }
+ // Don't return error to continue asynchronous loading of other files. Errors are assembled on loader.
+ callback();
return;
}
+ console.log('Loaded ' + filename);
+ if (loader) {
+ if (loader.errors[filename]) {
+ // File that previously failed to load, now is loaded.
+ delete loader.errors[filename];
+ var countErrors = Object.keys(loader.errors).length;
+ if (countErrors === 0) {
+ // All errors are fixed. This is the time to complete loading.
+ var initcb = loader.callback;
+ loader = null;
+ _updateSingleton();
+ initcb();
+ }
+ }
+ }
callback();
});
}
View
@@ -4,7 +4,7 @@
"main": "./main",
"bin": {},
"author": "anode <anode@microsoft.com>",
- "version": "0.4.3",
+ "version": "0.4.4",
"license": "MIT",
"contributors": [
"Yosef Dinerstein <yosefd@microsoft.com>",
View
@@ -2,6 +2,7 @@
var path = require('path');
var rimraf = require('rimraf');
var mkdirp = require('mkdirp');
+var fs = require('fs');
var rebus = require('../lib/rebus');
module.exports = testCase({
@@ -19,7 +20,7 @@ module.exports = testCase({
},
missingfolder: function (test) {
- var rebusNoFolder = rebus('nosuchfoler', function (err) {
+ var rebusNoFolder = rebus('nosuchfolder', function (err) {
test.ok(err);
test.done();
});
@@ -261,7 +262,7 @@ module.exports = testCase({
var self = this;
var folder = path.join(__dirname, 'testRebus');
var rebus1 = rebus(folder, function (err) {
- test.ok(!err, 'shold get rebus instance');
+ test.ok(!err, 'should get rebus instance');
var rebus2 = rebus(folder);
test.ok(rebus1 === rebus2, 'should be the same instance for the same folder');
test.deepEqual(rebus2.value, { a: { b: { c1: 'x', c2: 'y'} }, c: { d: {}} });
@@ -301,5 +302,35 @@ module.exports = testCase({
test.done();
});
rebus1.publish('a.b', { c1: 'x', c2: 'y' });
+ },
+
+ loaderIncompleteFile: function (test) {
+ var self = this;
+ var value = '{ "a": 1';
+ var loaded = false;
+ var completed = 0;
+ function complete() {
+ if (++completed === 2) {
+ test.done();
+ }
+ }
+ fs.writeFile(path.join(self.folder, 'b.json'), value, function (err) {
+ test.ok(!err, 'should write partial file');
+ var rebusT = rebus(self.folder, function (err) {
+ test.ok(!err, 'should get rebus instance');
+ loaded = true;
+ test.deepEqual(rebusT.value, { b: { a: 2} });
+ rebusT.close();
+ complete();
+ });
+ setTimeout(function () {
+ test.ok(!loaded, 'the file is not complete, should be waiting for correct file');
+ value = '{ "a": 2 }';
+ fs.writeFile(path.join(self.folder, 'b.json'), value, function (err) {
+ test.ok(!err, 'should write full file');
+ complete();
+ });
+ }, 300);
+ });
}
});

0 comments on commit 4ed25a4

Please sign in to comment.