In [3]:
import boto3


# Sanity check for the AWS setup: open an S3 resource handle and print the
# name of every bucket it returns, one per line, indented with a tab.
s3_resource = boto3.resource("s3")
print("Hello, Amazon S3! Let's list your buckets:")
for bucket in s3_resource.buckets.all():
    print("\t{}".format(bucket.name))

Hello, Amazon S3! Let's list your buckets:
	video-aws-bucket


In [4]:
# Low-level S3 client used for the single-file upload below.
s3 = boto3.client('s3')

# Local source path, destination bucket, and the key the object is stored under.
file_name = "images/poatan.png"
bucket_name = 'video-aws-bucket'
object_name = 'image_0.png'

# Attempt the upload; any failure is reported instead of stopping the notebook.
try:
    response = s3.upload_file(file_name, bucket_name, object_name)
except Exception as e:
    print(f"Error uploading file: {e}")
else:
    print("File uploaded successfully")


File uploaded successfully


## Upload Video Frames to AWS

When uploading frames to the bucket, we want to make sure that the same video name is never used twice. To do this, we maintain a key-value store (a DynamoDB table) that records each video name along with its metadata.

In [5]:
import time
from botocore.exceptions import ClientError

def upload_metadata(video_name, table):
    """Register ``video_name`` in the metadata table if it is not already there.

    The item is written with a conditional put so that the existence check and
    the write happen as one atomic operation. A separate read-then-write would
    let two concurrent callers both see "not present" and the second would
    silently overwrite the first.

    Args:
        video_name: Value stored under the ``'video_name'`` key of the item.
        table: Table-like object exposing ``put_item(Item=..., ConditionExpression=...)``
            (e.g. a boto3 DynamoDB ``Table`` resource).

    Returns:
        True if the item was written; False if the name already exists or the
        request failed with a ``ClientError`` (the reason is printed).
    """
    try:
        table.put_item(
            Item={
                'video_name': video_name,
                'timestamp': int(time.time())
            },
            # Only succeed when no item with this key exists yet.
            ConditionExpression='attribute_not_exists(video_name)'
        )
    except ClientError as e:
        error = e.response['Error']
        if error.get('Code') == 'ConditionalCheckFailedException':
            # Same message as a duplicate-name rejection has always produced.
            print(f"Video name '{video_name}' already exists.")
        else:
            print(error['Message'])
        return False

    return True

# Example use: try to register a sample video name in the 'video_names' table.
video_name = "example_video"
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('video_names')
# upload_metadata returns a success flag (True/False), not an identifier,
# so report the video name rather than printing the boolean as an "ID".
metadata_uploaded = upload_metadata(video_name, table)
if metadata_uploaded:
    print(f"Metadata uploaded for video: {video_name}")
else:
    print("Failed to upload metadata.")

Video name 'example_video' already exists.
Failed to upload metadata.


In [6]:
import cv2

def video_to_frames(video_path: str) -> list:
    """Read every frame of the video at ``video_path`` into a list.

    All frames are held in memory at once, in the order they were read.
    The capture handle is always released, whether reading succeeds or fails.

    Args:
        video_path: Path passed to ``cv2.VideoCapture``.

    Returns:
        The list of frames exactly as returned by ``VideoCapture.read()``,
        or an empty list if the video could not be opened or OpenCV raised
        an error (the reason is printed).
    """
    vidcap = None
    try:
        vidcap = cv2.VideoCapture(video_path)
        if not vidcap.isOpened():
            raise ValueError(f"Failed to open video at path: {video_path}")

        frames = []
        while True:
            success, image = vidcap.read()
            if not success:
                break
            frames.append(image)

        print(f"Read {len(frames)} frames")

        return frames

    except cv2.error as e:
        print(f"Error reading video: {e}")
        return []
    except ValueError as e:
        print(e)
        return []
    finally:
        # Free the underlying file/decoder handle on every exit path.
        if vidcap is not None:
            vidcap.release()

In [7]:
import io 

def upload_video_frames(video_path, table, bucket_name):
    """Split a video into frames and upload each frame to S3 as a PNG.

    The video name (file name up to its first dot) is first registered via
    ``upload_metadata``; if that name is already taken nothing is uploaded.
    Frames are stored under the keys ``"<video_name>_frame_<i>.png"``.

    Frames are encoded with ``cv2.imencode``, which expects the BGR channel
    order that OpenCV produces when reading video, so colours are written
    correctly and no additional imaging library is required.

    Args:
        video_path: Path to the video file, using ``/`` as separator.
        table: Metadata table handed to ``upload_metadata``.
        bucket_name: Destination S3 bucket.

    Returns:
        False if the video yielded no frames or the metadata could not be
        registered; True once the upload pass has run. Individual frames that
        fail to encode or upload are reported as "dropped frame" and skipped
        without failing the whole call.
    """
    frames = video_to_frames(video_path)
    if not frames:
        print("Failed to read video frames.")
        return False

    video_name = video_path.split('/')[-1].split('.')[0]
    if not upload_metadata(video_name, table):
        print("Failed to upload metadata.")
        return False

    s3_client = boto3.client('s3')
    for i, frame in enumerate(frames):
        frame_name = f"{video_name}_frame_{i}.png"

        encoded, png_data = cv2.imencode('.png', frame)
        if not encoded:
            print(f"dropped frame: could not encode {frame_name} as PNG")
            continue

        # The context manager closes the buffer even if the upload raises.
        with io.BytesIO(png_data.tobytes()) as image_buffer:
            try:
                s3_client.upload_fileobj(image_buffer, bucket_name, frame_name)
                print(f'Uploaded {frame_name} to {bucket_name}')
            except ClientError as e:
                print(f"dropped frame: {e.response['Error']['Message']}")

    return True

In [8]:
# Run the frame-upload pipeline on the sample video; the call's result is the cell output.
upload_video_frames(
    video_path='videos/psa.mp4',
    table=table,
    bucket_name='video-aws-bucket',
)

Read 159 frames
Video name 'psa' already exists.
Failed to upload metadata.


False