Skip to content

Commit

Permalink
add thunky to enable some use before db is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
e-e-e committed Sep 9, 2018
1 parent 8492c87 commit 9ca8f5e
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 80 deletions.
54 changes: 32 additions & 22 deletions index.js
@@ -1,5 +1,6 @@
const hyperdb = require('hyperdb')
const stream = require('readable-stream')
const thunky = require('thunky')
const pump = require('pump')
const inherits = require('inherits')
const events = require('events')
Expand Down Expand Up @@ -37,27 +38,29 @@ function Graph (storage, key, opts) {
? constants.HEXSTORE_INDEXES_REDUCED
: constants.HEXSTORE_INDEXES
this._indexKeys = Object.keys(this._indexes)

this.isReady = false
this.ready = thunky(this._ready.bind(this))
this.db.on('error', (e) => {
this.emit('error', e)
})
this.db.on('ready', (e) => {
if (utils.isNewDatabase(this.db)) {
this._onNew((err) => {
if (err) return this.emit('error', err)
this.emit('ready', e)
})
} else {
this._onInit((err) => {
if (err) return this.emit('error', err)
this.emit('ready', e)
})
}
this.db.ready(() => {
this.ready(() => {
this.emit('ready')
})
})
}

inherits(Graph, events.EventEmitter)

Graph.prototype._ready = function (cb) {
this.isReady = true
if (utils.isNewDatabase(this.db)) {
this._onNew(cb)
} else {
this._onInit(cb)
}
}

Graph.prototype._onNew = function (cb) {
this._version = pkg.version
const metadata = [
Expand Down Expand Up @@ -169,17 +172,20 @@ Graph.prototype.getStream = function (triple, opts) {

Graph.prototype.get = function (triple, opts, callback) {
if (typeof opts === 'function') return this.get(triple, undefined, opts)
utils.collect(this.getStream(triple, opts), callback)
this.ready(() => {
utils.collect(this.getStream(triple, opts), callback)
})
}

function doAction (action) {
return function (triples, callback) {
if (!triples) return callback(new Error('Must pass triple'))
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(this._generateBatch(triple, action))
}, [])
this.db.batch(entries.reverse(), callback)
this.ready(() => {
let entries = (!triples.reduce) ? [triples] : triples
entries = entries.reduce((prev, triple) => {
return prev.concat(this._generateBatch(triple, action))
}, [])
this.db.batch(entries.reverse(), callback)
})
}
}

Expand Down Expand Up @@ -245,15 +251,19 @@ Graph.prototype.search = function (query, options, callback) {
callback = options
options = undefined
}
utils.collect(this.searchStream(query, options), callback)
this.ready(() => {
utils.collect(this.searchStream(query, options), callback)
})
}

Graph.prototype.queryStream = function (query) {
return new SparqlIterator(query, { hypergraph: this })
}

Graph.prototype.query = function (query, callback) {
utils.collect(this.queryStream(query), callback)
this.ready(() => {
utils.collect(this.queryStream(query), callback)
})
}

Graph.prototype.close = function (callback) {
Expand Down

0 comments on commit 9ca8f5e

Please sign in to comment.