Skip to content

Commit

Permalink
Merge bb2f318 into 054e04b
Browse files Browse the repository at this point in the history
  • Loading branch information
e-e-e committed Jan 14, 2018
2 parents 054e04b + bb2f318 commit 2e86f60
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 125 deletions.
67 changes: 49 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,49 @@
const PassThrough = require('readable-stream').PassThrough
const Transform = require('readable-stream').Transform
const hyperdb = require('hyperdb')
const stream = require('readable-stream')
const pump = require('pump')
const inherits = require('inherits')
const events = require('events')

const utils = require('./lib/utils')
const Variable = require('./lib/Variable')
const HyperdbDiffTransform = require('./lib/HyperdbDiffTransform')
const HyperdbReadTransform = require('./lib/HyperdbReadTransform')
const JoinStream = require('./lib/JoinStream')
const planner = require('./lib/planner')
const attachCreateReadStream = require('./lib/hyperdbModifier').attachCreateReadStream

function Graph (db, opts) {
if (!(this instanceof Graph)) return new Graph(db, opts)
this.db = db
const Transform = stream.Transform
const PassThrough = stream.PassThrough

// temporarily augment hyperdb prototype to include createReadStream
if (!hyperdb.createReadStream) {
attachCreateReadStream(hyperdb)
}

function Graph (storage, key, opts) {
if (!(this instanceof Graph)) return new Graph(storage, key, opts)
events.EventEmitter.call(this)
this.db = hyperdb(storage, key, opts)

this.db.on('error', (e) => {
this.emit('error', e)
})
this.db.on('ready', (e) => {
this.emit('ready', e)
})
}

inherits(Graph, events.EventEmitter)

Graph.prototype.v = (name) => new Variable(name)

Graph.prototype.getStream = function (triple) {
const stream = this.db.createDiffStream(utils.createQuery(triple))
return stream.pipe(new HyperdbDiffTransform(this.db))
Graph.prototype.getStream = function (triple, opts) {
const stream = this.db.createReadStream(utils.createQuery(triple))
return stream.pipe(new HyperdbReadTransform(this.db, opts))
}

Graph.prototype.get = function (triple, callback) {
utils.collect(this.getStream(triple), callback)
Graph.prototype.get = function (triple, opts, callback) {
if (typeof opts === 'function') return this.get(triple, undefined, opts)
utils.collect(this.getStream(triple, opts), callback)
}

function doAction (action) {
Expand Down Expand Up @@ -55,12 +77,11 @@ function doActionStream (action) {
}
}

// this is not implemented in hyperdb yet
// for now we just put a null value in the db

Graph.prototype.put = doAction('put')
Graph.prototype.putStream = doActionStream('put')

// this is not implemented in hyperdb yet
// for now we just put a null value in the db
Graph.prototype.del = doAction('del')
Graph.prototype.delStream = doActionStream('del')

Expand All @@ -75,8 +96,14 @@ Graph.prototype.searchStream = function (query, options) {
}
const plannedQuery = planner(query)

var streams = plannedQuery.map((triple) => {
return new JoinStream({ triple, db: this })
var streams = plannedQuery.map((triple, i) => {
const limit = (options && i === plannedQuery.length - 1) ? options.limit : undefined
return new JoinStream({
triple: utils.filterTriple(triple),
filter: triple.filter,
db: this,
limit
})
})

streams[0].start = true
Expand All @@ -87,8 +114,12 @@ Graph.prototype.searchStream = function (query, options) {
return result
}

Graph.prototype.search = function (query, callback) {
utils.collect(this.searchStream(query), callback)
Graph.prototype.search = function (query, options, callback) {
if (typeof options === 'function') {
callback = options
options = undefined
}
utils.collect(this.searchStream(query, options), callback)
}

Graph.prototype.generateBatch = utils.generateBatch
Expand Down
36 changes: 0 additions & 36 deletions lib/HyperdbDiffTransform.js

This file was deleted.

43 changes: 43 additions & 0 deletions lib/HyperdbReadTransform.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const Transform = require('readable-stream').Transform
const inherits = require('inherits')

function HyperdbReadTransform (db, options) {
if (!(this instanceof HyperdbReadTransform)) {
return new HyperdbReadTransform(db, options)
}
var opts = options || {}
this.db = db
this._finished = false
this._count = 0
this._filter = opts.filter
this._offset = opts.offset || 0
this._limit = opts.limit && opts.limit + this._offset
Transform.call(this, Object.assign(opts, { objectMode: true }))
this._sources = []
this.once('pipe', (source) => {
source.on('error', e => this.emit('error', e))
this._sources.push(source)
})
}

inherits(HyperdbReadTransform, Transform)

HyperdbReadTransform.prototype._transform = function transform (nodes, encoding, done) {
if (this._finished) return done()
if (this._limit && this._count >= this._limit) {
this.push(null)
this._sources.forEach(source => source.destroy())
this._finished = true
return
}
var value = nodes[0].value && JSON.parse(nodes[0].value.toString())
if (value !== null && (!this._filter || this._filter(value))) {
if (this._count >= this._offset) {
this.push(value)
}
this._count++
}
done()
}

module.exports = HyperdbReadTransform
9 changes: 5 additions & 4 deletions lib/JoinStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ function JoinStream (options) {
this.matcher = matcher(options.triple)
this.mask = queryMask(options.triple)
this.maskUpdater = maskUpdater(options.triple)
this.limit = options.limit
this.limit = options && options.limit
this._limitCounter = 0
this.db = options.db
this._index = options.index
this._ended = false
this.filter = options && options.filter
this.offset = options && options.offset

this.once('pipe', (source) => {
source.on('error', (err) => {
Expand Down Expand Up @@ -48,7 +49,7 @@ function JoinStream (options) {
}
}

this._indexPreferences = { index: this._index }
this._options = { filter: this.filter, offset: this.offset }
}

inherits(JoinStream, Transform)
Expand All @@ -60,7 +61,7 @@ JoinStream.prototype._transform = function transform (solution, encoding, done)
var newMask = this.maskUpdater(solution, this.mask)

this._lastSolution = solution
this._readStream = this.db.getStream(newMask, this._indexPreferences)
this._readStream = this.db.getStream(newMask, this._options)

this._readStream.on('data', this._onDataStream)
this._readStream.on('error', this._onErrorStream)
Expand Down
Loading

0 comments on commit 2e86f60

Please sign in to comment.