Permalink
Browse files

add createValueStream, change createReadStream to match levelup

  • Loading branch information...
maxogden committed May 29, 2014
1 parent 228d098 commit d84b0250a28e7bc1d997184c40872cf2bef47cab
View
@@ -78,6 +78,8 @@ var readStream = db.createReadStream([opts])
Returns a [read stream](https://github.com/rvagg/node-levelup#createReadStream) over the most recent version of all rows in the dat store.
Rows are returned in the format `{key: key, value: value}` where key is by default a string and value is by default a JS object.
### Options
* `start` (defaults to the beginning of the possible keyspace) - key to start iterating from
@@ -87,6 +89,28 @@ Returns a [read stream](https://github.com/rvagg/node-levelup#createReadStream)
Note: not all options from `levelup.createReadStream` are supported at this time
## createValueStream
```js
var valueStream = db.createValueStream([opts])
```
Returns a [value stream](https://github.com/rvagg/node-levelup#createValueStream) over the most recent version of all rows in the dat store.
By default the returned stream is a readable object stream that will emit 1 JS object per row (equivalent to the `.value` object returned by `createReadStream`). This differs slightly from levelup where the value stream is not an object stream by default.
You can also pass in options to serialize the values as either CSV or line-delimited JSON (see below).
### Options
* `start` (defaults to the beginning of the possible keyspace) - key to start iterating from
* `end` (defaults to the end of the possible keyspace) - key to stop iterating at
* `limit` (default unlimited) - how many rows to return before stopping
* `keys` (default `true`) - if false you won't get JS objects with k/v pairs but rather only the raw columnized values from the data store
* `format` (default `objects`) - if set to `csv` or `json` the stream will not be an object mode stream and will emit serialized data
* `csv` (default `false`) - if true is equivalent to setting `format` to `csv`
* `json` (default `false`) - if true is equivalent to setting `format` to `json`
## createChangesStream
```js
View
@@ -26,6 +26,7 @@ var version = require("level-version")
var clearLog = require('single-line-log')
var multilevel = require('multilevel/msgpack')
var stdout = require('stdout-stream')
var combiner = require('stream-combiner')
var debug = require('debug')('dat.commands')
var clone = require(path.join(__dirname, 'clone'))
@@ -349,7 +350,7 @@ dat.cat = function(options, cb) {
if (!options) options = {}
if (!cb) cb = noop
if (!options.f && !options.json) options.json = true
var readStream = this.createReadStream(options)
var readStream = this.createValueStream(options)
readStream.pipe(stdout)
readStream.on('end', cb)
readStream.on('error', cb)
@@ -387,23 +388,36 @@ dat.put = function(key, val, opts, cb) {
return this.storage.put(key, val, opts, cb)
}
dat.delete = function(key, opts, cb) {
dat.delete = dat.del = function(key, opts, cb) {
return this.storage.delete(key, opts, cb)
}
dat.createReadStream = function(opts) {
dat.createReadStream = dat.readStream = function(opts) {
var self = this
if (!opts) opts = {}
var readStream = this.storage.createReadStream(opts)
if (opts.csv || opts.f === 'csv') var formatter = csvWriteStream()
else if (opts.json || opts.f === 'json') var formatter = ldj.serialize()
return combiner(readStream, keyValueFormatter())
function keyValueFormatter() {
return through.obj(function(obj, enc, cb) {
this.push({key: obj.id, value: obj})
cb()
})
}
}
dat.createValueStream = dat.valueStream = function(opts) {
var self = this
if (!opts) opts = {}
var readStream = this.storage.createReadStream(opts)
if (opts.csv || opts.format === 'csv') var formatter = csvWriteStream()
else if (opts.json || opts.format === 'json') var formatter = ldj.serialize()
// default to objects
if (!formatter) return readStream
readStream.pipe(formatter)
return formatter
else return combiner(readStream, formatter)
function csvWriteStream() {
var headers = self.headers()
View
@@ -117,7 +117,7 @@ RestHandler.prototype.exportCsv = function(req, res) {
var reqUrl = url.parse(req.url, true)
var qs = reqUrl.query
qs.csv = true
var readStream = this.dat.createReadStream(qs)
var readStream = this.dat.createValueStream(qs)
res.writeHead(200, {'content-type': 'text/csv'})
readStream.pipe(res)
}
@@ -126,7 +126,7 @@ RestHandler.prototype.exportJson = function(req, res) {
var reqUrl = url.parse(req.url, true)
var qs = reqUrl.query
if (typeof qs.limit === 'undefined') qs.limit = 50
var readStream = this.dat.createReadStream(qs)
var readStream = this.dat.createValueStream(qs)
res.writeHead(200, {'content-type': 'application/json'})
readStream.pipe(jsonStream.stringify('{"rows": [\n', '\n,\n', '\n]}\n')).pipe(res)
}
@@ -186,7 +186,7 @@ RestHandler.prototype.dataTable = function(req, res) {
if (qs.end) readOpts.end = qs.end
if (qs.reverse) readOpts.reverse = Boolean(qs.reverse)
var table = dataTable(headers)
this.dat.createReadStream(readOpts).pipe(table).pipe(res)
this.dat.createValueStream(readOpts).pipe(table).pipe(res)
}
RestHandler.prototype.json = function(res, json) {
View
@@ -71,8 +71,8 @@ module.exports = function() {
}
common.compareData = function compareData(t, dat1, dat2, cb) {
dat1.createReadStream().pipe(concat(function(db1) {
dat2.createReadStream().pipe(concat(function(db2) {
dat1.createValueStream().pipe(concat(function(db1) {
dat2.createValueStream().pipe(concat(function(db2) {
t.deepEquals(db1, db2, 'low level data matches')
cb()
}))
@@ -32,7 +32,7 @@ module.exports.clone = function(test, common) {
// var remote = 'http://localhost:' + dat.defaultPort
// dat2.clone(remote, function(err) {
// t.notOk(err, 'no err on clone')
// dat2.createReadStream().pipe(concat(function(data) {
// dat2.createValueStream().pipe(concat(function(data) {
// t.equal(data.length, 1)
// var first = data[0] || {}
// t.equal(first.foo, 'bar')
View
@@ -41,7 +41,7 @@ module.exports.putJson = function(test, common) {
common.getDat(t, function(dat, done) {
dat.put({"foo": "bar"}, function(err, doc) {
if (err) throw err
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 1)
@@ -58,7 +58,7 @@ module.exports.putWeirdKeys = function(test, common) {
common.getDat(t, function(dat, done) {
dat.put(".error.", {"foo": "bar"}, function(err, doc) {
if (err) throw err
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 1)
t.equal(data[0]['foo'], "bar")
@@ -74,7 +74,7 @@ module.exports.putJsonSetVersion = function(test, common) {
common.getDat(t, function(dat, done) {
dat.put({"foo": "bar", version: 5}, function(err, doc) {
if (err) throw err
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 1)
@@ -143,7 +143,7 @@ module.exports.multiplePutJson = function(test, common) {
if (err) throw err
dat.put({"foo": "bar"}, function(err) {
if (err) throw err
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 2)
@@ -166,7 +166,7 @@ module.exports.putBuff = function(test, common) {
dat.put(row, {columns: schema.toJSON()}, function(err) {
if (err) throw err
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 1)
@@ -188,7 +188,7 @@ module.exports.deleteRow = function(test, common) {
dat.get(doc.id, function(err, doc) {
t.true(err, 'doc should now be not found')
t.false(doc, 'doc should be null')
var cat = dat.createReadStream()
var cat = dat.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 0, 'should return no data')
View
@@ -126,7 +126,7 @@ module.exports.sameDir = function(test, common) {
var ws = dat2.createWriteStream({ json: true })
ws.on('end', function() {
var cat = dat1.createReadStream()
var cat = dat1.createValueStream()
cat.pipe(concat(function(data) {
t.equal(data.length, 2)
t.equal(data[0].hello, "bruce wayne")
View
@@ -3,8 +3,8 @@
var path = require('path')
var concat = require('concat-stream')
module.exports.put = function(test, common) {
test('.put', function(t) {
module.exports.putGet = function(test, common) {
test('.put + .get', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
@@ -18,14 +18,95 @@ module.exports.put = function(test, common) {
})
}
module.exports.del = function(test, common) {
test('.del', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
dat.del('foo', function(err, data) {
t.notOk(err, 'no err')
setImmediate(done)
})
})
})
})
}
module.exports.createReadStream = function(test, common) {
test('.createReadStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.createReadStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 row')
t.ok(rows[0].key, '.key')
t.ok(rows[0].value, '.value')
setImmediate(done)
}))
})
})
})
}
module.exports.readStream = function(test, common) {
test('.readStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.readStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 row')
t.ok(rows[0].key, '.key')
t.ok(rows[0].value, '.value')
setImmediate(done)
}))
})
})
})
}
module.exports.createValueStream = function(test, common) {
test('.createValueStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.readStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 row')
t.ok(rows[0], '.key')
t.ok(rows[0], '.value')
setImmediate(done)
}))
})
})
})
}
module.exports.valueStream = function(test, common) {
test('.valueStream', function(t) {
common.getDat(t, function(dat, done) {
dat.put('foo', {"bar": "baz"}, function(err, doc) {
if (err) throw err
var rs = dat.readStream()
rs.pipe(concat(function(rows) {
t.equal(rows.length, 1, '1 row')
t.ok(rows[0], '.key')
t.ok(rows[0], '.value')
setImmediate(done)
}))
})
})
})
}
module.exports.all = function (test, common) {
module.exports.put(test, common)
// module.exports.get(test, common)
// module.exports.del(test, common)
// module.exports.createReadStream(test, common)
// module.exports.readStream(test, common)
// module.exports.createValueStream(test, common)
// module.exports.valueStream(test, common)
module.exports.putGet(test, common)
module.exports.del(test, common)
module.exports.createReadStream(test, common)
module.exports.readStream(test, common)
module.exports.createValueStream(test, common)
module.exports.valueStream(test, common)
// module.exports.createKeyStream(test, common)
// module.exports.keyStream(test, common)
// module.exports.createWriteStream(test, common)
Oops, something went wrong.

0 comments on commit d84b025

Please sign in to comment.