diff --git a/.infra/application.properties b/.infra/application.properties index e4eff0e4af..a4d44d53ba 100644 --- a/.infra/application.properties +++ b/.infra/application.properties @@ -8,7 +8,7 @@ debezium.source.database.user=%database_user% debezium.source.database.password=%database_pass% debezium.source.database.dbname=%database_dbname% debezium.source.database.server.name=api -debezium.source.table.include.list=public.comment,public.user_comment,public.comment_mention,public.source_request,public.post,public.user,public.post_report,public.source_feed,public.settings,public.reputation_event,public.submission,public.user_state,public.notification_v2,public.source_member,public.feature,public.source,public.post_mention,public.content_image,public.comment_report,public.user_post,public.banner,public.post_relation,public.marketing_cta,public.squad_public_request,public.user_streak,public.bookmark,public.user_company,public.source_report,public.user_top_reader,public.source_post_moderation,public.user_report,public.user_transaction,public.content_preference,public.campaign,public.opportunity_match,public.opportunity,public.organization,public.user_candidate_preference,public.user_experience +debezium.source.table.include.list=public.comment,public.user_comment,public.comment_mention,public.source_request,public.post,public.user,public.post_report,public.source_feed,public.settings,public.reputation_event,public.submission,public.user_state,public.notification_v2,public.source_member,public.feature,public.source,public.post_mention,public.content_image,public.comment_report,public.user_post,public.banner,public.post_relation,public.marketing_cta,public.squad_public_request,public.user_streak,public.bookmark,public.user_company,public.source_report,public.user_top_reader,public.source_post_moderation,public.user_report,public.user_transaction,public.content_preference,public.campaign,public.opportunity_match,public.opportunity,public.organization,public.user_candidate_preference,public.user_experience,public.user_referral debezium.source.column.exclude.list=public.post.tsv,public.post.placeholder,public.source.flags,public.user_top_reader.image debezium.source.skip.messages.without.change=true debezium.source.plugin.name=pgoutput diff --git a/src/common/njord.ts b/src/common/njord.ts index c6da184db8..0c6ec0f9bc 100644 --- a/src/common/njord.ts +++ b/src/common/njord.ts @@ -60,6 +60,7 @@ import { Message } from '@bufbuild/protobuf'; import { ensureSourcePermissions } from '../schema/sources'; import { SourceMemberRoles } from '../roles'; import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; +import { usdToCores } from './number'; const transport = createGrpcTransport({ baseUrl: process.env.NJORD_ORIGIN, @@ -1056,7 +1057,7 @@ export const throwUserTransactionError = async ({ error, transaction, }: { - ctx: AuthContext; + ctx: Pick; entityManager: EntityManager; error: TransferError; transaction: UserTransaction; @@ -1096,3 +1097,48 @@ export const throwUserTransactionError = async ({ // throw error for client after saving the transaction in error state throw userTransactionError; }; + +export const awardReferral = async ({ + id, + ctx, +}: { + id: string; + ctx: Pick; +}) => { + await ctx.con.transaction(async (entityManager) => { + const transaction = await entityManager.getRepository(UserTransaction).save( + entityManager.getRepository(UserTransaction).create({ + id: randomUUID(), + processor: UserTransactionProcessor.Njord, + receiverId: ctx.userId, + status: UserTransactionStatus.Success, + productId: null, + senderId: systemUser.id, + value: usdToCores(10), + valueIncFees: 0, + fee: 0, + flags: { note: 'Linkedin recruiter referral' }, + referenceId: id, + referenceType: UserTransactionType.ReferralLinkedin, + }), + ); + + try { + await transferCores({ + ctx, + transaction, + entityManager, + }); + } catch (error) { + if (error instanceof TransferError) { + await throwUserTransactionError({ + ctx, + transaction, + entityManager, + error, + }); + } + throw error; + } + }); +}; diff --git a/src/migration/1761123042645-UserReferral.ts b/src/migration/1761123042645-UserReferral.ts index f6433018a6..affd960cdd 100644 --- a/src/migration/1761123042645-UserReferral.ts +++ b/src/migration/1761123042645-UserReferral.ts @@ -23,6 +23,10 @@ export class UserReferral1761123042645 implements MigrationInterface { ) `); + await queryRunner.query(/* sql */ ` + ALTER TABLE "public"."user_referral" REPLICA IDENTITY FULL + `); + await queryRunner.query(/* sql */ ` CREATE INDEX IF NOT EXISTS "IDX_user_referral_type" ON "user_referral" ("type") diff --git a/src/workers/cdc/primary.ts b/src/workers/cdc/primary.ts index 38248e6c2a..bdb5659907 100644 --- a/src/workers/cdc/primary.ts +++ b/src/workers/cdc/primary.ts @@ -162,6 +162,11 @@ import { PollPost } from '../../entity/posts/PollPost'; import { UserExperienceWork } from '../../entity/user/experiences/UserExperienceWork'; import { UserExperience } from '../../entity/user/experiences/UserExperience'; import { UserExperienceType } from '../../entity/user/experiences/types'; +import { + UserReferral, + UserReferralStatus, +} from '../../entity/user/referral/UserReferral'; +import { awardReferral } from '../../common/njord'; const isFreeformPostLongEnough = ( freeform: ChangeMessage, @@ -1520,6 +1525,25 @@ const onUserExperienceChange = async ( } }; +const onUserReferralChange = async ( + con: DataSource, + _: FastifyBaseLogger, + data: ChangeMessage, +) => { + if (data.payload.op !== 'u') { + return; + } + + if ( + data.payload.before!.status === UserReferralStatus.Pending && + data.payload.after!.status === UserReferralStatus.Accepted + ) { + const referral = data.payload.after!; + const ctx = { userId: referral.userId, con }; + await awardReferral({ id: referral.id, ctx }); + } +}; + const worker: Worker = { subscription: 'api-cdc', maxMessages: parseInt(process.env.CDC_WORKER_MAX_MESSAGES) || undefined, @@ -1654,6 +1678,9 @@ const worker: Worker = { case getTableName(con, UserExperience): await onUserExperienceChange(con, logger, data); break; + case getTableName(con, UserReferral): + await onUserReferralChange(con, logger, data); + break; } } catch (err) { logger.error(