Skip to content

Commit

Permalink
fix TxStateSet
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Aug 13, 2015
1 parent e181f09 commit 0aa19bb
Showing 1 changed file with 86 additions and 62 deletions.
148 changes: 86 additions & 62 deletions lib/txstateset.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,25 @@ TxStateSet.prototype._makeTxRecord = function (txid, blockchainState) {
/**
* @param {Object[]} txRecords
* @param {Blockchain} blockchainState
* @return {Promise.<Object[]>}
* @return {Promise.<?{height: number, hash: string}>}
*/
TxStateSet.prototype._refreshTxRecords = function (txRecords, blockchainState) {
var self = this

return new Promise(function (resolve, reject) {
(function maybeRefresh (i) {
var lastConfirmedHeight = null

function maybeRefresh (i) {
if (i >= txRecords.length) {
return resolve()
return resolve(null)
}

if (lastConfirmedHeight !== null && lastConfirmedHeight > txRecords[i].blockHeight) {
var obj = {
height: txRecords[i].blockHeight,
hash: txRecords[i].blockHash
}
return resolve(obj)
}

var promise = Promise.resolve(true) // not confirmed need refresh op.
Expand All @@ -134,17 +144,18 @@ TxStateSet.prototype._refreshTxRecords = function (txRecords, blockchainState) {
promise
.then(function (needRefresh) {
if (!needRefresh) {
return resolve()
lastConfirmedHeight = txRecords[i].blockHeight
return
}

return self._makeTxRecord(txRecords[i].txid, blockchainState)
.then(function (txr) {
txRecords[i] = txr
maybeRefresh(i + 1)
})
.then(function (txr) { txRecords[i] = txr })
})
.then(function () { maybeRefresh(i + 1) })
.catch(reject)
})(0)
}

maybeRefresh(0)
})
}

Expand All @@ -162,71 +173,84 @@ TxStateSet.prototype.sync = function (blockchainState, addresses) {
newTxSS.latest = _.cloneDeep(self.latest)
newTxSS.oldTxSS = self

if (newTxSS.trackedAddresses.length === 0) {
return Promise.resolve(newTxSS)
}

return new Promise(function (resolve, reject) {
(function update () {
var queryOpts = {
from: _.get(newTxSS.latest, 'height', 0)
var recordsAlreadyRefreshed = false

/**
* @param {Array.<{txid: string, height: ?number}>}
* @return {Promise}
*/
function addNewRecordsFromTransactions (transactions) {
var existTxIds = _.pluck(newTxSS.txRecords, 'txid')
var txIds = _.pluck(transactions, 'txid')
var newTxIds = _.without.bind(null, txIds).apply(null, existTxIds)
return Promise.map(newTxIds, function (txid) {
return self._makeTxRecord(txid, blockchainState)
.then(function (record) { newTxSS.txRecords.push(record) })
})
}

/**
* @return {Promise}
*/
function update () {
var queryOpts = {}
if (newTxSS.latest !== null) {
queryOpts.from = newTxSS.latest.hash
}

blockchainState.addressesQuery(newTxSS.trackedAddresses, queryOpts)
return blockchainState.addressesQuery(self.trackedAddresses, queryOpts)
.then(function (data) {
if (data.latest.height <= _.get(newTxSS.latest, 'height') &&
data.latest.hash !== _.get(newTxSS.latest, 'hash')) {
throw new errors.Blockchain().HeaderNotFound()
}

var dataTxIds = _.pluck(data.transactions, 'txid')

// remove records that not in blockchain anymore
newTxSS.txRecords = newTxSS.txRecords.filter(function (record) {
if (record.blockHeight <= data.latest.height) {
return true
return Promise.try(function () {
if (data.latest.hash !== _.get(newTxSS.latest, 'hash') &&
recordsAlreadyRefreshed === false) {
return self._refreshTxRecords(newTxSS.txRecords, blockchainState)
}

return dataTxIds.indexOf(record.txid) !== -1
})

return Promise.resolve()
.then(function () {
if (data.latest.hash === _.get(newTxSS.latest, 'hash')) {
return
}

// update blockHash and blockHeight in txRecords if latest changed
return self._refreshTxRecords(newTxSS.txRecords, blockchainState)
})
.then(function () {
// create records for new transactions
var existTxIds = _.pluck(newTxSS.txRecords, 'txid')
var newTxIds = _.without.bind(null, dataTxIds).apply(null, existTxIds)
return Promise.map(newTxIds, function (txid) {
return self._makeTxRecord(txid, blockchainState)
})
})
.then(function (newTxRecords) {
// re-sort records
newTxSS.txRecords = _.sortBy(newTxSS.txRecords.concat(newTxRecords), function (row) {
return row.blockHeight === undefined ? -Infinity : -row.blockHeight
})
newTxSS.latest = data.latest
resolve(newTxSS)
})
.then(function () {
newTxSS.latest = data.latest
return addNewRecordsFromTransactions(data.transactions)
})
})
.catch(errors.Blockchain.HeaderNotFound, function (err) {
if (newTxSS.latest === null || newTxSS.latest.height === 0) {
if (newTxSS.latest === null) {
throw err
}

newTxSS.latest.hash = null
newTxSS.latest.height -= 10
update()
return this._refreshTxRecords(newTxSS.txRecords, blockchainState)
.then(function (newLatest) {
recordsAlreadyRefreshed = true
newTxSS.latest = newLatest
return update()
})
})
.catch(reject)
})()
}

var updateProcess = Promise.resolve()
if (self.trackedAddresses.length > 0) {
updateProcess = update()
}

if (addresses.length > 0) {
updateProcess = updateProcess
.then(function () {
return blockchainState.addressesQuery(addresses)
})
.then(function (data) {
newTxSS.latest = data.latest
return addNewRecordsFromTransactions(data.transactions)
})
}

updateProcess
.then(function () {
// re-sort records
newTxSS.txRecords = _.sortBy(newTxSS.txRecords, function (row) {
return row.blockHeight === undefined ? -Infinity : -row.blockHeight
})
return newTxSS
})
.then(resolve, reject)
})
}

Expand Down

0 comments on commit 0aa19bb

Please sign in to comment.