<a href="https://colab.research.google.com/github/abdokamel2001/ASL-Translation-Project/blob/main/2023-10-Sprint2-Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Microsoft Dataset Sign Language Model

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!cp -r "/content/drive/MyDrive/AI Team/Tasks/2023-10-Sprint2/MS-ASL" "/content" #Import the dataset

In [None]:
!cp -r "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded/filtered.json" "/content" #Pre-Filtered

In [None]:
!pip install -q mediapipe pytube

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m33.6/33.6 MB[0m [31m41.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os
import cv2
import json
import time
import numpy as np
from tqdm import tqdm
import mediapipe as mp
import tensorflow as tf
from pytube import YouTube
import matplotlib.pyplot as plt
from IPython.core.display import clear_output

# Pick 50 Examples

In [None]:
# Read the MS-ASL training annotations (a JSON document) into memory.
with open('/content/MS-ASL/MSASL_train.json', 'r') as annotations_file:
    data = json.loads(annotations_file.read())

# Sign labels to keep when filtering the annotations.
selected_words = [
    "where", "hello", "thanks", "go", "stop",
    "here", "traffic", "good", "bad", "today",
]

In [None]:
def check_link(url):
    """
    Reports whether a YouTube video can currently be used.

    Args:
        url (str): The YouTube watch URL to probe.

    Returns:
        bool: True when the availability check passes and the video is not
        flagged as age restricted; False when it is age restricted or when any
        error is raised while probing it.
    """
    try:
        yt = YouTube(url)
        yt.check_availability()
        # The age-restriction flag doesn't work all the time, so some
        # restricted videos can still be reported as usable.
        return not yt.age_restricted
    except Exception:
        # Catch Exception instead of using a bare `except` so that interrupting
        # the kernel (KeyboardInterrupt) during a long filtering run is not
        # silently turned into "link unavailable".
        return False

In [None]:
# Collect up to 5 reachable clips for every selected word, in annotation order.
word_counts = {word: 0 for word in selected_words}
filtered_data = []

for dics in data:
    word = dics["text"]

    # Check the cheap conditions first: probing YouTube is a network request,
    # so never spend one on a word we don't want or whose quota is already full.
    if word not in word_counts or word_counts[word] >= 5:
        continue

    url = dics["url"].split('&')[0]  # drop any extra query parameters
    if not check_link(url):
        continue

    filtered_data.append(dics)
    word_counts[word] += 1

    # Once every word has reached its quota there is nothing left to collect.
    if all(count >= 5 for count in word_counts.values()):
        break

In [None]:
word_counts

{'where': 5,
 'hello': 5,
 'thanks': 5,
 'go': 5,
 'stop': 5,
 'here': 5,
 'traffic': 5,
 'good': 5,
 'bad': 5,
 'today': 5}

In [None]:
# Write the selected annotation entries to /content/filtered.json, pretty-printed.
with open('/content/filtered.json', 'w') as filtered_file:
    filtered_file.write(json.dumps(filtered_data, indent=4))

# MediaPipe Functions

In [None]:
# One shared detector per modality, each built with MediaPipe's default options.
# They are module-level so get_frame_landmarks can reuse them for every frame.
hands, pose, face_mesh = (
    mp.solutions.hands.Hands(),
    mp.solutions.pose.Pose(),
    mp.solutions.face_mesh.FaceMesh(),
)

In [None]:
def get_frame_landmarks(frame):
    """
    Runs the hand, pose and face-mesh detectors on one frame and packs the results.

    Args:
        frame: A single rgb frame/image.

    Returns:
        np.array: An (n, 3) array with one [x, y, z] row per landmark, where
        n = 2 * 21 + 33 + 468 = 543. Rows are laid out as: first detected hand,
        second detected hand, body pose, face. Rows that belong to something
        that was not detected are left at zero.

    Note:
        The two hand slots follow the order in which the hand detector reports
        the hands, not left/right handedness.
    """

    def to_xyz(landmark_list):
        # One [x, y, z] row per landmark of a MediaPipe landmark list.
        return np.array([(point.x, point.y, point.z) for point in landmark_list.landmark])

    hand_size = 21
    pose_size = 33
    face_size = 468         # Max 468

    # Row offsets of each section inside the packed array.
    second_hand_start = hand_size
    pose_start = hand_size * 2
    face_start = pose_start + pose_size

    # Detectors run in the same order as always: hands, pose, face.
    hand_results = hands.process(frame)
    pose_results = pose.process(frame)
    face_results = face_mesh.process(frame)

    packed = np.zeros((face_start + face_size, 3))

    detected_hands = hand_results.multi_hand_landmarks
    if detected_hands:
        packed[:second_hand_start, :] = to_xyz(detected_hands[0])
        if len(detected_hands) > 1:
            packed[second_hand_start:pose_start, :] = to_xyz(detected_hands[1])

    if pose_results.pose_landmarks:
        packed[pose_start:face_start, :] = to_xyz(pose_results.pose_landmarks)

    if face_results.multi_face_landmarks:
        # Only the first detected face is stored.
        packed[face_start:, :] = to_xyz(face_results.multi_face_landmarks[0])

    return packed

In [None]:
def get_video_landmarks(video_path, start_frame=0, end_frame=-1, num_landmarks=543):
    """
    Extracts landmarks from a video by processing each frame in the video.

    Args:
        video_path (str): The file path to the video to process.
        start_frame (int): The index of the starting frame (default is 0).
            Negative values are treated as 0.
        end_frame (int): The index of the ending frame, exclusive (default is -1,
            meaning the frame count reported by the video).
        num_landmarks (int): Number of landmark rows stored per frame. It must
            match the number of rows returned by get_frame_landmarks
            (default is 543).

    Returns:
        np.array: An array of shape (m, n, 3), where m = end_frame - start_frame
        and n = num_landmarks. Row i holds the landmarks of frame start_frame + i.
        If the video stops delivering frames before end_frame, the remaining
        rows are left at zero.
    """

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released even when landmark
    # extraction raises part-way through the video.
    try:
        if end_frame < 0:
            end_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if start_frame < 0:
            start_frame = 0

        all_frame_landmarks = np.zeros((end_frame - start_frame, num_landmarks, 3))
        frame_index = 0

        while cap.isOpened() and frame_index != end_frame:
            if frame_index < start_frame:
                # Frames before the range are only skipped, so advance the
                # stream without paying for decoding them.
                if not cap.grab():
                    break
            else:
                ret, frame = cap.read()
                if not ret:
                    break
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Mark the converted frame read-only: cvtColor returns a new
                # array, so the flag has to be set after the conversion to
                # apply to the image that is actually handed to MediaPipe.
                frame.flags.writeable = False
                all_frame_landmarks[frame_index - start_frame] = get_frame_landmarks(frame)

            frame_index += 1
    finally:
        cap.release()

    return all_frame_landmarks

# Load the json data

In [None]:
# Read the filtered annotation entries back from disk.
with open("/content/filtered.json", 'r') as filtered_file:
    train_data = json.loads(filtered_file.read())  # A list of dictionaries

# Define directories

In [None]:
# Working folders: temporary video downloads and the extracted landmark arrays.
video_dir = '/content/train-videos'
npy_dir = '/content/train-numpy'
for working_dir in (video_dir, npy_dir):
    os.makedirs(working_dir, exist_ok=True)

# Iterate through the data

In [None]:
downloaded_video_ids = set()

In [None]:
# Download every annotated clip, extract its landmarks into
# <npy_dir>/<label>/<index>.npy, and delete the video again.
# Entries whose .npy file already exists are skipped, so the cell can be re-run
# to resume an interrupted or partially failed pass.
failed_video_ids = set()
try:
    for i, entry in enumerate(tqdm(train_data, ncols=100)):
        url = entry['url'].split('&')[0]
        video_id = url.split('=')[1]
        start = entry["start"]
        end = entry["end"]
        video_path = os.path.join(video_dir, f'{video_id}.mp4')

        label_dir = os.path.join(npy_dir, entry["text"])
        os.makedirs(label_dir, exist_ok=True)
        npy_path = os.path.join(label_dir, f'{i}.npy')
        if os.path.exists(npy_path):
            continue

        try:
            yt = YouTube(url)
            stream = yt.streams.filter(file_extension='mp4').first()
            if stream is None:
                # Without this check the failure would surface as an opaque
                # "'NoneType' object has no attribute 'download'".
                raise RuntimeError("no mp4 stream available")
            stream.download(output_path=video_dir, filename=f'{video_id}.mp4')
            video_landmarks = get_video_landmarks(video_path, start, end)
            np.save(npy_path, video_landmarks)
            downloaded_video_ids.add(video_id)

        except Exception as e:
            print(f"\nError downloading {video_id}: {e}")
            failed_video_ids.add(video_id)
            continue

        finally:
            # Single cleanup point: the temporary video is removed whether the
            # entry succeeded, failed, or the run was interrupted.
            if os.path.exists(video_path):
                os.remove(video_path)

        clear_output(True)

except KeyboardInterrupt:
    # Remove anything left in the download folder before reporting the interruption.
    for f in os.listdir(video_dir):
        file_path = os.path.join(video_dir, f)
        if os.path.isfile(file_path):
            os.remove(file_path)
    print("\nLoading process interrupted by user.")

100%|███████████████████████████████████████████████████████████████| 50/50 [16:31<00:00, 19.83s/it]


In [None]:
# Summarise which video ids were processed and which ones failed.
for title, video_ids in (('Downloaded', downloaded_video_ids), ('Failed', failed_video_ids)):
    print(f'{title} Videos ({len(video_ids)}): {video_ids}')

Downloaded Videos (38): {'QUF1JHzBXhw', '2nXrJ_7NOgE', '6XcxbPfP5YQ', 'TwkGS9TjUX8', 'xKAgM2pSEDU', 'PygPDLrGBwE', '75BkNdtDsoQ', 'mC0lNJ6iz-s', 'f7COHRpmVKA', 'WprUBqi3iBc', 'Jwjs1LVxnmE', 'QB44Vddoi-w', '7XHOmZYiBew', 'K8c-np9zNT8', 'i6a81VVo-BM', '3zoqSvF0Z2A', '-LB4ENHxcIs', 'SC9lyDxbwUE', 'u0XAd3TkGaA', 'FVjpLa8GqeM', 'FCPZYdfdabA', 'z8e_-viWx9E', 'CSj7IScvZnE', 'wkxCnCMo7Mc', 'A5tZKVJ195U', 'hjS0dQDgbjo', 'P4QA138QqZc', 'BUhCGlNOqRA', '__lLQ3mhCvM', 'bq-HmgjGzmw', 'DOZJOFHs75s', 'rnr_aY0X0dQ', '7iuyJ84wvds', 'OmpKZvqoyjs', 'p36hZJQpLoQ', 'XtkDeYBnR8o', 'A84uvLUmCVU', 'k0T-yY_HrEQ'}
Failed Videos (0): set()


---

# OpenCV Function

In [None]:
def draw_landmarks(input_path, output_path, npy_file, start_frame=0, end_frame=-1):
    """
    Reads a video from the input file, overlays landmarks on each frame, and saves the result to an output video file.

    Args:
        input_path (str): The path to the input video file.
        output_path (str): The path to save the output video with overlaid landmarks.
        npy_file (str): The path to a NumPy file holding one (n, 3) landmark array
            per frame; entry i is drawn on video frame start_frame + i.
        start_frame (int): The index of the starting frame for landmark overlay
            (default is 0). Negative values are treated as 0.
        end_frame (int): The index of the ending frame for landmark overlay, exclusive
            (default is -1, meaning the frame count reported by the video).

    Description:
        Every frame of the input video is written to the output video. Frames
        from 'start_frame' (inclusive) to 'end_frame' (exclusive) additionally
        get each stored landmark drawn as a filled red circle, with its x and y
        scaled by the frame width and height. Landmarks stored as all zeros
        (nothing detected) are not drawn, and the overlay stops when the
        landmark file runs out of frames. The output video uses the 'mp4v'
        codec with the resolution and frame rate reported by the input video.
    """

    # Load the landmarks first so that a bad npy_file fails before any output
    # file is created.
    landmarks_data = np.load(npy_file)

    cap = cv2.VideoCapture(input_path)
    out = None
    # try/finally guarantees both video handles are released on any error.
    try:
        # Keep the frame rate as a float: truncating e.g. 29.944 to 29 would
        # change the playback speed of the output.
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        if start_frame < 0:
            start_frame = 0
        if end_frame < 0:
            # Resolve the documented default; comparing against -1 directly
            # would never match any frame, so nothing would be drawn.
            end_frame = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Never index past the frames actually stored in the landmark file.
        end_frame = min(end_frame, start_frame + len(landmarks_data))

        frame_index = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if start_frame <= frame_index < end_frame:
                for x, y, z in landmarks_data[frame_index - start_frame]:
                    if x == 0 and y == 0 and z == 0:
                        # All-zero rows mark undetected landmarks; drawing them
                        # would pile circles onto the top-left corner.
                        continue
                    # OpenCV colours are BGR, so (0, 0, 255) is red.
                    cv2.circle(frame, (int(x * width), int(y * height)), 3, (0, 0, 255), -1)

            out.write(frame)
            frame_index += 1
    finally:
        cap.release()
        if out is not None:
            out.release()

In [None]:
train_data[1]

{'org_text': 'stop ',
 'clean_text': 'stop',
 'start_time': 4.675,
 'signer_id': 144,
 'signer': -1,
 'start': 140,
 'end': 220,
 'file': 'stop - ASL sign for stop',
 'label': 358,
 'height': 360.0,
 'fps': 29.944,
 'end_time': 7.347,
 'url': 'https://www.youtube.com/watch?v=Jwjs1LVxnmE',
 'review': 1,
 'text': 'stop',
 'box': [0.0, 0.0, 1.0, 0.9790104627609253],
 'width': 480.0}

In [None]:
# Fetch the highest-resolution version of the second training clip for the overlay demo.
demo_url = train_data[1]["url"].split('&')[0]
yt = YouTube(demo_url)
stream = yt.streams.get_highest_resolution()
# Last expression of the cell, so the saved file path is displayed as its output.
stream.download(output_path='/content', filename='Original.mp4')

'/content/Original.mp4'

In [None]:
draw_landmarks('/content/Original.mp4', '/content/Edited.mp4', '/content/train-numpy/stop/1.npy', start_frame=140, end_frame=220)

---

# Model Training

In [None]:
!cp -r "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded" "/content/train-numpy"

---

# **Useful Shortcuts**

### Zip to Download

In [None]:
!zip -q -r /content/train-numpy.zip -j /content/train-numpy

### Unzip to Reload

In [None]:
!unzip -q /content/train-numpy.zip -d /content/train-numpy/

---

### Export to Drive

Folder

In [None]:
!cp -r "/content/train-numpy" "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded"

.npz file

In [None]:
!cp -r "/content/train-data.npz" "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded"

### Import from Drive

Folder

In [None]:
!cp -r "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded" "/content/train-numpy"

.npz file

In [None]:
!cp -r "/content/drive/MyDrive/AI Team/Varying/Microsoft Dataset Encoded/train-data.npz" "/content"

---

### Compress .npy to .npz

In [None]:
# Bundle every .npy file under npy_dir into one compressed archive, keyed by
# the file name without its extension.
data_dict = {}

# Walk the tree instead of listing only the top level: the extraction loop
# stores arrays as <npy_dir>/<label>/<index>.npy, which a flat listing never sees.
for folder, _subfolders, filenames in os.walk(npy_dir):
    for filename in filenames:
        key, extension = os.path.splitext(filename)
        if extension != '.npy':
            continue
        if key in data_dict:
            # Two files with the same name would silently overwrite each other
            # inside the archive.
            raise ValueError(f"Duplicate array name '{key}' found in {folder}")
        # NOTE(review): allow_pickle=True can execute arbitrary code when loading
        # untrusted files; the arrays written by this notebook are plain numeric
        # arrays, so consider turning it off.
        # A distinct variable name is used so the notebook-level `data`
        # (the MS-ASL annotation list) is not overwritten by this cell.
        landmarks = np.load(os.path.join(folder, filename), allow_pickle=True)
        data_dict[key] = landmarks

np.savez_compressed('/content/train-data.npz', **data_dict)

### Decompress .npz to .npy

In [None]:
# Unpack the archive back into one <key>.npy file per stored array.
os.makedirs(npy_dir, exist_ok=True)  # np.save does not create missing folders

# NOTE(review): allow_pickle=True can execute arbitrary code when loading
# untrusted files; consider turning it off if the archive only holds numeric arrays.
# The context manager closes the archive's file handle when the loop is done.
with np.load('/content/train-data.npz', allow_pickle=True) as loaded_data:
    # `landmarks` rather than `data`, so the notebook-level `data`
    # (the MS-ASL annotation list) is not overwritten by this cell.
    for key, landmarks in loaded_data.items():
        np.save(os.path.join(npy_dir, f"{key}.npy"), landmarks)

---