diff --git a/apps/extract-stack/src/create-message.ts b/apps/extract-stack/src/create-message.ts index 468052b58..a989ab5d2 100644 --- a/apps/extract-stack/src/create-message.ts +++ b/apps/extract-stack/src/create-message.ts @@ -24,33 +24,6 @@ type MessageProps({ - queueUrl, - contentShape, - metadataShape, -}: MessageProps) { - - const messageSchema = z.object({ - content: z.object(contentShape), - metadata: z.object(metadataShape), - }); - - const send: BatchSend = async (content, metadata) => { - console.log("sending", { content, metadata }); - await sqs.sendMessageBatch({ - QueueUrl: queueUrl, - Entries: content.map((c) => ({ - Id: nanoid(), - MessageBody: JSON.stringify(messageSchema.parse({ content: c, metadata })), - })), - }).promise(); - } - - return { - send, - } -} - export function createMessage({ queueUrl, contentShape, @@ -70,17 +43,36 @@ export function createMessage = async (contentArray, metadata) => { + for (let i = 0; i < contentArray.length; i += 10) { + const contentBatch = contentArray.slice(i, i + 10); + const Entries = contentBatch.map(content => JSON.stringify(messageSchema.parse({ content, metadata }))) + .map(MessageBody => ({ + Id: nanoid(), + MessageBody + })); + console.log("sending batch", Entries); + try { + await sqs.sendMessageBatch({ + QueueUrl: queueUrl, + Entries + }).promise(); + } catch (error) { + console.error(error); + } + } + + } + return { send, + sendAll } } type Sender = { send: Send; -} - -type BatchSender = { - send: BatchSend + sendAll: BatchSend } type MessagePayload = { @@ -89,7 +81,7 @@ type MessagePayload( - _sender: Sender | BatchSender, + _sender: Sender, cb: ( message: MessagePayload ) => Promise diff --git a/apps/extract-stack/src/extract-member.ts b/apps/extract-stack/src/extract-member.ts index 6ba77d32c..64f26f489 100644 --- a/apps/extract-stack/src/extract-member.ts +++ b/apps/extract-stack/src/extract-member.ts @@ -10,7 +10,7 @@ import type { Namespace, Repository } from "@acme/extract-schema"; import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control"; import type { Pagination } from "@acme/source-control"; import { Config } from "sst/node/config"; -import { extractMemberPageBatchMessage } from "./messages"; +import { extractMemberPageMessage } from "./messages"; import { QueueHandler } from "./create-message"; @@ -61,7 +61,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId context.integrations.sourceControl = await initSourceControl(userId, sourceControl); } catch (error) { console.error(error); - return; + throw error; } const { paginationInfo: resultPaginationInfo } = await getMembers({ @@ -76,11 +76,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId return resultPaginationInfo; }; -const range = (a: number, b: number) => Array.apply(0, { length: b - a + 1 } as number[]).map((_, index) => index + a); -const chunks = (array: Array, size: number) => Array.apply(0, { length: Math.ceil(array.length / size) } as unknown[]).map((_, index) => array.slice(index * size, (index + 1) * size)); - export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => { - const pagination = await extractMembersPage({ namespace: ev.properties.namespace, repository: ev.properties.repository, @@ -89,32 +85,32 @@ export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => { paginationInfo: { page: 1, perPage: 2, totalPages: 1000 }, }); - if (!pagination) return; - - const remainingMemberPages = range(2, pagination.totalPages) - .map(page => ({ - page, - perPage: pagination.perPage, - totalPages: pagination.totalPages - } satisfies Pagination)); - - const batchedPages = chunks(remainingMemberPages, 10); - - await Promise.all(batchedPages.map(batch => extractMemberPageBatchMessage.send( - batch.map(page => ({ + const arrayOfExtractMemberPageMessageContent: { repository: Repository, namespace: Namespace | null, pagination: Pagination }[] = []; + for (let i = 2; i <= pagination.totalPages; i++) { + arrayOfExtractMemberPageMessageContent.push({ namespace: ev.properties.namespace, repository: ev.properties.repository, - pagination: page - })), { + pagination: { + page: i, + perPage: pagination.perPage, + totalPages: pagination.totalPages + } + }) + } + + if (arrayOfExtractMemberPageMessageContent.length === 0) return console.log("No more pages left, no need to enqueue"); + + await extractMemberPageMessage.sendAll(arrayOfExtractMemberPageMessageContent, { version: 1, caller: 'extract-member', sourceControl: ev.metadata.sourceControl, userId: ev.metadata.userId, timestamp: new Date().getTime(), - }))); + }) + }); -export const queueHandler = QueueHandler(extractMemberPageBatchMessage, async (message) => { +export const queueHandler = QueueHandler(extractMemberPageMessage, async (message) => { await extractMembersPage({ namespace: message.content.namespace, paginationInfo: message.content.pagination, diff --git a/apps/extract-stack/src/messages.ts b/apps/extract-stack/src/messages.ts index 1c8d4e443..b16ef963f 100644 --- a/apps/extract-stack/src/messages.ts +++ b/apps/extract-stack/src/messages.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import { RepositorySchema } from "@acme/extract-schema"; import { NamespaceSchema } from "@acme/extract-schema/src/namespaces"; -import { createBatchMessage } from "./create-message"; +import { createMessage } from "./create-message"; import { Queue } from 'sst/node/queue' const paginationSchema = z.object({ @@ -24,7 +24,7 @@ const metadataSchema = z.object({ userId: z.string(), }); -export const extractMemberPageBatchMessage = createBatchMessage({ +export const extractMemberPageMessage = createMessage({ metadataShape: metadataSchema.shape, contentShape: extractMemberPageMessageSchema.shape, queueUrl: Queue.ExtractMemberPageQueue.queueUrl