Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions apps/extract-stack/src/create-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ export function createMessage<QueueUrl extends string, Shape extends ZodRawShape

return {
send,
sendAll
sendAll,
shapes: {
contentShape,
metadataShape,
}
}
}

type Sender<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
send: Send<Shape, MetadataShape>;
sendAll: BatchSend<Shape, MetadataShape>
shapes: { contentShape: Shape, metadataShape: MetadataShape };
}

type MessagePayload<Shape extends ZodRawShape, MetadataShape extends ZodRawShape> = {
Expand All @@ -86,6 +91,10 @@ export function QueueHandler<Shape extends ZodRawShape, MetadataShape extends Zo
message: MessagePayload<Shape, MetadataShape>
) => Promise<void>
) {
const schema = z.object({
content: z.object(_sender.shapes.contentShape),
metadata: z.object(_sender.shapes.metadataShape)
});
/**
* TODO:
* - Do consumers always recieve batches ?
Expand All @@ -94,7 +103,8 @@ export function QueueHandler<Shape extends ZodRawShape, MetadataShape extends Zo
return async (event: SQSEvent) => {
if (event.Records.length > 1) console.warn('WARNING: QueueHandler should process 1 message but got', event.Records.length);
for (const record of event.Records) {
await cb(JSON.parse(record.body) as MessagePayload<Shape, MetadataShape>);
const parsed = schema.parse(JSON.parse(record.body) as unknown) as MessagePayload<Shape, MetadataShape>;
await cb(parsed);
}
}
}
6 changes: 2 additions & 4 deletions apps/extract-stack/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import { EventBus } from "sst/node/event-bus";
import { z } from "zod";

import { RepositorySchema } from "@acme/extract-schema";
import { MergeRequestSchema } from "@acme/extract-schema/src/merge-requests";
import { NamespaceSchema } from "@acme/extract-schema/src/namespaces";
import { createEvent } from "./create-event";

const extractRepositoryEventSchema = z.object({
repository: RepositorySchema,
namespace: z.nullable(NamespaceSchema),
repositoryId: z.number(),
namespaceId: z.nullable(z.number()),
});

const metadataSchema = z.object({
Expand Down
19 changes: 14 additions & 5 deletions apps/extract-stack/src/extract-members.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import { createClient } from "@libsql/client";
import { drizzle } from "drizzle-orm/libsql";
import { getMembers } from "@acme/extract-functions";
import type { Context, GetMembersEntities, GetMembersSourceControl } from "@acme/extract-functions";
import { members, repositoriesToMembers } from "@acme/extract-schema";
import { members, namespaces, repositories, repositoriesToMembers } from "@acme/extract-schema";
import type { Namespace, Repository } from "@acme/extract-schema";
import { GitHubSourceControl, GitlabSourceControl } from "@acme/source-control";
import type { Pagination } from "@acme/source-control";
import { Config } from "sst/node/config";
import { extractMemberPageMessage } from "./messages";

import { QueueHandler } from "./create-message";
import { eq } from "drizzle-orm";

const clerkClient = Clerk({ secretKey: Config.CLERK_SECRET_KEY });
const client = createClient({ url: Config.DATABASE_URL, authToken: Config.DATABASE_AUTH_TOKEN });
Expand Down Expand Up @@ -72,18 +73,26 @@ const extractMembersPage = async ({ namespace, repository, sourceControl, userId
};

export const eventHandler = EventHandler(extractRepositoryEvent, async (ev) => {
if (!ev.properties.namespaceId) throw new Error("Missing namespaceId");

const repository = await db.select().from(repositories).where(eq(repositories.id, ev.properties.repositoryId)).get();
const namespace = await db.select().from(namespaces).where(eq(namespaces.id, ev.properties.namespaceId)).get();

if (!repository) throw new Error("invalid repo id");
if (!namespace) throw new Error("Invalid namespace id");

const pagination = await extractMembersPage({
namespace: ev.properties.namespace,
repository: ev.properties.repository,
namespace: namespace,
repository: repository,
sourceControl: ev.metadata.sourceControl,
userId: ev.metadata.userId,
});

const arrayOfExtractMemberPageMessageContent: { repository: Repository, namespace: Namespace | null, pagination: Pagination }[] = [];
for (let i = 2; i <= pagination.totalPages; i++) {
arrayOfExtractMemberPageMessageContent.push({
namespace: ev.properties.namespace,
repository: ev.properties.repository,
namespace: namespace,
repository: repository,
pagination: {
page: i,
perPage: pagination.perPage,
Expand Down
2 changes: 1 addition & 1 deletion apps/extract-stack/src/extract-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export const handler = ApiHandler(async (ev) => {

const { repository, namespace } = await getRepository({ externalRepositoryId: repositoryId, repositoryName, namespaceName }, context);

await extractRepositoryEvent.publish({ repository, namespace }, { caller: 'extract-repository', timestamp: new Date().getTime(), version: 1, sourceControl, userId: sub });
await extractRepositoryEvent.publish({ repositoryId: repository.id, namespaceId: namespace?.id || null }, { caller: 'extract-repository', timestamp: new Date().getTime(), version: 1, sourceControl, userId: sub });

return {
statusCode: 200,
Expand Down
11 changes: 9 additions & 2 deletions packages/schemas/extract/src/members.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { InferModel } from 'drizzle-orm';
import { sql } from 'drizzle-orm';
import { sqliteTable, text, integer, uniqueIndex } from 'drizzle-orm/sqlite-core';
import { createInsertSchema, createSelectSchema } from 'drizzle-zod';
import { z } from 'zod';

export const members = sqliteTable('members', {
id: integer('id').primaryKey(),
Expand All @@ -16,5 +17,11 @@ export const members = sqliteTable('members', {

export type Member = InferModel<typeof members>;
export type NewMember = InferModel<typeof members, 'insert'>;
export const MemberSchema = createSelectSchema(members);
export const NewMemberSchema = createInsertSchema(members);
export const MemberSchema = createSelectSchema(members, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
export const NewMemberSchema = createInsertSchema(members, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
11 changes: 9 additions & 2 deletions packages/schemas/extract/src/merge-requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { InferModel } from "drizzle-orm";
import { sql } from "drizzle-orm";
import { integer, sqliteTable, uniqueIndex } from "drizzle-orm/sqlite-core";
import { createInsertSchema } from "drizzle-zod";
import { z } from "zod";

export const mergeRequests = sqliteTable(
"merge_requests",
Expand All @@ -24,5 +25,11 @@ export const mergeRequests = sqliteTable(

export type MergeRequest = InferModel<typeof mergeRequests>;
export type NewMergeRequest = InferModel<typeof mergeRequests, "insert">;
export const MergeRequestSchema = createInsertSchema(mergeRequests);
export const NewMergeRequestSchema = createInsertSchema(mergeRequests);
export const MergeRequestSchema = createInsertSchema(mergeRequests, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
export const NewMergeRequestSchema = createInsertSchema(mergeRequests, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
11 changes: 9 additions & 2 deletions packages/schemas/extract/src/namespaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { InferModel } from 'drizzle-orm';
import { sql } from 'drizzle-orm';
import { sqliteTable, text, integer, uniqueIndex } from 'drizzle-orm/sqlite-core';
import { createInsertSchema, createSelectSchema } from 'drizzle-zod';
import { z } from 'zod';

export const namespaces = sqliteTable('namespaces', {
id: integer('id').primaryKey(),
Expand All @@ -15,5 +16,11 @@ export const namespaces = sqliteTable('namespaces', {

export type Namespace = InferModel<typeof namespaces>;
export type NewNamespace = InferModel<typeof namespaces, 'insert'>;
export const NewNamespaceSchema = createInsertSchema(namespaces);
export const NamespaceSchema = createSelectSchema(namespaces);
export const NewNamespaceSchema = createInsertSchema(namespaces, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
export const NamespaceSchema = createSelectSchema(namespaces, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
11 changes: 9 additions & 2 deletions packages/schemas/extract/src/repositories.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { InferModel } from 'drizzle-orm';
import { sql } from 'drizzle-orm';
import { sqliteTable, integer,text, uniqueIndex } from 'drizzle-orm/sqlite-core';
import { createInsertSchema, createSelectSchema } from 'drizzle-zod';
import { z } from 'zod';

export const repositories = sqliteTable('repositories', {
id: integer('id').primaryKey(),
Expand All @@ -15,5 +16,11 @@ export const repositories = sqliteTable('repositories', {

export type Repository = InferModel<typeof repositories>;
export type NewRepository = InferModel<typeof repositories, 'insert'>;
export const NewRepositorySchema = createInsertSchema(repositories);
export const RepositorySchema = createSelectSchema(repositories);
export const NewRepositorySchema = createInsertSchema(repositories, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});
export const RepositorySchema = createSelectSchema(repositories, {
createdAt: z.coerce.date(),
updatedAt: z.coerce.date(),
});