diff --git a/apps/stack/src/extract/extract-timeline-events.ts b/apps/stack/src/extract/extract-timeline-events.ts index f6051aa16..8f3caa1bb 100644 --- a/apps/stack/src/extract/extract-timeline-events.ts +++ b/apps/stack/src/extract/extract-timeline-events.ts @@ -3,14 +3,15 @@ import { createMessageHandler } from "@stack/config/create-message"; import { z } from "zod"; import { getTimelineEvents, type Context, type GetTimelineEventsEntities, type GetTimelineEventsSourceControl } from "@acme/extract-functions"; -import { mergeRequests, MergeRequestSchema, namespaces, NamespaceSchema, repositories, RepositorySchema, timelineEvents } from "@acme/extract-schema"; +import { members, mergeRequests, MergeRequestSchema, namespaces, NamespaceSchema, repositories, repositoriesToMembers, RepositorySchema, timelineEvents } from "@acme/extract-schema"; import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control"; -import { extractMergeRequestsEvent } from "./events"; +import { extractMembersEvent, extractMergeRequestsEvent } from "./events"; import { getClerkUserToken } from "./get-clerk-user-token"; import { MessageKind, metadataSchema } from "./messages"; import { getTenantDb, type OmitDb } from "@stack/config/get-tenant-db"; import { Config } from "sst/node/config"; +import { filterNewExtractMembers } from "./filter-extract-members"; export const timelineEventsSenderHandler = createMessageHandler({ queueId: 'ExtractQueue', @@ -29,14 +30,30 @@ export const timelineEventsSenderHandler = createMessageHandler({ context.integrations.sourceControl = await initSourceControl(message.metadata.userId, message.metadata.sourceControl); + const { userId, sourceControl } = message.metadata; const { mergeRequestId, namespaceId, repositoryId } = message.content; - await getTimelineEvents({ + const { members } = await getTimelineEvents({ mergeRequestId, namespaceId, repositoryId, }, { ...context, db: getTenantDb(message.metadata.tenantId) } ); + + const memberIds = filterNewExtractMembers(members).map(member => member.id); + if (memberIds.length === 0) return; + + await extractMembersEvent.publish({ memberIds }, { + crawlId: message.metadata.crawlId, + version: 1, + caller: 'extract-timeline-events', + sourceControl, + userId, + timestamp: new Date().getTime(), + from: message.metadata.from, + to: message.metadata.to, + tenantId: message.metadata.tenantId, + }); } }); @@ -51,6 +68,8 @@ const context: OmitDb { timelineEvents, namespaces, repositories, - mergeRequests + mergeRequests, + members, + repositoriesToMembers }, integrations: { sourceControl: { fetchTimelineEvents } diff --git a/packages/functions/extract/src/get-timeline-events.ts b/packages/functions/extract/src/get-timeline-events.ts index 1384889dc..d04cb5f80 100644 --- a/packages/functions/extract/src/get-timeline-events.ts +++ b/packages/functions/extract/src/get-timeline-events.ts @@ -1,4 +1,4 @@ -import type { TimelineEvents } from "@acme/extract-schema"; +import type { Member, NewMember, TimelineEvents } from "@acme/extract-schema"; import type { Entities, ExtractFunction } from "./config" import type { SourceControl } from "@acme/source-control"; import { eq, sql } from "drizzle-orm"; @@ -11,10 +11,11 @@ export type GetTimelineEventsInputs = { export type GetTimelineEventsOutput = { timelineEvents: TimelineEvents[]; + members: Member[]; }; export type GetTimelineEventsSourceControl = Pick; -export type GetTimelineEventsEntities = Pick; +export type GetTimelineEventsEntities = Pick; export type GetTimelineEventsFunction = ExtractFunction @@ -37,6 +38,42 @@ export const getTimelineEvents: GetTimelineEventsFunction = async ( const { timelineEvents } = await integrations.sourceControl.fetchTimelineEvents(repository, namespace, mergeRequest); + const nonCommitEvents = timelineEvents.filter(ev => ev.type !== "committed"); + + const uniqueTimelineActors = [...nonCommitEvents.reduce((externalIdToActor, event) => + event.actorId ? externalIdToActor.set(event.actorId, { // actorId is optional due to commit events + externalId: event.actorId, + username: event.actorName, + forgeType: repository.forgeType, + extractedSource: 'timeline', + }) : externalIdToActor, new Map()).values()]; + + + const insertedUniqueTimelineActors = uniqueTimelineActors.length === 0 ? [] : await db.transaction(async (tx) => { + return Promise.all(uniqueTimelineActors.map(actor => + tx.insert(entities.members).values(actor) + .onConflictDoUpdate({ + target: [ + entities.members.externalId, + entities.members.forgeType + ], + set: { + username: actor.username, + _updatedAt: sql`(strftime('%s', 'now'))`, + }, + }) + .returning() + .get() + )); + }); + + if (insertedUniqueTimelineActors.length > 0) { + await db.insert(entities.repositoriesToMembers) + .values(insertedUniqueTimelineActors.map(member => ({ memberId: member.id, repositoryId }))) + .onConflictDoNothing() + .run(); + } + const insertedTimelineEvents = await db.transaction(async (tx) => { return Promise.all(timelineEvents.map(event => tx.insert(entities.timelineEvents).values(event) @@ -54,5 +91,6 @@ export const getTimelineEvents: GetTimelineEventsFunction = async ( return { timelineEvents: insertedTimelineEvents, + members: insertedUniqueTimelineActors }; } diff --git a/packages/schemas/extract/src/members.ts b/packages/schemas/extract/src/members.ts index 67b205819..217ccfaf8 100644 --- a/packages/schemas/extract/src/members.ts +++ b/packages/schemas/extract/src/members.ts @@ -13,7 +13,7 @@ export const members = sqliteTable('members', { name: text('name'), username: text('username').notNull(), email: text('email'), - extractedSource: Enum('extracted_source', { enum: ['repository', 'namespace', 'notes'] }), + extractedSource: Enum('extracted_source', { enum: ['repository', 'namespace', 'notes', 'timeline'] }), _createdAt: integer('__created_at', { mode: 'timestamp' }).default(sql`(strftime('%s', 'now'))`), _updatedAt: integer('__updated_at', { mode: 'timestamp' }).default(sql`(strftime('%s', 'now'))`), _extractedAt: integer('__extracted_at', { mode: 'timestamp' }),