In [2]:
import asyncio
import base64
from pathlib import Path
import boto3
import json
from typing import Literal


def encode_b64(file_path) -> str:
    """Return the contents of the file at ``file_path`` as a base64 string.

    The file is opened in binary mode, read in full, base64-encoded, and the
    encoded bytes are decoded as UTF-8 text.
    """
    with open(file_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode("utf-8")


# Values accepted for the request's "embeddingPurpose" field; shared by every
# embed_* method so the list is declared only once.
EmbeddingPurpose = Literal[
    "GENERIC_INDEX",
    "TEXT_RETRIEVAL",
    "IMAGE_RETRIEVAL",
    "VIDEO_RETRIEVAL",
    "AUDIO_RETRIEVAL",
    "DOCUMENT_RETRIEVAL",
    "GENERIC_RETRIEVAL",
    "CLASSIFICATION",
    "CLUSTERING",
]


class Nova2OmniEmbeddings:
    """Async helper that requests single-item embeddings through Bedrock.

    Each ``embed_*`` coroutine builds a ``SINGLE_EMBEDDING`` request body,
    sends it with the ``bedrock-runtime`` client's ``invoke_model`` on a
    worker thread, and returns the dictionary described in
    :meth:`_invoke_model`.
    """

    DEFAULT_REGION = "us-east-1"
    DEFAULT_MODEL_ID = "amazon.nova-2-multimodal-embeddings-v1:0"

    def __init__(
        self,
        region_name: str = DEFAULT_REGION,
        model_id: str = DEFAULT_MODEL_ID,
    ):
        """
        Args:
            region_name: AWS region used to create the ``bedrock-runtime`` client.
            model_id: Value sent as ``modelId`` on every ``invoke_model`` call.
        """
        self.model_id = model_id
        self.client = boto3.client(
            service_name="bedrock-runtime",
            region_name=region_name,
        )

    async def _invoke_model(self, request_body) -> dict:
        """
        Send ``request_body`` to the model and normalize the response.

        Returns a dictionary with the following structure:
        {
            "request_id": str,
            "embeddings": [
                {
                    "embedding_type": str,
                    "embedding": list[float],
                }
            ]
        }

        Raises:
            KeyError: if the response body has no ``embeddings`` list or an
                entry lacks ``embeddingType``.
        """
        # boto3 calls are blocking, so run the request off the event loop.
        response = await asyncio.to_thread(
            self.client.invoke_model,
            body=json.dumps(request_body),
            modelId=self.model_id,
            accept="application/json",
            contentType="application/json",
        )
        # Tolerate a missing ResponseMetadata entry instead of failing with
        # AttributeError on ``None.get``; request_id is then None.
        request_id = (response.get("ResponseMetadata") or {}).get("RequestId")
        response_body = json.loads(response["body"].read())

        embeddings = response_body["embeddings"]
        for item in embeddings:
            # Expose the service's camelCase key under a snake_case name.
            item["embedding_type"] = item.pop("embeddingType")

        return {
            "request_id": request_id,
            "embeddings": embeddings,
        }

    @staticmethod
    async def _load_media(media_path):
        """Return ``(format, base64_payload)`` for a local media file.

        The format is the file extension without the dot, lower-cased so that
        ``clip.MP4`` and ``clip.mp4`` produce the same request value.

        Raises:
            ValueError: if the path has no file extension to derive a format from.
        """
        path = Path(media_path)
        media_format = path.suffix[1:].lower()
        if not media_format:
            # Fail before reading and uploading a potentially large file.
            raise ValueError(
                f"Cannot infer media format: {path} has no file extension"
            )
        # Reading and encoding the file is blocking work; keep it off the event loop.
        payload = await asyncio.to_thread(encode_b64, path)
        return media_format, payload

    async def embed_text(
        self,
        text: str,
        embedding_purpose: EmbeddingPurpose = "GENERIC_INDEX",
        embedding_dimension: int = 3072,
        truncation_mode: Literal["START", "END", "NONE"] = "NONE",
    ) -> dict:
        """
        Embed a text string.

        Args:
            text: Text sent as ``text.value``.
            embedding_purpose: Sent as ``embeddingPurpose``.
            embedding_dimension: Sent as ``embeddingDimension``.
            truncation_mode: Sent as ``text.truncationMode``.

        Returns:
            The dictionary described in :meth:`_invoke_model`.
        """
        request_body = {
            "taskType": "SINGLE_EMBEDDING",
            "singleEmbeddingParams": {
                "embeddingPurpose": embedding_purpose,
                "embeddingDimension": embedding_dimension,
                "text": {"truncationMode": truncation_mode, "value": text},
            },
        }
        return await self._invoke_model(request_body)

    async def embed_video(
        self,
        video_path: str,
        embedding_purpose: EmbeddingPurpose = "GENERIC_INDEX",
        embedding_dimension: int = 3072,
        embedding_mode: Literal[
            "AUDIO_VIDEO_COMBINED", "AUDIO_VIDEO_SEPARATE"
        ] = "AUDIO_VIDEO_COMBINED",
    ) -> dict:
        """
        Embed a local video file.

        Args:
            video_path: Path to the video; its extension is sent as ``video.format``.
            embedding_purpose: Sent as ``embeddingPurpose``.
            embedding_dimension: Sent as ``embeddingDimension``.
            embedding_mode: Literal["AUDIO_VIDEO_COMBINED", "AUDIO_VIDEO_SEPARATE"]
                - "AUDIO_VIDEO_COMBINED" - Will produce a single embedding combining both audible and visual content.
                - "AUDIO_VIDEO_SEPARATE" - Will produce two embeddings, one for the audible content and one for the visual content.

        Returns:
            The dictionary described in :meth:`_invoke_model`.

        Raises:
            ValueError: if ``video_path`` has no file extension.
        """
        video_format, video_b64 = await self._load_media(video_path)

        request_body = {
            "taskType": "SINGLE_EMBEDDING",
            "singleEmbeddingParams": {
                "embeddingPurpose": embedding_purpose,
                "embeddingDimension": embedding_dimension,
                "video": {
                    "format": video_format,
                    "embeddingMode": embedding_mode,
                    "source": {"bytes": video_b64},
                },
            },
        }
        return await self._invoke_model(request_body)

    async def embed_audio(
        self,
        audio_path: str,
        embedding_purpose: EmbeddingPurpose = "GENERIC_INDEX",
        embedding_dimension: int = 3072,
    ) -> dict:
        """
        Embed a local audio file.

        Args:
            audio_path: Path to the audio; its extension is sent as ``audio.format``.
            embedding_purpose: Sent as ``embeddingPurpose``.
            embedding_dimension: Sent as ``embeddingDimension``.

        Returns:
            The dictionary described in :meth:`_invoke_model`.

        Raises:
            ValueError: if ``audio_path`` has no file extension.
        """
        audio_format, audio_b64 = await self._load_media(audio_path)

        request_body = {
            "taskType": "SINGLE_EMBEDDING",
            "singleEmbeddingParams": {
                "embeddingPurpose": embedding_purpose,
                "embeddingDimension": embedding_dimension,
                "audio": {"format": audio_format, "source": {"bytes": audio_b64}},
            },
        }
        return await self._invoke_model(request_body)

In [5]:
# Create the embeddings helper once and reuse it in the cells below; the
# constructor builds a boto3 "bedrock-runtime" client.
embed_model = Nova2OmniEmbeddings()

In [None]:


rst = asyncio.run(embed_model.embed_audio(
    audio_path="../datasets/explore/songs/renwoxing/clips_and_lyrics/0015_051.280-056.070.wav",
    embedding_purpose="GENERIC_INDEX",
))

  rst = await embed_model.embed_audio(


{'request_id': '190eed77-477b-4a80-9150-e9c86c6feff7',
 'embeddings': [{'embedding': [-0.0015606656,
    -0.013005546,
    -0.0057224403,
    0.05028811,
    0.0063727177,
    0.0020917254,
    0.0047036726,
    0.010751251,
    -0.0074131615,
    0.026878128,
    -0.028265387,
    -0.004790376,
    -0.006589477,
    -0.026357908,
    0.016560396,
    -0.018988097,
    0.0155199515,
    0.00029262478,
    -0.0071530505,
    -0.0071530505,
    -0.010231029,
    -0.023409983,
    -0.020462058,
    -0.013785879,
    0.02358339,
    0.013872582,
    -0.023063168,
    0.0068062358,
    0.030519681,
    0.01976843,
    0.00062047294,
    -0.0027961924,
    -0.004573617,
    0.031560127,
    0.004616969,
    -0.011791695,
    -0.024970649,
    -0.017774247,
    0.0015606656,
    -0.016040174,
    -0.01786095,
    0.010924659,
    0.011791695,
    -0.0012463648,
    0.02427702,
    0.003598201,
    -0.002904572,
    -0.0024710537,
    0.00085077947,
    0.0066761803,
    0.0018966421,
    0.03

In [6]:
# Request separate embeddings for the audible and the visual content of the
# clip (AUDIO_VIDEO_SEPARATE) instead of a single combined embedding.
video_rst = await embed_model.embed_video(
    video_path="../datasets/explore/videos/video_1/S1940E01-Scene-055.mp4",
    embedding_purpose="GENERIC_INDEX",
    embedding_mode="AUDIO_VIDEO_SEPARATE",
)

In [7]:
# List which embedding types came back for the video. A dedicated loop
# variable is used so the notebook-level `rst` (the audio result) is not
# overwritten by this loop.
for video_embedding in video_rst["embeddings"]:
    print(video_embedding["embedding_type"])

AUDIO
VIDEO
