From aee91e4fbfe88d534d080f6f7d393d0850a2aa1d Mon Sep 17 00:00:00 2001
From: Benjamin Forster
Date: Sun, 7 Jan 2018 23:12:40 +1100
Subject: [PATCH 1/5] use db.createReadStream; add options to limit & offset to get & getStream

---
 index.js                    | 11 +++---
 lib/HyperdbDiffTransform.js | 60 +++++++++++++++++++----------
 test/triple-store.spec.js   | 76 ++++++++++++++++++-------------------
 3 files changed, 85 insertions(+), 62 deletions(-)

diff --git a/index.js b/index.js
index 5f92c8c..91c361c 100644
--- a/index.js
+++ b/index.js
@@ -15,13 +15,14 @@ function Graph (db, opts) {
 
 Graph.prototype.v = (name) => new Variable(name)
 
-Graph.prototype.getStream = function (triple) {
-  const stream = this.db.createDiffStream(utils.createQuery(triple))
-  return stream.pipe(new HyperdbDiffTransform(this.db))
+Graph.prototype.getStream = function (triple, opts) {
+  const stream = this.db.createReadStream(utils.createQuery(triple))
+  return stream.pipe(new HyperdbDiffTransform(this.db, opts))
 }
 
-Graph.prototype.get = function (triple, callback) {
-  utils.collect(this.getStream(triple), callback)
+Graph.prototype.get = function (triple, opts, callback) {
+  if (typeof opts === 'function') return this.get(triple, undefined, opts)
+  utils.collect(this.getStream(triple, opts), callback)
 }
 
 function doAction (action) {
diff --git a/lib/HyperdbDiffTransform.js b/lib/HyperdbDiffTransform.js
index cc3af7b..ef16d6c 100644
--- a/lib/HyperdbDiffTransform.js
+++ b/lib/HyperdbDiffTransform.js
@@ -2,35 +2,57 @@ const Transform = require('readable-stream').Transform
 const inherits = require('inherits')
 
 function HyperdbDiffTransform (db, options) {
-  this.db = db
   if (!(this instanceof HyperdbDiffTransform)) {
-    return new HyperdbDiffTransform(options)
+    return new HyperdbDiffTransform(db, options)
   }
-  Transform.call(this, Object.assign(options || {}, { objectMode: true }))
-
+  var opts = options || {}
+  this.db = db
+  this._finished = false
+  this._count = 0
+  this._filter = opts.filter
+  this._offset = opts.offset || 0
+  this._limit = opts.limit && opts.limit + this._offset
+  Transform.call(this, Object.assign(opts, { objectMode: true }))
+  this._sources = []
   this.once('pipe', (source) => {
    source.on('error', e => this.emit('error', e))
+    this._sources.push(source)
   })
 }
 
 inherits(HyperdbDiffTransform, Transform)
 
-HyperdbDiffTransform.prototype._transform = function transform (chunk, encoding, done) {
-  if (chunk.type === 'put') {
-    const seq = chunk.nodes[0].seq
-    const feedSeq = chunk.nodes[0].feedSeq
-    this.db.get(chunk.name, (err, nodes) => {
-      if (err) {
-        this.emit('error', err)
-        done()
-      }
-      const node = nodes[0]
-      if (node.feedSeq === feedSeq && node.seq === seq) {
-        if (node.value !== null) this.push(JSON.parse(node.value.toString()))
-      }
-      done()
-    })
+HyperdbDiffTransform.prototype._transform = function transform (nodes, encoding, done) {
+  // if (chunk.type === 'put') {
+  //   const seq = chunk.nodes[0].seq
+  //   const feedSeq = chunk.nodes[0].feedSeq
+  //   this.db.get(chunk.name, (err, nodes) => {
+  //     if (err) {
+  //       this.emit('error', err)
+  //       done()
+  //     }
+  //     const node = nodes[0]
+  //     if (node.feedSeq === feedSeq && node.seq === seq) {
+  //       if (node.value !== null) this.push(JSON.parse(node.value.toString()))
+  //     }
+  //     done()
+  //   })
+  // }
+  if (this._finished) return done()
+  if (this._limit && this._count >= this._limit) {
+    this.push(null)
+    this._sources.forEach(source => source.destroy())
+    this._finished = true
+    return
+  }
+  var value = nodes[0].value && JSON.parse(nodes[0].value.toString())
+  if (value !== null && (!this._filter || this._filter(value))) {
+    if (this._count >= this._offset) {
+      this.push(value)
+    }
+    this._count++
   }
+  done()
 }
 
 module.exports = HyperdbDiffTransform
diff --git a/test/triple-store.spec.js b/test/triple-store.spec.js
index fb49792..1415896 100644
--- a/test/triple-store.spec.js
+++ b/test/triple-store.spec.js
@@ -114,29 +114,29 @@ describe('a basic triple store', function () {
     stream.on('end', done)
   })
 
-  xit('should get the triple if limit 1 is used', function (done) {
-    db.get({ limit: 1 }, (err, list) => {
+  it('should get the triple if limit 1 is used', function (done) {
+    db.get({}, { limit: 1 }, (err, list) => {
       expect(list).to.eql([triple])
       done(err)
     })
   })
 
-  xit('should get the triple if limit 0 is used', function (done) {
-    db.get({ limit: 0 }, (err, list) => {
+  it('should get the triple if limit 0 is used', function (done) {
+    db.get({}, { limit: 0 }, (err, list) => {
      expect(list).to.eql([triple])
       done(err)
     })
   })
 
-  xit('should get the triple if offset 0 is used', function (done) {
-    db.get({ offset: 0 }, (err, list) => {
+  it('should get the triple if offset 0 is used', function (done) {
+    db.get({}, { offset: 0 }, (err, list) => {
       expect(list).to.eql([triple])
       done(err)
     })
   })
 
-  xit('should not get the triple if offset 1 is used', function (done) {
-    db.get({ offset: 1 }, (err, list) => {
+  it('should not get the triple if offset 1 is used', function (done) {
+    db.get({}, { offset: 1 }, (err, list) => {
       expect(list).to.eql([])
       done(err)
     })
   })
@@ -270,30 +270,30 @@ describe('a basic triple store', function () {
     stream.on('end', done)
   })
 
-  xit('should return only one triple with limit 1', function (done) {
-    db.get({ predicate: 'b', limit: 1 }, (err, list) => {
+  it('should return only one triple with limit 1', function (done) {
+    db.get({ predicate: 'b' }, { limit: 1 }, (err, list) => {
       expect(list).to.eql([triple1])
       done(err)
     })
   })
 
-  xit('should return two triples with limit 2', function (done) {
-    db.get({ predicate: 'b', limit: 2 }, (err, list) => {
+  it('should return two triples with limit 2', function (done) {
+    db.get({ predicate: 'b' }, { limit: 2 }, (err, list) => {
       expect(list).to.eql([triple1, triple2])
       done(err)
     })
   })
 
-  xit('should return three triples with limit 3', function (done) {
-    db.get({ predicate: 'b', limit: 3 }, (err, list) => {
+  it('should return three triples with limit 3', function (done) {
+    db.get({ predicate: 'b' }, { limit: 3 }, (err, list) => {
      expect(list).to.eql([triple1, triple2])
       done(err)
     })
   })
 
-  xit('should support limit over streams', function (done) {
+  it('should support limit over streams', function (done) {
     var triples = [triple1]
-    var stream = db.getStream({ predicate: 'b', limit: 1 })
+    var stream = db.getStream({ predicate: 'b' }, { limit: 1 })
     stream.on('data', function (data) {
       expect(data).to.eql(triples.shift())
     })
@@ -301,23 +301,23 @@ describe('a basic triple store', function () {
     stream.on('end', done)
   })
 
-  xit('should return only one triple with offset 1', function (done) {
-    db.get({ predicate: 'b', offset: 1 }, (err, list) => {
+  it('should return only one triple with offset 1', function (done) {
+    db.get({ predicate: 'b' }, { offset: 1 }, (err, list) => {
       expect(list).to.eql([triple2])
       done(err)
     })
   })
 
-  xit('should return only no triples with offset 2', function (done) {
-    db.get({ predicate: 'b', offset: 2 }, (err, list) => {
+  it('should return only no triples with offset 2', function (done) {
+    db.get({ predicate: 'b' }, { offset: 2 }, (err, list) => {
      expect(list).to.eql([])
       done(err)
     })
   })
 
-  xit('should support offset over streams', function (done) {
+  it('should support offset over streams', function (done) {
     var triples = [triple2]
-    var stream = db.getStream({ predicate: 'b', offset: 1 })
+    var stream = db.getStream({ predicate: 'b' }, { offset: 1 })
     stream.on('data', function (data) {
       expect(data).to.eql(triples.shift())
     })
@@ -414,30 +414,30 @@ describe('a basic triple store', function () {
     stream.on('end', done)
   })
 
-  xit('should return only one triple with limit 1', function (done) {
-    db.get({ predicate: 'b', limit: 1 }, (err, list) => {
+  it('should return only one triple with limit 1', function (done) {
+    db.get({ predicate: 'b' }, { limit: 1 }, (err, list) => {
      expect(list).to.eql([triple1])
       done(err)
     })
   })
 
-  xit('should return two triples with limit 2', function (done) {
-    db.get({ predicate: 'b', limit: 2 }, (err, list) => {
+  it('should return two triples with limit 2', function (done) {
+    db.get({ predicate: 'b' }, { limit: 2 }, (err, list) => {
       expect(list).to.eql([triple1, triple2])
       done(err)
     })
   })
 
-  xit('should return three triples with limit 3', function (done) {
-    db.get({ predicate: 'b', limit: 3 }, (err, list) => {
+  it('should return three triples with limit 3', function (done) {
+    db.get({ predicate: 'b' }, { limit: 3 }, (err, list) => {
      expect(list).to.eql([triple1, triple2])
       done(err)
     })
   })
 
-  xit('should support limit over streams', function (done) {
+  it('should support limit over streams', function (done) {
     var triples = [triple1]
-    var stream = db.getStream({ predicate: 'b', limit: 1 })
+    var stream = db.getStream({ predicate: 'b' }, { limit: 1 })
     stream.on('data', function (data) {
       expect(data).to.eql(triples.shift())
     })
@@ -445,23 +445,23 @@ describe('a basic triple store', function () {
     stream.on('end', done)
   })
 
-  xit('should return only one triple with offset 1', function (done) {
-    db.get({ predicate: 'b', offset: 1 }, (err, list) => {
+  it('should return only one triple with offset 1', function (done) {
+    db.get({ predicate: 'b' }, { offset: 1 }, (err, list) => {
       expect(list).to.eql([triple2])
       done(err)
    })
   })
 
-  xit('should return only no triples with offset 2', function (done) {
-    db.get({ predicate: 'b', offset: 2 }, (err, list) => {
+  it('should return only no triples with offset 2', function (done) {
+    db.get({ predicate: 'b' }, { offset: 2 }, (err, list) => {
      expect(list).to.eql([])
       done(err)
     })
   })
 
-  xit('should support offset over streams', function (done) {
+  it('should support offset over streams', function (done) {
     var triples = [triple2]
-    var stream = db.getStream({ predicate: 'b', offset: 1 })
+    var stream = db.getStream({ predicate: 'b' }, { offset: 1 })
     stream.on('data', function (data) {
       expect(data).to.eql(triples.shift())
     })
@@ -573,7 +573,7 @@ describe('a basic triple store', function () {
     })
   })
 
-  xit('should support filtering', function (done) {
+  it('should support filtering', function (done) {
     var triple1 = { subject: 'a', predicate: 'b', object: 'd' }
     var triple2 = { subject: 'a', predicate: 'b', object: 'c' }
 
@@ -582,7 +582,7 @@ describe('a basic triple store', function () {
       return triple.object === 'd'
     }
 
-    db.get({ subject: 'a', predicate: 'b', filter: filter }, (err, results) => {
+    db.get({ subject: 'a', predicate: 'b' }, { filter: filter }, (err, results) => {
      expect(results).to.eql([triple1])
      done(err)
    })
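The options object introduced by this patch is easiest to see in use. The sketch below is illustrative only and is not part of the patch: the in-memory storage helper and the sample triples are assumptions, and the constructor is used as it still exists at this point in the series (wrapping an externally created hyperdb instance, as the test suite does).

```js
var hyperdb = require('hyperdb')
var ram = require('random-access-memory')
var hypergraph = require('hyper-graph-db')

// assumed setup: an in-memory hyperdb, mirroring the spec files
var db = hypergraph(hyperdb(function () { return ram() }))

db.put([
  { subject: 'a', predicate: 'b', object: 'c' },
  { subject: 'a', predicate: 'b', object: 'd' }
], function (err) {
  if (err) throw err

  // callback form: the options object sits between the pattern and the callback
  db.get({ subject: 'a' }, { limit: 1 }, function (err, triples) {
    if (err) throw err
    console.log(triples) // at most one matching triple
  })

  // stream form: the same options are enforced by the transform changed in this patch
  db.getStream({ subject: 'a' }, { offset: 1, filter: function (t) { return t.object !== 'c' } })
    .on('data', console.log)
    .on('end', function () { console.log('done') })
})
```

Passing the callback directly as the second argument keeps working, since `get` shifts its arguments when `opts` is a function.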
From 0da5b320d88606483780aaba16bc52ae9a598b07 Mon Sep 17 00:00:00 2001
From: Benjamin Forster
Date: Mon, 8 Jan 2018 21:59:08 +1100
Subject: [PATCH 2/5] add filter and limit to join stream

---
 index.js                 | 18 ++++++++++++++----
 lib/JoinStream.js        |  9 +++++----
 test/join-stream.spec.js | 10 +++++-----
 3 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/index.js b/index.js
index 91c361c..3b5dbdc 100644
--- a/index.js
+++ b/index.js
@@ -76,8 +76,14 @@ Graph.prototype.searchStream = function (query, options) {
   }
 
   const plannedQuery = planner(query)
 
-  var streams = plannedQuery.map((triple) => {
-    return new JoinStream({ triple, db: this })
+  var streams = plannedQuery.map((triple, i) => {
+    const limit = (options && i === plannedQuery.length - 1) ? options.limit : undefined
+    return new JoinStream({
+      triple: utils.filterTriple(triple),
+      filter: triple.filter,
+      db: this,
+      limit
+    })
   })
   streams[0].start = true
 
@@ -88,8 +94,12 @@ Graph.prototype.searchStream = function (query, options) {
   return result
 }
 
-Graph.prototype.search = function (query, callback) {
-  utils.collect(this.searchStream(query), callback)
+Graph.prototype.search = function (query, options, callback) {
+  if (typeof options === 'function') {
+    callback = options
+    options = undefined
+  }
+  utils.collect(this.searchStream(query, options), callback)
 }
 
 Graph.prototype.generateBatch = utils.generateBatch
diff --git a/lib/JoinStream.js b/lib/JoinStream.js
index 06d5557..e44e85c 100644
--- a/lib/JoinStream.js
+++ b/lib/JoinStream.js
@@ -16,11 +16,12 @@ function JoinStream (options) {
   this.matcher = matcher(options.triple)
   this.mask = queryMask(options.triple)
   this.maskUpdater = maskUpdater(options.triple)
-  this.limit = options.limit
+  this.limit = options && options.limit
   this._limitCounter = 0
   this.db = options.db
-  this._index = options.index
   this._ended = false
+  this.filter = options && options.filter
+  this.offset = options && options.offset
 
   this.once('pipe', (source) => {
     source.on('error', (err) => {
@@ -48,7 +49,7 @@ function JoinStream (options) {
     }
   }
 
-  this._indexPreferences = { index: this._index }
+  this._options = { filter: this.filter, offset: this.offset }
 }
 
 inherits(JoinStream, Transform)
@@ -60,7 +61,7 @@ JoinStream.prototype._transform = function transform (solution, encoding, done)
   var newMask = this.maskUpdater(solution, this.mask)
 
   this._lastSolution = solution
-  this._readStream = this.db.getStream(newMask, this._indexPreferences)
+  this._readStream = this.db.getStream(newMask, this._options)
 
   this._readStream.on('data', this._onDataStream)
   this._readStream.on('error', this._onErrorStream)
diff --git a/test/join-stream.spec.js b/test/join-stream.spec.js
index 32aea12..37a0a00 100644
--- a/test/join-stream.spec.js
+++ b/test/join-stream.spec.js
@@ -281,7 +281,7 @@ describe('JoinStream', () => {
     })
   })
 
-  xit('should support filtering inside a condition', (done) => {
+  it('should support filtering inside a condition', (done) => {
     db.search([{
       subject: db.v('x'),
       predicate: 'friend',
@@ -293,7 +293,7 @@ describe('JoinStream', () => {
     })
   })
 
-  xit('should support filtering inside a second-level condition', (done) => {
+  it('should support filtering inside a second-level condition', (done) => {
     db.search([{
       subject: 'matteo',
       predicate: 'friend',
@@ -365,7 +365,7 @@ describe('JoinStream', () => {
     })
   })
 
-  xit('should return only one solution with limit 1', (done) => {
+  it('should return only one solution with limit 1', (done) => {
     db.search([{
       subject: db.v('x'),
       predicate: 'friend',
@@ -381,7 +381,7 @@ describe('JoinStream', () => {
     })
   })
 
-  xit('should return only one solution with limit 1 (bis)', (done) => {
+  it('should return only one solution with limit 1 (bis)', (done) => {
     db.search([{
       subject: 'lucio',
       predicate: 'friend',
@@ -392,7 +392,7 @@ describe('JoinStream', () => {
       object: db.v('x')
     }], { limit: 1 }, (err, results) => {
       expect(results).to.have.property('length', 1)
-      expect(results[0]).to.have.property('x', 'marco')
+      expect(results[0]).to.have.property('x', 'matteo')
       done(err)
     })
   })
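For orientation, this is roughly how the extended `search` signature is exercised once this patch lands; the fixture-style names and the second query are assumptions made for illustration, not part of the change. The per-condition `filter` is separated from the triple pattern and handed to the `JoinStream` for that condition, while the top-level `limit` is applied only to the last stream in the planned query.

```js
// assumes `db` already contains `friend` triples, in the style of the test fixture
db.search([{
  subject: db.v('x'),
  predicate: 'friend',
  object: db.v('y'),
  // runs against every candidate triple for this condition
  filter: function (triple) { return triple.subject !== 'matteo' }
}, {
  subject: db.v('y'),
  predicate: 'friend',
  object: db.v('z')
}], { limit: 1 }, function (err, solutions) {
  if (err) throw err
  // at most one binding object, e.g. { x: '...', y: '...', z: '...' }
  console.log(solutions)
})
```

Calling `search(query, callback)` without an options object keeps working, because the options argument is shifted when it is a function.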
From 8449b0ce13fbfd43c34156f60a6c4e24e8e16f92 Mon Sep 17 00:00:00 2001
From: Benjamin Forster
Date: Mon, 8 Jan 2018 22:10:11 +1100
Subject: [PATCH 3/5] update readme

---
 readme.md | 43 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 40 insertions(+), 3 deletions(-)

diff --git a/readme.md b/readme.md
index e878b4f..9d57dde 100644
--- a/readme.md
+++ b/readme.md
@@ -46,10 +46,19 @@ Inserts **Hexastore** formated entries for triple into the graph database.
 
 Returns a writable stream.
 
-#### `db.get(triple, [callback])`
+#### `db.get(triple, [options], callback)`
 
 Returns all entries that match the triple. This allows for partial pattern-matching. For example `{ subject: 'a' })`, will return all triples with subject equal to 'a'.
 
+Allowed options:
+```js
+{
+  limit: number, // limit number of triples returned
+  offset: number, // offset returned
+  filter: function (triple) { return bool }, // filter the results
+}
+```
+
 #### `db.del(triple, [callback])`
 
 Remove triples indices from the graph database.
@@ -58,13 +67,41 @@ Remove triples indices from the graph database.
 
 Returns a writable stream for removing entries.
 
-#### `var stream = db.getStream(triple)`
+#### `var stream = db.getStream(triple, [options])`
 
 Returns a readable stream of all matching triples.
 
-#### `db.search(queries, [callback])`
+Allowed options:
+```js
+{
+  limit: number, // limit number of triples returned
+  offset: number, // offset returned
+  filter: function (triple) { return bool }, // filter the results
+}
+```
+
+#### `db.search(queries, [options], callback)`
 
 Allows for Basic Graph Patterns searches where all queries must match.
 
+Expects queries to be an array of triple options of the form:
+
+```js
+{
+  subject: String || Variable, // required
+  predicate: String || Variable, // required
+  object: String || Variable, // required
+  filter: Function, // optional
+}
+```
+
+Allowed options:
+```js
+{
+  limit: number, // limit number of results returned
+}
+```
+
+filter: function (triple) { return bool },
 ```js
 db.put([{
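As a companion to the option blocks documented in this readme change, the following sketch (sample predicate and handlers are assumed) shows the stream form. One behavioural detail worth knowing when the options are combined: in the read transform, `filter` runs before the `offset`/`limit` counters are advanced, so triples rejected by the filter consume neither.

```js
var stream = db.getStream({ predicate: 'likes' }, {
  filter: function (triple) { return triple.object !== 'beer' }, // dropped before counting
  offset: 10, // skip the first 10 triples that pass the filter
  limit: 5    // then emit at most 5 more
})

stream.on('data', function (triple) {
  console.log(triple.subject, triple.predicate, triple.object)
})
stream.on('error', function (err) {
  console.error('read failed', err)
})
stream.on('end', function () {
  console.log('no more matches')
})
```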
From a345b264c23a08999723de6345b8933c0d0931a4 Mon Sep 17 00:00:00 2001
From: Benjamin Forster
Date: Sun, 14 Jan 2018 13:07:32 +1100
Subject: [PATCH 4/5] add readStream for hyperdb [temporary]; modified constructor to make hyperdb instance from opts. will remove createReadStream once hyperdb merges it.

---
 index.js                                      |  40 +++-
 ...ffTransform.js => HyperdbReadTransform.js} |  27 +--
 lib/hyperdbModifier.js                        | 195 ++++++++++++++++++
 lib/utils.js                                  |   6 +-
 package.json                                  |   5 +-
 readme.md                                     |  21 +-
 test/join-stream.spec.js                      |   3 +-
 test/triple-store.spec.js                     |  14 +-
 8 files changed, 260 insertions(+), 51 deletions(-)
 rename lib/{HyperdbDiffTransform.js => HyperdbReadTransform.js} (56%)
 create mode 100644 lib/hyperdbModifier.js

diff --git a/index.js b/index.js
index 3b5dbdc..30316b2 100644
--- a/index.js
+++ b/index.js
@@ -1,23 +1,44 @@
-const PassThrough = require('readable-stream').PassThrough
-const Transform = require('readable-stream').Transform
+const hyperdb = require('hyperdb')
+const stream = require('readable-stream')
 const pump = require('pump')
+const inherits = require('inherits')
+const events = require('events')
 const utils = require('./lib/utils')
 const Variable = require('./lib/Variable')
-const HyperdbDiffTransform = require('./lib/HyperdbDiffTransform')
+const HyperdbReadTransform = require('./lib/HyperdbReadTransform')
 const JoinStream = require('./lib/JoinStream')
 const planner = require('./lib/planner')
+const attachCreateReadStream = require('./lib/hyperdbModifier').attachCreateReadStream
 
-function Graph (db, opts) {
-  if (!(this instanceof Graph)) return new Graph(db, opts)
-  this.db = db
+const Transform = stream.Transform
+const PassThrough = stream.PassThrough
+
+// temporarily augment hyperdb prototype to include createReadStream
+if (!hyperdb.createReadStream) {
+  attachCreateReadStream(hyperdb)
+}
+
+function Graph (storage, key, opts) {
+  if (!(this instanceof Graph)) return new Graph(storage, key, opts)
+  events.EventEmitter.call(this)
+  this.db = hyperdb(storage, key, opts)
+
+  this.db.on('error', (e) => {
+    this.emit('error', e)
+  })
+  this.db.on('ready', (e) => {
+    this.emit('ready', e)
+  })
 }
 
+inherits(Graph, events.EventEmitter)
+
 Graph.prototype.v = (name) => new Variable(name)
 
 Graph.prototype.getStream = function (triple, opts) {
   const stream = this.db.createReadStream(utils.createQuery(triple))
-  return stream.pipe(new HyperdbDiffTransform(this.db, opts))
+  return stream.pipe(new HyperdbReadTransform(this.db, opts))
 }
 
 Graph.prototype.get = function (triple, opts, callback) {
@@ -56,12 +77,11 @@ function doActionStream (action) {
   }
 }
 
-// this is not implemented in hyperdb yet
-// for now we just put a null value in the db
-
 Graph.prototype.put = doAction('put')
 Graph.prototype.putStream = doActionStream('put')
 
+// this is not implemented in hyperdb yet
+// for now we just put a null value in the db
 Graph.prototype.del = doAction('del')
 Graph.prototype.delStream = doActionStream('del')
 
diff --git a/lib/HyperdbDiffTransform.js b/lib/HyperdbReadTransform.js
similarity index 56%
rename from lib/HyperdbDiffTransform.js
rename to lib/HyperdbReadTransform.js
index ef16d6c..5293dfe 100644
--- a/lib/HyperdbDiffTransform.js
+++ b/lib/HyperdbReadTransform.js
@@ -1,9 +1,9 @@
 const Transform = require('readable-stream').Transform
 const inherits = require('inherits')
 
-function HyperdbDiffTransform (db, options) {
-  if (!(this instanceof HyperdbDiffTransform)) {
-    return new HyperdbDiffTransform(db, options)
+function HyperdbReadTransform (db, options) {
+  if (!(this instanceof HyperdbReadTransform)) {
+    return new HyperdbReadTransform(db, options)
   }
   var opts = options || {}
   this.db = db
@@ -20,24 +20,9 @@ function HyperdbDiffTransform (db, options) {
   })
 }
 
-inherits(HyperdbDiffTransform, Transform)
+inherits(HyperdbReadTransform, Transform)
 
-HyperdbDiffTransform.prototype._transform = function transform (nodes, encoding, done) {
-  // if (chunk.type === 'put') {
-  //   const seq = chunk.nodes[0].seq
-  //   const feedSeq = chunk.nodes[0].feedSeq
-  //   this.db.get(chunk.name, (err, nodes) => {
-  //     if (err) {
-  //       this.emit('error', err)
-  //       done()
-  //     }
-  //     const node = nodes[0]
-  //     if (node.feedSeq === feedSeq && node.seq === seq) {
-  //       if (node.value !== null) this.push(JSON.parse(node.value.toString()))
-  //     }
-  //     done()
-  //   })
-  // }
+HyperdbReadTransform.prototype._transform = function transform (nodes, encoding, done) {
   if (this._finished) return done()
   if (this._limit && this._count >= this._limit) {
     this.push(null)
@@ -55,4 +40,4 @@ HyperdbDiffTransform.prototype._transform = function transform (nodes, encoding,
   done()
 }
 
-module.exports = HyperdbDiffTransform
+module.exports = HyperdbReadTransform
diff --git a/lib/hyperdbModifier.js b/lib/hyperdbModifier.js
new file mode 100644
index 0000000..8e3aa14
--- /dev/null
+++ b/lib/hyperdbModifier.js
@@ -0,0 +1,195 @@
+var hash = require('hyperdb/lib/hash')
+var Readable = require('readable-stream').Readable
+var LRU = require('lru')
+
+module.exports = { attachCreateReadStream }
+
+function attachCreateReadStream (DB) {
+  DB.prototype.createReadStream = createReadStream
+}
+
+function createReadStream (key, opts) {
+  if (!opts) opts = {}
+  var self = this
+  var path = hash(key, true)
+  var cacheMax = opts.cacheSize || 128
+  var keyCache = new LRU(cacheMax)
+  var streamQueue
+  var queueNeedsSorting = true
+  var stream = new Readable({ objectMode: true })
+  stream._read = read
+
+  return stream
+
+  function read () {
+    if (stream.destroyed) return
+    // if no heads - get heads and process first tries
+    if (!streamQueue) {
+      self.heads(function (err, heads) {
+        if (err) stream.emit('error', err)
+        if (!heads.length) {
+          stream.push(null)
+          return
+        }
+        streamQueue = heads.map(h => ({ node: h, index: 0 }))
+        next()
+      })
+      return
+    }
+    next()
+  }
+
+  function next () {
+    if (!streamQueue.length) {
+      stream.push(null)
+      return
+    }
+    // sort stream queue first to ensure that you always get the latest node
+    // this requires offsetting feeds sequences based on when it started in relation to others
+    if (queueNeedsSorting) streamQueue.sort(sortQueueByClockAndSeq)
+    var data = streamQueue.pop()
+    var node = data.node
+    readNext(node, data.index, (err, match) => {
+      if (err) {
+        return stream.emit('error', err)
+      }
+      if (!match) return next()
+      // check if really a match and not encountered before
+      check(node, (matchingNode) => {
+        if (!matchingNode) next()
+        else {
+          keyCache.set(node.key, true)
+          stream.push(matchingNode)
+        }
+      })
+    })
+  }
+
+  function sortQueueByClockAndSeq (a, b) {
+    a = a.node
+    b = b.node
+    var sortValue = sortNodesByClock(a, b)
+    if (sortValue !== 0) return sortValue
+    // same time, so use sequence to order
+    if (a.feed === b.feed) return a.seq - b.seq
+    var bOffset = b.clock.reduce((p, v) => p + v, b.seq)
+    var aOffset = a.clock.reduce((p, v) => p + v, a.seq)
+    // if real sequence is the same then return sort on feed
+    if (bOffset === aOffset) return b.feed - a.feed
+    return aOffset - bOffset
+  }
+
+  function check (node, cb) {
+    // is it actually a match and not a collision
+    if (!(node && node.key && node.key.indexOf(key) === 0)) return cb()
+    // have we encountered this node before
+    if (keyCache.get(node.key)) return cb()
+    // it is not in the cache but might still be a duplicate if cache is full
+    // if (keyCache.length === cacheMax) {
+    // so check if this is the first instance of the node
+    // TODO: Atm this is a bit of a hack to get conflicting values
+    // ideally this should not need to retraverse the trie.
+    // Potential issue here when db is updated after stream was created!
+    return self._get(node.key, false, [], noop, (err, latest) => {
+      if (err) return stream.emit('error', err)
+      if (sortNodesByClock(node, Array.isArray(latest) ? latest[0] : latest) >= 0) {
+        cb(latest)
+      } else {
+        cb()
+      }
+    })
+  }
+
+  function readNext (node, i, cb) {
+    var writers = self._writers
+    var trie
+    var missing = 0
+    var error
+    var vals
+    for (; i < path.length - 1; i++) {
+      if (node.path[i] === path[i]) continue
+      // check trie
+      trie = node.trie[i]
+      if (!trie) {
+        return cb(null)
+      }
+      vals = trie[path[i]]
+      // not found
+      if (!vals || !vals.length) {
+        return cb(null)
+      }
+
+      missing = vals.length
+      error = null
+      for (var j = 0; j < vals.length; j++) {
+        // fetch potential
+        writers[vals[j].feed].get(vals[j].seq, (err, val) => {
+          if (err) {
+            error = err
+          } else {
+            pushToQueue({ node: val, index: i })
+          }
+          missing--
+          if (!missing) {
+            cb(error)
+          }
+        })
+      }
+      return
+    }
+
+    // Traverse the rest of the node's trie, recursively,
+    // hunting for more nodes with the desired prefix.
+    for (; i < node.trie.length; i++) {
+      trie = node.trie[i] || []
+      for (j = 0; j < trie.length; j++) {
+        var entrySet = trie[j] || []
+        for (var el = 0; el < entrySet.length; el++) {
+          var entry = entrySet[el]
+          missing++
+          writers[entry.feed].get(entry.seq, (err, val) => {
+            if (err) {
+              error = err
+            } else if (val.key && val.value) {
+              pushToQueue({ node: val, index: i + 1 })
+            }
+            missing--
+            if (!missing) {
+              if (i < node.trie.length) {
+                pushToQueue({ node: node, index: i + 1 })
+                cb(error, false)
+              } else {
+                cb(error, true)
+              }
+            }
+          })
+        }
+      }
+      if (missing > 0) return
+    }
+    return cb(null, true)
+  }
+
+  function pushToQueue (item) {
+    queueNeedsSorting = streamQueue.length > 0 && (sortQueueByClockAndSeq(item, streamQueue[streamQueue.length - 1]) < 0)
+    streamQueue.push(item)
+  }
+}
+
+function sortNodesByClock (a, b) {
+  var isGreater = false
+  var isLess = false
+  var length = a.clock.length
+  if (b.clock.length > length) length = b.clock.length
+  for (var i = 0; i < length; i++) {
+    var diff = (a.clock[i] || 0) - (b.clock[i] || 0)
+    if (diff > 0) isGreater = true
+    if (diff < 0) isLess = true
+  }
+  if (isGreater && isLess) return 0
+  if (isLess) return -1
+  if (isGreater) return 1
+  return 0
+}
+
+function noop () {}
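The read-stream ordering in the new helper hinges on the vector-clock comparison at the bottom of the file: a node only wins if its clock dominates in at least one position and loses in none, while two concurrent nodes compare as 0 and fall through to the feed/sequence tie-break. The following standalone restatement with made-up clock values (not part of the patch) illustrates the three outcomes:

```js
// same logic as sortNodesByClock above, restated over plain arrays for illustration
function compareClocks (a, b) {
  var isGreater = false
  var isLess = false
  var length = Math.max(a.length, b.length)
  for (var i = 0; i < length; i++) {
    var diff = (a[i] || 0) - (b[i] || 0)
    if (diff > 0) isGreater = true
    if (diff < 0) isLess = true
  }
  if (isGreater && isLess) return 0 // concurrent: neither clock dominates
  if (isLess) return -1             // b has seen strictly more
  if (isGreater) return 1           // a has seen strictly more
  return 0                          // identical clocks
}

console.log(compareClocks([2, 1], [1, 1])) //  1
console.log(compareClocks([1, 1], [1, 2])) // -1
console.log(compareClocks([2, 0], [0, 2])) //  0, left to the feed/seq tie-break
```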
diff --git a/lib/utils.js b/lib/utils.js
index eece0c5..af3425a 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -2,10 +2,10 @@ const Variable = require('./Variable')
 
 const defs = {
   spo: ['subject', 'predicate', 'object'],
-  sop: ['subject', 'object', 'predicate'],
+  sop: ['subject', 'object', 'predicate'], // [optional]
   pos: ['predicate', 'object', 'subject'],
-  pso: ['predicate', 'subject', 'object'],
-  ops: ['object', 'predicate', 'subject'],
+  pso: ['predicate', 'subject', 'object'], // [optional]
+  ops: ['object', 'predicate', 'subject'], // [optional]
   osp: ['object', 'subject', 'predicate']
 }
 const defKeys = Object.keys(defs)
diff --git a/package.json b/package.json
index d2a0811..203b2ab 100644
--- a/package.json
+++ b/package.json
@@ -12,8 +12,9 @@
   },
   "dependencies": {
     "hyperdb": "^2.0.0",
-    "readable-stream": "^2.3.3",
-    "pump": "^2.0.0"
+    "lru": "^3.1.0",
+    "pump": "^2.0.0",
+    "readable-stream": "^2.3.3"
   },
   "devDependencies": {
     "chai": "^4.1.2",
diff --git a/readme.md b/readme.md
index 9d57dde..c0d2842 100644
--- a/readme.md
+++ b/readme.md
@@ -17,10 +17,9 @@ This requires node v6.x.x or greater.
 ## basic usage
 
 ```js
-var hyperdb = require('hyperdb')
 var hypergraph = require('hyper-graph-db')
 
-var db = hypergraph(hyperdb('./my.db', {valueEncoding: 'utf-8'}))
+var db = hypergraph('./my.db', { valueEncoding: 'utf-8' })
 
 var triple = { subject: 'a', predicate: 'b', object: 'c' }
 
@@ -34,9 +33,23 @@ db.put(triple, function (err) {
 
 ## API
 
-#### `var db = hypergraph(hyperdb)`
+#### `var db = hypergraph(storage, [key], [options])`
 
-Returns an instance of hyper-graph-db using the hyperdb passed to it.
+Returns an instance of hyper-graph-db. Arguments are passed directly to hyperdb, look at its constructor [API](https://github.com/mafintosh/hyperdb#var-db--hyperdbstorage-key-options) for configuration options.
+
+
+#### `db.on('ready')`
+
+*This event is passed on from underlying hyperdb instance.*
+
+Emitted exactly once: when the db is fully ready and all static properties have
+been set. You do not need to wait for this when calling any async functions.
+
+#### `db.on('error', err)`
+
+*This event is passed on from underlying hyperdb instance.*
+
+Emitted if there was a critical error before `db` is ready.
 
 #### `db.put(triple, [callback])`
diff --git a/test/join-stream.spec.js b/test/join-stream.spec.js
index 37a0a00..65efdb9 100644
--- a/test/join-stream.spec.js
+++ b/test/join-stream.spec.js
@@ -3,7 +3,6 @@ const expect = require('chai').expect
 const ram = require('random-access-memory')
 
 const hypergraph = require('../index')
-const hyperdb = require('hyperdb')
 const fixture = require('./fixture/foaf')
 
 function ramStore (filename) {
@@ -16,7 +15,7 @@ function ramStore (filename) {
 describe('JoinStream', () => {
   let db
   beforeEach((done) => {
-    db = hypergraph(hyperdb(ramStore))
+    db = hypergraph(ramStore)
     db.put(fixture, done)
   })
 
diff --git a/test/triple-store.spec.js b/test/triple-store.spec.js
index 1415896..d0aca28 100644
--- a/test/triple-store.spec.js
+++ b/test/triple-store.spec.js
@@ -2,7 +2,6 @@ const expect = require('chai').expect
 const ram = require('random-access-memory')
 
 const hypergraph = require('../index')
-const hyperdb = require('hyperdb')
 
 function ramStore (filename) {
   // filename will be one of: data, bitfield, tree, signatures, key, secret_key
@@ -13,11 +12,9 @@ function ramStore (filename) {
 
 describe('a basic triple store', function () {
   let db
-  let hyper
 
   beforeEach(function () {
-    hyper = hyperdb(ramStore)
-    db = hypergraph(hyper)
+    db = hypergraph(ramStore)
   })
 
   afterEach(function (done) {
@@ -209,7 +206,8 @@ describe('a basic triple store', function () {
   it('should put a triple with an object to false', function (done) {
     var t = { subject: 'a', predicate: 'b', object: false }
     db.put(t, function () {
-      hyper.get('spo::a::b::false', done)
+      // accessing underlying db instance
+      db.db.get('spo/a/b/false', done)
     })
   })
 
@@ -598,7 +596,7 @@ describe('deferred open support', function () {
   })
 
   it('should support deferred search', function (done) {
-    db = hypergraph(hyperdb(ramStore))
+    db = hypergraph(ramStore)
     db.search([{ predicate: 'likes' }], function () {
       done()
     })
@@ -607,11 +605,9 @@ describe('generateBatch', function () {
   var db
-  var hyper
 
   beforeEach(function () {
-    hyper = hyperdb(ramStore)
-    db = hypergraph(hyper)
+    db = hypergraph(ramStore)
   })
 
   afterEach(function (done) {
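Because this patch makes the constructor create its own hyperdb instance, callers switch from handing in a database to handing in storage. A brief hedged sketch of the migration (the file path, option value, and log messages are illustrative):

```js
var hypergraph = require('hyper-graph-db')

// before: var db = hypergraph(hyperdb('./my.db', { valueEncoding: 'utf-8' }))
// after: storage, key and options are forwarded to hyperdb internally
var db = hypergraph('./my.db', { valueEncoding: 'utf-8' })

db.on('error', function (err) {
  console.error('failed to open database', err)
})

// 'ready' is re-emitted from the underlying hyperdb; waiting for it is optional,
// since calls made earlier are deferred until the db has opened
db.on('ready', function () {
  db.put({ subject: 'a', predicate: 'b', object: 'c' }, function (err) {
    if (err) throw err
    console.log('triple stored')
  })
})
```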
From bb2f318e7c6d52d9514ce1ee64f7bf221b9713bf Mon Sep 17 00:00:00 2001
From: Benjamin Forster
Date: Sun, 14 Jan 2018 13:07:59 +1100
Subject: [PATCH 5/5] 0.1.0

---
 package.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/package.json b/package.json
index 203b2ab..b0fc45e 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "hyper-graph-db",
-  "version": "0.0.2",
+  "version": "0.1.0",
   "description": "A distributed graph database built upon hyperdb.",
   "main": "index.js",
   "repository": {