This *.ipynb* file is used to 
1. create
2. modify
3. manage

the *.py* files within the /src directory.

In [1]:
import os

# locating the project folders: the project root is taken to be the parent of the current working directory
root_dir = os.path.dirname(os.getcwd())
src_dir, data_dir = (os.path.join(root_dir, folder) for folder in ('src', 'data'))

print(f'project_directory: {root_dir}')
print(f'src_directory: {src_dir}')

# switching to forward slashes after printing (presumably so the %%writefile cells get a usable path on Windows)
src_dir = '/'.join(src_dir.split('\\'))

project_directory: C:\Users\sadeg\OneDrive\Desktop\Thesis\python_codes\SignLanguageProject
src_directory: C:\Users\sadeg\OneDrive\Desktop\Thesis\python_codes\SignLanguageProject\src


# making \_\_init\_\_.py file

In [2]:
%%writefile $src_dir/__init__.py
# Explanation: This file will mark the source directory as a python package

Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/__init__.py


# making prepare\_datasets.py file

In [17]:
%%writefile $src_dir/prepare_datasets.py
# Explanation: This python file contains functions that are used to extract landmarks from LSA64, AUTSL40 and WLASL100 datasets.

#------------------------------------------------------------------------------Import--------------------------------------------------------------------------

# importing libraries for working with directories and file paths
import os
from pathlib import Path
from natsort import natsorted 
# importing OpenCV and Mediapipe to read videos and extract landmarks
import cv2                                                                         
import mediapipe as mp  
# importing numpy to work with arrays
import numpy as np                                                                                                                           
# importing tqdm for progression bar and typing for writing input types for each function                                                      
from tqdm.auto import tqdm                                                         
from typing import Callable, List

#------------------------------------------------------------------constant variables--------------------------------------------------------------------------
# A list for all class names in AUTSL 40.
# The position of a name in this list is its class index: get_landmarks uses the (0-based) video folder number as an index into it.
autslclass_names = ["sister", "hurry", "hungry", "enjoy_your_meal", "brother", "tree", "heavy", "cry", "family", "wise", "unwise", "kin", "shopping", "key",
                    "mother", "friend", "ataturk", "shoe", "mirror", "same", "father", "garden", "look", "honey", "glass", "flag", "feast", "baby", "single",
                    "wait", "I", "petrol", "together", "inform", "we", "work", "wednesday", "fork", "tea", "teapot"]

# A list for all class names in LSA64.
# The position of a name in this list is its class index: get_landmarks maps the (1-based) number in the video title to index number-1.
lsa64class_names= ['Opaque', 'Red', 'Green', 'Yellow', 'Bright', 'Light-blue', 'Colors', 'Pink', 'Women', 'Enemy', 'Son', 'Man', 'Away', 'Drawer', 'Born',
                   'Learn', 'Call', 'Skimmer', 'Bitter', 'Sweet milk', 'Milk', 'Water', 'Food', 'Argentina', 'Uruguay', 'Country', 'Last name', 'Where',
                   'Mock', 'Birthday', 'Breakfast', 'Photo', 'Hungry', 'Map', 'Coin', 'Music', 'Ship', 'None', 'Name', 'Patience','Perfume', 'Deaf', 'Trap',
                   'Rice', 'Barbecue', 'Candy', 'Chewing-gum', 'Spaghetti', 'Yogurt', 'Accept', 'Thanks', 'Shut down', 'Appear', 'To land', 'Catch', 'Help',
                   'Dance', 'Bathe', 'Buy', 'Copy', 'Run', 'Realize', 'Give', 'Find']

# A list for all class names in WLASL100.
# The position of a name in this list is its class index: get_landmarks_WLASL100 maps the (1-based) video folder number to index number-1.
wlasl100class_names = ["accident", "africa", "all", "apple", "basketball", "bed", "before", "bird", "birthday", "black", "blue", "book", "bowling", "brown",
                       "but", "can", "candy", "chair", "change", "cheat", "city", "clothes", "color", "computer", "cook", "cool", "corn", "cousin", "cow",
                       "dance", "dark", "deaf", "decide", "doctor", "dog", "drink","eat", "enjoy", "family", "fine", "finish", "fish", "forget", "full",
                       "give", "go", "graduate", "hat", "hearing", "help", "hot", "how", "jacket", "kiss", "language", "last", "later", "letter", "like",
                       "man", "many", "medicine", "meet", "mother", "need", "no", "now", "orange", "paint", "paper", "pink", "pizza", "play", "pull", "purple",
                       "right","same", "school", "secretary", "shirt", "short", "son", "study", "table", "tall", "tell", "thanksgiving", "thin", "thursday",
                       "time", "walk", "want", "what", "white", "who", "woman", "work", "wrong", "year", "yes"]

# 0-based positions inside wlasl100class_names that form a 35 class subset of WLASL100
# NOTE(review): the criterion behind "top" (e.g. sample count or accuracy) is not visible in this file - confirm with the author
top_35_indexes= [2, 3, 4, 7, 9, 10, 11, 13, 15, 20, 22, 27, 29, 35, 36, 42, 48, 50, 53, 54, 58, 59, 62, 63, 66, 67, 69, 77, 81, 82, 90, 93, 94, 95, 99]
# class names of the 35 class subset, in the same order as top_35_indexes
wlasl35class_names= [wlasl100class_names[i] for i in top_35_indexes]
#--------------------------------------------------------------------Getting landmarks--------------------------------------------------------------------------
# function to get landmarks from LSA64 or AUTSL40 dataset.
def get_landmarks(root: str,
                  class_names: List[str],
                  frame_numbers: int):
    """
    Extracts MediaPipe holistic landmarks from every .mp4 video of the LSA64 or AUTSL40 dataset.

    The function collects all video paths below `root` (naturally sorted), picks `frame_numbers` evenly spaced frames from each video, runs
    MediaPipe Holistic on each picked frame and stores the concatenated, flattened pose, face, left hand and right hand landmarks. The label of a
    video is derived from its path:
        AUTSL40: the name of the folder that contains the video is the 0-based class index.
        LSA64: the first part of the video title is the 1-based class index: 001_004_003 -> 1.
    A frame that OpenCV cannot read is reported with a message and stored as an all-zero detection (the same values get_frame_detections uses
    for body parts that were not detected), so every video keeps exactly `frame_numbers` detections.
    Args:
        root: Path to where dataset is located.
        class_names: List of all words in the dataset. Must be equal to autslclass_names or lsa64class_names.
        frame_numbers: number of frames we want to take from each video in the dataset.
    Returns:
        detections, labels, len(all_video_paths), none_cv2_video_paths where:
        detections: is a list that holds, for each processed video, a list of per-frame landmark arrays.
        labels: is a list of labels corresponding to each video detection.
        len(all_video_paths): is the number of videos in the dataset.
        none_cv2_video_paths: is a list of videos that OpenCV was not able to open (they have no entry in detections and labels).
    Raises:
        ValueError: if class_names is neither autslclass_names nor lsa64class_names, because then the labeling rule is unknown.
    Example use:
        results= get_landmarks(root= root, class_names= lsa64class_names, frame_numbers= 30)
        detections, labels, num_all_videos, none_cv2_video_paths= results[0], results[1], results[2], results[3]
    """
    # deciding the labeling rule up front, so an unsupported class list fails before any video is processed
    if class_names == autslclass_names:
        is_autsl= True
    elif class_names == lsa64class_names:
        is_autsl= False
    else:
        raise ValueError("get_landmarks only supports autslclass_names or lsa64class_names as class_names.")

    labels= []                       # a list to store video labels
    detections= []                   # a list to store all video detections
    none_cv2_video_paths= []         # a list to store video paths that cv2 can't capture

    all_video_paths= [str(path) for path in Path(root).glob("**/*.mp4")]   # all video paths as strings, since natsort works with strings
    all_video_paths= natsorted(all_video_paths)                            # sorted
    vid_idx_to_label= dict(enumerate(class_names))                         # this mapping is used to change the video indexes to labels
    empty_frame_size= 33*4 + 468*3 + 21*3 + 21*3                           # length of pose + face + lh + rh in get_frame_detections

    with mp.solutions.holistic.Holistic(min_detection_confidence= 0.5, min_tracking_confidence=0.5) as holistic:
        for video_path in tqdm(all_video_paths, desc="Processing videos"):
            cap = cv2.VideoCapture(video_path)                             # capture each video using OpenCV
            try:
                if not cap.isOpened():                                     # if OpenCV can't capture the video path
                    none_cv2_video_paths.append(video_path)                # add the video path to none_cv2_video_paths
                    continue

                video_detections= []                                                                     # a list to store video detections
                total_frames_number= int(cap.get(cv2.CAP_PROP_FRAME_COUNT))                              # total number of frames (float -> int)
                frame_idxs_to_process = np.linspace(0, total_frames_number-1, frame_numbers, dtype=int)  # picking desired frame indexes

                for idx in frame_idxs_to_process:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx)                             # set the video to the desired frame index
                    ret, frame= cap.read()                                            # reading the frame
                    if not ret:                                                       # frame is None here and must not reach cvtColor
                        print(f"Failed to grab frame {idx}, of video {video_path} of length {total_frames_number} frames.")
                        video_detections.append(np.zeros(empty_frame_size))           # keep the frame count with an "undetected" frame
                        continue
                    result= holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # processing the frame (Mediapipe works with RGB)
                    pose,face,lh, rh= get_frame_detections(result)                    # turning results into flattened arrays
                    frame_detection= np.concatenate((pose,face,lh, rh))
                    video_detections.append(frame_detection)                          # storing the frame detection in the video detection list

                if is_autsl:   # for AUTSL
                    video_idx= int(os.path.basename(os.path.dirname(video_path))) # extract video index from the video folder
                    label= vid_idx_to_label[video_idx]                            # map the video index to a label
                else:          # for LSA64
                    video_idx= int(os.path.basename(video_path).split('_')[0])    # extract index from the video title: 001_004_003 -> 1
                    label= vid_idx_to_label[video_idx-1]                          # map the index to the correct label

                labels.append(label)
                detections.append(video_detections)
            finally:
                cap.release()                                              # always free the capture, even if processing fails

    return detections, labels, len(all_video_paths), none_cv2_video_paths

#----------------------------------------------------------------Helping functions--------------------------------------------------------------------------
def get_frame_detections(result):
    '''
    This function turns the result object obtained with mediapipe into four flattened numpy arrays: pose, face, left hand and right hand.
    A body part that is missing from the result is replaced by zeros: 33*4 values for pose, 468*3 for face and 21*3 for each hand.
    '''
    def _flatten(landmarks, fields, landmark_count):
        # missing body part -> zero vector with the length the detected body part would have
        if not landmarks:
            return np.zeros(landmark_count * len(fields))
        rows= [[getattr(point, field) for field in fields] for point in landmarks.landmark]
        return np.array(rows).flatten()

    xyz= ('x', 'y', 'z')
    pose= _flatten(result.pose_landmarks, xyz + ('visibility',), 33)   # pose landmarks additionally carry a visibility value
    face= _flatten(result.face_landmarks, xyz, 468)
    lh= _flatten(result.left_hand_landmarks, xyz, 21)
    rh= _flatten(result.right_hand_landmarks, xyz, 21)
    return pose, face, lh, rh
    

def get_frame_coordinates(result, frame):
    '''
    This function turns the result object into four lists of (x, y) integer coordinates: pose, face, left hand and right hand.
    Each landmark's x is multiplied by frame.shape[1] and its y by frame.shape[0]. A body part that is missing from the result is
    replaced by (0,0) entries: 33 for pose, 468 for face and 21 for each hand.
    '''
    def _to_coordinates(landmarks, landmark_count):
        if not landmarks:
            return [(0,0)] * landmark_count
        # the frame is only looked at when there is something to scale
        return [(int(point.x * frame.shape[1]), int(point.y * frame.shape[0])) for point in landmarks.landmark]

    return (_to_coordinates(result.pose_landmarks, 33),
            _to_coordinates(result.face_landmarks, 468),
            _to_coordinates(result.left_hand_landmarks, 21),
            _to_coordinates(result.right_hand_landmarks, 21))

#---------------------------------------------------------------Additional functions------------------------------------------------------------------------
# (!!!!!Since WLASL 100 was excluded these functions are not used in the main ipynb files. !!!!) nevertheless they are working and were developed by me
# function to get landmarks from WLASL100 dataset. 
def get_landmarks_WLASL100(root: str,
                           class_names: List[str],
                           frame_numbers: int):
    """
    Extracts MediaPipe holistic landmarks from every .mp4 video of the WLASL100 dataset.

    The function collects all video paths below `root` (naturally sorted), picks `frame_numbers` evenly spaced frames from each video and runs
    MediaPipe Holistic on each picked frame. Since some of the videos have faulty frames, an unreadable frame is replaced by the previous frame
    and, if that one is unreadable as well, by the next frame (neighbours that lie outside the video are skipped). If no frame could be read, an
    empty list is stored for that frame so it can be filled later (see fill_empty_detections). The label of a video is taken from the name of
    the folder that contains it, which is the 1-based class index.
    Args:
        root: Path to video WLASL100 dataset directory.
        class_names: List of all words in the dataset.
        frame_numbers: number of frames we want to take from the entire video.
    Returns:
        detections, labels, len(all_video_paths), none_cv2_video_paths where:
        detections: is a list that holds, for each processed video, a list of per-frame landmark arrays (or [] for unreadable frames).
        labels: is a list of labels corresponding to each video detection.
        len(all_video_paths): is the number of videos in the dataset.
        none_cv2_video_paths: is a list of videos that OpenCV was not able to open (they have no entry in detections and labels).
    Example use:
        results= get_landmarks_WLASL100(root= root, class_names= class_names, frame_numbers= 30)
        detections, labels, num_all_videos, none_cv2_video_paths= results[0], results[1], results[2], results[3]
    """
    labels= []                    # a list to store video labels
    detections= []                # a list to store all video detections
    none_cv2_video_paths= []      # a list to store video paths that cv2 can't capture

    all_video_paths= [str(path) for path in Path(root).glob("**/*.mp4")]  # all video paths as strings, since natsort works with strings
    all_video_paths= natsorted(all_video_paths)                           # sorted
    vid_idx_to_label= dict(enumerate(class_names))                        # this mapping is used to change the video indexes to labels

    with mp.solutions.holistic.Holistic(min_detection_confidence= 0.5, min_tracking_confidence=0.5) as holistic:
        for video_path in tqdm(all_video_paths, desc="Processing videos"):
            cap = cv2.VideoCapture(video_path)              # capture each video using Opencv
            try:
                if not cap.isOpened():                          # if OpenCV can't capture the video
                    none_cv2_video_paths.append(video_path)     # add the video path to none_cv2_video_paths list
                    continue

                video_detections= []
                total_frames_number= int(cap.get(cv2.CAP_PROP_FRAME_COUNT))                                # total number of frames (float -> int)
                frame_idxs_to_process = np.linspace(0, total_frames_number - 1, frame_numbers, dtype= int) # picking desired frame indexes

                for idx in frame_idxs_to_process:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, idx) # set the video to the desired frame index
                    ret, frame= cap.read()                # read the frame
                    if not ret:                           # if the frame was "unreadable".
                        print(f"Failed to grab frame {idx}, of video {video_path} of length {total_frames_number} frames. trying adjacent frames...")
                        for neighbour_idx in (idx - 1, idx + 1):                   # previous frame first, then the next frame
                            if not 0 <= neighbour_idx < total_frames_number:       # never seek before the first or after the last frame
                                continue
                            cap.set(cv2.CAP_PROP_POS_FRAMES, neighbour_idx)
                            ret, frame = cap.read()
                            if ret:
                                break

                    if not ret:                           # if the return value is still False
                        print(f"Unable to retrieve any frames around index {idx}, of video {video_path} of length {total_frames_number} frames.")
                        video_detections.append([])       # we add empty detection that will be filled later, using interpolation
                        continue

                    result= holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # Mediapipe works with RGB
                    pose,face,lh, rh= get_frame_detections(result)       # turning results into flattened arrays
                    frame_detection= np.concatenate((pose,face,lh, rh))
                    video_detections.append(frame_detection)             # storing the frame detection in the video detection list

                video_idx= int(os.path.basename(os.path.dirname(video_path))) # extract video index from the folder
                label= vid_idx_to_label[video_idx-1]                          # map the video index to a label
                detections.append(video_detections)
                labels.append(label)
            finally:
                cap.release()                               # always free the capture, even if processing fails

    return detections, labels, len(all_video_paths), none_cv2_video_paths

# function to interpolate two frames of a video and fill in the bad frame.
def interpolate_frame_detections(most_recent_detection, next_coming_detection, alpha):
    """
    Based on the value of most recent detection and next coming detection, which are the frames before and after our faulty frame, returns a
    landmark array for the faulty frame.
    Args:
        most_recent_detection: landmarks detected in the previous frame, or None if there is no valid frame before the faulty one.
        next_coming_detection: landmarks detected in the next frame, or None if there is no valid frame after the faulty one.
        alpha: interpolation factor (0 -> most_recent_detection, 1 -> next_coming_detection).
    Returns:
        either: (1 - alpha) * most_recent_detection + alpha * next_coming_detection
        or: next_coming_detection (when most_recent_detection is None)
        or: most_recent_detection (when next_coming_detection is None)
    Raises:
        ValueError: if both detections are None, since there is nothing to build the frame from.
    Example use:
        video_detection[i]= interpolate_frame_detections(most_recent_detection, next_coming_detection, 0.5)
    """
    if most_recent_detection is None and next_coming_detection is None:   # the entire video is corrupt
        raise ValueError("Cannot interpolate a frame: both most_recent_detection and next_coming_detection are None.")
    if most_recent_detection is None:                                     # first to nth frames are all corrupt
        return next_coming_detection
    if next_coming_detection is None:                                     # nth to last frames are all corrupt
        return most_recent_detection
    return (1 - alpha) * most_recent_detection + alpha * next_coming_detection

# function to fill the empty detections in the videos using interpolation
def fill_empty_detections(detections):
    """
    Fills up the empty landmark detections for frames that were faulty in the dataset and returns the dataset. The videos are modified in place.

    An empty frame is replaced by the interpolation (alpha= 0.5) of the closest filled frame before it and the closest non-empty frame after it.
    If only one of the two exists, that one is copied. A video in which every frame is empty cannot be repaired: it is reported with a message
    and left unchanged.
    Args:
        detections: all video detections from mediapipe (a list of videos, each video being a list of frame detections)
    Returns:
        detections (with no empty landmark frame, except for videos that had no valid frame at all)
    Example use:
        detections= fill_empty_detections(detections)
    """
    for video_idx, video_detection in enumerate(detections):
        num_frames= len(video_detection)
        # without a single valid frame there is nothing to interpolate from
        if all(len(video_detection[i]) == 0 for i in range(num_frames)):
            if num_frames > 0:
                print(f"Video detection {video_idx} has no valid frame. its empty detections can not be filled.")
            continue

        most_recent_detection= None
        for i in range(num_frames):
            if len(video_detection[i]) != 0:
                most_recent_detection= video_detection[i]
                continue

            # closest non-empty frame after the faulty one (None if the rest of the video is empty)
            next_coming_detection= next((video_detection[j] for j in range(i+1, num_frames) if len(video_detection[j]) != 0), None)

            video_detection[i]= interpolate_frame_detections(most_recent_detection, next_coming_detection, 0.5)
            most_recent_detection= video_detection[i]   # the filled frame serves as "previous frame" for the following faulty frames

    return detections


Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/prepare_datasets.py


# making preprocess_utils.py file

In [28]:
%%writefile $src_dir/preprocess_utils.py
#Explanation: This python file contains functions for preprocessing our data.
#------------------------------------------------------------------------------Import--------------------------------------------------------------------------

# importing libraries for preprocessing data
import random 
import numpy as np
import torch
from collections import defaultdict
from sklearn.model_selection import train_test_split

#importing tqdm for progression bar and typing and numpy.typing for writing input types for each function
from tqdm.auto import tqdm 
from typing import List, Tuple
from numpy.typing import NDArray

#-----------------------------------------------------------------------Interpolation--------------------------------------------------------------------------
# function to interpolate 2 video detections
def interpolate_video_detections(video_detection_1: NDArray[np.float64], 
                                 video_detection_2: NDArray[np.float64], 
                                 frame_structure: List[Tuple[int, int]],
                                 alpha: float):
    """
    This function blends two video detection arrays frame by frame. Every frame is handled body part by body part, where a body part whose
    values are all zero counts as "not detected":
        missing in both frames -> zeros
        missing in one frame   -> the values of the other frame are copied
        present in both frames -> (1 - alpha) * part of video 1 + alpha * part of video 2
    Args:
        video_detection_1: First video detection array.
        video_detection_2: Second video detection array.
        frame_structure: represents the start and end index for each landmark class: pose, face, lh, rh.
        alpha: interpolation factor
    Returns:
        an array shaped like video_detection_1 that is the interpolation of the two input video detections:
        inter_vid_detection
    Example usage: 
        inter_vid_detection = interpolate_video_detections(video_detection_1= v1, video_detection_2= v2, frame_structure= frame_structure, alpha= 0.5)
    """
    blended_video= np.zeros_like(video_detection_1)   # result buffer, same shape and dtype as the first video
    for frame_idx in range(video_detection_1.shape[0]):
        first_frame= video_detection_1[frame_idx]
        second_frame= video_detection_2[frame_idx]
        blended_frame= np.zeros_like(first_frame)

        for start, end in frame_structure:
            part_1, part_2= first_frame[start:end], second_frame[start:end]
            part_1_missing= np.all(part_1 == 0)
            part_2_missing= np.all(part_2 == 0)

            if part_1_missing and part_2_missing:
                blended_frame[start:end]= np.zeros(end- start)
            elif part_1_missing:
                blended_frame[start:end]= part_2
            elif part_2_missing:
                blended_frame[start:end]= part_1
            else:
                blended_frame[start:end]= (1 - alpha) * part_1 + alpha * part_2

        blended_video[frame_idx]= blended_frame
    return blended_video

# function to apply the interpolation to the entire dataset
def interpolate_dataset(detections: NDArray[np.float64],
                        labels: List[str],
                         alpha: float= 0.5,
                         noise_level: float= 0.001):
    """
    This function applies interpolation across the entire dataset. It only interpolates between videos that have the same label.

    For each label, random pairs of its videos are interpolated, gaussian noise is added and the values are clipped to [0.001, 0.999]. At most
    as many new samples are produced as the label has videos (fewer if the label has fewer than 3 videos, because then there are not enough
    distinct pairs). Afterwards, original and interpolated samples are pooled per label and a random selection with the original number of
    samples is returned, so the size of the dataset does not change. The results depend on the state of the `random` and `np.random` generators.
    Args:
        detections: array of all video detections from LSA64 or WLASL100 dataset
        labels: list of all video labels in the dataset
        alpha: interpolation factor.
        noise_level: standard deviation of the gaussian noise that is added to each interpolated sample.
    Returns:
        a tuple of (np.array(x), y) where np.array(x) is the detections and y is the labels (grouped by label)
    Example usage:
        detections, labels = interpolate_dataset(detections, labels, alpha= 0.5, noise_level= 0.001)
    """
    from itertools import combinations              # local import, used to enumerate the video pairs of a label

    current_data= defaultdict(list)                 # stores current data
    interpolated_data= defaultdict(list)            # stores interpolated data
    
    frame_structure= [(0, 132), (132, 1536), (1536, 1599), (1599, 1662)]  # represents the indexes of the concatenated pose, face, lh, rh
    
    x = []  #stores augmented detections
    y = []  #stores augmented labels

    # making a dictionary where key is label and value is list of all videos with same label
    for idx, label in enumerate(labels):
        current_data[label].append(detections[idx])

    for label, video_detections in current_data.items():
        # all index pairs (i, j) with i < j of the videos that share this label
        pairs= list(combinations(range(len(video_detections)), 2))
        # one new sample per available video, but never more than there are pairs (labels with 1 or 2 videos have 0 or 1 pairs)
        num_new_samples= min(len(pairs), len(video_detections))
        selected_pairs = random.sample(pairs, num_new_samples)
        # interpolating the randomly selected pairs
        for (i, j) in selected_pairs:
            inter_vid_detection = interpolate_video_detections(video_detections[i], video_detections[j], frame_structure, alpha) #interpolate
            # adding random gaussian noise
            # NOTE(review): the noise has the shape of one frame (shape[1:]) and is broadcast, so every frame gets the same noise - confirm intent
            noise = np.random.normal(0, noise_level, inter_vid_detection.shape[1:])  
            # NOTE(review): clipping also turns the zeros of undetected body parts into 0.001 - confirm intent
            noisy_interpolated = np.clip(inter_vid_detection + noise, 0.001, 0.999)
            # adding the new sample under the label it belongs to
            interpolated_data[label].append(noisy_interpolated)
            
    # add video detections of both current and interpolated data together 
    for label in current_data:
        original_videos = current_data[label]  # Original samples
        interpolated_videos = interpolated_data[label]  # Interpolated samples

        combined_videos = original_videos + interpolated_videos
        sampled_videos = random.sample(combined_videos, len(original_videos))  # Randomly pick samples so that the original number of samples is preserved

        for video_detection in sampled_videos:
            x.append(video_detection)
            y.append(label)

    return np.array(x), y

#-------------------------------------------------------------------------Split Data--------------------------------------------------------------------------
#function to convert detections and labels to the right format for training
def convert(detections: NDArray[np.float64],
            labels: List[str],
            class_names: List[str]):
    """
    This function prepares detections and labels for the training phase. Every label is replaced by its position in class_names (ex: with
    class_names= ['Opaque', 'Red', ...] the label "Red" becomes number 1) and the detections are turned into a float32 tensor.
    Args:
        detections: array of all video detections
        labels: labels for each video detection
        class_names: list of all class names within the dataset. it is used to make a dictionary that converts labels to numbers.
    Returns:
        a tuple of (X, y) where X is our features/ detections and has type tensor float 32 and y is our label and has type long.
    Example use:
        X, y= convert(detections= detections, labels= labels, class_names= wlasl100class_names)
    """
    class_index= dict(zip(class_names, range(len(class_names))))   # label -> position of the label in class_names
    features= torch.tensor(detections, dtype=torch.float32)
    targets= torch.tensor([class_index[label] for label in labels], dtype=torch.long)

    return features, targets

# function that splits the dataset for training
def split_dataset(detections: NDArray[np.float64],
                  labels: List[str],
                  class_names: List[str],
                  test_size: float):
    """
    This function makes a stratified train/test split of the dataset (fixed random_state= 42) and converts both parts with `convert`, so that
    they are suitable for the training process.
    Args:
        detections: video detections for the entire dataset.
        labels: list of all video labels for the entire dataset.
        class_names: list of all class names in the dataset
        test_size: determines how the data should be split (passed on to train_test_split)
    Returns:
        a tuple of (X_train, X_test, y_train, y_test) 
    Example usage:
        xtrain, xtest, ytrain, ytest= split_dataset(detections, labels, class_names, 0.2)
    """
    # stratifying on the labels keeps the class proportions equal in both parts
    train_detections, test_detections, train_labels, test_labels = train_test_split(
        detections, labels, test_size= test_size, random_state= 42, stratify=labels)

    X_train, y_train= convert(train_detections, train_labels, class_names)
    X_test, y_test= convert(test_detections, test_labels, class_names)
    return X_train, X_test, y_train, y_test


Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/preprocess_utils.py


# making models.py

In [5]:
%%writefile $src_dir/models.py
# Explanation: this file contains classes that are used to make the LSTM and transformer model variations.
#---------------------------------------------------------------------------------Import-----------------------------------------------------------------------

import torch 
from torch import nn
import math
# importing typing for writing function input types
from typing import List, Callable

#-----------------------------------------------------------------Functions for building transformer-----------------------------------------------------------
#normal positional encoding
class PositionalEncoding(nn.Module):
  """
  Sinusoidal positional encoding: even feature columns hold sin(position * div_term), odd feature columns hold cos(position * div_term).
  The table is computed once for seq_len positions and stored as the non-trainable buffer 'pe' of shape (1, seq_len, d_model).
  Parameters:
      d_model : number of features of each sequence element (odd values are supported as well).
      seq_len : number of positions that are precomputed. inputs to forward must not be longer than this.
  """
  def __init__(self, d_model, seq_len):
    super().__init__()

    pe = torch.zeros(seq_len, d_model)
    position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * -(math.log(10000.0)/d_model))
    angles = position * div_term                          # shape (seq_len, ceil(d_model / 2))
    pe[:, 0::2] = torch.sin(angles)
    # an odd d_model has one odd column less than even columns, so the surplus angle column is dropped
    pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
    self.register_buffer('pe', pe.unsqueeze(0))

  def forward(self, x):
    # x.shape[1] is used as the sequence length; the encoding is broadcast over the first (batch) dimension
    return x + self.pe[:, :x.shape[1]]

#multihead attention layer
class MultiHeadAttention(nn.Module):
  """
  Multi-head scaled dot-product attention without masking.
  Parameters:
      d_model : number of features of the inputs and outputs. must be divisible by num_heads.
      num_heads : number of attention heads. every head works on d_model // num_heads features.
  """
  def __init__(self, d_model, num_heads):
    super().__init__()
    assert d_model % num_heads == 0, "d_model should be divisible by num_heads"
    self.d_model = d_model
    self.num_heads = num_heads
    self.d_k = d_model // num_heads

    self.w_q = nn.Linear(d_model, d_model)   # query projection
    self.w_k = nn.Linear(d_model, d_model)   # key projection
    self.w_v = nn.Linear(d_model, d_model)   # value projection
    self.w_o = nn.Linear(d_model, d_model)   # output projection, applied after the heads are combined

  def scaled_dot_product_attention(self, Q, K, V):
    # Q, K, V come from split_heads: (batch_size, num_heads, seq_len, d_k) -> scores: (batch_size, num_heads, seq_len, seq_len)
    attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
    # the weights must be normalized over the keys (last dimension), so that each query gets a distribution over all positions;
    # normalizing over dim=1 would instead mix the heads with each other
    attn_probs = torch.softmax(attn_scores, dim=-1)
    output = torch.matmul(attn_probs, V)
    return output

  def split_heads(self, x):
    # (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, d_k)
    batch_size, seq_len, d_model = x.shape
    return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)

  def combine_heads(self, x):
    # (batch_size, num_heads, seq_len, d_k) -> (batch_size, seq_len, d_model)
    batch_size, num_heads, seq_len, d_k = x.shape
    return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

  def forward(self, Q, K, V):
    # project the inputs and split the features over the heads
    Q = self.split_heads(self.w_q(Q))
    K = self.split_heads(self.w_k(K))
    V = self.split_heads(self.w_v(V))

    attn_output = self.scaled_dot_product_attention(Q, K, V)
    output = self.w_o(self.combine_heads(attn_output))
    return output

# feed forward layer
class PositionWiseFeedForward(nn.Module):
    """
    Two linear layers with a ReLU in between: d_model -> d_ff -> d_model.
    nn.Linear acts on the last axis, so every position is transformed independently.
    """
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)   # expansion
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)   # projection back to d_model
        self.relu = nn.ReLU()

    def forward(self, x):
        expanded = self.fc1(x)
        activated = self.relu(expanded)
        return self.fc2(activated)

# encoder
class EncoderLayer(nn.Module):
    """
    One encoder layer: self-attention followed by a feed-forward network.
    Each sub-layer output goes through dropout, is added to its input and is then layer-normalised.
    The same nn.Dropout module is used for both sub-layers.
    """
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        # attention sub-layer: queries, keys and values are all x
        attended = self.self_attn(x, x, x)
        after_attention = self.norm1(x + self.dropout(attended))
        # feed-forward sub-layer
        transformed = self.feed_forward(after_attention)
        return self.norm2(after_attention + self.dropout(transformed))

#------------------------------------------------------------------Transformer Models---------------------------------------------------------------------------
# encoder based transformer model for classification (This parent class has no positional encoding)
# PE is added to the inherited classes so the code is more clean and clear to read 
class Transformer(nn.Module):
    def __init__(self, class_names: List[str], seq_len: int, d_model: int, nhead: int, d_ff: int = 2048, num_layers: int = 2, dropout: float = 0.1):
        """
        Encoder-only transformer classifier without positional encoding.
        Parameters:
            class_names : list of all the classes in the dataset; its length is the number of output logits.
            seq_len : length of the input sequences. Not used by this parent class, only by the subclasses.
            d_model : number of features per sequence step.
            nhead : number of attention heads in every encoder layer.
            d_ff : hidden size of the feed-forward network in every encoder layer.
            num_layers : number of stacked encoder layers.
            dropout : dropout probability used inside the encoder layers.
        """
        super().__init__()
        self.model_type = 'transformer'   # read by the training code to pick the results directory of the model
        self.class_names = class_names
        stack = [EncoderLayer(d_model, nhead, d_ff, dropout) for _ in range(num_layers)]
        self.encoder_layers = nn.ModuleList(stack)
        self.classifier = nn.Linear(in_features=d_model, out_features=len(class_names))

    def forward(self, src: torch.Tensor):
        """Runs src through all encoder layers, averages over axis 1 and returns the class logits."""
        hidden = src
        for layer in self.encoder_layers:
            hidden = layer(hidden)

        pooled = hidden.mean(dim=1)   # mean over the sequence axis
        return self.classifier(pooled)

# encoder based transformer model for classification, with positional encoding
class PETransformer(Transformer):
    def __init__(self, class_names: List[str], seq_len: int, d_model: int, nhead: int, d_ff: int = 2048, num_layers: int = 2, dropout: float = 0.1):
        """
        Transformer classifier that adds a fixed sinusoidal positional encoding to its input.
        Takes the same parameters as Transformer; seq_len sets how many positions are precomputed.
        """
        super().__init__(class_names, seq_len, d_model, nhead, d_ff, num_layers, dropout)
        self.model_type = 'PEtransformer'
        self.positional_encoding = PositionalEncoding(d_model, seq_len)

    def forward(self, src: torch.Tensor):
        # the parent forward runs encoder layers -> mean over the sequence axis -> classifier
        encoded = self.positional_encoding(src)
        return super().forward(encoded)

# encoder based transformer model for classification, with a learnable parameter for positional encoding
class ParamTransformer(Transformer):
    def __init__(self, class_names: List[str], seq_len: int, d_model: int, nhead: int, d_ff: int = 2048, num_layers: int = 2, dropout: float = 0.1):
        """
        Transformer model with learnable parameter as encoding
        Takes the same parameters as Transformer; seq_len is the number of positions that get a
        learnable (1, seq_len, d_model) encoding, initialised from a standard normal distribution.
        """
        super().__init__(class_names, seq_len, d_model, nhead, d_ff, num_layers, dropout)
        self.model_type = 'paramtransformer'
        self.positional_encoding = nn.Parameter(torch.randn(1, seq_len, d_model))

    def forward(self, src: torch.Tensor):
        """
        Adds the learnable encoding to src, then runs encoder layers, mean pooling and the classifier.
        Only the first src.shape[1] positions of the encoding are used, the same way
        PositionalEncoding slices its table, so inputs shorter than seq_len are accepted.
        """
        output = src + self.positional_encoding[:, :src.shape[1]]
        for encoder_layer in self.encoder_layers:
            output = encoder_layer(output)
        output = torch.mean(output, dim=1)
        output = self.classifier(output)
        return output
#----------------------------------------------------------------------------LSTM Model------------------------------------------------------------------------
class LstmModel(nn.Module):
    """
    Stack of single-layer, batch-first LSTMs followed by a linear classifier.
    The activation is applied to the output sequence of every LSTM, and the classifier reads the last time step.
    Parameters:
        class_names : list of all the classes in the dataset; its length is the number of output logits.
        input_size : number of features per sequence step.
        hidden_size : hidden size of every LSTM layer.
        num_layers : number of LSTM layers (at least one layer is always created).
        activition : callable applied to the output sequence of every LSTM layer.
    """
    def __init__(self, class_names: List[str], input_size: int, hidden_size: int, num_layers: int= 1, activition: Callable= nn.ReLU()):
        super().__init__()
        self.model_type = 'lstm'
        self.num_layers = num_layers
        self.class_names = class_names

        # the first layer reads the raw features, every following layer reads hidden states
        layer_input_sizes = [input_size] + [hidden_size] * (num_layers - 1)
        self.lstm_layers = nn.ModuleList(
            [nn.LSTM(input_size=in_size, hidden_size=hidden_size, batch_first=True) for in_size in layer_input_sizes]
        )

        self.fc = nn.Linear(in_features=hidden_size, out_features=len(class_names))
        self.activition = activition

    def forward(self, src):
        sequence = src
        for lstm in self.lstm_layers:
            sequence, _ = lstm(sequence)          # the final (h, c) states are not needed
            sequence = self.activition(sequence)

        last_step = sequence[:, -1, :]
        return self.fc(last_step)

Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/models.py


# making train_utils.py file

In [23]:
%%writefile $src_dir/train_utils.py
#Explanation: This python file contains functions for implementing the training step
#-------------------------------------------------------------------------Import-------------------------------------------------------------------------------

#importing libraries for training models
import torch  
from torch.utils.data import DataLoader # for writing input types
# importing tqdm for progression bar
from tqdm.auto import tqdm 
# importing typing for writing input types for the functions
from typing import Callable, List

#----------------------------------------------------------------Functions for training a model----------------------------------------------------------------
#function for resetting the model parameters if needed
def reset_model_parameters(model):
    """
    Re-initialises the weights of a model in place.
    Calls reset_parameters() on every module of the model that provides it. model.modules() walks
    the model itself and all nested sub-modules, so layers stored inside containers such as
    nn.ModuleList or nn.Sequential are reset as well (direct children alone would miss them).
    Parameters that live directly on a module without a reset_parameters() method are left unchanged.
    Args:
        model: the model whose layers are re-initialised
    Example usage:
        reset_model_parameters(model)
    """
    for module in model.modules():
        reset = getattr(module, 'reset_parameters', None)
        if callable(reset):
            reset()
            
# function to calculate accuracy
def accuracy_fn(y_logits: torch.Tensor, y: torch.Tensor):
    """
    Computes the fraction of samples whose strongest logit matches the true label.
    Args:
        y_logits: model outputs, one row of class scores per sample
        y: true class indexes
    Returns:
        accuracy as a float between 0 and 1
    Example usage: 
        accuracy= accuracy_fn(y_logits, y)
    """
    predicted_labels = y_logits.argmax(dim=1)   # index of the highest score in every row
    hits = predicted_labels == y                # True where the prediction is right
    num_hits = hits.sum().item()
    return num_hits / hits.shape[0]

# function to train the model
def train_model(num_epochs: int,
                model: torch.nn.Module,
                train_dataloader: DataLoader,
                test_dataloader: DataLoader,
                optimizer: torch.optim.Optimizer,
                loss_fn: torch.nn.Module,
                device: torch.device):
    """
    Trains a model on the train data and evaluates it on the test data after every epoch.
    Args:
        num_epochs: number of times (epochs) the model is trained with the entire dataset
        model: model object (the caller is expected to have moved it to `device` already)
        train_dataloader: DataLoader object of train dataset
        test_dataloader: DataLoader object of test dataset.
        optimizer: optimizing entity that updates the weights of the model
        loss_fn: function to calculate loss
        device: Cuda or CPU
    Returns:
        A tuple of (train_losses, test_losses, train_accuracies, test_accuracies, y_trues, y_preds) where:
        train_losses is a list that contains avg train loss of all batches, for every epoch.
        test_losses is a list that contains avg test loss of all batches, for every epoch.
        train_accuracies is a list that contains avg train accuracy of all batches, for every epoch.
        test_accuracies is a list that contains avg test accuracy of all batches, for every epoch.
        y_trues and y_preds are the true and predicted test labels of the LAST epoch (they are rebuilt in every
        epoch) and are used to draw the confusion matrix. Both are empty lists when num_epochs is 0.
    Raises:
        ZeroDivisionError: if one of the dataloaders yields no batch (the per-epoch averages cannot be computed).
    Example usage: 
        results= train_model(num_epochs, model, train_dataloader, test_dataloader, optimizer, loss_fn, device)
        train_losses, test_losses, train_accuracies, test_accuracies, y_trues, y_preds= results
    """
    
    train_losses= []     
    test_losses= []          
    train_accuracies= []      
    test_accuracies= []       
    # defined before the loop so that the return statement is valid even when num_epochs == 0
    y_trues= []
    y_preds= []
 
    for epoch in tqdm(range(num_epochs), desc="Training Epoch"):
        model.train()
        train_loss= [] # a list to store loss of every batch
        train_acc= []  # a list to store acc of every batch

        for X, y in train_dataloader:
            # sending detections and labels to device
            X= X.to(device) 
            y= y.to(device)

            # train the model
            optimizer.zero_grad()
            y_logits = model(X)
            loss = loss_fn(y_logits, y)        # batch loss
            loss.backward()
            optimizer.step()

            accuracy= accuracy_fn(y_logits, y) # batch accuracy

            #add loss and accuray of the batch to the list
            train_loss.append(loss.item())
            train_acc.append(accuracy)
            
        # adding average loss and accuracy for the epoch
        train_losses.append(sum(train_loss) / len(train_loss))  
        train_accuracies.append(sum(train_acc) / len(train_acc))
    
        model.eval()      # evaluation mode (changes the behaviour of layers such as dropout)

        y_trues= []       # true test labels of this epoch
        y_preds= []       # predicted test labels of this epoch
        test_loss= []     # list to store loss of every batch
        test_acc= []      # list to store accuracy of every batch
        
        with torch.no_grad():  # no gradients are tracked during evaluation
            for X, y in test_dataloader:
                X = X.to(device)
                y = y.to(device)
                
                y_logits = model(X)
                loss = loss_fn(y_logits, y)        # test batch loss
                accuracy= accuracy_fn(y_logits, y) # test batch accuracy
                
                test_loss.append(loss.item())
                test_acc.append(accuracy)
                y_pred= torch.argmax(y_logits, 1)                 # predicted labels
                
                y_trues.extend(y.flatten().cpu().numpy())          # Store true labels
                y_preds.extend(y_pred.flatten().cpu().numpy())     # Store predictions
                
        test_losses.append(sum(test_loss) / len(test_loss))
        test_accuracies.append(sum(test_acc) / len(test_acc))

    return train_losses, test_losses, train_accuracies, test_accuracies, y_trues, y_preds

Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/train_utils.py


# making plot_utils.py file

In [21]:
%%writefile $src_dir/plot_utils.py
# Explanation: This python file contains functions for plotting training results and other important data.
#----------------------------------------------------------------------Import-----------------------------------------------------------------------------------

# importing libraries for plotting data                                                 
import cv2 
import numpy as np 
import seaborn as sns
import matplotlib.pyplot as plt 
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import confusion_matrix

# for writing input types for the functions                                                                                
from typing import List
from numpy.typing import NDArray

#-----------------------------------------------------------------visualizing video detections-----------------------------------------------------------------
# functions for drawing video landmarks
def draw_circles(frame: np.ndarray,
                 frame_detection: NDArray[np.float64],
                 frame_structure: List[tuple]):
    """
    Draws one filled green circle per landmark onto the frame, using only the x and y value of each landmark.
    Landmarks in the first entry of frame_structure (pose) are stored as 4 values each, so every 4th value
    starts a new landmark. Landmarks of all other entries are stored as 3 values each.
    x is scaled by the frame width and y by the frame height to get pixel positions.
    Args:
        frame: the image that is drawn on (modified in place)
        frame_detection: flat array with the landmark values of one frame
        frame_structure: list of (start, end) indexes for each landmark class: pose, face, lh, rh.
    Returns:
        the same frame object with the circles drawn on it
    Example usage:
        frame = draw_circles(frame, frame_detection, [(0, 132), (132, 1536), (1536, 1599), (1599, 1662)])
    """
    for (start, end) in frame_structure:
        values = frame_detection[start:end]
        # number of stored values per landmark: 4 for the pose entry, 3 for every other entry
        stride = 4 if (start, end) == frame_structure[0] else 3
        for offset in range(0, len(values), stride):
            x, y = values[offset], values[offset + 1]
            center = (int(x * frame.shape[1]), int(y * frame.shape[0]))   # pixel position (column, row)
            cv2.circle(frame, center, 3, (0, 255, 0), -1)
    return frame

# function to show the video detections
def show_video_detections(video_detection: NDArray[np.float64],
                          frame_structure: List[tuple] = [(0, 132), (132, 1536), (1536, 1599), (1599, 1662)],
                          height: int = 720,
                          width: int = 1280):
    """
    Plays the landmarks of a video in an OpenCV window, one black frame with green circles per video frame.
    Only the (x, y) values of the landmarks are drawn. Every frame is shown for up to 100 ms and the
    playback stops early when ESC is pressed.
    Args:
        video_detection: an array that holds one row of landmark values per frame
        frame_structure: list of (start, end) indexes for each landmark class: pose, face, lh, rh.
        height and width: size of the window and of the drawn frames in pixels
    """
    window_name = "video detection"
    esc_key = 27

    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(window_name, width= width, height= height)
    try:
        for frame_detection in video_detection:
            canvas = np.zeros((height, width, 3), dtype=np.uint8)            # empty black frame
            canvas = draw_circles(canvas, frame_detection, frame_structure)
            cv2.imshow(window_name, canvas)
            pressed = cv2.waitKey(100) & 0xFF
            if pressed == esc_key:
                break
    finally:
        # the windows are closed even if drawing raised an error
        cv2.destroyAllWindows()

# function to plot the video detection in one frame
def plot_video_detection_as_MHI(video_detection: NDArray[np.float64],
                                num_movements: int= 10,
                                height: int= 720,
                                width: int= 1280,
                                save_path: str = None,
                                frame_structure: List[tuple]= [(0, 132), (132, 1536), (1536, 1599), (1599, 1662)]):
    """
    Plots all frames of a video into one single figure, also referred to as motion history image (MHI).
    Pose is drawn in blue, left hand in green and right hand in red; later frames get darker shades.
    The face entry of frame_structure is not drawn. On top of the points, one green and one red line
    follow the mean position of pose landmarks (15, 17, 19) and (16, 18, 20) through the video.
    Args:
        video_detection: an array that holds one row of landmark values per frame
        num_movements: number of evenly spaced frames that are connected by each movement line
        height and width: only their ratio is used, as aspect of the plot
        save_path: if given, the figure is also saved to this path
        frame_structure: list of (start, end) indexes for each landmark class: pose, face, lh, rh.
    """
    def xy_pairs(values, stride):
        # keep the first two values (x, y) of every landmark that is stored as `stride` values
        return [(values[i], values[i + 1]) for i in range(0, len(values), stride)]

    pose_xy, lh_xy, rh_xy = [], [], []       # per frame: list of (x, y) for pose, lh and rh
    mean_lh_xy, mean_rh_xy = [], []          # per frame: one mean (x, y) for each movement line
    for frame_detection in video_detection:
        for (start, end) in frame_structure:
            bodypart = frame_detection[start:end]
            if (start, end) == frame_structure[0]:        # pose: (x, y, z, vis) per landmark
                pose_points = xy_pairs(bodypart, 4)
                # mean of three pose landmarks per side gives a more stable point than a single landmark
                mean_lh_xy.append(tuple(np.mean([pose_points[k] for k in (15, 17, 19)], axis=0)))
                mean_rh_xy.append(tuple(np.mean([pose_points[k] for k in (16, 18, 20)], axis=0)))
                pose_xy.append(pose_points)
            elif (start, end) == frame_structure[2]:      # left hand: (x, y, z) per landmark
                lh_xy.append(xy_pairs(bodypart, 3))
            elif (start, end) == frame_structure[3]:      # right hand: (x, y, z) per landmark
                rh_xy.append(xy_pairs(bodypart, 3))

    # one shade per frame, on a log scale of the frame index
    num_frames = len(video_detection)
    shades = [np.log1p(i) / np.log1p(num_frames) for i in range(num_frames)]
    pose_colors = [plt.cm.Blues(shade) for shade in shades]
    lh_colors = [plt.cm.Greens(shade) for shade in shades]
    rh_colors = [plt.cm.Reds(shade) for shade in shades]

    # plotting video as motion history image (MHI)
    plt.figure(figsize=(9, 6))
    layers = ((pose_xy, pose_colors), (lh_xy, lh_colors), (rh_xy, rh_colors))
    for i in range(num_frames):
        for points, colors in layers:
            plt.scatter(*zip(*points[i]), color=colors[i], s=5)

    # movement lines through num_movements evenly spaced frames
    points_for_lh_line, points_for_rh_line = [
        [track[i] for i in np.linspace(0, len(track)- 1, num_movements, dtype=int)]
        for track in (mean_lh_xy, mean_rh_xy)
    ]
    plt.plot(*zip(*points_for_lh_line), linewidth=4, color= 'green')
    plt.plot(*zip(*points_for_rh_line), linewidth=4, color= 'red')

    axes = plt.gca()
    axes.set_aspect(height/width)
    axes.invert_yaxis()

    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=300)
    plt.show()

#---------------------------------------------------------------Visualizing training results--------------------------------------------------------------------
def plot_loss_accuracy(train_losses: List[float],
                       test_losses: List[float],
                       train_accuracies: List[float],
                       test_accuracies: List[float],
                       batch_size: int,
                       save_path: str = None):
    """
    Draws loss and accuracy of a training session side by side (loss on the left, accuracy on the right).
    Args:
        train_losses, test_losses: one loss value per epoch
        train_accuracies, test_accuracies: one accuracy value per epoch
        batch_size: only written into the plot titles
        save_path: if given, the figure is also saved to this path. Default None (no file is written).
    Example usage:
        plot_loss_accuracy(train_losses, test_losses, train_accuracies, test_accuracies, 64)
    """
    plt.figure(figsize=(18, 9))
    # plotting loss
    plt.subplot(1, 2, 1) 
    plt.plot(train_losses, label='Train Loss')      
    plt.plot(test_losses, label='Test Loss')
    plt.title(f'Loss over Epochs(batch size= {batch_size}), Last Loss:{test_losses[-1]}') # writing the final loss value
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    # plotting accuracy
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(test_accuracies, label='Test Accuracy')
    plt.title(f'Acc over Epochs(batch size= {batch_size}), Last Acc: {test_accuracies[-1]}') # writing the final accuracy
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()

    # the layout is adjusted BEFORE saving, so that the saved file matches the figure that is shown
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=300)

    plt.show()

def plot_confusion_matrix(y_trues: List[int],
                          y_preds: List[int],
                          class_names: List[str],
                          num_epochs: int,
                          save_path: str = None):
    """
    Plots confusion matrix of a model using true values and model predictions.
    Args:
        y_trues: true labels
        y_preds: labels predicted by the model
        class_names: used as tick labels of both axes
        num_epochs: only written into the plot title
        save_path: if given, the figure is also saved to this path. Default None (no file is written).
    Example usage:
        plot_confusion_matrix(y_trues, y_preds, class_names, num_epochs)
    """
    conf_matrix = confusion_matrix(y_trues, y_preds)
    plt.figure(figsize=(18, 15))
    # rows are true labels, columns are predicted labels
    sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix after {num_epochs} epoches')

    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=300)
    
    plt.show()

# function for drawing in tensor board.
def draw_in_tensorboard(train_losses: List[float],
                        test_losses: List[float],
                        train_accuracies: List[float], 
                        test_accuracies: List[float],  
                        log_dir: str):
    """
    Writes loss and accuracy of the training process as tensorboard scalars, with the epoch index as step.
    If the lists differ in length, only the epochs present in all of them are written.
    Args:
        train_losses, test_losses, train_accuracies, test_accuracies: one value per epoch
        log_dir: directory that is handed to SummaryWriter
    Example usage:
        draw_in_tensorboard(train_losses, test_losses, train_accuracies, test_accuracies, save_directory)
    """
    # tag of every curve, paired with its values
    series = (
        ('Loss/train', train_losses),
        ('Loss/test', test_losses),
        ('Accuracy/train', train_accuracies),
        ('Accuracy/test', test_accuracies),
    )
    tags = [tag for tag, _ in series]

    with SummaryWriter(log_dir= log_dir) as writer:
        per_epoch_values = zip(*(values for _, values in series))
        for epoch, epoch_values in enumerate(per_epoch_values):
            for tag, value in zip(tags, epoch_values):
                writer.add_scalar(tag, value, epoch)
            

Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/plot_utils.py


# making train.py file

In [3]:
%%writefile $src_dir/train.py
# Explanation: this python file carries out the training process, from making the dataset to plotting the results.
#-----------------------------------------------------------------------Import-------------------------------------------------------------------------------

# importing numpy, torch, nn, typing: for writing input types for the functions
import numpy as np 
from numpy.typing import NDArray
from typing import Callable, List, Tuple, Literal
import torch
from torch import nn
# from tqdm.auto import tqdm  

import torch.optim as optim                   #optimizer
from torch.utils.data import Dataset          # dataset calss
from torch.utils.data import DataLoader       #data loader
    
from sklearn.utils import resample         # used for bootstrapping
from sklearn.model_selection import KFold  # for K fold cross validation if necessary

# connecting the steps
from preprocess_utils import interpolate_dataset, split_dataset, convert
from plot_utils import draw_in_tensorboard, plot_confusion_matrix, plot_loss_accuracy
from train_utils import train_model, reset_model_parameters # for K fold cross validation if necessary

#-------------------------------------------------------------Constant variables and classes-------------------------------------------------------------------
# path to the experiment directory: configure() saves the tensorboard files below this directory, under
# <dataset_name>/<model_type>/runs/<results_name>/ (!!!!should be changed based on system file structure!!!!)
experiment_dir= "C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/experiment_results"
# path to the current directory: the plots are saved here when quick_save is True
# (!!!!should be changed based on system file structure!!!!)
current_dir= "C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/notebooks"
# A simple dataset class adapted from the CustomImageDataset example on pytorch.org
class CustomDataset(Dataset):
    """
    Pairs features with labels by index.
    The number of samples is taken from the labels; item i is the tuple (features[i], labels[i]).
    """
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        return self.features[index], self.labels[index]
        
#----------------------------------------------------------------------Train functions-------------------------------------------------------------------------
# function to configure the training enviroment
def configure(detections: NDArray[np.float64], 
              labels: List[str],
              class_names: List[str],
              test_size: float,
              batch_size: int,
              num_epochs: int,
              model: torch.nn.Module,
              lr: float,
              device: torch.device,
              quick_save: bool,
              results_name: str,
              dataset_name: Literal['LSA64', 'AUTSL40']):
    """
    Runs one complete training session: splits the data, builds the datasets and dataloaders, moves the model
    to the device, trains it with cross entropy loss and the Adam optimizer, plots loss/accuracy and the
    confusion matrix and finally writes the curves to tensorboard.
    Args:
        detections: array of all video detections
        labels: list of all video labels
        class_names: a list containing unique class names in the dataset
        test_size: handed to split_dataset together with detections, labels and class_names
        batch_size: batch size of the train and the test dataloader
        num_epochs: number of epochs
        model: the model that is trained; its model_type attribute is part of the tensorboard directory
        lr: learning rate of the Adam optimizer
        device: the device that we use for training (Cuda or CPU)
        quick_save: if True the two plots are also saved in current_dir (loss_acc.png and confmat.png)
        results_name: used to identify different training results; part of the tensorboard directory
        dataset_name: name of the dataset; part of the tensorboard directory
    """
    # data: split -> datasets -> dataloaders (only the train data is shuffled)
    X_train, X_test, y_train, y_test = split_dataset(detections, labels, class_names, test_size)
    train_loader = DataLoader(dataset=CustomDataset(X_train, y_train), batch_size=batch_size, num_workers=0, shuffle=True)
    test_loader = DataLoader(dataset=CustomDataset(X_test, y_test), batch_size=batch_size, num_workers=0, shuffle=False)

    # model, loss and optimizer
    model = model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # training
    history = train_model(num_epochs, model, train_loader, test_loader, optimizer, loss_fn, device)
    train_l, test_l, train_a, test_a, y_trues, y_preds = history

    # plots (saved only when quick_save is set)
    loss_acc_path = f"{current_dir}/loss_acc.png" if quick_save else None
    plot_loss_accuracy(train_l, test_l, train_a, test_a, batch_size, loss_acc_path)
    confmat_path = f"{current_dir}/confmat.png" if quick_save else None
    plot_confusion_matrix(y_trues, y_preds, class_names, num_epochs, confmat_path)

    # tensorboard
    log_dir = f'{experiment_dir}/{dataset_name}/{model.model_type}/runs/{results_name}/'
    draw_in_tensorboard(train_l, test_l, train_a, test_a, log_dir)

# function to configure the Kfold cross validation
def configure_Kfold(detections: NDArray[np.float64], 
                    labels: List[str],
                    class_names: List[str],
                    n_splits: int,
                    batch_size: int,
                    num_epochs: int,
                    model_class: torch.nn.Module,
                    model_args: dict,
                    lr: float,
                    device: torch.device,
                    quick_save: bool):
    """
    Runs a K-fold cross validation. The data is converted once and wrapped in one dataset. For every fold a
    fresh model is built from model_class and model_args, trained on the train indexes of the fold with
    cross entropy loss and the Adam optimizer, evaluated on the test indexes, and its loss/accuracy is plotted.
    Args:
        detections: array of all video detections
        labels: list of all video labels
        class_names: a list containing unique class names in the dataset
        n_splits: the number of folds that the data will be divided to
        batch_size: batch size
        num_epochs: number of epochs per fold
        model_class: the model class; it is called as model_class(**model_args) once per fold
        model_args: keyword arguments for model_class
        lr: learning rate
        device: Cuda or CPU
        quick_save: if True the loss/accuracy plot of fold k is also saved as <k>.png in current_dir
    """
    X, y = convert(detections, labels, class_names)   # detections and labels in the format used for training
    dataset = CustomDataset(X, y)
    kf = KFold(n_splits=n_splits, shuffle=True)

    for fold, (train_idx, test_idx) in enumerate(kf.split(dataset)):
        print(f"Fold {fold + 1} ------------------------------------------------------------------------------------------------------------------------------")
        model = model_class(**model_args).to(device)   # new, untrained model for every fold

        # both loaders read the same dataset, the samplers restrict them to the indexes of the fold
        train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
        test_sampler = torch.utils.data.SubsetRandomSampler(test_idx)
        train_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=train_sampler)
        test_loader = DataLoader(dataset=dataset, batch_size=batch_size, sampler=test_sampler)

        loss_fn = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=lr)

        fold_results = train_model(num_epochs, model, train_loader, test_loader, optimizer, loss_fn, device)
        train_l, test_l, train_a, test_a, y_trues, y_preds = fold_results

        plot_path = f"{current_dir}/{fold+1}.png" if quick_save else None
        plot_loss_accuracy(train_l, test_l, train_a, test_a, batch_size, plot_path)
        # the confusion matrix per fold is currently switched off:
        # save_path= f"{current_dir}/c{fold+1}.png" if quick_save else None
        # plot_confusion_matrix(y_trues, y_preds, class_names, num_epochs, save_path)  # confusion matrix

Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/train.py


# making analyse_layer.py file

In [4]:
%%writefile $src_dir/analyse_layer.py
# Explanation: this python file contains functions for analysing layer attentions and drawing saliency maps.
#------------------------------------------------------------------------Import--------------------------------------------------------------------------------
import captum
from captum.attr import Attribution
from captum.attr import Saliency
from captum.attr import IntegratedGradients
from captum.attr import LayerConductance

import os
import cv2
import torch
from tqdm.auto import tqdm 
from torch import Tensor
import mediapipe as mp
import numpy as np
import matplotlib.pyplot as plt
from typing import List
from prepare_datasets import get_frame_detections, get_frame_coordinates, autslclass_names, lsa64class_names
from preprocess_utils import convert

#------------------------------------------------------------------------Constants-----------------------------------------------------------------------------
# NOTE: these constants must only be used when the 468 extra face landmarks are NOT included !!!!
pose_idx = list(range(33))             # positions of the pose landmarks
lh_idx = list(range(33, 54))           # positions of the left hand landmarks
rh_idx = list(range(54, 75))           # positions of the right hand landmarks
pose_l_idx = list(range(11, 22, 2))    # 11, 13, ..., 21 -> left arm (shoulder, elbow, wrist, pinky, index, thumb)
pose_r_idx = list(range(12, 23, 2))    # 12, 14, ..., 22 -> right arm (same joints)
pose_m_idx = [idx for idx in pose_idx if idx not in pose_l_idx + pose_r_idx]   # remaining pose points: head, hips, legs, feet
reordered_idxs = [*pose_m_idx, *pose_l_idx, *lh_idx, *pose_r_idx, *rh_idx]     # each arm sits right next to its hand

# names of the landmarks in the same order as reordered_idxs
reordered_landmarks = [
    # head (pose points 0-10)
    "Nose", "Left Eye Inner", "Left Eye", "Left Eye Outer", "Right Eye Inner", "Right Eye", "Right Eye Outer",
    "Left Ear", "Right Ear", "Left Mouth Corner", "Right Mouth Corner",
    # lower body (pose points 23-32)
    "Left Hip", "Right Hip", "Left Knee", "Right Knee", "Left Ankle", "Right Ankle", "Left Heel", "Right Heel",
    "Left Foot Index", "Right Foot Index",
    # arm (6 pose points) followed by the hand (21 hand points), first for the left side and then for the right side
    *(f"{side} {part}"
      for side in ("Left", "Right")
      for part in ("Shoulder", "Elbow", "Wrist", "Pinky", "Index", "Thumb",
                   "Wrist", "Thumb CMC", "Thumb MCP", "Thumb IP", "Thumb Tip",
                   "Index MCP", "Index PIP", "Index DIP", "Index Tip",
                   "Middle MCP", "Middle PIP", "Middle DIP", "Middle Tip",
                   "Ring MCP", "Ring PIP", "Ring DIP", "Ring Tip",
                   "Pinky MCP", "Pinky PIP", "Pinky DIP", "Pinky Tip")),
]

mp_holistic= mp.solutions.holistic         # mediapipe holistic solution; its *_CONNECTIONS sets are used below when drawing landmarks
mp_drawing= mp.solutions.drawing_utils     # mediapipe drawing helpers (draw_landmarks, DrawingSpec) used to draw on the frames

#--------------------------------------------------------------plotting data on video sample-------------------------------------------------------------------
def plot_attributions_on_video(video_path: str,
                               model: torch.nn.Module,
                               captum_method: Attribution,
                               class_names: List[str],
                               device: torch.device,
                               frame_numbers: int = 30):
    """
    This function plays a video (from LSA64 or AUTSL) and draws, on every sampled frame, the detected landmarks together with
    a circle per landmark whose color encodes the attribution value of that landmark.
    Args:
        video_path: Path to the video sample.
        model: The model that we want to analyse.
        captum_method: An instantiated captum attribution method, ex: Saliency(model)
        class_names: List of all words in the dataset from which we took that video (autslclass_names or lsa64class_names).
        device: The device that the computation is going to take place on (CPU or GPU)
        frame_numbers: number of frames we want to take from the entire video.
    Raises:
        ValueError: if class_names is neither autslclass_names nor lsa64class_names (the label can not be read from the path).
    Note: Nothing is returned. If the video can not be opened or has no readable frame, a message is printed and the
          function returns without plotting. Press ESC to stop the playback.
    Example usage:
        plot_attributions_on_video(video_path, model, Saliency(model), lsa64class_names, device, 30)
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"ERROR in opening the video path{video_path}")
        cap.release()
        return

    try:
        # the label is resolved first so an unsupported dataset fails before the expensive landmark extraction
        label = _label_from_video_path(video_path, class_names)

        frames = []             # stores the sampled frames of the video
        result_objs = []        # stores mediapipe result objects from each frame of the video
        video_detection = []    # stores the detections from each frame of the video
        video_coordinates = []  # stores the coordinates of the detected landmarks

        total_frames_number = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_idxs_to_process = np.linspace(0, total_frames_number-1, frame_numbers, dtype=int)  # desired frame indexes

        with mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            for idx in frame_idxs_to_process:
                cap.set(cv2.CAP_PROP_POS_FRAMES, idx)       # set cv2 to the desired index
                ret, frame = cap.read()                     # read the frame in that index
                if not ret:
                    print("unreadble frame detected")       # incase there is any unreadable frame
                    break

                result = holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # processing frame with Mediapipe
                pose, _, lh, rh = get_frame_detections(result)
                p_co, _, l_co, r_co = get_frame_coordinates(result, frame)         # turning results into coordinates

                frames.append(frame)
                result_objs.append(result)
                video_detection.append(np.concatenate((pose, lh, rh)))             # face landmarks are left out on purpose
                video_coordinates.append(p_co + l_co + r_co)                       # pose, lh and rh coordinates of this frame

            if not frames:
                # nothing could be read, so there is nothing to attribute or to show
                print("no readable frame was found in the video, nothing to plot")
                return

            print(f"Calculating landmark attributions for the sign language video {label}")
            video_detection = np.array(video_detection, dtype=np.float64)          # one numpy array is faster to convert than a list of arrays
            video_detection, label_tensor = convert(video_detection, [label], class_names)                    # converting to tensors
            attributions = landmark_attributions(model, captum_method, video_detection, label_tensor, device)  # shape: 1, frame_number, 75
            vid_color_scale = _attribution_color_scale(attributions[0])                                        # values in [0, 255]

            for frame, result, coordinates, color_scale in zip(frames, result_objs, video_coordinates, vid_color_scale):
                plot_mp_landmarks(frame, result)
                plot_circle(frame, coordinates, color_scale)

                cv2.imshow('Frame with Attributions', frame)
                if cv2.waitKey(30) & 0xFF == 27:            # ESC stops the playback
                    break

    except Exception as e:
        print(f"Error happened while processing: {e}")
        raise

    finally:
        # always free the capture and the windows, even when mediapipe or the model fails
        cap.release()
        cv2.destroyAllWindows()


def _label_from_video_path(video_path: str, class_names: List[str]) -> str:
    """
    Reads the label (word) of a video from its path, following the naming convention of the dataset class_names belongs to.
    """
    vid_idx_to_label = dict(enumerate(class_names))                    # maps a class index to its label
    if class_names == autslclass_names:    # for AUTSL
        video_idx = int(os.path.basename(os.path.dirname(video_path)))  # the name of the video folder is the class index
        return vid_idx_to_label[video_idx]
    if class_names == lsa64class_names:    # for LSA64
        video_idx = int(os.path.basename(video_path).split('_')[0])     # extract index from the video title: 001_004_003 -> 1
        return vid_idx_to_label[video_idx-1]                            # titles start from 1 while class indexes start from 0
    raise ValueError("class_names must be autslclass_names or lsa64class_names to read the label from the video path")


def _attribution_color_scale(frame_attributions: torch.Tensor) -> np.ndarray:
    """
    Min-max normalizes attribution values to the range [0, 255] and returns them as a numpy array on CPU.
    When all values are equal (or not finite) there is no range to normalize by, so zeros are returned instead of NaN.
    """
    values = frame_attributions.detach()
    lowest = values.min()
    value_range = values.max() - lowest
    if value_range > 0:
        scaled = (values - lowest) / value_range * 255
    else:
        scaled = torch.zeros_like(values)
    return scaled.cpu().numpy()
    
#------------------------------------------------------------------working with attribution values-------------------------------------------------------------
# function for calculating the mean attribution of every landmark
def landmark_attributions(model: torch.nn.Module,
                          captum_method: Attribution,
                          video_detection: torch.Tensor, 
                          label: torch.Tensor, 
                          device: torch.device,):
    """
    Calculates the mean attribution of every landmark in a video. Each landmark has x, y, z (and in case of pose landmarks
    visibility) values; the attributions of these values are averaged so that there is one number per landmark and frame.
    This number acts as a parameter that shows how much a landmark is affecting the output of the model.
    Args:
        model: model that we want to analyse
        captum_method: an instantiated captum method, ex: LayerConductance, Saliency, IntegratedGradients
        video_detection: a tensor of shape (frame_number, 1662 or 258)
        label: label of the video, either a one element tensor or a plain int class index
        device: The device that the computation is going to take place on (CPU or GPU)
    Returns:
        a tensor of shape (1, frame_number, 543 or 75)
    Example usage:
        lm_attributions= landmark_attributions(model, saliency, video_detection, label, device)
    """
    pose_feature_number = 33 * 4     # 33 pose landmarks with x, y, z, visibility come first in the feature vector
    pose_values, rest_values = 4, 3  # values per pose landmark and per any other landmark (x, y, z)

    model.eval()                                                   # set model to evaluation mode
    model.to(device)
    # captum only needs the class index, so the label does not have to be moved to the device
    target = int(label.item()) if torch.is_tensor(label) else int(label)
    video_detection = video_detection.to(device).unsqueeze(0)      # shape: 1, frame_number, 1662 or 258
    video_detection.requires_grad_()

    attributions = captum_method.attribute(inputs=video_detection, target=target)
    batch_size, frame_number = attributions.shape[0], attributions.shape[1]

    pose = attributions[:, :, :pose_feature_number]                                    # shape: 1, frame_number, 132
    pose_means = pose.reshape(batch_size, frame_number, -1, pose_values).mean(dim=3)   # shape: 1, frame_number, 33

    rest = attributions[:, :, pose_feature_number:]                                    # shape: 1, frame_number, 1530 or 126
    rest_means = rest.reshape(batch_size, frame_number, -1, rest_values).mean(dim=3)   # shape: 1, frame_number, 510 or 42

    return torch.cat((pose_means, rest_means), dim=2)                                  # shape: 1, frame_number, 543 or 75

def landmark_attributions_for_dataset(model, captum_method, dataset, device):
    '''
    This function calculates the mean attribution of landmarks over the dataset: landmark_attributions is computed for
    every sample and the results are averaged element-wise.
    Args:
        model: model that we want to analyse
        captum_method: an instantiated captum method, ex: Saliency
        dataset: an iterable of samples where sample[0] is the video detection and sample[1] is its label
        device: The device that the computation is going to take place on (CPU or GPU)
    Returns:
        a tensor with the same shape as the output of landmark_attributions for a single video
    Raises:
        ValueError: if the dataset has no samples
    Note: the results are summed element-wise, so the attributions of all videos need to have matching shapes.
    Example Usage:
        lm_atts_dataset= landmark_attributions_for_dataset(model, saliency, dataset, device)
    '''
    total_lm_atts = None
    video_number = 0     # counted while iterating so the mean always matches the samples that were really accumulated
    for data in tqdm(dataset):
        video_detection, label = data[0], data[1]
        video_lm_atts = landmark_attributions(model, captum_method, video_detection, label, device)

        if total_lm_atts is None:
            total_lm_atts = torch.zeros_like(video_lm_atts)   # the accumulator takes its shape from the first video

        total_lm_atts += video_lm_atts
        video_number += 1

    if total_lm_atts is None:
        # without this check an empty dataset would end in an unclear "None / 0" TypeError
        raise ValueError("can not calculate landmark attributions for an empty dataset")

    return total_lm_atts / video_number
#-------------------------------------------------------------------plotting heatmap -------------------------------------------------------------------------

# function for drawing the attribution heatmap
def plot_atts_heatmap(attributions: Tensor, save_path: str = None, show_landmark_names: bool = False, vmin= None, vmax= None):
    """
    For a video, this function plots attribution as a heatmap where x axis are frames, y axis are landmarks and the colors represent attribution values.
    Args:
        attributions: a tensor of shape (1, frame_number, landmark_number), ex: the output of landmark_attributions
        save_path: where to save the figure; nothing is saved when it is None or empty
        show_landmark_names: boolean variable for showing landmark names. The landmarks are reordered so that each arm is
                             next to its hand. Only possible when the face landmarks are not included (75 landmarks).
        vmin, vmax: limits of the color scale, passed to imshow
    Raises:
        ValueError: if show_landmark_names is True but the number of landmarks does not match reordered_idxs
    Note: vmin and vmax are chosen based on trial and error.
    """
    atts = attributions.detach().cpu().numpy()     # moving tensor to CPU for drawing
    if show_landmark_names:
        # reordered_idxs and reordered_landmarks only describe the pose + hands layout, so any other layout would
        # end up with wrong or missing names
        if atts.shape[2] != len(reordered_idxs):
            raise ValueError(f"show_landmark_names needs {len(reordered_idxs)} landmarks but got {atts.shape[2]}")
        atts = atts[:, :, reordered_idxs]          # reorder the landmarks

    num_frames, num_features = atts[0].shape       # remove batch dimension and get frame and feature numbers
    if show_landmark_names:
        y_ticks_labels = reordered_landmarks
        y_ticks_positions = range(0, num_features)
    else:
        y_ticks_labels = None
        y_ticks_positions = range(0, num_features, 5)

    plt.figure(figsize=(20, 15))
    plt.imshow(atts[0].T, cmap='viridis', aspect='auto', origin='lower', vmin= vmin, vmax= vmax) # we transpose attributions so landmarks are on y axis
    plt.colorbar()
    plt.xlim(0, num_frames - 1)
    plt.xticks(range(0, num_frames))
    plt.xlabel("frames")
    plt.ylim(0, num_features - 1)
    plt.yticks(y_ticks_positions, y_ticks_labels)
    plt.ylabel("features")
    plt.title("Attributions")

    plt.tight_layout()                             # done before saving so the saved file has the same layout as the shown one
    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=300)

    plt.show()

#---------------------------------------------------------------------plotting data on video---------------------------------------------------------------
def plot_mp_landmarks(frame, result):
    """
    Draws the pose, face, left hand and right hand landmarks of a mediapipe result, together with their connections,
    on the given frame. Everything is drawn in the same color with thickness 1 and circle radius 2.
    Args:
        frame: video frame that we want to draw on.
        result: the detected media pipe object corresponding to the frame.

    """
    # (attribute of the result object, connections that belong to it), in drawing order
    body_parts = (("pose_landmarks", mp_holistic.POSE_CONNECTIONS),
                  ("face_landmarks", mp_holistic.FACEMESH_CONTOURS),
                  ("left_hand_landmarks", mp_holistic.HAND_CONNECTIONS),
                  ("right_hand_landmarks", mp_holistic.HAND_CONNECTIONS))

    for attribute_name, connections in body_parts:
        landmark_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=2)
        connection_spec = mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=1, circle_radius=2)
        mp_drawing.draw_landmarks(frame, getattr(result, attribute_name), connections, landmark_spec, connection_spec)


def plot_circle(frame, coordinates, frame_colorscale):
    """
    Visualizes attributions on a frame: a filled circle of radius 5 is drawn on every landmark, and the first color
    channel of the circle is set from the color scale value of that landmark.
    Args:
        frame: The video frame we want to draw on.
        coordinates: List of (x, y) coordinates of landmarks detected in the frame.
        frame_colorscale: one color scale value per landmark, based on attribution values.

    """
    for landmark_position, scale_value in enumerate(frame_colorscale):
        first_channel = int(scale_value)                         # the value is truncated to an integer channel intensity
        center_x, center_y = coordinates[landmark_position]      # coordinates and color scale share the same landmark order
        cv2.circle(frame, (center_x, center_y), radius=5, color=(first_channel, 255, 0), thickness=-1)


Overwriting C:/Users/sadeg/OneDrive/Desktop/Thesis/python_codes/SignLanguageProject/src/analyse_layer.py
