Skip to content

Commit

Permalink
Merge pull request #8 from e-e-e/namespaces
Browse files Browse the repository at this point in the history
Add prefixes, and option for index config
  • Loading branch information
e-e-e committed Mar 27, 2018
2 parents ec0c2fc + f013dc8 commit 29f9c06
Show file tree
Hide file tree
Showing 17 changed files with 810 additions and 399 deletions.
208 changes: 193 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,167 @@ const inherits = require('inherits')
const events = require('events')
const SparqlIterator = require('sparql-iterator')

const constants = require('./lib/constants')
const utils = require('./lib/utils')
const prefixes = require('./lib/prefixes')
const Variable = require('./lib/Variable')
const HyperdbReadTransform = require('./lib/HyperdbReadTransform')
const JoinStream = require('./lib/JoinStream')
const planner = require('./lib/planner')
const attachCreateReadStream = require('./lib/hyperdbModifier').attachCreateReadStream
const pkg = require('./package.json')

const Transform = stream.Transform
const PassThrough = stream.PassThrough

// temporarily augment hyperdb prototype to include createReadStream
if (!hyperdb.createReadStream) {
attachCreateReadStream(hyperdb)
}

function Graph (storage, key, opts) {
if (!(this instanceof Graph)) return new Graph(storage, key, opts)
events.EventEmitter.call(this)

if (typeof key === 'string') key = Buffer.from(key, 'hex')

if (!Buffer.isBuffer(key) && !opts) {
opts = key
key = null
}

if (!opts) opts = {}
this.db = hyperdb(storage, key, opts)
this._prefixes = Object.assign({}, opts.prefixes || constants.DEFAULT_PREFIXES)
this._basename = opts.name || constants.DEFAULT_BASE
this._prefixes._ = this._basename
this._indexType = opts.index === 'tri' ? 'tri' : 'hex'
this._indexes = opts.index === 'tri'
? constants.HEXSTORE_INDEXES_REDUCED
: constants.HEXSTORE_INDEXES
this._indexKeys = Object.keys(this._indexes)

this.db.on('error', (e) => {
this.emit('error', e)
})
this.db.on('ready', (e) => {
this.emit('ready', e)
if (utils.isNewDatabase(this.db)) {
this._onNew((err) => {
if (err) return this.emit('error', err)
this.emit('ready', e)
})
} else {
this._onInit((err) => {
if (err) return this.emit('error', err)
this.emit('ready', e)
})
}
})
}

inherits(Graph, events.EventEmitter)

Graph.prototype._onNew = function (cb) {
this._version = pkg.version
const metadata = [
['@version', pkg.version],
['@index', this._indexType],
['@name', this._basename]
]
Object.keys(this._prefixes).forEach((key) => {
if (key !== '_') {
metadata.push([prefixes.toKey(key), this._prefixes[key]])
}
})
utils.put(this.db, metadata, cb)
}

Graph.prototype._onInit = function (cb) {
// get and set graph version
this._version = null
this._basename = null
this._indexType = null

let missing = 4
let error = null
// get and set version
this.graphVersion((err, version) => {
if (err) error = err
this._version = version
maybeDone()
})
// get and set graph name
this.name((err, name) => {
if (err) error = err
this._basename = name || constants.DEFAULT_BASE
// modify prefixes to ensure correct namespacing
this._prefixes._ = this._basename
maybeDone()
})
// get and set graph indexation
this.indexType((err, index) => {
if (err) error = err
this._indexType = index || 'hex'
this._indexes = index === 'tri'
? constants.HEXSTORE_INDEXES_REDUCED
: constants.HEXSTORE_INDEXES
this._indexKeys = Object.keys(this._indexes)
maybeDone()
})
// get and set prefixes
this.prefixes((err, prefixes) => {
if (err) error = err
this._prefixes = Object.assign({ _: this._prefixes }, prefixes)
maybeDone()
})
function maybeDone () {
missing--
if (!missing) {
cb(error)
}
}
}

Graph.prototype.v = (name) => new Variable(name)

function returnValueAsString (cb) {
return (err, nodes) => {
if (err) return cb(err)
if (!nodes || nodes.length === 0) return cb(null, null)
cb(null, nodes[0].value.toString())
}
}

Graph.prototype.graphVersion = function (cb) {
if (this._version) return cb(null, this._version)
this.db.get('@version', returnValueAsString(cb))
}

Graph.prototype.name = function (cb) {
if (this._basename) return cb(null, this._basename)
this.db.get('@name', returnValueAsString(cb))
}

Graph.prototype.indexType = function (cb) {
if (this._indexType) return cb(null, this._indexType)
this.db.get('@index', returnValueAsString(cb))
}

Graph.prototype.prefixes = function (callback) {
// should cache this somehow
const prefixStream = this.db.createReadStream(constants.PREFIX_KEY)
utils.collect(prefixStream, (err, data) => {
if (err) return callback(err)
var names = data.reduce((p, nodes) => {
var data = prefixes.fromNodes(nodes)
p[data.prefix] = data.uri
return p
}, {})
callback(null, names)
})
}

Graph.prototype.addPrefix = function (prefix, uri, cb) {
this.db.put(prefixes.toKey(prefix), uri, cb)
}

Graph.prototype.getStream = function (triple, opts) {
const stream = this.db.createReadStream(utils.createQuery(triple))
return stream.pipe(new HyperdbReadTransform(this.db, opts))
const stream = this.db.createReadStream(this._createQuery(triple, { encode: (!opts || opts.encode === undefined) ? true : opts.encode }))
return stream.pipe(new HyperdbReadTransform(this.db, this._basename, opts))
}

Graph.prototype.get = function (triple, opts, callback) {
Expand All @@ -52,21 +178,22 @@ function doAction (action) {
if (!triples) return callback(new Error('Must pass triple'))
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(this.generateBatch(triple, action))
return prev.concat(this._generateBatch(triple, action))
}, [])
this.db.batch(entries.reverse(), callback)
}
}

function doActionStream (action) {
return function () {
const self = this
const transform = new Transform({
objectMode: true,
transform (triples, encoding, done) {
if (!triples) return done()
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(utils.generateBatch(triple, action))
return prev.concat(self._generateBatch(triple, action))
}, [])
this.push(entries.reverse())
done()
Expand Down Expand Up @@ -95,8 +222,7 @@ Graph.prototype.searchStream = function (query, options) {
} else if (!Array.isArray(query)) {
query = [ query ]
}
const plannedQuery = planner(query)

const plannedQuery = planner(query, this._prefixes)
var streams = plannedQuery.map((triple, i) => {
const limit = (options && i === plannedQuery.length - 1) ? options.limit : undefined
return new JoinStream({
Expand Down Expand Up @@ -131,10 +257,62 @@ Graph.prototype.query = function (query, callback) {
utils.collect(this.queryStream(query), callback)
}

Graph.prototype.generateBatch = utils.generateBatch

Graph.prototype.close = function (callback) {
callback()
}

/* PRIVATE FUNCTIONS */

Graph.prototype._generateBatch = function (triple, action) {
if (!action) action = 'put'
var data = null
if (action === 'put') {
data = JSON.stringify(utils.extraDataMask(triple))
}
return this._encodeKeys(triple).map(key => ({
type: 'put', // no delete in hyperdb so just putting nulls
key: key,
value: data
}))
}

Graph.prototype._encodeKeys = function (triple) {
const encodedTriple = utils.encodeTriple(triple, this._prefixes)
return this._indexKeys.map(key => utils.encodeKey(key, encodedTriple))
}

Graph.prototype._createQuery = function (pattern, options) {
var types = utils.typesFromPattern(pattern)
var preferedIndex = options && options.index
var index = this._findIndex(types, preferedIndex)
const encodedTriple = utils.encodeTriple(pattern, options.encode ? this._prefixes : { _: this._basename })
var key = utils.encodeKey(index, encodedTriple)
return key
}

Graph.prototype._possibleIndexes = function (types) {
var result = this._indexKeys.filter((key) => {
var matches = 0
return this._indexes[key].every(function (e, i) {
if (types.indexOf(e) >= 0) {
matches++
return true
}
if (matches === types.length) {
return true
}
})
})
result.sort()
return result
}

Graph.prototype._findIndex = function (types, preferedIndex) {
var result = this._possibleIndexes(types)
if (preferedIndex && result.some(r => r === preferedIndex)) {
return preferedIndex
}
return result[0]
}

module.exports = Graph
23 changes: 12 additions & 11 deletions lib/HyperdbReadTransform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ const Transform = require('readable-stream').Transform
const inherits = require('inherits')
const utils = require('./utils')

function HyperdbReadTransform (db, options) {
function HyperdbReadTransform (db, basename, options) {
if (!(this instanceof HyperdbReadTransform)) {
return new HyperdbReadTransform(db, options)
return new HyperdbReadTransform(db, basename, options)
}
var opts = options || {}
this.db = db
this._finished = false
this._prefixes = { _: basename }
this._count = 0
this._filter = opts.filter
this._offset = opts.offset || 0
Expand All @@ -24,23 +24,24 @@ function HyperdbReadTransform (db, options) {
inherits(HyperdbReadTransform, Transform)

HyperdbReadTransform.prototype._transform = function transform (nodes, encoding, done) {
if (this._finished) return done()
if (this._limit && this._count >= this._limit) {
this.push(null)
this._sources.forEach(source => source.destroy())
this._finished = true
return
}
var value = nodes[0].value && JSON.parse(nodes[0].value.toString())
if (value === null) return done()
value = Object.assign(value, utils.decodeKey(nodes[0].key))
value = Object.assign(value, utils.decodeKey(nodes[0].key, this._prefixes))
if (!this._filter || this._filter(value)) {
if (this._count >= this._offset) {
this.push(value)
}
this._count++
if (this._limit && this._count >= this._limit) {
this.end()
}
}
done()
}

HyperdbReadTransform.prototype._flush = function (done) {
this._sources.forEach(source => source.destroy())
done()
}

module.exports = HyperdbReadTransform
12 changes: 8 additions & 4 deletions lib/JoinStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ function JoinStream (options) {
this.matcher = matcher(options.triple)
this.mask = queryMask(options.triple)
this.maskUpdater = maskUpdater(options.triple)
this.limit = options && options.limit
this.limit = options.limit
this._limitCounter = 0
this.db = options.db
this._ended = false
this.filter = options && options.filter
this.offset = options && options.offset
this.filter = options.filter
this.offset = options.offset

this.once('pipe', (source) => {
source.on('error', (err) => {
Expand Down Expand Up @@ -49,7 +49,11 @@ function JoinStream (options) {
}
}

this._options = { filter: this.filter, offset: this.offset }
this._options = {
filter: this.filter,
offset: this.offset,
encode: options.encode ? !!options.encode : false
}
}

inherits(JoinStream, Transform)
Expand Down
26 changes: 26 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const HEXSTORE_INDEXES = {
spo: ['subject', 'predicate', 'object'],
pos: ['predicate', 'object', 'subject'],
osp: ['object', 'subject', 'predicate'],
sop: ['subject', 'object', 'predicate'], // [optional]
pso: ['predicate', 'subject', 'object'], // [optional]
ops: ['object', 'predicate', 'subject'] // [optional]
}
const HEXSTORE_INDEXES_REDUCED = {
spo: HEXSTORE_INDEXES.spo,
pos: HEXSTORE_INDEXES.pos,
osp: HEXSTORE_INDEXES.osp
}
const PREFIX_KEY = '@prefix/'
const DEFAULT_BASE = 'hg://'
const DEFAULT_PREFIXES = {
foaf: 'http://xmlns.com/foaf/0.1/'
}

module.exports = {
PREFIX_KEY,
DEFAULT_BASE,
DEFAULT_PREFIXES,
HEXSTORE_INDEXES,
HEXSTORE_INDEXES_REDUCED
}
Loading

0 comments on commit 29f9c06

Please sign in to comment.