Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,492 changes: 799 additions & 693 deletions package-lock.json

Large diffs are not rendered by default.

97 changes: 97 additions & 0 deletions packages/cdk/lambda/copyVideoJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import {
S3Client,
GetObjectCommand,
PutObjectCommand,
DeleteObjectsCommand,
ListObjectsV2Command,
} from '@aws-sdk/client-s3';
import { Readable } from 'stream';
import { VideoJob } from 'generative-ai-use-cases';
import { updateJobStatus } from './repositoryVideoJob';

/**
 * Event payload for the copy-video Lambda invocation: the video generation
 * job whose output should be moved out of the regional source bucket.
 */
export interface CopyVideoJobParams {
  // Job record carrying jobId (S3 key prefix) and region (source bucket lookup key).
  job: VideoJob;
}

// Destination bucket that serves finalized videos (set in the Lambda environment).
const BUCKET_NAME: string = process.env.BUCKET_NAME!;

// Region -> bucket-name lookup for the per-region video output buckets.
// JSON.parse returns `any`; pin the expected shape so lookups stay typed.
const videoBucketRegionMap: Record<string, string> = JSON.parse(
  process.env.VIDEO_BUCKET_REGION_MAP ?? '{}'
);

/**
 * Copies a completed video (`<jobId>/output.mp4`) from the regional source
 * bucket to the destination bucket, then deletes every source object under
 * the job's key prefix.
 *
 * @param jobId - Video generation job id; also the S3 key prefix.
 * @param srcBucket - Bucket in the generation region holding the raw output.
 * @param srcRegion - Region of the source bucket.
 * @param dstBucket - Application bucket the video is served from.
 * @param dstRegion - Region of the destination bucket.
 * @throws Propagates any S3 error so the caller can mark the job as Failed.
 */
const copyAndDeleteObject = async (
  jobId: string,
  srcBucket: string,
  srcRegion: string,
  dstBucket: string,
  dstRegion: string
) => {
  const srcS3 = new S3Client({ region: srcRegion });
  const dstS3 = new S3Client({ region: dstRegion });

  const { Body, ContentType, ContentLength } = await srcS3.send(
    new GetObjectCommand({
      Bucket: srcBucket,
      Key: `${jobId}/output.mp4`,
    })
  );

  // Buffer the object in memory before uploading. A multipart streaming
  // upload would need @aws-sdk/lib-storage, which this package does not use.
  const chunks: Buffer[] = [];
  for await (const chunk of Body as Readable) {
    chunks.push(chunk);
  }
  const fileBuffer = Buffer.concat(chunks);

  await dstS3.send(
    new PutObjectCommand({
      Bucket: dstBucket,
      Key: `${jobId}/output.mp4`,
      Body: fileBuffer,
      ContentType,
      ContentLength,
    })
  );

  // Delete everything under the job's prefix in the source bucket.
  // Use a trailing slash so a job id that is a prefix of another id
  // (e.g. "abc" vs "abc123") cannot match the other job's objects, and
  // paginate because ListObjectsV2 returns at most 1000 keys per call.
  let continuationToken: string | undefined = undefined;
  do {
    const listRes = await srcS3.send(
      new ListObjectsV2Command({
        Bucket: srcBucket,
        Prefix: `${jobId}/`,
        ContinuationToken: continuationToken,
      })
    );

    const objects = (listRes.Contents ?? []).map((object) => ({
      Key: object.Key,
    }));

    // DeleteObjects rejects an empty object list, so skip when nothing matched.
    if (objects.length > 0) {
      await srcS3.send(
        new DeleteObjectsCommand({
          Bucket: srcBucket,
          Delete: {
            Objects: objects,
          },
        })
      );
    }

    continuationToken = listRes.IsTruncated
      ? listRes.NextContinuationToken
      : undefined;
  } while (continuationToken);
};

/**
 * Lambda entry point. Moves a completed video generation job's output from
 * the regional bucket into the application bucket, then records the final
 * job status: "Completed" on success, "Failed" on any error.
 *
 * Invoked asynchronously (fire-and-forget), so errors are logged and
 * reflected in the job status rather than rethrown to the caller.
 */
export const handler = async (event: CopyVideoJobParams): Promise<void> => {
  const job = event.job;
  const jobId = job.jobId;
  const dstRegion = process.env.AWS_DEFAULT_REGION!;
  const dstBucket = BUCKET_NAME;
  const srcRegion = job.region;
  const srcBucket = videoBucketRegionMap[srcRegion];

  try {
    // Fail fast with a clear message when the job's region has no configured
    // bucket, instead of surfacing an opaque S3 "bucket required" error.
    if (!srcBucket) {
      throw new Error(`No video bucket configured for region: ${srcRegion}`);
    }

    await copyAndDeleteObject(
      jobId,
      srcBucket,
      srcRegion,
      dstBucket,
      dstRegion
    );

    await updateJobStatus(job, 'Completed');
  } catch (error) {
    console.error(error);
    await updateJobStatus(job, 'Failed');
  }
};
147 changes: 48 additions & 99 deletions packages/cdk/lambda/repositoryVideoJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ import {
GetAsyncInvokeCommand,
ValidationException,
} from '@aws-sdk/client-bedrock-runtime';
import {
S3Client,
GetObjectCommand,
PutObjectCommand,
DeleteObjectsCommand,
ListObjectsV2Command,
} from '@aws-sdk/client-s3';
import { initBedrockClient } from './utils/bedrockApi';
import { Readable } from 'stream';
import { CopyVideoJobParams } from './copyVideoJob';
import {
LambdaClient,
InvokeCommand,
InvocationType,
} from '@aws-sdk/client-lambda';

const BUCKET_NAME: string = process.env.BUCKET_NAME!;
const TABLE_NAME: string = process.env.TABLE_NAME!;
const COPY_VIDEO_JOB_FUNCTION_ARN = process.env.COPY_VIDEO_JOB_FUNCTION_ARN!;
const dynamoDb = new DynamoDBClient({});
const dynamoDbDocument = DynamoDBDocumentClient.from(dynamoDb);
const lambda = new LambdaClient({});

export const createJob = async (
_userId: string,
Expand Down Expand Up @@ -67,63 +67,28 @@ export const createJob = async (
return item;
};

const copyAndDeleteObject = async (
jobId: string,
srcBucket: string,
srcRegion: string,
dstBucket: string,
dstRegion: string
) => {
const srcS3 = new S3Client({ region: srcRegion });
const dstS3 = new S3Client({ region: dstRegion });

const { Body, ContentType, ContentLength } = await srcS3.send(
new GetObjectCommand({
Bucket: srcBucket,
Key: `${jobId}/output.mp4`,
})
);

const chunks = [];
for await (const chunk of Body as Readable) {
chunks.push(chunk);
}
const fileBuffer = Buffer.concat(chunks);

await dstS3.send(
new PutObjectCommand({
Bucket: dstBucket,
Key: `${jobId}/output.mp4`,
Body: fileBuffer,
ContentType,
ContentLength,
})
);

const listRes = await srcS3.send(
new ListObjectsV2Command({
Bucket: srcBucket,
Prefix: jobId,
})
);

const objects = listRes.Contents?.map((object) => ({
Key: object.Key,
}));

await srcS3.send(
new DeleteObjectsCommand({
Bucket: srcBucket,
Delete: {
Objects: objects,
},
})
);
export const updateJobStatus = async (job: VideoJob, status: string) => {
const updateCommand = new UpdateCommand({
TableName: TABLE_NAME,
Key: {
id: job.id,
createdDate: job.createdDate,
},
UpdateExpression: 'set #status = :status',
ExpressionAttributeNames: {
'#status': 'status',
},
ExpressionAttributeValues: {
':status': status,
},
});

await dynamoDbDocument.send(updateCommand);
};

const checkAndUpdateJob = async (
job: VideoJob
): Promise<'InProgress' | 'Completed' | 'Failed'> => {
): Promise<'InProgress' | 'Completed' | 'Failed' | 'Finalizing'> => {
try {
const client = await initBedrockClient(job.region);
const command = new GetAsyncInvokeCommand({
Expand All @@ -145,45 +110,29 @@ const checkAndUpdateJob = async (
}
}

if (res.status !== 'InProgress') {
if (res.status === 'Completed') {
const jobId = job.jobId;
const dstBucket = BUCKET_NAME;
const dstRegion = process.env.AWS_DEFAULT_REGION!;
const srcRegion = job.region;
const videoBucketRegionMap = JSON.parse(
process.env.VIDEO_BUCKET_REGION_MAP ?? '{}'
);
const srcBucket = videoBucketRegionMap[srcRegion];

await copyAndDeleteObject(
jobId,
srcBucket,
srcRegion,
dstBucket,
dstRegion
);
}

const updateCommand = new UpdateCommand({
TableName: TABLE_NAME,
Key: {
id: job.id,
createdDate: job.createdDate,
},
UpdateExpression: 'set #status = :status',
ExpressionAttributeNames: {
'#status': 'status',
},
ExpressionAttributeValues: {
':status': res.status,
},
});

await dynamoDbDocument.send(updateCommand);
// Video generation is complete, but the video copying is not finished.
// We will run the copy job to set the status to "Finalizing".
if (res.status === 'Completed') {
const params: CopyVideoJobParams = { job };

await lambda.send(
new InvokeCommand({
FunctionName: COPY_VIDEO_JOB_FUNCTION_ARN,
InvocationType: InvocationType.Event,
Payload: JSON.stringify(params),
})
);

await updateJobStatus(job, 'Finalizing');
return 'Finalizing';
} else if (res.status === 'Failed') {
// Since video generation has failed, we will not copy the video and will terminate with a Failed status.
await updateJobStatus(job, 'Failed');
return 'Failed';
} else {
// This res.status will be InProgress only.
return res.status!;
}

return res.status!;
} catch (e) {
console.error(e);
return job.status;
Expand Down
31 changes: 27 additions & 4 deletions packages/cdk/lib/construct/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,11 @@ export class Api extends Construct {
}
table.grantWriteData(generateVideoFunction);

const listVideoJobs = new NodejsFunction(this, 'ListVideoJobs', {
const copyVideoJob = new NodejsFunction(this, 'CopyVideoJob', {
runtime: Runtime.NODEJS_LATEST,
entry: './lambda/listVideoJobs.ts',
entry: './lambda/copyVideoJob.ts',
timeout: Duration.minutes(15),
memorySize: 512,
environment: {
MODEL_REGION: modelRegion,
MODEL_IDS: JSON.stringify(modelIds),
Expand All @@ -304,7 +305,7 @@ export class Api extends Construct {
});
for (const region of Object.keys(props.videoBucketRegionMap)) {
const bucketName = props.videoBucketRegionMap[region];
listVideoJobs.role?.addToPrincipalPolicy(
copyVideoJob.role?.addToPrincipalPolicy(
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:GetObject', 's3:DeleteObject', 's3:ListBucket'],
Expand All @@ -315,8 +316,30 @@ export class Api extends Construct {
})
);
}
fileBucket.grantWrite(listVideoJobs);
fileBucket.grantWrite(copyVideoJob);
table.grantWriteData(copyVideoJob);

const listVideoJobs = new NodejsFunction(this, 'ListVideoJobs', {
runtime: Runtime.NODEJS_LATEST,
entry: './lambda/listVideoJobs.ts',
timeout: Duration.minutes(15),
environment: {
MODEL_REGION: modelRegion,
MODEL_IDS: JSON.stringify(modelIds),
IMAGE_GENERATION_MODEL_IDS: JSON.stringify(imageGenerationModelIds),
VIDEO_GENERATION_MODEL_IDS: JSON.stringify(videoGenerationModelIds),
VIDEO_BUCKET_REGION_MAP: JSON.stringify(props.videoBucketRegionMap),
CROSS_ACCOUNT_BEDROCK_ROLE_ARN: crossAccountBedrockRoleArn ?? '',
BUCKET_NAME: fileBucket.bucketName,
TABLE_NAME: table.tableName,
COPY_VIDEO_JOB_FUNCTION_ARN: copyVideoJob.functionArn,
},
bundling: {
nodeModules: ['@aws-sdk/client-bedrock-runtime'],
},
});
table.grantReadWriteData(listVideoJobs);
copyVideoJob.grantInvoke(listVideoJobs);

const deleteVideoJob = new NodejsFunction(this, 'DeleteVideoJob', {
runtime: Runtime.NODEJS_LATEST,
Expand Down
1 change: 1 addition & 0 deletions packages/cdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@aws-sdk/client-bedrock-runtime": "^3.755.0",
"@aws-sdk/client-dynamodb": "^3.755.0",
"@aws-sdk/client-kendra": "^3.755.0",
"@aws-sdk/client-lambda": "^3.755.0",
"@aws-sdk/client-s3": "^3.755.0",
"@aws-sdk/client-sagemaker-runtime": "^3.755.0",
"@aws-sdk/client-transcribe": "^3.755.0",
Expand Down
Loading
Loading