diff --git a/app/lib/computation/blockGenerator.js b/app/lib/computation/blockGenerator.js index bb3fb42a2..71041500b 100644 --- a/app/lib/computation/blockGenerator.js +++ b/app/lib/computation/blockGenerator.js @@ -291,7 +291,7 @@ function BlockGenerator(mainContext, prover) { if (block.number > 0) { try { // Will throw an error if not enough links - yield Q.nbind(mainContext.checkHaveEnoughLinks, mainContext)(newcomer, newLinks); + yield mainContext.checkHaveEnoughLinks(newcomer, newLinks); // This one does not throw but returns a boolean let isOut = yield rules.HELPERS.isOver3Hops(newcomer, newLinks, realNewcomers, current, conf, dal); if (isOut) { diff --git a/app/lib/computation/blockchainContext.js b/app/lib/computation/blockchainContext.js index e532fc4c8..fe376415c 100644 --- a/app/lib/computation/blockchainContext.js +++ b/app/lib/computation/blockchainContext.js @@ -1,5 +1,4 @@ "use strict"; -const async = require('async'); const _ = require('underscore'); const co = require('co'); const Q = require('q'); @@ -121,78 +120,58 @@ function BlockchainContext() { }; const saveBlockData = (current, block) => co(function*() { - yield Q.nfcall(updateBlocksComputedVars, current, block); + yield updateBlocksComputedVars(current, block); // Saves the block (DAL) yield Q.nbind(dal.saveBlock, dal, block); yield Q.nbind(that.saveParametersForRootBlock, that, block); // Create/Update members (create new identities if do not exist) yield Q.nfcall(updateMembers, block); // Create/Update certifications - yield Q.nfcall(updateCertifications, block); + yield that.updateCertifications(block); // Save links - yield updateLinksForBlocks([block], dal.getBlockOrNull.bind(dal)); + yield that.updateLinksForBlocks([block], dal.getBlockOrNull.bind(dal)); // Compute obsolete links - yield Q.nfcall(computeObsoleteLinks, block); + yield that.computeObsoleteLinks(block); // Compute obsolete memberships (active, joiner) - yield computeObsoleteMemberships(block); + yield that.computeObsoleteMemberships(block); // Update consumed sources & create new ones - yield Q.nfcall(updateTransactionSources, block); + yield that.updateSources(block); // Delete eventually present transactions - yield Q.nfcall(deleteTransactions, block); + yield that.deleteTransactions(block); return block; }); - function updateBlocksComputedVars (current, block, done) { + const updateBlocksComputedVars = (current, block) => co(function*() { if (current) { logger.trace('Block median time +%s', block.medianTime - current.medianTime); - logger.trace('Block time ' + ((block.time - current.time) >= 0 ? '+' : '') + '%d', block.time - current.time); + logger.trace('Block time ' + + ((block.time - current.time) >= 0 ? '+' : '') + + '%d', block.time - current.time); } // Monetary Mass update if (current) { - block.monetaryMass = (current.monetaryMass || 0) + (block.dividend || 0) * Math.pow(10, block.unitbase || 0) * block.membersCount; + block.monetaryMass = (current.monetaryMass || 0) + + (block.dividend || 0) * Math.pow(10, block.unitbase || 0) * block.membersCount; } // UD Time update if (block.number == 0) { block.UDTime = block.medianTime; // Root = first UD time block.dividend = null; - done(); } else if (block.dividend) { - async.waterfall([ - function (next) { - async.parallel({ - last: function (callback) { - dal.lastUDBlock().then((res) => callback(null, res)).catch(callback); - }, - root: function (callback) { - dal.getBlock(0, callback); - } - }, next); - }, - function (res, next) { - var last = res.last; - var root = res.root; - block.UDTime = conf.dt + (last ? last.UDTime : root.medianTime); - next(); - } - ], done); + const result = yield { + last: dal.lastUDBlock(), + root: dal.getBlock(0) + }; + block.UDTime = conf.dt + (result.last ? result.last.UDTime : result.root.medianTime); } else { block.dividend = null; block.UDTime = current.UDTime; - done(); } - } + }); this.updateMembers = updateMembers; - this.updateCertifications = updateCertifications; - this.computeObsoleteLinks = computeObsoleteLinks; - this.computeObsoleteMemberships = computeObsoleteMemberships; - this.updateTransactionSourcesForBlocks = updateTransactionSourcesForBlocks; - this.updateCertificationsForBlocks = updateCertificationsForBlocks; - this.updateMembershipsForBlocks = updateMembershipsForBlocks; - this.updateLinksForBlocks = updateLinksForBlocks; - this.updateTransactionsForBlocks = updateTransactionsForBlocks; let cleanRejectedIdentities = (idty) => co(function *() { yield dal.removeUnWrittenWithPubkey(idty.pubkey); @@ -367,39 +346,27 @@ function BlockchainContext() { * @param block * @param done */ - function updateCertifications (block, done) { - async.forEachSeries(block.certifications, function(inlineCert, callback){ - var cert = Certification.statics.fromInline(inlineCert); - var from_uid, to_uid; - async.waterfall([ - function (next) { - dal.getWritten(cert.to, next); - }, - function (idty, next){ - cert.target = new Identity(idty).getTargetHash(); - to_uid = idty.uid; - dal.getWritten(cert.from, next); - }, - function (idty, next){ - from_uid = idty.uid; - dal.existsCert(cert).then(_.partial(next, null)).catch(next); - }, - function (existing, next) { - if (existing) { - cert = existing; - } - cert.written_block = block.number; - cert.written_hash = block.hash; - cert.from_uid = from_uid; - cert.to_uid = to_uid; - cert.linked = true; - dal.officializeCertification(new Certification(cert)) - .then(_.partial(next, null)) - .catch(next); - } - ], callback); - }, done); - } + this.updateCertifications = (block) => co(function*() { + for (let c in block.certifications) { + const inlineCert = block.certifications[c]; + let cert = Certification.statics.fromInline(inlineCert); + let idty = yield Q.nbind(dal.getWritten, dal, cert.to); + cert.target = new Identity(idty).getTargetHash(); + const to_uid = idty.uid; + idty = yield Q.nbind(dal.getWritten, dal, cert.from); + const from_uid = idty.uid; + const existing = yield dal.existsCert(cert); + if (existing) { + cert = existing; + } + cert.written_block = block.number; + cert.written_hash = block.hash; + cert.from_uid = from_uid; + cert.to_uid = to_uid; + cert.linked = true; + yield dal.officializeCertification(new Certification(cert)); + } + }); that.saveParametersForRootBlock = (block, done) => { if (block.parameters) { @@ -434,129 +401,91 @@ function BlockchainContext() { } }; - function computeObsoleteLinks (block, done) { - async.waterfall([ - function (next){ - dal.obsoletesLinks(block.medianTime - conf.sigValidity).then(function() { - next(); - }).catch(next); - }, - function (next){ - dal.getMembers(next); - }, - function (members, next){ - // If a member no more have enough signatures, he has to be kicked - async.forEachSeries(members, function(idty, callback){ - var pubkey = idty.pubkey; - async.waterfall([ - function (nextOne){ - that.checkHaveEnoughLinks(pubkey, {}, function (err) { - nextOne(null, err); - }); - }, - function (notEnoughLinks, nextOne){ - dal.setKicked(pubkey, new Identity(idty).getTargetHash(), notEnoughLinks ? true : false, nextOne); - } - ], callback); - }, next); + this.computeObsoleteLinks = (block) => co(function*() { + yield dal.obsoletesLinks(block.medianTime - conf.sigValidity); + const members = yield Q.nbind(dal.getMembers, dal); + for (const m in members) { + const idty = members[m]; + try { + yield that.checkHaveEnoughLinks(idty.pubkey, {}); + } catch (notEnoughLinks) { + yield Q.nbind(dal.setKicked, dal, idty.pubkey, new Identity(idty).getTargetHash(), + notEnoughLinks ? true : false); } - ], done); - } + } + }); - this.checkHaveEnoughLinks = function(target, newLinks, done) { - async.waterfall([ - function (next){ - dal.getValidLinksTo(target).then(_.partial(next, null)).catch(next); - }, - function (links, next){ - var count = links.length; - if (newLinks[target] && newLinks[target].length) - count += newLinks[target].length; - next(count < conf.sigQty && 'Key ' + target + ' does not have enough links (' + count + '/' + conf.sigQty + ')'); - } - ], done); - }; + this.checkHaveEnoughLinks = (target, newLinks) => co(function*() { + const links = yield dal.getValidLinksTo(target); + let count = links.length; + if (newLinks[target] && newLinks[target].length) + count += newLinks[target].length; + if (count < conf.sigQty) + throw 'Key ' + target + ' does not have enough links (' + count + '/' + conf.sigQty + ')'; + }); - function computeObsoleteMemberships (block) { - return co(function *() { - let lastForKick = yield dal.getMembershipExcludingBlock(block, conf.msValidity); - let lastForRevoke = yield dal.getMembershipRevocatingBlock(block, conf.msValidity * constants.REVOCATION_FACTOR); - if (lastForKick) { - yield dal.kickWithOutdatedMemberships(lastForKick.number); - } - if (lastForRevoke) { - yield dal.revokeWithOutdatedMemberships(lastForRevoke.number); + this.computeObsoleteMemberships = (block) => co(function *() { + let lastForKick = yield dal.getMembershipExcludingBlock(block, conf.msValidity); + let lastForRevoke = yield dal.getMembershipRevocatingBlock(block, conf.msValidity * constants.REVOCATION_FACTOR); + if (lastForKick) { + yield dal.kickWithOutdatedMemberships(lastForKick.number); + } + if (lastForRevoke) { + yield dal.revokeWithOutdatedMemberships(lastForRevoke.number); + } + }); + + this.updateSources = (block) => co(function*() { + if (block.dividend) { + const idties = yield Q.nfcall(dal.getMembers); + for (const i in idties) { + const idty = idties[i]; + yield dal.saveSource(new Source({ + 'type': 'D', + 'number': block.number, + 'time': block.medianTime, + 'identifier': idty.pubkey, + 'noffset': block.number, + 'block_hash': block.hash, + 'amount': block.dividend, + 'base': block.unitbase, + 'conditions': 'SIG(' + idty.pubkey + ')', // Only this pubkey can unlock its UD + 'consumed': 0 + })); } - }); - } - function updateTransactionSources (block, done) { - async.parallel([ - function (next) { - if (block.dividend) { - async.waterfall([ - function (nextOne) { - dal.getMembers(nextOne); - }, - function (idties, nextOne) { - async.forEachSeries(idties, function (idty, callback) { - dal.saveSource(new Source({ - 'type': 'D', - 'number': block.number, - 'time': block.medianTime, - 'identifier': idty.pubkey, - 'noffset': block.number, - 'block_hash': block.hash, - 'amount': block.dividend, - 'base': block.unitbase, - 'conditions': 'SIG(' + idty.pubkey + ')', // Only this pubkey can unlock its UD - 'consumed': 0 - })).then(_.partial(callback, null)).catch(callback); - }, nextOne); - } - ], next); + for (const t in block.transactions) { + const obj = block.transactions[t]; + obj.version = constants.DOCUMENTS_VERSION; + obj.currency = block.currency; + obj.issuers = obj.signatories; + const tx = new Transaction(obj); + const txObj = tx.getTransaction(); + const txHash = tx.getHash(true); + for (const i in txObj.inputs) { + const input = txObj.inputs[i]; + yield dal.setConsumedSource(input.identifier, input.noffset); + } + + let index = 0; + for (const o in txObj.outputs) { + const output = txObj.outputs[o]; + yield dal.saveSource(new Source({ + 'type': 'T', + 'number': block.number, + 'time': block.medianTime, + 'identifier': txHash, + 'noffset': index++, + 'block_hash': block.hash, + 'amount': output.amount, + 'base': output.base, + 'conditions': output.conditions, + 'consumed': 0 + })); } - else next(); - }, - function (next) { - async.forEachSeries(block.transactions, function (json, callback) { - var obj = json; - obj.version = constants.DOCUMENTS_VERSION; - obj.currency = block.currency; - obj.issuers = json.signatories; - var tx = new Transaction(obj); - var txObj = tx.getTransaction(); - var txHash = tx.getHash(true); - async.parallel([ - function (nextOne) { - async.forEachSeries(txObj.inputs, function (input, callback2) { - dal.setConsumedSource(input.identifier, input.noffset).then(_.partial(callback2, null)).catch(callback2); - }, nextOne); - }, - function (nextOne) { - let index = 0; - async.forEachSeries(txObj.outputs, function (output, callback2) { - dal.saveSource(new Source({ - 'type': 'T', - 'number': block.number, - 'time': block.medianTime, - 'identifier': txHash, - 'noffset': index++, - 'block_hash': block.hash, - 'amount': output.amount, - 'base': output.base, - 'conditions': output.conditions, - 'consumed': 0 - })).then(_.partial(callback2, null)).catch(callback2); - }, nextOne); - } - ], callback); - }, next); } - ], function (err) { - done(err); - }); - } + } + }); /** * New method for CREATING memberships found in blocks. @@ -564,36 +493,34 @@ function BlockchainContext() { * @param blocks * @returns {*} */ - function updateMembershipsForBlocks(blocks) { - return co(function *() { - let memberships = []; - let types = { - 'join': 'joiners', - 'active': 'actives', - 'leave': 'leavers' - }; - for (let i = 0, len = blocks.length; i < len; i++) { - let block = blocks[i]; - _.keys(types).forEach(function(type){ - let msType = type == 'leave' ? 'out' : 'in'; - let field = types[type]; - let mss = block[field]; - for (let j = 0, len2 = mss.length; j < len2; j++) { - let msRaw = mss[j]; - var ms = Membership.statics.fromInline(msRaw, type == 'leave' ? 'OUT' : 'IN', block.currency); - ms.membership = msType.toUpperCase(); - ms.written = true; - ms.written_number = block.number; - ms.type = type; - ms.hash = String(hashf(ms.getRawSigned())).toUpperCase(); - ms.idtyHash = (hashf(ms.userid + ms.certts + ms.issuer) + "").toUpperCase(); - memberships.push(ms); - } - }); - } - return dal.updateMemberships(memberships); - }); - } + this.updateMembershipsForBlocks = (blocks) => co(function *() { + let memberships = []; + let types = { + 'join': 'joiners', + 'active': 'actives', + 'leave': 'leavers' + }; + for (let i = 0, len = blocks.length; i < len; i++) { + let block = blocks[i]; + _.keys(types).forEach(function(type){ + let msType = type == 'leave' ? 'out' : 'in'; + let field = types[type]; + let mss = block[field]; + for (let j = 0, len2 = mss.length; j < len2; j++) { + let msRaw = mss[j]; + var ms = Membership.statics.fromInline(msRaw, type == 'leave' ? 'OUT' : 'IN', block.currency); + ms.membership = msType.toUpperCase(); + ms.written = true; + ms.written_number = block.number; + ms.type = type; + ms.hash = String(hashf(ms.getRawSigned())).toUpperCase(); + ms.idtyHash = (hashf(ms.userid + ms.certts + ms.issuer) + "").toUpperCase(); + memberships.push(ms); + } + }); + } + return dal.updateMemberships(memberships); + }); /** * New method for CREATING links found in blocks. @@ -602,35 +529,33 @@ function BlockchainContext() { * @param getBlockOrNull * @returns {*} */ - function updateLinksForBlocks(blocks, getBlockOrNull) { - return co(function *() { - let links = []; - for (let i = 0, len = blocks.length; i < len; i++) { - let block = blocks[i]; - for (let j = 0, len2 = block.certifications.length; j < len2; j++) { - let inlineCert = block.certifications[j]; - let cert = Certification.statics.fromInline(inlineCert); - let tagBlock = block; - if (block.number > 0) { - tagBlock = yield getBlockOrNull(cert.block_number); - } - let fromIdty = yield dal.getWrittenIdtyByPubkey(cert.from); - let toIdty = yield dal.getWrittenIdtyByPubkey(cert.to); - links.push({ - source: cert.from, - target: cert.to, - from_wotb_id: fromIdty.wotb_id, - to_wotb_id: toIdty.wotb_id, - timestamp: tagBlock.medianTime, - block_number: block.number, - block_hash: block.hash, - obsolete: false - }); + this.updateLinksForBlocks = (blocks, getBlockOrNull) => co(function *() { + let links = []; + for (let i = 0, len = blocks.length; i < len; i++) { + let block = blocks[i]; + for (let j = 0, len2 = block.certifications.length; j < len2; j++) { + let inlineCert = block.certifications[j]; + let cert = Certification.statics.fromInline(inlineCert); + let tagBlock = block; + if (block.number > 0) { + tagBlock = yield getBlockOrNull(cert.block_number); } + let fromIdty = yield dal.getWrittenIdtyByPubkey(cert.from); + let toIdty = yield dal.getWrittenIdtyByPubkey(cert.to); + links.push({ + source: cert.from, + target: cert.to, + from_wotb_id: fromIdty.wotb_id, + to_wotb_id: toIdty.wotb_id, + timestamp: tagBlock.medianTime, + block_number: block.number, + block_hash: block.hash, + obsolete: false + }); } - return dal.updateLinks(links); - }); - } + } + return dal.updateLinks(links); + }); /** * New method for CREATING transactions found in blocks. @@ -638,25 +563,23 @@ function BlockchainContext() { * @param blocks * @returns {*} */ - function updateTransactionsForBlocks(blocks) { - return co(function *() { - let txs = []; - for (let i = 0, len = blocks.length; i < len; i++) { - let block = blocks[i]; - txs = txs.concat(block.transactions.map((tx) => { - _.extend(tx, { - block_number: block.number, - time: block.medianTime, - currency: block.currency, - written: true, - removed: false - }); - return new Transaction(tx); - })); - } - return dal.updateTransactions(txs); - }); - } + this.updateTransactionsForBlocks = (blocks) => co(function *() { + let txs = []; + for (let i = 0, len = blocks.length; i < len; i++) { + let block = blocks[i]; + txs = txs.concat(block.transactions.map((tx) => { + _.extend(tx, { + block_number: block.number, + time: block.medianTime, + currency: block.currency, + written: true, + removed: false + }); + return new Transaction(tx); + })); + } + return dal.updateTransactions(txs); + }); /** * New method for CREATING certifications found in blocks. @@ -664,34 +587,32 @@ function BlockchainContext() { * @param blocks * @returns {*} */ - function updateCertificationsForBlocks(blocks) { - return co(function *() { - let certs = []; - for (let i = 0, len = blocks.length; i < len; i++) { - let block = blocks[i]; - for (let j = 0, len2 = block.certifications.length; j < len2; j++) { - let inlineCert = block.certifications[j]; - var cert = Certification.statics.fromInline(inlineCert); - let to = yield dal.getWrittenIdtyByPubkey(cert.to); - let to_uid = to.uid; - cert.target = new Identity(to).getTargetHash(); - let from = yield dal.getWrittenIdtyByPubkey(cert.from); - let from_uid = from.uid; - let existing = yield dal.existsCert(cert); - if (existing) { - cert = existing; - } - cert.written_block = block.number; - cert.written_hash = block.hash; - cert.from_uid = from_uid; - cert.to_uid = to_uid; - cert.linked = true; - certs.push(cert); + this.updateCertificationsForBlocks = (blocks) => co(function *() { + let certs = []; + for (let i = 0, len = blocks.length; i < len; i++) { + let block = blocks[i]; + for (let j = 0, len2 = block.certifications.length; j < len2; j++) { + let inlineCert = block.certifications[j]; + var cert = Certification.statics.fromInline(inlineCert); + let to = yield dal.getWrittenIdtyByPubkey(cert.to); + let to_uid = to.uid; + cert.target = new Identity(to).getTargetHash(); + let from = yield dal.getWrittenIdtyByPubkey(cert.from); + let from_uid = from.uid; + let existing = yield dal.existsCert(cert); + if (existing) { + cert = existing; } + cert.written_block = block.number; + cert.written_hash = block.hash; + cert.from_uid = from_uid; + cert.to_uid = to_uid; + cert.linked = true; + certs.push(cert); } - return dal.updateCertifications(certs); - }); - } + } + return dal.updateCertifications(certs); + }); /** * New method for CREATING sources found in transactions of blocks. @@ -699,56 +620,54 @@ function BlockchainContext() { * @param blocks * @returns {*} */ - function updateTransactionSourcesForBlocks(blocks, dividends) { - return co(function *() { - let sources = dividends; - for (let i = 0, len = blocks.length; i < len; i++) { - let block = blocks[i]; - // Transactions - for (let j = 0, len2 = block.transactions.length; j < len2; j++) { - let json = block.transactions[j]; - let obj = json; - obj.version = constants.DOCUMENTS_VERSION; - obj.currency = block.currency; - obj.issuers = json.signatories; - let tx = new Transaction(obj); - let txObj = tx.getTransaction(); - let txHash = tx.getHash(true); - sources = sources.concat(txObj.inputs.map((input) => _.extend({ toConsume: true }, input))); - sources = sources.concat(txObj.outputs.map((output, index) => _.extend({ - toConsume: false - }, { - 'type': 'T', - 'number': block.number, - 'block_hash': block.hash, - 'fingerprint': txHash, - 'amount': output.amount, - 'base': output.base, - 'consumed': false, - 'identifier': txHash, - 'noffset': index, - 'conditions': output.conditions - }))); - } - } - try { - let res = yield dal.updateSources(sources); - return res; - } catch (e) { - throw e; + this.updateTransactionSourcesForBlocks = (blocks, dividends) => co(function *() { + let sources = dividends; + for (let i = 0, len = blocks.length; i < len; i++) { + let block = blocks[i]; + // Transactions + for (let j = 0, len2 = block.transactions.length; j < len2; j++) { + let json = block.transactions[j]; + let obj = json; + obj.version = constants.DOCUMENTS_VERSION; + obj.currency = block.currency; + obj.issuers = json.signatories; + let tx = new Transaction(obj); + let txObj = tx.getTransaction(); + let txHash = tx.getHash(true); + sources = sources.concat(txObj.inputs.map((input) => _.extend({ toConsume: true }, input))); + sources = sources.concat(txObj.outputs.map((output, index) => _.extend({ + toConsume: false + }, { + 'type': 'T', + 'number': block.number, + 'block_hash': block.hash, + 'fingerprint': txHash, + 'amount': output.amount, + 'base': output.base, + 'consumed': false, + 'identifier': txHash, + 'noffset': index, + 'conditions': output.conditions + }))); } - }); - } + } + try { + let res = yield dal.updateSources(sources); + return res; + } catch (e) { + throw e; + } + }); - function deleteTransactions (block, done) { - async.forEachSeries(block.transactions, function (json, callback) { - var obj = json; + this.deleteTransactions = (block) => co(function*() { + for (const t in block.transactions) { + var obj = block.transactions[t]; obj.version = constants.DOCUMENTS_VERSION; obj.currency = block.currency; - obj.issuers = json.signatories; + obj.issuers = obj.signatories; var tx = new Transaction(obj); var txHash = tx.getHash(); - dal.removeTxByHash(txHash).then(_.partial(callback, null)).catch(callback); - }, done); - } + yield dal.removeTxByHash(txHash); + } + }); } diff --git a/app/service/BlockchainService.js b/app/service/BlockchainService.js index 3ad3be2a9..f68493fae 100644 --- a/app/service/BlockchainService.js +++ b/app/service/BlockchainService.js @@ -523,7 +523,11 @@ function BlockchainService () { async.waterfall([ function (next){ // Compute obsolete links - mainContext.computeObsoleteLinks(block, next); + mainContext.computeObsoleteLinks(block) + .then(function() { + next(); + }) + .catch(next); }, function (next){ // Compute obsolete memberships (active, joiner)