diff --git a/queues/index.js b/queues/index.js index bf73d64..8bad644 100644 --- a/queues/index.js +++ b/queues/index.js @@ -19,12 +19,7 @@ fs.readdirSync(__dirname) }) .forEach(function (file) { let consumer = require(path.join(__dirname, file)); - if (consumer.name === 'newTransaction') { - q.process(consumer.name, 6, consumer.task); - - } else { - q.process(consumer.name, consumer.task); - } + q.process(consumer.name, consumer.task); }); module.exports = q; \ No newline at end of file diff --git a/queues/transaction.js b/queues/transaction.js index 3615a93..4ed44a9 100644 --- a/queues/transaction.js +++ b/queues/transaction.js @@ -45,8 +45,7 @@ consumer.task = async function(job, done) { block: transaction.blockNumber, fromAccount: fromWallet, toAccount: toWallet, - tokenAmount: tokenAmount, - isProcess: true + tokenAmount: tokenAmount }, { upsert: true, new: true }) @@ -55,10 +54,17 @@ consumer.task = async function(job, done) { const q = require('./index') await q.create('addAmountToWallet', {toWallet: toWallet, tokenAmount: tokenAmount}) - .priority('high').removeOnComplete(true).save() + .attempts(5).backoff({delay: 10000}) + .priority('critical').removeOnComplete(true).save() await q.create('subAmountFromWallet', {fromWallet: fromWallet, tokenAmount: tokenAmount}) - .priority('high').removeOnComplete(true).save() + .attempts(5).backoff({delay: 10000}) + .priority('critical').removeOnComplete(true).save() + + await db.Transaction.findOneAndUpdate( + {hash: transaction.transactionHash, fromAccount: fromWallet, toAccount: toWallet}, + {isProcess: true}, { upsert: true, new: true } + ) } }