diff --git a/lib/logger.ts b/lib/logger.ts new file mode 100644 index 00000000..a97b9694 --- /dev/null +++ b/lib/logger.ts @@ -0,0 +1,20 @@ +import { createLogger as createWinstonLogger, format, transports, Logger } from 'winston'; + +/** + * Creates new logger instance + */ +export default function createLogger(): Logger { + return createWinstonLogger({ + level: process.env.LOG_LEVEL || 'info', + transports: [ + new transports.Console({ + format: format.combine( + format.timestamp(), + format.colorize(), + format.simple(), + format.printf((msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`) + ), + }), + ], + }); +} diff --git a/lib/worker.ts b/lib/worker.ts index 0d7bc67f..15438ebe 100644 --- a/lib/worker.ts +++ b/lib/worker.ts @@ -1,11 +1,11 @@ import * as amqp from 'amqplib'; import * as client from 'prom-client'; -import { createLogger, format, transports, Logger } from 'winston'; import { WorkerTask } from './types/worker-task'; import { CriticalError, NonCriticalError, ParsingError } from './workerErrors'; import { MongoError } from 'mongodb'; import HawkCatcher from '@hawk.so/nodejs'; import CacheController from '../lib/cache/controller'; +import createLogger from './logger'; /** * Base worker class for processing tasks @@ -46,19 +46,7 @@ export abstract class Worker { * Logger module * (default level='info') */ - protected logger: Logger = createLogger({ - level: process.env.LOG_LEVEL || 'info', - transports: [ - new transports.Console({ - format: format.combine( - format.timestamp(), - format.colorize(), - format.simple(), - format.printf((msg) => `${msg.timestamp} - ${msg.level}: ${msg.message}`) - ), - }), - ], - }); + protected logger = createLogger(); /** * Cache module. diff --git a/workers/limiter/README.md b/workers/limiter/README.md index 3f018574..3a035988 100644 --- a/workers/limiter/README.md +++ b/workers/limiter/README.md @@ -10,4 +10,29 @@ If so, the Limiter puts information to Redis about the workspace's projects for 3. `yarn install` 4. `yarn run-limiter` +## Event types +### Regular check + +Serves to check current total events count in workspaces and blocks events receiving if workspace exceed the limit. + +Event shape: + +```json +{ + "type":"regular-workspaces-check" +} +``` + +### Single workspace check + +Serves to check single workspace by id. Blocks workspace if the event limit is exceeded and unblocks if not. + +Event shape: + +```json +{ + "type": "check-single-workspace", + "workspaceId":"5e4ff30a628a6c73a415f4d5" +} +``` diff --git a/workers/limiter/src/index.ts b/workers/limiter/src/index.ts index 902895bb..868c07ce 100644 --- a/workers/limiter/src/index.ts +++ b/workers/limiter/src/index.ts @@ -2,22 +2,19 @@ import { DatabaseController } from '../../../lib/db/controller'; import { Worker } from '../../../lib/worker'; import * as pkg from '../package.json'; import asyncForEach from '../../../lib/utils/asyncForEach'; -import { Collection, Db } from 'mongodb'; +import { Collection, Db, ObjectId } from 'mongodb'; import * as path from 'path'; import * as dotenv from 'dotenv'; -import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from 'hawk.types'; -import redis from 'redis'; +import { ProjectDBScheme, WorkspaceDBScheme } from 'hawk.types'; import HawkCatcher from '@hawk.so/nodejs'; import axios from 'axios'; import shortNumber from 'short-number'; -import ReportData from '../types/reportData'; import { CriticalError } from '../../../lib/workerErrors'; import { HOURS_IN_DAY, MINUTES_IN_HOUR, MS_IN_SEC, SECONDS_IN_MINUTE } from '../../../lib/utils/consts'; - -/** - * Workspace with its tariff plan - */ -type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; +import LimiterEvent, { CheckSingleWorkspaceEvent } from '../types/eventTypes'; +import RedisHelper from './redisHelper'; +import { MultiplyWorkspacesAnalyzeReport, SingleWorkspaceAnalyzeReport } from '../types/reportData'; +import { WorkspaceWithTariffPlan } from '../types'; dotenv.config({ path: path.resolve(__dirname, '../.env') }); @@ -56,14 +53,9 @@ export default class LimiterWorker extends Worker { private workspacesCollection!: Collection; /** - * Redis client for making queries - */ - private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL }); - - /** - * Redis key for storing banned projects + * Redis helper instance for modifying data through redis */ - private readonly redisDisabledProjectsKey = 'DisabledProjectsSet'; + private redis = new RedisHelper(); /** * Start consuming messages @@ -88,18 +80,66 @@ export default class LimiterWorker extends Worker { /** * Task handling function + * + * @param event - worker event to handle + */ + public async handle(event: LimiterEvent): Promise { + switch (event.type) { + case 'check-single-workspace': + return this.handleCheckSingleWorkspaceEvent(event); + case 'regular-workspaces-check': + return this.handleRegularWorkspacesCheck(); + } + } + + /** + * Handles event for checking events count for specified workspace + * + * @param event - event to handle */ - public async handle(): Promise { - this.logger.info('Limiter worker started task'); + private async handleCheckSingleWorkspaceEvent(event: CheckSingleWorkspaceEvent): Promise { + this.logger.info('Limiter worker started checking workspace with id ' + event.workspaceId); - const { bannedWorkspaces, bannedProjectIds } = await this.getWorkspacesAndProjectsIdsToBan(); + const workspace = await this.getWorkspaceWithTariffPlan(event.workspaceId); - await this.saveToRedis(bannedProjectIds); + if (!workspace) { + this.logger.info(`No workspace with id ${event.workspaceId}. Finishing task`); - await this.sendReport({ - bannedWorkspaces, - bannedProjectIds, - }); + return; + } + + const workspaceProjects = await this.getProjects(event.workspaceId); + const workspaceProjectsIds = workspaceProjects.map(p => p._id.toString()); + + const report = await this.analyzeWorkspaceData(workspace, workspaceProjects); + + await this.updateWorkspacesEventsCount([ report.updatedWorkspace ]); + + if (report.isBanned) { + await this.redis.appendBannedProjects(workspaceProjectsIds); + } else { + await this.redis.removeBannedProjects(workspaceProjectsIds); + } + + await this.sendSingleWorkspacesCheckReport(report); + this.logger.info( + `Limiter worker finished workspace checking. Workspace with id ${event.workspaceId} was ${report.isBanned ? 'banned' : 'unbanned'}` + ); + } + + /** + * Handles event for checking current total events count in workspaces + * and limits events receiving if workspace exceed the limit + */ + private async handleRegularWorkspacesCheck(): Promise { + this.logger.info('Limiter worker started regular check'); + + const report = await this.analyzeWorkspacesLimits(); + + await this.updateWorkspacesEventsCount(report.updatedWorkspaces); + await this.redis.saveBannedProjectsSet(report.bannedProjectIds); + + await this.sendRegularWorkspacesCheckReport(report); this.logger.info('Limiter worker finished task'); } @@ -108,48 +148,35 @@ export default class LimiterWorker extends Worker { * Checks which workspaces reached the limit and return them along with their projects ids. * Also, updates workspace current event count in db. */ - private async getWorkspacesAndProjectsIdsToBan(): Promise { + private async analyzeWorkspacesLimits(): Promise { const bannedWorkspaces: WorkspaceWithTariffPlan[] = []; + const updatedWorkspaces: WorkspaceWithTariffPlan[] = []; const bannedProjectIds: string[] = []; const [projects, workspacesWithTariffPlans] = await Promise.all([ - this.getAllProjects(), + this.getProjects(), this.getWorkspacesWithTariffPlans(), ]); await asyncForEach(workspacesWithTariffPlans, async workspace => { const workspaceProjects = projects.filter(p => p.workspaceId.toString() === workspace._id.toString()); - /** - * If last charge date is not specified, then we skip checking it - * In the next time the Paymaster worker starts, it will set lastChargeDate for this workspace - * and limiter will process it successfully - */ - if (!workspace.lastChargeDate) { - HawkCatcher.send(new Error('Workspace without lastChargeDate detected'), { - workspaceId: workspace._id, - }); - - return; - } - - const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); - - const workspaceEventsCount = await this.getEventsCountByProjects(workspaceProjects, since); - - await this.updateWorkspaceEventsCount(workspace, workspaceEventsCount); + try { + const { isBanned, updatedWorkspace } = await this.analyzeWorkspaceData(workspace, workspaceProjects); - if (workspace.tariffPlan.eventsLimit < workspaceEventsCount) { - bannedProjectIds.push(...workspaceProjects.map(p => p._id.toString())); - bannedWorkspaces.push({ - ...workspace, - billingPeriodEventsCount: workspaceEventsCount, - }); + if (isBanned) { + bannedProjectIds.push(...workspaceProjects.map(p => p._id.toString())); + bannedWorkspaces.push(updatedWorkspace); + } + updatedWorkspaces.push(updatedWorkspace); + } catch (e) { + this.logger.error(e); } }); return { bannedWorkspaces, bannedProjectIds, + updatedWorkspaces, }; } @@ -169,62 +196,56 @@ export default class LimiterWorker extends Worker { } /** - * Updates events counter during billing period for workspace - * - * @param workspace — workspace id for updating - * @param workspaceEventsCount - workspaces events count to set - */ - private async updateWorkspaceEventsCount(workspace: WorkspaceDBScheme, workspaceEventsCount: number): Promise { - await this.workspacesCollection.updateOne( - { _id: workspace._id }, - { $set: { billingPeriodEventsCount: workspaceEventsCount } } - ); - } - - /** - * Saves banned project ids to redis - * If there is no projects, then previous data in Redis will be erased + * Returns all projects from Database or projects of the specified workspace * - * @param projectIdsToBan - ids to ban + * @param [workspaceId] - workspace ids to fetch projects that belongs that workspace */ - private saveToRedis(projectIdsToBan: string[]): Promise { - return new Promise((resolve, reject) => { - const callback = (execError: Error|null): void => { - if (execError) { - this.logger.error(execError); - HawkCatcher.send(execError); - - reject(execError); + private getProjects(workspaceId?: string): Promise { + const query = workspaceId + ? { workspaceId: new ObjectId(workspaceId) } + : {}; - return; - } - this.logger.info('Successfully saved to Redis'); - resolve(); - }; - - if (projectIdsToBan.length) { - this.redisClient.multi() - .del(this.redisDisabledProjectsKey) - .sadd(this.redisDisabledProjectsKey, projectIdsToBan) - .exec(callback); - } else { - this.redisClient.del(this.redisDisabledProjectsKey, callback); - } - }); + return this.projectsCollection.find(query).toArray(); } /** - * Returns all projects from Database + * Returns array of workspaces with their tariff plans */ - private getAllProjects(): Promise { - return this.projectsCollection.find({}).toArray(); + private async getWorkspacesWithTariffPlans(): Promise { + return this.workspacesCollection.aggregate([ + { + $lookup: { + from: 'plans', + localField: 'tariffPlanId', + foreignField: '_id', + as: 'tariffPlan', + }, + }, + { + $unwind: { + path: '$tariffPlan', + }, + }, + { + $addFields: { + billingPeriodEventsCount: 0, + }, + }, + ]).toArray(); } /** - * Returns array of workspaces with their tariff plans + * Returns workspace with its tariff plan by its id + * + * @param id - workspace id */ - private async getWorkspacesWithTariffPlans(): Promise { - return this.workspacesCollection.aggregate([ + private async getWorkspaceWithTariffPlan(id: string): Promise { + const workspacesArray = await this.workspacesCollection.aggregate([ + { + $match: { + _id: new ObjectId(id), + }, + }, { $lookup: { from: 'plans', @@ -244,6 +265,8 @@ export default class LimiterWorker extends Worker { }, }, ]).toArray(); + + return workspacesArray.pop(); } /** @@ -284,13 +307,7 @@ export default class LimiterWorker extends Worker { * * @param reportData - data for sending notification after task handling */ - private async sendReport(reportData: ReportData): Promise { - if (!process.env.REPORT_NOTIFY_URL) { - this.logger.error('Can\'t send report because REPORT_NOTIFY_URL not provided'); - - return; - } - + private async sendRegularWorkspacesCheckReport(reportData: MultiplyWorkspacesAnalyzeReport): Promise { let report = process.env.SERVER_NAME ? ` Hawk Limiter (${process.env.SERVER_NAME}) 🚧\n` : ' Hawk Limiter 🚧\n'; if (reportData.bannedWorkspaces.length) { @@ -307,10 +324,103 @@ export default class LimiterWorker extends Worker { report += `\n\n${reportData.bannedWorkspaces.length} workspaces with ${reportData.bannedProjectIds.length} projects totally banned`; + await this.sendReport(report); + } + + /** + * Analyses workspace data and gives a report about events limit + * + * @param workspace - workspace data to check + * @param projects - workspaces projects + */ + private async analyzeWorkspaceData( + workspace: WorkspaceWithTariffPlan, projects: ProjectDBScheme[] + ): Promise { + /** + * If last charge date is not specified, then we skip checking it + * In the next time the Paymaster worker starts, it will set lastChargeDate for this workspace + * and limiter will process it successfully + */ + if (!workspace.lastChargeDate) { + const error = new Error('Workspace without lastChargeDate detected'); + + HawkCatcher.send(error, { + workspaceId: workspace._id, + }); + + throw error; + } + const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); + + const workspaceEventsCount = await this.getEventsCountByProjects(projects, since); + const updatedWorkspace = { + ...workspace, + billingPeriodEventsCount: workspaceEventsCount, + }; + + return { + isBanned: workspace.tariffPlan.eventsLimit < workspaceEventsCount, + updatedWorkspace, + }; + } + + /** + * Updates workspaces data in Database + * + * @param workspaces - workspaces data to update + */ + private async updateWorkspacesEventsCount(workspaces: WorkspaceDBScheme[]): Promise { + const operations = workspaces.map(workspace => { + return { + updateOne: { + filter: { + _id: workspace._id, + }, + update: { $set: { billingPeriodEventsCount: workspace.billingPeriodEventsCount } }, + }, + }; + }); + + await this.workspacesCollection.bulkWrite(operations); + } + + /** + * Sends notification to the chat about result of the workspace checking + * + * @param reportData - report data for generating notification + */ + private async sendSingleWorkspacesCheckReport(reportData: SingleWorkspaceAnalyzeReport): Promise { + const workspace = reportData.updatedWorkspace; + + const reportString = ` +Hawk Limiter ${process.env.SERVER_NAME ? `(${process.env.SERVER_NAME})` : ''} 🚧 + +${encodeURIComponent(workspace.name)} wants to be unblocked + +It has ${shortNumber(workspace.billingPeriodEventsCount)} events of ${workspace.tariffPlan.eventsLimit}. Last charge date: ${workspace.lastChargeDate.toISOString()} + +${reportData.isBanned ? 'Blocked ❌' : 'Unblocked ✅'} +`; + + await this.sendReport(reportString); + } + + /** + * Sends notify to the chat + * + * @param reportData - report notify in HTML markup to send + */ + private async sendReport(reportData: string): Promise { + if (!process.env.REPORT_NOTIFY_URL) { + this.logger.error('Can\'t send report because REPORT_NOTIFY_URL not provided'); + + return; + } + await axios({ method: 'post', url: process.env.REPORT_NOTIFY_URL, - data: 'message=' + report + '&parse_mode=HTML', + data: 'message=' + reportData + '&parse_mode=HTML', }); } } diff --git a/workers/limiter/src/redisHelper.ts b/workers/limiter/src/redisHelper.ts new file mode 100644 index 00000000..058a487d --- /dev/null +++ b/workers/limiter/src/redisHelper.ts @@ -0,0 +1,96 @@ +import HawkCatcher from '@hawk.so/nodejs'; +import redis from 'redis'; +import createLogger from '../../../lib/logger'; + +/** + * Class with helper functions for working with Redis + */ +export default class RedisHelper { + /** + * Redis client for making queries + */ + private readonly redisClient = redis.createClient({ url: process.env.REDIS_URL }); + + /** + * Logger instance + * (default level='info') + */ + private logger = createLogger(); + + /** + * Redis key for storing banned projects + */ + private readonly redisDisabledProjectsKey = 'DisabledProjectsSet'; + + /** + * Saves banned project ids to redis + * If there is no projects, then previous data in Redis will be erased + * + * @param projectIdsToBan - ids to ban + */ + public saveBannedProjectsSet(projectIdsToBan: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIdsToBan.length) { + this.redisClient.multi() + .del(this.redisDisabledProjectsKey) + .sadd(this.redisDisabledProjectsKey, projectIdsToBan) + .exec(callback); + } else { + this.redisClient.del(this.redisDisabledProjectsKey, callback); + } + }); + } + + /** + * Add new banned projects to the set + * + * @param projectIds - project ids to append + */ + public appendBannedProjects(projectIds: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIds.length) { + this.redisClient.sadd(this.redisDisabledProjectsKey, projectIds, callback); + } + }); + } + + /** + * Removes projects ids from set + * + * @param projectIds - project ids to remove + */ + public removeBannedProjects(projectIds: string[]): Promise { + return new Promise((resolve, reject) => { + const callback = this.createCallback(resolve, reject); + + if (projectIds.length) { + this.redisClient.srem(this.redisDisabledProjectsKey, projectIds, callback); + } + }); + } + + /** + * Creates callback function for Redis operations + * + * @param resolve - callback that will be called if no errors occurred + * @param reject - callback that will be called any error occurred + */ + private createCallback(resolve: () => void, reject: (reason?: unknown) => void) { + return (execError: Error | null): void => { + if (execError) { + this.logger.error(execError); + HawkCatcher.send(execError); + + reject(execError); + + return; + } + this.logger.info('Successfully saved to Redis'); + resolve(); + }; + } +} diff --git a/workers/limiter/tests/index.test.ts b/workers/limiter/tests/index.test.ts index fca867e5..9e871af4 100644 --- a/workers/limiter/tests/index.test.ts +++ b/workers/limiter/tests/index.test.ts @@ -7,12 +7,17 @@ import { mockedPlans } from './plans.mock'; import axios from 'axios'; import { mocked } from 'ts-jest/utils'; import { MS_IN_SEC } from '../../../lib/utils/consts'; +import { RegularWorkspacesCheckEvent } from '../types/eventTypes'; /** * Mock axios for testing report sends */ jest.mock('axios'); +const REGULAR_WORKSPACES_CHECK_EVENT: RegularWorkspacesCheckEvent = { + type: 'regular-workspaces-check', +}; + /** * Constant of last charge date in all workspaces for tests */ @@ -126,236 +131,366 @@ describe('Limiter worker', () => { await planCollection.insertMany(Object.values(mockedPlans)); }); - test('Should count workspace events for a billing period and save it to the db', async () => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); - const eventsCollection = db.collection(`events:${project._id.toString()}`); - const repetitionsCollection = db.collection(`repetitions:${project._id.toString()}`); - - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 5, - }); - - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + describe('regular-workspaces-check', () => { + test('Should count workspace events for a billing period and save it to the db', async () => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + const eventsCollection = db.collection(`events:${project._id.toString()}`); + const repetitionsCollection = db.collection(`repetitions:${project._id.toString()}`); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 5, + }); - await worker.start(); - await worker.handle(); - await worker.finish(); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - /** - * Assert - */ - const workspaceInDatabase = await workspaceCollection.findOne({ - _id: workspace._id, - }); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - /** - * Count events and repetitions since last charge date - */ - const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); - const query = { - 'payload.timestamp': { - $gt: since, - }, - }; - const repetitionsCount = await repetitionsCollection.find(query).count(); - const eventsCount = await eventsCollection.find(query).count(); + /** + * Assert + */ + const workspaceInDatabase = await workspaceCollection.findOne({ + _id: workspace._id, + }); - /** - * Check count of events - */ - expect(workspaceInDatabase.billingPeriodEventsCount).toEqual(repetitionsCount + eventsCount); - }); + /** + * Count events and repetitions since last charge date + */ + const since = Math.floor(new Date(workspace.lastChargeDate).getTime() / MS_IN_SEC); + const query = { + 'payload.timestamp': { + $gt: since, + }, + }; + const repetitionsCount = await repetitionsCollection.find(query).count(); + const eventsCount = await eventsCollection.find(query).count(); - test('Should ban projects that have exceeded the plan limit and add their ids to redis', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, + /** + * Check count of events + */ + expect(workspaceInDatabase.billingPeriodEventsCount).toEqual(repetitionsCount + eventsCount); }); - const project = createProjectMock({ workspaceId: workspace._id }); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 15, - }); + test('Should ban projects that have exceeded the plan limit and add their ids to redis', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 15, + }); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - await worker.start(); - await worker.handle(); - await worker.finish(); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).toContain(project._id.toString()); - done(); + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).toContain(project._id.toString()); + done(); + }); }); - }); - test('Should unban previously banned projects if the limit allows', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); + test('Should unban previously banned projects if the limit allows', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 15, + }); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 15, - }); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); - await worker.start(); - await worker.handle(); + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + * Project should be banned + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).toContain(project._id.toString()); + }); - /** - * Gets all members of set with key 'DisabledProjectsSet' from Redis - * Project should be banned - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).toContain(project._id.toString()); - }); + /** + * Change workspace plan to plan with big events limit + */ + await workspaceCollection.findOneAndUpdate({ _id: workspace._id }, { + $set: { + tariffPlanId: mockedPlans.eventsLimit10000._id, + }, + }); - /** - * Change workspace plan to plan with big events limit - */ - await workspaceCollection.findOneAndUpdate({ _id: workspace._id }, { - $set: { - tariffPlanId: mockedPlans.eventsLimit10000._id, - }, + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); + + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - await worker.handle(); - await worker.finish(); + test('Should not ban project if it does not reach the limit', async (done) => { + /** + * Arrange + */ + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); - expect(result).not.toContain(project._id.toString()); - done(); - }); - }); + /** + * Act + * + * Worker initialization + */ + const worker = new LimiterWorker(); - test('Should not ban project if it does not reach the limit', async (done) => { - /** - * Arrange - */ - const workspace = createWorkspaceMock({ - plan: mockedPlans.eventsLimit10000, - billingPeriodEventsCount: 0, - lastChargeDate: LAST_CHARGE_DATE, - }); - const project = createProjectMock({ workspaceId: workspace._id }); + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); - await fillDatabaseWithMockedData({ - workspace, - project, - eventsToMock: 100, + /** + * Assert + * + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + */ + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - /** - * Act - * - * Worker initialization - */ - const worker = new LimiterWorker(); + test('Should send a report with collected data', async () => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); - await worker.start(); - await worker.handle(); - await worker.finish(); + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); - /** - * Assert - * - * Gets all members of set with key 'DisabledProjectsSet' from Redis - */ - redisClient.smembers('DisabledProjectsSet', (err, result) => { - expect(err).toBeNull(); + /** + * Act + */ + await worker.start(); + await worker.handle(REGULAR_WORKSPACES_CHECK_EVENT); + await worker.finish(); /** - * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + * Assert */ - expect(result).not.toContain(project._id.toString()); - done(); + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); }); }); - test('Should send a report with collected data', async () => { - /** - * Arrange - * - * Worker initialization - */ - const worker = new LimiterWorker(); - - mocked(axios).mockResolvedValue({ - data: {}, - status: 200, - statusText: 'OK', - config: {}, - headers: {}, + describe('check-single-workspace', () => { + test('Should unblock workspace if the number of events does not exceed the limit', async (done) => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); + + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10000, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); + + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); + + /** + * Act + */ + await worker.start(); + await worker.handle({ + type: 'check-single-workspace', + workspaceId: workspace._id.toString(), + }); + await worker.finish(); + + /** + * Assert + */ + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); + + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of project 'Test project #2' from 'Test workspace #2' + */ + expect(result).not.toContain(project._id.toString()); + done(); + }); }); - /** - * Act - */ - await worker.start(); - await worker.handle(); - await worker.finish(); + test('Should block workspace if the number of events exceed the limit', async (done) => { + /** + * Arrange + * + * Worker initialization + */ + const worker = new LimiterWorker(); + + const workspace = createWorkspaceMock({ + plan: mockedPlans.eventsLimit10, + billingPeriodEventsCount: 0, + lastChargeDate: LAST_CHARGE_DATE, + }); + const project = createProjectMock({ workspaceId: workspace._id }); + + await fillDatabaseWithMockedData({ + workspace, + project, + eventsToMock: 100, + }); + + mocked(axios).mockResolvedValue({ + data: {}, + status: 200, + statusText: 'OK', + config: {}, + headers: {}, + }); - /** - * Assert - */ - expect(axios).toHaveBeenCalled(); - expect(axios).toHaveBeenCalledWith({ - method: 'post', - url: process.env.REPORT_NOTIFY_URL, - data: expect.any(String), + /** + * Act + */ + await worker.start(); + await worker.handle({ + type: 'check-single-workspace', + workspaceId: workspace._id.toString(), + }); + await worker.finish(); + + /** + * Assert + */ + expect(axios).toHaveBeenCalled(); + expect(axios).toHaveBeenCalledWith({ + method: 'post', + url: process.env.REPORT_NOTIFY_URL, + data: expect.any(String), + }); + + /** + * Gets all members of set with key 'DisabledProjectsSet' from Redis + */ + redisClient.smembers('DisabledProjectsSet', (err, result) => { + expect(err).toBeNull(); + + /** + * Redis shouldn't contain id of the mocked project + */ + expect(result).toContain(project._id.toString()); + done(); + }); }); }); diff --git a/workers/limiter/types/eventTypes.ts b/workers/limiter/types/eventTypes.ts new file mode 100644 index 00000000..600e745e --- /dev/null +++ b/workers/limiter/types/eventTypes.ts @@ -0,0 +1,32 @@ +/** + * Event for checking events count for specified workspace + * Limiter will unban workspace projects if event limit doesn't exceed + */ +export interface CheckSingleWorkspaceEvent { + /** + * Event type name + */ + type: 'check-single-workspace' + + /** + * Workspace id to check + */ + workspaceId: string; +} + +/** + * Event for checking current total events count in workspaces and limits events receiving if workspace exceed the limit + */ +export interface RegularWorkspacesCheckEvent { + /** + * Event type name + */ + type: 'regular-workspaces-check' +} + +/** + * All types of events for limiter worker + */ +type LimiterEvent = CheckSingleWorkspaceEvent | RegularWorkspacesCheckEvent; + +export default LimiterEvent; diff --git a/workers/limiter/types/index.ts b/workers/limiter/types/index.ts new file mode 100644 index 00000000..0407a095 --- /dev/null +++ b/workers/limiter/types/index.ts @@ -0,0 +1,6 @@ +import { PlanDBScheme, WorkspaceDBScheme } from 'hawk.types'; + +/** + * Workspace with its tariff plan + */ +export type WorkspaceWithTariffPlan = WorkspaceDBScheme & {tariffPlan: PlanDBScheme}; diff --git a/workers/limiter/types/reportData.ts b/workers/limiter/types/reportData.ts index 7b11a080..fcf6b45b 100644 --- a/workers/limiter/types/reportData.ts +++ b/workers/limiter/types/reportData.ts @@ -1,16 +1,36 @@ -import { WorkspaceDBScheme } from 'hawk.types'; +import { WorkspaceWithTariffPlan } from './index'; /** * Data for sending notification after task handling */ -export default interface ReportData { +export interface SingleWorkspaceAnalyzeReport { + /** + * Is workspace get banned + */ + isBanned: boolean; + + /** + * Workspace with updated data (current events count) + */ + updatedWorkspace: WorkspaceWithTariffPlan +} + +/** + * Data for sending notification after task handling + */ +export interface MultiplyWorkspacesAnalyzeReport { /** * Banned workspaces data */ - bannedWorkspaces: WorkspaceDBScheme[]; + bannedWorkspaces: WorkspaceWithTariffPlan[]; /** * Projects ids to ban */ bannedProjectIds: string[]; + + /** + * Array of workspaces with updated fields + */ + updatedWorkspaces: WorkspaceWithTariffPlan[] }