Factor out some code from nstore.js.
We're getting ready for an even bigger refactor.
creationix committed Aug 19, 2010
1 parent 58118cd commit eca6711
Showing 6 changed files with 222 additions and 169 deletions.
37 changes: 37 additions & 0 deletions lib/class.js
@@ -0,0 +1,37 @@
// This is my proto library but without changing Object.prototype
// Then only sub-objects of Class have the special properties.
var Class = module.exports = Object.create(Object.prototype, {
  // Implements a forEach much like the one for Array.prototype.forEach, but for
  // any object.
  forEach: {value: function forEach(callback, thisObject) {
    var keys = Object.keys(this);
    var length = keys.length;
    for (var i = 0; i < length; i++) {
      var key = keys[i];
      callback.call(thisObject, this[key], key, this);
    }
  }},
  // Implements a map much like the one for Array.prototype.map, but for any
  // object. Returns an array, not a generic object.
  map: {value: function map(callback, thisObject) {
    var accum = [];
    var keys = Object.keys(this);
    var length = keys.length;
    for (var i = 0; i < length; i++) {
      var key = keys[i];
      accum[i] = callback.call(thisObject, this[key], key, this);
    }
    return accum;
  }},
  // Implement extend for easy prototypal inheritance
  extend: {value: function extend(obj) {
    obj.__proto__ = this;
    return obj;
  }},
  // Implement new for easy self-initializing objects
  new: {value: function () {
    var obj = Object.create(this);
    if (obj.initialize) obj.initialize.apply(obj, arguments);
    return obj;
  }}
});
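
For context, the helper is meant to be used roughly like this. The Point object below is an illustrative sketch, not part of the commit, and it assumes class.js is required from a sibling file in lib/:

var Class = require('./class');

// Hypothetical sub-object built on Class (illustration only).
var Point = Class.extend({
  // new() does Object.create(Point) and then calls initialize() on it.
  initialize: function initialize(x, y) {
    this.x = x;
    this.y = y;
  },
  magnitude: function magnitude() {
    return Math.sqrt(this.x * this.x + this.y * this.y);
  }
});

var p = Point.new(3, 4);
console.log(p.magnitude());            // 5

// forEach/map work on any object that inherits from Class.
Point.forEach(function (value, key) {
  console.log(key);                    // "initialize", then "magnitude"
});
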
179 changes: 17 additions & 162 deletions lib/nstore.js
@@ -1,27 +1,28 @@
 var EventEmitter = require('events').EventEmitter,
-    Buffer = require('buffer').Buffer,
+    Class = require('./class'),
+    Queue = require('./queue'),
     Path = require('path'),
+    Utils = require('./utils'),
     Step = require('step'),
     fs = require('fs');

 const CHUNK_LENGTH = 40 * 1024,
       TAB = 9,
       NEWLINE = 10;

-var nStore = {
+var nStore = module.exports = Class.extend({

   // Set up out local properties on the object
   // and load the datafile.
   initialize: function initialize(filename, callback) {
+    if (this === nStore) throw new Error("Can't call initialize directly");
     this.filename = filename;
     this.fd = null;
     this.index = {};
-    this.writeQueue = [];
-    this.writeQueue.__proto__ = fastArray;
+    this.writeQueue = Queue.new();
     this.stale = 0;
     this.dbLength = 0;
     this.busy = false;
-    this.lastCompact = null;
     this.filterFn = null;
     // We don't want any other properties on this object that aren't initialized here
     Object.seal(this);
@@ -80,15 +81,15 @@ var nStore = {
         self.stale = stale;
         self.busy = false;
         process.nextTick(function () {
-          if (typeof callback === 'function') callback();
+          if (typeof callback === 'function') callback(null, self);
           self.checkQueue();
         });
       }
     }

     function emit(line) {
       counter++;
-      fsRead(fd, line[0], line[1] - line[0], function (err, key) {
+      Utils.fsRead(fd, line[0], line[1] - line[0], function (err, key) {
         if (index.hasOwnProperty(key)) {
           stale++;
         }
@@ -110,14 +111,14 @@ var nStore = {

   compactDatabase: function (clear, callback) {
     if ((!clear && this.stale === 0) || this.busy) return;
-    var tmpFile = Path.join(Path.dirname(this.filename), makeUUID(this.index) + ".tmpdb"),
-        tmpDb = Object.create(nStore);
+    var tmpFile = Path.join(Path.dirname(this.filename), Utils.makeUUID(this.index) + ".tmpdb"),
+        tmpDb;

     this.busy = true;
     var self = this;
     Step(
       function makeNewDb() {
-        tmpDb.initialize(tmpFile, this);
+        tmpDb = nStore.new(tmpFile, this);
       },
       function copyData(err) {
         if (err) throw err;
@@ -178,7 +179,7 @@ var nStore = {
   // Pass null as the key to get a generated key.
   save: function save(key, doc, callback) {
     if (!key) {
-      key = makeUUID(this.index);
+      key = Utils.makeUUID(this.index);
     }
     this.writeQueue.push({
       key: key.toString(),
@@ -199,7 +200,7 @@ var nStore = {
       return;
     }

-    fsRead(this.fd, info.position, info.length, function (err, buffer) {
+    Utils.fsRead(this.fd, info.position, info.length, function (err, buffer) {
       if (err) { callback(err); return; }
       try {
         var data = JSON.parse(buffer.toString());
@@ -235,7 +236,7 @@ var nStore = {

   // Find records using a query
   find: function find(query, callback) {
-    var filter = compileQuery(query);
+    var filter = Utils.compileQuery(query);
     var stream;
     var results = {};
     if (!callback) {
@@ -277,7 +278,8 @@ var nStore = {
   // Checks the save queue to see if there is a record to write to disk
   checkQueue: function checkQueue() {
     if (this.busy) return;
-    if (this.writeQueue.length === 0) {
+    var next = this.writeQueue.shift();
+    if (next === undefined) {
       // Compact when the db is over half stale
       if (this.stale > (this.length - this.stale)) {
         this.compactDatabase();
@@ -287,7 +289,6 @@ var nStore = {
     this.busy = true;
     var self = this;
     try {
-      var next = this.writeQueue.fastShift();
       var line = new Buffer(next.key + "\t" + JSON.stringify(next.doc) + "\n");
       var keyLength = Buffer.byteLength(next.key);
     } catch(err) {
@@ -299,7 +300,7 @@ var nStore = {
     }
     Step(
       function writeDocument() {
-        fsWrite(self.fd, line, self.dbLength, this);
+        Utils.fsWrite(self.fd, line, self.dbLength, this);
       },
       function updateIndex(err) {
         if (err) throw err;
@@ -332,151 +333,5 @@ var nStore = {
     );
   },


-};
-
-// Utilities
-
-// If a large number of writes gets queued up, the shift call normally
-// eats all the CPU. This implementes a fast shift for the queue array.
-var fastArray = Object.create(Array.prototype, {
-  start: {value: 0},
-  fastShift: {value: function () {
-    var item = this[this.start];
-    if (this.start >= Math.floor(this.length / 2)) {
-      this.splice(0, this.start + 1);
-      this.start = 0;
-    } else {
-      this.start++;
-    }
-    return item;
-  }}
 });
-
-
-
-// Reads from a given file descriptor at a specified position and length
-// Handles all OS level chunking for you.
-// Callback gets (err, utf8String)
-// Reuses a buffer that grows (replaces old one) only when needed
-var readBuffer = new Buffer(40*1024);
-var readQueue = null;
-var fsRead = safe3(function fsRead(fd, position, length, callback) {
-  if (length > readBuffer.length) {
-    readBuffer = new Buffer(length);
-    console.log("Upgrading read buffer to " + readBuffer.length);
-  }
-  var offset = 0;
-
-  function readChunk() {
-    fs.read(fd, readBuffer, offset, length - offset, position, function (err, bytesRead) {
-      if (err) { callback(err); return; }
-
-      offset += bytesRead;
-
-      if (offset < length) {
-        readChunk();
-        return;
-      }
-      callback(null, readBuffer.toString('utf8', 0, length));
-    });
-  }
-  readChunk();
-});
-
-// Writes a buffer to a specified file descriptor at the given offset
-// handles chunking for you.
-// Callback gets (err)
-function fsWrite(fd, buffer, position, callback) {
-  var offset = 0,
-      length = buffer.length;
-
-  function writeChunk() {
-    fs.write(fd, buffer, offset, length - offset, position, function (err, bytesWritten) {
-      if (err) { callback(err); return; }
-      offset += bytesWritten;
-      if (offset < length) {
-        writeChunk();
-        return;
-      }
-      callback();
-    });
-  }
-  writeChunk();
-}
-
-// Generates a random unique 16 char base 36 string
-// (about 2^83 possible keys)
-function makeUUID(index) {
-  var key = "";
-  while (key.length < 16) {
-    key += Math.floor(Math.random() * 0xcfd41b9100000).toString(36);
-  }
-  key = key.substr(0, 16);
-  if (index.hasOwnProperty(key)) {
-    return makeUUID(index);
-  }
-  return key;
-}
-
-
-// Makes an async function that takes 3 arguments only execute one at a time.
-function safe3(fn) {
-  var queue = [];
-  queue.__proto__ = fastArray;
-  var safe = true;
-  function checkQueue() {
-    var next = queue.fastShift();
-    safe = false;
-    fn(next[0], next[1], next[2], function (error, result) {
-      next[3](error, result);
-      if (queue.length > 0) {
-        checkQueue();
-      } else {
-        safe = true;
-      }
-    });
-  }
-  return function (arg1, arg2, arg3, callback) {
-    queue.push(arguments);
-    if (safe) {
-      checkQueue();
-    }
-  };
-}
-
-
-function compileQuery(query) {
-  if (typeof query === 'function') {
-    return query;
-  }
-  var exp;
-  if (Array.isArray(query)) {
-    exp = query.map(compileSection).join(" || ") || "true";
-  } else if (typeof query === 'object') {
-    exp = compileSection(query);
-  } else {
-    exp = "true";
-  }
-  var filter = new Function("doc", "key", "return " + exp + ";");
-  return filter;
-}
-
-function compileSection(obj) {
-  var parts = Object.keys(obj).map(function (key) {
-    var p = key.indexOf(" ");
-    var name = key;
-    var operator = "=";
-    if (p > 0) {
-      name = key.substr(0, p);
-      operator = key.substr(p + 1);
-    }
-    if (operator === '=') operator = '===';
-    if (operator === '<>') operator = '!==';
-    if (name !== "key") {
-      name = "doc." + name;
-    }
-    return "(" + name + " " + operator + " " + JSON.stringify(obj[key]) + ")";
-  })
-  return "(" + parts.join(" && ") + ")";
-}
-
-module.exports = nStore;
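
Net effect of the nstore.js changes: the raw file and query helpers now live behind a Utils module (required as './utils'; its diff is not shown in this excerpt), the hand-rolled fastArray is replaced by the new Queue, and a store is built with nStore.new() instead of Object.create() plus initialize(), with the load callback now receiving (err, store). A rough sketch of a caller after this commit; the filename, document, require path, and the exact save/find callback arguments are assumptions:

var nStore = require('./lib/nstore');

// nStore.new() self-initializes via Class; calling nStore.initialize()
// on nStore itself now throws.
var users = nStore.new('data/users.db', function (err, store) {
  if (err) throw err;

  // save(key, doc, callback) -- pass null as the key to get a generated one.
  store.save(null, {name: "Tim", age: 28}, function (err) {
    if (err) throw err;

    // find(query, callback) compiles the query via Utils.compileQuery;
    // "age >" uses the operator syntax from the relocated compileSection.
    store.find({"age >": 25}, function (err, results) {
      if (err) throw err;
      console.log(results);
    });
  });
});
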
32 changes: 32 additions & 0 deletions lib/queue.js
@@ -0,0 +1,32 @@
var Class = require('./class');

// If a large number of writes gets queued up, the shift call normally
// eats all the CPU. This implementes a faster queue.
var Queue = module.exports = Class.extend({
  initialize: function initialize() {
    if (this === Queue) throw new Error("Cann't use Queue directly");
    this.tail = [];
    this.head = Array.prototype.slice.call(arguments);
    this.offset = 0;
    // Lock the object down
    Object.seal(this);
    return this;
  },
  shift: function shift() {
    if (this.offset === this.head.length) {
      var tmp = this.head;
      tmp.length = 0;
      this.head = this.tail;
      this.tail = tmp;
      this.offset = 0;
      if (this.head.length === 0) return;
    }
    return this.head[this.offset++];
  },
  push: function push(item) {
    return this.tail.push(item);
  },
  get length() {
    return this.head.length - this.offset + this.tail.length;
  }
});
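
The point of the head/tail split is that shift() never calls Array.prototype.shift or splice on a long array: push() appends to the tail, shift() walks the head by offset, and the two arrays are swapped only when the head is drained. A small illustrative sketch of the behaviour (the values are made up, and the require path assumes a script sitting next to lib/):

var Queue = require('./lib/queue');

var q = Queue.new('a', 'b');   // initial arguments become the head
q.push('c');                   // pushed items accumulate in the tail

console.log(q.length);         // 3
console.log(q.shift());        // 'a'
console.log(q.shift());        // 'b'
console.log(q.shift());        // 'c'  (head and tail were swapped first)
console.log(q.shift());        // undefined -- the "empty" signal checkQueue tests for
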
