Skip to content

Commit

Permalink
wip: debug stakingModel task stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
sirpy committed Oct 15, 2020
1 parent 454d1f3 commit efa9076
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 23 deletions.
26 changes: 15 additions & 11 deletions src/server/cron/TaskRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,55 +24,59 @@ class TaskRunner {
const { logger, tasks, lock, jobFactory } = this
const { schedule, name } = task
const taskName = name || `task/${uuidv4()}`

const taskId = uuidv4()
const taskJob = new jobFactory(schedule, async () => {
try {
logger.info('Running cron task. getting lock...', { taskName })
logger.info('Running cron task. getting lock...', { taskName, taskId })

const { address, release } = await lock.lock(taskName, 60000)
const { address, release } = await lock.lock(taskName, 60000, taskId)
// we don't need re-queue in the cron. just lock -> run -> release (despite success/failed)
logger.info('Obtained mutex for exclusive run:', { address, taskName })
logger.info('Obtained mutex for exclusive run:', { address, taskName, taskId })

try {
const taskResult = await task.execute({
// an context object we're passing to the task to let it manipilate its execution & schedule
// let task whould decide to stop or to set new schedule by themselves during execution
// let's make this feedback more clear
setTime: time => {
logger.info('Cron task setting new schedule', { taskName, schedule: time })
logger.info('Cron task setting new schedule', { taskName, schedule: time, taskId })

taskJob.setTime(time instanceof CronTime ? time : new CronTime(time))
taskJob.start()
},

stop: () => {
logger.info('Cron task has stopped itself', { taskName })
logger.info('Cron task has stopped itself', { taskName, taskId })
taskJob.stop()
}
})

logger.info('Cron task completed', { taskName, taskResult })
logger.info('Cron task completed', { taskName, taskResult, taskId })
} catch (exception) {
const { message: errMessage } = exception

logger.error('Cron task failed', errMessage, exception, { taskName })
logger.error('Cron task failed', errMessage, exception, { taskName, taskId })
} finally {
release()
}
} catch (exception) {
const { message: errMessage } = exception
if (errMessage.includes('lock not acquired timeout')) {
const nextTry = moment().add(1, 'hours')
logger.info('task lock timeout,probably other worker is doing it, retrying later', { taskName, nextTry })
logger.info('task lock timeout,probably other worker is doing it, retrying later', {
taskName,
nextTry,
taskId
})
taskJob.setTime(new CronTime(nextTry))
taskJob.start()
return
}
logger.error('Cron task failed', errMessage, exception, { taskName })
logger.error('Cron task failed', errMessage, exception, { taskName, taskId })
}
})

logger.info('Cron task registered', { taskName, schedule })
logger.info('Cron task registered', { taskName, schedule, taskId })
tasks[taskName] = taskJob
}

Expand Down
30 changes: 18 additions & 12 deletions src/server/utils/tx-manager/queueMongo.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export default class queueMongo {
* @returns {Promise<*>}
*/
async getWalletNonce(addresses) {
async getWalletNonce(addresses, id) {
try {
const expired = moment()
.subtract(this.lockExpireSeconds, 'seconds')
Expand All @@ -58,12 +58,12 @@ export default class queueMongo {
]
}
const update = { isLock: true, lockedAt: +new Date() }
this.log.debug('getting free address', { addresses, expired })
this.log.debug('getting free address', { addresses, expired, id })
let wallet = await this.model.findOneAndUpdate(filter, update, {
sort: { lockedAt: 1 }, //get least recently used
returnNewDocument: true
})
this.log.debug('got free address', { addresses, expired, wallet })
this.log.debug('got free address', { addresses, expired, wallet, id })

if (this.reRunQueue) {
clearTimeout(this.reRunQueue)
Expand All @@ -73,7 +73,7 @@ export default class queueMongo {
}, this.lockExpireSeconds * 1000)
return wallet
} catch (e) {
this.log.error('TX queueMongo (getWalletNonce)', e.message, e, { addresses })
this.log.error('TX queueMongo (getWalletNonce)', e.message, e, { addresses, id })
return false
}
}
Expand Down Expand Up @@ -163,7 +163,7 @@ export default class queueMongo {
*
* @returns {Promise<any>}
*/
lock(addresses, timeout = 15000) {
lock(addresses, timeout = 15000, id) {
return new Promise((resolve, reject) => {
let timer

Expand All @@ -180,15 +180,21 @@ export default class queueMongo {
timer = setTimeout(() => {
//if timer make sure to remove request from queue
this.removeFromQueue(cb)
reject(new Error('lock not acquired timeout'))
reject(new Error('lock not acquired timeout id:' + id))
this.log.warn('lock timedout,', { addresses, id })
}, timeout)

this.addToQueue(addresses, cb)
this.addToQueue(addresses, cb, id)
})
}

removeFromQueue(cb) {
remove(this.queue, x => x.cb === cb)
remove(this.queue, x => {
if (x.cb === cb) {
return (x.removed = true)
}
return false
})
}

/**
Expand All @@ -199,11 +205,11 @@ export default class queueMongo {
*
* @returns {Promise<void>}
*/
async addToQueue(addresses, cb) {
async addToQueue(addresses, cb, id) {
addresses = Array.isArray(addresses) ? addresses : [addresses]
await this.createListIfNotExists(addresses)

this.queue.push({ cb, addresses })
this.queue.push({ cb, addresses, id })

this.run()
}
Expand All @@ -218,11 +224,11 @@ export default class queueMongo {
try {
if (this.queue.length > 0) {
nextTr = this.queue.shift()
walletNonce = await this.getWalletNonce(nextTr.addresses)
if (!nextTr.removed) walletNonce = await this.getWalletNonce(nextTr.addresses, nextTr.id)
if (walletNonce) {
nextTr.cb({ nonce: walletNonce.nonce, address: walletNonce.address })
} else {
this.queue.push(nextTr)
!nextTr.removed && this.queue.push(nextTr)
}
}
} catch (e) {
Expand Down

0 comments on commit efa9076

Please sign in to comment.