# Video Content Analysis Using Amazon Bedrock and Anthropic Claude Models
In this notebook, we perform video content analysis using Amazon Bedrock. We extract frames from each video, tile them into image grids, and use prompt engineering techniques to have a Claude model analyze the frames and return its analysis as structured output (JSON).


# Install and import necessary libraries

In [None]:
# Optional: update the OS packages before installing ffmpeg (yum-based Amazon Linux).
# Left commented out on purpose; remove the leading '#' to run it.
#!sudo yum update -y

Install ffmpeg by following the instructions at https://www.maskaravivek.com/post/how-to-install-ffmpeg-on-ec2-running-amazon-linux/
These instructions use the existing static ffmpeg build for Amazon Linux AMI, distributed under the GNU license: https://www.johnvansickle.com/ffmpeg/

In [None]:
pip install webvtt-py

In [None]:
pip install PhotoHash

In [None]:
pip install ffmpeg

In [None]:
conda install ffmpeg=*=lgpl*

In [None]:
! ffmpeg

In [None]:
pip install m3u8-To-MP4

In [None]:
# Must be the first statement of the cell: a `from __future__` import placed
# after other statements is a SyntaxError once the cell is exported to a .py file.
from __future__ import print_function

# Standard library
import base64
import csv
import io
import json
import logging
import os
import pprint
import random
import re
import shutil
import subprocess
import sys
import tempfile
import time
from datetime import datetime
from io import BytesIO
from pathlib import Path
from urllib.parse import urlparse

# AWS / SageMaker
import boto3
import sagemaker
from botocore.exceptions import ClientError
from sagemaker import get_execution_role

# Data, imaging and display
import fsspec
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import photohash
import webvtt
# IPython's Image is aliased to IImage so that `Image` below refers to PIL.Image.
from IPython.display import HTML, display, Image as IImage
from PIL import Image, ImageDraw, ImageFont, ExifTags, ImageColor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# LLM orchestration / vector store (currently commented out)
# from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth, helpers
# from langchain.vectorstores import OpenSearchVectorSearch
# from langchain.embeddings import BedrockEmbeddings
# from langchain.document_loaders.csv_loader import CSVLoader
# from langchain.document_loaders import S3FileLoader



In [None]:
import requests
import pandas as pd

# SageMaker session and its default S3 bucket. `bucket` is the bucket that the
# frame-extraction functions download videos from and upload frames to.
sess = sagemaker.Session()
bucket = sess.default_bucket()
s3_client = boto3.client('s3')



In [None]:
# Identity and SageMaker session
aws_account_id = boto3.client('sts').get_caller_identity()['Account']
sess = sagemaker.Session()
role = get_execution_role()
print(sess)
print(role)

# Region of the default boto3 session
region = boto3.Session().region_name
print(region)

# Frozen snapshot of the current credentials.
# NOTE(review): these variables hold secret material; never print, log or commit them.
credentials = boto3.Session().get_credentials().get_frozen_credentials()
access_key, secret_key = credentials.access_key, credentials.secret_key

# Service clients
s3_client = boto3.client('s3')
bedrock_client = boto3.client(service_name='bedrock-runtime', region_name=region)


# Configure the models used to generate summaries and results

In [None]:
# Bedrock model IDs passed as `modelId` to invoke_model.
model_id_haiku, model_id_sonnet35v2, model_id_sonnet37 = (
    "us.anthropic.claude-3-5-haiku-20241022-v1:0",
    "us.anthropic.claude-3-5-sonnet-20241022-v2:0",
    "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
)


## Prepare Video Files

Extract frame images from each video and tile them into grids of images.

In [None]:
def extract_frames(video_path, key):
    """
    Download a video from S3, extract one JPEG frame per second with ffmpeg,
    and upload the frames back to S3.

    Reads the notebook globals ``bucket`` (source and destination bucket) and
    ``frame_images_path`` (destination key prefix).

    Args:
        video_path: Informational only (printed). The S3 object is located by ``key``.
        key: S3 key of the video inside ``bucket``.

    Returns:
        ``s3://{bucket}/{frame_images_path}/{video_filename}/`` on success, or
        ``None`` when ffmpeg fails.
    """
    import shutil
    import tempfile

    print("video-path", video_path)
    video_filename = key.split('/')[-1]
    print("video file---", video_filename)

    # Private scratch directory for the downloaded video and the frames. A fresh
    # directory avoids overwriting an unrelated /tmp file with the same name,
    # and the `finally` below removes it even if a step raises.
    work_dir = Path(tempfile.mkdtemp(prefix="extract-frames-"))
    try:
        output_dir = work_dir / "frames"
        output_dir.mkdir()

        local_video_path = str(work_dir / video_filename)
        print("local_video_path---", local_video_path)
        print("bucket---", bucket)
        s3_client = boto3.client('s3')
        s3_client.download_file(bucket, key, local_video_path)

        output_pattern = output_dir / "frame-%07d.jpg"
        print(output_pattern)

        frame_rate = 1/1  # one frame per second of video
        ffmpeg_cmd = ["ffmpeg", "-i", local_video_path, "-vf", f"fps={frame_rate}", str(output_pattern)]
        try:
            subprocess.run(ffmpeg_cmd, check=True)
        except subprocess.CalledProcessError as err:
            # Do not report success (and a path to missing frames) when ffmpeg failed.
            print(f"Error running ffmpeg: {err}")
            return None

        for frame_file in sorted(output_dir.glob("*.jpg")):
            s3_output_key = f"{frame_images_path}/{video_filename}/{frame_file.name}"
            s3_client.upload_file(str(frame_file), bucket, s3_output_key)
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)

    return f"s3://{bucket}/{frame_images_path}/{video_filename}/"


In [None]:
def create_tiled_image(s3_path, font_type):
    """
    Tile the frames stored under an S3 folder into 4x4 grid images.

    Frames are the ``*.jpg`` keys under the prefix whose key does not contain
    ``"computed"``. They are sorted by the number after the last ``-`` in the
    key and laid out 16 per grid. Each frame gets a black bar above it showing
    its frame number in white. Grids are written to the same bucket as
    ``<folder>computed/image_grid-<k>-<stem>.jpg``; more than 16 frames
    produce several grids.

    Args:
        s3_path: S3 URI of the frame folder, e.g. ``s3://bucket/frame_images/video.mp4/``.
            The grid prefix is built as ``<folder>computed``, so a trailing
            slash is expected.
        font_type: Font file or name passed to ``ImageFont.truetype`` for the labels.

    Returns:
        ``s3://<bucket>/<folder>computed/`` on success, or ``None`` when the
        folder contains no frames.
    """
    grid_cols = 4
    grid_rows = 4
    frames_per_grid = grid_cols * grid_rows
    bar_height = 50  # black label bar drawn above every frame
    font_size = 50

    parsed_url = urlparse(s3_path)
    bucket_name = parsed_url.netloc
    folder_path = parsed_url.path.lstrip('/')
    print("folder path:" + folder_path)

    bucket = boto3.resource('s3').Bucket(bucket_name)

    # Frames only: skip grids produced by a previous run under ".../computed/".
    selected_keys = [
        obj.key
        for obj in bucket.objects.filter(Prefix=folder_path)
        if obj.key.endswith(".jpg") and "computed" not in obj.key
    ]
    selected_keys.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))

    num_frames = len(selected_keys)
    if num_frames == 0:
        print("No frames found")
        return None

    images = [
        Image.open(BytesIO(bucket.Object(key).get()['Body'].read()))
        for key in selected_keys
    ]

    frame_width = images[0].width
    frame_height = images[0].height
    # Each cell is a bar followed by its frame, so a bar never covers the
    # bottom of the frame in the row above it.
    cell_height = bar_height + frame_height
    tile_size = (frame_width * grid_cols, cell_height * grid_rows)

    num_grids = (num_frames + frames_per_grid - 1) // frames_per_grid  # ceiling division
    print("number of frames: " + str(num_frames))
    print("number of image grids: " + str(num_grids))

    font = ImageFont.truetype(font_type, font_size)  # loaded once, reused for every label
    base_filename = Path(folder_path).stem
    grid_path = f'{folder_path}computed'

    for k in range(num_grids):
        # A fresh canvas per grid, so a partially filled last grid does not
        # still show frames left over from the previous grid.
        tiled_image = Image.new('RGB', tile_size)
        draw = ImageDraw.Draw(tiled_image)
        first = k * frames_per_grid
        for slot, frame_index in enumerate(range(first, min(first + frames_per_grid, num_frames))):
            row, col = divmod(slot, grid_cols)
            bar_x = col * frame_width
            bar_y = row * cell_height
            tiled_image.paste(images[frame_index], (bar_x, bar_y + bar_height))
            draw.rectangle(
                [(bar_x, bar_y), (bar_x + frame_width - 1, bar_y + bar_height - 1)],
                fill=(0, 0, 0),
            )
            frame_number = selected_keys[frame_index].split('-')[-1].split('.')[0]
            # "lm" anchor: left edge, vertically centred inside the bar.
            draw.text(
                (bar_x + 10, bar_y + bar_height // 2),
                frame_number,
                font=font,
                fill=(255, 255, 255),
                anchor="lm",
            )

        grid_key = f'{grid_path}/image_grid-{k}-{base_filename}.jpg'
        temp_file = BytesIO()
        tiled_image.save(temp_file, format='JPEG')
        temp_file.seek(0)
        bucket.put_object(Key=grid_key, Body=temp_file)

    return f's3://{bucket_name}/{grid_path}/'

In [None]:
def create_tiled_image(s3_path, font_type):
    """
    Build 4x4 grid images from the frames under an S3 folder and upload them.

    Frames are ``*.jpg`` keys under the prefix that do not contain
    ``"computed"``, ordered by the integer after the last ``-`` in the key.
    Every frame sits below a black bar labelled with its frame number. Grid
    ``k`` is stored as ``<folder>computed/image_grid-<k>-<stem>.jpg`` in the
    same bucket.

    Args:
        s3_path: S3 URI of the frame folder (trailing slash expected), e.g.
            ``s3://bucket/frame_images/video.mp4/``.
        font_type: Font file or name passed to ``ImageFont.truetype``.

    Returns:
        ``s3://<bucket>/<folder>computed/`` on success, or ``None`` when no
        frames are found.
    """
    grid_cols = 4
    grid_rows = 4
    frames_per_grid = grid_cols * grid_rows
    bar_height = 50
    font_size = 50

    # Parse the S3 path
    parsed_url = urlparse(s3_path)
    bucket_name = parsed_url.netloc
    folder_path = parsed_url.path.lstrip('/')
    print("folder path:" + folder_path)

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # Frame images only; earlier grids live under ".../computed/".
    selected_keys = [
        obj.key
        for obj in bucket.objects.filter(Prefix=folder_path)
        if obj.key.endswith(".jpg") and "computed" not in obj.key
    ]
    selected_keys.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))

    num_frames = len(selected_keys)
    if num_frames == 0:
        print("No frames found")
        return None

    images = [
        Image.open(BytesIO(bucket.Object(key).get()['Body'].read()))
        for key in selected_keys
    ]

    frame_width = images[0].width
    frame_height = images[0].height
    # Reserve bar space in every row (not once for the whole grid). Otherwise
    # each row's bar would be drawn over the bottom of the previous row's frames.
    cell_height = bar_height + frame_height
    tile_width = frame_width * grid_cols
    tile_height = cell_height * grid_rows

    num_images = (num_frames + frames_per_grid - 1) // frames_per_grid  # round up
    print("number of frames: " + str(num_frames))
    print("number of image grids: " + str(num_images))

    font = ImageFont.truetype(font_type, font_size)  # hoisted: identical for every label
    base_filename = Path(folder_path).stem
    grid_path = f'{folder_path}computed'

    for k in range(num_images):
        # New canvas for every grid
        tiled_image = Image.new('RGB', (tile_width, tile_height))
        draw = ImageDraw.Draw(tiled_image)
        start = k * frames_per_grid
        stop = min(start + frames_per_grid, num_frames)
        for slot, frame_index in enumerate(range(start, stop)):
            row, col = divmod(slot, grid_cols)
            bar_x = col * frame_width
            bar_y = row * cell_height
            tiled_image.paste(images[frame_index], (bar_x, bar_y + bar_height))
            draw.rectangle(
                [(bar_x, bar_y), (bar_x + frame_width - 1, bar_y + bar_height - 1)],
                fill=(0, 0, 0),
            )
            frame_number = selected_keys[frame_index].split('-')[-1].split('.')[0]
            # Centre the label vertically in the bar so it never spills onto the frame.
            draw.text(
                (bar_x + 10, bar_y + bar_height // 2),
                frame_number,
                font=font,
                fill=(255, 255, 255),
                anchor="lm",
            )

        # Save the tiled image to S3
        grid_key = f'{grid_path}/image_grid-{k}-{base_filename}.jpg'
        temp_file = BytesIO()
        tiled_image.save(temp_file, format='JPEG')
        temp_file.seek(0)
        bucket.put_object(Key=grid_key, Body=temp_file)

    return f's3://{bucket_name}/{grid_path}/'


## Multimodal Prompting

In [None]:
def get_contextual_summary(image_grid_path, modelid):
    """
    Run a multimodal Bedrock inference over every grid image under an S3 folder.

    Lists the ``.jpg``/``.jpeg`` objects under ``image_grid_path`` and orders
    them by grid index. It attaches them, base64 encoded, after a fixed
    traffic-safety analysis prompt and calls ``invoke_model`` with an Anthropic
    Messages request body.

    Args:
        image_grid_path: S3 URI of the folder holding the video's image grids,
            e.g. ``s3://bucket/frame_images/video.mp4/computed/``.
        modelid: The Bedrock model ID to invoke.

    Returns:
        The parsed JSON response body from Bedrock.

    Raises:
        ValueError: If ``image_grid_path`` is empty or ``None`` (for example
            because the grid step failed and returned ``None``).
        Exception: Errors while listing the S3 folder are logged and re-raised.
        botocore.exceptions.ClientError: If the model invocation fails.
    """
    import re

    if not image_grid_path:
        raise ValueError(
            "image_grid_path is empty; the image-grid step probably failed and returned None"
        )

    # Initialize S3 clients
    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')

    # Parse "s3://<bucket>/<prefix>"
    bucket_name = image_grid_path.split('/')[2]
    prefix = '/'.join(image_grid_path.split('/')[3:])

    logger.info(f"Bucket name: {bucket_name}")
    logger.info(f"Prefix: {prefix}")

    bucket = s3.Bucket(bucket_name)

    def grid_order(key):
        # S3 lists keys lexicographically, so "image_grid-10-" sorts before
        # "image_grid-2-". Sort on the numeric grid index instead so the grids
        # reach the model in temporal order. Unmatched keys go last.
        match = re.search(r'image_grid-(\d+)-', key)
        return (0, int(match.group(1)), key) if match else (1, 0, key)

    # List objects in the folder
    try:
        image_objects = list(bucket.objects.filter(Prefix=prefix))
        logger.info(f"Found {len(image_objects)} objects with prefix {prefix}")

        image_keys = sorted(
            (obj.key for obj in image_objects
             if obj.key.lower().endswith(('.jpg', '.jpeg'))),
            key=grid_order,
        )
        logger.info(f"Found {len(image_keys)} image files")

        if not image_keys:
            logger.warning(f"No image files found in {image_grid_path}")

    except Exception as e:
        logger.error(f"Error listing objects in bucket: {str(e)}")
        raise

    # NOTE(review): Bedrock's Anthropic Messages API limits the number of images
    # per request (20 at the time of writing). Confirm against current quotas.
    max_images_per_request = 20
    if len(image_keys) > max_images_per_request:
        logger.warning(
            f"{len(image_keys)} images exceed the expected per-request limit of "
            f"{max_images_per_request}; the model call may be rejected"
        )

    # Define the analysis prompt
    prompt = """
    As an expert traffic safety analyst, analyze the provided traffic video footage image grid frames and:

    1. Safe Driving Behaviors:
    - Identify and highlight instances of proper following distance maintenance
    - Note correct use of turn signals and lane changes
    - Recognize appropriate speed adjustments for conditions
    - Point out defensive driving techniques in action

    2. Driver Safety Practices:
    - Detect proper seat belt usage
    - Observe correct hand positioning on steering wheel
    - Note appropriate mirror checks during maneuvers
    - Identify distraction-free driving behaviors

    3. Critical Analysis:
    - Timestamp each identified safe driving behavior
    - Rate the effectiveness of each observed safety practice (1-5 scale)
    - Provide specific commentary on what makes each highlighted action exemplary
    - Note how these actions contributed to accident prevention
    """

    # Construct the input message: prompt text first, then one block per image.
    message = {
        "role": "user",
        "content": [
            {"type": "text", "text": prompt}
        ]
    }

    for image_key in image_keys:
        try:
            logger.info(f"Processing image: {image_key}")

            response = s3_client.get_object(Bucket=bucket_name, Key=image_key)
            image_binary = response['Body'].read()
            logger.info(f"Image size: {len(image_binary)} bytes")

            base64_string = base64.b64encode(image_binary).decode('utf-8')
            logger.info(f"Successfully encoded image {image_key}")

            message["content"].append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/jpeg",
                    "data": base64_string
                }
            })
            logger.info(f"Added image {image_key} to message")

        except Exception as e:
            # Best effort: skip an unreadable image but keep the others.
            logger.error(f"Error processing image {image_key}: {str(e)}")
            continue

    image_count = sum(1 for content in message["content"] if content["type"] == "image")
    logger.info(f"Total images added to message: {image_count}")

    request_body = {
        "messages": [message],
        "max_tokens": 2048,
        "temperature": 0,
        "top_p": 0.999,
        "top_k": 250,
        "anthropic_version": "bedrock-2023-05-31"
    }

    try:
        bedrock_runtime = boto3.client('bedrock-runtime')

        logger.info("Invoking Bedrock model...")

        response = bedrock_runtime.invoke_model(
            modelId=modelid,
            contentType="application/json",
            accept="application/json",
            body=json.dumps(request_body)
        )

        response_body = json.loads(response['body'].read())
        input_tokens = response_body["usage"]["input_tokens"]
        output_tokens = response_body["usage"]["output_tokens"]

        logger.info("\nInvocation details:")
        logger.info(f"Input tokens: {input_tokens}")
        logger.info(f"Output tokens: {output_tokens}")

        response_content = response_body.get("content", [])
        logger.info(f"\nModel responses ({len(response_content)}):")
        for content in response_content:
            # Non-text content blocks carry no "text" key; do not fail on them.
            logger.info(content.get("text", ""))

        return response_body

    except ClientError as error:
        logger.error(
            "Failed to invoke model. Error: %s: %s",
            error.response["Error"]["Code"],
            error.response["Error"]["Message"]
        )
        raise



In [None]:
def extract_frames(video_path, key):
    """
    Download a video from S3, validate it, extract one frame per second with
    ffmpeg, and upload the frames back to S3.

    Reads the notebook globals ``bucket`` (source and destination bucket) and
    ``frame_images_path`` (destination key prefix). Failures are reported with
    ``print`` and returned as ``None``; the function does not raise.

    Args:
        video_path: Informational only (printed). The S3 object is located by ``key``.
        key: S3 key of the video inside ``bucket``.

    Returns:
        ``s3://{bucket}/{frame_images_path}/{video_filename}/`` on success,
        otherwise ``None``.
    """
    import shutil
    import tempfile

    try:
        print("video-path", video_path)
        video_filename = key.split('/')[-1]
        print("video file---", video_filename)
        print("bucket---", bucket)

        # Private scratch directory for the download and the frames. A fixed
        # "/tmp/<video name>" path could overwrite, and later delete, an
        # unrelated file that happens to share the name.
        work_dir = Path(tempfile.mkdtemp(prefix="extract-frames-"))
        try:
            output_dir = work_dir / "frames"
            output_dir.mkdir()
            local_video_path = str(work_dir / video_filename)
            print("local_video_path---", local_video_path)

            s3_client = boto3.client('s3')

            # head_object reports a missing key as a ClientError with a 404
            # code, not as s3_client.exceptions.NoSuchKey.
            try:
                response = s3_client.head_object(Bucket=bucket, Key=key)
            except ClientError as err:
                if err.response.get("Error", {}).get("Code") in ("404", "NoSuchKey", "NotFound"):
                    print(f"Error: Video file not found in S3: {key}")
                    return None
                raise

            file_size = response['ContentLength']
            print(f"Video file size: {file_size} bytes")
            if file_size == 0:
                print("Error: Video file is empty")
                return None

            print("Downloading video file...")
            s3_client.download_file(bucket, key, local_video_path)

            if not os.path.exists(local_video_path):
                print("Error: Failed to download video file")
                return None

            local_size = os.path.getsize(local_video_path)
            if local_size != file_size:
                print(f"Error: Downloaded file size ({local_size}) doesn't match S3 file size ({file_size})")
                return None

            # Integrity check: ffprobe exits non-zero on an unreadable file.
            probe_cmd = ["ffprobe", "-v", "error", local_video_path]
            result = subprocess.run(probe_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"Error: Video file is corrupted: {result.stderr}")
                return None

            output_pattern = output_dir / "frame-%07d.jpg"
            print(f"Output pattern: {output_pattern}")

            frame_rate = 1/1  # one frame per second of video
            ffmpeg_cmd = [
                "ffmpeg",
                "-v", "error",  # Only show errors
                "-i", local_video_path,
                "-vf", f"fps={frame_rate}",
                "-frame_pts", "1",  # Add presentation timestamp
                "-q:v", "2",  # High quality
                str(output_pattern)
            ]

            print("Executing ffmpeg command:", " ".join(ffmpeg_cmd))
            result = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"Error running ffmpeg: {result.stderr}")
                return None

            frames = sorted(output_dir.glob("*.jpg"))
            if not frames:
                print("No frames were extracted")
                return None

            print(f"Successfully extracted {len(frames)} frames")

            for frame_file in frames:
                s3_output_key = f"{frame_images_path}/{video_filename}/{frame_file.name}"
                s3_client.upload_file(str(frame_file), bucket, s3_output_key)

            return f"s3://{bucket}/{frame_images_path}/{video_filename}/"

        except Exception as e:
            print(f"Error processing video: {str(e)}")
            return None
        finally:
            # Removes the downloaded video and all local frames in one go.
            shutil.rmtree(work_dir, ignore_errors=True)

    except Exception as e:
        print(f"Unexpected error: {str(e)}")
        return None



In [None]:
def create_tiled_image(s3_path, font_type):
    """
    Tile the frames under an S3 folder into 4x4 grid images and upload them.

    This definition shadows any earlier ``create_tiled_image`` in the notebook,
    so it must be a complete implementation. Frames are the ``*.jpg`` keys under
    the prefix that do not contain ``"computed"``, ordered by the integer after
    the last ``-`` in the key. Each frame is drawn below a black bar labelled
    with its frame number. Grid ``k`` is stored as
    ``<folder>computed/image_grid-<k>-<stem>.jpg`` in the same bucket.

    Args:
        s3_path: S3 URI of the frame folder (trailing slash expected), e.g.
            ``s3://bucket/frame_images/video.mp4/``.
        font_type: Font file or name passed to ``ImageFont.truetype``.

    Returns:
        ``s3://<bucket>/<folder>computed/`` on success. Returns ``None`` (after
        printing a message) when ``s3_path`` is empty, no frames are found, or
        any error occurs.
    """
    try:
        if not s3_path:
            print("Invalid S3 path provided")
            return None

        grid_cols = 4
        grid_rows = 4
        frames_per_grid = grid_cols * grid_rows
        bar_height = 50
        font_size = 50

        # Parse the S3 path
        parsed_url = urlparse(s3_path)
        bucket_name = parsed_url.netloc
        folder_path = parsed_url.path.lstrip('/')
        print("folder path:" + folder_path)

        s3 = boto3.resource('s3')
        bucket = s3.Bucket(bucket_name)

        # Frame images only; grids from earlier runs live under ".../computed/".
        selected_keys = [
            obj.key
            for obj in bucket.objects.filter(Prefix=folder_path)
            if obj.key.endswith(".jpg") and "computed" not in obj.key
        ]
        if not selected_keys:
            print("No frames found to process")
            return None

        selected_keys.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))
        num_frames = len(selected_keys)

        images = [
            Image.open(BytesIO(bucket.Object(key).get()['Body'].read()))
            for key in selected_keys
        ]

        frame_width = images[0].width
        frame_height = images[0].height
        # A bar is reserved in every row so it never covers the frames above it.
        cell_height = bar_height + frame_height
        tile_size = (frame_width * grid_cols, cell_height * grid_rows)

        num_grids = (num_frames + frames_per_grid - 1) // frames_per_grid  # round up
        print("number of frames: " + str(num_frames))
        print("number of image grids: " + str(num_grids))

        font = ImageFont.truetype(font_type, font_size)
        base_filename = Path(folder_path).stem
        grid_path = f'{folder_path}computed'

        for k in range(num_grids):
            # Fresh canvas per grid: no leftover frames in a partial last grid.
            tiled_image = Image.new('RGB', tile_size)
            draw = ImageDraw.Draw(tiled_image)
            start = k * frames_per_grid
            for slot, frame_index in enumerate(range(start, min(start + frames_per_grid, num_frames))):
                row, col = divmod(slot, grid_cols)
                bar_x = col * frame_width
                bar_y = row * cell_height
                tiled_image.paste(images[frame_index], (bar_x, bar_y + bar_height))
                draw.rectangle(
                    [(bar_x, bar_y), (bar_x + frame_width - 1, bar_y + bar_height - 1)],
                    fill=(0, 0, 0),
                )
                frame_number = selected_keys[frame_index].split('-')[-1].split('.')[0]
                # Label vertically centred inside the bar ("lm" = left/middle anchor).
                draw.text(
                    (bar_x + 10, bar_y + bar_height // 2),
                    frame_number,
                    font=font,
                    fill=(255, 255, 255),
                    anchor="lm",
                )

            grid_key = f'{grid_path}/image_grid-{k}-{base_filename}.jpg'
            temp_file = BytesIO()
            tiled_image.save(temp_file, format='JPEG')
            temp_file.seek(0)
            bucket.put_object(Key=grid_key, Body=temp_file)

        return f's3://{bucket_name}/{grid_path}/'

    except Exception as e:
        print(f"Error in create_tiled_image: {e}")
        return None

    

    


# Test with a single video

In [None]:
# Single-video run: source prefix/key in the default bucket, plus the S3 prefix
# that extract_frames uploads frames to (it reads the global `frame_images_path`).
video_path = "traffic"
key = "traffic/TestVideo.mp4"
frame_images_path = 'frame_images'
extracted_frames_path = extract_frames(video_path, key)
print(extracted_frames_path)
# Fail fast here rather than with a confusing error in the grid/LLM cells.
assert extracted_frames_path, "Frame extraction failed; see the messages above"

In [None]:
image_grid_path = create_tiled_image(extracted_frames_path, 'DejaVuSans.ttf')
# Fail fast: a None path would otherwise cause a confusing error in the LLM cell.
assert image_grid_path, "Image grid creation failed; see the messages above"
print("calling LLM ")


In [None]:
contextual_summary = get_contextual_summary(
    image_grid_path,
    model_id_sonnet35v2,
)
print("writing response ")
# Text of the first content block of the model's response.
text_data = contextual_summary["content"][0]["text"]

# Option 2A: Process a combined video with Anthropic Claude

In [None]:
# Combined-video run: source prefix/key in the default bucket, plus the S3
# prefix that extract_frames uploads frames to (global `frame_images_path`).
video_path = "nova_video"
key = "nova_video/combined_videos.mov"
frame_images_path = 'frame_images'
extracted_frames_path = extract_frames(video_path, key)
print(extracted_frames_path)
# Fail fast here rather than with a confusing error in the grid/LLM cells.
assert extracted_frames_path, "Frame extraction failed; see the messages above"

In [None]:
def create_tiled_image1(s3_path, font_type, max_dimension=8000):
    """
    Tile the frames under an S3 folder into 4x4 grid images, keeping every grid
    within ``max_dimension`` pixels on each side.

    Frames are the ``*.jpg`` keys under the prefix that do not contain
    ``"computed"``, ordered by the integer after the last ``-`` in the key. Each
    frame is drawn below a black bar labelled with its frame number. When a
    full-size grid would exceed ``max_dimension``, the frames, bar height and
    font size are all scaled down by the same factor. Grid ``k`` is stored as
    ``<folder>computed/image_grid-<k>-<stem>.jpg`` in the same bucket.

    Args:
        s3_path: S3 URI of the frame folder (trailing slash expected).
        font_type: Font file or name passed to ``ImageFont.truetype``.
        max_dimension: Maximum width and height of each grid image, in pixels.
            Defaults to 8000, the value previously hard-coded.

    Returns:
        ``s3://<bucket>/<folder>computed/`` on success, or ``None`` when no
        frames are found.
    """
    grid_cols = 4
    grid_rows = 4
    frames_per_grid = grid_cols * grid_rows
    base_bar_height = 50
    base_font_size = 50

    # Parse the S3 path
    parsed_url = urlparse(s3_path)
    bucket_name = parsed_url.netloc
    folder_path = parsed_url.path.lstrip('/')
    print("folder path:" + folder_path)

    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)

    # Frame images only; grids from earlier runs live under ".../computed/".
    selected_keys = [
        obj.key
        for obj in bucket.objects.filter(Prefix=folder_path)
        if obj.key.endswith(".jpg") and "computed" not in obj.key
    ]
    selected_keys.sort(key=lambda x: int(x.split('-')[-1].split('.')[0]))

    num_frames = len(selected_keys)
    if num_frames == 0:
        print("No frames found")
        return None

    images = [
        Image.open(BytesIO(bucket.Object(key).get()['Body'].read()))
        for key in selected_keys
    ]

    frame_width, frame_height = images[0].size
    # Full-size grid: every row is a bar plus a frame.
    natural_width = frame_width * grid_cols
    natural_height = (frame_height + base_bar_height) * grid_rows

    scale_factor = 1.0
    if natural_width > max_dimension or natural_height > max_dimension:
        # One factor for frames and bars keeps the grid within both limits.
        scale_factor = min(max_dimension / natural_width, max_dimension / natural_height)
        new_size = (
            max(1, int(frame_width * scale_factor)),
            max(1, int(frame_height * scale_factor)),
        )
        images = [img.resize(new_size, Image.Resampling.LANCZOS) for img in images]
        frame_width, frame_height = new_size

    # max(1, ...) guards against a zero-height bar or a zero-size font (which
    # ImageFont.truetype rejects) under extreme downscaling.
    bar_height = max(1, int(base_bar_height * scale_factor))
    font_size = max(1, int(base_font_size * scale_factor))
    cell_height = bar_height + frame_height
    tile_size = (frame_width * grid_cols, cell_height * grid_rows)

    num_images = (num_frames + frames_per_grid - 1) // frames_per_grid  # round up
    print("number of frames: " + str(num_frames))
    print("number of image grids: " + str(num_images))

    font = ImageFont.truetype(font_type, font_size)
    base_filename = Path(folder_path).stem
    grid_path = f'{folder_path}computed'

    for k in range(num_images):
        # New canvas per grid, so a partial last grid has no leftover frames.
        tiled_image = Image.new('RGB', tile_size)
        draw = ImageDraw.Draw(tiled_image)
        start = k * frames_per_grid
        for slot, frame_index in enumerate(range(start, min(start + frames_per_grid, num_frames))):
            row, col = divmod(slot, grid_cols)
            bar_x = col * frame_width
            bar_y = row * cell_height
            tiled_image.paste(images[frame_index], (bar_x, bar_y + bar_height))
            draw.rectangle(
                [(bar_x, bar_y), (bar_x + frame_width - 1, bar_y + bar_height - 1)],
                fill=(0, 0, 0),
            )
            frame_number = selected_keys[frame_index].split('-')[-1].split('.')[0]
            draw.text(
                (bar_x + 10, bar_y + bar_height // 2),
                frame_number,
                font=font,
                fill=(255, 255, 255),
                anchor="lm",  # left edge, vertically centred in the bar
            )

        # Save the tiled image to S3
        grid_key = f'{grid_path}/image_grid-{k}-{base_filename}.jpg'
        temp_file = BytesIO()
        tiled_image.save(temp_file, format='JPEG')
        temp_file.seek(0)
        bucket.put_object(Key=grid_key, Body=temp_file)

    return f's3://{bucket_name}/{grid_path}/'


In [None]:
image_grid_path = create_tiled_image1(extracted_frames_path, 'DejaVuSans.ttf')
# Fail fast: a None path would otherwise cause a confusing error in the LLM cell.
assert image_grid_path, "Image grid creation failed; see the messages above"
print("calling LLM ")

In [None]:
def get_contextual_summary1(image_grid_path, modelid):
    """
    Run a multimodal Bedrock inference over the grid images of a combined,
    multi-camera video.

    Lists the ``.jpg``/``.jpeg`` objects under ``image_grid_path`` and orders
    them by grid index. It attaches them, base64 encoded, after a prompt about
    pedestrians, driver safety and distraction, then calls ``invoke_model``
    with an Anthropic Messages request body.

    Args:
        image_grid_path: S3 URI of the folder holding the video's image grids,
            e.g. ``s3://bucket/frame_images/video.mov/computed/``.
        modelid: The Bedrock model ID to invoke.

    Returns:
        The parsed JSON response body from Bedrock.

    Raises:
        ValueError: If ``image_grid_path`` is empty or ``None``.
        Exception: Errors while listing the S3 folder are logged and re-raised.
        botocore.exceptions.ClientError: If the model invocation fails.
    """
    import re

    if not image_grid_path:
        raise ValueError(
            "image_grid_path is empty; the image-grid step probably failed and returned None"
        )

    # Initialize S3 clients
    s3 = boto3.resource('s3')
    s3_client = boto3.client('s3')

    # Parse "s3://<bucket>/<prefix>"
    bucket_name = image_grid_path.split('/')[2]
    prefix = '/'.join(image_grid_path.split('/')[3:])

    logger.info(f"Bucket name: {bucket_name}")
    logger.info(f"Prefix: {prefix}")

    bucket = s3.Bucket(bucket_name)

    def grid_order(key):
        # S3 listing is lexicographic ("image_grid-10-" before "image_grid-2-").
        # Order by the numeric grid index so frames stay in temporal order.
        match = re.search(r'image_grid-(\d+)-', key)
        return (0, int(match.group(1)), key) if match else (1, 0, key)

    # List objects in the folder
    try:
        image_objects = list(bucket.objects.filter(Prefix=prefix))
        logger.info(f"Found {len(image_objects)} objects with prefix {prefix}")

        image_keys = sorted(
            (obj.key for obj in image_objects
             if obj.key.lower().endswith(('.jpg', '.jpeg'))),
            key=grid_order,
        )
        logger.info(f"Found {len(image_keys)} image files")

        if not image_keys:
            logger.warning(f"No image files found in {image_grid_path}")

    except Exception as e:
        logger.error(f"Error listing objects in bucket: {str(e)}")
        raise

    # NOTE(review): Bedrock's Anthropic Messages API limits the number of images
    # per request (20 at the time of writing). Confirm against current quotas.
    max_images_per_request = 20
    if len(image_keys) > max_images_per_request:
        logger.warning(
            f"{len(image_keys)} images exceed the expected per-request limit of "
            f"{max_images_per_request}; the model call may be rejected"
        )

    # Define the analysis prompt
    prompt = """
   As an expert traffic safety analyst, analyze the provided traffic video footage image grid frames showing 3 simultaneous camera angles and:

Analyze the provided video footage and answer the following:
Are there any pedestrians visible? If yes, describe their location, actions, and appearance.
Are any pedestrians at risk due to the driver's behavior? Explain why or why not.
Is the driver driving safely? Provide specific examples of unsafe or safe behaviors.
Is the driver distracted? If so, describe how and when.
    """

    # Construct the input message: prompt text first, then one block per image.
    message = {
        "role": "user",
        "content": [
            {"type": "text", "text": prompt}
        ]
    }

    for image_key in image_keys:
        try:
            logger.info(f"Processing image: {image_key}")

            response = s3_client.get_object(Bucket=bucket_name, Key=image_key)
            image_binary = response['Body'].read()
            logger.info(f"Image size: {len(image_binary)} bytes")

            base64_string = base64.b64encode(image_binary).decode('utf-8')
            logger.info(f"Successfully encoded image {image_key}")

            message["content"].append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/jpeg",
                    "data": base64_string
                }
            })
            logger.info(f"Added image {image_key} to message")

        except Exception as e:
            # Best effort: skip an unreadable image but keep the others.
            logger.error(f"Error processing image {image_key}: {str(e)}")
            continue

    image_count = sum(1 for content in message["content"] if content["type"] == "image")
    logger.info(f"Total images added to message: {image_count}")

    request_body = {
        "messages": [message],
        "max_tokens": 2048,
        "temperature": 0,
        "top_p": 0.999,
        "top_k": 250,
        "anthropic_version": "bedrock-2023-05-31"
    }

    try:
        bedrock_runtime = boto3.client('bedrock-runtime')

        logger.info("Invoking Bedrock model...")

        response = bedrock_runtime.invoke_model(
            modelId=modelid,
            contentType="application/json",
            accept="application/json",
            body=json.dumps(request_body)
        )

        response_body = json.loads(response['body'].read())
        input_tokens = response_body["usage"]["input_tokens"]
        output_tokens = response_body["usage"]["output_tokens"]

        logger.info("\nInvocation details:")
        logger.info(f"Input tokens: {input_tokens}")
        logger.info(f"Output tokens: {output_tokens}")

        response_content = response_body.get("content", [])
        logger.info(f"\nModel responses ({len(response_content)}):")
        for content in response_content:
            # Non-text content blocks carry no "text" key; do not fail on them.
            logger.info(content.get("text", ""))

        return response_body

    except ClientError as error:
        logger.error(
            "Failed to invoke model. Error: %s: %s",
            error.response["Error"]["Code"],
            error.response["Error"]["Message"]
        )
        raise



In [None]:
contextual_summary = get_contextual_summary1(
    image_grid_path,
    model_id_sonnet35v2,
)
print("writing response ")
# Text of the first content block of the model's response.
text_data = contextual_summary["content"][0]["text"]

# Option 2B: Process each video's image grid individually, then summarize each analysis

You can use the function below. (No test data is available for this option, so it has not been run.)

In [None]:
import boto3
import json
import logging
import concurrent.futures
import time
from datetime import datetime
from botocore.exceptions import ClientError

# Set up logging.
# NOTE: logging.basicConfig is a no-op when the root logger already has handlers
# (e.g. after the identical call in the import cell at the top of the notebook);
# presumably it is repeated here so this cell can also run on its own.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_contextual_summary(s3_path, modelid, max_retries=3, bedrock_client=None):
    """
    Analyze footage from a single camera view with an Anthropic model on Bedrock.

    NOTE(review): only the *path string* is interpolated into the prompt; the
    request contains a single text content block and no image data. The model
    cannot fetch S3 objects itself, so the answer is not grounded in the actual
    frames unless they are attached elsewhere. TODO confirm intended behaviour.

    Args:
        s3_path: Location of the camera footage / image grid (embedded in the prompt text).
        modelid: Bedrock model ID to invoke (previously ignored in favour of a global).
        max_retries: Total number of invocation attempts before the last error is re-raised.
        bedrock_client: Optional pre-built ``bedrock-runtime`` client; one is created if omitted.

    Returns:
        The parsed JSON response body (dict) returned by ``invoke_model``.

    Raises:
        ValueError: if ``max_retries`` is less than 1.
        Exception: the last invocation error once all attempts have failed.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    try:
        # Create the analysis prompt for this camera view
        camera_prompt = f"""
        Analyze the driving footage from camera at {s3_path} and provide:
        
        1. Safety Assessment:
           - Vehicle positioning and lane discipline
           - Following distance management
           - Speed control and consistency
           - Response to road conditions and hazards
        
        2. Risk Identification:
           - Immediate safety concerns
           - Potential hazards
           - Traffic rule compliance
        
        3. Driver Behavior (if visible):
           - Attention level
           - Control inputs
           - Defensive driving practices
        """

        # Anthropic Messages API request body for Bedrock
        request_body = json.dumps({
            "messages": [{
                "role": "user",
                "content": [{"type": "text", "text": camera_prompt}]
            }],
            "max_tokens": 1024,
            "temperature": 0,
            "top_p": 0.999,
            "top_k": 250,
            "anthropic_version": "bedrock-2023-05-31"
        })

        # Build the client once instead of on every retry attempt.
        client = bedrock_client if bedrock_client is not None else boto3.client('bedrock-runtime')

        for attempt in range(max_retries):
            try:
                response = client.invoke_model(
                    modelId=modelid,
                    contentType="application/json",
                    accept="application/json",
                    body=request_body
                )
                return json.loads(response['body'].read())
            except Exception:
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...

    except Exception as e:
        logger.error(f"Error in get_contextual_summary: {str(e)}")
        raise

def _invoke_claude_text_with_retry(model_id, prompt, max_tokens=2048, max_retries=3):
    """Send one text-only prompt to an Anthropic model on Bedrock, retrying with exponential backoff.

    Returns:
        The parsed JSON response body (dict).

    Raises:
        Exception: wrapping (and chained from) the last error after ``max_retries`` failed attempts.
    """
    request_body = json.dumps({
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}]
        }],
        "max_tokens": max_tokens,
        "temperature": 0,
        "top_p": 0.999,
        "top_k": 250,
        "anthropic_version": "bedrock-2023-05-31"
    })
    # Create the client once rather than on every retry attempt.
    bedrock_runtime = boto3.client('bedrock-runtime')

    for attempt in range(1, max_retries + 1):
        try:
            response = bedrock_runtime.invoke_model(
                modelId=model_id,
                contentType="application/json",
                accept="application/json",
                body=request_body
            )
            return json.loads(response['body'].read())
        except Exception as e:
            if attempt == max_retries:
                raise Exception(
                    f"Failed to generate analysis after {max_retries} attempts: {str(e)}"
                ) from e
            logger.warning(f"Retry {attempt} of {max_retries} for final analysis")
            time.sleep(2 ** attempt)  # Exponential backoff: 2s, 4s, ...


def prompt_chaining_safety_analysis(dash_cam_path, rear_cam_path, side_cam_path, modelid,
                                    final_modelid=None, max_retries=3):
    """
    Performs a multi-view safety analysis by chaining prompts across different camera angles
    and generating a comprehensive final analysis.

    Step 1 runs ``get_contextual_summary(path, modelid)`` for each camera view in parallel.
    Step 2 feeds the three per-view texts into one comprehensive prompt.

    Args:
        dash_cam_path: S3 path to front camera footage frames
        rear_cam_path: S3 path to rear view camera footage frames
        side_cam_path: S3 path to driver-facing camera footage frames
        modelid: Bedrock model ID used for the per-view analyses, and for the final
            analysis unless ``final_modelid`` is given
        final_modelid: Optional model ID for the final synthesis step. Previously this
            step silently used the notebook global ``model_id_sonnet37``; pass it here
            explicitly to keep that behaviour.
        max_retries: Attempts for the final analysis call before giving up

    Returns:
        dict with ``individual_analyses`` (view name -> text), ``comprehensive_analysis``
        (text) and ``metadata`` (token usage, timestamp, model ids).

    Raises:
        ValueError: if a path is not a string starting with ``s3://`` or ``max_retries`` < 1.
        Exception: if the final analysis cannot be generated or has no content.
    """
    logger.info("Starting multi-view safety analysis...")

    # Validate input paths (non-string values such as None are rejected with a clear error)
    for path, name in [(dash_cam_path, 'front'), (rear_cam_path, 'rear'), (side_cam_path, 'driver')]:
        if not isinstance(path, str) or not path.startswith('s3://'):
            raise ValueError(f"Invalid S3 path format for {name} camera: {path}")
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    # Model actually used for the final synthesis; recorded in the metadata below.
    synthesis_model_id = final_modelid or modelid

    # Store analysis results for each view
    view_analyses = {}

    camera_views = {
        'front_cam': dash_cam_path,
        'rear_cam': rear_cam_path,
        'driver_cam': side_cam_path
    }

    try:
        # Process camera views in parallel; a failing view is recorded, not fatal.
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(camera_views)) as executor:
            future_to_view = {
                executor.submit(get_contextual_summary, s3_path, modelid): view_name
                for view_name, s3_path in camera_views.items()
            }

            for future in concurrent.futures.as_completed(future_to_view):
                view_name = future_to_view[future]
                try:
                    view_result = future.result()
                    view_content = (view_result or {}).get('content')
                    if view_content:
                        view_analyses[view_name] = view_content[0]['text']
                        logger.info(f"Successfully processed {view_name}")
                    else:
                        # Covers a missing result, a missing key and an empty content list.
                        logger.warning(f"No analysis generated for {view_name}")
                        view_analyses[view_name] = "Analysis not available"
                except Exception as e:
                    logger.error(f"Error processing {view_name}: {str(e)}")
                    view_analyses[view_name] = f"Error in analysis: {str(e)}"

        # Create comprehensive analysis prompt
        comprehensive_prompt = f"""
        As a senior traffic safety expert, review the following analyses from multiple camera angles 
        of the same driving sequence and provide a comprehensive safety assessment.
        
        FRONT CAMERA ANALYSIS (External View):
        {view_analyses.get('front_cam', 'Not available')}
        
        REAR CAMERA ANALYSIS (External View):
        {view_analyses.get('rear_cam', 'Not available')}
        
        DRIVER CAMERA ANALYSIS (Internal View):
        {view_analyses.get('driver_cam', 'Not available')}
        
        Please provide:
        1. Unified Safety Score (1-100):
           - Calculate overall safety rating considering all angles
           - Break down scoring criteria and weights used
           - Factor in driver attention and behavior
        
        2. Comprehensive Safety Assessment:
           - Cross-reference behaviors across camera angles
           - Analyze driver attention and responsiveness
           - Evaluate external hazard awareness
           - Assess compliance with traffic rules
        
        3. Driver Behavior Analysis:
           - Attention level and focus on road
           - Response to potential hazards
           - Signs of fatigue or distraction
           - Use of safety equipment
        
        4. Critical Safety Recommendations:
           - Prioritized improvement areas
           - Specific defensive driving techniques
           - Driver attention enhancement strategies
        
        5. Multi-angle Safety Integration:
           - Correlation between driver behavior and external events
           - Effectiveness of camera coverage
           - Suggestions for enhanced monitoring
        """

        final_result = _invoke_claude_text_with_retry(
            synthesis_model_id, comprehensive_prompt, max_tokens=2048, max_retries=max_retries
        )

        # An empty content list is treated the same as a missing one.
        final_content = final_result.get('content') or []
        if not final_content:
            raise Exception("No content in final analysis response")

        usage = final_result.get('usage', {})
        final_analysis = {
            'individual_analyses': view_analyses,
            'comprehensive_analysis': final_content[0]['text'],
            'metadata': {
                'input_tokens': usage.get('input_tokens', 0),
                'output_tokens': usage.get('output_tokens', 0),
                'timestamp': datetime.now().isoformat(),
                'model_id': synthesis_model_id,
                'view_model_id': modelid
            }
        }

        logger.info("Successfully generated comprehensive safety analysis")
        return final_analysis

    except Exception as e:
        logger.error(f"Error in prompt chaining analysis: {str(e)}")
        raise

    finally:
        logger.info("Completed safety analysis process")

    

    




In [None]:
# Front-facing camera clip (nova_video/frontfacing_dc.mov): extract frames, build an image
# grid with create_tiled_image, then ask the model for a per-camera analysis.
# The reply text is kept in text_data_front for the summarize() cell further down.
#frame_images_path = "extracted-frames"
video_path = "nova_video"
key = "nova_video/frontfacing_dc.mov"
# NOTE(review): frame_images_path is not passed to any call in this cell; presumably
# extract_frames / create_tiled_image read it as a global — confirm before removing.
frame_images_path='frame_images'
extracted_frames_path = extract_frames(video_path, key)
print(extracted_frames_path)
# 'DejaVuSans.ttf' is a font file (presumably for labelling tiles — TODO confirm).
image_grid_path=create_tiled_image(extracted_frames_path, 'DejaVuSans.ttf')
print("calling LLM ")
# NOTE(review): get_contextual_summary puts image_grid_path into the prompt as text only;
# no image data is attached to the request.
contextual_summary=get_contextual_summary(image_grid_path,model_id_sonnet35v2)
print("writing response ")
text_data_front = contextual_summary['content'][0]['text']
print(text_data_front)

In [None]:
# Side-facing camera clip (nova_video/sidefacing_dc.mov): same pipeline as the front camera
# cell. The reply text is kept in text_data_side for the summarize() cell further down.
#frame_images_path = "extracted-frames"
video_path = "nova_video"
key = "nova_video/sidefacing_dc.mov"
# NOTE(review): frame_images_path is not passed to any call in this cell; presumably
# extract_frames / create_tiled_image read it as a global — confirm before removing.
frame_images_path='frame_images'
extracted_frames_path = extract_frames(video_path, key)
print(extracted_frames_path)
image_grid_path=create_tiled_image(extracted_frames_path, 'DejaVuSans.ttf')
print("calling LLM ")
# NOTE(review): get_contextual_summary puts image_grid_path into the prompt as text only;
# no image data is attached to the request.
contextual_summary=get_contextual_summary(image_grid_path,model_id_sonnet35v2)
print("writing response ")
text_data_side = contextual_summary['content'][0]['text']
print(text_data_side)

In [None]:
# Inward-facing camera clip (nova_video/infacing_dc.mov): same pipeline as the front camera
# cell. The reply text is kept in text_data_infacing for the summarize() cell further down.
#frame_images_path = "extracted-frames"
video_path = "nova_video"
key = "nova_video/infacing_dc.mov"
# NOTE(review): frame_images_path is not passed to any call in this cell; presumably
# extract_frames / create_tiled_image read it as a global — confirm before removing.
frame_images_path='frame_images'
extracted_frames_path = extract_frames(video_path, key)
print(extracted_frames_path)
image_grid_path=create_tiled_image(extracted_frames_path, 'DejaVuSans.ttf')
print("calling LLM ")
# NOTE(review): get_contextual_summary puts image_grid_path into the prompt as text only;
# no image data is attached to the request.
contextual_summary=get_contextual_summary(image_grid_path,model_id_sonnet35v2)
print("writing response ")
text_data_infacing = contextual_summary['content'][0]['text']
print(text_data_infacing)

In [None]:
def summarize(client, inface, side, front, model_id, max_tokens=4096, temperature=0, top_p=0.9):
    """
    Combine the per-camera analyses into a final safety recommendation via the Bedrock Converse API.

    Args:
        client: A ``bedrock-runtime`` client (anything exposing ``converse``). Previously
            ignored in favour of the global ``bedrock_client``.
        inface: Analysis text for the inward-facing camera.
        side: Analysis text for the side camera.
        front: Analysis text for the front camera.
        model_id: Bedrock model ID to invoke. Previously ignored in favour of the
            global ``model_id_sonnet37``.
        max_tokens, temperature, top_p: Passed through as ``inferenceConfig``.

    Returns:
        The model's text reply on success. On failure the error is printed (not raised) and
        ``"Model invocation error"`` is returned if the ``converse`` call itself failed, or
        ``"Output parsing error"`` if the reply did not have the expected structure.
    """
    prompt=""" You are safety expert for safe driving. Analyze the summary from individual camera feed and 
    generate the final recommendation and summary 
    <inward_facing_camera>"""+inface+ """</inward_facing_camera> <side_camera>"""+side+""" </side_camera> <front_camera>"""+front+"""</front_camera>"""

    try:
        response = client.converse(
            modelId=model_id,
            messages=[
                {
                    "role": "user",
                    "content": [{"text": prompt}]
                }
            ],
            inferenceConfig={
                "temperature": temperature,
                "maxTokens": max_tokens,
                "topP": top_p
            }
        )
    except Exception as e:
        # Best-effort: report and return a sentinel instead of falling through to parsing.
        print(e)
        return "Model invocation error"

    try:
        return response['output']['message']['content'][0]['text']
    except Exception as e:
        print(e)
        return "Output parsing error"

In [None]:
# Merge the three per-camera analyses from the cells above into one final recommendation.
summary = summarize(
    client=bedrock_client,
    inface=text_data_infacing,
    side=text_data_side,
    front=text_data_front,
    model_id=model_id_sonnet37,
)
print(summary)