From f43bb3dcdafda41ce887eb901fc9473c2eda0273 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 12:30:53 +0300 Subject: [PATCH 01/12] add definitions for limiter worker event types --- workers/limiter/types/eventTypes.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 workers/limiter/types/eventTypes.ts diff --git a/workers/limiter/types/eventTypes.ts b/workers/limiter/types/eventTypes.ts new file mode 100644 index 00000000..ea945d86 --- /dev/null +++ b/workers/limiter/types/eventTypes.ts @@ -0,0 +1,22 @@ +/** + * Event for checking events count for specified workspace + * Limiter will unban workspace projects if event limit doesn't exceed + */ +export interface CheckWorkspaceEvent { + type: 'check-workspace' + workspaceId: string; +} + +/** + * Event for checking current total events count in workspaces and limits events receiving if workspace exceed the limit + */ +export interface RegularWorkspacesCheckEvent { + type: 'regular-workspaces-check' +} + +/** + * All types of events for limiter worker + */ +type LimiterEvent = CheckWorkspaceEvent | RegularWorkspacesCheckEvent; + +export default LimiterEvent; From 7f6b36cc6330537a327bb8e5d6bc3afe5ade73b1 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 13:56:05 +0300 Subject: [PATCH 02/12] some code refactoring --- workers/limiter/src/index.ts | 204 +++++++++++++++++++++------- workers/limiter/types/eventTypes.ts | 16 ++- 2 files changed, 166 insertions(+), 54 deletions(-) diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 902895bb..f1cbf3f5 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -2,7 +2,7 @@ import { DatabaseController } from '../../../lib/db/controller'; import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; import asyncForEach from '../../../lib/utils/asyncForEach'; -import { Collection, Db } from 'mongodb'; +import { Collection, Db, ObjectId } from 'mongodb'; import * as path from 'path'; import * as dotenv from 'dotenv'; import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from 'hawk.types'; @@ -13,6 +13,7 @@ import shortNumber from 'short-number'; import ReportData from '../types/reportData'; import { CriticalError } from '../../../lib/workerErrors'; import { HOURS_IN_DAY, MINUTES_IN_HOUR, MS_IN_SEC, SECONDS_IN_MINUTE } from '../../../lib/utils/consts'; +import LimiterEvent, { CheckSingleWorkspaceEvent } from '../types/eventTypes'; /** * Workspace with its tariff plan @@ -88,68 +89,87 @@ export default class LimiterWorker extends Worker { /** * Task handling function + * + * @param event - worker event to handle */ - public async handle(): Promise { + public async handle(event: LimiterEvent): Promise { this.logger.info('Limiter worker started task'); - const { bannedWorkspaces, bannedProjectIds } = await this.getWorkspacesAndProjectsIdsToBan(); + switch (event.type) { + case 'check-single-workspace': + return this.handleCheckSingleWorkspaceEvent(event); + case 'regular-workspaces-check': + return this.handleRegularWorkspacesCheck(); + } - await this.saveToRedis(bannedProjectIds); + this.logger.info('Limiter worker finished task'); + } - await this.sendReport({ - bannedWorkspaces, - bannedProjectIds, - }); + /** + * Handles event for checking events count for specified workspace + * + * @param event - event to handle + */ + private async handleCheckSingleWorkspaceEvent(event: CheckSingleWorkspaceEvent): Promise { + const workspace = await this.getWorkspaceWithTariffPlan(event.workspaceId); + const workspaceProjects = await this.getProjects(event.workspaceId); + const workspaceProjectsIds = workspaceProjects.map(p => p._id.toString()); + + const report = await this.analyzeWorkspaceData(workspace, workspaceProjects); + + // await this.updateWorkspacesEventsCount([ report.updatedWorkspace ]); + // + // if (report.isBanned) { + // await this.appendBannedProjectsToRedis(workspaceProjectsIds); + // } else { + // await this.removeBannedProjectsFromRedis(workspaceProjectsIds); + // } + // + // await this.sendSingleWorkspacesCheckReport(report); + } - this.logger.info('Limiter worker finished task'); + /** + * Handles event for for checking current total events count in workspaces + * and limits events receiving if workspace exceed the limit + */ + private async handleRegularWorkspacesCheck(): Promise { + const report = await this.analyzeWorkspacesLimits(); + + await this.updateWorkspacesEventsCount(report.updatedWorkspaces); + await this.saveToRedis(report.bannedProjectIds); + + await this.sendRegularWorkspacesCheckReport(report); } /** * Checks which workspaces reached the limit and return them along with their projects ids. * Also, updates workspace current event count in db. */ - private async getWorkspacesAndProjectsIdsToBan(): Promise { + private async analyzeWorkspacesLimits(): Promise { const bannedWorkspaces: WorkspaceWithTariffPlan[] = []; + const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; const bannedProjectIds: string[] = []; const [projects, workspacesWithTariffPlans] = await Promise.all([ - this.getAllProjects(), + this.getProjects(), this.getWorkspacesWithTariffPlans(), ]); await asyncForEach(workspacesWithTariffPlans, async workspace => { const workspaceProjects = projects.filter(p => p.workspaceId.toString() === workspace._id.toString()); - /** - * If last charge date is not specified, then we skip checking it - * In the next time the Paymaster worker starts, it will set lastChargeDate for this workspace - * and limiter will process it successfully - */ - if (!workspace.lastChargeDate) { - HawkCatcher.send(new Error('Workspace without lastChargeDate detected'), { - workspaceId: workspace._id, - }); - - return; - } - - const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); - - const workspaceEventsCount = await this.getEventsCountByProjects(workspaceProjects, since); - - await this.updateWorkspaceEventsCount(workspace, workspaceEventsCount); + const { isBanned, updatedWorkspace } = await this.analyzeWorkspaceData(workspace, workspaceProjects); - if (workspace.tariffPlan.eventsLimit < workspaceEventsCount) { + if (isBanned) { bannedProjectIds.push(...workspaceProjects.map(p => p._id.toString())); - bannedWorkspaces.push({ - ...workspace, - billingPeriodEventsCount: workspaceEventsCount, - }); + bannedWorkspaces.push(updatedWorkspace); } + updatedWorkspaces.push(updatedWorkspace); }); return { bannedWorkspaces, bannedProjectIds, + updatedWorkspaces, }; } @@ -168,19 +188,6 @@ export default class LimiterWorker extends Worker { .then(sum); } - /** - * Updates events counter during billing period for workspace - * - * @param workspace — workspace id for updating - * @param workspaceEventsCount - workspaces events count to set - */ - private async updateWorkspaceEventsCount(workspace: WorkspaceDBScheme, workspaceEventsCount: number): Promise { - await this.workspacesCollection.updateOne( - { _id: workspace._id }, - { $set: { billingPeriodEventsCount: workspaceEventsCount } } - ); - } - /** * Saves banned project ids to redis * If there is no projects, then previous data in Redis will be erased @@ -214,10 +221,16 @@ export default class LimiterWorker extends Worker { } /** - * Returns all projects from Database + * Returns all projects from Database or projects of the specified workspace + * + * @param [workspaceId] - workspace ids to fetch projects that belongs that workspace */ - private getAllProjects(): Promise { - return this.projectsCollection.find({}).toArray(); + private getProjects(workspaceId?: string): Promise { + const query = workspaceId + ? { workspaceId: new ObjectId(workspaceId) } + : {}; + + return this.projectsCollection.find(query).toArray(); } /** @@ -246,6 +259,41 @@ export default class LimiterWorker extends Worker { ]).toArray(); } + /** + * Returns workspace with its tariff plan by its id + * + * @param id - workspace id + */ + private async getWorkspaceWithTariffPlan(id: string): Promise { + const workspacesArray = await this.workspacesCollection.aggregate([ + { + $match: { + _id: new ObjectId(id), + }, + }, + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $addFields: { + billingPeriodEventsCount: 0, + }, + }, + ]).toArray(); + + return workspacesArray.pop(); + } + /** * Returns total event counts for last billing period * @@ -284,7 +332,7 @@ export default class LimiterWorker extends Worker { * * @param reportData - data for sending notification after task handling */ - private async sendReport(reportData: ReportData): Promise { + private async sendRegularWorkspacesCheckReport(reportData: ReportData): Promise { if (!process.env.REPORT_NOTIFY_URL) { this.logger.error('Can\'t send report because REPORT_NOTIFY_URL not provided'); @@ -313,4 +361,58 @@ export default class LimiterWorker extends Worker { data: 'message=' + report + '&parse_mode=HTML', }); } + + /** + * @param workspace + * @param projects + */ + private async analyzeWorkspaceData(workspace: WorkspaceWithTariffPlan, projects: ProjectDBScheme[]): Promise<{ + isBanned: boolean; + updatedWorkspace: WorkspaceWithTariffPlan + }> { + /** + * If last charge date is not specified, then we skip checking it + * In the next time the Paymaster worker starts, it will set lastChargeDate for this workspace + * and limiter will process it successfully + */ + if (!workspace.lastChargeDate) { + HawkCatcher.send(new Error('Workspace without lastChargeDate detected'), { + workspaceId: workspace._id, + }); + + return; + } + const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); + + const workspaceEventsCount = await this.getEventsCountByProjects(projects, since); + const updatedWorkspace = { + ...workspace, + billingPeriodEventsCount: workspaceEventsCount, + }; + + return { + isBanned: workspace.tariffPlan.eventsLimit < workspaceEventsCount, + updatedWorkspace, + }; + } + + /** + * Updates workspaces data in Database + * + * @param workspaces - workspaces data to update + */ + private async updateWorkspacesEventsCount(workspaces: WorkspaceDBScheme[]): Promise { + const operations = workspaces.map(workspace => { + return { + updateOne: { + filter: { + _id: workspace._id, + }, + update: { $set: { billingPeriodEventsCount: workspace.billingPeriodEventsCount } }, + }, + }; + }); + + await this.workspacesCollection.bulkWrite(operations); + } } diff --git a/workers/limiter/types/eventTypes.ts b/workers/limiter/types/eventTypes.ts index ea945d86..600e745e 100644 --- a/workers/limiter/types/eventTypes.ts +++ b/workers/limiter/types/eventTypes.ts @@ -2,8 +2,15 @@ * Event for checking events count for specified workspace * Limiter will unban workspace projects if event limit doesn't exceed */ -export interface CheckWorkspaceEvent { - type: 'check-workspace' +export interface CheckSingleWorkspaceEvent { + /** + * Event type name + */ + type: 'check-single-workspace' + + /** + * Workspace id to check + */ workspaceId: string; } @@ -11,12 +18,15 @@ export interface CheckWorkspaceEvent { * Event for checking current total events count in workspaces and limits events receiving if workspace exceed the limit */ export interface RegularWorkspacesCheckEvent { + /** + * Event type name + */ type: 'regular-workspaces-check' } /** * All types of events for limiter worker */ -type LimiterEvent = CheckWorkspaceEvent | RegularWorkspacesCheckEvent; +type LimiterEvent = CheckSingleWorkspaceEvent | RegularWorkspacesCheckEvent; export default LimiterEvent; From 022c5edc8f7ff0d3d3d1026d4ad10f58cbf488b5 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 16:18:55 +0300 Subject: [PATCH 03/12] refactor logger creating logic --- lib/logger.ts | 20 ++++++++++++++++++++ lib/worker.ts | 16 ++-------------- 2 files changed, 22 insertions(+), 14 deletions(-) create mode 100644 lib/logger.ts diff --git a/lib/logger.ts b/lib/logger.ts new file mode 100644 index 00000000..d64b38ca --- /dev/null +++ b/lib/logger.ts @@ -0,0 +1,20 @@ +import { createLogger as createWinstonLogger, format, transports, Logger } from 'winston'; + +/** + * + */ +export default function createLogger(): Logger { + return createWinstonLogger({ + level: process.env.LOG_LEVEL || 'info', + transports: [ + new transports.Console({ + format: format.combine( + format.timestamp(), + format.colorize(), + format.simple(), + format.printf((msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`) + ), + }), + ], + }); +} diff --git a/lib/worker.ts b/lib/worker.ts index 0d7bc67f..992f8744 100644 --- a/lib/worker.ts +++ b/lib/worker.ts @@ -1,11 +1,11 @@ import * as amqp from 'amqplib'; import * as client from 'prom-client'; -import { createLogger, format, transports, Logger } from 'winston'; import { WorkerTask } from './types/worker-task'; import { CriticalError, NonCriticalError, ParsingError } from './workerErrors'; import { MongoError } from 'mongodb'; import HawkCatcher from '@hawk.so/nodejs'; import CacheController from '../lib/cache/controller'; +import createLogger from "./logger"; /** * Base worker class for processing tasks @@ -46,19 +46,7 @@ export abstract class Worker { * Logger module * (default level='info') */ - protected logger: Logger = createLogger({ - level: process.env.LOG_LEVEL || 'info', - transports: [ - new transports.Console({ - format: format.combine( - format.timestamp(), - format.colorize(), - format.simple(), - format.printf((msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`) - ), - }), - ], - }); + protected logger = createLogger(); /** * Cache module. From aa1e195a4d24794e3347f85c8a850235f278961b Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 16:19:32 +0300 Subject: [PATCH 04/12] refactor limiter worker and add second event type handler --- workers/limiter/src/index.ts | 158 ++++++++++++++-------------- workers/limiter/src/redisHelper.ts | 96 +++++++++++++++++ workers/limiter/types/index.ts | 7 ++ workers/limiter/types/reportData.ts | 16 ++- 4 files changed, 194 insertions(+), 83 deletions(-) create mode 100644 workers/limiter/src/redisHelper.ts create mode 100644 workers/limiter/types/index.ts diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index f1cbf3f5..88c3d2e3 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -5,20 +5,16 @@ import asyncForEach from '../../../lib/utils/asyncForEach'; import { Collection, Db, ObjectId } from 'mongodb'; import * as path from 'path'; import * as dotenv from 'dotenv'; -import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from 'hawk.types'; -import redis from 'redis'; +import { ProjectDBScheme, WorkspaceDBScheme } from 'hawk.types'; import HawkCatcher from '@hawk.so/nodejs'; import axios from 'axios'; import shortNumber from 'short-number'; -import ReportData from '../types/reportData'; import { CriticalError } from '../../../lib/workerErrors'; import { HOURS_IN_DAY, MINUTES_IN_HOUR, MS_IN_SEC, SECONDS_IN_MINUTE } from '../../../lib/utils/consts'; import LimiterEvent, { CheckSingleWorkspaceEvent } from '../types/eventTypes'; - -/** - * Workspace with its tariff plan - */ -type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +import RedisHelper from './redisHelper'; +import { MultiplyWorkspacesAnalyzeReport, SingleWorkspaceAnalyzeReport } from '../types/reportData'; +import { WorkspaceWithTariffPlan } from '../types'; dotenv.config({ path: path.resolve(__dirname, '../.env') }); @@ -57,14 +53,9 @@ export default class LimiterWorker extends Worker { private workspacesCollection!: Collection; /** - * Redis client for making queries - */ - private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL }); - - /** - * Redis key for storing banned projects + * Redis helper instance for modifying data through redis */ - private readonly redisDisabledProjectsKey = 'DisabledProjectsSet'; + private redis = new RedisHelper(); /** * Start consuming messages @@ -93,16 +84,12 @@ export default class LimiterWorker extends Worker { * @param event - worker event to handle */ public async handle(event: LimiterEvent): Promise { - this.logger.info('Limiter worker started task'); - switch (event.type) { case 'check-single-workspace': return this.handleCheckSingleWorkspaceEvent(event); case 'regular-workspaces-check': return this.handleRegularWorkspacesCheck(); } - - this.logger.info('Limiter worker finished task'); } /** @@ -111,21 +98,27 @@ export default class LimiterWorker extends Worker { * @param event - event to handle */ private async handleCheckSingleWorkspaceEvent(event: CheckSingleWorkspaceEvent): Promise { + this.logger.info('Limiter worker started checking workspace with id ' + event.workspaceId); + const workspace = await this.getWorkspaceWithTariffPlan(event.workspaceId); const workspaceProjects = await this.getProjects(event.workspaceId); const workspaceProjectsIds = workspaceProjects.map(p => p._id.toString()); const report = await this.analyzeWorkspaceData(workspace, workspaceProjects); - // await this.updateWorkspacesEventsCount([ report.updatedWorkspace ]); - // - // if (report.isBanned) { - // await this.appendBannedProjectsToRedis(workspaceProjectsIds); - // } else { - // await this.removeBannedProjectsFromRedis(workspaceProjectsIds); - // } - // - // await this.sendSingleWorkspacesCheckReport(report); + await this.updateWorkspacesEventsCount([ report.updatedWorkspace ]); + + if (report.isBanned) { + await this.redis.appendBannedProjects(workspaceProjectsIds); + } else { + await this.redis.removeBannedProjects(workspaceProjectsIds); + } + + await this.sendSingleWorkspacesCheckReport(report); + this.logger.info( + `Limiter worker finished workspace checking. + Workspace with id ${event.workspaceId} was ${report.isBanned ? 'banned' : 'unbanned'}` + ); } /** @@ -133,19 +126,23 @@ export default class LimiterWorker extends Worker { * and limits events receiving if workspace exceed the limit */ private async handleRegularWorkspacesCheck(): Promise { + this.logger.info('Limiter worker started regular check'); + const report = await this.analyzeWorkspacesLimits(); await this.updateWorkspacesEventsCount(report.updatedWorkspaces); - await this.saveToRedis(report.bannedProjectIds); + await this.redis.saveBannedProjectsSet(report.bannedProjectIds); await this.sendRegularWorkspacesCheckReport(report); + + this.logger.info('Limiter worker finished task'); } /** * Checks which workspaces reached the limit and return them along with their projects ids. * Also, updates workspace current event count in db. */ - private async analyzeWorkspacesLimits(): Promise { + private async analyzeWorkspacesLimits(): Promise { const bannedWorkspaces: WorkspaceWithTariffPlan[] = []; const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; const bannedProjectIds: string[] = []; @@ -188,38 +185,6 @@ export default class LimiterWorker extends Worker { .then(sum); } - /** - * Saves banned project ids to redis - * If there is no projects, then previous data in Redis will be erased - * - * @param projectIdsToBan - ids to ban - */ - private saveToRedis(projectIdsToBan: string[]): Promise { - return new Promise((resolve, reject) => { - const callback = (execError: Error|null): void => { - if (execError) { - this.logger.error(execError); - HawkCatcher.send(execError); - - reject(execError); - - return; - } - this.logger.info('Successfully saved to Redis'); - resolve(); - }; - - if (projectIdsToBan.length) { - this.redisClient.multi() - .del(this.redisDisabledProjectsKey) - .sadd(this.redisDisabledProjectsKey, projectIdsToBan) - .exec(callback); - } else { - this.redisClient.del(this.redisDisabledProjectsKey, callback); - } - }); - } - /** * Returns all projects from Database or projects of the specified workspace * @@ -332,13 +297,7 @@ export default class LimiterWorker extends Worker { * * @param reportData - data for sending notification after task handling */ - private async sendRegularWorkspacesCheckReport(reportData: ReportData): Promise { - if (!process.env.REPORT_NOTIFY_URL) { - this.logger.error('Can\'t send report because REPORT_NOTIFY_URL not provided'); - - return; - } - + private async sendRegularWorkspacesCheckReport(reportData: MultiplyWorkspacesAnalyzeReport): Promise { let report = process.env.SERVER_NAME ? ` Hawk Limiter (${process.env.SERVER_NAME}) 🚧\n` : ' Hawk Limiter 🚧\n'; if (reportData.bannedWorkspaces.length) { @@ -355,21 +314,18 @@ export default class LimiterWorker extends Worker { report += `\n\n${reportData.bannedWorkspaces.length} workspaces with ${reportData.bannedProjectIds.length} projects totally banned`; - await axios({ - method: 'post', - url: process.env.REPORT_NOTIFY_URL, - data: 'message=' + report + '&parse_mode=HTML', - }); + await this.sendReport(report); } /** - * @param workspace - * @param projects + * Analyses workspace data and gives a report about events limit + * + * @param workspace - workspace data to check + * @param projects - workspaces projects */ - private async analyzeWorkspaceData(workspace: WorkspaceWithTariffPlan, projects: ProjectDBScheme[]): Promise<{ - isBanned: boolean; - updatedWorkspace: WorkspaceWithTariffPlan - }> { + private async analyzeWorkspaceData( + workspace: WorkspaceWithTariffPlan, projects: ProjectDBScheme[] + ): Promise { /** * If last charge date is not specified, then we skip checking it * In the next time the Paymaster worker starts, it will set lastChargeDate for this workspace @@ -415,4 +371,46 @@ export default class LimiterWorker extends Worker { await this.workspacesCollection.bulkWrite(operations); } + + /** + * Sends notification to the chat about result of the workspace checking + * + * @param reportData - report data for generating notification + */ + private async sendSingleWorkspacesCheckReport(reportData: SingleWorkspaceAnalyzeReport): Promise { + const workspace = reportData.updatedWorkspace; + const timeFromLastChargeDate = Date.now() - new Date(workspace.lastChargeDate).getTime(); + + const millisecondsInDay = HOURS_IN_DAY * MINUTES_IN_HOUR * SECONDS_IN_MINUTE * MS_IN_SEC; + const timeInDays = Math.floor(timeFromLastChargeDate / millisecondsInDay); + + const reportString = ` +Hawk Limiter ${process.env.SERVER_NAME ? `(${process.env.SERVER_NAME})` : ''} 🚧 + +Check workspace ${encodeURIComponent(workspace.name)} | ${workspace._id} +Workspace has ${shortNumber(workspace.billingPeriodEventsCount)} events in ${timeInDays} days. Limit is ${workspace.tariffPlan.eventsLimit} +Workspace was ${reportData.isBanned ? 'blocked' : 'unblocked'} +`; + + await this.sendReport(reportString); + } + + /** + * Sends notify to the chat + * + * @param reportData - report notify in HTML markup to send + */ + private async sendReport(reportData: string): Promise { + if (!process.env.REPORT_NOTIFY_URL) { + this.logger.error('Can\'t send report because REPORT_NOTIFY_URL not provided'); + + return; + } + + await axios({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: 'message=' + reportData + '&parse_mode=HTML', + }); + } } diff --git a/workers/limiter/src/redisHelper.ts b/workers/limiter/src/redisHelper.ts new file mode 100644 index 00000000..058a487d --- /dev/null +++ b/workers/limiter/src/redisHelper.ts @@ -0,0 +1,96 @@ +import HawkCatcher from '@hawk.so/nodejs'; +import redis from 'redis'; +import createLogger from '../../../lib/logger'; + +/** + * Class with helper functions for working with Redis + */ +export default class RedisHelper { + /** + * Redis client for making queries + */ + private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL }); + + /** + * Logger instance + * (default level='info') + */ + private logger = createLogger(); + + /** + * Redis key for storing banned projects + */ + private readonly redisDisabledProjectsKey = 'DisabledProjectsSet'; + + /** + * Saves banned project ids to redis + * If there is no projects, then previous data in Redis will be erased + * + * @param projectIdsToBan - ids to ban + */ + public saveBannedProjectsSet(projectIdsToBan: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIdsToBan.length) { + this.redisClient.multi() + .del(this.redisDisabledProjectsKey) + .sadd(this.redisDisabledProjectsKey, projectIdsToBan) + .exec(callback); + } else { + this.redisClient.del(this.redisDisabledProjectsKey, callback); + } + }); + } + + /** + * Add new banned projects to the set + * + * @param projectIds - project ids to append + */ + public appendBannedProjects(projectIds: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIds.length) { + this.redisClient.sadd(this.redisDisabledProjectsKey, projectIds, callback); + } + }); + } + + /** + * Removes projects ids from set + * + * @param projectIds - project ids to remove + */ + public removeBannedProjects(projectIds: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIds.length) { + this.redisClient.srem(this.redisDisabledProjectsKey, projectIds, callback); + } + }); + } + + /** + * Creates callback function for Redis operations + * + * @param resolve - callback that will be called if no errors occurred + * @param reject - callback that will be called any error occurred + */ + private createCallback(resolve: () => void, reject: (reason?: unknown) => void) { + return (execError: Error | null): void => { + if (execError) { + this.logger.error(execError); + HawkCatcher.send(execError); + + reject(execError); + + return; + } + this.logger.info('Successfully saved to Redis'); + resolve(); + }; + } +} diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts new file mode 100644 index 00000000..a47fa325 --- /dev/null +++ b/workers/limiter/types/index.ts @@ -0,0 +1,7 @@ +import {PlanDBScheme, WorkspaceDBScheme} from "hawk.types"; + +/** + * Workspace with its tariff plan + */ +export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; + diff --git a/workers/limiter/types/reportData.ts b/workers/limiter/types/reportData.ts index 7b11a080..17e872fa 100644 --- a/workers/limiter/types/reportData.ts +++ b/workers/limiter/types/reportData.ts @@ -1,16 +1,26 @@ -import { WorkspaceDBScheme } from 'hawk.types'; +import { WorkspaceWithTariffPlan } from './index'; /** * Data for sending notification after task handling */ -export default interface ReportData { +export interface SingleWorkspaceAnalyzeReport { + isBanned: boolean; + updatedWorkspace: WorkspaceWithTariffPlan +} + +/** + * Data for sending notification after task handling + */ +export interface MultiplyWorkspacesAnalyzeReport { /** * Banned workspaces data */ - bannedWorkspaces: WorkspaceDBScheme[]; + bannedWorkspaces: WorkspaceWithTariffPlan[]; /** * Projects ids to ban */ bannedProjectIds: string[]; + + updatedWorkspaces: WorkspaceWithTariffPlan[] } From a66fa0b858a19d3a7c17b32de9c29fad13f1b999 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 16:51:06 +0300 Subject: [PATCH 05/12] fix error if no workspace with provided id were found --- workers/limiter/src/index.ts | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 88c3d2e3..75d97f71 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -101,6 +101,13 @@ export default class LimiterWorker extends Worker { this.logger.info('Limiter worker started checking workspace with id ' + event.workspaceId); const workspace = await this.getWorkspaceWithTariffPlan(event.workspaceId); + + if (!workspace) { + this.logger.info(`No workspace with id ${event.workspaceId}. Finishing task`); + + return; + } + const workspaceProjects = await this.getProjects(event.workspaceId); const workspaceProjectsIds = workspaceProjects.map(p => p._id.toString()); @@ -116,8 +123,7 @@ export default class LimiterWorker extends Worker { await this.sendSingleWorkspacesCheckReport(report); this.logger.info( - `Limiter worker finished workspace checking. - Workspace with id ${event.workspaceId} was ${report.isBanned ? 'banned' : 'unbanned'}` + `Limiter worker finished workspace checking. Workspace with id ${event.workspaceId} was ${report.isBanned ? 'banned' : 'unbanned'}` ); } @@ -379,17 +385,15 @@ export default class LimiterWorker extends Worker { */ private async sendSingleWorkspacesCheckReport(reportData: SingleWorkspaceAnalyzeReport): Promise { const workspace = reportData.updatedWorkspace; - const timeFromLastChargeDate = Date.now() - new Date(workspace.lastChargeDate).getTime(); - - const millisecondsInDay = HOURS_IN_DAY * MINUTES_IN_HOUR * SECONDS_IN_MINUTE * MS_IN_SEC; - const timeInDays = Math.floor(timeFromLastChargeDate / millisecondsInDay); const reportString = ` Hawk Limiter ${process.env.SERVER_NAME ? `(${process.env.SERVER_NAME})` : ''} 🚧 -Check workspace ${encodeURIComponent(workspace.name)} | ${workspace._id} -Workspace has ${shortNumber(workspace.billingPeriodEventsCount)} events in ${timeInDays} days. Limit is ${workspace.tariffPlan.eventsLimit} -Workspace was ${reportData.isBanned ? 'blocked' : 'unblocked'} +${encodeURIComponent(workspace.name)} wants to be unblocked + +It has ${shortNumber(workspace.billingPeriodEventsCount)} events of ${workspace.tariffPlan.eventsLimit}. Last charge date: ${workspace.lastChargeDate.toISOString()} + +${reportData.isBanned ? 'Blocked ❌' : 'Unblocked ✅'} `; await this.sendReport(reportString); From bb6968070f19b129610344b3562cb91bbd6a1a63 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 16:53:55 +0300 Subject: [PATCH 06/12] update docs --- workers/limiter/types/reportData.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/workers/limiter/types/reportData.ts b/workers/limiter/types/reportData.ts index 17e872fa..fcf6b45b 100644 --- a/workers/limiter/types/reportData.ts +++ b/workers/limiter/types/reportData.ts @@ -4,7 +4,14 @@ import { WorkspaceWithTariffPlan } from './index'; * Data for sending notification after task handling */ export interface SingleWorkspaceAnalyzeReport { + /** + * Is workspace get banned + */ isBanned: boolean; + + /** + * Workspace with updated data (current events count) + */ updatedWorkspace: WorkspaceWithTariffPlan } @@ -22,5 +29,8 @@ export interface MultiplyWorkspacesAnalyzeReport { */ bannedProjectIds: string[]; + /** + * Array of workspaces with updated fields + */ updatedWorkspaces: WorkspaceWithTariffPlan[] } From 469b2ca4672a5f9a23dd10f25a168c27f3abba2f Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 16:59:08 +0300 Subject: [PATCH 07/12] fixes --- lib/logger.ts | 2 +- workers/limiter/src/index.ts | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/lib/logger.ts b/lib/logger.ts index d64b38ca..a97b9694 100644 --- a/lib/logger.ts +++ b/lib/logger.ts @@ -1,7 +1,7 @@ import { createLogger as createWinstonLogger, format, transports, Logger } from 'winston'; /** - * + * Creates new logger instance */ export default function createLogger(): Logger { return createWinstonLogger({ diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 75d97f71..0cd3ee59 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -160,13 +160,17 @@ export default class LimiterWorker extends Worker { await asyncForEach(workspacesWithTariffPlans, async workspace => { const workspaceProjects = projects.filter(p => p.workspaceId.toString() === workspace._id.toString()); - const { isBanned, updatedWorkspace } = await this.analyzeWorkspaceData(workspace, workspaceProjects); - - if (isBanned) { - bannedProjectIds.push(...workspaceProjects.map(p => p._id.toString())); - bannedWorkspaces.push(updatedWorkspace); + try { + const { isBanned, updatedWorkspace } = await this.analyzeWorkspaceData(workspace, workspaceProjects); + + if (isBanned) { + bannedProjectIds.push(...workspaceProjects.map(p => p._id.toString())); + bannedWorkspaces.push(updatedWorkspace); + } + updatedWorkspaces.push(updatedWorkspace); + } catch (e) { + this.logger.error(e); } - updatedWorkspaces.push(updatedWorkspace); }); return { @@ -338,11 +342,13 @@ export default class LimiterWorker extends Worker { * and limiter will process it successfully */ if (!workspace.lastChargeDate) { - HawkCatcher.send(new Error('Workspace without lastChargeDate detected'), { + const error = new Error('Workspace without lastChargeDate detected'); + + HawkCatcher.send(error, { workspaceId: workspace._id, }); - return; + throw error; } const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); From bddb832b372ef4f9c5e7578a755c78d8fbfa7cea Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 17:06:07 +0300 Subject: [PATCH 08/12] update docs --- workers/limiter/README.md | 18 ++++++++++++++++++ workers/limiter/src/index.ts | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/workers/limiter/README.md b/workers/limiter/README.md index 3f018574..8489427a 100644 --- a/workers/limiter/README.md +++ b/workers/limiter/README.md @@ -10,4 +10,22 @@ If so, the Limiter puts information to Redis about the workspace's projects for 3. `yarn install` 4. `yarn run-limiter` +## Event types +### Regular check +Serves to check current total events count in workspaces and blocks events receiving if workspace exceed the limit. +Event shape: +```json +{ + "type":"regular-workspaces-check" +} +``` + +### Single workspace check +Serves to check single workspace by id. Blocks workspace if the event limit is exceeded and unblocks if not. +```json +{ + "type": "check-single-workspace", + "workspaceId":"5e4ff30a628a6c73a415f4d5" +} +``` diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 0cd3ee59..868c07ce 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -128,7 +128,7 @@ export default class LimiterWorker extends Worker { } /** - * Handles event for for checking current total events count in workspaces + * Handles event for checking current total events count in workspaces * and limits events receiving if workspace exceed the limit */ private async handleRegularWorkspacesCheck(): Promise { From 48bf64474755cb438ff2296d547b48cc2ef819e9 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 17:25:14 +0300 Subject: [PATCH 09/12] fix linter errors --- lib/worker.ts | 2 +- workers/limiter/types/index.ts | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/worker.ts b/lib/worker.ts index 992f8744..15438ebe 100644 --- a/lib/worker.ts +++ b/lib/worker.ts @@ -5,7 +5,7 @@ import { CriticalError, NonCriticalError, ParsingError } from './workerErrors'; import { MongoError } from 'mongodb'; import HawkCatcher from '@hawk.so/nodejs'; import CacheController from '../lib/cache/controller'; -import createLogger from "./logger"; +import createLogger from './logger'; /** * Base worker class for processing tasks diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts index a47fa325..0407a095 100644 --- a/workers/limiter/types/index.ts +++ b/workers/limiter/types/index.ts @@ -1,7 +1,6 @@ -import {PlanDBScheme, WorkspaceDBScheme} from "hawk.types"; +import { PlanDBScheme, WorkspaceDBScheme } from 'hawk.types'; /** * Workspace with its tariff plan */ export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; - From 40ea82a722eec73690b71b35ffb81a8cb53d7b67 Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 17:35:05 +0300 Subject: [PATCH 10/12] add tests --- workers/limiter/tests/index.test.ts | 525 +++++++++++++++++----------- 1 file changed, 330 insertions(+), 195 deletions(-) diff --git a/workers/limiter/tests/index.test.ts b/workers/limiter/tests/index.test.ts index fca867e5..7e4d3256 100644 --- a/workers/limiter/tests/index.test.ts +++ b/workers/limiter/tests/index.test.ts @@ -7,12 +7,17 @@ import { mockedPlans } from './plans.mock'; import axios from 'axios'; import { mocked } from 'ts-jest/utils'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import { RegularWorkspacesCheckEvent } from '../types/eventTypes'; /** * Mock axios for testing report sends */ jest.mock('axios'); +const REGULAR_WORKSPACES_CHECK_EVENT: RegularWorkspacesCheckEvent = { + type: 'regular-workspaces-check', +}; + /** * Constant of last charge date in all workspaces for tests */ @@ -126,236 +131,366 @@ describe('Limiter worker', () => { await planCollection.insertMany(Object.values(mockedPlans)); }); - test('Should count workspace events for a billing period and save it to the db', async () => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); - const eventsCollection = db.collection(`events:${project._id.toString()}`); - const repetitionsCollection = db.collection(`repetitions:${project._id.toString()}`); - - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 5, - }); - - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + describe('regular-workspaces-check', () => { + test('Should count workspace events for a billing period and save it to the db', async () => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + const eventsCollection = db.collection(`events:${project._id.toString()}`); + const repetitionsCollection = db.collection(`repetitions:${project._id.toString()}`); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 5, + }); - await worker.start(); - await worker.handle(); - await worker.finish(); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - /** - * Assert - */ - const workspaceInDatabase = await workspaceCollection.findOne({ - _id: workspace._id, - }); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - /** - * Count events and repetitions since last charge date - */ - const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); - const query = { - 'payload.timestamp': { - $gt: since, - }, - }; - const repetitionsCount = await repetitionsCollection.find(query).count(); - const eventsCount = await eventsCollection.find(query).count(); + /** + * Assert + */ + const workspaceInDatabase = await workspaceCollection.findOne({ + _id: workspace._id, + }); - /** - * Check count of events - */ - expect(workspaceInDatabase.billingPeriodEventsCount).toEqual(repetitionsCount + eventsCount); - }); + /** + * Count events and repetitions since last charge date + */ + const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); + const query = { + 'payload.timestamp': { + $gt: since, + }, + }; + const repetitionsCount = await repetitionsCollection.find(query).count(); + const eventsCount = await eventsCollection.find(query).count(); - test('Should ban projects that have exceeded the plan limit and add their ids to redis', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, + /** + * Check count of events + */ + expect(workspaceInDatabase.billingPeriodEventsCount).toEqual(repetitionsCount + eventsCount); }); - const project = createProjectMock({ workspaceId: workspace._id }); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 15, - }); + test('Should ban projects that have exceeded the plan limit and add their ids to redis', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 15, + }); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - await worker.start(); - await worker.handle(); - await worker.finish(); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).toContain(project._id.toString()); - done(); + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).toContain(project._id.toString()); + done(); + }); }); - }); - test('Should unban previously banned projects if the limit allows', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); + test('Should unban previously banned projects if the limit allows', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 15, + }); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 15, - }); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); - await worker.start(); - await worker.handle(); + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + * Project should be banned + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).toContain(project._id.toString()); + }); - /** - * Gets all members of set with key 'DisabledProjectsSet' from Redis - * Project should be banned - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).toContain(project._id.toString()); - }); + /** + * Change workspace plan to plan with big events limit + */ + await workspaceCollection.findOneAndUpdate({ _id: workspace._id }, { + $set: { + tariffPlanId: mockedPlans.eventsLimit10000._id, + }, + }); - /** - * Change workspace plan to plan with big events limit - */ - await workspaceCollection.findOneAndUpdate({ _id: workspace._id }, { - $set: { - tariffPlanId: mockedPlans.eventsLimit10000._id, - }, + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); + + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - await worker.handle(); - await worker.finish(); + test('Should not ban project if it does not reach the limit', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).not.toContain(project._id.toString()); - done(); - }); - }); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - test('Should not ban project if it does not reach the limit', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10000, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 100, + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + */ + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + test('Should send a report with collected data', async () => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); - await worker.start(); - await worker.handle(); - await worker.finish(); + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); + /** + * Act + */ + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); /** - * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + * Assert */ - expect(result).not.toContain(project._id.toString()); - done(); + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); }); }); - test('Should send a report with collected data', async () => { - /** - * Arrange - * - * Worker initialization - */ - const worker = new LimiterWorker(); - - mocked(axios).mockResolvedValue({ - data: {}, - status: 200, - statusText: 'OK', - config: {}, - headers: {}, + describe('check-single-workspace', () => { + test('Should unblock workspace if the number of events does not exceed the limit', async (done) => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); + + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); + + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); + + /** + * Act + */ + await worker.start(); + await worker.handle({ + type: 'check-single-workspace', + workspaceId: workspace._id.toString(), + }); + await worker.finish(); + + /** + * Assert + */ + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); + + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + */ + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - /** - * Act - */ - await worker.start(); - await worker.handle(); - await worker.finish(); + test('Should block workspace if the number of events exceed the limit', async (done) => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); + + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); + + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); - /** - * Assert - */ - expect(axios).toHaveBeenCalled(); - expect(axios).toHaveBeenCalledWith({ - method: 'post', - url: process.env.REPORT_NOTIFY_URL, - data: expect.any(String), + /** + * Act + */ + await worker.start(); + await worker.handle({ + type: 'check-single-workspace', + workspaceId: workspace._id.toString(), + }); + await worker.finish(); + + /** + * Assert + */ + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); + + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + */ + expect(result).toContain(project._id.toString()); + done(); + }); }); }); From 649125021e61c54a9b6fc299d947dd9f9d670afc Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 17:45:22 +0300 Subject: [PATCH 11/12] fix docs --- workers/limiter/README.md | 1 + workers/limiter/tests/index.test.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/workers/limiter/README.md b/workers/limiter/README.md index 8489427a..a0aba1b5 100644 --- a/workers/limiter/README.md +++ b/workers/limiter/README.md @@ -23,6 +23,7 @@ Event shape: ### Single workspace check Serves to check single workspace by id. Blocks workspace if the event limit is exceeded and unblocks if not. +Event shape: ```json { "type": "check-single-workspace", diff --git a/workers/limiter/tests/index.test.ts b/workers/limiter/tests/index.test.ts index 7e4d3256..9e871af4 100644 --- a/workers/limiter/tests/index.test.ts +++ b/workers/limiter/tests/index.test.ts @@ -486,7 +486,7 @@ describe('Limiter worker', () => { expect(err).toBeNull(); /** - * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + * Redis shouldn't contain id of the mocked project */ expect(result).toContain(project._id.toString()); done(); From b3b7c314f76ea2ef8102d742ed61891d3efc732e Mon Sep 17 00:00:00 2001 From: Nikita Melnikov Date: Sat, 30 Jan 2021 18:13:03 +0300 Subject: [PATCH 12/12] fix README.md --- workers/limiter/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/workers/limiter/README.md b/workers/limiter/README.md index a0aba1b5..3a035988 100644 --- a/workers/limiter/README.md +++ b/workers/limiter/README.md @@ -13,8 +13,11 @@ If so, the Limiter puts information to Redis about the workspace's projects for ## Event types ### Regular check + Serves to check current total events count in workspaces and blocks events receiving if workspace exceed the limit. + Event shape: + ```json { "type":"regular-workspaces-check" @@ -22,8 +25,11 @@ Event shape: ``` ### Single workspace check + Serves to check single workspace by id. Blocks workspace if the event limit is exceeded and unblocks if not. + Event shape: + ```json { "type": "check-single-workspace",