Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 24 additions & 32 deletions apps/extract-stack/src/create-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,6 @@ type MessageProps<QueueUrl extends string, Shape extends ZodRawShape, MetadataSh
metadataShape: MetadataShape;
};

export function createBatchMessage<QueueUrl extends string, Shape extends ZodRawShape, MetadataShape extends ZodRawShape>({
queueUrl,
contentShape,
metadataShape,
}: MessageProps<QueueUrl, Shape, MetadataShape>) {

const messageSchema = z.object({
content: z.object(contentShape),
metadata: z.object(metadataShape),
});

const send: BatchSend<Shape, MetadataShape> = async (content, metadata) => {
console.log("sending", { content, metadata });
await sqs.sendMessageBatch({
QueueUrl: queueUrl,
Entries: content.map((c) => ({
Id: nanoid(),
MessageBody: JSON.stringify(messageSchema.parse({ content: c, metadata })),
})),
}).promise();
}

return {
send,
}
}

export function createMessage<QueueUrl extends string, Shape extends ZodRawShape, MetadataShape extends ZodRawShape>({
queueUrl,
contentShape,
Expand All @@ -70,17 +43,36 @@ export function createMessage<QueueUrl extends string, Shape extends ZodRawShape
}).promise();
}

const sendAll: BatchSend<Shape, MetadataShape> = async (contentArray, metadata) => {
for (let i = 0; i < contentArray.length; i += 10) {
const contentBatch = contentArray.slice(i, i + 10);
const Entries = contentBatch.map(content => JSON.stringify(messageSchema.parse({ content, metadata })))
.map(MessageBody => ({
Id: nanoid(),
MessageBody
}));
console.log("sending batch", Entries);
try {
await sqs.sendMessageBatch({
QueueUrl: queueUrl,
Entries
}).promise();
} catch (error) {
console.error(error);
}
}

}

return {
send,
sendAll
}
}

type Sender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
send: Send<Shape, MetadataShape>;
}

type BatchSender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
send: BatchSend<Shape, MetadataShape>
sendAll: BatchSend<Shape, MetadataShape>
}

type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
Expand All @@ -89,7 +81,7 @@ type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape
}

export function QueueHandler<Shape extends ZodRawShape, MetadataShape extends ZodRawShape>(
_sender: Sender<Shape, MetadataShape> | BatchSender<Shape, MetadataShape>,
_sender: Sender<Shape, MetadataShape>,
cb: (
message: MessagePayload<Shape, MetadataShape>
) => Promise<void>
Expand Down
42 changes: 19 additions & 23 deletions apps/extract-stack/src/extract-member.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { Namespace, Repository } from "@acme/extract-schema";
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
import type { Pagination } from "@acme/source-control";
import { Config } from "sst/node/config";
import { extractMemberPageBatchMessage } from "./messages";
import { extractMemberPageMessage } from "./messages";

import { QueueHandler } from "./create-message";

Expand Down Expand Up @@ -61,7 +61,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId
context.integrations.sourceControl = await initSourceControl(userId, sourceControl);
} catch (error) {
console.error(error);
return;
throw error;
}

const { paginationInfo: resultPaginationInfo } = await getMembers({
Expand All @@ -76,11 +76,7 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId
return resultPaginationInfo;
};

const range = (a: number, b: number) => Array.apply(0, { length: b - a + 1 } as number[]).map((_, index) => index + a);
const chunks = <T>(array: Array<T>, size: number) => Array.apply(0, { length: Math.ceil(array.length / size) } as unknown[]).map((_, index) => array.slice(index * size, (index + 1) * size));

export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {

const pagination = await extractMembersPage({
namespace: ev.properties.namespace,
repository: ev.properties.repository,
Expand All @@ -89,32 +85,32 @@ export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {
paginationInfo: { page: 1, perPage: 2, totalPages: 1000 },
});

if (!pagination) return;

const remainingMemberPages = range(2, pagination.totalPages)
.map(page => ({
page,
perPage: pagination.perPage,
totalPages: pagination.totalPages
} satisfies Pagination));

const batchedPages = chunks(remainingMemberPages, 10);

await Promise.all(batchedPages.map(batch => extractMemberPageBatchMessage.send(
batch.map(page => ({
const arrayOfExtractMemberPageMessageContent: { repository: Repository, namespace: Namespace | null, pagination: Pagination }[] = [];
for (let i = 2; i <= pagination.totalPages; i++) {
arrayOfExtractMemberPageMessageContent.push({
namespace: ev.properties.namespace,
repository: ev.properties.repository,
pagination: page
})), {
pagination: {
page: i,
perPage: pagination.perPage,
totalPages: pagination.totalPages
}
})
}

if (arrayOfExtractMemberPageMessageContent.length === 0) return console.log("No more pages left, no need to enqueue");

await extractMemberPageMessage.sendAll(arrayOfExtractMemberPageMessageContent, {
version: 1,
caller: 'extract-member',
sourceControl: ev.metadata.sourceControl,
userId: ev.metadata.userId,
timestamp: new Date().getTime(),
})));
})

});

export const queueHandler = QueueHandler(extractMemberPageBatchMessage, async (message) => {
export const queueHandler = QueueHandler(extractMemberPageMessage, async (message) => {
await extractMembersPage({
namespace: message.content.namespace,
paginationInfo: message.content.pagination,
Expand Down
4 changes: 2 additions & 2 deletions apps/extract-stack/src/messages.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { RepositorySchema } from "@acme/extract-schema";
import { NamespaceSchema } from "@acme/extract-schema/src/namespaces";
import { createBatchMessage } from "./create-message";
import { createMessage } from "./create-message";
import { Queue } from 'sst/node/queue'

const paginationSchema = z.object({
Expand All @@ -24,7 +24,7 @@ const metadataSchema = z.object({
userId: z.string(),
});

export const extractMemberPageBatchMessage = createBatchMessage({
export const extractMemberPageMessage = createMessage({
metadataShape: metadataSchema.shape,
contentShape: extractMemberPageMessageSchema.shape,
queueUrl: Queue.ExtractMemberPageQueue.queueUrl
Expand Down