In [75]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
import os
import cv2
import shutil
from PIL import Image

# Sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Tensorflow
import tensorflow as tf


# Variables #

In [76]:
# These 20 words were selected based on the amount of samples available
# selected_words = [
#     'like', 'work', 'play', 'take', 'call',
#     'go', 'study', 'give', 'write', 'yesterday',
#     'far', 'hot', 'cold', 'good', 'bad',
#     'computer', 'apple', 'doctor', 'family', 'dog'
# ]
# selected_words = ['work','study', 'write', 'hot', 'cold', 'family']
# selected_words = ['hello',
# 'bye',
# 'world',
# 'yes',
# 'no',
# 'I',
# 'you',
# 'go',
# 'work',
# 'drink',
# 'beer',
# 'many',
# 'what',
# 'thank you',
# 'love']
selected_words = ['hello',
'bye',
'world']

n_classes = len(selected_words)

# Absolute main path
main_path = '../data/custom_videos/'

# Frame sampling parameters
frames_per_video = 20
target_size = (480, 480)

# Dataset multiplier
number_of_augmentations = 1

# Train split parameters
train_size = 0.7


# 1) Load Data #

In [77]:
# Read JSON file into a DataFrame with unprocessed instance col
# wlas_df = pd.read_json(main_path + 'WLASL_v0.3.json')


In [78]:
def get_videos_ids(json_list):
    """
    function to check if the video id is available in the dataset
    and return the viedos ids of the current instance

    Input: instance json list
    Output: list of videos_ids
    """
    videos_list = []
    for ins in json_list:
        video_id = ins['video_id']
        if os.path.exists(f'{main_path}videos/{video_id}.mp4'):
            videos_list.append(video_id)
    return videos_list


In [79]:
def get_json_features(json_list):
    """
    function to check if the video id is available in the dataset
    and return the viedos ids and url or any other featrue of the current instance

    input: instance json list
    output: list of videos_ids
    """
    videos_ids = []
    videos_urls = []
    for ins in json_list:
        video_id = ins['video_id']
        video_url = ins['url']
        if os.path.exists(f'{main_path}videos/{video_id}.mp4'):
            videos_ids.append(video_id)
            videos_urls.append(video_url)
    return videos_ids, videos_urls


In [80]:
# Open JSON file (read only)
# with open(main_path+'WLASL_v0.3.json', 'r') as data_file:
#     json_data = data_file.read()

# instance_json = json.loads(json_data)


In [81]:
# # Get available video ids for all rows in wlas_df and add to new col 'videos_id'
# wlas_df['videos_ids'] = wlas_df['instances'].apply(get_videos_ids)
# wlas_df


In [82]:
# Create separate DataFrame for available information in each instance
features_df = pd.DataFrame(columns=['word', 'video_id'])

for filename in os.listdir("../data/custom_videos/"):
    word = filename.split("_")[0]
    filename = filename.replace(".mp4","")
    df = pd.DataFrame([[word, filename]], columns=features_df.columns)
    # Append temporary df to feature_df
    features_df = pd.concat([features_df, df], ignore_index=True)

# Renaming index col to index
# features_df.index.name = 'index'
features_df


Unnamed: 0,word,video_id
0,bye,bye_Benjamin_4
1,love,love_Eigo_4
2,many,many_Benjamin_4
3,world,world_Jaris_4
4,thankyou,thankyou_Benjamin_2
...,...,...
295,world,world_Jaris_3
296,bye,bye_Eigo_2
297,go,go_Roshni_3
298,drink,drink_Jaris_2


# 2) Define 20 target classes #

In [83]:
selected_df = features_df[features_df['word'].isin(selected_words)]
selected_df["video_length"]=60


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  selected_df["video_length"]=60


In [84]:
for video_id in selected_df['video_id']:
    if os.path.exists(f'{main_path}videos/{video_id}.mp4'):
        cap = cv2.VideoCapture(f'{main_path}videos/{video_id}.mp4')
        length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        selected_df.loc[selected_df['video_id'] == video_id, ['video_length']] = int(length)
    pass

selected_df = selected_df.reset_index(drop=True)
input_length = len(selected_df)
selected_df


Unnamed: 0,word,video_id,video_length
0,bye,bye_Benjamin_4,60
1,world,world_Jaris_4,60
2,hello,hello_Benjamin_1,60
3,hello,hello_Roshni_4,60
4,hello,hello_Jaris_5,60
5,world,world_Benjamin_4,60
6,hello,hello_Eigo_3,60
7,bye,bye_Benjamin_1,60
8,bye,bye_Benjamin_2,60
9,hello,hello_Eigo_2,60


# 3) Defining the Input/Features: X #

In [85]:
# Initialize empty array of desired shape
X = np.empty((input_length, frames_per_video, *target_size, 3), dtype=np.uint8)

# Function to perform frame sampling
def sample_frames(video_path, frames_per_video, total_frames):
    frames = []
    cap = cv2.VideoCapture(video_path)

    frame_indices = []

    while len(set(frame_indices)) != frames_per_video:
        frame_indices = sorted(np.random.uniform(0, total_frames-5, frames_per_video).astype(int))

    frame_counter = 0

    try:
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                break

            if frame_counter in frame_indices:
                # Resize frame to required size
                frame = cv2.resize(frame, target_size)
                # CV2 output BGR -> converting to RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Append to list of frames
                frames.append(frame_rgb)

            frame_counter += 1

            if len(frames) == frames_per_video:
                break

    finally:
        cap.release()

    return frames


In [86]:
np.random.seed(10)

for i, row in selected_df.iterrows():
    video_id = row['video_id']
    total_frames = row['video_length']
    video_path = f'../data/custom_videos/{video_id}.mp4'

    sampled_frames = sample_frames(video_path, frames_per_video, total_frames)

    # Assign sampled frames to results array
    X[i] = np.array(sampled_frames)


In [87]:
if X.shape == (len(selected_df), frames_per_video, *target_size, 3):
    print(f'✅ X has been initialized with Shape {X.shape}!')
else:
    print('❌ X has not been initialized properly!')


✅ X has been initialized with Shape (60, 20, 480, 480, 3)!


# 4) Defining the Output/Target: y #

In [88]:
label_encoder = LabelEncoder()

selected_df['encoded_word'] = label_encoder.fit_transform(selected_df['word'])
y_cat = tf.keras.utils.to_categorical(selected_df['encoded_word'], num_classes=n_classes)


In [89]:
if y_cat.shape == (input_length, n_classes):
    print(f'✅ y has been initialized with Shape {y_cat.shape}!')
else:
    print('❌ y has not been initialized properly!')
print()


✅ y has been initialized with Shape (60, 3)!



# 5) Restore sampled frames into .mp4 files and write CSV #

In [90]:
def generate_processed_videos(X):
    """
    Generate processed videos from sampled frames.

    Parameters:
    - X (numpy.ndarray): Array containing sampled frames for multiple videos.
    - output_folder (str): Path to the folder to store processed videos. Defaults to '../data/processed_videos/'.

    Returns:
    - None
    """
    def frames_to_video(sampled_frames, output_path, fps=frames_per_video):
        height, width, _ = sampled_frames[0].shape
        fourcc = cv2.VideoWriter.fourcc(*'mp4v')
        video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        for frame in sampled_frames:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            video.write(frame_rgb)

        video.release()

    output_folder = os.path.dirname('../data/processed_videos/')

    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)

    os.makedirs(output_folder)

    for i, sampled_frames in enumerate(X):
            video_path = f'../data/processed_videos/processed_{i}.mp4'
            frames_to_video(sampled_frames, video_path)


In [91]:
def generate_csv(list_of_dataframes):
    output_folder = os.path.dirname('../data/csv/')

    # Remove the folder if it exists
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)

    os.makedirs(output_folder)  # Recreate the folder

    for i, dataframe in enumerate(list_of_dataframes):
        file_path = f'{output_folder}/dataframe_{i}.csv'
        dataframe.to_csv(file_path, index=False)


# 6) Create Train / Validation split #

In [92]:
X_train, X_val, y_cat_train, y_cat_val = train_test_split(X,
                                                          y_cat,
                                                          train_size=train_size,
                                                          random_state=1,
                                                          stratify=y_cat)


# 7) Video augmentation for increased data set size #

In [93]:
def augment_frame_params(frame_width, frame_height):
    """
    Generate random parameters for image augmentation.

    Args:
    - frame_width (int): Width of the frame.
    - frame_height (int): Height of the frame.

    Returns:
    Variables containing randomly generated parameters for frame augmentation.
        - angle (float): Random rotation between 0 and 15 degrees.
        - flip (float): Random value for horizontal mirroring.
        - x_trans (int): Random translation along the x-axis between -20 and 20 pixels.
        - y_trans (int): Random translation along the y-axis between -20 and 20 pixels.
        - scale (float): Random zoom factor between 0.8 and 1.2.
        - crop_size (int): Random size for cropping within the frame.
        - alpha (float): Random value for brightness and contrast adjustment between 0.7 and 1.3.
        - beta (int): Random value for brightness and contrast adjustment between -20 and 20.
    """
    # Random rotation between -15 and 15 degrees
    angle = np.random.uniform(0, 15)
    # Random horizontal mirroring
    flip = np.random.rand()
    # Random translation
    x_trans = np.random.randint(-20, 20)
    y_trans = np.random.randint(-20, 20)
    # Random zoom
    scale = np.random.uniform(0.8, 1.2)
    # Random cropping (with centralized region)
    crop_size = np.random.randint(0.8 * min(frame_width, frame_height), min(frame_width, frame_height))
    # Changes in brightness, contrast, and saturation
    alpha = np.random.uniform(0.7, 1.3)
    beta = np.random.randint(-20, 20)

    return angle, flip, x_trans, y_trans, scale, crop_size, alpha, beta


In [94]:
def augment_frame(frame, angle, flip, x_trans, y_trans, scale, crop_size, alpha, beta, target_size):
    """
    Apply various random transformations to an input image/frame.

    Args:
    - frame (numpy.ndarray): Input image/frame to be augmented.
    - angle (float): Angle for random rotation.
    - flip (float): Value for horizontal flipping (50% chance).
    - x_trans (int): Random translation along the x-axis.
    - y_trans (int): Random translation along the y-axis.
    - scale (float): Random zoom factor.
    - crop_size (int): Random size for cropping within the frame.
    - alpha (float): Value for brightness and contrast adjustment.
    - beta (int): Value for brightness and contrast adjustment.

    Returns:
    numpy.ndarray: Augmented image/frame after applying random transformations.
    """
    # Random rotation by an angle
    rows, cols, _ = frame.shape
    M = cv2.getRotationMatrix2D((cols / 2, rows / 2), angle, 1)
    frame = cv2.warpAffine(frame, M, (cols, rows))
    # Horizontal flipping
    if flip > 0.5:  # 50% chance of flipping
        frame = cv2.flip(frame, 1)
    # Random translation
    M = np.float32([[1, 0, x_trans], [0, 1, y_trans]]) # type: ignore
    frame = cv2.warpAffine(frame, M, (cols, rows)) # type: ignore
    # Random zoom
    frame = cv2.resize(frame, None, fx=scale, fy=scale)
    # Random cropping (with centralized region)
    x = int((rows - crop_size) / 2)
    y = int((cols - crop_size) / 2)
    frame = frame[x:x + crop_size, y:y + crop_size]
    # Changes in brightness, contrast, and saturation
    frame = cv2.convertScaleAbs(frame, alpha=alpha, beta=beta)
    # Resize frame back to height 150, width 150
    frame = cv2.resize(frame, target_size)

    return frame


In [95]:
def multiply_data(X, frames_per_video):
    X_temp = np.empty((len(X), frames_per_video, *target_size, 3), dtype=np.uint8)
    frame_height = X.shape[2]
    frame_width = X.shape[3]

    for i in range(len(X)):
        angle, flip, x_trans, y_trans,\
        scale, crop_size, alpha, beta = augment_frame_params(frame_height, frame_width)
        for j in range(frames_per_video):
            sampled_frame = X[i][j]
            aug_frame = augment_frame(sampled_frame,
                                      angle,
                                      flip,
                                      x_trans,
                                      y_trans,
                                      scale,
                                      crop_size,
                                      alpha,
                                      beta,
                                      target_size)
            X_temp[i][j] = aug_frame

    return X_temp


In [96]:
# Initialize a copy of preprocessed X and categoried y
X_aug = X_train.copy()
y_aug = y_cat_train.copy()

# Multiply dataset by defined param
for _ in range(number_of_augmentations):
    X_temp = multiply_data(X_train, frames_per_video)
    # Returns X_aug with shape (n * 219, 10, 150, 150, 3)
    X_aug = np.concatenate((X_aug, X_temp), axis=0)
    # Returns y_aug with shape (n * 219, 20)
    y_aug = np.concatenate((y_aug, y_cat_train), axis=0)


# 8) Testing output, generating videos and CSV #

In [97]:
if X_aug.shape == ((number_of_augmentations + 1) * len(X_train), frames_per_video, *target_size, 3):
    print(f'✅ X_aug has been initialized with Shape {X_aug.shape}!')
else:
    print('❌ X_aug has not been initialized properly!')

if y_aug.shape == ((number_of_augmentations + 1) * len(y_cat_train), n_classes):
    print(f'✅ y_aug has been initialized with Shape {y_aug.shape}!')
else:
    print('❌ y_aug has not been initialized properly!')

print()

if X_val.shape == (round(len(X) * (1 - train_size)), frames_per_video, *target_size, 3):
    print(f'✅ X_val has been initialized with Shape {X_val.shape}!')
else:
    print('❌ X_val has not been initialized properly!')

if y_cat_val.shape == (round(len(y_cat) * (1 - train_size)), n_classes):
    print(f'✅ y_cat_val has been initialized with Shape {y_cat_val.shape}!')
else:
    print('❌ y_cat_val has not been initialized properly!')


✅ X_aug has been initialized with Shape (84, 20, 480, 480, 3)!
✅ y_aug has been initialized with Shape (84, 3)!

✅ X_val has been initialized with Shape (18, 20, 480, 480, 3)!
✅ y_cat_val has been initialized with Shape (18, 3)!


In [98]:
# generate_processed_videos(X_aug)
# list_of_dataframes = [wlas_df, features_df, selected_df]
# generate_csv(list_of_dataframes)
