Skip to content

Commit

Permalink
Implement two stage write combining.
Browse files Browse the repository at this point in the history
  • Loading branch information
creationix committed Aug 24, 2010
1 parent e02af6b commit d5035a5
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 157 deletions.
326 changes: 177 additions & 149 deletions lib/nstore.js
Expand Up @@ -18,23 +18,27 @@ var nStore = module.exports = Class.extend({
this.filename = filename; this.filename = filename;
this.fd = null; this.fd = null;
this.index = Hash.new(); this.index = Hash.new();
this.writeQueue = Queue.new(); this.toWriteCallbacks = [];
this.toWrite = Hash.new();
this.isWriting = Hash.new();
this.stale = 0; this.stale = 0;
this.dbLength = 0; this.dbLength = 0;
this.busy = false; this.busy = false;
this.filterFn = null; this.filterFn = null;
this.loadDatabase(callback); this.loadDatabase(callback);
}, },


// getter property that returns the number of documents in the database
get length() { get length() {
return this.index.length; return this.index.length;
}, },


getKey: function getKey() { genKey: function genKey() {
var key = (0x100000000 * ((Date.now() & 0x3ffff)+Math.random())).toString(32); // Lower 0x20 from Date.now and 0x100000000 from Math.random
if (this.index.hasOwnProperty(key)) { // Last character is from timestamp, the rest is random using full
return this.getKey(); // precision of Math.random (32 bit integer)
var key = (0x2000000000 * Math.random() + (Date.now() & 0x1f)).toString(32);
if (this.index.hasOwnProperty(key) || this.toWrite.hasOwnProperty(key) || this.isWriting.hasOwnProperty(key)) {
return this.genKey();
} }
return key; return key;
}, },
Expand Down Expand Up @@ -114,91 +118,48 @@ var nStore = module.exports = Class.extend({
}); });
}, },


compactDatabase: function (clear, callback) {
if ((!clear && this.stale === 0) || this.busy) return;
var tmpFile = Path.join(Path.dirname(this.filename), this.getKey() + ".tmpdb"),
tmpDb;

this.busy = true;
var self = this;
Step(
function makeNewDb() {
tmpDb = nStore.new(tmpFile, this);
},
function copyData(err) {
if (err) throw err;
if (clear) return true;
var group = this.group();
var copy = Step.fn(
function (key) {
self.get(key, this);
},
function (err, doc, key) {
if (err) throw err;
if (self.filterFn && self.filterFn(doc, key)) {
return true;
}
tmpDb.save(key, doc, this);
}
);

self.index.forEach(function (info, key) {
copy(key, group());
});
},
function closeOld(err) {
if (err) throw err;
fs.close(self.fd, this);
},
function moveNew(err) {
if (err) throw err;
fs.rename(tmpFile, self.filename, this);
},
function transitionState(err) {
if (err) throw err;
self.dbLength = tmpDb.dbLength;
self.index = tmpDb.index;
self.fd = tmpDb.fd;
self.stale = tmpDb.stale;
return true;
},
function cleanup(err) {
self.busy = false;
process.nextTick(function () {
self.checkQueue();
});
if (err) throw err;
return true;
},
function prologue(err) {
if (callback) {
callback(err);
}
}
);

},

save: function save(key, doc, callback) { save: function save(key, doc, callback) {
if (!key) { if (!key) {
key = this.getKey(); key = this.genKey();
} }
this.writeQueue.push({ this.toWrite[key] = doc;
key: key.toString(), this.toWriteCallbacks.push(function (err) {
doc: doc, if (err) callback(err);
callback: callback else callback(err, key);
}); });
this.checkQueue(); this.checkQueue();
}, },


// Load a single record from the disk // Load a single record from the disk
get: function getByKey(key, callback) { get: function getByKey(key, callback) {
function missing() {
var error = new Error("Document does not exist for " + key);
error.errno = process.ENOENT;
callback(error);
}
// Check the cache of just written values
if (this.toWrite.hasOwnProperty(key)) {
var value = this.toWrite[key];
if (value === undefined) return missing();
process.nextTick(function () {
callback(null, value, key);
});
return;
}
// Check the cache of in-progress values
if (this.isWriting.hasOwnProperty(key)) {
var value = this.isWriting[key];
if (value === undefined) return missing();
process.nextTick(function () {
callback(null, value, key);
});
return;
}
// Read from disk otherwise
try { try {
var info = this.index[key]; var info = this.index[key];
if (!info) { if (!info) {
var error = new Error("Document does not exist for " + key); missing();
error.errno = process.ENOENT;
callback(error);
return; return;
} }


Expand All @@ -216,6 +177,142 @@ var nStore = module.exports = Class.extend({
} }
}, },


// Checks the save queue to see if there is a record to write to disk
checkQueue: function checkQueue() {
// Only run when not locked
if (this.busy) return;
// Skip if there is nothing to write
if (this.toWriteCallbacks.length === 0) return;
// Lock the table
this.busy = true;

// Grab items off the queue
this.isWriting = this.toWrite;

this.toWrite = Hash.new();

var callbacks = this.toWriteCallbacks.splice(0, this.toWriteCallbacks.length);
function callback(err) {
for (var i = 0, l = callbacks.length; i < l; i++) {
callbacks[i](err);
}
callbacks.length = 0;
self.busy = false;
self.checkQueue();
}

var updates = Hash.new(),
offset = this.dbLength,
self = this,
output;

// Use Step to handle errors
Step(
function () {
// Serialize the data to be written
output = self.isWriting.map(function (value, key) {
var doc = value === undefined ? "" : JSON.stringify(value),
docLength = Buffer.byteLength(doc),
keyLength = Buffer.byteLength(key);

// New data for the disk index
updates[key] = {
offset: offset + keyLength + 1,
length: docLength
};

offset += keyLength + docLength + 2;

return key + "\t" + doc + "\n";
}).join("");
output = new Buffer(output);
File.write(self.fd, output, self.dbLength, this);
},
function (err) {
if (err) return callback(err);
updates.forEach(function (value, key) {
if (self.index.hasOwnProperty(key)) {
self.stale++;
if (value.length === 0) {
delete self.index[key];
}
}
if (value !== undefined) {
self.index[key] = value;
}
});
self.dbLength += output.length;
callback();
},
callback
);
},

// compactDatabase: function (clear, callback) {
// if ((!clear && this.stale === 0) || this.busy) return;
// var tmpFile = Path.join(Path.dirname(this.filename), this.genKey() + ".tmpdb"),
// tmpDb;
//
// this.busy = true;
// var self = this;
// Step(
// function makeNewDb() {
// tmpDb = nStore.new(tmpFile, this);
// },
// function copyData(err) {
// if (err) throw err;
// if (clear) return true;
// var group = this.group();
// var copy = Step.fn(
// function (key) {
// self.get(key, this);
// },
// function (err, doc, key) {
// if (err) throw err;
// if (self.filterFn && self.filterFn(doc, key)) {
// return true;
// }
// tmpDb.save(key, doc, this);
// }
// );
//
// self.index.forEach(function (info, key) {
// copy(key, group());
// });
// },
// function closeOld(err) {
// if (err) throw err;
// fs.close(self.fd, this);
// },
// function moveNew(err) {
// if (err) throw err;
// fs.rename(tmpFile, self.filename, this);
// },
// function transitionState(err) {
// if (err) throw err;
// self.dbLength = tmpDb.dbLength;
// self.index = tmpDb.index;
// self.fd = tmpDb.fd;
// self.stale = tmpDb.stale;
// return true;
// },
// function cleanup(err) {
// self.busy = false;
// process.nextTick(function () {
// self.checkQueue();
// });
// if (err) throw err;
// return true;
// },
// function prologue(err) {
// if (callback) {
// callback(err);
// }
// }
// );
//
// },

remove: function removeByKey(key, callback) { remove: function removeByKey(key, callback) {
try { try {
var info = this.index[key]; var info = this.index[key];
Expand All @@ -225,80 +322,11 @@ var nStore = module.exports = Class.extend({
callback(error); callback(error);
return; return;
} }
this.save(key, null, callback); this.save(key, undefined, callback);
} catch(err) { } catch(err) {
callback(err); callback(err);
} }
}, },


clear: function clearAll(callback) {
if (this.busy) {
var self = this;
process.nextTick(function () {
self.clear(callback);
});
return;
}
this.compactDatabase(true, callback);
},

// Checks the save queue to see if there is a record to write to disk
checkQueue: function checkQueue() {
if (this.busy) return;
var next = this.writeQueue.shift();
if (next === undefined) {
// Compact when the db is over half stale
if (this.stale > (this.length - this.stale)) {
this.compactDatabase();
}
return;
}
this.busy = true;
var self = this;
try {
var line = new Buffer(next.key + "\t" + JSON.stringify(next.doc) + "\n");
var keyLength = Buffer.byteLength(next.key);
} catch(err) {
console.log(err.stack);
if (next.callback) {
next.callback(err);
}
return;
}
Step(
function writeDocument() {
File.write(self.fd, line, self.dbLength, this);
},
function updateIndex(err) {
if (err) throw err;
// Count stale records
if (self.index.hasOwnProperty(next.key)) { self.stale++; }
if (next.doc) {
// Update index
self.index[next.key] = {
position: self.dbLength + keyLength + 1,
length: line.length - keyLength - 2
};
} else {
delete self.index[next.key];
}
// Update the pointer to the end of the database
self.dbLength += line.length;
return true;
},
function done(err) {
self.busy = false;
if (err) throw err;
self.checkQueue();
return true;
},
function (err) {
if (next.callback) {
next.callback(err, next.key);
}
}
);
},

}); });


2 changes: 1 addition & 1 deletion lib/nstore/cache.js
Expand Up @@ -14,7 +14,7 @@ module.exports = function CachePlugin(maxSize) {
var self = { var self = {
save: function save(key, doc, callback) { save: function save(key, doc, callback) {
// Go ahead and generate the auto key so we know what to cache // Go ahead and generate the auto key so we know what to cache
if (!key) key = this.getKey(); if (!key) key = this.genKey();
push(key, doc); push(key, doc);
// Call super // Call super
self.__proto__.save.call(this, key, doc, callback); self.__proto__.save.call(this, key, doc, callback);
Expand Down
2 changes: 1 addition & 1 deletion test/helper.js
Expand Up @@ -35,4 +35,4 @@ function clean() {
clean(); clean();


// Clean on exit too // Clean on exit too
process.addListener('exit', clean); // process.addListener('exit', clean);

0 comments on commit d5035a5

Please sign in to comment.