# Amazon Bedrock Batch inference with Amazon Nova FMs to analyze videos - Work with videos on a large scale
This notebook walks through the end-to-end process of running batch inference on a collection of videos stored in Amazon S3 using Amazon Nova Pro or Amazon Nova Premier via Amazon Bedrock.

With [Batch Inference](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html), you provide a set of prompts as a single input file and receive the responses as a single output file, which lets you run large-scale predictions in one job. The responses are processed and stored in your Amazon S3 bucket so you can access them at a later time. Amazon Bedrock supports Amazon Nova foundation models (FMs) for batch inference at a 50% lower price than on-demand inference. For the models that support batch inference, see the [list of supported models](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-supported.html).

## Introduction
**Amazon Bedrock** provides a unified API for invoking foundation models such as Amazon Nova Lite, Amazon Nova Pro, or Amazon Nova Premier, as described [here](https://docs.aws.amazon.com/nova/latest/userguide/what-is-nova.html). When you want to analyze, summarize, caption, or classify a **large set of videos**, you can use **batch inference**. This notebook shows you how to:

1. Discover all your MP4 videos in Amazon S3
2. Build a JSONL payload referencing them
3. Upload payload to Amazon S3
4. Kick off an Amazon Nova batch inference job
5. Poll for completion
6. Download the results locally and explore


## Table of Contents
1. [Prerequisites](#prerequisites)
2. [Download Sample Dataset](#download-sample-dataset)
3. [Upload videos to your Amazon S3 bucket](#upload-videos-to-your-amazon-s3-bucket)
4. [List videos in Amazon S3](#list-videos-in-amazon-s3)
5. [Build JSONL Payload](#build-jsonl-payload)
6. [Upload JSONL to Amazon S3](#upload-jsonl-to-amazon-s3)
7. [Invoke Amazon Bedrock Batch Inference Job](#invoke-amazon-bedrock-batch-inference-job)
8. [Poll Job Status](#poll-job-status)
9. [Download Results](#download-results)
10. [Integrating with Existing Workflows](#integrating-with-existing-workflows)
11. [Conclusion](#conclusion)


## Prerequisites
Before you begin, ensure that you have the following prerequisites in place:

1. You must have permissions to invoke the `CreateModelInvocationJob` and `GetModelInvocationJob` APIs. Refer to the documentation to learn about the [required permissions for batch inference jobs](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-prereq.html#batch-inference-permissions).
2. Provide an S3 bucket with empty prefixes for `video/batch/input/` and `video/batch/output/`.
3. Bedrock Batch Inference requires a service role so that it can read from and write to Amazon S3 on your behalf. You can create the service role manually ([see here](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-iam-sr.html)) or use the AWS Console workflow, which can create one for you ([here](https://us-east-1.console.aws.amazon.com/bedrock/home?region=us-east-1#/batch-inference/create)). We also provide a quick way to create a service role in the code below. The role requires permissions to read and write data in the Amazon S3 bucket (`GetObject`, `ListBucket`, `PutObject`).
4. This notebook was built using videos in `.mp4` format.
5. Ensure that you are in an AWS Region that supports Batch Inference. Refer to the [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-supported.html) for the supported Regions.
6. The default maximum size of a single input file submitted for batch inference with Nova models is 1 GB. You can request an increase [here](https://us-east-1.console.aws.amazon.com/servicequotas/home/services/bedrock/quotas/L-68FC8D47) as needed.
7. Amazon Bedrock Batch Inference requires a minimum of 100 records.
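
The service role from prerequisite 3 can be created with boto3. The following is a minimal sketch, not a definitive setup: the function names, the inline policy name, and the `account_id` and `region` parameters are illustrative assumptions, and the policies are scoped to a single bucket. Compare them against the linked service-role documentation before using them.

```python
import json


def build_trust_policy(account_id, region):
    """Trust policy that lets Amazon Bedrock assume the role for batch jobs
    in one account and Region (assumed scoping, adjust as needed)."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "bedrock.amazonaws.com"},
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {"aws:SourceAccount": account_id},
                "ArnEquals": {
                    "aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:model-invocation-job/*"
                },
            },
        }],
    }


def build_s3_policy(bucket):
    """Permission policy with the three S3 actions named in the prerequisites."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [f"arn:aws:s3:::{bucket}", f"arn:aws:s3:::{bucket}/*"],
        }],
    }


def create_batch_role(iam_client, role_name, account_id, region, bucket):
    """Create the role, attach the inline S3 policy, and return the role ARN."""
    role = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_trust_policy(account_id, region)),
    )
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName="bedrock-batch-s3-access",  # hypothetical policy name
        PolicyDocument=json.dumps(build_s3_policy(bucket)),
    )
    return role["Role"]["Arn"]
```

Calling `create_batch_role(boto3.client("iam"), "my-bedrock-batch-role", "<account id>", "us-east-1", "<bucket name>")` would return an ARN that you could use as `ROLE_ARN`; the role name here is only a placeholder.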

In [None]:
%pip install --quiet "boto3>=1.35.1" huggingface_hub sagemaker

Next we import the dependencies.

In [None]:
import json
import logging
import re
import html
import os
import time
from IPython.display import display, Markdown, HTML
from urllib.parse import urlparse
from botocore.config import Config as BConfig
import boto3
from huggingface_hub import snapshot_download
import sagemaker

Please update the variables below to be able to run inside your AWS account:

In [None]:
BUCKET_NAME = 'YOUR_AMAZON_S3_BUCKET_NAME' # REPLACE this bucket name with the name of your Amazon S3 bucket
ROLE_ARN = 'ROLE_ARN' # REPLACE with the ARN of the IAM service role that you created in the prerequisites
MODEL_ID = 'arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-pro-v1:0' # the ARN of the model. Change this if you want to use a different model or a different Region

In [None]:
DATASET_DIR = "./local_data" # if you want you can point this to a local directory that contains videos to process
VIDEOS_SOURCE_PREFIX = 'video/batch/source/academic_source'
INPUT_PREFIX = 'video/batch/input'
OUTPUT_PREFIX = 'video/batch/output'
MAX_TOKENS = 200
OUTPUT_FOLDER = os.path.join(DATASET_DIR,'outputs')

In [None]:
# We will use the Amazon SageMaker Python SDK to upload to Amazon S3 buckets
session = boto3.session.Session(region_name="us-east-1")
sagemaker_session = sagemaker.Session(boto_session=session)

In [None]:
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

# Initialize AWS clients
boto_cfg = BConfig(retries={'max_attempts':10,'mode':'standard'})
s3 = session.client('s3', config=boto_cfg)
bedrock = session.client('bedrock', config=boto_cfg)

### Download Sample Dataset

Download a dataset from the Hugging Face Hub as an example.

If you want to use your own videos you can skip this step and instead create a directory `./local_data`, place your videos in that directory and upload them directly to the Amazon S3 bucket in the next step. 

We will use a subset of videos from the [LLaVA-Video-178K](https://huggingface.co/datasets/lmms-lab/LLaVA-Video-178K) dataset as sample video input.



<div style="border: 2px solid #006CE0; 
    padding: 10px; 
    border-radius: 5px; 
    max-width: 100%;
    background: #f0fbff;">
    <b>Info:</b> Dataset <a href="https://huggingface.co/datasets/lmms-lab/LLaVA-Video-178K">lmms-lab/LLaVA-Video-178K</a> is released to the public under the <a href="https://www.apache.org/licenses/LICENSE-2.0">Apache 2.0 License</a>.
</div>

In [None]:
file_path = snapshot_download(
    repo_id="malterei/LLaVA-Video-small-swift",
    repo_type="dataset",
    allow_patterns=[
        "videos/academic_source/Charades/*",
        "videos/academic_source/youcook2/*",
        "videos/academic_source/activitynet/*",
    ],
    local_dir=DATASET_DIR
)
print(f"Downloaded dataset to local filepath: {file_path}")

In [None]:
# Let's move the videos into the root of ./local_data
!mv {DATASET_DIR}/videos/academic_source/* {DATASET_DIR}

### Upload videos to your Amazon S3 bucket
Upload the whole folder **./local_data** to your Amazon S3 bucket.
If you want to use a different folder structure in Amazon S3, change the variable **VIDEOS_SOURCE_PREFIX** accordingly.

In [None]:
dataset_s3_uri = sagemaker_session.upload_data(
    DATASET_DIR, 
    bucket=BUCKET_NAME, 
    key_prefix=VIDEOS_SOURCE_PREFIX,
)
print(f"Uploaded data from local: {DATASET_DIR} to s3: {dataset_s3_uri}")
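
If you prefer not to depend on the SageMaker SDK for this step, a plain boto3 upload could look like the sketch below. The helper names `plan_uploads` and `upload_videos` are illustrative and not part of the original notebook; the sketch assumes you only want `.mp4` files and that the relative folder structure should be kept under the key prefix.

```python
import os


def plan_uploads(local_dir, key_prefix, suffix=".mp4"):
    """Map every matching file under local_dir to the S3 key it would get."""
    plan = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            if not name.lower().endswith(suffix):
                continue  # skip anything that is not a video
            path = os.path.join(root, name)
            # Keep the relative folder structure and use "/" as key separator
            rel = os.path.relpath(path, local_dir).replace(os.sep, "/")
            plan.append((path, f"{key_prefix.rstrip('/')}/{rel}"))
    return plan


def upload_videos(s3_client, bucket, plan):
    """Upload each (local_path, key) pair and return the number of uploads."""
    for path, key in plan:
        s3_client.upload_file(path, bucket, key)
    return len(plan)
```

With the variables of this notebook, `upload_videos(s3, BUCKET_NAME, plan_uploads(DATASET_DIR, VIDEOS_SOURCE_PREFIX))` would upload the videos once the `s3` client has been created. Building the plan first makes it easy to inspect the target keys before any data is transferred.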

In [None]:
## Helper Functions
def list_mp4s_in_s3(bucket, prefix):
    """
    Recursively list all .mp4 object keys under a given S3 prefix.
    """
    logger.info(f"Listing MP4s under s3://{bucket}/{prefix}/")
    token = None
    keys = []
    while True:
        params = {'Bucket': bucket, 'Prefix': prefix}
        if token:
            params['ContinuationToken'] = token
        resp = s3.list_objects_v2(**params)
        for obj in resp.get('Contents', []):
            k = obj['Key']
            if k.lower().endswith('.mp4'):
                keys.append(k)
                logger.info(f"  • {k}")
        if not resp.get('IsTruncated'):
            break
        token = resp.get('NextContinuationToken')
    return keys


def pretty_llm_print(video_output, title=None):
    """Generates formatted output showing the prompt once and then video names and their summaries."""
    # Header styling
    header = ""
    if title:
        header = f"""<div style='border: 2px solid #000000; 
            padding: 10px; 
            border-radius: 5px; 
            max-width: fit-content; 
            margin: 0 auto; 
            text-align: center; 
            font-weight: bold;'>{title}</div>\n"""

    body_parts = [header]
    
    # Display the prompt (user's question) ONCE at the top
    if video_output:
        user_content = video_output[0]['modelInput']['messages'][0]['content']
        prompt_texts = [html.escape(c['text']).replace('\\n', '\n')
                        for c in user_content if 'text' in c]
        if prompt_texts:
            body_parts.append("\n**Prompt:**\n\n")
            body_parts.append("<div style='margin-bottom: 1em; font-style: italic;'>")
            body_parts.append("<br>".join(prompt_texts))
            body_parts.append("</div>")
            body_parts.append("\n\n---\n")  # Horizontal rule after prompt


    # Process each video entry
    for entry in video_output:
        # Extract video name from S3 URI
        video_uri = entry['modelInput']['messages'][0]['content'][1]['video']['source']['s3Location']['uri']
        video_name = video_uri.split(f"{VIDEOS_SOURCE_PREFIX}/")[-1]  # Get filename from URI

        
        # Add video name header
        body_parts.append(f"\n## 📹 {video_name}\n")
        
        # Process assistant summary
        assistant_content = entry['modelOutput']['output']['message']['content']
        for content in assistant_content:
            if 'text' in content:
                processed = process_content_string(content['text'])
                body_parts.append(processed)
        
        # Add separator between entries
        body_parts.append("\n\n---\n")

    # Final styling
    styled_markdown = f"""
<div style="border: 2px solid #FFC000; 
    padding: 10px; 
    border-radius: 5px; 
    max-width: 100%;">
{''.join(body_parts)}
</div>"""
    display(Markdown(styled_markdown))

def process_content_string(text):
    """Format thinking/answer blocks"""
    text = text.replace('\\n', '\n')
    
    answer_style = """<div style="background-color: #e8f5e9; 
        border-left: 4px solid #43a047; 
        padding: 10px; 
        margin: 10px 0; 
        border-radius: 4px;">
        <strong style="color: #43a047;">Summary</strong>
        <div style="margin-top: 8px; white-space: pre-wrap;">{}</div>
    </div>"""
    
    # Convert <answer> tags to summary blocks
    text = re.sub(r'<answer>(.*?)</answer>', 
                 lambda m: answer_style.format(m.group(1)), 
                 text, 
                 flags=re.DOTALL)
    
    return text

## List videos in Amazon S3
Use the helper function to find all MP4s under your source prefix in your Amazon S3 bucket.

In [None]:
mp4_keys = list_mp4s_in_s3(BUCKET_NAME, VIDEOS_SOURCE_PREFIX)
print(f"Found {len(mp4_keys)} videos.")
mp4_keys[:5]  # show first 5

## Build JSONL Payload

Amazon Bedrock Batch Inference takes a JSONL file as input. The JSONL contains rows of JSON objects. Each line contains one request to the model:

```json
{ "recordId" : "alphanumeric string", "modelInput" : {JSON body} }
...
```

For Amazon Nova video understanding, the JSON body in the `modelInput` field follows the Amazon Nova messages format with video content: it contains a text prompt and a reference to a video stored in Amazon S3. Here is the schema for the JSON body in `modelInput` that we are using:

```json
{
    "schemaVersion": "messages-v1",
    "messages": [
        {
            "role": "user",
            "content": [
                {
                  "text": string
                },
                {
                  "video": {
                    "format": "mp4",
                    "source": {
                      "s3Location": {
                        "uri": "s3://my-bucket/object-key",
                        "bucketOwner": "123456789012"
                      }
                    }
                  }
                }
            ]
        }
    ],
    "inferenceConfig": {
        "maxTokens": 200
    }
}
```

You can read more about the request schema in the AWS Documentation:
* [Amazon Nova Complete request schema](https://docs.aws.amazon.com/nova/latest/userguide/complete-request-schema.html)
* [Format and upload your batch inference data](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-data.html)
* [Amazon Nova Video understanding examples](https://docs.aws.amazon.com/nova/latest/userguide/modalities-video-examples.html)
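
To make the record structure concrete, the following sketch checks a single input record against the shape described above. `check_record` is a hypothetical helper, not part of the Bedrock API, and it only covers the fields used in this notebook; the service may enforce additional rules.

```python
import json


def check_record(record):
    """Return a list of problems found in one batch input record.
    An empty list means the record matches the shape used in this notebook."""
    problems = []
    record_id = record.get("recordId")
    if not isinstance(record_id, str) or not record_id:
        problems.append("recordId must be a non-empty string")
    body = record.get("modelInput")
    if not isinstance(body, dict):
        return problems + ["modelInput must be a JSON object"]
    messages = body.get("messages") or []
    if not messages:
        problems.append("messages must contain at least one message")
    for msg in messages:
        for part in msg.get("content", []):
            if "video" in part:
                uri = part["video"].get("source", {}).get("s3Location", {}).get("uri", "")
                if not uri.startswith("s3://"):
                    problems.append(f"video uri is not an S3 URI: {uri!r}")
    return problems


# One JSONL line is simply one record serialized with json.dumps
example = {
    "recordId": "video-0",
    "modelInput": {
        "schemaVersion": "messages-v1",
        "messages": [{
            "role": "user",
            "content": [
                {"text": "Please summarize this video in ~200 words."},
                {"video": {"format": "mp4",
                           "source": {"s3Location": {"uri": "s3://my-bucket/object-key"}}}},
            ],
        }],
        "inferenceConfig": {"maxTokens": 200},
    },
}
line = json.dumps(example)
print(check_record(json.loads(line)))
```

Running such a check on every record before uploading the file can surface malformed entries early, before a job is submitted.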

We define a helper function that converts the list of Amazon S3 keys into a list of records, which we then serialize as a newline-delimited JSONL file for batch input. The text instruction that we send to Amazon Nova alongside each video is the prompt "Please summarize this video in ~200 words."

In [None]:
def build_jsonl_from_s3_keys(keys, bucket):
    """
    Build a list of JSONL-ready records pointing to videos via s3Location.
    """
    records = []
    for idx, k in enumerate(keys):
        uri = f"s3://{bucket}/{k}"
        video_obj = {
            'video': {
                'format':'mp4', 
                'source':{
                    's3Location':{
                        'uri':uri
                    }
                }
            }
        }
        text_obj = {
            'text':'Please summarize this video in ~200 words.'
        }
        rec = {
            'recordId': f'video-{idx}',
            'modelInput':{
                'schemaVersion':'messages-v1',
                'messages':[
                    {
                        'role':'user',
                        'content':[
                            text_obj, 
                            video_obj
                        ]
                    }
                ],
                'inferenceConfig':{
                    'maxTokens':MAX_TOKENS
                }
            }
        }
        records.append(rec)
    return records


Let's use the helper function to create the JSONL file for your Amazon S3 keys.

In [None]:
records = build_jsonl_from_s3_keys(mp4_keys, BUCKET_NAME)
input_key = f"{INPUT_PREFIX}/video_batch_{int(time.time())}.jsonl"
jsonl_str = '\n'.join(json.dumps(r) for r in records)
print(f"Generated {len(records)} records → {input_key}")
print(jsonl_str[:500], '...')
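
Before uploading, it can help to check the payload against the limits listed in the prerequisites (at least 100 records, at most 1 GB per input file by default). The sketch below is one way to do that. `preflight` is a hypothetical helper; treating 1 GB as 1,000,000,000 bytes is a conservative assumption; and the uniqueness check on `recordId` is a convenience for matching outputs to inputs rather than a documented requirement.

```python
import json

MIN_RECORDS = 100               # minimum stated in the prerequisites
MAX_FILE_BYTES = 1_000_000_000  # default 1 GB limit, assumed to mean 10**9 bytes


def preflight(records, min_records=MIN_RECORDS, max_bytes=MAX_FILE_BYTES):
    """Return (ok, messages) for a list of batch input records."""
    messages = []
    if len(records) < min_records:
        messages.append(f"only {len(records)} records, need at least {min_records}")
    ids = [r.get("recordId") for r in records]
    if len(set(ids)) != len(ids):
        messages.append("recordId values are not unique")
    # Approximate JSONL size: each serialized record plus one newline
    size = sum(len(json.dumps(r).encode("utf-8")) + 1 for r in records)
    if size > max_bytes:
        messages.append(f"payload is about {size} bytes, above the {max_bytes} byte limit")
    return (not messages, messages)
```

For example, `ok, messages = preflight(records)` followed by printing `messages` shows why a payload would likely be rejected, such as a dataset with fewer than 100 videos.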

## Upload JSONL to Amazon S3
The batch inference job will read the JSONL file from Amazon S3. Upload the JSONL to the Amazon S3 bucket below:

In [None]:
s3.put_object(Bucket=BUCKET_NAME, Key=input_key, Body=jsonl_str.encode('utf-8'))
print(f"Uploaded JSONL to s3://{BUCKET_NAME}/{input_key}")

## Invoke Amazon Bedrock Batch Inference Job

Start the Amazon Bedrock Batch Inference job with Amazon Nova as the foundation model.

In the request you specify the input data configuration, which points to the Amazon S3 location that contains the videos and the JSONL file with the prompts.

The `modelId` specifies which model the batch inference job should use. 

The request also contains the output location. Amazon Bedrock Batch Inference will write the output from the model to the Amazon S3 bucket that is configured in the output data config. After the batch inference completes you can get the output from that Amazon S3 bucket.

As part of the request you also specify the ARN of the role that the batch inference job will assume.

In [None]:
print(f"Starting batch inference job with model: {MODEL_ID}")
resp = bedrock.create_model_invocation_job(
    jobName=f"batch-video-{int(time.time())}",
    modelId=MODEL_ID,
    inputDataConfig={
        's3InputDataConfig': {
            's3Uri': f"s3://{BUCKET_NAME}/video/batch/",
            's3InputFormat': 'JSONL'
        }
    },
    outputDataConfig={ 
        's3OutputDataConfig': {
            's3Uri': f"s3://{BUCKET_NAME}/{OUTPUT_PREFIX}/"
        }
    },
    roleArn=ROLE_ARN
)
job_arn = resp['jobArn']
print(f"Started job ARN: {job_arn}")

## Poll Job Status
Wait until the batch job completes.

In [None]:
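# Statuses after which the job no longer changes state
TERMINAL_STATUSES = ('Completed', 'PartiallyCompleted', 'Failed', 'Stopped', 'Expired')
while True:
    status_resp = bedrock.get_model_invocation_job(jobIdentifier=job_arn)
    status = status_resp['status']
    print('Status:', status)
    if status in TERMINAL_STATUSES:
        break
    time.sleep(10)
print('Final status:', status)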
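
Batch jobs can run for a long time, so you may want a reusable polling function with a timeout instead of an open-ended loop. The following is a sketch under assumptions: `wait_for_job` is a hypothetical helper, the default interval and timeout are arbitrary choices, and the injectable `sleep` and `clock` parameters exist only to make the function easy to test.

```python
import time

# Job statuses after which no further state change is expected
DONE_STATUSES = {"Completed", "PartiallyCompleted", "Failed", "Stopped", "Expired"}


def wait_for_job(client, job_arn, poll_seconds=60, timeout_seconds=24 * 3600,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_model_invocation_job until the job reaches a terminal status.
    Returns the last response, or raises TimeoutError when the deadline passes."""
    deadline = clock() + timeout_seconds
    while True:
        resp = client.get_model_invocation_job(jobIdentifier=job_arn)
        if resp["status"] in DONE_STATUSES:
            return resp
        if clock() >= deadline:
            raise TimeoutError(f"job {job_arn} still {resp['status']} after {timeout_seconds}s")
        sleep(poll_seconds)
```

In this notebook you could call `status_resp = wait_for_job(bedrock, job_arn)` and then inspect `status_resp['status']` to decide whether to download results or investigate a failure.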

## Download Results
Fetch the generated JSONL output files to your local `./local_data/outputs/` folder (the value of `OUTPUT_FOLDER`).

In [None]:
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
out_uri = status_resp['outputDataConfig']['s3OutputDataConfig']['s3Uri']
parsed = urlparse(out_uri)
out_bucket, out_prefix = parsed.netloc, parsed.path.lstrip('/')
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=out_bucket, Prefix=out_prefix):
    for obj in page.get('Contents', []):
        if not obj['Key'].lower().endswith('.jsonl.out'):
            continue
        dst = os.path.join(OUTPUT_FOLDER, os.path.basename(obj['Key']))
        s3.download_file(out_bucket, obj['Key'], dst)
        print('Downloaded →', dst)

In [None]:
# extract summaries from first output file after batch processing
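# sort for a deterministic choice and ignore anything that is not a batch output file
output_files = sorted(f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.jsonl.out'))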
objects = []
with open(os.path.join(OUTPUT_FOLDER, output_files[0]), 'r') as f:
    for line in f:
        obj = json.loads(line)
        objects.append(obj)

In [None]:
# the first 5 summaries will be displayed
first_five = objects[:5]
model_name = MODEL_ID.rpartition('/')[-1]
pretty_llm_print(first_five, title="Video summaries by batch processing with " + model_name)
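
Individual records in a batch can fail, in which case they will not contain a `modelOutput` field. The sketch below separates successful summaries from failed records before further processing. `split_results` is a hypothetical helper, and the assumption that failed records carry an `error` field should be verified against your own output files.

```python
import json


def split_results(lines):
    """Split output JSONL lines into (successes, failures).
    Successes hold the concatenated text of the model answer."""
    successes, failures = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        rec = json.loads(line)
        if "modelOutput" in rec and "error" not in rec:
            parts = rec["modelOutput"]["output"]["message"]["content"]
            text = "".join(p.get("text", "") for p in parts)
            successes.append({"recordId": rec["recordId"], "summary": text})
        else:
            # Assumed layout: failed records expose details under "error"
            failures.append({"recordId": rec.get("recordId"), "error": rec.get("error")})
    return successes, failures
```

You could pass an open output file directly, for example `split_results(open(path))`, and then display only the successful records while logging the failed `recordId` values for a retry.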

### Integrating with Existing Workflows

After retrieving the processed output data, you can integrate it into your existing workflows or analytics systems for further analysis or downstream processing. For example, you could:

- Store the video summaries in a database for easy access and querying.
- Perform sentiment analysis or topic modeling on the summaries to gain additional insights.
- Categorize the summaries into actionable business buckets.

The specific integration steps will depend on your existing workflows and systems, but the processed output data from the batch inference job can be easily incorporated into various data pipelines and analytics processes.
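
As one illustration of the first option, the summaries could be written to a local SQLite database. This is a minimal sketch: the table name, the column layout, and the `store_summaries` helper are assumptions for demonstration, and a production setup would more likely target your existing database.

```python
import sqlite3


def store_summaries(db_path, rows):
    """Insert (record_id, video_uri, summary) tuples into SQLite.
    Returns the number of rows in the table afterwards."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS video_summaries ("
                "record_id TEXT PRIMARY KEY, video_uri TEXT, summary TEXT)"
            )
            # Re-running the notebook replaces rows with the same record_id
            conn.executemany(
                "INSERT OR REPLACE INTO video_summaries VALUES (?, ?, ?)", rows
            )
        return conn.execute("SELECT COUNT(*) FROM video_summaries").fetchone()[0]
    finally:
        conn.close()
```

Each tuple could be built from one output record, using its `recordId`, the video URI from `modelInput`, and the text from `modelOutput`. Once stored, the summaries can be queried with plain SQL, for example to search for keywords across all videos.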

## Conclusion

The notebook covers the entire process, from data preparation and formatting to job submission, output retrieval, and integration with existing workflows. You can leverage the JSONL outputs for further analysis or visualization. Feel free to adapt and extend this notebook to suit your specific requirements, and explore other use cases where batch inference can be applied to optimize your interactions with foundation models at scale. 