Skip to content

Commit

Permalink
refactor to enable configuration of index and prefixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Forster committed Feb 15, 2018
1 parent c9de920 commit 1e4ba05
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 117 deletions.
80 changes: 72 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const inherits = require('inherits')
const events = require('events')
const SparqlIterator = require('sparql-iterator')

const constants = require('./lib/constants')
const utils = require('./lib/utils')
const prefixes = require('./lib/prefixes')
const Variable = require('./lib/Variable')
Expand All @@ -26,21 +27,29 @@ function Graph (storage, key, opts) {
events.EventEmitter.call(this)
this.db = hyperdb(storage, key, opts)

// what are the default prefixes
this._prefixes = constants.DEFAULT_PREFIXES
this._indexes = (opts && opts.index === 'small')
? constants.HEXSTORE_INDEXES_REDUCED
: constants.HEXSTORE_INDEXES
this._indexKeys = Object.keys(this._indexes)

this.db.on('error', (e) => {
this.emit('error', e)
})
this.db.on('ready', (e) => {
this.emit('ready', e)
// check prefixes
})
}

inherits(Graph, events.EventEmitter)

Graph.prototype.v = (name) => new Variable(name)

Graph.prototype.prefixes = function (callback) {
Graph.prototype.listPrefixes = function (callback) {
// should cache this somehow
const prefixStream = this.db.createReadStream(prefixes.PREFIX_KEY)
const prefixStream = this.db.createReadStream(constants.PREFIX_KEY)
utils.collect(prefixStream, (err, data) => {
if (err) return callback(err)
var names = data.reduce((p, nodes) => {
Expand All @@ -57,8 +66,8 @@ Graph.prototype.addPrefix = function (prefix, uri, cb) {
}

Graph.prototype.getStream = function (triple, opts) {
const stream = this.db.createReadStream(utils.createQuery(triple))
return stream.pipe(new HyperdbReadTransform(this.db, opts))
const stream = this.db.createReadStream(this._createQuery(triple))
return stream.pipe(new HyperdbReadTransform(this.db, this._prefixes, opts))
}

Graph.prototype.get = function (triple, opts, callback) {
Expand All @@ -71,21 +80,22 @@ function doAction (action) {
if (!triples) return callback(new Error('Must pass triple'))
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(this.generateBatch(triple, action))
return prev.concat(this._generateBatch(triple, action))
}, [])
this.db.batch(entries.reverse(), callback)
}
}

function doActionStream (action) {
return function () {
const self = this
const transform = new Transform({
objectMode: true,
transform (triples, encoding, done) {
if (!triples) return done()
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(utils.generateBatch(triple, action))
return prev.concat(self._generateBatch(triple, action))
}, [])
this.push(entries.reverse())
done()
Expand Down Expand Up @@ -150,10 +160,64 @@ Graph.prototype.query = function (query, callback) {
utils.collect(this.queryStream(query), callback)
}

Graph.prototype.generateBatch = utils.generateBatch

Graph.prototype.close = function (callback) {
callback()
}

/* PRIVATE FUNCTIONS */

Graph.prototype._generateBatch = function (triple, action) {
if (!action) action = 'put'
var data = null
if (action === 'put') {
data = JSON.stringify(utils.extraDataMask(triple))
}
return this._encodeKeys(triple).map(key => ({
type: 'put', // no delete in hyperdb so just putting nulls
key: key,
value: data
}))
}

Graph.prototype._encodeKeys = function (triple) {
const encodedTriple = utils.encodeTriple(triple, this._prefixes)
return this._indexKeys.map(key => utils.encodeKey(key, encodedTriple))
}

Graph.prototype._createQuery = function (pattern, options) {
var types = utils.typesFromPattern(pattern)
var preferedIndex = options && options.index
var index = this._findIndex(types, preferedIndex)
const encodedTriple = utils.encodeTriple(pattern, this._prefixes)
var key = utils.encodeKey(index, encodedTriple)
return key
}

Graph.prototype._possibleIndexes = function (types) {
var result = this._indexKeys.filter((key) => {
var matches = 0
return this._indexes[key].every(function (e, i) {
if (types.indexOf(e) >= 0) {
matches++
return true
}
if (matches === types.length) {
return true
}
})
})

result.sort()

return result
}

Graph.prototype._findIndex = function (types, preferedIndex) {
var result = this._possibleIndexes(types)
if (preferedIndex && result.some(r => r === preferedIndex)) {
return preferedIndex
}
return result[0]
}

module.exports = Graph
5 changes: 3 additions & 2 deletions lib/HyperdbReadTransform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ const Transform = require('readable-stream').Transform
const inherits = require('inherits')
const utils = require('./utils')

function HyperdbReadTransform (db, options) {
function HyperdbReadTransform (db, prefixes, options) {
if (!(this instanceof HyperdbReadTransform)) {
return new HyperdbReadTransform(db, options)
}
var opts = options || {}
this.db = db
this._prefixes = prefixes
this._finished = false
this._count = 0
this._filter = opts.filter
Expand All @@ -33,7 +34,7 @@ HyperdbReadTransform.prototype._transform = function transform (nodes, encoding,
}
var value = nodes[0].value && JSON.parse(nodes[0].value.toString())
if (value === null) return done()
value = Object.assign(value, utils.decodeKey(nodes[0].key))
value = Object.assign(value, utils.decodeKey(nodes[0].key, this._prefixes))
if (!this._filter || this._filter(value)) {
if (this._count >= this._offset) {
this.push(value)
Expand Down
25 changes: 25 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const HEXSTORE_INDEXES = {
spo: ['subject', 'predicate', 'object'],
pos: ['predicate', 'object', 'subject'],
osp: ['object', 'subject', 'predicate'],
sop: ['subject', 'object', 'predicate'], // [optional]
pso: ['predicate', 'subject', 'object'], // [optional]
ops: ['object', 'predicate', 'subject'] // [optional]
}
const HEXSTORE_INDEXES_REDUCED = {
spo: HEXSTORE_INDEXES.spo,
pos: HEXSTORE_INDEXES.pos,
osp: HEXSTORE_INDEXES.osp
}
const PREFIX_KEY = '@prefix/'
const DEFAULT_PREFIXES = {
_: 'hg://',
foaf: 'http://xmlns.com/foaf/0.1/'
}

module.exports = {
PREFIX_KEY,
DEFAULT_PREFIXES,
HEXSTORE_INDEXES,
HEXSTORE_INDEXES_REDUCED
}
10 changes: 3 additions & 7 deletions lib/prefixes.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
const PREFIX_KEY = '@prefix/'
var constants = require('./constants')

const PREFIX_KEY = constants.PREFIX_KEY
const PREFIX_REGEX = /^(\w+):/
const DEFAULT_PREFIXES = {
_: 'hg://',
foaf: 'http://xmlns.com/foaf/0.1/'
}

function toKey (prefix) {
return PREFIX_KEY + prefix
Expand Down Expand Up @@ -36,8 +34,6 @@ function fromPrefixed (uri, prefixes) {
}

module.exports = {
DEFAULT_PREFIXES,
PREFIX_KEY,
toKey,
fromKey,
fromNodes,
Expand Down
109 changes: 30 additions & 79 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
const Variable = require('./Variable')
const prefixes = require('./prefixes')
const constants = require('./constants')

const defs = {
spo: ['subject', 'predicate', 'object'],
sop: ['subject', 'object', 'predicate'], // [optional]
pos: ['predicate', 'object', 'subject'],
pso: ['predicate', 'subject', 'object'], // [optional]
ops: ['object', 'predicate', 'subject'], // [optional]
osp: ['object', 'subject', 'predicate']
}
const spo = constants.HEXSTORE_INDEXES.spo
const tripleAliasMap = {
s: 'subject',
p: 'predicate',
o: 'object'
}
const defKeys = Object.keys(defs)

function collect (stream, cb) {
var res = []
Expand All @@ -25,45 +18,43 @@ function collect (stream, cb) {

function filterTriple (triple) {
const filtered = {}
defs.spo.forEach((key) => {
spo.forEach((key) => {
if (triple.hasOwnProperty(key)) {
filtered[key] = triple[key]
}
})
return filtered
}

function escapeKeyValue (value) {
function escapeKeyValue (value, prefixMap) {
if (typeof value === 'string' || value instanceof String) {
return prefixes.toPrefixed(value, prefixes.DEFAULT_PREFIXES).replace(/(\/)/g, '%2F')
return prefixes.toPrefixed(value, prefixMap).replace(/(\/)/g, '%2F')
}
return value
}

function unescapeKeyValue (value) {
return prefixes.fromPrefixed(value.replace(/%2F/g, '/'), prefixes.DEFAULT_PREFIXES)
function unescapeKeyValue (value, prefixMap) {
return prefixes.fromPrefixed(value.replace(/%2F/g, '/'), prefixMap)
}

function decodeKey (key) {
const values = key.split('/')
if (values.length < 4) throw new Error('Key is not in triple form')
const order = values[0]
const triple = {}
for (var i = 0; i < 3; i++) {
const k = tripleAliasMap[order[i]]
triple[k] = unescapeKeyValue(values[i + 1])
}
function encodeTriple (triple, prefixMap) {
// var newTriple = Object.assign({}, triple)
spo.forEach((key) => {
if (triple.hasOwnProperty(key)) {
triple[key] = escapeKeyValue(triple[key], prefixMap)
}
})
return triple
}

function encodeKey (key, triple) {
var result = key
var def = defs[key]
var def = constants.HEXSTORE_INDEXES[key]
var i = 0
var value = triple[def[i]]
// need to handle this smarter
while (value) {
result += '/' + escapeKeyValue(value)
result += '/' + value
i += 1
value = triple[def[i]]
}
Expand All @@ -73,48 +64,16 @@ function encodeKey (key, triple) {
return result
}

function encodeKeys (triple) {
return defKeys.map(key => encodeKey(key, triple))
}

function generateBatch (triple, action) {
if (!action) action = 'put'
var data = null
if (action === 'put') {
data = JSON.stringify(extraDataMask(triple))
}
return encodeKeys(triple).map(key => ({
type: 'put', // no delete in hyperdb so just putting nulls
key: key,
value: data
}))
}

function possibleIndexes (types) {
var result = defKeys.filter((key) => {
var matches = 0
return defs[key].every(function (e, i) {
if (types.indexOf(e) >= 0) {
matches++
return true
}
if (matches === types.length) {
return true
}
})
})

result.sort()

return result
}

function findIndex (types, preferedIndex) {
var result = possibleIndexes(types)
if (preferedIndex && result.some(r => r === preferedIndex)) {
return preferedIndex
function decodeKey (key, prefixMap) {
const values = key.split('/')
if (values.length < 4) throw new Error('Key is not in triple form')
const order = values[0]
const triple = {}
for (var i = 0; i < 3; i++) {
const k = tripleAliasMap[order[i]]
triple[k] = unescapeKeyValue(values[i + 1], prefixMap)
}
return result[0]
return triple
}

function typesFromPattern (pattern) {
Expand All @@ -132,16 +91,8 @@ function typesFromPattern (pattern) {
})
}

function createQuery (pattern, options) {
var types = typesFromPattern(pattern)
var preferedIndex = options && options.index
var index = findIndex(types, preferedIndex)
var key = encodeKey(index, pattern)
return key
}

function hasKey (key) {
return defs.spo.indexOf(key) >= 0
return spo.indexOf(key) >= 0
}

function keyIsNotAObject (tripleKey) {
Expand Down Expand Up @@ -229,14 +180,14 @@ function materializer (pattern, data) {
}

module.exports = {
defs,
defKeys,
escapeKeyValue,
unescapeKeyValue,
encodeTriple,
encodeKey,
decodeKey,
typesFromPattern,
filterTriple,
collect,
generateBatch,
createQuery,
extraDataMask,
queryMask,
variablesMask,
Expand Down
Loading

0 comments on commit 1e4ba05

Please sign in to comment.