Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16.20-slim as build-stage
FROM node:18.20-slim as build-stage

RUN apt update
RUN apt install git -y
Expand Down
62 changes: 40 additions & 22 deletions convertors/move-timestamp-out-of-payload.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require('dotenv').config();
require('process');
const { MongoClient } = require('mongodb');

/**
Expand All @@ -6,16 +7,21 @@
*/
const documentsSelectionLimit = 10000;

/**

Check warning on line 10 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @returns declaration
* @param db - mongo db instance

Check warning on line 11 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @param "db" type
* @param collectionName - name of the collection to be updated

Check warning on line 12 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @param "collectionName" type
*/
async function movePayloadTimestampToEventLevel(db, collectionName) {
const collection = db.collection(collectionName);

const docsToUpdate = collection.find(
{ timestamp: { $exists: false } },
{ projection: { _id: 1, 'payload.timestamp': 1 } }
{
projection: {
_id: 1,
'payload.timestamp': 1,
},
}
).limit(documentsSelectionLimit);

const batchedOps = [];
Expand All @@ -33,11 +39,11 @@
updateOne: {
filter: { _id: doc._id },
update: {
$set: { timestamp: Number(doc.payload.timestamp)},
$unset: {'payload.timestamp': ''},
}
}
})
$set: { timestamp: Number(doc.payload.timestamp) },
$unset: { 'payload.timestamp': '' },
},
},
});

currentCount++;
}
Expand All @@ -46,26 +52,32 @@
await collection.bulkWrite(batchedOps);
}

return currentCount
return currentCount;
}
/**

Check warning on line 57 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @returns declaration
* @param db - mongo db instance

Check warning on line 58 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @param "db" type
* @param repetitionCollectionName - repetitions collection to be updated

Check warning on line 59 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @param "repetitionCollectionName" type
* @param projectId - project id of current repetitions collection

Check warning on line 60 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

Missing JSDoc @param "projectId" type
*/
async function backfillTimestampsFromEvents(db, repetitionCollectionName, projectId) {
const repetitions = db.collection(repetitionCollectionName);
const events = db.collection(`events:${projectId}`);

let bulkOps = [];
const bulkOps = [];
let repetitionCount = 1;

const repetitionsList = await repetitions.find(
{
timestamp: { $exists: false },
},
{ projection: { _id: 1, groupHash: 1 } }
).limit(documentsSelectionLimit).toArray();
{
projection: {
_id: 1,
groupHash: 1,
},
}
).limit(documentsSelectionLimit)
.toArray();

const groupHashList = [];

Expand All @@ -77,24 +89,29 @@

const relatedEvents = await events.find(
{ groupHash: { $in: groupHashList } },
{ projection: { timestamp: 1, groupHash: 1 } }
{
projection: {
timestamp: 1,
groupHash: 1,
},
}
).toArray();

const relatedEventsMap = new Map()
const relatedEventsMap = new Map();

relatedEvents.forEach(e => {
relatedEventsMap.set(e.groupHash, e);
})
});

for (const repetition of repetitionsList) {
const relatedEvent = relatedEventsMap.get(repetition.groupHash);

if (!relatedEvent) {
bulkOps.push({
deleteOne: {
filter: { _id: repetition._id }
}
})
filter: { _id: repetition._id },
},
});
} else if (relatedEvent?.timestamp !== null) {
bulkOps.push({
updateOne: {
Expand All @@ -111,11 +128,12 @@
const result = await repetitions.bulkWrite(bulkOps);
const updated = result.modifiedCount;
const deleted = result.deletedCount;

processed = bulkOps.length;
console.log(` updates (${processed} processed, ${updated} updated, ${deleted} deleted)`);

if (updated + deleted === 0) {
repetitionCollectionsToCheck.filter(collection => collection !== repetition)
repetitionCollectionsToCheck.filter(collection => collection !== repetition);

Check warning on line 136 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

'repetition' is not defined

Check warning on line 136 in convertors/move-timestamp-out-of-payload.js

View workflow job for this annotation

GitHub Actions / ESlint

'repetitionCollectionsToCheck' is not defined
}
}

Expand All @@ -126,7 +144,7 @@
* Method that runs convertor script
*/
async function run() {
const fullUri = 'mongodb://hawk_new:evieg9bauK0ahs2youhoh7aer7kohT@rc1d-2jltinutse1eadfs.mdb.yandexcloud.net:27018/hawk_events?authSource=admin&replicaSet=rs01&tls=true&tlsInsecure=true';
const fullUri = process.env.MONGO_EVENTS_DATABASE_URI;

// Parse the Mongo URL manually
const mongoUrl = new URL(fullUri);
Expand Down Expand Up @@ -174,13 +192,13 @@

// Convert events
let i = 1;
let documentsUpdatedCount = 1
let documentsUpdatedCount = 1;

while (documentsUpdatedCount != 0) {
documentsUpdatedCount = 0;
i = 1;
const collectionsToUpdateCount = eventCollectionsToCheck.length;

for (const collectionName of eventCollectionsToCheck) {
console.log(`[${i}/${collectionsToUpdateCount}] Processing ${collectionName}`);
const updated = await movePayloadTimestampToEventLevel(db, collectionName);
Expand All @@ -189,10 +207,10 @@
eventCollectionsToCheck = eventCollectionsToCheck.filter(collection => collection !== collectionName);
}

documentsUpdatedCount += updated
documentsUpdatedCount += updated;
i++;
}
}
}

// Convert repetitions + backfill from events
documentsUpdatedCount = 1;
Expand Down
72 changes: 35 additions & 37 deletions workers/grouper/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import type {
import type { RepetitionDBScheme } from '../types/repetition';
import { DatabaseReadWriteError, DiffCalculationError, ValidationError } from '../../../lib/workerErrors';
import { decodeUnsafeFields, encodeUnsafeFields } from '../../../lib/utils/unsafeFields';
import HawkCatcher from '@hawk.so/nodejs';
import { MS_IN_SEC } from '../../../lib/utils/consts';
import DataFilter from './data-filter';
import RedisHelper from './redisHelper';
import levenshtein from 'js-levenshtein';
// import levenshtein from 'js-levenshtein';
import { computeDelta } from './utils/repetitionDiff';
import TimeMs from '../../../lib/utils/time';
import { rightTrim } from '../../../lib/utils/string';
Expand Down Expand Up @@ -109,7 +108,7 @@ export default class GrouperWorker extends Worker {
let existedEvent = await this.getEvent(task.projectId, uniqueEventHash);

/**
* If we couldn't group by group hash (title), try grouping by Levenshtein distance or patterns
* If we couldn't group by group hash (title), try grouping by patterns
*/
if (!existedEvent) {
const similarEvent = await this.findSimilarEvent(task.projectId, task.payload);
Expand Down Expand Up @@ -177,7 +176,6 @@ export default class GrouperWorker extends Worker {
* and we need to process this event as repetition
*/
if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
HawkCatcher.send(new Error('[Grouper] MongoError: E11000 duplicate key error collection'));
await this.handle(task);

return;
Expand Down Expand Up @@ -287,35 +285,35 @@ export default class GrouperWorker extends Worker {
* @param event - event to compare
*/
private async findSimilarEvent(projectId: string, event: EventData<EventAddons>): Promise<GroupedEventDBScheme | undefined> {
const eventsCountToCompare = 60;
const diffTreshold = 0.35;
// const eventsCountToCompare = 60;
// const diffTreshold = 0.35;

const lastUniqueEvents = await this.findLastEvents(projectId, eventsCountToCompare);
// const lastUniqueEvents = await this.findLastEvents(projectId, eventsCountToCompare);

/**
* Trim titles to reduce CPU usage for Levenshtein comparison
*/
const trimmedEventTitle = hasValue(event.title) ? rightTrim(event.title, MAX_CODE_LINE_LENGTH) : '';
// const trimmedEventTitle = hasValue(event.title) ? rightTrim(event.title, MAX_CODE_LINE_LENGTH) : '';

/**
* First try to find by Levenshtein distance
*/
const similarByLevenshtein = lastUniqueEvents.filter(prevEvent => {
const trimmedPrevTitle = hasValue(prevEvent.payload.title) ? rightTrim(prevEvent.payload.title, MAX_CODE_LINE_LENGTH) : '';
// const similarByLevenshtein = lastUniqueEvents.filter(prevEvent => {
// const trimmedPrevTitle = hasValue(prevEvent.payload.title) ? rightTrim(prevEvent.payload.title, MAX_CODE_LINE_LENGTH) : '';

if (trimmedEventTitle === '' || trimmedPrevTitle === '') {
return false;
}
// if (trimmedEventTitle === '' || trimmedPrevTitle === '') {
// return false;
// }

const distance = levenshtein(trimmedEventTitle, trimmedPrevTitle);
const threshold = trimmedEventTitle.length * diffTreshold;
// const distance = levenshtein(trimmedEventTitle, trimmedPrevTitle);
// const threshold = trimmedEventTitle.length * diffTreshold;

return distance < threshold;
}).pop();
// return distance < threshold;
// }).pop();

if (similarByLevenshtein) {
return similarByLevenshtein;
}
// if (similarByLevenshtein) {
// return similarByLevenshtein;
// }

/**
* If no match by Levenshtein, try matching by patterns
Expand Down Expand Up @@ -402,23 +400,23 @@ export default class GrouperWorker extends Worker {
* @param count - how many events to return
* @returns {GroupedEventDBScheme[]} list of the last N unique events
*/
private findLastEvents(projectId: string, count: number): Promise<GroupedEventDBScheme[]> {
return this.cache.get(`last:${count}:eventsOf:${projectId}`, async () => {
return this.eventsDb.getConnection()
.collection(`events:${projectId}`)
.find()
.sort({
_id: 1,
})
.limit(count)
.toArray();
},
/**
* TimeMs class stores time intervals in milliseconds, however NodeCache ttl needs to be specified in seconds
*/
/* eslint-disable-next-line @typescript-eslint/no-magic-numbers */
TimeMs.MINUTE / 1000);
}
// private findLastEvents(projectId: string, count: number): Promise<GroupedEventDBScheme[]> {
// return this.cache.get(`last:${count}:eventsOf:${projectId}`, async () => {
// return this.eventsDb.getConnection()
// .collection(`events:${projectId}`)
// .find()
// .sort({
// _id: 1,
// })
// .limit(count)
// .toArray();
// },
// /**
// * TimeMs class stores time intervals in milliseconds, however NodeCache ttl needs to be specified in seconds
// */
// /* eslint-disable-next-line @typescript-eslint/no-magic-numbers */
// TimeMs.MINUTE / 1000);
// }

/**
* Decides whether to increase the number of affected users for the repetition and the daily aggregation
Expand Down
18 changes: 9 additions & 9 deletions workers/grouper/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -457,17 +457,17 @@ describe('GrouperWorker', () => {
});

describe('Grouping', () => {
test('should group events with partially different titles', async () => {
await worker.handle(generateTask({ title: 'Some error (but not filly identical) example' }));
await worker.handle(generateTask({ title: 'Some error (yes, it is not the identical) example' }));
await worker.handle(generateTask({ title: 'Some error (and it is not identical) example' }));
// test('should group events with partially different titles', async () => {
// await worker.handle(generateTask({ title: 'Some error (but not filly identical) example' }));
// await worker.handle(generateTask({ title: 'Some error (yes, it is not the identical) example' }));
// await worker.handle(generateTask({ title: 'Some error (and it is not identical) example' }));

const originalEvent = await eventsCollection.findOne({});
// const originalEvent = await eventsCollection.findOne({});

expect((await repetitionsCollection.find({
groupHash: originalEvent.groupHash,
}).toArray()).length).toBe(2);
});
// expect((await repetitionsCollection.find({
// groupHash: originalEvent.groupHash,
// }).toArray()).length).toBe(2);
// });

describe('Pattern matching', () => {
beforeEach(() => {
Expand Down
Loading