Permalink
Comparing changes
Open a pull request
- 17 commits
- 7 files changed
- 0 commit comments
- 2 contributors
Unified
Split
Showing
with
370 additions
and 74 deletions.
- +22 −7 bin/cat.js
- +17 −6 lib/commands.js
- +44 −19 lib/rest-handler.js
- +4 −1 package.json
- +40 −0 test/tests/cli.js
- +67 −16 test/tests/read-streams.js
- +176 −25 test/tests/rest.js
| @@ -2,30 +2,45 @@ var stdout = require('stdout-stream') | ||
| var multistream = require('multistream') | ||
| var eos = require('end-of-stream') | ||
| var through = require('through2') | ||
| var ldj = require('ndjson') | ||
| var pump = require('pump') | ||
| var formatData = require('format-data') | ||
| module.exports = function(dat, opts, cb) { | ||
| if (!opts) opts = {} | ||
| if (!opts.f && !opts.json) opts.json = true | ||
| if(!opts.format) { | ||
| if(opts.csv) opts.format = 'csv' | ||
| if(opts.json) opts.format = 'json' | ||
| if(opts.ndjson) opts.format = 'ndjson' | ||
| if(opts.sse) opts.format = 'sse' | ||
| } | ||
| var format = opts.format || 'ndjson' | ||
| opts.format = 'objectMode' | ||
| var readStream = dat.createReadStream(opts) | ||
| if (opts.live) { | ||
| if(['ndjson', 'csv', 'sse'].indexOf(format) === -1) { | ||
| format = 'ndjson' | ||
| } | ||
| var changes = dat.createChangesReadStream({ | ||
| since: dat.storage.change, | ||
| data: true, | ||
| decode: true, | ||
| live: true | ||
| }) | ||
| var format = through.obj(function(data, enc, cb) { | ||
| var selectValues = through.obj(function(data, enc, cb) { | ||
| // Somehow data.value has no valid version key | ||
| var row = data.value | ||
| data.value.version = data.to | ||
| cb(null, data.value) | ||
| }) | ||
| readStream = multistream([readStream, pump(changes, format, ldj.serialize())]) | ||
| readStream = multistream.obj([readStream, pump(changes, selectValues)]) | ||
| } | ||
| readStream.pipe(stdout) | ||
| eos(readStream, cb) | ||
| opts.format = format | ||
| var formatter = formatData(opts) | ||
| readStream.pipe(formatter).pipe(stdout) | ||
| eos(formatter, cb) | ||
| } | ||
| @@ -16,7 +16,6 @@ var request = require('request') | ||
| var levelup = require('levelup') | ||
| var ldj = require('ndjson') | ||
| var concat = require('concat-stream') | ||
| var csvWriter = require('csv-write-stream') | ||
| var connections = require('connections') | ||
| var through = require('through2') | ||
| var multilevel = require('multilevel') | ||
| @@ -25,6 +24,7 @@ var isNumber = require('isnumber') | ||
| var skimBlobs = require('skim-blob-store') | ||
| var remoteBlobs = require('dat-remote-blobs') | ||
| var debug = require('debug')('dat.commands') | ||
| var formatData = require('format-data') | ||
| var restHandler = require('./rest-handler.js') | ||
| var restServer = require('./rest-server.js') | ||
| @@ -713,8 +713,13 @@ var changesEncoder = function(dat) { | ||
| } | ||
| dat.createChangesReadStream = function(opts) { | ||
| if (opts && opts.decode) return pumpify.obj(this.storage.createChangesReadStream(opts), changesDecoder(this)) | ||
| return this.storage.createChangesReadStream(opts) | ||
| var pipeline = [this.storage.createChangesReadStream(opts)] | ||
| if (opts && opts.decode) pipeline.push(changesDecoder(this)) | ||
| if(opts.format && opts.format !== 'objectMode') | ||
| pipeline.push(formatData(opts)) | ||
| return pipeline.length === 1 ? pipeline[0] : pumpify.obj(pipeline) | ||
| } | ||
| dat.createChangesWriteStream = function(opts) { | ||
| @@ -734,9 +739,15 @@ dat.createReadStream = function(opts) { | ||
| if (!opts) opts = {} | ||
| var pipeline = [this.storage.createReadStream(opts), decoder(this)] | ||
| if (opts.csv || opts.format === 'csv') pipeline.push(csvWriter({headers: this.headers()})) | ||
| else if (opts.json || opts.format === 'json') pipeline.push(ldj.serialize()) | ||
| if(!opts.format) { | ||
| if(opts.csv) opts.format = 'csv' | ||
| if(opts.json) opts.format = 'json' | ||
| if(opts.ndjson) opts.format = 'ndjson' | ||
| } | ||
| if(opts.format && opts.format !== 'objectMode') | ||
| pipeline.push(formatData(opts)) | ||
| return pumpify.obj(pipeline) | ||
| } | ||
| @@ -1,7 +1,6 @@ | ||
| var http = require('http') | ||
| var url = require('url') | ||
| var path = require('path') | ||
| var sleep = require('sleep-ref') | ||
| var concat = require('concat-stream') | ||
| var ldj = require('ndjson') | ||
| var manifest = require('level-manifest') | ||
| @@ -16,19 +15,19 @@ var pump = require('pump') | ||
| var zlib = require('zlib') | ||
| var through = require('through2') | ||
| var formatOpts = require('rest-format-opts')({ | ||
| 'json': 'application/json', | ||
| 'csv': 'text/csv', | ||
| 'ndjson': 'application/x-ndjson', | ||
| 'sse': 'text/event-stream' | ||
| }) | ||
| module.exports = RestHandler | ||
| function RestHandler(dat) { | ||
| if (!(this instanceof RestHandler)) return new RestHandler(dat) | ||
| this.dat = dat | ||
| this.auth = auth(dat.options) | ||
| this.sleep = sleep(function(opts) { | ||
| opts.decode = true | ||
| if (opts.live === 'true') opts.live = true | ||
| if (opts.tail === 'true') opts.tail = true | ||
| if (typeof opts.tail === 'string') opts.tail = parseInt(opts.tail, 10) | ||
| return dat.createChangesReadStream(opts) | ||
| }, {style: 'newline'}) | ||
| } | ||
| RestHandler.prototype.session = function(req, res) { | ||
| @@ -157,8 +156,26 @@ RestHandler.prototype.pull = function(req, res) { | ||
| pump(send, zip(req, res), res) | ||
| } | ||
| RestHandler.prototype.changes = function(req, res) { | ||
| this.sleep.httpHandler(req, res) | ||
| RestHandler.prototype.changes = function(req, res) { | ||
| var readStream = this.dat.createChangesReadStream.bind(this.dat) | ||
| formatOpts(req, res, function (req, res, opts) { | ||
| opts.decode = true | ||
| if (opts.live === 'true') opts.live = true | ||
| if (opts.tail === 'true') opts.tail = true | ||
| if (typeof opts.tail === 'string') opts.tail = parseInt(opts.tail, 10) | ||
| if(!opts.format) opts.format = 'json' | ||
| // only ndjson, csv sse are suitable for live streams | ||
| if(opts.live && ['ndjson', 'csv', 'sse'].indexOf(opts.format) === -1) { | ||
| opts.format = 'ndjson' | ||
| // Plain content type as long as application/x-ndjson is not supported by browsers | ||
| res.setHeader('Content-Type', 'text/plain') | ||
| } | ||
| readStream(opts).pipe(res) | ||
| }) | ||
| } | ||
| RestHandler.prototype.stats = function(req, res) { | ||
| @@ -204,14 +221,22 @@ RestHandler.prototype.exportCsv = function(req, res) { | ||
| pump(readStream, res) | ||
| } | ||
| RestHandler.prototype.exportJson = function(req, res) { | ||
| var reqUrl = url.parse(req.url, true) | ||
| var qs = reqUrl.query | ||
| if (typeof qs.limit === 'undefined') qs.limit = 50 | ||
| else qs.limit = +qs.limit | ||
| var readStream = this.dat.createReadStream(qs) | ||
| res.writeHead(200, {'content-type': 'application/json'}) | ||
| pump(readStream, jsonStream.stringify('{"rows": [\n', '\n,\n', '\n]}\n'), res) | ||
| RestHandler.prototype.exportData = function(req, res) { | ||
| var readStream = this.dat.createReadStream.bind(this.dat) | ||
| formatOpts(req, res, function (req, res, opts) { | ||
| if (typeof opts.limit === 'undefined') opts.limit = 50 | ||
| else opts.limit = +opts.limit | ||
| if(!(opts.format)) { | ||
| res.setHeader('Content-Type', 'application/json') | ||
| opts.format = 'json' | ||
| } | ||
| if(opts.format === 'json') opts.style = opts.style || 'object' | ||
| readStream(opts).pipe(res) | ||
| }) | ||
| } | ||
| RestHandler.prototype.error = function(res, status, message) { | ||
| @@ -330,7 +355,7 @@ RestHandler.prototype.document = function(req, res, opts) { | ||
| var self = this | ||
| if (req.method === "GET" || req.method === "HEAD") { | ||
| if (opts.key) return this.get(req, res, opts) | ||
| else return this.exportJson(req, res) | ||
| else return this.exportData(req, res) | ||
| } | ||
| this.auth.handle(req, res, function(err) { | ||
| if (err) return self.auth.error(req, res) | ||
| @@ -37,7 +37,6 @@ | ||
| "cookie-cutter": "^0.1.1", | ||
| "corsify": "~1.0.2", | ||
| "csv-parser": "^1.2.1", | ||
| "csv-write-stream": "~0.1.1", | ||
| "cuid": "^1.2.4", | ||
| "dat-editor-prebuilt": "^1.0.19", | ||
| "dat-remote-blobs": "^2.0.0", | ||
| @@ -52,6 +51,7 @@ | ||
| "execspawn": "^0.2.0", | ||
| "exit": "^0.1.2", | ||
| "extend": "~1.2.1", | ||
| "format-data": "^1.0.1", | ||
| "getport": "~0.1.0", | ||
| "head-stream": "~0.0.4", | ||
| "isnumber": "^1.0.0", | ||
| @@ -70,6 +70,7 @@ | ||
| "multilevel": "~6.0.0", | ||
| "multistream": "^1.4.1", | ||
| "ndjson": "^1.3.0", | ||
| "negotiator": "^0.4.9", | ||
| "peek-stream": "^1.1.1", | ||
| "pretty-bytes": "^0.1.1", | ||
| "protocol-buffers": "^2.3.3", | ||
| @@ -78,6 +79,7 @@ | ||
| "read": "^1.0.5", | ||
| "request": "~2.27.0", | ||
| "resolve": "^0.7.1", | ||
| "rest-format-opts": "^1.0.0", | ||
| "rimraf": "^2.2.8", | ||
| "routes-router": "~1.5.3", | ||
| "single-line-log": "^0.3.1", | ||
| @@ -101,6 +103,7 @@ | ||
| "memdown": "^0.7.1", | ||
| "run-parallel": "^1.0.0", | ||
| "run-series": "^1.0.2", | ||
| "split": "^0.3.1", | ||
| "tap-spec": "^0.2.0", | ||
| "tape": "^2.13.3", | ||
| "tree-kill": "0.0.6", | ||
| @@ -12,6 +12,7 @@ var through = require('through2') | ||
| var kill = require('tree-kill') | ||
| var rimraf = require('rimraf') | ||
| var runSerially = require('run-series') | ||
| var split = require('split') | ||
| var nodeCmd = process.execPath | ||
| var timeout = 20000 | ||
| @@ -555,6 +556,44 @@ module.exports.cloneDir = function(test, common) { | ||
| }) | ||
| } | ||
| module.exports.cat = function (test, common) { | ||
| test('CLI dat cat --live', function (t) { | ||
| common.destroyTmpDats(function () { | ||
| mkdirp(common.dat1tmp, function (err) { | ||
| t.notOk(err) | ||
| initDat({cwd: common.dat1tmp, timeout: timeout, rpc: common.rpc}, function(cleanup) { | ||
| var datImport = spawn(datCliPath, ['import', '-', '--results', '--json', '--quiet'], {cwd: common.dat1tmp, env: process.env}) | ||
| datImport.stdin.write('{"a": 1}\n') | ||
| datImport.stdout.once('data', function () { | ||
| var cat = spawn(datCliPath, ['cat', '--live'], {cwd: common.dat1tmp, env: process.env}) | ||
| var lineSplit = cat.stdout.pipe(split()) | ||
| lineSplit.once('data', function (chunk) { | ||
| var row1 = JSON.parse(chunk) | ||
| t.equals(row1.a, 1) | ||
| t.equals(Object.keys(row1).length, 3) | ||
| lineSplit.once('data', function (chunk) { | ||
| var row2 = JSON.parse(chunk) | ||
| t.equals(row2.a, 2) | ||
| t.equals(Object.keys(row2).length, 3) | ||
| kill(cat.pid) | ||
| kill(datImport.pid) | ||
| cleanup() | ||
| common.destroyTmpDats(function () { | ||
| t.end() | ||
| }) | ||
| }) | ||
| datImport.stdin.write('{"a": 2}\n') | ||
| datImport.stdin.end() | ||
| }) | ||
| }) | ||
| }) | ||
| }) | ||
| }) | ||
| }) | ||
| } | ||
| module.exports.all = function (test, common) { | ||
| module.exports.spawn(test, common) | ||
| module.exports.noArgs(test, common) | ||
| @@ -568,6 +607,7 @@ module.exports.all = function (test, common) { | ||
| module.exports.badCommand(test, common) | ||
| module.exports.clone(test, common) | ||
| module.exports.cloneDir(test, common) | ||
| module.exports.cat(test, common) | ||
| } | ||
| function initDat(opts, cb) { | ||
Oops, something went wrong.