## Video and Audio Content Analysis with Amazon Bedrock and Amazon Aurora PostgreSQL pgvector

This notebook demonstrates how to process video and audio content using [Amazon Bedrock](https://aws.amazon.com/bedrock/) to invoke the [Amazon Titan Multimodal Embeddings G1 model](https://docs.aws.amazon.com/bedrock/latest/userguide/titan-multiemb-models.html) for generating multimodal embeddings, [Amazon Transcribe](https://aws.amazon.com/transcribe/) for converting speech to text, and [Amazon Aurora PostgreSQL](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/data-api.html) with pgvector for efficient vector storage and similarity search. You will build an app that understands both visual and audio content, enabling natural language queries to find specific moments in videos.

> Create the Amazon Aurora PostgreSQL cluster with this [AWS CDK stack](https://github.com/build-on-aws/langchain-embeddings/tree/main/create-aurora-pgvector)

![Diagram](images/video-embedding.png)

In [None]:
#!pip install boto3 pillow langchain-core
# json, base64, and uuid ship with the Python standard library and need no install
# or install requirements.txt

In [None]:
import boto3
import json
import os
from PIL import Image as PILImage
import random

region = os.environ.get("AWS_DEFAULT_REGION", "us-west-2")
ssm = boto3.client(service_name="ssm", region_name=region)
# Note: despite its name, sns_client is an AWS Step Functions client
sns_client = boto3.client("stepfunctions", region_name=region)


# Default model settings
default_model_id = os.environ.get("DEFAULT_MODEL_ID", "amazon.titan-embed-image-v1")
default_embedding_dimmesion = os.environ.get("DEFAULT_EMBEDDING_DIMENSION", "1024")

## 2. Database Interface (AuroraPostgres Class)

The `AuroraPostgres` class interacts with Amazon Aurora PostgreSQL [using the RDS Data API](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/data-api.html).

Code: [aurora_service.py](create_audio_video_helper/aurora_service.py)
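
The helper's source is not reproduced in this notebook, so the following is only a minimal sketch of how a call through the RDS Data API could be assembled and its result decoded. The function names `build_statement_kwargs` and `parse_records`, the placeholder ARNs, and the database name are assumptions for illustration; the actual `AuroraPostgres` class may work differently.

```python
import json


def build_statement_kwargs(cluster_arn, secret_arn, database, sql):
    """Assemble keyword arguments for the RDS Data API ExecuteStatement call."""
    return {
        "resourceArn": cluster_arn,
        "secretArn": secret_arn,
        "database": database,
        "sql": sql,
        # Ask the Data API to return rows as a JSON string under "formattedRecords"
        "formatRecordsAs": "JSON",
    }


def parse_records(response):
    """Decode the JSON string returned under "formattedRecords"."""
    return json.loads(response.get("formattedRecords", "[]"))


# With real ARNs you would run something like:
#   import boto3
#   rds_data = boto3.client("rds-data")
#   response = rds_data.execute_statement(**kwargs)
kwargs = build_statement_kwargs(
    "arn:aws:rds:us-west-2:123456789012:cluster:example",  # placeholder ARN
    "arn:aws:secretsmanager:us-west-2:123456789012:secret:example",  # placeholder ARN
    "postgres",  # hypothetical database name
    "select count(*) from bedrock_integration.knowledge_bases",
)

# Hand-written stand-in for a Data API response, used only to show the decoding step
fake_response = {"formattedRecords": '[{"count": 0}]'}
rows = parse_records(fake_response)
```

This matches how the rest of the notebook reads results, with `json.loads(result.get("formattedRecords"))`.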

In [None]:
from create_audio_video_helper import AuroraPostgres

## 3. Video Content Processing

A `VideoProcessor` class uses [FFmpeg](https://ffmpeg.org/) to process the audio and extract frames.

The class extracts one frame per second; you can change this by modifying the FPS value in the ffmpeg command.

Code: [video_processor.py](create_audio_video_helper/video_processor.py)
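
To make the FPS setting concrete, here is a small sketch of the kind of ffmpeg commands such a class might build. The function names, the output file pattern, and the directory layout are assumptions; the real `VideoProcessor` command line is in the linked file and may differ.

```python
import subprocess


def build_frame_command(video_path, output_dir, fps=1):
    """Build an ffmpeg command that writes `fps` frames per second as JPEG files."""
    return [
        "ffmpeg",
        "-i", video_path,       # input video
        "-vf", f"fps={fps}",    # video filter that controls the sampling rate
        f"{output_dir}/frame_%04d.jpg",  # hypothetical output naming pattern
    ]


def build_audio_command(video_path, audio_path):
    """Build an ffmpeg command that drops the video stream and keeps the audio."""
    return ["ffmpeg", "-i", video_path, "-vn", audio_path]


def run(command):
    """Run a command and raise if ffmpeg exits with an error (requires ffmpeg installed)."""
    subprocess.run(command, check=True)


frame_cmd = build_frame_command("talk.mp4", "frames", fps=2)
audio_cmd = build_audio_command("talk.mp4", "talk.mp3")
```

Raising `fps` produces more frames and therefore more embeddings to store; lowering it reduces cost at the price of coarser timestamps.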

## 4. Embedding Generation

Generate embeddings for each extracted frame. Embeddings are created with the Amazon Titan Multimodal Embeddings G1 model through the Amazon Bedrock InvokeModel API.

Code: [embedding_generation.py](create_audio_video_helper/embedding_generation.py)
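
As a rough illustration of what an InvokeModel request for this model looks like, the sketch below builds the JSON body for text, an image, or both. The function name `build_titan_request` is an assumption, and the real `EmbeddingGeneration` class may structure this differently.

```python
import base64
import json


def build_titan_request(text=None, image_bytes=None, dimension=1024):
    """Build the JSON body for a Titan Multimodal Embeddings request."""
    if text is None and image_bytes is None:
        raise ValueError("Provide text, image_bytes, or both")
    body = {"embeddingConfig": {"outputEmbeddingLength": int(dimension)}}
    if text is not None:
        body["inputText"] = text
    if image_bytes is not None:
        # Images are sent as base64-encoded strings
        body["inputImage"] = base64.b64encode(image_bytes).decode("utf-8")
    return json.dumps(body)


# With AWS credentials configured, the call would look roughly like:
#   import boto3
#   bedrock_runtime = boto3.client("bedrock-runtime")
#   response = bedrock_runtime.invoke_model(
#       modelId="amazon.titan-embed-image-v1",
#       body=request_body,
#       accept="application/json",
#       contentType="application/json",
#   )
#   embedding = json.loads(response["body"].read())["embedding"]
request_body = build_titan_request(text="aurora", image_bytes=b"abc")
```

Text and frames share one embedding space, which is what lets a text query retrieve matching frames later in the notebook.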


In [None]:
from create_audio_video_helper import EmbeddingGeneration

### Configuration
The system uses environment variables and AWS Systems Manager Parameter Store for configuration:

**DEFAULT_MODEL_ID:** Bedrock model ID (default: "amazon.titan-embed-image-v1")

**DEFAULT_EMBEDDING_DIMENSION:** Embedding dimension (default: "1024")
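
Because environment variables are always strings, the dimension arrives as `"1024"` rather than `1024`. The sketch below shows one way to read and validate both settings; `load_embedding_config` is a hypothetical helper, and the allowed sizes listed are the output lengths the Titan Multimodal Embeddings G1 model documents.

```python
import os

# Output lengths documented for Titan Multimodal Embeddings G1
ALLOWED_DIMENSIONS = (256, 384, 1024)


def load_embedding_config(env=os.environ):
    """Read the model ID and embedding dimension, converting the dimension to int."""
    model_id = env.get("DEFAULT_MODEL_ID", "amazon.titan-embed-image-v1")
    dimension = int(env.get("DEFAULT_EMBEDDING_DIMENSION", "1024"))
    if dimension not in ALLOWED_DIMENSIONS:
        raise ValueError(f"Unsupported embedding dimension: {dimension}")
    return model_id, dimension


# Passing a plain dict makes the helper easy to try without touching os.environ
model_id_example, dimension_example = load_embedding_config({})
```

The dimension must match the size of the pgvector column, so changing it after data is stored requires regenerating the embeddings.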

In [None]:

def get_ssm_parameter(name):
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    return response["Parameter"]["Value"]



In [None]:
# Read configuration from AWS Systems Manager Parameter Store; never share secrets!

cluster_arn = get_ssm_parameter("/videopgvector/cluster_arn")
credentials_arn = get_ssm_parameter("/videopgvector/secret_arn")
table_name = get_ssm_parameter("/videopgvector/video_table_name")
api_url = get_ssm_parameter("/videopgvector/api_retrieve")
bucket_name = get_ssm_parameter("/videopgvector/bucket_name")
state_machine_arn = get_ssm_parameter("/videopgvector/state_machine_arn")

In [None]:
print(credentials_arn)

In [None]:
# Initialize Aurora PostgreSQL client
aurora = AuroraPostgres(cluster_arn, table_name, credentials_arn,region)

In [None]:
# Verify Aurora cluster connectivity:
aurora.execute_statement("select count(*) from bedrock_integration.knowledge_bases")

In [None]:
embedding_generation = EmbeddingGeneration(region,default_model_id,default_embedding_dimmesion)

In [None]:
aurora.execute_statement("select count(*) from bedrock_integration.knowledge_bases")

In [None]:
s3_client = boto3.client('s3')
base_path = "images/"

def download_file(base_path, bucket, key, filename):
    print("Downloading file from s3://{}/{}".format(bucket, key))
    with open(base_path + filename, "wb") as data:
        s3_client.download_fileobj(bucket, key, data)
    print("Downloaded file to {}".format(base_path + filename))
    return True

def read_image_from_s3(s3_key):
    parts = s3_key.split('s3://')[-1].split('/', 1)
    bucket_name = parts[0]
    image_key = parts[1]
    try:
        response = s3_client.get_object(Bucket=bucket_name, Key=image_key)
        image_data = response['Body'].read()
        return image_data
    except Exception as e:
        print(f'Error reading image from {s3_key}: {str(e)}')
        raise

# Upload Video to Amazon S3 bucket
def upload_file_to_s3 (video_path,bucket_name,s3_key):
    s3_client.upload_file(video_path, bucket_name,s3_key)
    print("Upload successful!")



In [None]:
video_path = "<your-video-path>"
# Use only the file name so local directories do not end up in the S3 key
s3_key = f"video/{os.path.basename(video_path)}"
upload_file_to_s3(video_path, bucket_name, s3_key)

### Check the status of the Step Functions workflow processing your video

In [None]:
response = sns_client.describe_state_machine(
    stateMachineArn=state_machine_arn
)

In [None]:
response = sns_client.list_executions(
    stateMachineArn=state_machine_arn,
    maxResults=12
)
response['executions'][0]
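
To read the `list_executions` result at a glance, a small helper can reduce it to name and status pairs. This is only a sketch: `summarize_executions` and `all_finished` are hypothetical names, and the sample response is hand-written to mirror the shape the API returns.

```python
def summarize_executions(response):
    """Return (name, status) pairs from a Step Functions list_executions response."""
    return [(e["name"], e["status"]) for e in response.get("executions", [])]


def all_finished(response):
    """True when no execution in the response is still RUNNING."""
    return all(e["status"] != "RUNNING" for e in response.get("executions", []))


# Hand-written sample in the shape of a list_executions response
sample_response = {
    "executions": [
        {"name": "run-2", "status": "RUNNING"},
        {"name": "run-1", "status": "SUCCEEDED"},
    ]
}
summary = summarize_executions(sample_response)
finished = all_finished(sample_response)
```

Once the execution for your video reports `SUCCEEDED`, its frames and transcript segments should be available for the searches below.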

## Similarity Search

This section uses the following functions:
- `retrieve()`: Performs similarity searches in the database and displays results
- `aurora.similarity_search()`: Executes the vector similarity search in the database
- `get_embeddings()`: Generates embeddings for the search query
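
The `how` argument selects the distance operator that pgvector applies. As a hedged sketch of what the query behind `aurora.similarity_search()` might look like, the builder below uses pgvector's `<=>` (cosine distance) and `<->` (L2 distance) operators. The function name, the table name, and the `:query_vector` parameter are assumptions; the column names are taken from the rows printed in this notebook.

```python
import re


def build_similarity_sql(table_name, how="cosine", k=5):
    """Build a pgvector similarity query for cosine similarity or L2 distance."""
    # Identifiers cannot be bound as parameters, so validate before interpolating
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_.]*", table_name):
        raise ValueError(f"Invalid table name: {table_name!r}")
    vector = "CAST(:query_vector AS vector)"
    if how == "cosine":
        # <=> is cosine distance, so similarity is 1 minus that value
        score = f"1 - (embedding <=> {vector}) AS similarity"
        order = f"embedding <=> {vector}"
    elif how == "l2":
        # <-> is Euclidean (L2) distance; smaller means closer
        score = f"embedding <-> {vector} AS distance"
        order = f"embedding <-> {vector}"
    else:
        raise ValueError(f"Unsupported method: {how!r}")
    return (
        f"SELECT id, content_type, chunks, sourceurl, metadata, {score} "
        f"FROM {table_name} ORDER BY {order} LIMIT {int(k)}"
    )


cosine_sql = build_similarity_sql("video_embeddings", how="cosine", k=3)
l2_sql = build_similarity_sql("video_embeddings", how="l2", k=3)
```

This is why `retrieve()` reads a `similarity` field for cosine searches and a `distance` field for L2 searches.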

In [None]:
from IPython.display import display

def retrieve(search_query, how="cosine", k=5):
    search_vector = embedding_generation.get_embeddings(search_query)
    
    result = aurora.similarity_search(search_vector,how=how, k=k)
    rows = json.loads(result.get("formattedRecords"))
    for row in rows:
        metric = "similarity" if how == "cosine" else "distance"
        metric_value = row.get(metric)
        if row.get("content_type") == "text":
            print("row: ", row)
            print (f"text:\n{row.get('chunks')}\n{metric}:{metric_value}\nmetadata:{row.get('metadata')}\n")
        if row.get("content_type") == "image":
            print(row)
            sourceurl = row.get('sourceurl')
            print(sourceurl)
            bucket_name = sourceurl.split("/")[2] 
            key = sourceurl.replace("s3://", "").replace(bucket_name,"").lstrip("/")
            filename = sourceurl.split("/")[-1] 
            print("bucket_name: ",bucket_name)
            print("key: ",key)
            print("filename: ",filename)
            download_file(base_path,bucket_name, key, filename)
            img = PILImage.open(base_path+filename)            
            print (f"Image:\n{row.get('sourceurl')}\n{metric}:{metric_value}\nmetadata:{row.get('metadata')}\n")
            display(img)
        # Drop bulky fields before returning; pop() tolerates missing keys
        row.pop("embedding", None)
        row.pop("id", None)

    return rows
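
The bucket and key parsing inside `retrieve()` removes the bucket name with `str.replace`, which would also strip that name if it appears again inside the key. A possible alternative, sketched here with a hypothetical `parse_s3_uri` helper, splits the URI only once:

```python
def parse_s3_uri(uri):
    """Split an s3://bucket/key URI into (bucket, key, filename)."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # partition() splits at the first "/" only, so the key is left untouched
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri!r}")
    filename = key.rsplit("/", 1)[-1]
    return bucket, key, filename


# The key repeats the bucket name, which the replace-based approach would mangle
bucket, key, filename = parse_s3_uri("s3://demo/frames/demo_0001.jpg")
```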

I tested the notebook with my AWS re:Invent 2024 session [AI self-service support with knowledge retrieval using PostgreSQL](https://www.youtube.com/watch?v=fpi3awGakyg?trk=fccf147c-636d-45bf-bf0a-7ab087d5691a&sc_channel=video). 

I searched for "Aurora", and it returned the images and transcript segments that mention it:

![Diagram](data/cosine.png)

```text
text:
memory . A place where all the information is stored and can easily be retrievable , and that's where the vector database comes in . This is the the first building block . And a vector database stores and retrieves data in the form of vector embeddeds or mathematical representations . This allows us to find similarities between data rather than relying on the exact keyword match that is what usually happens up to today . This is essential for systems like retrieval ofmented generation or RAC , which combines external knowledge with the AI response to deliver those accurate and context aware response . And by the way , I think yesterday we announced the re-rank API for RAC . So now your rack applications , you can score and it will prioritize those documents that have the most accurate information . So at the end will be even faster and cheaper building rack . We're gonna use Amazon Aurora postgrade SQL with vector support that will give us a scalable and fully managed solution for our AI tasks .
similarity:0.5754164493071239
metadata:{"speaker":"spk_0","second":321}
```


In [None]:
search_query = "aurora"
docs = retrieve(search_query, how="cosine", k=10)

In [None]:
search_vector = embedding_generation.get_embeddings(search_query)
result = aurora.similarity_search(search_vector,how="cosine", k=3)

In [None]:
rows = json.loads(result.get("formattedRecords"))

In [None]:
search_query = "elizabeth"
docs = retrieve(search_query, how="l2", k=10)

## RAG Implementation

Finally, the notebook implements a complete RAG system:
- `CustomMultimodalRetriever`: A custom retriever class that extends BaseRetriever
- `_get_relevant_documents()`: Core retrieval method that finds similar content
- `image_content_block()`: Formats image content for LLM consumption
- `text_content_block()`: Formats text content for LLM consumption
- `parse_docs_for_context()`: Processes documents for context (text and images)
- `answer()`: Uses an LLM to answer questions based on retrieved content

> Based on https://github.com/langchain-ai/langchain/blob/master/docs/docs/how_to/custom_retriever.ipynb



In [None]:
from typing import List

from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever

class CustomMultimodalRetriever(BaseRetriever):
    """A retriever that returns the top k documents most similar to the user query.
    The query could be text or image_bytes.
    """
    k: int
    """Number of top results to return"""
    how: str
    """How to calculate the similarity between the query and the documents."""

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        """Sync implementations for retriever."""
        search_vector = embedding_generation.get_embeddings(query)
        result = aurora.similarity_search(search_vector, how=self.how, k=self.k)
        rows = json.loads(result.get("formattedRecords"))

        matching_documents = []

        for row in rows:
            metadata = dict(
                **json.loads(row.get("metadata")),
                content_type=row.get("content_type"),
                source=row.get("sourceurl"),
            )

            # Keep the score in metadata, where downstream code can read it
            if self.how == "cosine":
                metadata["similarity"] = row.get("similarity")
            elif self.how == "l2":
                metadata["distance"] = row.get("distance")

            if row.get("content_type") == "text":
                matching_documents.append(Document(page_content=row.get("chunks"), metadata=metadata))
            if row.get("content_type") == "image":
                # retrieve() reads the S3 location from "sourceurl", so use the same field here
                matching_documents.append(Document(page_content=row.get("sourceurl"), metadata=metadata))

        return matching_documents

In [None]:
retriever = CustomMultimodalRetriever(how="cosine", k=20)

In [None]:
query = "how does aurora work"
docs = retriever.invoke(query)


In [None]:
list(docs)

## Building the RAG

In [None]:
from typing import List, Dict
bedrock_runtime = boto3.client(service_name="bedrock-runtime", region_name=region)


budget_tokens = 0
max_tokens = 1024
conversation: List[Dict] = []
reasoning_config = {"thinking": {"type": "enabled", "budget_tokens": budget_tokens}}

In [None]:
def image_content_block(sourceurl):
    print("sourceurl: ",sourceurl)
    bucket_name = sourceurl.split("/")[2] 
    key = sourceurl.replace("s3://", "").replace(bucket_name,"").lstrip("/")
    filename = sourceurl.split("/")[-1] 
    print("bucket_name: ",bucket_name)
    print("key: ",key)
    print("filename: ",filename)
    image_bytes = read_image_from_s3(sourceurl)
    extension = filename.split('.')[-1]
    print (f"Including Image :{filename}")
    if extension == 'jpg':
        extension = 'jpeg'
    
    block = { "image": { "format": extension, "source": { "bytes": image_bytes}}}
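    # Avoid printing the block itself: it contains the raw image bytes
    print(f"Image block ready: format={extension}, {len(image_bytes)} bytes")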
    return block

def text_content_block(text):
    return { "text": text }

def parse_docs_for_context(docs):
    blocks = []
    for doc in docs:
        if doc.metadata.get('content_type') == "image":
            print(doc)
            blocks.append(image_content_block(doc.metadata.get('source')))
        else:
            print(doc)
            blocks.append(text_content_block(doc.page_content))
    return blocks
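
To show the shape of the content that ends up in the Converse request, here is a self-contained sketch that assembles a user message from text chunks and image bytes without calling S3. The helper names and the sample inputs are assumptions; the set of image formats is the one the Converse API documents.

```python
# Image formats accepted by the Bedrock Converse API
SUPPORTED_IMAGE_FORMATS = {"png", "jpeg", "gif", "webp"}


def image_format_from_key(key):
    """Derive the Converse image format from a file name or S3 key."""
    extension = key.rsplit(".", 1)[-1].lower()
    if extension == "jpg":
        extension = "jpeg"
    if extension not in SUPPORTED_IMAGE_FORMATS:
        raise ValueError(f"Unsupported image format: {extension!r}")
    return extension


def build_user_message(question, text_chunks, images):
    """Build one user message; `images` is a list of (key, raw_bytes) tuples."""
    content = [{"text": f"question:{question}\n\nDocs:\n"}]
    for chunk in text_chunks:
        content.append({"text": chunk})
    for key, data in images:
        content.append(
            {"image": {"format": image_format_from_key(key), "source": {"bytes": data}}}
        )
    return {"role": "user", "content": content}


message = build_user_message(
    "What is a vector database?",
    ["A vector database stores embeddings."],
    [("frames/frame_0001.JPG", b"\xff\xd8\xff")],  # placeholder bytes, not a real image
)
```

Each retrieved frame adds image input to the request, so a large `k` on the retriever increases both latency and cost.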

In [None]:
def answer(model_id, system_prompt, content) -> list:
    """Get a completion from the model through the Bedrock Converse API.

    Returns:
        list: Content blocks of the model response (for example, [{"text": "..."}])
    """

    # Invoke model
    kwargs = dict(
        modelId=model_id,
        inferenceConfig=dict(maxTokens=max_tokens),
        messages=[
            {
                "role": "user",
                "content": content,
            }
        ],

    )

    kwargs["system"] = [{"text": system_prompt}]

    response = bedrock_runtime.converse(**kwargs)
    
    return response.get("output",{}).get("message",{}).get("content", [])
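
Since `answer()` returns a list of content blocks, reading only the first block can miss text when the model returns several. The following sketch joins every text block and checks whether the reply was cut off by `maxTokens`; `extract_text` and `was_truncated` are hypothetical helpers, and the sample response is hand-written in the Converse response shape.

```python
def extract_text(response):
    """Join all text blocks from a Converse API response."""
    blocks = response.get("output", {}).get("message", {}).get("content", [])
    return "\n".join(block["text"] for block in blocks if "text" in block)


def was_truncated(response):
    """True when generation stopped because the maxTokens limit was reached."""
    return response.get("stopReason") == "max_tokens"


# Hand-written sample in the shape of a Converse response
sample_response = {
    "output": {
        "message": {
            "role": "assistant",
            "content": [{"text": "Aurora stores the vectors [1]."}, {"text": "See frame [2]."}],
        }
    },
    "stopReason": "end_turn",
}
answer_text = extract_text(sample_response)
truncated = was_truncated(sample_response)
```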
    


In [None]:

system_prompt = """Answer the user's questions based on the below context. If the context has an image, indicate that it can be reviewed for further feedback.
If the context doesn't contain any relevant information to the question, don't make something up and just say "I don't know". (IF YOU MAKE SOMETHING UP BY YOUR OWN YOU WILL BE FIRED). For each statement in your response provide a [n] where n is the document number that provides the response. """
model_id = "us.amazon.nova-pro-v1:0"


In [None]:
query = "<your-query>"
docs = retriever.invoke(query)
parsed_docs = parse_docs_for_context(docs)

In [None]:
llm_response = answer(model_id,system_prompt,[text_content_block(f"question:{query}\n\nDocs:\n"), *parsed_docs])

In [None]:
print(llm_response[0].get("text"))

In [None]:
query = "<your-query>"
docs = retriever.invoke(query)
parsed_docs = parse_docs_for_context(docs)
llm_response = answer(model_id,system_prompt,[text_content_block(f"question:{query}\n\nDocs:\n"), *parsed_docs])
print(llm_response[0].get("text"))

## Query through an AWS Lambda function

In [None]:
import boto3
import json

In [None]:

# Initialize the Lambda client
lambda_client = boto3.client('lambda', region_name="us-east-1")

# 1. Synchronous invocation (RequestResponse)
def invoke_lambda_sync(function_name, payload):
    try:
        response = lambda_client.invoke(
            FunctionName=function_name,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )
        response_payload = json.loads(response['Payload'].read())
        return response_payload

    except Exception as e:
        print(f"Error invoking Lambda: {str(e)}")
        raise
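
A synchronous invoke call succeeds at the API level even when the function itself raises, in which case the response carries a `FunctionError` field. As a sketch of how that could be surfaced, the hypothetical `decode_lambda_response` helper below decodes the payload and raises on function errors; the sample responses use `io.BytesIO` as a stand-in for the streaming body boto3 returns.

```python
import io
import json


def decode_lambda_response(response):
    """Decode a Lambda invoke response and raise if the function reported an error."""
    payload = json.loads(response["Payload"].read())
    if response.get("FunctionError"):
        raise RuntimeError(f"Lambda function error: {payload}")
    return payload


# Hand-written stand-ins for successful and failed invoke responses
ok_response = {"StatusCode": 200, "Payload": io.BytesIO(b'{"answer": "ok"}')}
error_response = {
    "StatusCode": 200,
    "FunctionError": "Unhandled",
    "Payload": io.BytesIO(b'{"errorMessage": "boom"}'),
}
ok_payload = decode_lambda_response(ok_response)
```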


In [None]:
function_name = "<your-lambda>"

In [None]:
payload = {
    "id": "<your-video-name>",
    "query": "<your-query>",
    "method":"retrieve_generate"
}

In [None]:
response_payload = invoke_lambda_sync(function_name, payload)

In [None]:
response_payload