Skip to content

Commit

Permalink
fixed cron tasks logging, optimized /enroll API call
Browse files Browse the repository at this point in the history
  • Loading branch information
serdiukov-o-nordwhale committed Jul 30, 2020
1 parent c859276 commit 972ec85
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 29 deletions.
45 changes: 32 additions & 13 deletions src/server/claimQueue/claimQueueAPI.js
Expand Up @@ -9,19 +9,38 @@ import { ClaimQueueProps } from '../db/mongo/models/props'

import conf from '../server.config'
import { wrapAsync } from '../utils/helpers'
import logger from '../../imports/logger'

const defaultLogger = logger.child({ from: 'ClaimQueue ' })

const ClaimQueue = {
async setWhitelisted(user, storage, log) {
//if user has passed, then we mark that in claim queue and tag the user
return Promise.all([
user.claimQueue && storage.updateUser({ identifier: user.identifier, 'claimQueue.status': 'whitelisted' }),
Mautic.updateContact(user.mauticId, { tags: ['claimqueue_claimed'] }).catch(e => {
log.error('Failed Mautic tagging user claimed', e.message, e, { mauticId: user.mauticId })
}),
Mautic.addContactsToSegment([user.mauticId], conf.mauticClaimQueueWhitelistedSegmentId).catch(e => {
log && log.error('Failed Mautic adding user to claim queue whitelisted segment', e.message, e)
async setWhitelisted(user, storage, log = defaultLogger) {
const { mauticClaimQueueWhitelistedSegmentId } = conf
const { identifier, mauticId, claimQueue } = user
let result = Promise.resolve()

if (claimQueue) {
result = storage.updateUser({ identifier, 'claimQueue.status': 'whitelisted' })
}

// Mautic calls could took a lot of time and cause ZoOm timeout
// so let's do it 'in background'
// also we won't call mautic if user.mauticId is empty or null
if (mauticId) {
Mautic.updateContact(mauticId, { tags: ['claimqueue_claimed'] }).catch(exception => {
const { message } = exception

log.error('Failed Mautic tagging user claimed', message, exception, { mauticId })
})
])

Mautic.addContactsToSegment([mauticId], mauticClaimQueueWhitelistedSegmentId).catch(exception => {
const { message } = exception

log.error('Failed Mautic adding user to claim queue whitelisted segment', message, exception)
})
}

return result
},

async getStatistics(storage) {
Expand All @@ -43,7 +62,7 @@ const ClaimQueue = {
return stats.map(pair => ({ [pair._id]: pair.total }))
},

async updateAllowed(toAdd, storage, log) {
async updateAllowed(toAdd, storage, log = defaultLogger) {
const { claimQueueAllowed } = conf
let queueProps = await ClaimQueueProps.findOne({})

Expand Down Expand Up @@ -82,7 +101,7 @@ const ClaimQueue = {
return { ok: 1, newAllowed, stillPending, approvedUsers: pendingUsers }
},

async enqueue(user, storage, log) {
async enqueue(user, storage, log = defaultLogger) {
const { claimQueueAllowed } = conf
const { claimQueue } = user

Expand All @@ -97,7 +116,7 @@ const ClaimQueue = {
const openSpaces = claimQueueAllowed - totalQueued
let status = openSpaces > 0 ? 'approved' : 'pending'

//if user was added to queue tag him in mautic
// if user was added to queue tag him in mautic
if (['test', 'development'].includes(conf.env) === false && user.mauticId) {
if (status === 'pending') {
Mautic.updateContact(user.mauticId, { tags: ['claimqueue_in'] }).catch(e => {
Expand Down
30 changes: 16 additions & 14 deletions src/server/cron/TaskRunner.js
@@ -1,13 +1,14 @@
import MongoLock from '../utils/tx-manager/queueMongo'
import { CronJob, CronTime } from 'cron'
import { invokeMap, map, filter, once } from 'lodash'
import { invokeMap, keys, once } from 'lodash'
import { v4 as uuidv4 } from 'uuid'

import MongoLock from '../utils/tx-manager/queueMongo'
import logger from '../../imports/logger'

class TaskRunner {
lock = null
jobFactory = null
tasks = []
tasks = {}

constructor(lock, jobFactory, logger) {
const exitEvents = ['SIGINT', 'beforeExit']
Expand All @@ -22,57 +23,58 @@ class TaskRunner {
registerTask(task) {
const { logger, tasks, lock, jobFactory } = this
const { schedule, name } = task
const taskIdentifier = name || tasks.length
const taskName = name || `task/${uuidv4()}`

const taskJob = new jobFactory(schedule, async () => {
logger.info('Running cron task', { taskIdentifier })
logger.info('Running cron task', { taskName })

// we don't need re-queue in the cron. just lock -> run -> release (despite success/failed)
const { address, release } = await lock.lock(taskIdentifier)
logger.info('Obtained mutex for exclusive run:', { address, taskIdentifier })
const { address, release } = await lock.lock(taskName)
logger.info('Obtained mutex for exclusive run:', { address, taskName })

try {
const taskResult = await task.execute({
// an context object we're passing to the task to let it manipilate its execution & schedule
// let task whould decide to stop or to set new schedule by themselves during execution
// let's make this feedback more clear
setTime: time => {
logger.info('Cron task setting new schedule', { taskName: name, schedule: time })
logger.info('Cron task setting new schedule', { taskName, schedule: time })

taskJob.setTime(time instanceof CronTime ? time : new CronTime(time))
taskJob.start()
},

stop: () => {
logger.info('Cron task has stopped itself', { taskName: name })
logger.info('Cron task has stopped itself', { taskName })
taskJob.stop()
}
})

logger.info('Cron task completed', { taskIdentifier, taskResult })
logger.info('Cron task completed', { taskName, taskResult })
} catch (exception) {
const { message: errMessage } = exception

logger.error('Cron task failed', errMessage, exception, { taskIdentifier })
logger.error('Cron task failed', errMessage, exception, { taskName })
} finally {
release()
}
})

tasks.push(taskJob)
logger.info('Cron task registered', { taskName, schedule })
tasks[taskName] = taskJob
}

startTasks() {
const { logger, tasks } = this

logger.info('Starting cron tasks', filter(map(tasks, 'name')))
logger.info('Starting cron tasks', keys(tasks))
invokeMap(tasks, 'start')
}

stopTasks() {
const { logger, tasks } = this

logger.info('Stopping cron tasks')
logger.info('Stopping cron tasks', keys(tasks))
invokeMap(tasks, 'stop')
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/verification/cron/DisposeEnrollmentsTask.js
Expand Up @@ -10,7 +10,7 @@ class DisposeEnrollmentsTask {
logger = null

get schedule() {
return '0 0 * * * *' //once an hour
return '0 0 * * * *' // once an hour
}

get name() {
Expand Down
8 changes: 7 additions & 1 deletion src/server/verification/processor/EnrollmentProcessor.js
Expand Up @@ -186,11 +186,17 @@ class EnrollmentProcessor {
try {
const enqueuedDisposalTasks = await storage.fetchTasksForProcessing(DISPOSE_ENROLLMENTS_TASK, enqueuedAtFilters)
const enqueuedTasksCount = enqueuedDisposalTasks.length

if (enqueuedTasksCount <= 0) {
log.info('No enqueued disposal tasks ready to processing found, skipping')
return
}

const approximatedBatchSize = Math.round(enqueuedTasksCount / DISPOSE_BATCH_AMOUNT, 0)
const disposeBatchSize = Math.min(DISPOSE_BATCH_MAXIMAL, Math.max(DISPOSE_BATCH_MINIMAL, approximatedBatchSize))
const chunkedDisposalTasks = chunk(enqueuedDisposalTasks, disposeBatchSize)

log.info('Enqueued disposal task fetched and ready to processing', {
log.info('Enqueued disposal tasks fetched and ready to processing', {
enqueuedTasksCount,
disposeBatchSize
})
Expand Down

0 comments on commit 972ec85

Please sign in to comment.