From 972ec8593a55fa4799ee05c292d55862fb073d60 Mon Sep 17 00:00:00 2001 From: Oleksii Serdiukov Date: Fri, 31 Jul 2020 02:00:57 +0300 Subject: [PATCH] fixed cron tasks logging, optimized /enroll API call --- src/server/claimQueue/claimQueueAPI.js | 45 +++++++++++++------ src/server/cron/TaskRunner.js | 30 +++++++------ .../cron/DisposeEnrollmentsTask.js | 2 +- .../processor/EnrollmentProcessor.js | 8 +++- 4 files changed, 56 insertions(+), 29 deletions(-) diff --git a/src/server/claimQueue/claimQueueAPI.js b/src/server/claimQueue/claimQueueAPI.js index bc1fe258..5a52d46d 100644 --- a/src/server/claimQueue/claimQueueAPI.js +++ b/src/server/claimQueue/claimQueueAPI.js @@ -9,19 +9,38 @@ import { ClaimQueueProps } from '../db/mongo/models/props' import conf from '../server.config' import { wrapAsync } from '../utils/helpers' +import logger from '../../imports/logger' + +const defaultLogger = logger.child({ from: 'ClaimQueue ' }) const ClaimQueue = { - async setWhitelisted(user, storage, log) { - //if user has passed, then we mark that in claim queue and tag the user - return Promise.all([ - user.claimQueue && storage.updateUser({ identifier: user.identifier, 'claimQueue.status': 'whitelisted' }), - Mautic.updateContact(user.mauticId, { tags: ['claimqueue_claimed'] }).catch(e => { - log.error('Failed Mautic tagging user claimed', e.message, e, { mauticId: user.mauticId }) - }), - Mautic.addContactsToSegment([user.mauticId], conf.mauticClaimQueueWhitelistedSegmentId).catch(e => { - log && log.error('Failed Mautic adding user to claim queue whitelisted segment', e.message, e) + async setWhitelisted(user, storage, log = defaultLogger) { + const { mauticClaimQueueWhitelistedSegmentId } = conf + const { identifier, mauticId, claimQueue } = user + let result = Promise.resolve() + + if (claimQueue) { + result = storage.updateUser({ identifier, 'claimQueue.status': 'whitelisted' }) + } + + // Mautic calls could took a lot of time and cause ZoOm timeout + // so let's do it 'in background' + // also we won't call mautic if user.mauticId is empty or null + if (mauticId) { + Mautic.updateContact(mauticId, { tags: ['claimqueue_claimed'] }).catch(exception => { + const { message } = exception + + log.error('Failed Mautic tagging user claimed', message, exception, { mauticId }) }) - ]) + + Mautic.addContactsToSegment([mauticId], mauticClaimQueueWhitelistedSegmentId).catch(exception => { + const { message } = exception + + log.error('Failed Mautic adding user to claim queue whitelisted segment', message, exception) + }) + } + + return result }, async getStatistics(storage) { @@ -43,7 +62,7 @@ const ClaimQueue = { return stats.map(pair => ({ [pair._id]: pair.total })) }, - async updateAllowed(toAdd, storage, log) { + async updateAllowed(toAdd, storage, log = defaultLogger) { const { claimQueueAllowed } = conf let queueProps = await ClaimQueueProps.findOne({}) @@ -82,7 +101,7 @@ const ClaimQueue = { return { ok: 1, newAllowed, stillPending, approvedUsers: pendingUsers } }, - async enqueue(user, storage, log) { + async enqueue(user, storage, log = defaultLogger) { const { claimQueueAllowed } = conf const { claimQueue } = user @@ -97,7 +116,7 @@ const ClaimQueue = { const openSpaces = claimQueueAllowed - totalQueued let status = openSpaces > 0 ? 'approved' : 'pending' - //if user was added to queue tag him in mautic + // if user was added to queue tag him in mautic if (['test', 'development'].includes(conf.env) === false && user.mauticId) { if (status === 'pending') { Mautic.updateContact(user.mauticId, { tags: ['claimqueue_in'] }).catch(e => { diff --git a/src/server/cron/TaskRunner.js b/src/server/cron/TaskRunner.js index 1fe6aaf4..5cd23ecb 100644 --- a/src/server/cron/TaskRunner.js +++ b/src/server/cron/TaskRunner.js @@ -1,13 +1,14 @@ -import MongoLock from '../utils/tx-manager/queueMongo' import { CronJob, CronTime } from 'cron' -import { invokeMap, map, filter, once } from 'lodash' +import { invokeMap, keys, once } from 'lodash' +import { v4 as uuidv4 } from 'uuid' +import MongoLock from '../utils/tx-manager/queueMongo' import logger from '../../imports/logger' class TaskRunner { lock = null jobFactory = null - tasks = [] + tasks = {} constructor(lock, jobFactory, logger) { const exitEvents = ['SIGINT', 'beforeExit'] @@ -22,14 +23,14 @@ class TaskRunner { registerTask(task) { const { logger, tasks, lock, jobFactory } = this const { schedule, name } = task - const taskIdentifier = name || tasks.length + const taskName = name || `task/${uuidv4()}` const taskJob = new jobFactory(schedule, async () => { - logger.info('Running cron task', { taskIdentifier }) + logger.info('Running cron task', { taskName }) // we don't need re-queue in the cron. just lock -> run -> release (despite success/failed) - const { address, release } = await lock.lock(taskIdentifier) - logger.info('Obtained mutex for exclusive run:', { address, taskIdentifier }) + const { address, release } = await lock.lock(taskName) + logger.info('Obtained mutex for exclusive run:', { address, taskName }) try { const taskResult = await task.execute({ @@ -37,42 +38,43 @@ class TaskRunner { // let task whould decide to stop or to set new schedule by themselves during execution // let's make this feedback more clear setTime: time => { - logger.info('Cron task setting new schedule', { taskName: name, schedule: time }) + logger.info('Cron task setting new schedule', { taskName, schedule: time }) taskJob.setTime(time instanceof CronTime ? time : new CronTime(time)) taskJob.start() }, stop: () => { - logger.info('Cron task has stopped itself', { taskName: name }) + logger.info('Cron task has stopped itself', { taskName }) taskJob.stop() } }) - logger.info('Cron task completed', { taskIdentifier, taskResult }) + logger.info('Cron task completed', { taskName, taskResult }) } catch (exception) { const { message: errMessage } = exception - logger.error('Cron task failed', errMessage, exception, { taskIdentifier }) + logger.error('Cron task failed', errMessage, exception, { taskName }) } finally { release() } }) - tasks.push(taskJob) + logger.info('Cron task registered', { taskName, schedule }) + tasks[taskName] = taskJob } startTasks() { const { logger, tasks } = this - logger.info('Starting cron tasks', filter(map(tasks, 'name'))) + logger.info('Starting cron tasks', keys(tasks)) invokeMap(tasks, 'start') } stopTasks() { const { logger, tasks } = this - logger.info('Stopping cron tasks') + logger.info('Stopping cron tasks', keys(tasks)) invokeMap(tasks, 'stop') } } diff --git a/src/server/verification/cron/DisposeEnrollmentsTask.js b/src/server/verification/cron/DisposeEnrollmentsTask.js index 0063b789..371c2119 100644 --- a/src/server/verification/cron/DisposeEnrollmentsTask.js +++ b/src/server/verification/cron/DisposeEnrollmentsTask.js @@ -10,7 +10,7 @@ class DisposeEnrollmentsTask { logger = null get schedule() { - return '0 0 * * * *' //once an hour + return '0 0 * * * *' // once an hour } get name() { diff --git a/src/server/verification/processor/EnrollmentProcessor.js b/src/server/verification/processor/EnrollmentProcessor.js index 7c3ffde0..730e07d7 100644 --- a/src/server/verification/processor/EnrollmentProcessor.js +++ b/src/server/verification/processor/EnrollmentProcessor.js @@ -186,11 +186,17 @@ class EnrollmentProcessor { try { const enqueuedDisposalTasks = await storage.fetchTasksForProcessing(DISPOSE_ENROLLMENTS_TASK, enqueuedAtFilters) const enqueuedTasksCount = enqueuedDisposalTasks.length + + if (enqueuedTasksCount <= 0) { + log.info('No enqueued disposal tasks ready to processing found, skipping') + return + } + const approximatedBatchSize = Math.round(enqueuedTasksCount / DISPOSE_BATCH_AMOUNT, 0) const disposeBatchSize = Math.min(DISPOSE_BATCH_MAXIMAL, Math.max(DISPOSE_BATCH_MINIMAL, approximatedBatchSize)) const chunkedDisposalTasks = chunk(enqueuedDisposalTasks, disposeBatchSize) - log.info('Enqueued disposal task fetched and ready to processing', { + log.info('Enqueued disposal tasks fetched and ready to processing', { enqueuedTasksCount, disposeBatchSize })