From 3a0a134bbf529124767a2f82c798b8070820e022 Mon Sep 17 00:00:00 2001 From: capJavert Date: Fri, 21 Nov 2025 15:23:53 +0100 Subject: [PATCH 1/6] feat: parse cv to profile --- .infra/common.ts | 4 + bin/importProfileFromJSON.ts | 62 ++------------- package.json | 2 +- pnpm-lock.yaml | 10 +-- src/common/profile/import.ts | 68 ++++++++++++++++ src/entity/user/User.ts | 1 + src/workers/opportunity/parseCVProfile.ts | 97 +++++++++++++++++++++++ 7 files changed, 181 insertions(+), 63 deletions(-) create mode 100644 src/workers/opportunity/parseCVProfile.ts diff --git a/.infra/common.ts b/.infra/common.ts index d5d1b982a0..1607c2f02f 100644 --- a/.infra/common.ts +++ b/.infra/common.ts @@ -437,6 +437,10 @@ export const workers: Worker[] = [ topic: 'api.v1.recruiter-rejected-candidate-match', subscription: 'api.recruiter-rejected-candidate-match-email', }, + { + topic: 'api.v1.candidate-preference-updated', + subscription: 'api.parse-cv-profile', + }, ]; export const personalizedDigestWorkers: Worker[] = [ diff --git a/bin/importProfileFromJSON.ts b/bin/importProfileFromJSON.ts index 98db61a2e9..c792780f09 100644 --- a/bin/importProfileFromJSON.ts +++ b/bin/importProfileFromJSON.ts @@ -5,14 +5,7 @@ import { z } from 'zod'; import createOrGetConnection from '../src/db'; import { type DataSource } from 'typeorm'; import { readFile } from 'node:fs/promises'; -import { userExperienceInputBaseSchema } from '../src/common/schema/profile'; -import { UserExperienceType } from '../src/entity/user/experiences/types'; -import { - importUserExperienceWork, - importUserExperienceEducation, - importUserExperienceCertification, - importUserExperienceProject, -} from '../src/common/profile/import'; +import { importUserExperienceFromJSON } from '../src/common/profile/import'; /** * Import profile from JSON to user by id @@ -47,55 +40,10 @@ const main = async () => { const dataJSON = JSON.parse(await readFile(params.path, 'utf-8')); - const data = z - .array( - userExperienceInputBaseSchema - .pick({ - type: true, - }) - .loose(), - ) - .parse(dataJSON); - - await con.transaction(async (entityManager) => { - for (const item of data) { - switch (item.type) { - case UserExperienceType.Work: - await importUserExperienceWork({ - data: item, - con: entityManager, - userId: params.userId, - }); - - break; - case UserExperienceType.Education: - await importUserExperienceEducation({ - data: item, - con: entityManager, - userId: params.userId, - }); - - break; - case UserExperienceType.Certification: - await importUserExperienceCertification({ - data: item, - con: entityManager, - userId: params.userId, - }); - - break; - case UserExperienceType.Project: - await importUserExperienceProject({ - data: item, - con: entityManager, - userId: params.userId, - }); - - break; - default: - throw new Error(`Unsupported experience type: ${item.type}`); - } - } + await importUserExperienceFromJSON({ + con: con.manager, + dataJson: dataJSON, + userId: params.userId, }); } catch (error) { console.error(error instanceof z.ZodError ? z.prettifyError(error) : error); diff --git a/package.json b/package.json index 9f19f78807..65ccb404d1 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "@connectrpc/connect-fastify": "^1.6.1", "@connectrpc/connect-node": "^1.6.1", "@dailydotdev/graphql-redis-subscriptions": "^2.4.3", - "@dailydotdev/schema": "0.2.50", + "@dailydotdev/schema": "0.2.51", "@dailydotdev/ts-ioredis-pool": "^1.0.2", "@fastify/cookie": "^11.0.2", "@fastify/cors": "^11.1.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2bc518ae72..77f25d5617 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -35,8 +35,8 @@ importers: specifier: ^2.4.3 version: 2.4.3(graphql-subscriptions@3.0.0(graphql@16.11.0)) '@dailydotdev/schema': - specifier: 0.2.50 - version: 0.2.50(@bufbuild/protobuf@1.10.0) + specifier: 0.2.51 + version: 0.2.51(@bufbuild/protobuf@1.10.0) '@dailydotdev/ts-ioredis-pool': specifier: ^1.0.2 version: 1.0.2 @@ -702,8 +702,8 @@ packages: peerDependencies: graphql-subscriptions: ^1.0.0 || ^2.0.0 - '@dailydotdev/schema@0.2.50': - resolution: {integrity: sha512-kQI3CCVfjfenJI358MdB0J/eXrZvcdcNXoSzrMtkwpmkRKtxe0H8hIoWIU5FPIJGbIs5KxWsUHikJVxUMFlhRg==} + '@dailydotdev/schema@0.2.51': + resolution: {integrity: sha512-2muxU+xhZwTU6E20Hdh/tQpNGM2qYvIvgKl+Z9PWZjtjF1LFOvXmuebUDqEafvCsGZBwl2ISt5porMc26mL5EA==} peerDependencies: '@bufbuild/protobuf': 1.x @@ -5180,7 +5180,7 @@ snapshots: transitivePeerDependencies: - supports-color - '@dailydotdev/schema@0.2.50(@bufbuild/protobuf@1.10.0)': + '@dailydotdev/schema@0.2.51(@bufbuild/protobuf@1.10.0)': dependencies: '@bufbuild/protobuf': 1.10.0 diff --git a/src/common/profile/import.ts b/src/common/profile/import.ts index 0f0cb13177..78b373e430 100644 --- a/src/common/profile/import.ts +++ b/src/common/profile/import.ts @@ -2,6 +2,7 @@ import { type EntityManager } from 'typeorm'; import { userExperienceCertificationImportSchema, userExperienceEducationImportSchema, + userExperienceInputBaseSchema, userExperienceProjectImportSchema, userExperienceWorkImportSchema, } from '../../../src/common/schema/profile'; @@ -15,6 +16,8 @@ import { DatasetLocation } from '../../../src/entity/dataset/DatasetLocation'; import { UserExperienceEducation } from '../../../src/entity/user/experiences/UserExperienceEducation'; import { UserExperienceCertification } from '../../../src/entity/user/experiences/UserExperienceCertification'; import { UserExperienceProject } from '../../../src/entity/user/experiences/UserExperienceProject'; +import z from 'zod'; +import { UserExperienceType } from '../../entity/user/experiences/types'; const resolveUserCompanyPart = async ({ name, @@ -306,3 +309,68 @@ export const importUserExperienceProject = async ({ experienceId, }; }; + +export const importUserExperienceFromJSON = async ({ + con, + dataJson, + userId, +}: { + con: EntityManager; + dataJson: unknown; + userId: string; +}) => { + if (!userId) { + throw new Error('userId is required'); + } + + const data = z + .array( + userExperienceInputBaseSchema + .pick({ + type: true, + }) + .loose(), + ) + .parse(dataJson); + + await con.transaction(async (entityManager) => { + for (const item of data) { + switch (item.type) { + case UserExperienceType.Work: + await importUserExperienceWork({ + data: item, + con: entityManager, + userId, + }); + + break; + case UserExperienceType.Education: + await importUserExperienceEducation({ + data: item, + con: entityManager, + userId, + }); + + break; + case UserExperienceType.Certification: + await importUserExperienceCertification({ + data: item, + con: entityManager, + userId, + }); + + break; + case UserExperienceType.Project: + await importUserExperienceProject({ + data: item, + con: entityManager, + userId, + }); + + break; + default: + throw new Error(`Unsupported experience type: ${item.type}`); + } + } + }); +}; diff --git a/src/entity/user/User.ts b/src/entity/user/User.ts index 65a6103b01..69feee989c 100644 --- a/src/entity/user/User.ts +++ b/src/entity/user/User.ts @@ -44,6 +44,7 @@ export type UserFlags = Partial<{ lng: number | null | undefined; }; subdivision: string | null; + lastCVParseAt: Date; }>; export type UserFlagsPublic = Pick; diff --git a/src/workers/opportunity/parseCVProfile.ts b/src/workers/opportunity/parseCVProfile.ts new file mode 100644 index 0000000000..0379fa7ca3 --- /dev/null +++ b/src/workers/opportunity/parseCVProfile.ts @@ -0,0 +1,97 @@ +import { + BrokkrParseRequest, + CandidatePreferenceUpdated, +} from '@dailydotdev/schema'; +import type { TypedWorker } from '../worker'; +import { User } from '../../entity/user/User'; +import { getBrokkrClient } from '../../common/brokkr'; +import { updateFlagsStatement } from '../../common/utils'; +import { importUserExperienceFromJSON } from '../../common/profile/import'; + +export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-updated'> = + { + subscription: 'api.parse-cv-profile', + parseMessage: ({ data }) => CandidatePreferenceUpdated.fromBinary(data), + handler: async ({ data }, con) => { + const { userId, cv } = data.payload || {}; + + if (!cv?.blob || !cv?.bucket) { + return; + } + + if (!cv?.lastModified) { + return; + } + + if (!userId) { + return; + } + + const user: Pick | null = await con + .getRepository(User) + .findOne({ + select: ['flags'], + where: { + id: userId, + }, + }); + + if (!user) { + return; + } + + const lastModifiedCVDate = new Date(cv.lastModified); + + if (Number.isNaN(lastModifiedCVDate.getTime())) { + return; + } + + const lastProfileParseDate = user.flags.lastCVParseAt || new Date(0); + + if (lastModifiedCVDate <= lastProfileParseDate) { + return; + } + + const brokkrClient = getBrokkrClient(); + + try { + await con.getRepository(User).update( + { id: userId }, + { + flags: updateFlagsStatement({ + lastCVParseAt: new Date(), + }), + }, + ); + + const result = await brokkrClient.garmr.execute(() => { + return brokkrClient.instance.parseCV( + new BrokkrParseRequest({ + bucketName: cv.bucket, + blobName: cv.blob, + }), + ); + }); + + const dataJson = JSON.parse(result.parsedCv); + + await importUserExperienceFromJSON({ + con: con.manager, + dataJson, + userId, + }); + } catch (error) { + // revert to previous date on error + await con.getRepository(User).update( + { id: userId }, + { + flags: updateFlagsStatement({ + lastCVParseAt: user.flags.lastCVParseAt, + }), + }, + ); + + throw error; + } + }, + }; From d95c753b51f926d94d453d4cb7beffec9f15f912 Mon Sep 17 00:00:00 2001 From: capJavert Date: Mon, 24 Nov 2025 10:41:47 +0100 Subject: [PATCH 2/6] feat: add support for os and volunteering --- src/common/profile/import.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/common/profile/import.ts b/src/common/profile/import.ts index 78b373e430..8c3089395f 100644 --- a/src/common/profile/import.ts +++ b/src/common/profile/import.ts @@ -361,6 +361,8 @@ export const importUserExperienceFromJSON = async ({ break; case UserExperienceType.Project: + case UserExperienceType.OpenSource: + case UserExperienceType.Volunteering: await importUserExperienceProject({ data: item, con: entityManager, From 7b66c4a2f3e320e3e1c3587e347c71a2ace682d8 Mon Sep 17 00:00:00 2001 From: capJavert Date: Mon, 24 Nov 2025 12:24:20 +0100 Subject: [PATCH 3/6] feat: tests --- __tests__/helpers.ts | 19 + .../workers/opportunity/parseCVProfile.ts | 333 ++++++++++++++++++ src/entity/user/User.ts | 2 +- src/workers/opportunity/parseCVProfile.ts | 22 +- 4 files changed, 371 insertions(+), 5 deletions(-) create mode 100644 __tests__/workers/opportunity/parseCVProfile.ts diff --git a/__tests__/helpers.ts b/__tests__/helpers.ts index cc5dbeb4f0..cf45621006 100644 --- a/__tests__/helpers.ts +++ b/__tests__/helpers.ts @@ -46,11 +46,16 @@ import { ScreeningQuestionsResponse, BrokkrService, ExtractMarkdownResponse, + ParseCVResponse, } from '@dailydotdev/schema'; import { createClient, type ClickHouseClient } from '@clickhouse/client'; import * as clickhouseCommon from '../src/common/clickhouse'; import { Message as ProtobufMessage } from '@bufbuild/protobuf'; import { GarmrService } from '../src/integrations/garmr'; +import { userExperienceCertificationFixture } from './fixture/profile/certification'; +import { userExperienceEducationFixture } from './fixture/profile/education'; +import { userExperienceProjectFixture } from './fixture/profile/project'; +import { userExperienceWorkFixture } from './fixture/profile/work'; export class MockContext extends Context { mockSpan: MockProxy & opentelemetry.Span; @@ -443,6 +448,20 @@ export const createMockBrokkrTransport = () => content: `# Extracted content for ${request.blobName} in ${request.bucketName}`, }); }, + parseCV: (request) => { + if (request.blobName === 'empty-cv-mock') { + return new ParseCVResponse({}); + } + + return new ParseCVResponse({ + parsedCv: JSON.stringify([ + userExperienceCertificationFixture[0], + userExperienceEducationFixture[0], + userExperienceProjectFixture[0], + userExperienceWorkFixture[0], + ]), + }); + }, }); }); diff --git a/__tests__/workers/opportunity/parseCVProfile.ts b/__tests__/workers/opportunity/parseCVProfile.ts new file mode 100644 index 0000000000..7e049c5ffe --- /dev/null +++ b/__tests__/workers/opportunity/parseCVProfile.ts @@ -0,0 +1,333 @@ +import { + createGarmrMock, + createMockBrokkrTransport, + expectSuccessfulTypedBackground, + saveFixtures, +} from '../../helpers'; +import { DataSource } from 'typeorm'; +import createOrGetConnection from '../../../src/db'; +import { User } from '../../../src/entity/user/User'; +import { usersFixture } from '../../fixture/user'; +import { parseCVProfileWorker as worker } from '../../../src/workers/opportunity/parseCVProfile'; +import { BrokkrService, CandidatePreferenceUpdated } from '@dailydotdev/schema'; +import { createClient } from '@connectrpc/connect'; +import type { ServiceClient } from '../../../src/types'; +import * as brokkrCommon from '../../../src/common/brokkr'; +import { UserExperience } from '../../../src/entity/user/experiences/UserExperience'; +import { getSecondsTimestamp, updateFlagsStatement } from '../../../src/common'; + +let con: DataSource; + +beforeAll(async () => { + con = await createOrGetConnection(); +}); + +describe('parseCVProfile worker', () => { + beforeEach(async () => { + jest.resetAllMocks(); + + await saveFixtures( + con, + User, + usersFixture.map((item) => { + return { + ...item, + id: `${item.id}-pcpw`, + }; + }), + ); + + const transport = createMockBrokkrTransport(); + + const serviceClient = { + instance: createClient(BrokkrService, transport), + garmr: createGarmrMock(), + }; + + jest + .spyOn(brokkrCommon, 'getBrokkrClient') + .mockImplementation((): ServiceClient => { + return serviceClient; + }); + }); + + it('should parse CV to profile', async () => { + const userId = '1-pcpw'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: userId, + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(1); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(4); + + const user = await con.getRepository(User).findOneBy({ id: userId }); + expect(user?.flags.lastCVParseAt).toBeDefined(); + }); + + it('should skip if CV blob or bucket is empty', async () => { + const userId = '1-pcpw'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(0); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + }); + + it('should skip if CV lastModified is empty', async () => { + const userId = '1-pcpw'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: userId, + bucket: 'bucket-test', + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(0); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + }); + + it('should skip if userId is empty', async () => { + const userId = '1-pcpw'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + cv: { + blob: userId, + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(0); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + }); + + it('should skip if user is not found', async () => { + const userId = 'non-existing-user'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: userId, + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(0); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + }); + + it('should skip if lastModified is less then last profile parse date', async () => { + const userId = '1-pcpw'; + + await con.getRepository(User).update( + { id: userId }, + { + flags: updateFlagsStatement({ + lastCVParseAt: new Date(Date.now() + 1 * 24 * 60 * 60 * 1000), // 1 day in future + }), + }, + ); + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: userId, + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(0); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + }); + + it('should fail if parsedCV in result is empty', async () => { + const userId = '1-pcpw'; + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: 'empty-cv-mock', + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(1); + + const experiences = await con.getRepository(UserExperience).find({ + where: { userId }, + }); + + expect(experiences).toHaveLength(0); + + const user = await con.getRepository(User).findOneBy({ id: userId }); + expect(user?.flags.lastCVParseAt).toBeNull(); + }); + + it('should revert date of profile parse if parsing fails', async () => { + const userId = '1-pcpw'; + + const parseDate = new Date('2024-01-01T00:00:00Z'); + + await con.getRepository(User).update( + { id: userId }, + { + flags: updateFlagsStatement({ + lastCVParseAt: parseDate, + }), + }, + ); + + const payload = new CandidatePreferenceUpdated({ + payload: { + userId, + cv: { + blob: 'empty-cv-mock', + bucket: 'bucket-test', + lastModified: getSecondsTimestamp(new Date()), + }, + }, + }); + + const parseCVSpy = jest.spyOn( + brokkrCommon.getBrokkrClient().instance, + 'parseCV', + ); + + await expectSuccessfulTypedBackground<'api.v1.candidate-preference-updated'>( + worker, + payload, + ); + + expect(parseCVSpy).toHaveBeenCalledTimes(1); + + const user = await con.getRepository(User).findOneBy({ id: userId }); + expect(user?.flags.lastCVParseAt).toBe(parseDate.toISOString()); + }); +}); diff --git a/src/entity/user/User.ts b/src/entity/user/User.ts index 69feee989c..d138569fa4 100644 --- a/src/entity/user/User.ts +++ b/src/entity/user/User.ts @@ -44,7 +44,7 @@ export type UserFlags = Partial<{ lng: number | null | undefined; }; subdivision: string | null; - lastCVParseAt: Date; + lastCVParseAt: Date | null; }>; export type UserFlagsPublic = Pick; diff --git a/src/workers/opportunity/parseCVProfile.ts b/src/workers/opportunity/parseCVProfile.ts index 0379fa7ca3..3b1376f8d7 100644 --- a/src/workers/opportunity/parseCVProfile.ts +++ b/src/workers/opportunity/parseCVProfile.ts @@ -7,6 +7,7 @@ import { User } from '../../entity/user/User'; import { getBrokkrClient } from '../../common/brokkr'; import { updateFlagsStatement } from '../../common/utils'; import { importUserExperienceFromJSON } from '../../common/profile/import'; +import { logger } from '../../logger'; export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-updated'> = { @@ -40,13 +41,15 @@ export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-upda return; } - const lastModifiedCVDate = new Date(cv.lastModified); + const lastModifiedCVDate = new Date(cv.lastModified * 1000); if (Number.isNaN(lastModifiedCVDate.getTime())) { return; } - const lastProfileParseDate = user.flags.lastCVParseAt || new Date(0); + const lastProfileParseDate = user.flags.lastCVParseAt + ? new Date(user.flags.lastCVParseAt) + : new Date(0); if (lastModifiedCVDate <= lastProfileParseDate) { return; @@ -73,6 +76,10 @@ export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-upda ); }); + if (!result.parsedCv) { + throw new Error('Empty parsedCV result'); + } + const dataJson = JSON.parse(result.parsedCv); await importUserExperienceFromJSON({ @@ -86,12 +93,19 @@ export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-upda { id: userId }, { flags: updateFlagsStatement({ - lastCVParseAt: user.flags.lastCVParseAt, + lastCVParseAt: user.flags.lastCVParseAt || null, }), }, ); - throw error; + logger.error( + { + err: error, + userId, + cv, + }, + 'Error parsing CV to profile', + ); } }, }; From 8e955e1d04fc9bb631bfe0db440ebef3c605ea52 Mon Sep 17 00:00:00 2001 From: capJavert Date: Mon, 24 Nov 2025 12:28:37 +0100 Subject: [PATCH 4/6] feat: register worker --- src/workers/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/workers/index.ts b/src/workers/index.ts index 11a5f60ee7..e33a2489ad 100644 --- a/src/workers/index.ts +++ b/src/workers/index.ts @@ -73,6 +73,7 @@ import { storeCandidateApplicationScore } from './opportunity/storeCandidateAppl import { extractCVMarkdown } from './extractCVMarkdown'; import candidateAcceptedOpportunitySlack from './candidateAcceptedOpportunitySlack'; import recruiterRejectedCandidateMatchEmail from './recruiterRejectedCandidateMatchEmail'; +import { parseCVProfileWorker } from './opportunity/parseCVProfile'; export { Worker } from './worker'; @@ -149,6 +150,7 @@ export const typedWorkers: BaseTypedWorker[] = [ extractCVMarkdown, candidateAcceptedOpportunitySlack, recruiterRejectedCandidateMatchEmail, + parseCVProfileWorker, ]; export const personalizedDigestWorkers: Worker[] = [ From 5b1b383f4775f9460e2aa4e0f8e77e0086a87bbc Mon Sep 17 00:00:00 2001 From: capJavert Date: Mon, 24 Nov 2025 12:37:57 +0100 Subject: [PATCH 5/6] chore: disable worker for now in prod --- src/workers/opportunity/parseCVProfile.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/workers/opportunity/parseCVProfile.ts b/src/workers/opportunity/parseCVProfile.ts index 3b1376f8d7..2986360225 100644 --- a/src/workers/opportunity/parseCVProfile.ts +++ b/src/workers/opportunity/parseCVProfile.ts @@ -5,7 +5,7 @@ import { import type { TypedWorker } from '../worker'; import { User } from '../../entity/user/User'; import { getBrokkrClient } from '../../common/brokkr'; -import { updateFlagsStatement } from '../../common/utils'; +import { isProd, updateFlagsStatement } from '../../common/utils'; import { importUserExperienceFromJSON } from '../../common/profile/import'; import { logger } from '../../logger'; @@ -14,6 +14,12 @@ export const parseCVProfileWorker: TypedWorker<'api.v1.candidate-preference-upda subscription: 'api.parse-cv-profile', parseMessage: ({ data }) => CandidatePreferenceUpdated.fromBinary(data), handler: async ({ data }, con) => { + if (isProd) { + // disabled for now so I can merge the code and will enable after backfill + + return; + } + const { userId, cv } = data.payload || {}; if (!cv?.blob || !cv?.bucket) { From 98c5ec31fb7bda01e44a3f8a2e7dfdd09b54c976 Mon Sep 17 00:00:00 2001 From: capJavert Date: Mon, 24 Nov 2025 13:21:07 +0100 Subject: [PATCH 6/6] fix: cv parse date from cdc --- src/common/opportunity/pubsub.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/common/opportunity/pubsub.ts b/src/common/opportunity/pubsub.ts index 4695ff391f..aac3ff2276 100644 --- a/src/common/opportunity/pubsub.ts +++ b/src/common/opportunity/pubsub.ts @@ -122,6 +122,11 @@ export const notifyOpportunityMatchAccepted = async ({ return; } + const cvLastModifiedDate = + typeof candidatePreference.cv.lastModified === 'string' + ? new Date(candidatePreference.cv.lastModified) + : candidatePreference.cv.lastModified; + const message = new CandidateAcceptedOpportunityMessage({ opportunityId: match.opportunityId, userId: match.userId, @@ -138,9 +143,7 @@ export const notifyOpportunityMatchAccepted = async ({ }), cv: new UserCV({ ...candidatePreference.cv, - lastModified: - getSecondsTimestamp(candidatePreference.cv.lastModified || 0) || - undefined, + lastModified: getSecondsTimestamp(cvLastModifiedDate || 0) || undefined, }), updatedAt: getSecondsTimestamp(candidatePreference.updatedAt), keywords: keywords, @@ -451,6 +454,11 @@ export const notifyCandidatePreferenceChange = async ({ return; } + const cvLastModifiedDate = + typeof candidatePreference?.cv?.lastModified === 'string' + ? new Date(candidatePreference.cv.lastModified) + : candidatePreference?.cv?.lastModified; + const message = new CandidatePreferenceUpdated({ payload: { ...candidatePreference, @@ -462,9 +470,7 @@ export const notifyCandidatePreferenceChange = async ({ }), cv: new UserCV({ ...candidatePreference?.cv, - lastModified: - getSecondsTimestamp(candidatePreference?.cv?.lastModified || 0) || - undefined, + lastModified: getSecondsTimestamp(cvLastModifiedDate || 0) || undefined, }), updatedAt: getSecondsTimestamp(candidatePreference?.updatedAt) || undefined,