## **Sign Language Recognition**

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

import pickle

import os
import random
import matplotlib.pyplot as plt

In [None]:
# Kaggle input locations: the raw competition files and the cached, already preprocessed dataset.
COMPETITION_PATH = '/kaggle/input/asl-signs/'
PROCESS_DATASET_PATH = "/kaggle/input/preprocess-dataset/preprocess_dataset.pkl"

# Derived from COMPETITION_PATH so the landmark folder is spelled out in one place only.
# It holds one sub-directory per participant; each contains that participant's parquet sequences.
dataset_path = os.path.join(COMPETITION_PATH, 'train_landmark_files')
user_ids = os.listdir(dataset_path)

## Function to load sequence provided by Google

In [None]:
ROWS_PER_FRAME = 543  # number of landmarks per frame

def load_relevant_data_subset(pq_path):
    """
    Read the x and y columns of one parquet sequence file.

    The flat landmark table is reshaped to (n_frames, ROWS_PER_FRAME, 2) and
    returned as a float32 array; the last axis holds (x, y).
    """
    coordinate_columns = ['x', 'y']
    table = pd.read_parquet(pq_path, columns=coordinate_columns)
    frame_count = len(table) // ROWS_PER_FRAME
    stacked = table.values.reshape(frame_count, ROWS_PER_FRAME, len(coordinate_columns))
    return stacked.astype(np.float32)

In [None]:
def select_random_sequence():
    """Pick a random participant, then one of their sequence files at random; return its full path."""
    participant = random.choice(user_ids)
    participant_dir = os.path.join(dataset_path, participant)
    sequence_file = random.choice(os.listdir(participant_dir))
    return os.path.join(participant_dir, sequence_file)

In [None]:
select_random_sequence()

In [None]:
# Inspect one randomly chosen sequence: read its landmark table, then print the file path
# and the range of the raw x / y coordinates.
cols = ['frame', 'row_id', 'type', 'landmark_index', 'x', 'y', 'z']
pq_path = select_random_sequence()
df = pd.read_parquet(pq_path, columns=cols)
print(pq_path)
print(f'xmax: {np.max(df.x)}\nymax: {np.max(df.y)}\nxmin: {np.min(df.x)}\nymin: {np.min(df.y)}')

### **Do not run the next cell (it takes a long time)**
Alternatively, run it once to also collect the minimum values.

In [None]:
# maxX=[]
# maxY=[]
# maxZ=[]
# for usr in user_ids:
#     usr_sqc = os.listdir(os.path.join(dataset_path,usr))
#     for sqc in usr_sqc:
#         pth = os.path.join(dataset_path,usr,sqc)
#         df = pd.read_parquet(pth, columns=['x', 'y', 'z'])
#         maxX.append(np.max(df.x))
#         maxY.append(np.max(df.y))
#         maxZ.append(np.max(df.z))

# print(f'max x: {np.max(maxX)}\nmax y: {np.max(maxY)}\nmax z: {np.max(maxZ)}')

'''
outputs:

max x: 2.9205052852630615
max y: 3.572496175765991
max z: 4.796591758728027
'''

### **Preprocessing**

In [None]:
# Indices, within the ROWS_PER_FRAME (543) landmarks of a frame, of the landmarks kept as model input.

# lips idx (40 landmarks)
LIPS_IDXS0 = np.array([
        61, 185, 40, 39, 37, 0, 267, 269, 270, 409,
        291, 146, 91, 181, 84, 17, 314, 405, 321, 375,
        78, 191, 80, 81, 82, 13, 312, 311, 310, 415,
        95, 88, 178, 87, 14, 317, 402, 318, 324, 308,
    ])

# left hand; its indices start after the face landmarks, which come first
# NOTE(review): the layout assumed here (face first, then left hand 468-488, pose, right hand
# 522-542) is presumably the landmark ordering of the competition files - confirm.
LEFT_HAND_IDXS0 = np.arange(468,489)
RIGHT_HAND_IDXS0 = np.arange(522,543)
# five pose landmarks per body side
LEFT_POSE_IDXS0 = np.array([502, 504, 506, 508, 510])
RIGHT_POSE_IDXS0 = np.array([503, 505, 507, 509, 511])

# 40 lips + 21 left hand + 21 right hand + 5 + 5 pose = 92 landmark indices, sorted ascending
REDUCED_LANDMARKS = np.sort(np.concatenate([LIPS_IDXS0, LEFT_HAND_IDXS0, RIGHT_HAND_IDXS0, LEFT_POSE_IDXS0, RIGHT_POSE_IDXS0]))
print(REDUCED_LANDMARKS)

**Note:** the landmark positions that are kept are the intended ones.

In [None]:
# Load one randomly chosen sequence as a DataFrame; it is the sample input used to try out
# the NaN replacement / per-frame normalization defined in the next cell.
pq_path = select_random_sequence() # a random sequence of a random participant
cols = ['frame', 'row_id', 'type', 'landmark_index', 'x', 'y', 'z']
sqc_df = pd.read_parquet(pq_path, columns=cols)

In [None]:
def _min_max_scale(values):
    """Min-max scale a pandas Series to [0, 1]; a constant Series maps to all zeros."""
    lowest = values.min()
    span = values.max() - lowest
    if span == 0:
        # Every value is identical: the naive formula would compute 0/0 and yield NaN.
        return values - lowest
    return (values - lowest) / span


def normalize_sequence(sequence_dataframe):
    '''
    Min-max normalize the x and y columns of a sequence, one frame at a time.

    Missing coordinates (NaN) are replaced by 0.0 before the per-frame minimum and
    maximum are computed, so the filled zeros take part in the scaling.

    Parameters
    ----------
    sequence_dataframe : pandas.DataFrame
        Landmark table of one sequence; must contain the columns 'frame', 'x' and 'y'.

    Returns
    -------
    pandas.DataFrame
        A new DataFrame with the same columns and index labels, grouped frame by frame
        in order of first appearance, with x and y scaled to [0, 1] within each frame.
        The input DataFrame is not modified.
    '''
    # Collect the per-frame results and concatenate once: growing a DataFrame with
    # pd.concat inside the loop copies all previous rows on every iteration.
    normalized_frames = []
    for frame in sequence_dataframe.frame.unique():
        frame_df = sequence_dataframe[sequence_dataframe.frame == frame].copy()
        for axis in ('x', 'y'):
            frame_df[axis] = _min_max_scale(frame_df[axis].fillna(0.0))
        normalized_frames.append(frame_df)

    if not normalized_frames:
        # pd.concat refuses an empty list; return an empty frame with the same columns.
        return sequence_dataframe.iloc[0:0].copy()
    return pd.concat(normalized_frames)

normalized_df=normalize_sequence(sqc_df)

In [None]:
len(sqc_df), len(normalized_df)

In [None]:
v = load_relevant_data_subset(select_random_sequence())
print(v.shape)

In [None]:
def normalize_loaded_sequence(loaded_sqc, landmarks=None):
    '''
    Min-max normalize a loaded sequence frame by frame and keep only the relevant landmarks.

    NaN coordinates are replaced by 0 first. The minimum and maximum of each frame are
    then computed over ALL landmarks of that frame (separately for x and y), and only
    the selected landmarks are returned.

    Parameters
    ----------
    loaded_sqc : array-like of shape (n_frames, n_points, >=2)
        Only the first two entries of the last axis (x, y) are used.
    landmarks : array-like of int, optional
        Indices of the landmarks to return. Defaults to the module-level REDUCED_LANDMARKS.

    Returns
    -------
    numpy.ndarray of shape (n_frames, len(landmarks), 2), dtype float64
        Values lie in [0, 1]. A frame whose x (or y) values are all identical yields 0
        for that coordinate instead of the NaN produced by a 0/0 division.
    '''
    if landmarks is None:
        landmarks = REDUCED_LANDMARKS

    coords = np.nan_to_num(np.asarray(loaded_sqc)[:, :, :2], nan=0.0)

    # Per-frame, per-coordinate extrema over every landmark (axis 1).
    lowest = coords.min(axis=1, keepdims=True)
    span = coords.max(axis=1, keepdims=True) - lowest
    # A zero span means a constant frame; dividing by 1 leaves its zeros untouched.
    span[span == 0] = 1.0

    normalized = (coords - lowest) / span
    return normalized[:, landmarks, :].astype(np.float64)

n_v = normalize_loaded_sequence(v)
print(n_v.shape)
print(np.max(n_v[0,:,0]), np.min(n_v[0,:,0]))

**Note** at this step I have a normalized tensor built after loading data

In [None]:
def get_data(sqc_path):
    """Load one parquet sequence and return its normalized, landmark-reduced array."""
    return normalize_loaded_sequence(load_relevant_data_subset(sqc_path))

In [None]:
d = get_data(select_random_sequence())
d.shape
# print(vv.shape)

In [None]:
# Visual sanity check: overlay, for the first and the last frame of one random sequence,
# all landmarks normalized through the DataFrame pipeline (default color) and the reduced
# landmarks produced by get_data (small red dots).
pq_path = select_random_sequence()  # a random sequence of a random participant
cols = ['frame', 'row_id', 'type', 'landmark_index', 'x', 'y', 'z']
sqc_df = pd.read_parquet(pq_path, columns=cols)

vv = get_data(pq_path)

n_df = normalize_sequence(sqc_df)
frame_ids = n_df.frame.unique()
frame_df0 = n_df[n_df.frame == frame_ids[0]]
frame_df1 = n_df[n_df.frame == frame_ids[-1]]

X0, Y0 = frame_df0.x, frame_df0.y
X1, Y1 = frame_df1.x, frame_df1.y

fig, axes = plt.subplots(1, 2, figsize=(8, 10))
panels = [
    (axes[0], X0, Y0, 0, 'First frame'),
    (axes[1], X1, Y1, -1, 'Last frame'),
]
for ax, xs, ys, frame_pos, panel_title in panels:
    # y is negated so the figure is drawn upright (image coordinates grow downwards).
    ax.scatter(xs, -ys, label='all landmarks')
    ax.scatter(vv[frame_pos, :, 0], -vv[frame_pos, :, 1], s=3, c='r', label='reduced landmarks')
    ax.set(title=panel_title, xlabel='x (normalized)', ylabel='-y (normalized)')
axes[0].legend()

# Figure-level title: plt.title would only label the last subplot.
fig.suptitle(pq_path)
plt.show()

#### **Note**
Min-max normalization changes the position of the points when fewer (but the most important) landmarks are used; this is expected, since fewer points are involved.
The movement, however, stays the same.

- An RNN or LSTM can be a good, simple approach to start with (it can be adapted to time series)

#### **TODO**
* think about data augmentation
* try to use CoAtNet -> the input data must all have the same shape
* padding?
    - the issue with padding is that some sequences have many more frames than others; maybe shorten those sequences and pad the short ones
    - goal: have se

In [None]:
train_path = '/kaggle/input/asl-signs/train.csv'
train = pd.read_csv(train_path)
train.head()

In [None]:
len(train)

In [None]:
train.columns

In [None]:
train.sign.unique()

In [None]:
train.participant_id.unique(), len(train.participant_id.unique())

In [None]:
# Number of sequences per sign, computed once and reused for every statistic below.
sign_counts = train.sign.value_counts(dropna=True)
d = dict(sign_counts)
print(sign_counts.mean())
print(sign_counts.std())
print(sign_counts.max())
print(sign_counts.min())

# The word distribution is not very spread out:
# all words occur a similar number of times.

#### **Some notes:**
* each parquet file contains the marker positions [x y z] and their type (face, left_hand, pose, right_hand) for the different frames
* the train dataset is composed of the file path, the participant id (folder name of the parquet file), the sequence id (filename) and the signed word
* one sequence = numerous frames = 1 word
* every frame has rows for each type, but a type may have no value in a given frame; it is then set to NaN

**Goal**: using the hand positions, recognize the word signed in the sequence
* classification between 250 words using the positions of body parts in the video

In [None]:
import json

# Load the sign -> prediction-index mapping shipped with the competition data.
# The context manager closes the file handle as soon as the JSON has been parsed.
with open('/kaggle/input/asl-signs/sign_to_prediction_index_map.json') as f:
    WORD2IDX = json.load(f)  # dictionary: sign (word) -> class index
print(len(WORD2IDX), WORD2IDX)

In [None]:
train_words = train.sign.unique()
print(len(train_words))
# same length as sign to prediction index json

In [None]:
random_word = random.choice(train.sign.unique())
print(f'idx for <{random_word}> is <{WORD2IDX[random_word]}>')

In [None]:
train.path

### Custom Dataset class

In [None]:
all_sqc_path = train.path
print(len(all_sqc_path))

In [None]:
# Build a handful of (processed sequence, word) samples to check the preprocessing end to end.
myList = []

for i in range(5):
    sq1 = select_random_sequence()
    # train.path stores paths relative to the competition root, so strip that prefix
    # explicitly instead of slicing a hard-coded number of characters.
    relative_path = os.path.relpath(sq1, COMPETITION_PATH)
    word = train[train.path == relative_path].sign.values[0]
    mydata = get_data(sq1)
    print(mydata.shape)
    myList.append((mydata, word))

### **Don't run following cell, it creates *preprocess_dataset***

In [None]:
# Do not run !

# processed_dataset = []
# for idx,path in enumerate(train.path):
#     sequence_path = os.path.join(COMPETITION_PATH, path)
#     word = train[train.path == path].sign.values[0]
#     processed_sqc = get_data(sequence_path)
    
#     processed_dataset.append((processed_sqc, word))
    
#     if idx%200 == 0:
#         print(processed_sqc.shape, word)

In [None]:
# to save dataset
# with open("preprocess_dataset.pkl", "wb") as fp:   #Pickling
#     pickle.dump(processed_dataset, fp)

In [None]:
# to load dataset
# NOTE(security): pickle.load can execute arbitrary code while unpickling; only load
# files you created yourself or otherwise trust.
with open(PROCESS_DATASET_PATH, "rb") as fp:   # Unpickling
    dataset = pickle.load(fp)

In [None]:
dataset[0][1]

In [None]:
len(dataset)

### **Custom class and Dataloader**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import train_test_split


In [None]:
class ISLR(Dataset):
    """
    Dataset of (landmark sequence, sign) samples with a positional train/test split.

    Parameters
    ----------
    dataset : sequence of (features, word) pairs
        `features` is an array of one processed sequence, `word` the sign it shows.
    split : {'trainval', 'test'}
        'trainval' keeps the first `train_fraction` of `dataset`, 'test' the remainder.
        The split is positional: `dataset` is not shuffled here.
    train_fraction : float, optional
        Share of the samples assigned to 'trainval' (default 0.8).
    word2idx : mapping, optional
        Sign -> class index. Defaults to the module-level WORD2IDX, looked up at access time.

    Raises
    ------
    ValueError
        If `split` is not one of the supported names (previously this surfaced later as
        an AttributeError on `islr_dataset`).
    """

    _SPLITS = ('trainval', 'test')

    def __init__(self, dataset, split, train_fraction=0.8, word2idx=None):
        if split not in self._SPLITS:
            raise ValueError(f"split must be one of {self._SPLITS}, got {split!r}")

        self.split = split
        self.dataset = dataset
        self.word2idx = word2idx

        cut = int(train_fraction * len(dataset))
        self.islr_dataset = dataset[:cut] if split == 'trainval' else dataset[cut:]

    def __len__(self):
        return len(self.islr_dataset)

    def __getitem__(self, index):
        """Return (float32 feature tensor, integer class index) for sample `index`."""
        sample = self.islr_dataset[index]
        mapping = WORD2IDX if self.word2idx is None else self.word2idx
        features = torch.FloatTensor(sample[0])
        target = mapping[sample[1]]

        return features, target

In [None]:
testset = ISLR(dataset, split='test')
trainvalset = ISLR(dataset, split='trainval')

In [None]:
trainset, valset = train_test_split(trainvalset,test_size=0.1, random_state=42)

In [None]:
len(trainset),len(valset)

In [None]:
len(dataset)

In [None]:
len(testset)+len(trainset)

In [None]:
# batch =[[torch.tensor([1833, 3205,  467,  342, 4165,   31, 49,  803]), torch.tensor([1])],
#         [torch.tensor([1833, 3205,  467,  342, 49,  803]), torch.tensor([2])],
#         [torch.tensor([1833, 3205,  467,  342, 4165,   31, 49,  803,52,54]), torch.tensor([1])]]
# def custom_collate(batch):
#     padded_batch=[]
#     labels=[]
#     for sentence,label in batch:
#         # print(sentence.tolist())

#         listSentence = sentence.tolist()
#         max_len = max(len(sentence.tolist()) for sentence,label in batch)
#         # print(listSentence)
#         padded_sentence=listSentence+[5001]*(max_len-len(listSentence))
#         # print(max_len)
#         padded_batch.append(padded_sentence)
#         labels.append(label)

#     return torch.tensor(padded_batch), torch.tensor(labels)

In [None]:

def custom_collate_fn(batch):
    """
    Collate variable-length sequences into one padded batch.

    Parameters
    ----------
    batch : list of (sequence, label)
        `sequence` has shape (n_frames, n_landmarks, 2); n_frames may differ per sample.

    Returns
    -------
    (torch.FloatTensor, torch.Tensor)
        Sequences stacked to (batch, max_frames, n_landmarks, 2), where missing trailing
        frames are filled with -1, and the labels as a tensor.
    """
    sequences = [torch.as_tensor(sequence, dtype=torch.float32) for sequence, _ in batch]
    labels = [label for _, label in batch]

    # pad_sequence pads directly on tensors; the frame shape comes from the sequences
    # themselves, so no Python-list round trip and no global landmark count is needed.
    padded_batch = torch.nn.utils.rnn.pad_sequence(sequences, batch_first=True, padding_value=-1.0)

    return padded_batch, torch.tensor(labels)

In [None]:
train_loader = DataLoader(trainset, batch_size=32, collate_fn=custom_collate_fn, shuffle=True)

In [None]:
val_loader = DataLoader(valset, batch_size=32, collate_fn=custom_collate_fn, shuffle=False)

In [None]:
test_loader = DataLoader(testset, batch_size=32, collate_fn=custom_collate_fn, shuffle=False)

In [None]:
custom_it = enumerate(train_loader)

In [None]:
idx,(sqc,lb)=next(custom_it)
print(sqc.shape, lb)

### **Model architecture**

### Transformer

In [None]:
# class SignLanguageModel(nn.Module):
#     def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
#         super(SignLanguageModel, self).__init__()
#         self.num_layers = num_layers
#         self.hidden_dim = hidden_dim
#         self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
#         self.fc = nn.Linear(hidden_dim, output_dim)

#     def forward(self, x):
#         h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
#         c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
#         out, _ = self.lstm(x, (h0, c0))
#         out = self.fc(out[:, -1, :])
#         return out

In [None]:
class TransformerModel(nn.Module):
    """
    Transformer encoder classifier for padded landmark sequences.

    Each frame (n_landmarks x input_dim values) is embedded linearly, layer-normalized,
    given a sinusoidal positional encoding and passed through a TransformerEncoder.
    The class scores are read from the encoder output at the last VALID frame of every
    sequence; padded frames are excluded from attention through a key-padding mask.

    Padding convention: a frame containing the value -1 marks the start of the padding;
    that frame and every later one are treated as padding.
    """

    PAD_VALUE = -1.0

    def __init__(self, input_dim, num_heads, num_layers, hidden_dim, output_dim, n_landmarks, max_seq_length=1000):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Linear(input_dim*n_landmarks, hidden_dim) # change encoding
        self.layer_norm1 = nn.LayerNorm(hidden_dim)
        # Registered as a non-persistent buffer: it follows the module in .to(device)
        # but is not added to the state_dict.
        self.register_buffer(
            'positional_encoding',
            self.create_positional_encoding(max_seq_length, hidden_dim),
            persistent=False,
        )
        encoder_layers = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)
        # no softmax here: forward returns raw class scores

    def forward(self, x):
        """
        Parameters
        ----------
        x : tensor of shape (batch_size, n_frames, n_landmarks, input_dim)
            Padded batch; it is moved to the device of the model parameters.

        Returns
        -------
        tensor of shape (batch_size, output_dim) with the raw class scores.

        Raises
        ------
        ValueError
            If n_frames exceeds the max_seq_length given at construction.
        """
        batch_size, n_frames, n_landmarks, input_dim = x.shape
        max_frames = self.positional_encoding.size(1)
        if n_frames > max_frames:
            raise ValueError(f"sequence of {n_frames} frames exceeds max_seq_length={max_frames}")

        # Use the model's own device rather than a notebook-level global.
        x = x.to(self.embedding.weight.device)
        pad_mask = self.sequence_mask(x)  # [batch_size, n_frames], True = padding

        # Flatten n_landmarks and input_dim for embedding
        x = self.embedding(x.reshape(batch_size, n_frames, -1))
        x = self.layer_norm1(x)
        x = x + self.positional_encoding[:, :n_frames, :]
        x = x.permute(1, 0, 2)  # Transformer expects sequence length first

        transformer_out = self.transformer_encoder(x, src_key_padding_mask=pad_mask)

        # Read each sequence at its last valid frame; position -1 is a padded frame for
        # every sequence shorter than the longest one in the batch.
        lengths = (~pad_mask).sum(dim=1).clamp(min=1)
        batch_index = torch.arange(batch_size, device=transformer_out.device)
        out = self.fc(transformer_out[lengths - 1, batch_index])
        assert not torch.isnan(out).any(), "NaN in final output"

        return out

    def create_positional_encoding(self, max_seq_length, hidden_dim):
        """Return the sinusoidal positional encoding, shape (1, max_seq_length, hidden_dim)."""
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-torch.log(torch.tensor(10000.0)) / hidden_dim))

        positional_encoding = torch.zeros(max_seq_length, hidden_dim)
        positional_encoding[:, 0::2] = torch.sin(position * div_term)
        # For an odd hidden_dim there is one cosine column fewer than sine columns.
        positional_encoding[:, 1::2] = torch.cos(position * div_term)[:, :hidden_dim // 2]

        return positional_encoding.unsqueeze(0)

    def sequence_mask(self, sequence):
        """
        Return a bool mask of shape (batch_size, n_frames); True marks padded frames.

        Padding starts at the first frame containing PAD_VALUE and extends to the end.
        """
        has_pad_value = (sequence == self.PAD_VALUE).flatten(start_dim=2).any(dim=2)
        # cumsum turns "first padded frame" into "this frame and everything after it".
        return has_pad_value.cumsum(dim=1) > 0

    def valid_len(self, padded_sequence):
        """Return the number of frames before the first frame that contains -1."""
        n_valid = 0
        for frame in padded_sequence:
            if -1 in frame:
                break
            n_valid += 1

        return n_valid

In [None]:
# Example usage
input_dim = 2  # (x, y)
num_heads = 4  # number of attention heads (to be tuned later)
num_layers = 2  # number of encoder layers
hidden_dim = 64
output_dim = 250  # number of words (classes)
n_landmarks = len(REDUCED_LANDMARKS)  # derived from the landmark selection instead of a hard-coded 92

# Other architectures to consider:
# vision transformer = find similar project
# try CNN
# point Net
# LSTM + conv

model = TransformerModel(input_dim=input_dim,
                         num_heads=num_heads,
                         num_layers=num_layers,
                         hidden_dim=hidden_dim,
                         output_dim=output_dim,
                         n_landmarks=n_landmarks)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = model.to(device)
print(model)

In [None]:
out  = model(sqc)
# [torch.argmax(vi, ) for vi in v]

In [None]:
# Number of top-5 hits in the batch: each sample is compared with its OWN five
# highest-scoring classes (row i of idx against label i), not with every row of the batch.
k_pred, idx = torch.topk(out , 5, dim=1)
(idx.cpu() == lb.cpu().unsqueeze(1)).any(dim=1).sum().item()

In [None]:
def valid_len(padded_sequence):
    """
    Return the number of frames that precede the first padded frame.

    A frame counts as padded when it contains the value -1. If no frame is padded the
    full length is returned; an empty sequence yields 0.
    """
    n_valid = 0
    for frame in padded_sequence:
        if -1 in frame:
            # The frame at this position is already padding, so it is not counted.
            break
        n_valid += 1

    return n_valid

In [None]:
sqc.shape

In [None]:
lengths = [valid_len(padded_sequence) for padded_sequence in sqc]
lengths

In [None]:
mask = torch.zeros(sqc.size()[:2], dtype=torch.bool)  # shape: [batch_size, n_frames, 1]
print(mask.shape)
for i, length in enumerate(lengths):
    mask[i, :length] = 1

In [None]:
mask[1]

In [None]:
# mask = mask.sum(dim=1) == 0
mask = ~mask
mask[1]

In [None]:
mask.shape

### 1D CNN

In [None]:
class SignLanguageCNN1D(nn.Module):
    """
    1D-CNN classifier over landmark trajectories.

    The (x, y) trajectory of every landmark is convolved along the time axis with shared
    filters, averaged over time, and the per-landmark features are concatenated and
    classified by two fully connected layers. forward returns log-probabilities.
    """

    # Three stride-2 poolings need at least 2**3 frames to leave one time step.
    _MIN_FRAMES = 8

    def __init__(self, n_landmarks, input_dim, num_classes):
        super(SignLanguageCNN1D, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
        self.adaptive_pool = nn.AdaptiveAvgPool1d(1)  # removes the dependence on sequence length
        self.fc1 = nn.Linear(256 * n_landmarks, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """
        Parameters
        ----------
        x : tensor of shape (batch_size, n_frames, n_landmarks, input_dim)
            It is moved to the device of the model parameters.

        Returns
        -------
        tensor of shape (batch_size, num_classes) holding log-probabilities.
        """
        batch_size, n_frames, n_landmarks, input_dim = x.size()
        x = x.to(self.fc1.weight.device)

        if n_frames < self._MIN_FRAMES:
            # Extend very short batches along the time axis so the poolings do not produce
            # an empty tensor. -1 is used as fill value.
            # NOTE(review): -1 presumably matches the padding value of the collate function - confirm.
            x = F.pad(x, (0, 0, 0, 0, 0, self._MIN_FRAMES - n_frames), value=-1.0)

        # [batch, frames, landmarks, channels] -> [batch, landmarks, channels, frames].
        # Batch and landmarks must be adjacent BEFORE they are merged; merging from
        # [batch, channels, frames, landmarks] would mix landmarks, channels and time.
        x = x.permute(0, 2, 3, 1).reshape(batch_size * n_landmarks, input_dim, -1)

        x = self.pool(F.relu(self.conv1(x)))  # [batch_size * n_landmarks, 64, n_frames//2]
        x = self.pool(F.relu(self.conv2(x)))  # [batch_size * n_landmarks, 128, n_frames//4]
        x = self.pool(F.relu(self.conv3(x)))  # [batch_size * n_landmarks, 256, n_frames//8]
        x = self.adaptive_pool(x)  # [batch_size * n_landmarks, 256, 1]

        x = x.reshape(batch_size, -1)  # [batch_size, n_landmarks * 256]
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
        

In [None]:
n_landmarks = dataset[0][0].shape[1]
input_dim = dataset[0][0].shape[2]
num_classes = 250

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
model = SignLanguageCNN1D(n_landmarks, input_dim, num_classes).to(device)

In [None]:
out = model(sqc)

In [None]:
# torch.max(out, 1)

### **Training Phase**

In [None]:
# Training objective and optimizer: cross-entropy loss, SGD with momentum.
# NOTE(review): SignLanguageCNN1D.forward already returns log_softmax output, while
# nn.CrossEntropyLoss applies log_softmax itself. Applying log_softmax to log-probabilities
# leaves them unchanged, so the loss value is not affected, but nn.NLLLoss would be the
# conventional pairing - confirm which is intended.
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
num_epochs = 5

dataloader = train_loader

for epoch in range(num_epochs):

    print(f'Epoch {epoch}/{num_epochs - 1}')
    print('-' * 10)

    # ---- training ----
    model.train()
    running_loss = 0.0

    for sequence, label in dataloader:
        sequence, label = sequence.to(device), label.to(device)
        optimizer.zero_grad()

        outputs = model(sequence)

        # Compute the loss, gradients, and update optimizer
        loss = loss_function(outputs, label)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # mean of the per-batch losses
    epoch_loss = running_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {epoch_loss:.4f}")

    # ---- validation ----
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for sequence, label in val_loader:
            sequence, label = sequence.to(device), label.to(device)
            outputs = model(sequence)
            loss = loss_function(outputs, label)
            # weight by batch size so the final value is a per-sample average
            val_loss += loss.item() * sequence.size(0)

            # Top-5 accuracy: a sample is correct when its own label is among its own
            # five best classes. Each label is compared with the matching row only;
            # comparing every label with every row would inflate the score.
            _, predicted = torch.topk(outputs, 5, dim=1)
            total += label.size(0)
            correct += (predicted == label.unsqueeze(1)).any(dim=1).sum().item()

    val_loss /= len(val_loader.dataset)
    val_acc = correct / total
    print(f'val loss: {val_loss:.4f} Acc: {val_acc:.4f}')

    # one checkpoint per epoch
    torch.save(model.state_dict(), f'pointnet_transformer_model_{epoch+1}.pth')

In [None]:
'''
Epoch 0/4
----------
Epoch 1/5, Training Loss: 5.5489
val loss: 5.5252 Acc: 0.6634
Epoch 1/4
----------
Epoch 2/5, Training Loss: 5.5313
val loss: 5.5213 Acc: 0.6592
Epoch 2/4
----------
Epoch 3/5, Training Loss: 5.4685
val loss: 5.3126 Acc: 0.7019
Epoch 3/4
----------
Epoch 4/5, Training Loss: 5.0170
val loss: 4.6766 Acc: 0.8116
Epoch 4/4
----------
Epoch 5/5, Training Loss: 4.4992
val loss: 4.2321 Acc: 0.9436
'''

In [None]:
'''
Epoch 0/4
----------
Epoch 1/5, Training Loss: 5.6811
val loss: 5.6895 Acc: 0.0030
Epoch 1/4
----------
Epoch 2/5, Training Loss: 5.6814
val loss: 5.6895 Acc: 0.0030
Epoch 2/4
----------
'''

In [None]:
_, p = torch.topk(outputs, 5, dim=1)

#### **Analysis Ideas**

* class imbalance (count the words for each element in the train dataset)
* size analysis (length of the sequences; is it linked to the word?)
* position ranges (x y z)
* number of sequences per participant
* the train dataset will be split into train / test / val