From 104433ca17a82b74c983687ed4e4de2b64faff9f Mon Sep 17 00:00:00 2001 From: JinGyeong Jeong Date: Tue, 2 Jul 2019 18:32:32 +0900 Subject: [PATCH] Use DB transaction when create pending txs --- src/models/logic/transaction.ts | 6 +++-- src/worker/index.ts | 45 ++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/models/logic/transaction.ts b/src/models/logic/transaction.ts index 27f3621..c779ace 100644 --- a/src/models/logic/transaction.ts +++ b/src/models/logic/transaction.ts @@ -394,7 +394,8 @@ export async function removePendings(hashes: H256[]): Promise { } export async function removeOutdatedPendings( - updatedTransactions: SignedTransaction[] + updatedTransactions: SignedTransaction[], + options: { transaction?: Sequelize.Transaction } = {} ): Promise { try { await models.Transaction.destroy({ @@ -419,7 +420,8 @@ export async function removeOutdatedPendings( })) } ] - } + }, + transaction: options.transaction }); } catch (err) { console.error(err); diff --git a/src/worker/index.ts b/src/worker/index.ts index fae1745..df4832a 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -118,8 +118,6 @@ export default class Worker { } console.log("%d block is indexing...", nextBlockNumber); await this.indexNewBlock(nextBlock); - // FIXME: It's slow due to the getSignerAddress() - await TxModel.removeOutdatedPendings(nextBlock.transactions); console.log("%d block is synchronized", nextBlockNumber); lastIndexedBlockNumber = nextBlockNumber; } @@ -185,6 +183,11 @@ export default class Worker { await updateCCCChange(sdk, block, miningReward, transaction); + // FIXME: It's slow due to the getSignerAddress() + await TxModel.removeOutdatedPendings(block.transactions, { + transaction + }); + await transaction.commit(); } catch (err) { await transaction.rollback(); @@ -215,16 +218,34 @@ export default class Worker { `Indexed: ${indexedHashes.length} / RPC: ${transactions.length}` ); - // Remove dropped pending transactions - if (transactions.length > 0) { - await TxModel.removeOutdatedPendings(transactions); - } + const transaction = await models.sequelize.transaction({ + isolationLevel: + models.Sequelize.Transaction.ISOLATION_LEVELS.SERIALIZABLE, + deferrable: models.Sequelize.Deferrable.SET_DEFERRED + }); + try { + // Remove dropped pending transactions + if (transactions.length > 0) { + await TxModel.removeOutdatedPendings(transactions, { + transaction + }); + } - // Index new pending transactions - const newPendingTransactions = _.filter( - transactions, - pending => !_.includes(indexedHashes, pending.hash().value) - ); - await TxModel.createTransactions(newPendingTransactions, true); + // Index new pending transactions + const newPendingTransactions = _.filter( + transactions, + pending => !_.includes(indexedHashes, pending.hash().value) + ); + await TxModel.createTransactions( + newPendingTransactions, + true, + undefined, + { transaction } + ); + await transaction.commit(); + } catch (err) { + await transaction.rollback(); + throw err; + } }; }