Add all() API entry point for easy async queries.
creationix committed Jul 1, 2010
1 parent 5a726ac commit c1aaaec
Showing 1 changed file with 83 additions and 64 deletions.
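
For context, a rough usage sketch of the new all() API (not part of this commit): the require path, the store filename, and the sample document fields are assumptions, while the (filter, callback) shape and the [doc, meta] result pairs follow the diff below.

// Usage sketch only -- the 'nstore' require path, 'data/users.db', and doc.age are assumed.
var nStore = require('nstore');
var users = nStore('data/users.db');

// With a filter: collect only matching documents, then receive them all at once.
users.all(function (doc, meta) {
  return doc.age > 18;
}, function (err, results) {
  if (err) { throw err; }
  // results is an array of [doc, meta] pairs
  results.forEach(function (pair) {
    console.log(pair[0], pair[1]);
  });
});

// Without a filter: the single function argument is treated as the callback.
users.all(function (err, results) {
  if (err) { throw err; }
  console.log(results.length + " documents loaded");
});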
147 changes: 83 additions & 64 deletions lib/nstore.js
@@ -204,6 +204,72 @@ function nStore(filename) {

}

  function getStream(filter) {
    var counter = 0;
    var stream = new process.EventEmitter();
    var queue = [];
    var paused = false;

    // Checks to see if we should emit the "end" event yet.
    function checkDone() {
      if (!paused && counter === 0) {
        counter--;
        stream.emit("end");
      }
    }

    // Tries to push events through
    function flush() {
      if (paused) { return; }
      for (var i = 0, l = queue.length; i < l; i++) {
        var item = queue[i];
        stream.emit("data", item.doc, item.meta);
        counter--;
      }
      queue.length = 0;
      process.nextTick(checkDone);
    }

    stream.pause = function () {
      paused = true;
    };

    // Resumes emitting of events
    stream.resume = function () {
      paused = false;
      process.nextTick(function () {
        flush();
        checkDone();
      });
    };

    Object.keys(index).forEach(function (key) {
      counter++;
      getByKey(key, function (err, doc, meta) {
        if (err) {
          stream.emit("error", err);
          return;
        }
        if (!filter || filter(doc, meta)) {
          queue.push({
            doc: doc,
            meta: meta
          });
          flush();
        } else {
          counter--;
          process.nextTick(checkDone);
        }
      });
    });

    process.nextTick(checkDone);

    return stream;
  }


  return {
    get length() {
      return Object.keys(index).length;
@@ -258,75 +324,28 @@ function nStore(filename) {
      }
    },

    all: function (filter, callback) {
      if (typeof filter === 'function' && callback === undefined) {
        callback = filter;
        filter = null;
      }
      var results = [];
      var stream = getStream(filter);
      stream.addListener('data', function (doc, meta) {
        results.push([doc, meta]);
      });
      stream.addListener('end', function () {
        callback(null, results);
      });
      stream.addListener('error', callback);
    },

    // Returns a readable stream of the whole collection.
    // Supports pause and resume so that you can delay events for later.
    // This queues "data" and "end" events in memory.
    // You can also provide a filter to pre-filter results before they
    // go to the queue.
    stream: function (filter) {
      var counter = 0;
      var stream = new process.EventEmitter();
      var queue = [];
      var paused = false;

      // Checks to see if we should emit the "end" event yet.
      function checkDone() {
        if (!paused && counter === 0) {
          counter--;
          stream.emit("end");
        }
      }

      // Tries to push events through
      function flush() {
        if (paused) { return; }
        for (var i = 0, l = queue.length; i < l; i++) {
          var item = queue[i];
          stream.emit("data", item.doc, item.meta);
          counter--;
        }
        queue.length = 0;
        process.nextTick(checkDone);
      }

      stream.pause = function () {
        paused = true;
      };

      // Resumes emitting of events
      stream.resume = function () {
        paused = false;
        process.nextTick(function () {
          flush();
          checkDone();
        });
      };

      Object.keys(index).forEach(function (key) {
        counter++;
        getByKey(key, function (err, doc, meta) {
          if (err) {
            stream.emit("error", err);
            return;
          }
          if (!filter || filter(doc, meta)) {
            queue.push({
              doc: doc,
              meta: meta
            });
            flush();
          } else {
            counter--;
            process.nextTick(checkDone);
          }
        });
      });

      process.nextTick(checkDone);

      return stream;
    },
    stream: getStream,

    // Loads a single document by id, accepts key and callback
    // the callback will be called with (err, doc, meta)
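
Below the diff, a rough sketch of driving the stream() form with the pause()/resume() behaviour described in the comments above. The store setup, the doc.active field, and the doSomethingSlow helper are hypothetical; the event names and pause()/resume() calls match the getStream implementation in this commit.

// Usage sketch only -- store setup and document fields are assumed.
var nStore = require('nstore');
var users = nStore('data/users.db');

// Hypothetical slow consumer; stands in for any async work per document.
function doSomethingSlow(doc, done) {
  setTimeout(done, 100);
}

// The optional filter runs before documents are queued for emission.
var stream = users.stream(function (doc, meta) {
  return doc.active;
});

stream.addListener('data', function (doc, meta) {
  stream.pause();              // hold further "data"/"end" events in the queue
  doSomethingSlow(doc, function () {
    stream.resume();           // flush queued documents and continue
  });
});

stream.addListener('error', function (err) {
  console.error(err);
});

stream.addListener('end', function () {
  console.log('all documents streamed');
});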
