Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ export abstract class Worker {
);
}

/**
* Forced clears worker cache
*/
public clearCache(): void {
this.cache.flushAll();
}

/**
* Create cache controller instance
*/
Expand Down
53 changes: 38 additions & 15 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ export default class GrouperWorker extends Worker {
*/
public readonly type: string = pkg.workerType;

/**
* Contains event grouphashes by its catcher type and event title as keys
*
* @example
* {
* 'grouper:Hawk client catcher test': '7e2b961c35b915dcbe2704e144e8d2c3517e2c5281a5de4403c0c58978b435a0'
* }
*/
private static cachedHashValues: Record<string, string> = {};

/**
* Database Controller
*/
Expand All @@ -37,16 +47,23 @@ export default class GrouperWorker extends Worker {
* @param task - worker task to create hash
*/
private static getUniqueEventHash(task: GroupWorkerTask): string {
return crypto.createHmac('sha256', process.env.EVENT_SECRET)
.update(task.catcherType + task.event.title)
.digest('hex');
const computedHashValueCacheKey = `${task.catcherType}:${task.event.title}`;

if (!this.cachedHashValues[computedHashValueCacheKey]) {
this.cachedHashValues[computedHashValueCacheKey] = crypto.createHmac('sha256', process.env.EVENT_SECRET)
.update(task.catcherType + task.event.title)
.digest('hex');
}

return this.cachedHashValues[computedHashValueCacheKey];
}

/**
* Start consuming messages
*/
public async start(): Promise<void> {
await this.db.connect();
this.prepareCache();
await super.start();
}

Expand Down Expand Up @@ -168,11 +185,14 @@ export default class GrouperWorker extends Worker {
if (isUserFromOriginalEvent) {
return false;
} else {
const repetition = await this.db.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
});
const repetitionCacheKey = `repetitions:${task.projectId}:${existedEvent.groupHash}:${eventUser.id}`;
const repetition = await this.cache.get(repetitionCacheKey, async () => {
Comment thread
nikmel2803 marked this conversation as resolved.
return this.db.getConnection().collection(`repetitions:${task.projectId}`)
.findOne({
groupHash: existedEvent.groupHash,
'payload.user.id': eventUser.id,
});
});

/**
* If there is no repetitions from this user — return true
Expand All @@ -187,18 +207,21 @@ export default class GrouperWorker extends Worker {
* @param projectId - project's identifier
* @param query - mongo query string
*/
private async getEvent(projectId: string, query): Promise<GroupedEventDBScheme> {
private async getEvent(projectId: string, query: Record<string, unknown>): Promise<GroupedEventDBScheme> {
if (!mongodb.ObjectID.isValid(projectId)) {
throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
}

try {
const eventCacheKey = `${projectId}:${JSON.stringify(query)}`;

return this.cache.get(eventCacheKey, async () => {
Comment thread
nikmel2803 marked this conversation as resolved.
return this.db.getConnection()
.collection(`events:${projectId}`)
.findOne(query);
} catch (err) {
throw new DatabaseReadWriteError(err);
}
.findOne(query)
.catch((err) => {
throw new DatabaseReadWriteError(err);
});
});
}

/**
Expand Down Expand Up @@ -279,7 +302,7 @@ export default class GrouperWorker extends Worker {
}

/**
* saves event at the special aggregation collection
* Saves event at the special aggregation collection
*
* @param {string} projectId - project's identifier
* @param {string} eventHash - event hash
Expand Down
4 changes: 4 additions & 0 deletions workers/grouper/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ describe('GrouperWorker', () => {
repetitionsCollection = connection.db().collection('repetitions:' + testGroupingTask.projectId);
});

/**
* Clears worker cache and mongodb before each test
*/
beforeEach(async () => {
worker.clearCache();
await eventsCollection.deleteMany({});
await dailyEventsCollection.deleteMany({});
await repetitionsCollection.deleteMany({});
Expand Down