diff --git a/apps/web/actions/organization/create-space.ts b/apps/web/actions/organization/create-space.ts index fceff5a72..0a32efe28 100644 --- a/apps/web/actions/organization/create-space.ts +++ b/apps/web/actions/organization/create-space.ts @@ -5,10 +5,12 @@ import { getCurrentUser } from "@cap/database/auth/session"; import { nanoId, nanoIdLength } from "@cap/database/helpers"; import { spaceMembers, spaces, users } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import { and, eq, inArray } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; import { v4 as uuidv4 } from "uuid"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; interface CreateSpaceResponse { success: boolean; @@ -89,25 +91,29 @@ export async function createSpace( user.activeOrganizationId }/spaces/${spaceId}/icon-${Date.now()}.${fileExtension}`; - const bucket = await createBucketProvider(); - - await bucket.putObject(fileKey, await iconFile.bytes(), { - contentType: iconFile.type, - }); - - // Construct the icon URL - if (serverEnv().CAP_AWS_BUCKET_URL) { - // If a custom bucket URL is defined, use it - iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; - } else if (serverEnv().CAP_AWS_ENDPOINT) { - // For custom endpoints like MinIO - iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.name}/${fileKey}`; - } else { - // Default AWS S3 URL format - iconUrl = `https://${bucket.name}.s3.${ - serverEnv().CAP_AWS_REGION || "us-east-1" - }.amazonaws.com/${fileKey}`; - } + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess(Option.none()); + + yield* bucket.putObject( + fileKey, + yield* Effect.promise(() => iconFile.bytes()), + { contentType: iconFile.type }, + ); + + // Construct the icon URL + if (serverEnv().CAP_AWS_BUCKET_URL) { + // If a custom bucket URL is defined, use it + iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; + } else if (serverEnv().CAP_AWS_ENDPOINT) { + // For custom endpoints like MinIO + iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.bucketName}/${fileKey}`; + } else { + // Default AWS S3 URL format + iconUrl = `https://${bucket.bucketName}.s3.${ + serverEnv().CAP_AWS_REGION || "us-east-1" + }.amazonaws.com/${fileKey}`; + } + }).pipe(runPromise); } catch (error) { console.error("Error uploading space icon:", error); return { diff --git a/apps/web/actions/organization/delete-space.ts b/apps/web/actions/organization/delete-space.ts index 37c6588ad..066222621 100644 --- a/apps/web/actions/organization/delete-space.ts +++ b/apps/web/actions/organization/delete-space.ts @@ -8,9 +8,11 @@ import { spaces, spaceVideos, } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; interface DeleteSpaceResponse { success: boolean; @@ -67,25 +69,27 @@ export async function deleteSpace( // 4. Delete space icons from S3 try { - const bucketProvider = await createBucketProvider(); + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess(Option.none()); + + const listedObjects = yield* bucket.listObjects({ + prefix: `organizations/${user.activeOrganizationId}/spaces/${spaceId}/`, + }); + + if (listedObjects.Contents?.length) { + yield* bucket.deleteObjects( + listedObjects.Contents.map((content) => ({ + Key: content.Key, + })), + ); + + console.log( + `Deleted ${listedObjects.Contents.length} objects for space ${spaceId}`, + ); + } + }).pipe(runPromise); // List all objects with the space prefix - - const listedObjects = await bucketProvider.listObjects({ - prefix: `organizations/${user.activeOrganizationId}/spaces/${spaceId}/`, - }); - - if (listedObjects.Contents?.length) { - await bucketProvider.deleteObjects( - listedObjects.Contents.map((content) => ({ - Key: content.Key, - })), - ); - - console.log( - `Deleted ${listedObjects.Contents.length} objects for space ${spaceId}`, - ); - } } catch (error) { console.error("Error deleting space icons from S3:", error); // Continue with space deletion even if S3 deletion fails diff --git a/apps/web/actions/organization/update-space.ts b/apps/web/actions/organization/update-space.ts index 3c7ede3fe..ab6237d0b 100644 --- a/apps/web/actions/organization/update-space.ts +++ b/apps/web/actions/organization/update-space.ts @@ -4,10 +4,12 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { nanoIdLength } from "@cap/database/helpers"; import { spaceMembers, spaces } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import { and, eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; import { v4 as uuidv4 } from "uuid"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; import { uploadSpaceIcon } from "./upload-space-icon"; export async function updateSpace(formData: FormData) { @@ -48,14 +50,18 @@ export async function updateSpace(formData: FormData) { // Remove icon from S3 and set iconUrl to null const spaceArr = await db().select().from(spaces).where(eq(spaces.id, id)); const space = spaceArr[0]; - if (space && space.iconUrl) { - try { - const bucketProvider = await createBucketProvider(); - const prevKeyMatch = space.iconUrl.match(/organizations\/.+/); - if (prevKeyMatch && prevKeyMatch[0]) - await bucketProvider.deleteObject(prevKeyMatch[0]); - } catch (e) { - console.warn("Failed to delete old space icon from S3", e); + if (space?.iconUrl) { + const key = space.iconUrl.match(/organizations\/.+/)?.[0]; + + if (key) { + try { + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess(Option.none()); + yield* bucket.deleteObject(key); + }).pipe(runPromise); + } catch (e) { + console.warn("Failed to delete old space icon from S3", e); + } } } await db().update(spaces).set({ iconUrl: null }).where(eq(spaces.id, id)); diff --git a/apps/web/actions/organization/upload-organization-icon.ts b/apps/web/actions/organization/upload-organization-icon.ts index 2c143a75c..485418da7 100644 --- a/apps/web/actions/organization/upload-organization-icon.ts +++ b/apps/web/actions/organization/upload-organization-icon.ts @@ -4,12 +4,14 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { organizations } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import DOMPurify from "dompurify"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { JSDOM } from "jsdom"; import { revalidatePath } from "next/cache"; import { sanitizeFile } from "@/lib/sanitizeFile"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; export async function uploadOrganizationIcon( formData: FormData, @@ -56,27 +58,30 @@ export async function uploadOrganizationIcon( try { const sanitizedFile = await sanitizeFile(file); - - const bucket = await createBucketProvider(); - - await bucket.putObject(fileKey, await sanitizedFile.bytes(), { - contentType: file.type, - }); - - // Construct the icon URL - let iconUrl; - if (serverEnv().CAP_AWS_BUCKET_URL) { - // If a custom bucket URL is defined, use it - iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; - } else if (serverEnv().CAP_AWS_ENDPOINT) { - // For custom endpoints like MinIO - iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.name}/${fileKey}`; - } else { - // Default AWS S3 URL format - iconUrl = `https://${bucket.name}.s3.${ - serverEnv().CAP_AWS_REGION || "us-east-1" - }.amazonaws.com/${fileKey}`; - } + let iconUrl: string | undefined; + + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess(Option.none()); + + yield* bucket.putObject( + fileKey, + yield* Effect.promise(() => sanitizedFile.bytes()), + { contentType: file.type }, + ); + // Construct the icon URL + if (serverEnv().CAP_AWS_BUCKET_URL) { + // If a custom bucket URL is defined, use it + iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; + } else if (serverEnv().CAP_AWS_ENDPOINT) { + // For custom endpoints like MinIO + iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.bucketName}/${fileKey}`; + } else { + // Default AWS S3 URL format + iconUrl = `https://${bucket.bucketName}.s3.${ + serverEnv().CAP_AWS_REGION || "us-east-1" + }.amazonaws.com/${fileKey}`; + } + }).pipe(runPromise); // Update organization with new icon URL await db() diff --git a/apps/web/actions/organization/upload-space-icon.ts b/apps/web/actions/organization/upload-space-icon.ts index e3a93464c..e56c92b5a 100644 --- a/apps/web/actions/organization/upload-space-icon.ts +++ b/apps/web/actions/organization/upload-space-icon.ts @@ -4,10 +4,12 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { spaces } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; import { sanitizeFile } from "@/lib/sanitizeFile"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; export async function uploadSpaceIcon(formData: FormData, spaceId: string) { const user = await getCurrentUser(); @@ -52,16 +54,18 @@ export async function uploadSpaceIcon(formData: FormData, spaceId: string) { space.organizationId }/spaces/${spaceId}/icon-${Date.now()}.${fileExtension}`; - const bucket = await createBucketProvider(); + const [bucket] = await S3Buckets.getBucketAccess(Option.none()).pipe( + runPromise, + ); try { // Remove previous icon if exists if (space.iconUrl) { // Try to extract the previous S3 key from the URL - const prevKeyMatch = space.iconUrl.match(/organizations\/.+/); - if (prevKeyMatch && prevKeyMatch[0]) { + const key = space.iconUrl.match(/organizations\/.+/)?.[0]; + if (key) { try { - await bucket.deleteObject(prevKeyMatch[0]); + await bucket.deleteObject(key).pipe(runPromise); } catch (e) { // Log and continue console.warn("Failed to delete old space icon from S3", e); @@ -71,18 +75,23 @@ export async function uploadSpaceIcon(formData: FormData, spaceId: string) { const sanitizedFile = await sanitizeFile(file); - await bucket.putObject(fileKey, await sanitizedFile.bytes(), { - contentType: file.type, - }); + await bucket + .putObject( + fileKey, + Effect.promise(() => sanitizedFile.bytes()), + { contentType: file.type }, + ) + .pipe(runPromise); + + let iconUrl: string | undefined; // Construct the icon URL - let iconUrl; if (serverEnv().CAP_AWS_BUCKET_URL) { iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; } else if (serverEnv().CAP_AWS_ENDPOINT) { - iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.name}/${fileKey}`; + iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.bucketName}/${fileKey}`; } else { - iconUrl = `https://${bucket.name}.s3.${ + iconUrl = `https://${bucket.bucketName}.s3.${ serverEnv().CAP_AWS_REGION || "us-east-1" }.amazonaws.com/${fileKey}`; } diff --git a/apps/web/actions/video/upload.ts b/apps/web/actions/video/upload.ts index 6ab1a3c11..2bfa2f541 100644 --- a/apps/web/actions/video/upload.ts +++ b/apps/web/actions/video/upload.ts @@ -10,11 +10,13 @@ import { nanoId } from "@cap/database/helpers"; import { s3Buckets, videos, videoUploads } from "@cap/database/schema"; import { buildEnv, NODE_ENV, serverEnv } from "@cap/env"; import { userIsPro } from "@cap/utils"; +import { S3Buckets } from "@cap/web-backend"; import { type Folder, Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; +import { runPromise } from "@/lib/server"; import { dub } from "@/utils/dub"; -import { createBucketProvider } from "@/utils/s3"; async function getVideoUploadPresignedUrl({ fileKey, @@ -86,8 +88,6 @@ async function getVideoUploadPresignedUrl({ } } - const bucket = await createBucketProvider(customBucket); - const contentType = fileKey.endsWith(".aac") ? "audio/aac" : fileKey.endsWith(".webm") @@ -109,19 +109,27 @@ async function getVideoUploadPresignedUrl({ "x-amz-meta-audiocodec": audioCodec ?? "", }; - const presignedPostData = await bucket.getPresignedPostUrl(fileKey, { - Fields, - Expires: 1800, - }); - - const customEndpoint = serverEnv().CAP_AWS_ENDPOINT; - if (customEndpoint && !customEndpoint.includes("amazonaws.com")) { - if (serverEnv().S3_PATH_STYLE) { - presignedPostData.url = `${customEndpoint}/${bucket.name}`; - } else { - presignedPostData.url = customEndpoint; + const presignedPostData = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(customBucket?.id), + ); + + const presignedPostData = yield* bucket.getPresignedPostUrl(fileKey, { + Fields, + Expires: 1800, + }); + + const customEndpoint = serverEnv().CAP_AWS_ENDPOINT; + if (customEndpoint && !customEndpoint.includes("amazonaws.com")) { + if (serverEnv().S3_PATH_STYLE) { + presignedPostData.url = `${customEndpoint}/${bucket.bucketName}`; + } else { + presignedPostData.url = customEndpoint; + } } - } + + return presignedPostData; + }).pipe(runPromise); const videoId = fileKey.split("/")[1]; if (videoId) { @@ -214,15 +222,12 @@ export async function createVideoAndGetUploadUrl({ const idToUse = Video.VideoId.make(videoId || nanoId()); - const bucket = await createBucketProvider(customBucket); - const videoData = { id: idToUse, name: `Cap ${ isScreenshot ? "Screenshot" : isUpload ? "Upload" : "Recording" } - ${formattedDate}`, ownerId: user.id, - awsBucket: bucket.name, source: { type: "desktopMP4" as const }, isScreenshot, bucket: customBucket?.id, diff --git a/apps/web/actions/videos/download.ts b/apps/web/actions/videos/download.ts index 173c8a222..13acff323 100644 --- a/apps/web/actions/videos/download.ts +++ b/apps/web/actions/videos/download.ts @@ -3,9 +3,11 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { videos } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; -import { createBucketProvider } from "@/utils/s3"; +import { Effect, Option } from "effect"; +import { runPromise } from "@/lib/server"; export async function downloadVideo(videoId: Video.VideoId) { const user = await getCurrentUser(); @@ -31,10 +33,14 @@ export async function downloadVideo(videoId: Video.VideoId) { } try { - const bucketProvider = await createBucketProvider(); const videoKey = `${video.ownerId}/${videoId}/result.mp4`; - const downloadUrl = await bucketProvider.getSignedObjectUrl(videoKey); + const downloadUrl = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(video.bucket), + ); + return yield* bucket.getSignedObjectUrl(videoKey); + }).pipe(runPromise); return { success: true, diff --git a/apps/web/actions/videos/edit-transcript.ts b/apps/web/actions/videos/edit-transcript.ts index 48e2d857b..4cd630a2b 100644 --- a/apps/web/actions/videos/edit-transcript.ts +++ b/apps/web/actions/videos/edit-transcript.ts @@ -3,10 +3,12 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { s3Buckets, videos } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; export async function editTranscriptEntry( videoId: Video.VideoId, @@ -50,20 +52,28 @@ export async function editTranscriptEntry( }; } - const bucket = await createBucketProvider(result.bucket); + const [bucket] = await S3Buckets.getBucketAccess( + Option.fromNullable(result.bucket?.id), + ).pipe(runPromise); try { const transcriptKey = `${video.ownerId}/${videoId}/transcription.vtt`; - const vttContent = await bucket.getObject(transcriptKey); - if (!vttContent) + const vttContent = await bucket.getObject(transcriptKey).pipe(runPromise); + if (Option.isNone(vttContent)) return { success: false, message: "Transcript file not found" }; - const updatedVttContent = updateVttEntry(vttContent, entryId, newText); - - await bucket.putObject(transcriptKey, updatedVttContent, { - contentType: "text/vtt", - }); + const updatedVttContent = updateVttEntry( + vttContent.value, + entryId, + newText, + ); + + await bucket + .putObject(transcriptKey, updatedVttContent, { + contentType: "text/vtt", + }) + .pipe(runPromise); revalidatePath(`/s/${videoId}`); diff --git a/apps/web/actions/videos/generate-ai-metadata.ts b/apps/web/actions/videos/generate-ai-metadata.ts index 7de6689d5..80d987fcb 100644 --- a/apps/web/actions/videos/generate-ai-metadata.ts +++ b/apps/web/actions/videos/generate-ai-metadata.ts @@ -4,10 +4,13 @@ import { db } from "@cap/database"; import { s3Buckets, videos } from "@cap/database/schema"; import type { VideoMetadata } from "@cap/database/types"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { GROQ_MODEL, getGroqClient } from "@/lib/groq-client"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; + export async function generateAiMetadata( videoId: Video.VideoId, userId: string, @@ -118,27 +121,25 @@ export async function generateAiMetadata( const { video } = row; - const awsBucket = video.awsBucket; - if (!awsBucket) { - console.error( - `[generateAiMetadata] AWS bucket not found for video ${videoId}`, + const vtt = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(row.bucket?.id), ); - throw new Error(`AWS bucket not found for video ${videoId}`); - } - - const bucket = await createBucketProvider(row.bucket); - const transcriptKey = `${userId}/${videoId}/transcription.vtt`; - const vtt = await bucket.getObject(transcriptKey); + return yield* bucket.getObject(`${userId}/${videoId}/transcription.vtt`); + }).pipe(runPromise); - if (!vtt || vtt.length < 10) { + if (Option.isNone(vtt)) { + console.error(`[generateAiMetadata] Transcript is empty`); + throw new Error("Transcript is empty"); + } else if (vtt.value.length < 10) { console.error( - `[generateAiMetadata] Transcript is empty or too short (${vtt?.length} chars)`, + `[generateAiMetadata] Transcript is too short (${vtt.value.length} chars)`, ); - throw new Error("Transcript is empty or too short"); + throw new Error("Transcript is too short"); } - const transcriptText = vtt + const transcriptText = vtt.value .split("\n") .filter( (l) => diff --git a/apps/web/actions/videos/get-og-image.tsx b/apps/web/actions/videos/get-og-image.tsx index 9f6b7c391..cc10ffac5 100644 --- a/apps/web/actions/videos/get-og-image.tsx +++ b/apps/web/actions/videos/get-og-image.tsx @@ -1,9 +1,11 @@ import { db } from "@cap/database"; import { s3Buckets, videos } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { ImageResponse } from "next/og"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; export async function generateVideoOgImage(videoId: Video.VideoId) { const videoData = await getData(videoId); @@ -58,13 +60,17 @@ export async function generateVideoOgImage(videoId: Video.VideoId) { ); } - const bucket = await createBucketProvider(videoData.bucket); - const screenshotKey = `${video.ownerId}/${video.id}/screenshot/screen-capture.jpg`; let screenshotUrl = null; try { - screenshotUrl = await bucket.getSignedObjectUrl(screenshotKey); + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(videoData.bucket?.id), + ); + + screenshotUrl = yield* bucket.getSignedObjectUrl(screenshotKey); + }).pipe(runPromise); } catch (error) { console.error("Error generating URL for screenshot:", error); } diff --git a/apps/web/actions/videos/get-transcript.ts b/apps/web/actions/videos/get-transcript.ts index 3726bb3ad..90b910f7d 100644 --- a/apps/web/actions/videos/get-transcript.ts +++ b/apps/web/actions/videos/get-transcript.ts @@ -3,9 +3,11 @@ import { db } from "@cap/database"; import { getCurrentUser } from "@cap/database/auth/session"; import { s3Buckets, videos } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; -import { createBucketProvider } from "@/utils/s3"; +import { Effect, Option } from "effect"; +import { runPromise } from "@/lib/server"; export async function getTranscript( videoId: Video.VideoId, @@ -46,20 +48,24 @@ export async function getTranscript( }; } - const bucket = await createBucketProvider(result.bucket); - try { - const transcriptKey = `${video.ownerId}/${videoId}/transcription.vtt`; + const vttContent = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(result.bucket?.id), + ); - const vttContent = await bucket.getObject(transcriptKey); + return yield* bucket.getObject( + `${video.ownerId}/${videoId}/transcription.vtt`, + ); + }).pipe(runPromise); - if (!vttContent) { + if (Option.isNone(vttContent)) { return { success: false, message: "Transcript file not found" }; } return { success: true, - content: vttContent, + content: vttContent.value, message: "Transcript retrieved successfully", }; } catch (error) { diff --git a/apps/web/app/api/desktop/[...route]/s3Config.ts b/apps/web/app/api/desktop/[...route]/s3Config.ts index 7740f191e..fea0e5802 100644 --- a/apps/web/app/api/desktop/[...route]/s3Config.ts +++ b/apps/web/app/api/desktop/[...route]/s3Config.ts @@ -3,6 +3,7 @@ import { db } from "@cap/database"; import { decrypt, encrypt } from "@cap/database/crypto"; import { nanoId } from "@cap/database/helpers"; import { s3Buckets } from "@cap/database/schema"; +import { S3Bucket } from "@cap/web-domain"; import { zValidator } from "@hono/zod-validator"; import { eq } from "drizzle-orm"; import { Hono } from "hono"; @@ -31,7 +32,7 @@ app.post( try { // Encrypt the sensitive data const encryptedConfig = { - id: nanoId(), + id: S3Bucket.S3BucketId.make(nanoId()), provider: data.provider, accessKeyId: await encrypt(data.accessKeyId), secretAccessKey: await encrypt(data.secretAccessKey), diff --git a/apps/web/app/api/desktop/[...route]/video.ts b/apps/web/app/api/desktop/[...route]/video.ts index 06d6dae44..29a007422 100644 --- a/apps/web/app/api/desktop/[...route]/video.ts +++ b/apps/web/app/api/desktop/[...route]/video.ts @@ -5,13 +5,15 @@ import { nanoId } from "@cap/database/helpers"; import { s3Buckets, videos, videoUploads } from "@cap/database/schema"; import { buildEnv, NODE_ENV, serverEnv } from "@cap/env"; import { userIsPro } from "@cap/utils"; +import { S3Buckets } from "@cap/web-backend"; import { Video } from "@cap/web-domain"; import { zValidator } from "@hono/zod-validator"; import { and, count, eq, gt, gte, lt, lte } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { Hono } from "hono"; import { z } from "zod"; +import { runPromise } from "@/lib/server"; import { dub } from "@/utils/dub"; -import { createBucketProvider } from "@/utils/s3"; import { stringOrNumberOptional } from "@/utils/zod"; import { withAuth } from "../../utils"; @@ -71,8 +73,6 @@ app.get( console.log("User bucket:", customBucket ? "found" : "not found"); - const bucket = await createBucketProvider(customBucket); - const date = new Date(); const formattedDate = `${date.getDate()} ${date.toLocaleString( "default", @@ -108,8 +108,6 @@ app.get( id: idToUse, name: videoName, ownerId: user.id, - awsRegion: "auto", - awsBucket: bucket.name, source: recordingMode === "hls" ? { type: "local" as const } @@ -225,18 +223,22 @@ app.delete( .delete(videos) .where(and(eq(videos.id, videoId), eq(videos.ownerId, user.id))); - const bucket = await createBucketProvider(result.bucket); + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(result.bucket?.id), + ); - const listedObjects = await bucket.listObjects({ - prefix: `${user.id}/${videoId}/`, - }); + const listedObjects = yield* bucket.listObjects({ + prefix: `${user.id}/${videoId}/`, + }); - if (listedObjects.Contents?.length) - await bucket.deleteObjects( - listedObjects.Contents.map((content: any) => ({ - Key: content.Key, - })), - ); + if (listedObjects.Contents?.length) + yield* bucket.deleteObjects( + listedObjects.Contents.map((content: any) => ({ + Key: content.Key, + })), + ); + }).pipe(runPromise); return c.json(true); } catch (error) { diff --git a/apps/web/app/api/playlist/route.ts b/apps/web/app/api/playlist/route.ts index 3d706f1c4..ef8b9e98d 100644 --- a/apps/web/app/api/playlist/route.ts +++ b/apps/web/app/api/playlist/route.ts @@ -1,10 +1,5 @@ import { serverEnv } from "@cap/env"; -import { - provideOptionalAuth, - S3BucketAccess, - S3Buckets, - Videos, -} from "@cap/web-backend"; +import { provideOptionalAuth, S3Buckets, Videos } from "@cap/web-backend"; import { Video } from "@cap/web-domain"; import { HttpApi, @@ -60,14 +55,7 @@ const ApiLive = HttpApiBuilder.api(Api).pipe( ), ); - const [S3ProviderLayer, customBucket] = - yield* s3Buckets.getProviderForBucket(video.bucketId); - - return yield* getPlaylistResponse( - video, - Option.isSome(customBucket), - urlParams, - ).pipe(Effect.provide(S3ProviderLayer)); + return yield* getPlaylistResponse(video, urlParams); }).pipe( provideOptionalAuth, Effect.tapErrorCause(Effect.logError), @@ -78,6 +66,7 @@ const ApiLive = HttpApiBuilder.api(Api).pipe( S3Error: () => new HttpApiError.InternalServerError(), UnknownException: () => new HttpApiError.InternalServerError(), }), + Effect.provideService(S3Buckets, s3Buckets), ), ); }), @@ -87,13 +76,12 @@ const ApiLive = HttpApiBuilder.api(Api).pipe( const getPlaylistResponse = ( video: Video.Video, - isCustomBucket: boolean, urlParams: (typeof GetPlaylistParams)["Type"], ) => Effect.gen(function* () { - const s3 = yield* S3BucketAccess; + const [s3, customBucket] = yield* S3Buckets.getBucketAccess(video.bucketId); - if (!isCustomBucket) { + if (Option.isNone(customBucket)) { let redirect = `${video.ownerId}/${video.id}/combined-source/stream.m3u8`; if (video.source.type === "desktopMP4" || urlParams.videoType === "mp4") diff --git a/apps/web/app/api/thumbnail/route.ts b/apps/web/app/api/thumbnail/route.ts index de2c5a8a1..93ca4b9cc 100644 --- a/apps/web/app/api/thumbnail/route.ts +++ b/apps/web/app/api/thumbnail/route.ts @@ -1,10 +1,12 @@ import { db } from "@cap/database"; import { s3Buckets, videos } from "@cap/database/schema"; +import { S3Buckets } from "@cap/web-backend"; import { Video } from "@cap/web-domain"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import type { NextRequest } from "next/server"; +import { runPromise } from "@/lib/server"; import { getHeaders } from "@/utils/helpers"; -import { createBucketProvider } from "@/utils/s3"; export const revalidate = 0; @@ -44,12 +46,15 @@ export async function GET(request: NextRequest) { ); const prefix = `${query.video.ownerId}/${query.video.id}/`; - const bucketProvider = await createBucketProvider(query.bucket); try { - const listResponse = await bucketProvider.listObjects({ - prefix: prefix, - }); + const [bucket] = await S3Buckets.getBucketAccess( + Option.fromNullable(query.bucket?.id), + ).pipe(runPromise); + + const listResponse = await bucket + .listObjects({ prefix: prefix }) + .pipe(runPromise); const contents = listResponse.Contents || []; const thumbnailKey = contents.find((item) => @@ -68,7 +73,9 @@ export async function GET(request: NextRequest) { }, ); - const thumbnailUrl = await bucketProvider.getSignedObjectUrl(thumbnailKey); + const thumbnailUrl = await bucket + .getSignedObjectUrl(thumbnailKey) + .pipe(runPromise); return new Response(JSON.stringify({ screen: thumbnailUrl }), { status: 200, diff --git a/apps/web/app/api/upload/[...route]/multipart.ts b/apps/web/app/api/upload/[...route]/multipart.ts index b6783ee46..5d1217c8e 100644 --- a/apps/web/app/api/upload/[...route]/multipart.ts +++ b/apps/web/app/api/upload/[...route]/multipart.ts @@ -1,14 +1,15 @@ import { db, updateIfDefined } from "@cap/database"; -import { s3Buckets, videos, videoUploads } from "@cap/database/schema"; -import type { VideoMetadata } from "@cap/database/types"; +import { videos, videoUploads } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import { Video } from "@cap/web-domain"; import { zValidator } from "@hono/zod-validator"; import { and, eq } from "drizzle-orm"; +import { Effect } from "effect"; import { Hono } from "hono"; import { z } from "zod"; import { withAuth } from "@/app/api/utils"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; import { stringOrNumberOptional } from "@/utils/zod"; import { parseVideoIdOrFileKey } from "../utils"; @@ -37,34 +38,38 @@ app.post( try { try { - const { bucket } = await getUserBucketWithClient(user.id); + const uploadId = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccessForUser(user.id); - const finalContentType = contentType || "video/mp4"; - console.log( - `Creating multipart upload in bucket: ${bucket.name}, content-type: ${finalContentType}, key: ${fileKey}`, - ); + const finalContentType = contentType || "video/mp4"; + console.log( + `Creating multipart upload in bucket: ${bucket.bucketName}, content-type: ${finalContentType}, key: ${fileKey}`, + ); - const { UploadId } = await bucket.multipart.create(fileKey, { - ContentType: finalContentType, - Metadata: { - userId: user.id, - source: "cap-multipart-upload", - }, - CacheControl: "max-age=31536000", - }); - - if (!UploadId) { - throw new Error("No UploadId returned from S3"); - } + const { UploadId } = yield* bucket.multipart.create(fileKey, { + ContentType: finalContentType, + Metadata: { + userId: user.id, + source: "cap-multipart-upload", + }, + CacheControl: "max-age=31536000", + }); - console.log( - `Successfully initiated multipart upload with ID: ${UploadId}`, - ); - console.log( - `Upload details: Bucket=${bucket.name}, Key=${fileKey}, ContentType=${finalContentType}`, - ); + if (!UploadId) { + throw new Error("No UploadId returned from S3"); + } + + console.log( + `Successfully initiated multipart upload with ID: ${UploadId}`, + ); + console.log( + `Upload details: Bucket=${bucket.bucketName}, Key=${fileKey}, ContentType=${finalContentType}`, + ); + + return UploadId; + }).pipe(runPromise); - return c.json({ uploadId: UploadId }); + return c.json({ uploadId: uploadId }); } catch (s3Error) { console.error("S3 operation failed:", s3Error); throw new Error( @@ -115,18 +120,23 @@ app.post( try { try { - const { bucket } = await getUserBucketWithClient(user.id); + const presignedUrl = await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccessForUser(user.id); - console.log( - `Getting presigned URL for part ${partNumber} of upload ${uploadId}`, - ); + console.log( + `Getting presigned URL for part ${partNumber} of upload ${uploadId}`, + ); - const presignedUrl = await bucket.multipart.getPresignedUploadPartUrl( - fileKey, - uploadId, - partNumber, - { ContentMD5: md5Sum }, - ); + const presignedUrl = + yield* bucket.multipart.getPresignedUploadPartUrl( + fileKey, + uploadId, + partNumber, + { ContentMD5: md5Sum }, + ); + + return presignedUrl; + }).pipe(runPromise); return c.json({ presignedUrl }); } catch (s3Error) { @@ -188,53 +198,59 @@ app.post( try { try { - const { bucket } = await getUserBucketWithClient(user.id); - - console.log( - `Completing multipart upload ${uploadId} with ${parts.length} parts for key: ${fileKey}`, + const [bucket] = await S3Buckets.getBucketAccessForUser(user.id).pipe( + runPromise, ); - const totalSize = parts.reduce((acc, part) => acc + part.size, 0); - console.log(`Total size of all parts: ${totalSize} bytes`); + const { result, formattedParts } = await Effect.gen(function* () { + console.log( + `Completing multipart upload ${uploadId} with ${parts.length} parts for key: ${fileKey}`, + ); - const sortedParts = [...parts].sort( - (a, b) => a.partNumber - b.partNumber, - ); + const totalSize = parts.reduce((acc, part) => acc + part.size, 0); + console.log(`Total size of all parts: ${totalSize} bytes`); - const sequentialCheck = sortedParts.every( - (part, index) => part.partNumber === index + 1, - ); + const sortedParts = [...parts].sort( + (a, b) => a.partNumber - b.partNumber, + ); - if (!sequentialCheck) { - console.warn( - "WARNING: Part numbers are not sequential! This may cause issues with the assembled file.", + const sequentialCheck = sortedParts.every( + (part, index) => part.partNumber === index + 1, ); - } - const formattedParts = sortedParts.map((part) => ({ - PartNumber: part.partNumber, - ETag: part.etag, - })); + if (!sequentialCheck) { + console.warn( + "WARNING: Part numbers are not sequential! This may cause issues with the assembled file.", + ); + } - console.log( - "Sending to S3:", - JSON.stringify( - { - Bucket: bucket.name, - Key: fileKey, - UploadId: uploadId, + const formattedParts = sortedParts.map((part) => ({ + PartNumber: part.partNumber, + ETag: part.etag, + })); + + console.log( + "Sending to S3:", + JSON.stringify( + { + Bucket: bucket.bucketName, + Key: fileKey, + UploadId: uploadId, + Parts: formattedParts, + }, + null, + 2, + ), + ); + + const result = yield* bucket.multipart.complete(fileKey, uploadId, { + MultipartUpload: { Parts: formattedParts, }, - null, - 2, - ), - ); + }); - const result = await bucket.multipart.complete(fileKey, uploadId, { - MultipartUpload: { - Parts: formattedParts, - }, - }); + return { result, formattedParts }; + }).pipe(runPromise); try { console.log( @@ -244,36 +260,41 @@ app.post( ); console.log(`Complete response: ${JSON.stringify(result, null, 2)}`); - try { + await Effect.gen(function* () { console.log( "Performing metadata fix by copying the object to itself...", ); - const copyResult = await bucket.copyObject( - `${bucket.name}/${fileKey}`, - fileKey, - { + yield* bucket + .copyObject(`${bucket.bucketName}/${fileKey}`, fileKey, { ContentType: "video/mp4", MetadataDirective: "REPLACE", - }, - ); - - console.log("Copy for metadata fix successful:", copyResult); - } catch (copyError) { - console.error( - "Warning: Failed to copy object to fix metadata:", - copyError, - ); - } + }) + .pipe( + Effect.tap((result) => + Effect.log("Copy for metadata fix successful:", result), + ), + Effect.catchAll((e) => + Effect.logError( + "Warning: Failed to copy object to fix metadata:", + e, + ), + ), + ); - try { - const headResult = await bucket.headObject(fileKey); - console.log( - `Object verification successful: ContentType=${headResult.ContentType}, ContentLength=${headResult.ContentLength}`, + yield* bucket.headObject(fileKey).pipe( + Effect.tap((headResult) => + Effect.log( + `Object verification successful: ContentType=${headResult.ContentType}, ContentLength=${headResult.ContentLength}`, + ), + ), + Effect.catchAll((headError) => + Effect.logError( + `Warning: Unable to verify object: ${headError}`, + ), + ), ); - } catch (headError) { - console.error(`Warning: Unable to verify object: ${headError}`); - } + }).pipe(runPromise); const videoIdFromFileKey = fileKey.split("/")[1]; @@ -359,22 +380,3 @@ app.post( } }, ); - -async function getUserBucketWithClient(userId: string) { - const [customBucket] = await db() - .select() - .from(s3Buckets) - .where(eq(s3Buckets.ownerId, userId)); - - console.log("S3 bucket configuration:", { - hasEndpoint: !!customBucket?.endpoint, - endpoint: customBucket?.endpoint || "N/A", - region: customBucket?.region || "N/A", - hasAccessKey: !!customBucket?.accessKeyId, - hasSecretKey: !!customBucket?.secretAccessKey, - }); - - const bucket = await createBucketProvider(customBucket); - - return { bucket }; -} diff --git a/apps/web/app/api/upload/[...route]/signed.ts b/apps/web/app/api/upload/[...route]/signed.ts index 6975c5c6b..601cb50cb 100644 --- a/apps/web/app/api/upload/[...route]/signed.ts +++ b/apps/web/app/api/upload/[...route]/signed.ts @@ -2,15 +2,18 @@ import { CloudFrontClient, CreateInvalidationCommand, } from "@aws-sdk/client-cloudfront"; +import type { PresignedPost } from "@aws-sdk/s3-presigned-post"; import { db, updateIfDefined } from "@cap/database"; import { s3Buckets, videos } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import { Video } from "@cap/web-domain"; import { zValidator } from "@hono/zod-validator"; import { and, eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { Hono } from "hono"; import { z } from "zod"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; import { stringOrNumberOptional } from "@/utils/zod"; import { withAuth } from "../../utils"; import { parseVideoIdOrFileKey } from "../utils"; @@ -110,34 +113,42 @@ app.post( ? "application/x-mpegURL" : "video/mp2t"; - const bucket = await createBucketProvider(customBucket); - - let data; - if (method === "post") { - const Fields = { - "Content-Type": contentType, - "x-amz-meta-userid": user.id, - "x-amz-meta-duration": durationInSecs - ? durationInSecs.toString() - : "", - }; - - data = bucket.getPresignedPostUrl(fileKey, { Fields, Expires: 1800 }); - } else if (method === "put") { - const presignedUrl = await bucket.getPresignedPutUrl( - fileKey, - { - ContentType: contentType, - Metadata: { - userid: user.id, - duration: durationInSecs ? durationInSecs.toString() : "", - }, - }, - { expiresIn: 1800 }, + let data: PresignedPost; + + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess( + Option.fromNullable(customBucket?.id), ); - data = { url: presignedUrl, fields: {} }; - } + if (method === "post") { + const Fields = { + "Content-Type": contentType, + "x-amz-meta-userid": user.id, + "x-amz-meta-duration": durationInSecs + ? durationInSecs.toString() + : "", + }; + + data = yield* bucket.getPresignedPostUrl(fileKey, { + Fields, + Expires: 1800, + }); + } else if (method === "put") { + const presignedUrl = yield* bucket.getPresignedPutUrl( + fileKey, + { + ContentType: contentType, + Metadata: { + userid: user.id, + duration: durationInSecs ? durationInSecs.toString() : "", + }, + }, + { expiresIn: 1800 }, + ); + + data = { url: presignedUrl, fields: {} }; + } + }).pipe(runPromise); console.log("Presigned URL created successfully"); @@ -175,8 +186,8 @@ app.post( } } - if (method === "post") return c.json({ presignedPostData: data }); - else return c.json({ presignedPutData: data }); + if (method === "post") return c.json({ presignedPostData: data! }); + else return c.json({ presignedPutData: data! }); } catch (s3Error) { console.error("S3 operation failed:", s3Error); throw new Error( diff --git a/apps/web/app/embed/[videoId]/page.tsx b/apps/web/app/embed/[videoId]/page.tsx index b12f2432f..e923cc2ae 100644 --- a/apps/web/app/embed/[videoId]/page.tsx +++ b/apps/web/app/embed/[videoId]/page.tsx @@ -129,13 +129,13 @@ export default async function EmbedVideoPage(props: Props) { orgId: videos.orgId, createdAt: videos.createdAt, updatedAt: videos.updatedAt, - awsRegion: videos.awsRegion, - awsBucket: videos.awsBucket, bucket: videos.bucket, metadata: videos.metadata, public: videos.public, videoStartTime: videos.videoStartTime, audioStartTime: videos.audioStartTime, + awsRegion: videos.awsRegion, + awsBucket: videos.awsBucket, xStreamInfo: videos.xStreamInfo, jobId: videos.jobId, jobStatus: videos.jobStatus, diff --git a/apps/web/app/layout.tsx b/apps/web/app/layout.tsx index 47f9846c7..171da8476 100644 --- a/apps/web/app/layout.tsx +++ b/apps/web/app/layout.tsx @@ -115,8 +115,6 @@ export default async function RootLayout({ children }: PropsWithChildren) { diff --git a/apps/web/app/s/[videoId]/page.tsx b/apps/web/app/s/[videoId]/page.tsx index 64d176dd2..7a970edb3 100644 --- a/apps/web/app/s/[videoId]/page.tsx +++ b/apps/web/app/s/[videoId]/page.tsx @@ -269,13 +269,13 @@ export default async function ShareVideoPage(props: Props) { orgId: videos.orgId, createdAt: videos.createdAt, updatedAt: videos.updatedAt, - awsRegion: videos.awsRegion, - awsBucket: videos.awsBucket, bucket: videos.bucket, metadata: videos.metadata, public: videos.public, videoStartTime: videos.videoStartTime, audioStartTime: videos.audioStartTime, + awsRegion: videos.awsRegion, + awsBucket: videos.awsBucket, xStreamInfo: videos.xStreamInfo, jobId: videos.jobId, jobStatus: videos.jobStatus, @@ -456,8 +456,6 @@ async function AuthorizedContent({ ), createdAt: videos.createdAt, updatedAt: videos.updatedAt, - awsRegion: videos.awsRegion, - awsBucket: videos.awsBucket, bucket: videos.bucket, metadata: videos.metadata, public: videos.public, diff --git a/apps/web/components/forms/server.ts b/apps/web/components/forms/server.ts index f651902f9..5b952eb43 100644 --- a/apps/web/components/forms/server.ts +++ b/apps/web/components/forms/server.ts @@ -9,9 +9,11 @@ import { users, } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { revalidatePath } from "next/cache"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "@/lib/server"; export async function createOrganization(formData: FormData) { const user = await getCurrentUser(); @@ -64,26 +66,31 @@ export async function createOrganization(formData: FormData) { const fileKey = `organizations/${organizationId}/icon-${Date.now()}.${fileExtension}`; try { - const bucket = await createBucketProvider(); - - await bucket.putObject(fileKey, await iconFile.bytes(), { - contentType: iconFile.type, - }); - - // Construct the icon URL - let iconUrl; - if (serverEnv().CAP_AWS_BUCKET_URL) { - // If a custom bucket URL is defined, use it - iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; - } else if (serverEnv().CAP_AWS_ENDPOINT) { - // For custom endpoints like MinIO - iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.name}/${fileKey}`; - } else { - // Default AWS S3 URL format - iconUrl = `https://${bucket.name}.s3.${ - serverEnv().CAP_AWS_REGION || "us-east-1" - }.amazonaws.com/${fileKey}`; - } + let iconUrl: string | undefined; + + await Effect.gen(function* () { + const [bucket] = yield* S3Buckets.getBucketAccess(Option.none()); + + yield* bucket.putObject( + fileKey, + yield* Effect.promise(() => iconFile.bytes()), + { contentType: iconFile.type }, + ); + + // Construct the icon URL + if (serverEnv().CAP_AWS_BUCKET_URL) { + // If a custom bucket URL is defined, use it + iconUrl = `${serverEnv().CAP_AWS_BUCKET_URL}/${fileKey}`; + } else if (serverEnv().CAP_AWS_ENDPOINT) { + // For custom endpoints like MinIO + iconUrl = `${serverEnv().CAP_AWS_ENDPOINT}/${bucket.bucketName}/${fileKey}`; + } else { + // Default AWS S3 URL format + iconUrl = `https://${bucket.bucketName}.s3.${ + serverEnv().CAP_AWS_REGION || "us-east-1" + }.amazonaws.com/${fileKey}`; + } + }).pipe(runPromise); // Add the icon URL to the organization values orgValues.iconUrl = iconUrl; diff --git a/apps/web/lib/server.ts b/apps/web/lib/server.ts index 57f409d46..6edfb674c 100644 --- a/apps/web/lib/server.ts +++ b/apps/web/lib/server.ts @@ -44,8 +44,11 @@ export const Dependencies = Layer.mergeAll( Videos.Default, VideosPolicy.Default, Folders.Default, - Database.Default, -).pipe(Layer.provideMerge(Layer.mergeAll(TracingLayer, FetchHttpClient.layer))); +).pipe( + Layer.provideMerge( + Layer.mergeAll(Database.Default, TracingLayer, FetchHttpClient.layer), + ), +); // purposefully not exposed const EffectRuntime = ManagedRuntime.make(Dependencies); diff --git a/apps/web/lib/transcribe.ts b/apps/web/lib/transcribe.ts index d3e50c078..7451d692c 100644 --- a/apps/web/lib/transcribe.ts +++ b/apps/web/lib/transcribe.ts @@ -1,11 +1,13 @@ import { db } from "@cap/database"; import { s3Buckets, videos } from "@cap/database/schema"; import { serverEnv } from "@cap/env"; +import { S3Buckets } from "@cap/web-backend"; import type { Video } from "@cap/web-domain"; import { createClient } from "@deepgram/sdk"; import { eq } from "drizzle-orm"; +import { Effect, Option } from "effect"; import { generateAiMetadata } from "@/actions/videos/generate-ai-metadata"; -import { createBucketProvider } from "@/utils/s3"; +import { runPromise } from "./server"; type TranscribeResult = { success: boolean; @@ -71,12 +73,14 @@ export async function transcribeVideo( .set({ transcriptionStatus: "PROCESSING" }) .where(eq(videos.id, videoId)); - const bucket = await createBucketProvider(result.bucket); + const [bucket] = await S3Buckets.getBucketAccess( + Option.fromNullable(result.bucket?.id), + ).pipe(runPromise); try { const videoKey = `${userId}/${videoId}/result.mp4`; - const videoUrl = await bucket.getSignedObjectUrl(videoKey); + const videoUrl = await bucket.getSignedObjectUrl(videoKey).pipe(runPromise); // Check if video file actually exists before transcribing try { @@ -118,11 +122,11 @@ export async function transcribeVideo( throw new Error("Failed to transcribe audio"); } - await bucket.putObject( - `${userId}/${videoId}/transcription.vtt`, - transcription, - { contentType: "text/vtt" }, - ); + await bucket + .putObject(`${userId}/${videoId}/transcription.vtt`, transcription, { + contentType: "text/vtt", + }) + .pipe(runPromise); await db() .update(videos) diff --git a/apps/web/utils/public-env.tsx b/apps/web/utils/public-env.tsx index 4d8683638..05244652e 100644 --- a/apps/web/utils/public-env.tsx +++ b/apps/web/utils/public-env.tsx @@ -4,8 +4,6 @@ import { createContext, type PropsWithChildren, useContext } from "react"; type PublicEnvContext = { webUrl: string; - awsBucket: string; - s3BucketUrl: string; }; const Context = createContext(null); diff --git a/apps/web/utils/s3.ts b/apps/web/utils/s3.ts deleted file mode 100644 index 6f49b5f2e..000000000 --- a/apps/web/utils/s3.ts +++ /dev/null @@ -1,401 +0,0 @@ -import { - CompleteMultipartUploadCommand, - type CompleteMultipartUploadCommandInput, - type CompleteMultipartUploadOutput, - CopyObjectCommand, - type CopyObjectCommandInput, - type CopyObjectCommandOutput, - CreateMultipartUploadCommand, - type CreateMultipartUploadCommandInput, - type CreateMultipartUploadOutput, - DeleteObjectCommand, - type DeleteObjectCommandOutput, - DeleteObjectsCommand, - type DeleteObjectsCommandOutput, - GetObjectCommand, - HeadObjectCommand, - type HeadObjectOutput, - ListObjectsV2Command, - type ListObjectsV2Output, - type ObjectIdentifier, - PutObjectCommand, - type PutObjectCommandOutput, - type PutObjectRequest, - S3Client, - UploadPartCommand, - type UploadPartCommandInput, -} from "@aws-sdk/client-s3"; -import * as CloudFrontPresigner from "@aws-sdk/cloudfront-signer"; -import { - createPresignedPost, - type PresignedPost, - type PresignedPostOptions, -} from "@aws-sdk/s3-presigned-post"; -import * as S3Presigner from "@aws-sdk/s3-request-presigner"; -import { decrypt } from "@cap/database/crypto"; -import type { s3Buckets } from "@cap/database/schema"; -import { serverEnv } from "@cap/env"; -import { S3_BUCKET_URL } from "@cap/utils"; -import type { - RequestPresigningArguments, - StreamingBlobPayloadInputTypes, -} from "@smithy/types"; -import { awsCredentialsProvider } from "@vercel/functions/oidc"; -import type { InferSelectModel } from "drizzle-orm"; - -type S3Config = { - endpoint?: string | null; - region?: string; - accessKeyId?: string; - secretAccessKey?: string; - forcePathStyle?: boolean; -} | null; - -async function tryDecrypt( - text: string | null | undefined, -): Promise { - if (!text) return undefined; - try { - const decrypted = await decrypt(text); - return decrypted; - } catch (error) { - return text; - } -} - -export async function getS3Config(config?: S3Config, internal = false) { - if (!config) { - const env = serverEnv(); - return { - endpoint: internal - ? (env.S3_INTERNAL_ENDPOINT ?? env.CAP_AWS_ENDPOINT) - : (env.S3_PUBLIC_ENDPOINT ?? env.CAP_AWS_ENDPOINT), - region: env.CAP_AWS_REGION, - credentials: env.VERCEL_AWS_ROLE_ARN - ? awsCredentialsProvider({ roleArn: env.VERCEL_AWS_ROLE_ARN }) - : { - accessKeyId: env.CAP_AWS_ACCESS_KEY ?? "", - secretAccessKey: env.CAP_AWS_SECRET_KEY ?? "", - }, - forcePathStyle: env.S3_PATH_STYLE, - }; - } - - const endpoint = config.endpoint - ? await tryDecrypt(config.endpoint) - : serverEnv().CAP_AWS_ENDPOINT; - - const region = - (await tryDecrypt(config.region)) ?? serverEnv().CAP_AWS_REGION; - - const finalRegion = endpoint?.includes("localhost") ? "us-east-1" : region; - - const isLocalOrMinio = - endpoint?.includes("localhost") || endpoint?.includes("127.0.0.1"); - - return { - endpoint, - region: finalRegion, - credentials: { - accessKeyId: - (await tryDecrypt(config.accessKeyId)) ?? - serverEnv().CAP_AWS_ACCESS_KEY ?? - "", - secretAccessKey: - (await tryDecrypt(config.secretAccessKey)) ?? - serverEnv().CAP_AWS_SECRET_KEY ?? - "", - }, - forcePathStyle: endpoint?.endsWith("s3.amazonaws.com") - ? false - : (config.forcePathStyle ?? true), - useArnRegion: false, - requestHandler: { - connectionTimeout: isLocalOrMinio ? 5000 : 10000, - socketTimeout: isLocalOrMinio ? 30000 : 60000, - }, - }; -} - -export async function getS3Bucket( - bucket?: InferSelectModel | null, -) { - if (!bucket?.bucketName) { - return serverEnv().CAP_AWS_BUCKET || ""; - } - - return ( - ((await tryDecrypt(bucket.bucketName)) ?? serverEnv().CAP_AWS_BUCKET) || "" - ); -} - -export async function createS3Client(config?: S3Config, internal = false) { - const s3Config = await getS3Config(config, internal); - const isLocalOrMinio = - s3Config.endpoint?.includes("localhost") || - s3Config.endpoint?.includes("127.0.0.1"); - - return [ - new S3Client({ - ...s3Config, - maxAttempts: isLocalOrMinio ? 5 : 3, - }), - s3Config, - ] as const; -} - -interface S3BucketProvider { - name: string; - getSignedObjectUrl(key: string): Promise; - getObject(key: string): Promise; - listObjects(config?: { - prefix?: string; - maxKeys?: number; - }): Promise; - headObject(key: string): Promise; - putObject( - key: string, - body: StreamingBlobPayloadInputTypes, - fields?: { contentType?: string }, - ): Promise; - copyObject( - source: string, - key: string, - args?: Omit, - ): Promise; - deleteObject(key: string): Promise; - deleteObjects(keys: ObjectIdentifier[]): Promise; - getPresignedPutUrl( - key: string, - args?: Omit, - signingArgs?: RequestPresigningArguments, - ): Promise; - getPresignedPostUrl( - key: string, - args: Omit, - ): Promise; - multipart: { - create( - key: string, - args?: Omit, - ): Promise; - getPresignedUploadPartUrl( - key: string, - uploadId: string, - partNumber: number, - args?: Omit< - UploadPartCommandInput, - "Key" | "Bucket" | "PartNumber" | "UploadId" - >, - ): Promise; - complete( - key: string, - uploadId: string, - args?: Omit< - CompleteMultipartUploadCommandInput, - "Key" | "Bucket" | "UploadId" - >, - ): Promise; - }; -} - -function createCloudFrontProvider(config: { - s3: (internal: boolean) => Promise; - bucket: string; - keyPairId: string; - privateKey: string; -}): S3BucketProvider { - const s3 = createS3Provider(config.s3, config.bucket); - return { - ...s3, - async getSignedObjectUrl(key: string) { - const url = `${S3_BUCKET_URL}/${key}`; - const expires = Math.floor((Date.now() + 3600 * 1000) / 1000); - - const policy = { - Statement: [ - { - Resource: url, - Condition: { - DateLessThan: { "AWS:EpochTime": Math.floor(expires) }, - }, - }, - ], - }; - - return CloudFrontPresigner.getSignedUrl({ - url, - keyPairId: config.keyPairId, - privateKey: config.privateKey, - policy: JSON.stringify(policy), - }); - }, - }; -} - -function createS3Provider( - getClient: (internal: boolean) => Promise, - bucket: string, -): S3BucketProvider { - return { - name: bucket, - async getSignedObjectUrl(key: string) { - return S3Presigner.getSignedUrl( - await getClient(false), - new GetObjectCommand({ Bucket: bucket, Key: key }), - { expiresIn: 3600 }, - ); - }, - async getObject(key: string, format = "string") { - const resp = await getClient(true).then((c) => - c.send(new GetObjectCommand({ Bucket: bucket, Key: key })), - ); - if (format === "string") { - return await resp.Body?.transformToString(); - } - }, - async listObjects(config) { - return await getClient(true).then((c) => - c.send( - new ListObjectsV2Command({ - Bucket: bucket, - Prefix: config?.prefix, - MaxKeys: config?.maxKeys, - }), - ), - ); - }, - async headObject(key: string) { - return await getClient(true).then((c) => - c.send(new HeadObjectCommand({ Bucket: bucket, Key: key })), - ); - }, - async putObject(key: string, body, fields) { - return await getClient(true).then((c) => - c.send( - new PutObjectCommand({ - Bucket: bucket, - Key: key, - Body: body, - ContentType: fields?.contentType, - }), - ), - ); - }, - async copyObject(source: string, key: string, args) { - return await getClient(true).then((c) => - c.send( - new CopyObjectCommand({ - Bucket: bucket, - CopySource: source, - Key: key, - ...args, - }), - ), - ); - }, - deleteObject: (key: string) => - getClient(true).then((client) => - client.send(new DeleteObjectCommand({ Bucket: bucket, Key: key })), - ), - deleteObjects: (objects: ObjectIdentifier[]) => - getClient(true).then((client) => - client.send( - new DeleteObjectsCommand({ - Bucket: bucket, - Delete: { - Objects: objects, - }, - }), - ), - ), - getPresignedPutUrl: (key: string, args, signingArgs) => - getClient(false).then((client) => - S3Presigner.getSignedUrl( - client, - new PutObjectCommand({ Bucket: bucket, Key: key, ...args }), - signingArgs, - ), - ), - getPresignedPostUrl: ( - key: string, - args: Omit, - ) => - getClient(false).then((client) => - createPresignedPost(client, { - ...args, - Bucket: bucket, - Key: key, - }), - ), - multipart: { - create: (key, args) => - getClient(true).then((client) => - client.send( - new CreateMultipartUploadCommand({ - ...args, - Bucket: bucket, - Key: key, - }), - ), - ), - getPresignedUploadPartUrl: (key, uploadId, partNumber, args) => { - console.log({ - ...args, - Bucket: bucket, - Key: key, - UploadId: uploadId, - PartNumber: partNumber, - }); - return getClient(false).then((client) => - S3Presigner.getSignedUrl( - client, - new UploadPartCommand({ - ...args, - Bucket: bucket, - Key: key, - UploadId: uploadId, - PartNumber: partNumber, - }), - ), - ); - }, - complete: (key, uploadId, args) => - getClient(true).then((client) => - client.send( - new CompleteMultipartUploadCommand({ - Bucket: bucket, - Key: key, - UploadId: uploadId, - ...args, - }), - ), - ), - }, - }; -} - -export async function createBucketProvider( - customBucket?: InferSelectModel | null, -) { - const bucket = await getS3Bucket(customBucket); - const getClient = (internal: boolean) => - createS3Client(customBucket, internal).then((v) => v[0]); - - if (!customBucket && serverEnv().CAP_CLOUDFRONT_DISTRIBUTION_ID) { - const keyPairId = serverEnv().CLOUDFRONT_KEYPAIR_ID; - const privateKey = serverEnv().CLOUDFRONT_KEYPAIR_PRIVATE_KEY; - - if (!keyPairId || !privateKey) - throw new Error("Missing CloudFront keypair ID or private key"); - - return createCloudFrontProvider({ - s3: getClient, - bucket, - keyPairId, - privateKey, - }); - } - - return createS3Provider(getClient, bucket); -} diff --git a/packages/database/schema.ts b/packages/database/schema.ts index 7bc232434..ccf63a2cc 100644 --- a/packages/database/schema.ts +++ b/packages/database/schema.ts @@ -1,4 +1,4 @@ -import type { Folder, Video } from "@cap/web-domain"; +import type { Folder, S3Bucket, Video } from "@cap/web-domain"; import { boolean, customType, @@ -245,7 +245,7 @@ export const videos = mysqlTable( // TODO: make this non-null orgId: nanoIdNullable("orgId"), name: varchar("name", { length: 255 }).notNull().default("My Video"), - bucket: nanoIdNullable("bucket"), + bucket: nanoIdNullable("bucket").$type(), // in seconds duration: float("duration"), width: int("width"), @@ -371,7 +371,7 @@ export const notifications = mysqlTable( ); export const s3Buckets = mysqlTable("s3_buckets", { - id: nanoId("id").notNull().primaryKey().unique(), + id: nanoId("id").notNull().primaryKey().unique().$type(), ownerId: nanoId("ownerId").notNull(), // Use encryptedText for sensitive fields region: encryptedText("region").notNull(), diff --git a/packages/web-backend/src/Loom/ImportVideo.ts b/packages/web-backend/src/Loom/ImportVideo.ts index 021f1d16e..0c5cf0891 100644 --- a/packages/web-backend/src/Loom/ImportVideo.ts +++ b/packages/web-backend/src/Loom/ImportVideo.ts @@ -5,7 +5,7 @@ import { Effect, Option, Schedule, Schema, Stream } from "effect"; import { DatabaseError } from "../Database.ts"; import { S3Buckets } from "../S3Buckets/index.ts"; -import { S3BucketAccess, S3Error } from "../S3Buckets/S3BucketAccess.ts"; +import { S3Error } from "../S3Buckets/S3BucketAccess.ts"; import { Videos } from "../Videos/index.ts"; export class LoomApiError extends Schema.TaggedError( @@ -63,7 +63,7 @@ export const LoomImportVideoLive = LoomImportVideo.toLayer( const loomVideo = payload.loom.video; const [_, customBucket] = yield* s3Buckets - .getProviderForUser(payload.cap.userId) + .getBucketAccessForUser(payload.cap.userId) .pipe(Effect.catchAll(() => Effect.die(null))); const customBucketId = Option.map(customBucket, (b) => b.id); @@ -101,58 +101,50 @@ export const LoomImportVideoLive = LoomImportVideo.toLayer( error: LoomImportVideoError, success: Schema.Struct({ fileKey: Schema.String }), execute: Effect.gen(function* () { - const [bucketProvider] = - yield* s3Buckets.getProviderForBucket(customBucketId); - - return yield* Effect.gen(function* () { - const s3Bucket = yield* S3BucketAccess; - - const resp = yield* http - .get(payload.loom.video.downloadUrl) - .pipe(Effect.catchAll((cause) => new LoomApiError({ cause }))); - const contentLength = Headers.get( - resp.headers, - "content-length", - ).pipe( - Option.map((v) => Number(v)), - Option.getOrUndefined, - ); - yield* Effect.log(`Downloading ${contentLength} bytes`); - - let downloadedBytes = 0; - - const key = source.getFileKey(); - - yield* s3Bucket - .putObject( - key, - resp.stream.pipe( - Stream.tap((bytes) => { - downloadedBytes += bytes.length; - return Effect.void; + const [s3Bucket] = yield* s3Buckets.getBucketAccess(customBucketId); + + const resp = yield* http + .get(payload.loom.video.downloadUrl) + .pipe(Effect.catchAll((cause) => new LoomApiError({ cause }))); + const contentLength = Headers.get(resp.headers, "content-length").pipe( + Option.map((v) => Number(v)), + Option.getOrUndefined, + ); + yield* Effect.log(`Downloading ${contentLength} bytes`); + + let downloadedBytes = 0; + + const key = source.getFileKey(); + + yield* s3Bucket + .putObject( + key, + resp.stream.pipe( + Stream.tap((bytes) => { + downloadedBytes += bytes.length; + return Effect.void; + }), + ), + { contentLength }, + ) + .pipe( + Effect.race( + // TODO: Connect this with upload progress + Effect.repeat( + Effect.gen(function* () { + const bytes = yield* Effect.succeed(downloadedBytes); + yield* Effect.log(`Downloaded ${bytes} bytes`); }), - ), - { contentLength }, - ) - .pipe( - Effect.race( - // TODO: Connect this with upload progress - Effect.repeat( - Effect.gen(function* () { - const bytes = yield* Effect.succeed(downloadedBytes); - yield* Effect.log(`Downloaded ${bytes} bytes`); - }), - Schedule.forever.pipe(Schedule.delayed(() => "2 seconds")), - ).pipe(Effect.delay("100 millis")), - ), - ); - - yield* Effect.log( - `Uploaded video for user '${payload.cap.userId}' at key '${key}'`, + Schedule.forever.pipe(Schedule.delayed(() => "2 seconds")), + ).pipe(Effect.delay("100 millis")), + ), ); - return { fileKey: key }; - }).pipe(Effect.provide(bucketProvider)); + yield* Effect.log( + `Uploaded video for user '${payload.cap.userId}' at key '${key}'`, + ); + + return { fileKey: key }; }), }); diff --git a/packages/web-backend/src/S3Buckets/S3BucketAccess.ts b/packages/web-backend/src/S3Buckets/S3BucketAccess.ts index 4254a7b2e..d153d5739 100644 --- a/packages/web-backend/src/S3Buckets/S3BucketAccess.ts +++ b/packages/web-backend/src/S3Buckets/S3BucketAccess.ts @@ -15,23 +15,17 @@ export class S3Error extends Schema.TaggedError()("S3Error", { }) {} const wrapS3Promise = ( - callback: ( - provider: S3BucketClientProvider["Type"], - ) => Promise | Effect.Effect, Cause.UnknownException>, -): Effect.Effect => + promise: Promise | Effect.Effect, Cause.UnknownException>, +): Effect.Effect => Effect.gen(function* () { - const provider = yield* S3BucketClientProvider; - - const cbResult = callback(provider); - - if (cbResult instanceof Promise) { + if (promise instanceof Promise) { return yield* Effect.tryPromise({ - try: () => cbResult, + try: () => promise, catch: (cause) => new S3Error({ cause }), }); } - return yield* cbResult.pipe( + return yield* promise.pipe( Effect.flatMap((cbResult) => Effect.tryPromise({ try: () => cbResult, @@ -43,257 +37,256 @@ const wrapS3Promise = ( Effect.catchTag("UnknownException", (cause) => new S3Error({ cause })), ); -// @effect-diagnostics-next-line leakingRequirements:off -export class S3BucketAccess extends Effect.Service()( - "S3BucketAccess", - { - sync: () => ({ - bucketName: Effect.map(S3BucketClientProvider, (p) => p.bucket), - getSignedObjectUrl: (key: string) => - wrapS3Promise((provider) => - provider.getPublic.pipe( - Effect.map((client) => - S3Presigner.getSignedUrl( - client, - new S3.GetObjectCommand({ Bucket: provider.bucket, Key: key }), - { expiresIn: 3600 }, - ), +export const createS3BucketAccess = Effect.gen(function* () { + const provider = yield* S3BucketClientProvider; + return { + bucketName: provider.bucket, + getSignedObjectUrl: (key: string) => + wrapS3Promise( + provider.getPublic.pipe( + Effect.map((client) => + S3Presigner.getSignedUrl( + client, + new S3.GetObjectCommand({ Bucket: provider.bucket, Key: key }), + { expiresIn: 3600 }, ), ), - ).pipe(Effect.withSpan("getSignedObjectUrl")), - getObject: (key: string) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map(async (client) => { - const a = await client - .send( - new S3.GetObjectCommand({ - Bucket: provider.bucket, - Key: key, - }), - ) - .then((resp) => resp.Body?.transformToString()) - .catch((e) => { - if (e instanceof S3.NoSuchKey) { - return null; - } else { - throw e; - } - }); - return Option.fromNullable(a); - }), - ), ), - listObjects: (config: { prefix?: string; maxKeys?: number }) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.ListObjectsV2Command({ + ).pipe(Effect.withSpan("getSignedObjectUrl")), + getObject: (key: string) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map(async (client) => { + const a = await client + .send( + new S3.GetObjectCommand({ Bucket: provider.bucket, - Prefix: config?.prefix, - MaxKeys: config?.maxKeys, + Key: key, }), - ), + ) + .then((resp) => resp.Body?.transformToString()) + .catch((e) => { + if (e instanceof S3.NoSuchKey) { + return null; + } else { + throw e; + } + }); + return Option.fromNullable(a); + }), + ), + ), + listObjects: (config: { prefix?: string; maxKeys?: number }) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map((client) => + client.send( + new S3.ListObjectsV2Command({ + Bucket: provider.bucket, + Prefix: config?.prefix, + MaxKeys: config?.maxKeys, + }), ), ), ), - headObject: (key: string) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.HeadObjectCommand({ Bucket: provider.bucket, Key: key }), - ), + ), + headObject: (key: string) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map((client) => + client.send( + new S3.HeadObjectCommand({ Bucket: provider.bucket, Key: key }), ), ), ), - putObject: ( - key: string, - body: string | Uint8Array | ArrayBuffer | Stream.Stream, - fields?: { contentType?: string; contentLength?: number }, - ) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.flatMap((client) => - Effect.gen(function* () { - let _body; - - if (typeof body === "string" || body instanceof Uint8Array) { - _body = body; - } else if (body instanceof ArrayBuffer) { - _body = new Uint8Array(body); - } else { - _body = body.pipe( - Stream.toReadableStreamRuntime(yield* Effect.runtime()), - (s) => Readable.fromWeb(s as any), - ); - } + ), + putObject: ( + key: string, + body: string | Uint8Array | ArrayBuffer | Stream.Stream, + fields?: { contentType?: string; contentLength?: number }, + ) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.flatMap((client) => + Effect.gen(function* () { + let _body; - return client.send( - new S3.PutObjectCommand({ - Bucket: provider.bucket, - Key: key, - Body: _body, - ContentType: fields?.contentType, - ContentLength: fields?.contentLength, - }), + if (typeof body === "string" || body instanceof Uint8Array) { + _body = body; + } else if (body instanceof ArrayBuffer) { + _body = new Uint8Array(body); + } else { + _body = body.pipe( + Stream.toReadableStreamRuntime(yield* Effect.runtime()), + (s) => Readable.fromWeb(s as any), ); + } + + return client.send( + new S3.PutObjectCommand({ + Bucket: provider.bucket, + Key: key, + Body: _body, + ContentType: fields?.contentType, + ContentLength: fields?.contentLength, + }), + ); + }), + ), + ), + ).pipe( + Effect.withSpan("S3BucketAccess.putObject", { attributes: { key } }), + ), + /** Copy an object within the same bucket */ + copyObject: ( + source: string, + key: string, + args?: Omit, + ) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map((client) => + client.send( + new S3.CopyObjectCommand({ + Bucket: provider.bucket, + CopySource: source, + Key: key, + ...args, }), ), ), - ).pipe( - Effect.withSpan("S3BucketAccess.putObject", { attributes: { key } }), ), - /** Copy an object within the same bucket */ - copyObject: ( - source: string, - key: string, - args?: Omit, - ) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.CopyObjectCommand({ - Bucket: provider.bucket, - CopySource: source, - Key: key, - ...args, - }), - ), + ), + deleteObject: (key: string) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map((client) => + client.send( + new S3.DeleteObjectCommand({ + Bucket: provider.bucket, + Key: key, + }), ), ), ), - deleteObject: (key: string) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.DeleteObjectCommand({ - Bucket: provider.bucket, - Key: key, - }), - ), + ), + deleteObjects: (objects: S3.ObjectIdentifier[]) => + wrapS3Promise( + provider.getInternal.pipe( + Effect.map((client) => + client.send( + new S3.DeleteObjectsCommand({ + Bucket: provider.bucket, + Delete: { + Objects: objects, + }, + }), + ), + ), + ), + ), + getPresignedPutUrl: ( + key: string, + args?: Omit, + signingArgs?: RequestPresigningArguments, + ) => + wrapS3Promise( + provider.getPublic.pipe( + Effect.map((client) => + S3Presigner.getSignedUrl( + client, + new S3.PutObjectCommand({ + Bucket: provider.bucket, + Key: key, + ...args, + }), + signingArgs, ), ), ), - deleteObjects: (objects: S3.ObjectIdentifier[]) => - wrapS3Promise((provider) => + ), + getPresignedPostUrl: ( + key: string, + args: Omit, + ) => + wrapS3Promise( + provider.getPublic.pipe( + Effect.map((client) => + createPresignedPost(client, { + ...args, + Bucket: provider.bucket, + Key: key, + }), + ), + ), + ), + multipart: { + create: ( + key: string, + args?: Omit, + ) => + wrapS3Promise( provider.getInternal.pipe( Effect.map((client) => client.send( - new S3.DeleteObjectsCommand({ + new S3.CreateMultipartUploadCommand({ + ...args, Bucket: provider.bucket, - Delete: { - Objects: objects, - }, + Key: key, }), ), ), ), ), - getPresignedPutUrl: ( + getPresignedUploadPartUrl: ( key: string, - args?: Omit, - signingArgs?: RequestPresigningArguments, + uploadId: string, + partNumber: number, + args?: Omit< + S3.UploadPartCommandInput, + "Key" | "Bucket" | "PartNumber" | "UploadId" + >, ) => - wrapS3Promise((provider) => + wrapS3Promise( provider.getPublic.pipe( Effect.map((client) => S3Presigner.getSignedUrl( client, - new S3.PutObjectCommand({ + new S3.UploadPartCommand({ + ...args, Bucket: provider.bucket, Key: key, - ...args, + UploadId: uploadId, + PartNumber: partNumber, }), - signingArgs, ), ), ), ), - getPresignedPostUrl: ( + complete: ( key: string, - args: Omit, + uploadId: string, + args?: Omit< + S3.CompleteMultipartUploadCommandInput, + "Key" | "Bucket" | "UploadId" + >, ) => - wrapS3Promise((provider) => - provider.getPublic.pipe( + wrapS3Promise( + provider.getInternal.pipe( Effect.map((client) => - createPresignedPost(client, { - ...args, - Bucket: provider.bucket, - Key: key, - }), - ), - ), - ), - multipart: { - create: ( - key: string, - args?: Omit, - ) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.CreateMultipartUploadCommand({ - ...args, - Bucket: provider.bucket, - Key: key, - }), - ), - ), - ), - ), - getPresignedUploadPartUrl: ( - key: string, - uploadId: string, - partNumber: number, - args?: Omit< - S3.UploadPartCommandInput, - "Key" | "Bucket" | "PartNumber" | "UploadId" - >, - ) => - wrapS3Promise((provider) => - provider.getPublic.pipe( - Effect.map((client) => - S3Presigner.getSignedUrl( - client, - new S3.UploadPartCommand({ - ...args, - Bucket: provider.bucket, - Key: key, - UploadId: uploadId, - PartNumber: partNumber, - }), - ), - ), - ), - ), - complete: ( - key: string, - uploadId: string, - args?: Omit< - S3.CompleteMultipartUploadCommandInput, - "Key" | "Bucket" | "UploadId" - >, - ) => - wrapS3Promise((provider) => - provider.getInternal.pipe( - Effect.map((client) => - client.send( - new S3.CompleteMultipartUploadCommand({ - Bucket: provider.bucket, - Key: key, - UploadId: uploadId, - ...args, - }), - ), + client.send( + new S3.CompleteMultipartUploadCommand({ + Bucket: provider.bucket, + Key: key, + UploadId: uploadId, + ...args, + }), ), ), ), - }, - }), - }, -) {} + ), + }, + }; +}); + +export type S3BucketAccess = Effect.Effect.Success; diff --git a/packages/web-backend/src/S3Buckets/index.ts b/packages/web-backend/src/S3Buckets/index.ts index f5fb2d6c6..b1e7c3025 100644 --- a/packages/web-backend/src/S3Buckets/index.ts +++ b/packages/web-backend/src/S3Buckets/index.ts @@ -4,10 +4,10 @@ import { decrypt } from "@cap/database/crypto"; import { S3_BUCKET_URL } from "@cap/utils"; import type { S3Bucket } from "@cap/web-domain"; import { awsCredentialsProvider } from "@vercel/functions/oidc"; -import { Config, Context, Effect, Layer, Option } from "effect"; +import { Config, Effect, Layer, Option } from "effect"; import { Database } from "../Database.ts"; -import { S3BucketAccess } from "./S3BucketAccess.ts"; +import { createS3BucketAccess } from "./S3BucketAccess.ts"; import { S3BucketClientProvider } from "./S3BucketClientProvider.ts"; import { S3BucketsRepo } from "./S3BucketsRepo.ts"; @@ -61,7 +61,7 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { return decrypt(v); })(); - const config = { + return new S3.S3Client({ endpoint, region: await decrypt(bucket.region), credentials: { @@ -74,13 +74,9 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { Option.getOrNull, ) ?? true, useArnRegion: false, - }; - console.log({ config }); - return new S3.S3Client(config); + }); }; - const defaultBucketAccess = S3BucketAccess.Default; - const cloudfrontEnvs = yield* Config.all({ distributionId: Config.string("CAP_CLOUDFRONT_DISTRIBUTION_ID"), keypairId: Config.string("CLOUDFRONT_KEYPAIR_ID"), @@ -95,10 +91,8 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { const cloudfrontBucketAccess = cloudfrontEnvs.pipe( Option.map((cloudfrontEnvs) => - Layer.map(defaultBucketAccess, (context) => { - const s3 = Context.get(context, S3BucketAccess); - - return Context.make(S3BucketAccess, { + Effect.flatMap(createS3BucketAccess, (s3) => + Effect.succeed({ ...s3, getSignedObjectUrl: (key) => { const url = `${S3_BUCKET_URL}/${key}`; @@ -126,15 +120,15 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { }), ); }, - }); - }), + }), + ), ), ); - const getProvider = Effect.fn("S3Buckets.getProviderLayer")(function* ( + const getBucketAccess = Effect.fn("S3Buckets.getProviderLayer")(function* ( customBucket: Option.Option, ) { - const layer = yield* Option.match(customBucket, { + const bucketAccess = yield* Option.match(customBucket, { onNone: () => { const provider = Layer.succeed(S3BucketClientProvider, { getInternal: Effect.succeed(createDefaultClient(true)), @@ -144,8 +138,8 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { return Option.match(cloudfrontBucketAccess, { onSome: (access) => access, - onNone: () => defaultBucketAccess, - }).pipe(Layer.merge(provider), Effect.succeed); + onNone: () => createS3BucketAccess, + }).pipe(Effect.provide(provider)); }, onSome: (customBucket) => Effect.gen(function* () { @@ -161,15 +155,15 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { bucket, }); - return Layer.merge(defaultBucketAccess, provider); + return yield* createS3BucketAccess.pipe(Effect.provide(provider)); }), }); - return [layer, customBucket] as const; + return [bucketAccess, customBucket] as const; }); return { - getProviderForBucket: Effect.fn("S3Buckets.getProviderById")(function* ( + getBucketAccess: Effect.fn("S3Buckets.getBucketAccess")(function* ( bucketId: Option.Option, ) { const customBucket = yield* bucketId.pipe( @@ -178,18 +172,25 @@ export class S3Buckets extends Effect.Service()("S3Buckets", { Effect.map(Option.flatten), ); - return yield* getProvider(customBucket); + return yield* getBucketAccess(customBucket); }), - getProviderForUser: Effect.fn("S3Buckets.getProviderForUser")(function* ( - userId: string, - ) { - const customBucket = yield* repo - .getForUser(userId) - .pipe(Effect.option, Effect.map(Option.flatten)); + getBucketAccessForUser: Effect.fn("S3Buckets.getProviderForUser")( + function* (userId: string) { + const customBucket = yield* repo + .getForUser(userId) + .pipe(Effect.option, Effect.map(Option.flatten)); - return yield* getProvider(customBucket); - }), + return yield* getBucketAccess(customBucket); + }, + ), }; }), dependencies: [S3BucketsRepo.Default, Database.Default], -}) {} +}) { + static getBucketAccess = (bucketId: Option.Option) => + Effect.flatMap(S3Buckets, (b) => + b.getBucketAccess(Option.fromNullable(bucketId).pipe(Option.flatten)), + ); + static getBucketAccessForUser = (userId: string) => + Effect.flatMap(S3Buckets, (b) => b.getBucketAccessForUser(userId)); +} diff --git a/packages/web-backend/src/Videos/index.ts b/packages/web-backend/src/Videos/index.ts index 49d08d6cc..e258a1d7c 100644 --- a/packages/web-backend/src/Videos/index.ts +++ b/packages/web-backend/src/Videos/index.ts @@ -39,30 +39,25 @@ export class Videos extends Effect.Service()("Videos", { Effect.flatMap(Effect.catchAll(() => new Video.NotFoundError())), ); - const [S3ProviderLayer] = yield* s3Buckets.getProviderForBucket( - video.bucketId, - ); + const [bucket] = yield* s3Buckets.getBucketAccess(video.bucketId); yield* repo .delete(video.id) .pipe(Policy.withPolicy(policy.isOwner(video.id))); - yield* Effect.gen(function* () { - const s3 = yield* S3BucketAccess; - const user = yield* CurrentUser; + const user = yield* CurrentUser; - const prefix = `${user.id}/${video.id}/`; + const prefix = `${user.id}/${video.id}/`; - const listedObjects = yield* s3.listObjects({ prefix }); + const listedObjects = yield* bucket.listObjects({ prefix }); - if (listedObjects.Contents?.length) { - yield* s3.deleteObjects( - listedObjects.Contents.map((content) => ({ - Key: content.Key, - })), - ); - } - }).pipe(Effect.provide(S3ProviderLayer)); + if (listedObjects.Contents?.length) { + yield* bucket.deleteObjects( + listedObjects.Contents.map((content) => ({ + Key: content.Key, + })), + ); + } }), /* @@ -79,33 +74,29 @@ export class Videos extends Effect.Service()("Videos", { Policy.withPolicy(policy.isOwner(videoId)), ); - const [S3ProviderLayer] = yield* s3Buckets.getProviderForBucket( - video.bucketId, - ); + const [bucket] = yield* s3Buckets.getBucketAccess(video.bucketId); // Don't duplicate password or sharing data const newVideoId = yield* repo.create(video); - yield* Effect.gen(function* () { - const s3 = yield* S3BucketAccess; - const bucketName = yield* s3.bucketName; - - const prefix = `${video.ownerId}/${video.id}/`; - const newPrefix = `${video.ownerId}/${newVideoId}/`; - - const allObjects = yield* s3.listObjects({ prefix }); - - if (allObjects.Contents) - yield* Effect.all( - Array.filterMap(allObjects.Contents, (obj) => - Option.map(Option.fromNullable(obj.Key), (key) => { - const newKey = key.replace(prefix, newPrefix); - return s3.copyObject(`${bucketName}/${obj.Key}`, newKey); - }), - ), - { concurrency: 1 }, - ); - }).pipe(Effect.provide(S3ProviderLayer)); + const prefix = `${video.ownerId}/${video.id}/`; + const newPrefix = `${video.ownerId}/${newVideoId}/`; + + const allObjects = yield* bucket.listObjects({ prefix }); + + if (allObjects.Contents) + yield* Effect.all( + Array.filterMap(allObjects.Contents, (obj) => + Option.map(Option.fromNullable(obj.Key), (key) => { + const newKey = key.replace(prefix, newPrefix); + return bucket.copyObject( + `${bucket.bucketName}/${obj.Key}`, + newKey, + ); + }), + ), + { concurrency: 1 }, + ); }), /* diff --git a/packages/web-backend/src/index.ts b/packages/web-backend/src/index.ts index ebf421ec8..4ad253f16 100644 --- a/packages/web-backend/src/index.ts +++ b/packages/web-backend/src/index.ts @@ -4,7 +4,6 @@ export { Folders } from "./Folders/index.ts"; export * from "./Loom/index.ts"; export * from "./Rpcs.ts"; export { S3Buckets } from "./S3Buckets/index.ts"; -export { S3BucketAccess } from "./S3Buckets/S3BucketAccess.ts"; export { Videos } from "./Videos/index.ts"; export { VideosPolicy } from "./Videos/VideosPolicy.ts"; export * as Workflows from "./Workflows.ts";