Permalink
Browse files

updated docs + bug fixes

  • Loading branch information...
Raynos committed Nov 24, 2012
1 parent 9613319 commit 1b4fd2c446a30ffb21cd1dfc1961d8bfcc43f5fd
Showing with 123 additions and 42 deletions.
  1. +37 −27 README.md
  2. +29 −0 examples/simple.js
  3. +53 −13 index.js
  4. +4 −2 package.json
View
@@ -11,37 +11,47 @@ You query a range in the database. It will load the range from
It's basically a never ending feed like `tail -f`
```js
-var livefeed = require("level-livefeed")
+var livefeed = require("..")
, level = require("levelidb")
+ , WriteStream = require("write-stream")
- , db = level("/tmp/db")
+ , db = level("/tmp/db-livefeed-example", {
+ createIfMissing: true
+ })
var stream = livefeed(db, { start: "foo:", end: "foo;" })
-/* stream
-
- { type: "put", key: keyFromDisk1, value: valueFromDisk1 }
- { type: "put", key: keyFromDisk2, value: valueFromDisk2 }
-
-*/
-
-db.put("some key", "some value")
-/* stream
- { type: "put", key: "some key", value: "some value" }
-*/
-
-db.del("die")
-/* stream
- { type: "del", key: "die" }
-*/
-
-db.batch([
- { type: "put", key: "one", value: "two" }
- , { type: "del", key: "two" }
-])
-/* stream
- { type: "put", key: "one", value: "two" }
- , { type: "del", key: "two" }
-*/
+
+stream.pipe(WriteStream(function (chunk) {
+ console.log("chunk", chunk.type, chunk.key.toString()
+ , chunk.value && chunk.value.toString())
+}))
+
+stream.on("loaded", function () {
+ console.log("finished loading from disk")
+})
+
+setTimeout(function () {
+ db.put("foo:some key", "some value")
+
+ db.del("foo:die")
+
+ db.batch([
+ { type: "put", key: "foo:one", value: "two" }
+ , { type: "del", key: "foo:two" }
+ ])
+}, 2000)
+```
+
+prints
+
+```
+chunk put foo:one two
+chunk put foo:some key some value
+finished loading from disk
+chunk put foo:some key some value
+chunk del foo:die undefined
+chunk put foo:one two
+chunk del foo:two undefined
```
## Installation
View
@@ -0,0 +1,29 @@
+var livefeed = require("..")
+ , level = require("levelidb")
+ , WriteStream = require("write-stream")
+
+ , db = level("/tmp/db-livefeed-example", {
+ createIfMissing: true
+ })
+
+var stream = livefeed(db, { start: "foo:", end: "foo;" })
+
+stream.pipe(WriteStream(function (chunk) {
+ console.log("chunk", chunk.type, chunk.key.toString()
+ , chunk.value && chunk.value.toString())
+}))
+
+stream.on("loaded", function () {
+ console.log("finished loading from disk")
+})
+
+setTimeout(function () {
+ db.put("foo:some key", "some value")
+
+ db.del("foo:die")
+
+ db.batch([
+ { type: "put", key: "foo:one", value: "two" }
+ , { type: "del", key: "foo:two" }
+ ])
+}, 2000)
View
@@ -1,5 +1,6 @@
var ReadStream = require("read-stream")
, WriteStream = require("write-stream")
+ , wrap = require("streams2")
module.exports = query
@@ -10,24 +11,63 @@ function query(db, options) {
, stream = queue.stream
, start = options.start
, end = options.end
+ , writer = WriteStream(write, loaded)
+ , reader = wrap(db.readStream(options))
- db.readStream(options)
- .pipe(WriteStream(function write(chunk) {
- chunk.type = "put"
- queue.push(chunk)
- }))
+ reader.pipe(writer)
- db.on("put", function put(key, value) {
- if (start < key && key < end) {
- queue.push({ type: "put", key: key, value: value })
+ db.on("put", put)
+ db.on("del", del)
+ db.on("batch", batch)
+ stream.close = stream.destroy = close
+
+ return stream
+
+ function write(chunk) {
+ chunk.type = "put"
+ queue.push(chunk)
+ }
+
+ function loaded() {
+ stream.emit("loaded")
+ this.emit("finish")
+ }
+
+ function put(key, value) {
+ key = key.toString()
+ if ((!start || start <= key) &&
+ (!end || key <= end)
+ ) {
+ queue.push({ type: "put", key: key , value: value })
}
- })
+ }
- db.on("del", function del(key) {
- if (start < key && key < end) {
+ function del(key) {
+ key = key.toString()
+ if ((!start || start <= key) &&
+ (!end || key <= end)
+ ) {
queue.push({ type: "del", key: key })
}
- })
+ }
- return stream
+ function batch(args) {
+ args.forEach(function (item) {
+ var key = item.key.toString()
+
+ if ((!start || start <= key) &&
+ (!end || key <= end)
+ ) {
+ return queue.push(item)
+ }
+ })
+ }
+
+ function close() {
+ reader.unpipe(writer)
+ db.removeListener("put", put)
+ db.removeListener("del", del)
+ db.removeListener("batch", batch)
+ queue.end()
+ }
}
View
@@ -18,10 +18,12 @@
},
"dependencies": {
"read-stream": "~0.4.8",
- "write-stream": "~0.4.3"
+ "write-stream": "~0.4.3",
+ "streams2": "~0.1.1"
},
"devDependencies": {
- "tap": "~0.3.1"
+ "tap": "~0.3.1",
+ "levelidb": "~0.1.8"
},
"licenses": [
{

0 comments on commit 1b4fd2c

Please sign in to comment.