Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .infra/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ debezium.source.database.user=%database_user%
debezium.source.database.password=%database_pass%
debezium.source.database.dbname=%database_dbname%
debezium.source.database.server.name=api
debezium.source.table.include.list=public.comment,public.user_comment,public.comment_mention,public.source_request,public.post,public.user,public.post_report,public.source_feed,public.settings,public.reputation_event,public.submission,public.user_state,public.notification_v2,public.source_member,public.feature,public.source,public.post_mention,public.content_image,public.comment_report,public.user_post,public.banner,public.post_relation,public.marketing_cta,public.squad_public_request,public.user_streak,public.bookmark,public.user_company,public.source_report,public.user_top_reader,public.source_post_moderation,public.user_report,public.user_transaction,public.content_preference,public.campaign,public.opportunity_match,public.opportunity,public.organization,public.user_candidate_preference,public.user_experience
debezium.source.table.include.list=public.comment,public.user_comment,public.comment_mention,public.source_request,public.post,public.user,public.post_report,public.source_feed,public.settings,public.reputation_event,public.submission,public.user_state,public.notification_v2,public.source_member,public.feature,public.source,public.post_mention,public.content_image,public.comment_report,public.user_post,public.banner,public.post_relation,public.marketing_cta,public.squad_public_request,public.user_streak,public.bookmark,public.user_company,public.source_report,public.user_top_reader,public.source_post_moderation,public.user_report,public.user_transaction,public.content_preference,public.campaign,public.opportunity_match,public.opportunity,public.organization,public.user_candidate_preference,public.user_experience,public.user_referral
debezium.source.column.exclude.list=public.post.tsv,public.post.placeholder,public.source.flags,public.user_top_reader.image
debezium.source.skip.messages.without.change=true
debezium.source.plugin.name=pgoutput
Expand Down
48 changes: 47 additions & 1 deletion src/common/njord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import { Message } from '@bufbuild/protobuf';
import { ensureSourcePermissions } from '../schema/sources';
import { SourceMemberRoles } from '../roles';
import type { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { usdToCores } from './number';

const transport = createGrpcTransport({
baseUrl: process.env.NJORD_ORIGIN,
Expand Down Expand Up @@ -1056,7 +1057,7 @@ export const throwUserTransactionError = async ({
error,
transaction,
}: {
ctx: AuthContext;
ctx: Pick<AuthContext, 'userId' | 'con'>;
entityManager: EntityManager;
error: TransferError;
transaction: UserTransaction;
Expand Down Expand Up @@ -1096,3 +1097,48 @@ export const throwUserTransactionError = async ({
// throw error for client after saving the transaction in error state
throw userTransactionError;
};

export const awardReferral = async ({
id,
ctx,
}: {
id: string;
ctx: Pick<AuthContext, 'userId' | 'con'>;
}) => {
await ctx.con.transaction(async (entityManager) => {
const transaction = await entityManager.getRepository(UserTransaction).save(
entityManager.getRepository(UserTransaction).create({
id: randomUUID(),
processor: UserTransactionProcessor.Njord,
receiverId: ctx.userId,
status: UserTransactionStatus.Success,
productId: null,
senderId: systemUser.id,
value: usdToCores(10),
valueIncFees: 0,
fee: 0,
flags: { note: 'Linkedin recruiter referral' },
referenceId: id,
referenceType: UserTransactionType.ReferralLinkedin,
}),
);

try {
await transferCores({
ctx,
transaction,
entityManager,
});
} catch (error) {
if (error instanceof TransferError) {
await throwUserTransactionError({
ctx,
transaction,
entityManager,
error,
});
}
throw error;
}
});
};
4 changes: 4 additions & 0 deletions src/migration/1761123042645-UserReferral.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ export class UserReferral1761123042645 implements MigrationInterface {
)
`);

await queryRunner.query(/* sql */ `
ALTER TABLE "public"."user_referral" REPLICA IDENTITY FULL
`);

await queryRunner.query(/* sql */ `
CREATE INDEX IF NOT EXISTS "IDX_user_referral_type"
ON "user_referral" ("type")
Expand Down
27 changes: 27 additions & 0 deletions src/workers/cdc/primary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ import { PollPost } from '../../entity/posts/PollPost';
import { UserExperienceWork } from '../../entity/user/experiences/UserExperienceWork';
import { UserExperience } from '../../entity/user/experiences/UserExperience';
import { UserExperienceType } from '../../entity/user/experiences/types';
import {
UserReferral,
UserReferralStatus,
} from '../../entity/user/referral/UserReferral';
import { awardReferral } from '../../common/njord';

const isFreeformPostLongEnough = (
freeform: ChangeMessage<FreeformPost>,
Expand Down Expand Up @@ -1520,6 +1525,25 @@ const onUserExperienceChange = async (
}
};

const onUserReferralChange = async (
con: DataSource,
_: FastifyBaseLogger,
data: ChangeMessage<UserReferral>,
) => {
if (data.payload.op !== 'u') {
return;
}

if (
data.payload.before!.status === UserReferralStatus.Pending &&
data.payload.after!.status === UserReferralStatus.Accepted
) {
const referral = data.payload.after!;
const ctx = { userId: referral.userId, con };
await awardReferral({ id: referral.id, ctx });
}
};

const worker: Worker = {
subscription: 'api-cdc',
maxMessages: parseInt(process.env.CDC_WORKER_MAX_MESSAGES) || undefined,
Expand Down Expand Up @@ -1654,6 +1678,9 @@ const worker: Worker = {
case getTableName(con, UserExperience):
await onUserExperienceChange(con, logger, data);
break;
case getTableName(con, UserReferral):
await onUserReferralChange(con, logger, data);
break;
}
} catch (err) {
logger.error(
Expand Down