<a href="https://colab.research.google.com/github/ABHI2410/Project-6367/blob/main/machine_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Setup**

In [1]:
!pip install tqdm opencv-python einops mediapipe tensorflow==2.13.0

Collecting einops
  Downloading einops-0.6.1-py3-none-any.whl (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.2/42.2 kB[0m [31m687.5 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting mediapipe
  Downloading mediapipe-0.10.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (33.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.5/33.5 MB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tensorflow==2.13.0
  Downloading tensorflow-2.13.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (524.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m524.1/524.1 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
Collecting keras<2.14,>=2.13.1 (from tensorflow==2.13.0)
  Downloading keras-2.13.1-py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m31.1 MB/s[0m eta [36m0:00:00[0m
Collecting tensorboard<2.14,>=2.13 (from tensorflow==2.13.0)
  Downloadin

In [2]:
import os
import sys
import tqdm
import random
import pathlib
import itertools
import collections

import cv2
import einops
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import mediapipe as mp

import tensorflow as tf
import keras
from keras import layers
from sklearn.model_selection import train_test_split
from sklearn import preprocessing


from google.colab import drive

In [3]:
# Mount Google Drive; the CSV and video paths used below live under /content/drive/MyDrive.
drive.mount('/content/drive')
# Module-level counter of videos processed; FrameGenerator.__call__ increments and prints it.
VIDEO_COUNT = 0

Mounted at /content/drive


## **Preprocessing Data**

In [4]:
class CustomLableEncoder():
    """Word-level label encoder built from the SENTENCE column of the training CSV.

    The vocabulary is every space-separated, lower-cased token of every
    sentence (after dropping the sentence's last character). The underlying
    sklearn ``LabelEncoder`` is fitted lazily on the first ``transform`` call
    and reused afterwards, instead of being rebuilt on every call.
    """

    def __init__(self):
        self.label_encoder = preprocessing.LabelEncoder()
        self.df = pd.read_csv('/content/drive/MyDrive/ASL_Videos/how2sign_realigned_train.csv', sep="\t", header=0)
        # Set to True once label_encoder has been fitted on the vocabulary.
        self._is_fitted = False

    def _build_vocabulary(self):
        """Return the sorted list of unique tokens found in ``self.df['SENTENCE']``."""
        vocabulary = set()
        for sentence in self.df['SENTENCE']:
            # [:-1] drops the final character of the sentence
            # (presumably trailing punctuation — confirm against the CSV).
            vocabulary.update(sentence[:-1].lower().split(' '))
        return sorted(vocabulary)

    def transform(self, data_to_transform):
        """Encode a sequence of tokens as integer ids.

        Args:
            data_to_transform: iterable of tokens, tokenised the same way as
                the vocabulary (lower-cased, split on single spaces).

        Returns:
            Whatever ``self.label_encoder.transform`` returns for the tokens.

        Raises:
            Whatever the underlying encoder raises for tokens that are not
            part of the fitted vocabulary.
        """
        # getattr keeps this working for instances created without __init__.
        if not getattr(self, '_is_fitted', False):
            self.label_encoder.fit(self._build_vocabulary())
            self._is_fitted = True
        return self.label_encoder.transform(data_to_transform)

In [5]:
class KeyPointDetector():
    """Runs MediaPipe Holistic on one frame and flattens the landmarks into a vector.

    The vector is the concatenation of pose (33 landmarks x 4 values), face
    (468 x 3), left hand (21 x 3) and right hand (21 x 3) values: 1662 numbers.
    A group with no detection is filled with zeros of the same length. Every
    extracted vector is also saved as ``<path>/processed_data/<frame_number>.npy``.

    NOTE(review): ``output_path`` is stored but never read, and files are named
    by frame number only, so frames of different videos share file names —
    confirm whether the video name should be part of the saved path.
    """

    # Flattened length of each landmark group (landmark count x values each).
    POSE_SIZE = 33 * 4
    FACE_SIZE = 468 * 3
    HAND_SIZE = 21 * 3

    def __init__(self,path, frame_number):
        """Store output locations.

        Args:
            path: path of the source video; everything before the first '.'
                is kept as ``output_path``.
            frame_number: used as the file name of the saved vector.
        """
        self.path = '/content/drive/MyDrive/ASL_Videos/'
        self.holistic = mp.solutions.holistic
        self.output_path = path.split('.')[0]
        self.file_name = frame_number

    @staticmethod
    def _flatten(landmark_list, size, with_visibility=False):
        """Flatten one landmark group, or return ``size`` zeros when it is missing."""
        if not landmark_list:
            return np.zeros(size)
        if with_visibility:
            rows = [[res.x, res.y, res.z, res.visibility] for res in landmark_list.landmark]
        else:
            rows = [[res.x, res.y, res.z] for res in landmark_list.landmark]
        return np.array(rows).flatten()

    def extract_keypoints(self,results):
        """Build the keypoint vector from a Holistic result, save it, and return it.

        Args:
            results: object exposing ``pose_landmarks``, ``face_landmarks``,
                ``left_hand_landmarks`` and ``right_hand_landmarks``.

        Returns:
            1-D numpy array: pose, face, left hand, right hand, concatenated.
        """
        pose = self._flatten(results.pose_landmarks, self.POSE_SIZE, with_visibility=True)
        face = self._flatten(results.face_landmarks, self.FACE_SIZE)
        lh = self._flatten(results.left_hand_landmarks, self.HAND_SIZE)
        rh = self._flatten(results.right_hand_landmarks, self.HAND_SIZE)
        final_data = np.concatenate([pose, face, lh, rh])

        # np.save does not create missing directories, so make sure the target exists.
        output_dir = os.path.join(str(self.path), 'processed_data')
        os.makedirs(output_dir, exist_ok=True)
        np.save(os.path.join(output_dir, str(self.file_name)), final_data)
        return final_data


    def keypoints(self,image):
        """Run Holistic on ``image`` and return its keypoint vector.

        Args:
            image: frame as a numpy array. MediaPipe documents RGB input for
                ``process`` — NOTE(review): confirm callers convert from
                OpenCV's BGR order.

        Returns:
            The array produced by ``extract_keypoints``.
        """
        # The context manager closes the model even when process() raises.
        with self.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as mediapipe_holistic:
            image.flags.writeable = False
            try:
                outcome = mediapipe_holistic.process(image)
            finally:
                # Always hand the frame back writeable, as it was received.
                image.flags.writeable = True
        return self.extract_keypoints(outcome)



In [16]:
class FrameGenerator():
    """Callable generator yielding ``(per-frame keypoint vectors, encoded label)`` per video.

    The training CSV is read once when the class is defined. Each row supplies
    a video (``SENTENCE_NAME`` + ".mp4" under ``video_path``) and a sentence
    whose words are encoded with ``CustomLableEncoder``.
    """
    df = pd.read_csv('/content/drive/MyDrive/ASL_Videos/how2sign_realigned_train.csv', sep="\t", header=0)
    video_path = '/content/drive/MyDrive/ASL_Videos/raw_videos/'

    def get_video_and_label(self):
        """Collect video paths and their encoded word sequences.

        Returns:
            ``(videos, labels)``: a list of video paths and a list of encoded
            token arrays, in CSV row order.

        Raises:
            FileNotFoundError: if a video listed in the CSV is missing on disk.
        """
        encoder = CustomLableEncoder()
        videos = list()
        sentences = list()
        for index, row in self.df.iterrows():
            video = row['SENTENCE_NAME'] + ".mp4"
            # Fail with a normal exception rather than exit(), which would
            # terminate the interpreter from inside a data pipeline.
            if not os.path.exists(self.video_path + video):
                raise FileNotFoundError("Video not found: " + video)
            videos.append(self.video_path + video)
            # Same tokenisation as CustomLableEncoder: drop the last
            # character, lower-case, split on single spaces.
            sentences.append(row['SENTENCE'][:-1].lower().split(' '))

        if not videos:
            return videos, list()

        # Encode every token in one call and split back per sentence, so the
        # encoder is invoked once instead of once per CSV row.
        flat_tokens = [token for tokens in sentences for token in tokens]
        encoded = encoder.transform(flat_tokens)
        boundaries = np.cumsum([len(tokens) for tokens in sentences])[:-1]
        labels = np.split(encoded, boundaries)
        return videos, labels


    def __call__(self):
        """Yield ``(video_frames, lable)`` for every video, in shuffled order.

        ``video_frames`` is a list with one keypoint vector per decoded frame;
        ``lable`` is the encoded word sequence of that video's sentence.
        """
        # VIDEO_COUNT is the notebook-level counter; without this declaration
        # the augmented assignment below raises UnboundLocalError.
        global VIDEO_COUNT
        video_path, labels = self.get_video_and_label()
        data_lable_pair = list(zip(video_path, labels))

        random.shuffle(data_lable_pair)
        for path,lable in data_lable_pair:
            video_frames = []
            VIDEO_COUNT += 1
            print(VIDEO_COUNT)
            cap = cv2.VideoCapture(path)
            try:
                frame_number = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    # read() reports failure/end of stream through ret; stop
                    # instead of passing an invalid frame to the detector.
                    if not ret:
                        break
                    # OpenCV decodes to BGR; MediaPipe documents RGB input.
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    landmarks = KeyPointDetector(path, frame_number)
                    video_frames.append(landmarks.keypoints(rgb_frame))
                    frame_number += 1
            finally:
                # Release the capture even if keypoint extraction fails.
                # No window is ever opened, so no waitKey/destroyAllWindows.
                cap.release()

            yield video_frames, lable




In [7]:
batch_size = 100

# FrameGenerator yields one video per element: a variable-length sequence of
# 1662-value keypoint vectors and a variable-length sequence of word ids, so
# both components need a leading unknown dimension.
output_signature = (tf.TensorSpec(shape = (None, 1662), dtype = tf.float32),
                    tf.TensorSpec(shape = (None,), dtype = tf.int16))
train_ds = tf.data.Dataset.from_generator(FrameGenerator(), output_signature= output_signature)

# Elements differ in length, so plain batch() cannot stack them; padded_batch
# pads each component to the longest element of the batch.
# NOTE(review): the default padding value is 0, which is indistinguishable
# from word id 0 — confirm whether a mask or a sentinel value is needed.
train_ds = train_ds.padded_batch(batch_size)


In [8]:
# Frame height; sizes the model input and the ResizeVideo targets below.
HEIGHT = 720
# Frame width; used alongside HEIGHT.
WIDTH = 1280

In [9]:
class Conv3Dim(keras.layers.Layer):
    """Two stacked Conv3D layers that split one 3-D kernel into two passes.

    The first convolution uses kernel ``(1, k1, k2)`` and the second
    ``(k0, 1, 1)``, where ``kernel_size = (k0, k1, k2)``. Both use the same
    number of filters and the same padding mode.
    """

    def __init__(self, filters, kernel_size, padding):
        super().__init__()
        # Kernel covering only the last two kernel axes.
        trailing_axes_kernel = (1, kernel_size[1], kernel_size[2])
        # Kernel covering only the first kernel axis.
        leading_axis_kernel = (kernel_size[0], 1, 1)

        first_conv = layers.Conv3D(filters=filters,
                                   kernel_size=trailing_axes_kernel,
                                   padding=padding)
        second_conv = layers.Conv3D(filters=filters,
                                    kernel_size=leading_axis_kernel,
                                    padding=padding)
        self.seq = keras.Sequential([first_conv, second_conv])

    def call(self, inputs):
        """Apply both convolutions in order and return the result."""
        outputs = self.seq(inputs)
        return outputs

In [10]:
class ResidualMain(keras.layers.Layer):
  """Main path of a residual block.

  Applies, in order: Conv3Dim, LayerNormalization, ReLU, Conv3Dim,
  LayerNormalization. Both Conv3Dim layers use 'same' padding.
  """

  def __init__(self, filters, kernel_size):
    super().__init__()

    def make_conv():
      # Both convolutions of the block share the same configuration.
      return Conv3Dim(filters=filters,
                      kernel_size=kernel_size,
                      padding='same')

    stages = [
        make_conv(),
        layers.LayerNormalization(),
        layers.ReLU(),
        make_conv(),
        layers.LayerNormalization(),
    ]
    self.seq = keras.Sequential(stages)

  def call(self, inputs):
    """Run the inputs through the stacked layers."""
    outputs = self.seq(inputs)
    return outputs

In [11]:
class Project(keras.layers.Layer):
  """Dense projection to ``units`` features followed by LayerNormalization.

  add_residual_block uses it to make the skip connection match the channel
  count of the main path.
  """

  def __init__(self, units):
    super().__init__()
    dense = layers.Dense(units)
    norm = layers.LayerNormalization()
    self.seq = keras.Sequential([dense, norm])

  def call(self, input):
    """Project and normalise ``input``."""
    projected = self.seq(input)
    return projected

In [12]:
def add_residual_block(input, filters, kernel_size):
  """Append a residual block to ``input`` and return the summed output.

  The main path is ``ResidualMain(filters, kernel_size)``. When its last
  dimension differs from that of ``input``, the skip connection is passed
  through ``Project`` so the two tensors can be added.
  """
  main_path = ResidualMain(filters, kernel_size)
  out = main_path(input)

  same_channels = out.shape[-1] == input.shape[-1]
  res = input if same_channels else Project(out.shape[-1])(input)

  return layers.add([res, out])

In [13]:
class ResizeVideo(keras.layers.Layer):
  """Resizes every frame of a video tensor to ``(height, width)``."""

  def __init__(self, height, width):
    super().__init__()
    self.height = height
    self.width = width
    self.resizing_layer = layers.Resizing(self.height, self.width)

  def call(self, video):
    """Resize a ``b t h w c`` tensor frame by frame.

    Axis letters: b = batch size, t = time, h = height, w = width,
    c = number of channels.
    """
    # Remember the time length so the batch and time axes can be split again.
    num_frames = einops.parse_shape(video, 'b t h w c')['t']
    # Fold time into the batch axis so the image resizing layer sees 4-D input.
    flat_frames = einops.rearrange(video, 'b t h w c -> (b t) h w c')
    resized_frames = self.resizing_layer(flat_frames)
    return einops.rearrange(
        resized_frames, '(b t) h w c -> b t h w c',
        t = num_frames)

In [14]:
# Functional-API model: a Conv3Dim stem, four residual blocks with
# progressively smaller frames, global average pooling, then a Dense head.
# NOTE(review): this input is a 5-D video tensor (batch, 10, HEIGHT, WIDTH, 3),
# while train_ds is built from 1662-value keypoint vectors — confirm which
# input the model is meant to consume.
input_shape = (None, 10, HEIGHT, WIDTH, 3)
# Drop the leading batch dimension; layers.Input adds it back.
input = layers.Input(shape=(input_shape[1:]))
x = input

# Stem: factorised convolution, batch norm, ReLU, then halve the frame size.
x = Conv3Dim(filters=16, kernel_size=(3, 7, 7), padding='same')(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = ResizeVideo(HEIGHT // 2, WIDTH // 2)(x)

# Block 1
x = add_residual_block(x, 16, (3, 3, 3))
x = ResizeVideo(HEIGHT // 4, WIDTH // 4)(x)

# Block 2
x = add_residual_block(x, 32, (3, 3, 3))
x = ResizeVideo(HEIGHT // 8, WIDTH // 8)(x)

# Block 3
x = add_residual_block(x, 64, (3, 3, 3))
x = ResizeVideo(HEIGHT // 16, WIDTH // 16)(x)

# Block 4
x = add_residual_block(x, 128, (3, 3, 3))

# Head: pool, flatten, and emit 10 values with no activation.
# NOTE(review): labels are word ids from CustomLableEncoder — confirm that
# 10 output units is the intended number of classes.
x = layers.GlobalAveragePooling3D()(x)
x = layers.Flatten()(x)
x = layers.Dense(10)(x)

model = keras.Model(input, x)

In [None]:
# Pull one batch from the dataset; this runs the FrameGenerator pipeline.
frames, label = next(iter(train_ds))
# NOTE(review): Keras documents Model.build as taking an input shape; here the
# batch tensor itself is passed — confirm this is intended.
model.build(frames)

In [None]:
# Render the model graph with nested layers expanded and tensor shapes shown.
keras.utils.plot_model(model, expand_nested=True, dpi=60, show_shapes=True)

In [None]:
# from_logits=True because the final Dense layer has no activation.
# Sparse categorical cross-entropy takes integer class ids as labels.
model.compile(loss = keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              optimizer = keras.optimizers.Adam(learning_rate = 0.0001),
              metrics = ['accuracy'])

In [None]:
# Train for 10 epochs and keep the returned History object.
# NOTE(review): val_ds is not defined anywhere in the visible notebook —
# confirm where the validation dataset is built before running this cell.
history = model.fit(x = train_ds,
                    epochs = 10,
                    validation_data = val_ds)