Skip to content

Commit

Permalink
fix: add/update events based on doc existence in db
Browse files Browse the repository at this point in the history
  • Loading branch information
gr2m authored and capellini committed Jul 18, 2017
1 parent 16f96e5 commit 9c0b592
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 76 deletions.
22 changes: 2 additions & 20 deletions lib/helpers/add-many.js
Expand Up @@ -2,6 +2,7 @@ var clone = require('lodash/clone')
var uuid = require('pouchdb-utils').uuid

var addTimestamps = require('../utils/add-timestamps')
var bulkDocs = require('./db-bulk-docs')

module.exports = function addMany (state, docs, prefix) {
docs = docs.map(function (doc) {
Expand All @@ -16,24 +17,5 @@ module.exports = function addMany (state, docs, prefix) {
})
}

return state.db.bulkDocs(docs)

.then(function (responses) {
return responses.map(function (response, i) {
if (response instanceof Error) {
if (response.status === 409) {
var conflict = new Error('Object with id "' + docs[i]._id + '" already exists')
conflict.name = 'Conflict'
conflict.status = 409
return conflict
} else {
return response
}
}

docs[i]._id = response.id
docs[i]._rev = response.rev
return docs[i]
})
})
return bulkDocs(state, docs)
}
96 changes: 96 additions & 0 deletions lib/helpers/db-bulk-docs.js
@@ -0,0 +1,96 @@
module.exports = bulkDocs

var internals = bulkDocs.internals = {}

// we wrap PouchDB’s .bulkDocs method in order to guarantee that the events get
// emmited before the Promise gets resolved. This is how the rest of Hoodie’s
// APIs behave. Also it avoids confusion that listening to events after adding
// a revision will emit an add event for the previously added document, e.g.
//
// store.add([{_id: 'foo'}, {_id: 'bar'}]).then(function () {
// hoodie.on('add', handler) // called with {_id: 'foo'} & {_id: 'bar'}
// })
//
// With our workaround, the handler will not be called in the example above.
function bulkDocs (state, docs) {
return new Promise(function (resolve, reject) {
var scope = {
wantedRevs: [],
missedChangedDocs: [],
resolve: resolve,
result: [],
docs: docs
}

scope.changeHandler = changeHandler.bind(null, state, scope)
scope.resetHandler = resetHandler.bind(null, state, scope)
state.emitter.on('change', scope.changeHandler)
state.emitter.once('reset', scope.resetHandler)

scope.bulkDocsPromise = state.db.bulkDocs(docs)

.then(function (result) {
if (result.length === 0) {
state.emitter.removeListener('change', scope.changeHandler)
state.emitter.removeListener('reset', scope.resetHandler)
resolve([])
return
}

scope.result = result
.map(internals.toNormalisedError.bind(null, scope))
.map(function (result, i) {
if (!result.rev) {
return result
}

docs[i]._id = result.id
docs[i]._rev = result.rev
return docs[i]
})
scope.wantedRevs = result.map(toRev).filter(Boolean)
})

.catch(reject)
})
}

function toRev (result) {
return result.rev
}

internals.toNormalisedError = function toNormalisedError (scope, result, i) {
if (result instanceof Error) {
if (result.status === 409) {
var conflict = new Error('Document with id "' + scope.docs[i]._id + '" already exists')
conflict.name = 'Conflict'
conflict.status = 409
return conflict
}
}

return result
}

function changeHandler (state, scope, eventName, doc) {
var index = scope.wantedRevs.indexOf(doc._rev)

if (index === -1) {
return
}

scope.wantedRevs.splice(index, 1)

if (scope.wantedRevs.length === 0) {
state.emitter.removeListener('change', scope.changeHandler)
state.emitter.removeListener('reset', scope.resetHandler)
scope.resolve(scope.result)
}
}

function resetHandler (state, scope) {
state.emitter.removeListener('change', scope.changeHandler)
scope.bulkDocsPromise.then(function (result) {
scope.resolve(scope.result)
})
}
58 changes: 58 additions & 0 deletions lib/helpers/db-put.js
@@ -0,0 +1,58 @@
module.exports = put

// we wrap PouchDB’s .put method in order to guarantee that the events get
// emmited before the Promise gets resolved. This is how the rest of Hoodie’s
// APIs behave. Also it avoids confusion that listening to events after adding
// a revision will emit an add event for the previously added document, e.g.
//
// store.add({_id: 'foo'}).then(function () {
// hoodie.on('add', handler) // called with {_id: 'foo'}
// })
//
// With our workaround, the handler will not be called in the example above.
function put (state, doc) {
return new Promise(function (resolve, reject) {
var scope = {
wantedRev: null,
missedChangedDocs: [],
resolve: resolve
}

scope.changeHandler = changeHandler.bind(null, state, scope)
scope.resetHandler = resetHandler.bind(null, state, scope)
state.emitter.on('change', scope.changeHandler)
state.emitter.once('reset', scope.resetHandler)

scope.putPromise = state.db.put(doc)

.then(function (result) {
doc._id = result.id
doc._rev = result.rev
scope.result = doc
scope.wantedRev = result.rev
scope.missedChangedDocs.forEach(scope.changeHandler.bind(null, state, scope, null))
})

.catch(reject)
})
}

function changeHandler (state, scope, _notUsedEventName, doc) {
if (!scope.wantedRev) {
scope.missedChangedDocs.push(doc._rev)
return
}

if (doc._rev === scope.wantedRev) {
state.emitter.removeListener('change', scope.changeHandler)
state.emitter.once('reset', scope.resetHandler)
scope.resolve(scope.result)
}
}

function resetHandler (state, scope) {
state.emitter.removeListener('change', scope.changeHandler)
scope.putPromise.then(function (result) {
scope.resolve(scope.result)
})
}
71 changes: 71 additions & 0 deletions lib/helpers/handle-changes.js
@@ -0,0 +1,71 @@
module.exports = handleChanges

function handleChanges (state) {
var isBootstrapping = true
var changesDuringBootstrapping = []
var knownDocIds = {}

// we have to initiallo load all documents in order to differentiate add from
// change events by populating knownDocIds. This is something we wish would be
// simpler but unfortunately we can’t expose the necessery information with
// ouchDB directly, see https://github.com/pouchdb/pouchdb/pull/6553
state.bootstrap = state.db.allDocs()

.then(function (result) {
isBootstrapping = false

result.rows.forEach(function (row) {
knownDocIds[row.id] = 1
})

changesDuringBootstrapping.forEach(handleChange)
})

// we listen to the changes feed which we use to emit our own events
// if there happen to be events while we are still populating knownDocIds
// then we store the events in changesDuringBootstrapping which are handled
// once the initial bootstrap is done
state.db.changes({
since: 'now',
live: true,
include_docs: true
})
.on('change', function (change) {
if (isBootstrapping) {
changesDuringBootstrapping.push(change)
return
}

handleChange(change)
})

function handleChange (change) {
var doc = change.doc

if (!doc.hoodie) {
doc.hoodie = {}
}

if (change.deleted) {
// ignore deletes for unknown docs
if (!knownDocIds[change.id]) {
return
}

delete knownDocIds[change.id]
state.emitter.emit('remove', doc)
state.emitter.emit('change', 'remove', doc)
return
}

if (knownDocIds[change.id]) {
state.emitter.emit('update', doc)
state.emitter.emit('change', 'update', doc)
return
}

knownDocIds[change.id] = 1
state.emitter.emit('add', doc)
state.emitter.emit('change', 'add', doc)
}
}
20 changes: 0 additions & 20 deletions lib/helpers/start-listen-to-changes.js

This file was deleted.

14 changes: 7 additions & 7 deletions lib/helpers/update-many.js
@@ -1,8 +1,9 @@
var assign = require('lodash/assign')
var PouchDBErrors = require('pouchdb-errors')

var changeObject = require('../utils/change-object')
var addTimestamps = require('../utils/add-timestamps')
var bulkDocs = require('./db-bulk-docs')
var changeObject = require('../utils/change-object')
var toId = require('../utils/to-id')

var findMany = require('./find-many')
Expand Down Expand Up @@ -39,7 +40,6 @@ module.exports = function updateMany (state, array, change, prefix) {
if (typeof passedDoc !== 'object') {
return PouchDBErrors.NOT_AN_OBJECT
}

return assign(doc, passedDoc, {_id: doc._id, _rev: doc._rev, hoodie: doc.hoodie})
})
})
Expand All @@ -50,13 +50,13 @@ module.exports = function updateMany (state, array, change, prefix) {
return !(doc instanceof Error)
})
validObjects.forEach(addTimestamps)
return state.db.bulkDocs(validObjects)
return bulkDocs(state, validObjects)
})

.then(function (responses) {
responses.forEach(function (response) {
var index = ids.indexOf(response.id)
docs[index]._rev = response.rev
.then(function (updatedDocs) {
updatedDocs.forEach(function (doc) {
var index = ids.indexOf(doc._id)
docs[index] = doc
})

return docs
Expand Down
8 changes: 2 additions & 6 deletions lib/helpers/update-one.js
Expand Up @@ -6,6 +6,7 @@ var changeObject = require('../utils/change-object')
var addTimestamps = require('../utils/add-timestamps')

var findOne = require('./find-one')
var put = require('./db-put')

module.exports = function updateOne (state, idOrDoc, change, prefix) {
var doc
Expand All @@ -25,11 +26,6 @@ module.exports = function updateOne (state, idOrDoc, change, prefix) {

.then(function (_doc) {
doc = _doc
return state.db.put(addTimestamps(doc))
})

.then(function (response) {
doc._rev = response.rev
return doc
return put(state, addTimestamps(doc))
})
}
12 changes: 3 additions & 9 deletions lib/remove-all.js
@@ -1,5 +1,6 @@
var isntDesignDoc = require('./utils/isnt-design-doc')
var addTimestamps = require('./utils/add-timestamps')
var bulkDocs = require('./helpers/db-bulk-docs')
var isntDesignDoc = require('./utils/isnt-design-doc')

module.exports = removeAll

Expand Down Expand Up @@ -42,12 +43,5 @@ function removeAll (state, prefix, filter) {
})
})

.then(state.db.bulkDocs.bind(state.db))

.then(function (results) {
return results.map(function (result, i) {
docs[i]._rev = result.rev
return docs[i]
})
})
.then(bulkDocs.bind(null, state))
}
11 changes: 8 additions & 3 deletions lib/reset.js
@@ -1,17 +1,22 @@
module.exports = reset

var handleChanges = require('./helpers/handle-changes')
var disconnect = require('./disconnect')
var startListenToChanges = require('./helpers/start-listen-to-changes')

function reset (state) {
return disconnect(state)
return state.bootstrap

.then(function () {
return disconnect(state)
})

.then(function () {
return state.db.destroy()
})

.then(function () {
state.emitter.emit('reset')
state.db = new state.PouchDB(state.dbName)
startListenToChanges(state)
handleChanges(state)
})
}

0 comments on commit 9c0b592

Please sign in to comment.