diff --git a/apps/stack/src/transform/transform-forge-users.ts b/apps/stack/src/transform/transform-forge-users.ts deleted file mode 100644 index 51a7809e6..000000000 --- a/apps/stack/src/transform/transform-forge-users.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { createClient } from "@libsql/client"; -import { drizzle } from "drizzle-orm/libsql"; - -import { Config } from "sst/node/config"; -import { EventHandler } from "@stack/config/create-event"; - -import { extractMemberInfoEvent } from "@stack/extract/events"; -import * as extract from "@acme/extract-schema"; -import * as transform from "@acme/transform-schema"; -import type { Context, SetForgeUsersExtractEntities, SetForgeUsersTransformEntities } from "@acme/transform-functions"; - -import { setForgeUsers } from "@acme/transform-functions"; - -const context: Context = { - extract: { - db: drizzle(createClient({ url: Config.EXTRACT_DATABASE_URL, authToken: Config.EXTRACT_DATABASE_AUTH_TOKEN })), - entities: { - members: extract.members - } - }, - transform: { - db: drizzle(createClient({ url: Config.TRANSFORM_DATABASE_URL, authToken: Config.TRANSFORM_DATABASE_AUTH_TOKEN })), - entities: { - forgeUsers: transform.forgeUsers - } - } -} - -export const eventHandler = EventHandler(extractMemberInfoEvent, async (evt) => { - await setForgeUsers({ extractMemberIds: [evt.properties.memberId] }, context); -}); diff --git a/apps/stack/src/transform/transform-merge-requests.ts b/apps/stack/src/transform/transform-merge-requests.ts deleted file mode 100644 index 78db38b0f..000000000 --- a/apps/stack/src/transform/transform-merge-requests.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { createClient } from "@libsql/client"; -import { drizzle } from "drizzle-orm/libsql"; - -import { Config } from "sst/node/config"; -import { EventHandler } from "@stack/config/create-event"; - -import { extractMergeRequestsEvent } from "@stack/extract/events"; -import * as extract from "@acme/extract-schema"; -import * as transform from "@acme/transform-schema"; -import type { Context, SetMergeRequestsTransformEntities, SetMergeRequestsExtractEntities } from "@acme/transform-functions"; - -import { setMergeRequests } from "@acme/transform-functions"; - -const context: Context = { - extract: { - db: drizzle(createClient({ url: Config.EXTRACT_DATABASE_URL, authToken: Config.EXTRACT_DATABASE_AUTH_TOKEN })), - entities: { - mergeRequests: extract.mergeRequests, - repositories: extract.repositories - } - }, - transform: { - db: drizzle(createClient({ url: Config.TRANSFORM_DATABASE_URL, authToken: Config.TRANSFORM_DATABASE_AUTH_TOKEN })), - entities: { - mergeRequests: transform.mergeRequests - } - } -} - -export const eventHandler = EventHandler(extractMergeRequestsEvent, async (evt) => { - await setMergeRequests({ extractMergeRequestIds: evt.properties.mergeRequestIds }, context); -}); \ No newline at end of file diff --git a/apps/stack/src/transform/transform-repository.ts b/apps/stack/src/transform/transform-repository.ts deleted file mode 100644 index 39b989c4d..000000000 --- a/apps/stack/src/transform/transform-repository.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { createClient } from "@libsql/client"; -import { drizzle } from "drizzle-orm/libsql"; - -import { Config } from "sst/node/config"; -import { EventHandler } from "@stack/config/create-event"; - -import { extractRepositoryEvent } from "@stack/extract/events"; -import * as extract from "@acme/extract-schema"; -import * as transform from "@acme/transform-schema"; -import type { Context, SetRepositoryExtractEntities, SetRepositoryTransformEntities } from "@acme/transform-functions"; -import { setRepository } from "@acme/transform-functions"; - -const context: Context = { - extract: { - db: drizzle(createClient({ url: Config.EXTRACT_DATABASE_URL, authToken: Config.EXTRACT_DATABASE_AUTH_TOKEN })), - entities: { - repositories: extract.repositories - } - }, - transform: { - db: drizzle(createClient({ url: Config.TRANSFORM_DATABASE_URL, authToken: Config.TRANSFORM_DATABASE_AUTH_TOKEN })), - entities: { - repositories: transform.repositories - } - } -} - -export const eventHandler = EventHandler(extractRepositoryEvent, async (evt) => { - await setRepository({ extractRepositoryId: evt.properties.repositoryId }, context); -}); \ No newline at end of file diff --git a/apps/stack/stacks/TransformStack.ts b/apps/stack/stacks/TransformStack.ts index 3d02cf1a5..36e34df7b 100644 --- a/apps/stack/stacks/TransformStack.ts +++ b/apps/stack/stacks/TransformStack.ts @@ -11,7 +11,6 @@ import { z } from "zod"; export function TransformStack({ stack }: StackContext) { const { - ExtractBus, EXTRACT_DATABASE_AUTH_TOKEN, EXTRACT_DATABASE_URL, CRAWL_DATABASE_URL, @@ -20,58 +19,6 @@ export function TransformStack({ stack }: StackContext) { const TRANSFORM_DATABASE_URL = new Config.Secret(stack, "TRANSFORM_DATABASE_URL"); const TRANSFORM_DATABASE_AUTH_TOKEN = new Config.Secret(stack, "TRANSFORM_DATABASE_AUTH_TOKEN"); - ExtractBus.addRules(stack, { - "transformRepository": { - pattern: { - source: ["extract"], - detailType: ["repository"] - }, - targets: { - transformRepository: { - function: { - bind: [TRANSFORM_DATABASE_URL, TRANSFORM_DATABASE_AUTH_TOKEN], - handler: "src/transform/transform-repository.eventHandler", - } - } - }, - - } - }); - - ExtractBus.addRules(stack, { - "transformMergeRequests": { - pattern: { - source: ["extract"], - detailType: ["mergeRequest"] - }, - targets: { - transformMergeRequests: { - function: { - bind: [TRANSFORM_DATABASE_URL, TRANSFORM_DATABASE_AUTH_TOKEN], - handler: "src/transform/transform-merge-requests.eventHandler", - } - } - }, - } - }); - - ExtractBus.addRules(stack, { - "transformForgeUsers": { - pattern: { - source: ["extract"], - detailType: ["memberInfo"] - }, - targets: { - transformForgeUsers: { - function: { - bind: [TRANSFORM_DATABASE_URL, TRANSFORM_DATABASE_AUTH_TOKEN], - handler: "src/transform/transform-forge-users.eventHandler", - } - } - }, - } - }); - const transformTestingQueue = new Queue(stack, "TransformTestQueue"); transformTestingQueue.addConsumer(stack, { cdk: { diff --git a/packages/functions/transform/src/merge-request-metrics.ts b/packages/functions/transform/src/merge-request-metrics.ts index 6e454e26a..11343805d 100644 --- a/packages/functions/transform/src/merge-request-metrics.ts +++ b/packages/functions/transform/src/merge-request-metrics.ts @@ -1,6 +1,6 @@ import * as extract from '@acme/extract-schema'; import * as transform from '@acme/transform-schema'; -import { sql, eq, or, and, type ExtractTablesWithRelations } from "drizzle-orm"; +import { sql, eq, or, and, type ExtractTablesWithRelations, inArray } from "drizzle-orm"; import type { LibSQLDatabase } from 'drizzle-orm/libsql'; import { isCodeGen } from './is-codegen'; import { parseHunks } from './parse-hunks'; @@ -77,7 +77,7 @@ function upsertMergeRequest(db: TransformDatabase, mergeRequest: transform.NewMe .returning(); } -function _upsertForgeUser(db: TransformDatabase, forgeUser: transform.NewForgeUser) { +function upsertForgeUser(db: TransformDatabase, forgeUser: transform.NewForgeUser) { return db.insert(transform.forgeUsers) .values(forgeUser) .onConflictDoUpdate({ @@ -282,24 +282,26 @@ type MapUsersToJunksArgs = { reviewers: transform.ForgeUser['id'][] } -function addUnique(newElement: number, currentArray: number[]) { - if (!currentArray.includes(newElement)) { - currentArray.push(newElement); - } +type MappedUsersTypesArgs = { + author: number, + mergedBy: number | null, + approvers: number[], + committers: string[], + reviewers: number[] } -async function getId(actorId: number, db: TransformDatabase) { - return await db.select({ - id: transform.forgeUsers.id, - }).from(transform.forgeUsers) - .where(eq(transform.forgeUsers.externalId, actorId)).get(); +type TransformUserArgs = { + externalId: transform.ForgeUser['externalId']; + name: transform.ForgeUser['name']; + forgeType: transform.ForgeUser['forgeType']; } -async function getUserIds(timelineEvents: TimelineEventData[], extractDb: ExtractDatabase, transformDb: TransformDatabase) { - const reviewers: number[] = []; - const approvers: number[] = []; - const committers: number[] = []; - let mergedBy; +function getUserData(timelineEvents: TimelineEventData[], authorExternalId: number) { + const reviewers = new Set(); + const approvers = new Set(); + const committers = new Set(); + let mergedBy: number | undefined; + const author: number = authorExternalId; for (const timelineEvent of timelineEvents) { switch (timelineEvent.type) { @@ -307,49 +309,85 @@ async function getUserIds(timelineEvents: TimelineEventData[], extractDb: Extrac if (!timelineEvent.actorId) { break; } - const reviewer = await getId(timelineEvent.actorId, transformDb); - if (reviewer) { - addUnique(reviewer.id, reviewers); - if (timelineEvent.data && ((timelineEvent.data as extract.ReviewedEvent).state === 'approved')) { - addUnique(reviewer?.id, approvers); - } + reviewers.add(timelineEvent.actorId); + if (timelineEvent.data && ((timelineEvent.data as extract.ReviewedEvent).state === 'approved')) { + approvers.add(timelineEvent.actorId); } break; case 'committed': - const data = timelineEvent.data as extract.CommittedEvent; - const extractUserExternalId = await extractDb.select({ - id: extract.members.externalId, - }).from(extract.members) - .where(or( - eq(extract.members.username, data.committerName), - eq(extract.members.name, data.committerName)) - ).get(); - if (extractUserExternalId) { - const committer = await getId(extractUserExternalId.id, transformDb); - if (committer) { - addUnique(committer.id, committers); - } - } + committers.add((timelineEvent.data as extract.CommittedEvent).committerName); break; - case 'merged': + case 'merged': if (!timelineEvent.actorId) { break; } - mergedBy = await getId(timelineEvent.actorId, transformDb); + mergedBy = timelineEvent.actorId; break; default: break; } } - + return { - mergedBy: mergedBy?.id, - approvers, - committers, - reviewers, + author, + mergedBy: mergedBy ? mergedBy : null, + approvers: [...approvers.keys()], + committers: [...committers.keys()], + reviewers: [...reviewers.keys()], }; } +async function getTransformUserData( extractDb: ExtractDatabase, transformDb: TransformDatabase, users: MappedUsersTypesArgs) { + const { author, mergedBy, approvers, committers, reviewers } = users; + const allUsers = new Set(); + const transformUsers: transform.ForgeUser[] = []; + allUsers.add(author); + if (mergedBy) { + allUsers.add(mergedBy); + } + approvers.forEach((approver) => {{allUsers.add(approver);}}); + reviewers.forEach((reverse) => {{allUsers.add(reverse);}}); + + + const response = await extractDb + .select({ + externalId: extract.members.externalId, + name: extract.members.name, + forgeType: extract.members.forgeType, + userName: extract.members.username, + }) + .from(extract.members) + .where( + or( + allUsers.size > 0 ? inArray(extract.members.externalId, [...allUsers.keys()]) : undefined, + committers.length > 0 ? inArray(extract.members.name, committers) : undefined, + committers.length > 0 ? inArray(extract.members.username, committers) : undefined, + ), + ) + .all(); + + for (const res of response) { + if (res.name === null) { + res.name = res.userName; + } + transformUsers.push(await upsertForgeUser(transformDb, res as TransformUserArgs).returning().get()); + } + + const transformAuthor = transformUsers.find(({ externalId }) => externalId === author)?.id; + const transformMergedBy = transformUsers.find(({ externalId }) => externalId === mergedBy)?.id; + const transformApprovers = approvers.map((approver) => transformUsers.find(({ externalId }) => externalId === approver)?.id as number); + const transformCommitters = committers.map((committer) => transformUsers.find(({ name }) => name === committer)?.id as number); + const transformReviewers = reviewers.map((reviewer) => transformUsers.find(({ externalId }) => externalId === reviewer)?.id as number); + + return { + author: transformAuthor ? transformAuthor : null, + mergedBy: transformMergedBy ? transformMergedBy : null, + approvers: transformApprovers ? transformApprovers : [], + committers: transformCommitters ? transformCommitters : [], + reviewers: transformReviewers ? transformReviewers : [], + } +} + function mapUsersToJunk({ author, mergedBy, approvers, committers, reviewers }: MapUsersToJunksArgs, nullForgeUserId: number) { return { author: author || nullForgeUserId, @@ -730,7 +768,9 @@ export async function run(extractMergeRequestId: number, ctx: RunContext) { const timeline = runTimeline(extractData.mergeRequest, extractData.timelineEvents, extractData.notes); - const users = await getUserIds(extractData.timelineEvents, ctx.extractDatabase, ctx.transformDatabase); + const timelineUsers = getUserData(extractData.timelineEvents, extractData.mergeRequest.authorExternalId as number); + + const transformUsersIds = await getTransformUserData(ctx.extractDatabase, ctx.transformDatabase, timelineUsers); const { dateId: nullDateId, @@ -757,14 +797,11 @@ export async function run(extractMergeRequestId: number, ctx: RunContext) { const reviewDuration = calculateDuration(timeline.startedReviewAt, extractData.mergeRequest.closedAt); const usersJunk = mapUsersToJunk({ - author: (await ctx.transformDatabase.select().from(transform.forgeUsers).where(and( - eq(transform.forgeUsers.externalId, extractData.mergeRequest.authorExternalId || 0), - eq(transform.forgeUsers.forgeType, extractData.repository.forgeType), - )).get())?.id || null, // TODO: ??? - mergedBy: users.mergedBy, - approvers: users.approvers, - committers: users.committers, - reviewers: users.reviewers, + author: transformUsersIds.author, + mergedBy: transformUsersIds.mergedBy, + approvers: transformUsersIds.approvers, + committers: transformUsersIds.committers, + reviewers: transformUsersIds.reviewers, }, nullUserId); const { id: transformRepositoryId } = await upsertRepository(ctx.transformDatabase, extractData.repository).get();