Permalink
Browse files

add createKeyStream, isOpen, isClosed

  • Loading branch information...
maxogden committed May 29, 2014
1 parent d84b025 commit d3d423ba92944138db2151e9e935529506d32eae
Showing with 150 additions and 14 deletions.
  1. +16 −2 docs/api.md
  2. +18 −1 lib/commands.js
  3. +12 −5 lib/storage.js
  4. +104 −6 test/tests/levelup.js
View
@@ -85,7 +85,6 @@ Rows are returned in the format `{key: key, value: value}` where key is by defau
* `start` (defaults to the beginning of the possible keyspace) - key to start iterating from
* `end` (defaults to the end of the possible keyspace) - key to stop iterating at
* `limit` (default unlimited) - how many rows to return before stopping
* `keys` (default `true`) - if false you won't get JS objects with k/v pairs but rather only the raw columnized values from the data store
Note: not all options from `levelup.createReadStream` are supported at this time
@@ -106,11 +105,26 @@ You can also pass in options to serialize the values as either CSV or line-delim
* `start` (defaults to the beginning of the possible keyspace) - key to start iterating from
* `end` (defaults to the end of the possible keyspace) - key to stop iterating at
* `limit` (default unlimited) - how many rows to return before stopping
* `keys` (default `true`) - if false you won't get JS objects with k/v pairs but rather only the raw columnized values from the data store
* `format` (default `objects`) - if set to `csv` or `json` the stream will not be an object mode stream and will emit serialized data
* `csv` (default `false`) - if true is equivalent to setting `format` to `csv`
* `json` (default `false`) - if true is equivalent to setting `format` to `json`
## createKeyStream
```js
var keyStream = db.createKeyStream([opts])
```
Returns a [key stream](https://github.com/rvagg/node-levelup#createKeyStream) over the most recent version of all keys in the dat store.
By default the returned stream is a readable object stream that will emit 1 JS object per row in the form `{id: key, version: number, deleted: boolean}`. This differs slightly from levelup where the value stream is not an object stream by default. Dat stores the id, version and deleted status in the key on disk which is why all 3 properties are returned by this stream.
### Options
* `start` (defaults to the beginning of the possible keyspace) - key to start iterating from
* `end` (defaults to the end of the possible keyspace) - key to stop iterating at
* `limit` (default unlimited) - how many rows to return before stopping
## createChangesStream
```js
View
@@ -425,6 +425,14 @@ dat.createValueStream = dat.valueStream = function(opts) {
}
}
dat.createKeyStream = dat.keyStream = function(opts) {
var self = this
if (!opts) opts = {}
opts.keysOnly = true
var readStream = this.storage.createReadStream(opts)
return readStream
}
dat.createChangesStream = function(opts) {
return this.storage.createChangesStream(opts)
}
@@ -487,8 +495,17 @@ dat.createBlobReadStream = function(key, name, opts) {
return proxy
}
dat.isOpen = function() {
if (!this.db) return false
return this.db.isOpen()
}
dat.isClosed = function() {
if (!this.db) return true
return this.db.isClosed()
}
dat.createWriteStream = function(options) {
dat.createWriteStream = dat.writeStream = function(options) {
return writeStream(this, options)
}
View
@@ -266,7 +266,6 @@ Database.prototype.delete = function (key, opts, cb) {
Database.prototype.createReadStream = function(opts) {
var self = this
if (!opts) opts = {}
if (typeof opts.keys === 'undefined') opts.keys = true
var curOpts = {
start: self._key(self.keys.cur, ''),
@@ -291,17 +290,25 @@ Database.prototype.createReadStream = function(opts) {
var pending = 0
function write(row, enc, next) {
var encKey = row.key.split(self.sep)[2]
var key = self._key(self.keys.data, encKey + self.sep + row.value)
var currentKey = row.key.split(self.sep)
var currentVal = row.value.split(self.sep)
var key = self._key(self.keys.data, currentKey[2] + self.sep + row.value)
// internal option, not documented, used to implement createKeyStream
if (opts.keysOnly) {
var obj = {id: currentKey[2]}
if (currentVal[0]) obj.version = docUtils.unpack(currentVal[0])
if (currentVal[1] === '1') obj.deleted = true
stream.push(obj)
return next()
}
pending++
self.db.get(key, { valueEncoding: 'binary'}, function(err, val) {
if (err) {
console.error('readStream GET Error: ', err)
next()
return
}
var valuesOnly = !opts.keys
var decoded = docUtils.decodeRow(key, val, self, valuesOnly)
var decoded = docUtils.decodeRow(key, val, self)
if (!decoded._deleted) stream.push(decoded)
if (--pending === 0 && stream.ended) stream.push(null)
next()
View
@@ -100,18 +100,116 @@ module.exports.valueStream = function(test, common) {
})
}
module.exports.createKeyStream = function(test, common) {
test('.createKeyStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.createKeyStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 key')
t.equal(rows[0].id, 'foo')
t.equal(rows[0].version, 1)
setImmediate(done)
}))
})
})
})
}
module.exports.keyStream = function(test, common) {
test('.keyStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.keyStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 key')
t.equal(rows[0].id, 'foo')
t.equal(rows[0].version, 1)
setImmediate(done)
}))
})
})
})
}
module.exports.createWriteStream = function(test, common) {
test('.createWriteStream', function(t) {
common.getDat(t, function(dat, done) {
var ws = dat.createWriteStream({ objects: true })
ws.on('end', function() {
var rs = dat.keyStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 key')
t.equal(rows[0].id, 'foo')
t.equal(rows[0].version, 1)
setImmediate(done)
}))
})
ws.write({"id": "foo", "b": "bar", "c": "hello"})
ws.end()
})
})
}
module.exports.writeStream = function(test, common) {
test('.writeStream', function(t) {
common.getDat(t, function(dat, done) {
var ws = dat.writeStream({ objects: true })
ws.on('end', function() {
var rs = dat.keyStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 key')
t.equal(rows[0].id, 'foo')
t.equal(rows[0].version, 1)
setImmediate(done)
}))
})
ws.write({"id": "foo", "b": "bar", "c": "hello"})
ws.end()
})
})
}
module.exports.isOpen = function(test, common) {
test('.isOpen', function(t) {
common.getDat(t, function(dat, done) {
var state = dat.isOpen()
t.equal(state, true, 'open')
done()
})
})
}
module.exports.isClosed = function(test, common) {
test('.isClosed', function(t) {
common.getDat(t, function(dat, done) {
var state = dat.isClosed()
t.equal(state, false, 'is not closed')
done()
})
})
}
module.exports.all = function (test, common) {
module.exports.putGet(test, common)
module.exports.del(test, common)
module.exports.createReadStream(test, common)
module.exports.readStream(test, common)
module.exports.createValueStream(test, common)
module.exports.valueStream(test, common)
// module.exports.createKeyStream(test, common)
// module.exports.keyStream(test, common)
// module.exports.createWriteStream(test, common)
// module.exports.writeStream(test, common)
// module.exports.isOpen(test, common)
// module.exports.isClosed(test, common)
module.exports.createKeyStream(test, common)
module.exports.keyStream(test, common)
module.exports.createWriteStream(test, common)
module.exports.writeStream(test, common)
module.exports.isOpen(test, common)
module.exports.isClosed(test, common)
// module.exports.batch(test, common)
}

0 comments on commit d3d423b

Please sign in to comment.