### Purpose of this notebook
The purpose of this notebook is to convert 1000+ videos and its parquet into video frames and load the numpy files to S3

In [1]:
#Install missing packages
#!pip install boto3
#!pip install pyarrow
#!pip install fastparquet
#!pip install s3fs
#!pip install configparser
#!pip install zlib
#!pip install psutil
#!pip install opencv-python

In [1]:
# Import required libraries
import boto3 #Video files get read through this
import cv2
import os
import pandas as pd
import math
import numpy as np
import random
import io
import s3fs # Parquet files get read through this
import zlib # For compression
import time # To calculate download time
import configparser
import requests
import psutil # Checks memory usage

In [2]:
# Let's read in the credentials file
config = configparser.ConfigParser()
config.read('credentials')

['credentials']

In [6]:
aws_access_key_id = config["default"]['aws_access_key_id']
aws_secret_access_key = config["default"]['aws_secret_access_key']
bucket_name = 'asl-capstone'
prefix = 'youtube-asl/1000-samples/'
save_path = '/content/temp_folder'
s3_URI = 's3://asl-capstone/'

In [7]:
# Create an s3 object
s3 = boto3.client('s3',aws_access_key_id = aws_access_key_id, aws_secret_access_key = aws_secret_access_key,
                  region_name = 'us-west-2')

In [8]:
# Let's get a list of all the parquet files in the S3 bucket
paginator = s3.get_paginator('list_objects_v2')
parquet_files = []
for page in paginator.paginate(Bucket = bucket_name, Prefix = prefix+'parsed/'):
    parquet_files.extend(content['Key'] for content in page.get('Contents',[]))
print(parquet_files[0:4])

['youtube-asl/1000-samples/parsed/--6bmFM9wT4.ase.parquet', 'youtube-asl/1000-samples/parsed/-9aGqJpaN7c.ase.parquet', 'youtube-asl/1000-samples/parsed/-FSlHH2ReLA.ase.parquet', 'youtube-asl/1000-samples/parsed/-GtDaiSJkSQ.en.parquet']


In [9]:
# Let's get a list of all the videos
paginator = s3.get_paginator('list_objects_v2')
video_files = []
for page in paginator.paginate(Bucket = bucket_name, Prefix = 'youtube-asl/1000-samples/'):
    video_files.extend(content['Key'] for content in page.get('Contents',[]) if content['Key'].endswith(('.mkv','.webm','.mp4')))
print(video_files[0:4])

['youtube-asl/1000-samples/--6bmFM9wT4.webm', 'youtube-asl/1000-samples/-9aGqJpaN7c.mkv', 'youtube-asl/1000-samples/-FSlHH2ReLA.webm', 'youtube-asl/1000-samples/-GtDaiSJkSQ.mkv']


In [8]:
# Let's get a list of all the downloaded videos
paginator = s3.get_paginator('list_objects_v2')
downloaded_videos = []
for page in paginator.paginate(Bucket = bucket_name, Prefix = 'youtube-asl/1000-samples/numpy_files/'):
    downloaded_videos.extend(content['Key'] for content in page.get('Contents',[]) if content['Key'].endswith(('.npy')))
print(downloaded_videos[0:4])

['youtube-asl/1000-samples/numpy_files/--6bmFM9wT4.npy', 'youtube-asl/1000-samples/numpy_files/--6bmFM9wT4_cap.npy', 'youtube-asl/1000-samples/numpy_files/-9aGqJpaN7c.npy', 'youtube-asl/1000-samples/numpy_files/-9aGqJpaN7c_cap.npy']


In [8]:
downloaded_videos = [file for file in downloaded_videos if '_cap' not in os.path.basename(file).split('.')[0]]
downloaded_videos = [os.path.basename(file).split('.')[0] for file in downloaded_videos]
print(downloaded_videos[0:4])

['--6bmFM9wT4', '-9aGqJpaN7c', '-FSlHH2ReLA', '-GtDaiSJkSQ']


In [9]:
video_files = [file for file in video_files if os.path.basename(file).split('.')[0] not in downloaded_videos]

In [10]:
print(video_files[0:4])
try:
    video_files.remove('youtube-asl/1000-samples/EoByRQldF-4.mkv')
    #video_files.remove('youtube-asl/1000-samples/GdYNF6g73eI.webm')
except:
    print("Video not found")

['youtube-asl/1000-samples/EoByRQldF-4.mkv', 'youtube-asl/1000-samples/GdYNF6g73eI.webm']


In [11]:
parquet_names = [os.path.basename(file).split('.')[0] for file in parquet_files]
video_names = [os.path.basename(file).split('.')[0] for file in video_files]
not_there_files = [file for file in parquet_names if file not in video_names]

In [12]:
print(len(not_there_files))
print(not_there_files)

1154
['--6bmFM9wT4', '-9aGqJpaN7c', '-FSlHH2ReLA', '-GtDaiSJkSQ', '-HkeOGWJWLI', '-JGpOd2AlVY', '-QHnZBBE8Ho', '-ZOyG_dW_1M', '-Zrf6jWiFZs', '-_oUXqM2Zjc', '-aLiyA30EQI', '-d4a2kGy0sg', '-eBnWC1ngLg', '-j1GUcRAbPc', '-jar_eJa87Y', '-kvW8iUnuoo', '-o9Tj5gJGI4', '-oqvbNZSFnA', '-qK5tz1UHSM', '-toZLgPr-7E', '-ulbptDunGg', '-w3fITJYQy0', '00urZoasbzw', '03LKR5Kjgd4', '04QD9ew7UHI', '0553Gp2y-RE', '0I7h9AmcusY', '0J4_26aTVF8', '0L6S0b14ZmI', '0Pp2AcPbHnw', '0WcGVcWLLy0', '0axfOnsyUE8', '0dMMGRq3EaI', '0gLuBychKgQ', '0ibHGUofRWQ', '0oJp4HiGlqw', '0tQRCmF1lhE', '0u2AVVL5KRE', '0zIK6G6DdTI', '11QQwlNKUr4', '11bYjaf1gWc', '11gzqTBlm5Q', '182RxC-rQZw', '1PM8NOrXLMc', '1Tr77GEcSrE', '1W05nmX9T38', '1Wol7Y3ezhA', '1YYaRxqSxNc', '1ZByLiG8-3c', '1fCuZxuGHGM', '1hQ3N5wsyeI', '1iTycyw2jcE', '1nJt4VHPNE0', '1utumpy3Aec', '2-0BoNb-3DE', '209YWxzAzyQ', '20nY5jgoY8w', '2FqUerxjxXY', '2Kxcrq-_Jts', '2RjGna641y4', '2SEgXpck6-o', '2Tgl8gxz_HA', '2cryaHnc62w', '2msG66_yfBw', '2pDVyY9wsRg', '2r-hs8cizg4', '2rU

In [13]:
# Make sure that parquet_files only has the same files as video_files
parquet_files = [file for file in parquet_files if os.path.basename(file).split('.')[0] not in not_there_files] 

In [14]:
print(len(video_files), len(parquet_files)) # Confirm that the lists are equal

1 1


In [15]:
filename = os.path.basename(video_files[0])
local_video_path = "temp_folder/" + filename
# Let's read in one parquet file using S3FS
fs = s3fs.S3FileSystem(key=aws_access_key_id, secret=aws_secret_access_key)

In [16]:
# Define function to process video
def video_frame_capturer(video_name, start_frame, end_frame):
  current_frame = start_frame
  filename = os.path.basename(video_name)
  video_path = "temp_folder/" + filename
  #download_from_s3('asl-capstone', video_name, local_video_path, aws_access_key_id, aws_secret_access_key)
  #print(f"Reading {video_path}") 
  #if psutil.virtual_memory().percent>=90:
  #    time.sleep(20)
  video_capture = cv2.VideoCapture(video_path)
  video_capture.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
  while video_capture.isOpened() and current_frame <= end_frame:
    ret, frame = video_capture.read()
    #print("In frame number ",video_capture.get(cv2.CAP_PROP_POS_FRAMES))
    if not ret:
      break
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    frame = cv2.resize(frame, (1280, 720))
    current_frame+=1
    yield frame # Does not terminate the function call; but comes back to it
  video_capture.release()

In [17]:
def read_caption(caption_file):
    with fs.open('asl-capstone/'+caption_file, 'rb') as f:
        df = pd.read_parquet(f)
    return df

In [18]:
def save_frames_file(frame_list, filename, caption_list): #, caption_list):
    numpy_array = frame_list[0]
    caption_array = np.array(caption_list[0], dtype = str)
    filename = filename.split('.')[0]
    s3_file_path = f'{prefix}numpy_files/{filename}.npy'
    s3_cap_path = f'{prefix}numpy_files/{filename}_cap.npy'
    file_path = "temp_folder/"+filename+".npy"
    caption_path = "temp_folder/"+filename+"_cap.npy"
    #file_path = "s3://asl-capstone/youtube-asl/1000-samples/numpy_files/"+filename
    np.save(f"{file_path}",numpy_array)
    np.save(f"{caption_path}",caption_array)
    s3.upload_file(file_path, bucket_name, s3_file_path)
    s3.upload_file(caption_path, bucket_name, s3_cap_path)
    os.remove(file_path)
    os.remove(caption_path)

In [None]:
download_time = []
errored_video = []
errored_caption = []
counter = 0
skipped_videos = []
skipped_captions = []
for video, caption in zip(video_files, parquet_files):
    try:
        if os.path.basename(video).split('.')[0] in os.path.basename(caption):
            vid_frames = [] # Empty list to store video frames
            master_caption = [] # Empty list to store captions
            start_time = time.time()
            filename = os.path.basename(video)
            local_video_path = "temp_folder/"+filename
            #print(local_video_path)
            if os.path.exists(local_video_path):
                print("Video already exists. Not downloading again")
            else:
                s3.download_file(bucket_name,video,local_video_path)
            temp_df = read_caption(caption)
            size_file_name = os.path.basename(video)
            size_file_path = 'temp_folder/'+size_file_name
            if os.path.getsize(size_file_path) < (195*1024*1024):
                print(f"Skipping a video as it's huge. Video name: {size_file_name} of size {os.path.getsize(size_file_path)/(1024*1024)} MB")
                skipped_videos.append(video)
                skipped_captions.append(caption)
                os.remove(size_file_path)
                continue
            for _, row in temp_df.iterrows():            
                temp_frame = video_frame_capturer(video,int(row['start_frame']), int(row['end_frame']))
                for frame in temp_frame:
                    vid_frames.append(frame)
                master_caption.append(row['Caption'])
            save_frames_file(vid_frames,filename, master_caption)
            file_size = os.path.getsize(local_video_path)/(1024*1024)
            os.remove(local_video_path)
            end_time = time.time()
            download_time.append(end_time-start_time)
            counter+=1
            print(f"Number of videos complete is {counter}.")
            #time.sleep(5)
    except Exception as e:
        print(f"An error occurred. See {e}")
        errored_video.append(video)
        errored_caption.append(caption)
        print("We will still kill the instance")
            #print(f"Time taken to process {filename} of size {file_size:.2f} MB is {((end_time - start_time)):.2f} seconds")

In [None]:
time_download = sum(download_time)/len(download_time)
print(f"Average processing time per video is {time_download:.2f} seconds")

In [21]:
def get_instance_id():
    """Get current instance ID from metadata"""
    url = "http://169.254.169.254/latest/meta-data/instance-id"
    response = requests.get(url)
    return response.text

In [22]:
def stop_instance(instance_id, region_name='us-west-2'):
    """Stop the EC2 instance"""
    ec2 = boto3.client('ec2', region_name=region_name)
    ec2.stop_instances(InstanceIds=[instance_id])

In [23]:
# Get the current instance ID
instance_id = get_instance_id()

# Stop the instance
stop_instance(instance_id)

NoCredentialsError: Unable to locate credentials