Skip to content

Commit

Permalink
add: timeout to lock of queueMongo
Browse files Browse the repository at this point in the history
  • Loading branch information
sirpy committed Aug 28, 2019
1 parent 8189283 commit 7682c54
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 52 deletions.
9 changes: 3 additions & 6 deletions src/server/blockchain/AdminWallet.js
Expand Up @@ -146,6 +146,7 @@ export class Wallet {
let gdbalance = await this.tokenContract.methods.balanceOf(this.address).call()
let nativebalance = await this.web3.eth.getBalance(this.address)
this.nonce = parseInt(await this.web3.eth.getTransactionCount(this.address))
txManager.init(this.address, this.nonce)
log.debug('AdminWallet Ready:', {
account: this.address,
gdbalance,
Expand Down Expand Up @@ -286,9 +287,7 @@ export class Wallet {
gas = gas || (await tx.estimateGas())
gasPrice = gasPrice || this.gasPrice

const netNonce = parseInt(await this.web3.eth.getTransactionCount(this.address))

const { nonce, release, fail } = await txManager.lock(this.address, netNonce)
const { nonce, release, fail } = await txManager.lock(this.address)

return new Promise((res, rej) => {
tx.send({ gas, gasPrice, chainId: this.networkId, nonce })
Expand Down Expand Up @@ -332,9 +331,7 @@ export class Wallet {
gas = gas || 100000
gasPrice = gasPrice || this.gasPrice

const netNonce = parseInt(await this.web3.eth.getTransactionCount(this.address))

const { nonce, release, fail } = await txManager.lock(this.address, netNonce)
const { nonce, release, fail } = await txManager.lock(this.address)

return new Promise((res, rej) => {
this.web3.eth
Expand Down
3 changes: 2 additions & 1 deletion src/server/models/wallet-nonce.js
Expand Up @@ -13,7 +13,8 @@ export const WalletNonceSchema = new mongoose.Schema({
isLock: {
type: Boolean,
default: false
}
},
createdAt: { type: Date, default: Date.now }
})

export default mongoose.db.model(MODEL_WALLET_NONCE, WalletNonceSchema)
49 changes: 34 additions & 15 deletions src/server/utils/__tests__/tx-manager.js
@@ -1,13 +1,18 @@
// @flow
import queueMongo from '../tx-manager/queueMongo'
import WalletNonce from '../../models/wallet-nonce'

import { MODEL_WALLET_EXPIRE } from '../../models/constants'
import config from '../../server.config'
let txManagerMongo

const prefixTestAddress = 'test'

jest.setTimeout(3000000)

jest.setTimeout(350000)
const Timeout = (timeout: msec) => {
return new Promise((res, rej) => {
setTimeout(rej, timeout, new Error('Request Timeout'))
})
}
beforeAll(async () => {
txManagerMongo = new queueMongo()
})
Expand All @@ -16,39 +21,39 @@ afterAll(async () => {
await WalletNonce.deleteMany({ address: new RegExp(prefixTestAddress, 'i') })
})

test('txManagerMongo - queue nonce release', async () => {
test('lock should release on success', async () => {
let netNonce = 0
const testAddress = `${prefixTestAddress} - ${Date.now()}`

txManagerMongo.init(testAddress, netNonce)
for (let i = 0; i < 5; i++) {
const { nonce, release, fail } = await txManagerMongo.lock(testAddress, netNonce)

const { nonce, release, fail } = await txManagerMongo.lock(testAddress)
release()

await expect(nonce === i).toBeTruthy()
expect(nonce === i).toBeTruthy()
}
})

test('txManagerMongo - queue nonce fail', async () => {
test('lock should release on fail', async () => {
let netNonce = 0
const testAddress = `${prefixTestAddress} - ${Date.now()}`

txManagerMongo.init(testAddress, netNonce)
for (let i = 0; i < 5; i++) {
const { nonce, release, fail } = await txManagerMongo.lock(testAddress, netNonce)

console.log('got lock 2', { nonce })
fail()

await expect(nonce === 0).toBeTruthy()
expect(nonce === 0).toBeTruthy()
}
})

test('txManagerMongo - queue nonce fail/release', async () => {
test('nonce should be in correct order', async () => {
let netNonce = 0
const testAddress = `${prefixTestAddress} - ${Date.now()}`
txManagerMongo.init(testAddress, netNonce)
let nowNetNonce = netNonce

for (let i = 0; i < 10; i++) {
const { nonce, release, fail } = await txManagerMongo.lock(testAddress, netNonce)
console.log('got lock 3', { nonce })

if (i % 2 === 0) {
release()
Expand All @@ -58,5 +63,19 @@ test('txManagerMongo - queue nonce fail/release', async () => {
nowNetNonce = nonce
}

await expect(nowNetNonce === 5).toBeTruthy()
expect(nowNetNonce === 5).toBeTruthy()
})

test('lock should release after ttl', async () => {
let netNonce = 0
const testAddress = `${prefixTestAddress} - ${Date.now()}`
txManagerMongo.init(testAddress, netNonce)
let txManagerMongo2 = new queueMongo()
const start = Date.now()
const { nonce, release, fail } = await txManagerMongo.lock(testAddress, netNonce)
const { nonce: nonce2, release: release2, fail: fail2 } = await txManagerMongo2.lock(testAddress, netNonce)
const end = Date.now()
release2()
expect(end - start >= MODEL_WALLET_EXPIRE).toBeTruthy()
expect(nonce === 0).toBeTruthy()
})
8 changes: 4 additions & 4 deletions src/server/utils/tx-manager.js
Expand Up @@ -2,8 +2,8 @@ import config from '../server.config'
import queueMongo from './tx-manager/queueMongo'
import queueMutex from './tx-manager/queueMutex'

const network_id = config.ethereum.network_id
const LOCAL_NETWORK_ID = 4447
const network = config.network
const LOCAL_NETWORK = 'develop'

class TransactionRun {
/**
Expand All @@ -14,8 +14,8 @@ class TransactionRun {
static getManagerInstance() {
let queueManager = null

switch (network_id) {
case LOCAL_NETWORK_ID:
switch (network) {
case LOCAL_NETWORK:
queueManager = new queueMutex()
break

Expand Down
40 changes: 22 additions & 18 deletions src/server/utils/tx-manager/queueMongo.js
@@ -1,5 +1,5 @@
import WalletNonce from '../../models/wallet-nonce'

import { MODEL_WALLET_EXPIRE } from '../../models/constants'
export default class queueMongo {
constructor() {
this.model = WalletNonce
Expand All @@ -15,7 +15,6 @@ export default class queueMongo {
]

const options = { fullDocument: 'updateLookup' }

// listen to the collection
this.model.watch(filter, options).on('change', data => {
this.run()
Expand All @@ -25,24 +24,30 @@ export default class queueMongo {
/**
* Get new nonce after increment
* @param address
* @param netNonce
* @returns {Promise<*>}
*/
async getWalletNonce(address) {
try {
let wallet = await this.model.findOneAndUpdate(
{ address, isLock: false },
{ isLock: true },
{ returnNewDocument: true }
{
$or: [
{ address, isLock: false },
{ isLock: true, createdAt: { $gte: new Date(new Date().getTime() - MODEL_WALLET_EXPIRE).toISOString() } }
]
},
{ isLock: true, $currentDate: { createdAt: true } },
{ returnOriginal: false, returnNewDocument: true }
)

return wallet
} catch (e) {
console.log(e)
return false
}
}

init(address, nonce) {
this.createIfNotExist(address, nonce)
}
/**
* Create if not exist nonce to db
* @param address
Expand Down Expand Up @@ -75,7 +80,8 @@ export default class queueMongo {
{ address },
{
isLock: false,
nonce: nextNonce
nonce: nextNonce,
createdAt: 0
},
{ returnNewDocument: true }
)
Expand All @@ -84,12 +90,11 @@ export default class queueMongo {
/**
* lock for queue
* @param address
* @param netNonce
* @returns {Promise<any>}
*/
async lock(address, netNonce) {
async lock(address) {
return new Promise(resolve => {
this.addToQueue(address, netNonce, nonce =>
this.addToQueue(address, nonce =>
resolve({
nonce,
release: async () => await this.unlock(address, nonce + 1),
Expand All @@ -102,13 +107,10 @@ export default class queueMongo {
/**
* Add new tr to queue
* @param address
* @param netNonce
* @param cb
* @returns {Promise<void>}
*/
async addToQueue(address, netNonce, cb) {
await this.createIfNotExist(address, netNonce)

async addToQueue(address, cb) {
this.queue.push({ cb, address })

this.run()
Expand All @@ -118,17 +120,19 @@ export default class queueMongo {
* Run the first transaction from the queue
* @returns {Promise<void>}
*/
async run() {
run = async () => {
this.timeout && clearTimeout(this.timeout)
try {
if (this.queue.length > 0) {
const nextTr = this.queue[0]

const walletNonce = await this.getWalletNonce(nextTr.address)

if (walletNonce) {
this.queue.shift()
nextTr.cb(walletNonce.nonce)
}
if (this.queue.length > 0) {
this.timeout = setTimeout(this.run, MODEL_WALLET_EXPIRE)
}
}
} catch (e) {
console.log(e)
Expand Down
15 changes: 7 additions & 8 deletions src/server/utils/tx-manager/queueMutex.js
Expand Up @@ -6,27 +6,26 @@ export default class queueMutex {
this.mutex = new Mutex()
}

init(address, nonce) {
this.nonce = nonce
}
/**
* lock for queue
* @param address
* @param netNonce
* @returns {Promise<any>}
*/
async lock(address, netNonce) {
if (!this.nonce) {
this.nonce = netNonce
} else {
this.nonce++
}
async lock(address) {
this.nonce++

let release = await this.mutex.lock()

return {
nonce: this.nonce,
release: release,
fail: () => {
this.nonce--;
release();
this.nonce--
release()
}
}
}
Expand Down

0 comments on commit 7682c54

Please sign in to comment.