# Imports

In [1]:
import pandas as pd
from json import load
from os import listdir

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter

# Data Cleaning

In [13]:
def columns_search(df, keyword):
    """Return the columns of ``df`` whose name contains ``keyword``.

    Args:
        df: DataFrame whose column labels are strings.
        keyword: Either a single substring, or an iterable (list, tuple,
            set, ...) of substrings. A column is kept when its name contains
            at least one of them.

    Returns:
        ``df`` restricted to the matching columns, in their original order.

    Raises:
        TypeError: If ``keyword`` is neither a string nor an iterable.
            (The previous version silently returned ``None`` in that case.)
    """
    # Normalise both call styles to a tuple so a single code path does the match.
    if isinstance(keyword, str):
        keywords = (keyword,)
    else:
        try:
            keywords = tuple(keyword)
        except TypeError:
            raise TypeError(
                f"keyword must be a str or an iterable of str, got {type(keyword).__name__}"
            ) from None

    matching = [c for c in df.columns if any(k in c for k in keywords)]
    return df.loc[:, matching]

In [14]:
def one_hot(T, nb_class, batch_first=True):
    """One-hot encode an index tensor along a new trailing dimension.

    Args:
        T: Tensor of class indices (any dtype that compares with integers).
        nb_class: Size of the new trailing dimension. Values of ``T`` outside
            ``range(nb_class)`` yield an all-zero row.
        batch_first: Kept for backward compatibility. The encoding is applied
            element-wise, so the result has shape ``T.shape + (nb_class,)``
            whether or not ``T`` carries a leading batch dimension.

    Returns:
        Tensor of shape ``T.shape + (nb_class,)`` with the same dtype and
        device as ``T``.
    """
    assert len(T.unique()) <= nb_class, "nb_class should be higher then number of unique element in tensor T"
    # One broadcast comparison replaces the per-batch / per-class Python loop;
    # it also works for empty batches and does not rely on `.T` of non-2D tensors.
    classes = torch.arange(nb_class, device=T.device)
    return (T.unsqueeze(-1) == classes).to(T.dtype)

In [15]:
class CTP():
    """Character <-> prediction-index converter backed by a JSON mapping.

    The JSON file maps each character to its integer index; the reverse
    mapping is built once at construction time.
    """

    def __init__(self, character_map_path="./data/character_to_prediction_index.json"):
        """Load the character map.

        Args:
            character_map_path: Path of the JSON file mapping characters to
                indices. Defaults to the location used by this notebook.
        """
        super(CTP, self).__init__()

        # Explicit encoding: the default would otherwise depend on the platform locale.
        with open(character_map_path, "r", encoding="utf-8") as f:
            self.character_map = load(f)
        self.rev_character_map = {index: char for char, index in self.character_map.items()}

    def string_to_list_pred(self, string):
        """Return the list of indices for every character of ``string``."""
        return [self.char_to_pred(char) for char in string]

    def list_pred_to_string(self, list_pred):
        """Return the string spelled by the indices in ``list_pred``."""
        return "".join(self.pred_to_char(p) for p in list_pred)

    def char_to_pred(self, char):
        """Return the index of ``char``; raises ``KeyError`` if it is unknown."""
        return self.character_map[char]

    def pred_to_char(self, pred):
        """Return the character of index ``pred``; raises ``KeyError`` if it is unknown."""
        return self.rev_character_map[pred]

In [16]:
class DataCleaner(CTP):
    """Turn one raw landmark DataFrame and its phrase into padded tensors.

    Calling the instance keeps only the columns of the dominant hand, handles
    NaNs with the configured strategy, pads the frames to a fixed length and
    encodes the phrase (wrapped in "S"/"E" and padded with "P") as indices.
    """

    def __init__(
        self,
        nans_management = "zero",
    ):
        """
        Args:
            nans_management: ``"zero"`` replaces NaNs with 0, ``"clear"`` drops
                every row that contains a NaN.

        Raises:
            ValueError: If ``nans_management`` is not a known strategy.
        """
        super(DataCleaner, self).__init__()

        self.nans_management_map = {
            "clear": self.clear_nans,
            "zero": self.zero_nans,
        }
        # Fail at construction time instead of with a bare KeyError on the first sample.
        if nans_management not in self.nans_management_map:
            raise ValueError(
                f"nans_management must be one of {sorted(self.nans_management_map)}, got {nans_management!r}"
            )
        self.nans_management = nans_management

        with open("./data/dataset_infos.json", "r", encoding="utf-8") as f:
            self.dataset_infos = load(f)

        self.max_char_length = len(self.character_map)
        self.space_char_tensor = torch.zeros(self.max_char_length)
        self.space_char_tensor[0] = 1

    def __call__(self, df, y):
        """Clean one sample.

        Args:
            df: DataFrame of landmark columns for one sequence.
            y: The phrase (string) associated with the sequence.

        Returns:
            ``(x, y)`` where ``x`` is a float tensor of the dominant-hand values
            padded with zero rows, and ``y`` is a float tensor holding the
            character indices of the padded phrase.
        """
        dominant_hand, _ = self.find_dominant_hand(df)
        df = columns_search(df, [dominant_hand])
        df = self.nans_management_map[self.nans_management](df)
        x = torch.FloatTensor(df.values)
        x = self.pad_x_tensor(x)

        # "S" and "E" wrap the phrase, "P" pads it; all three must exist in the character map.
        y = "S" + y + "E"
        y = self.pad_y_string(y)
        y = torch.FloatTensor(self.string_to_list_pred(y))

        return x, y

    def find_dominant_hand(self, df):
        """Return ``(name, count)`` for the hand with the most non-NaN cells.

        ``name`` is ``"right_hand"`` or ``"left_hand"``; ties go to the right hand.
        """
        right_hand_value = columns_search(df, "right_hand").notna().sum().sum()
        left_hand_value = columns_search(df, "left_hand").notna().sum().sum()

        if right_hand_value >= left_hand_value:
            return ("right_hand", right_hand_value)
        return ("left_hand", left_hand_value)

    def clear_nans(self, df):
        """Drop every row that contains at least one NaN."""
        return df[df.notna().all(axis=1)]

    def zero_nans(self, df):
        """Replace every NaN with 0."""
        return df.fillna(0)

    def pad_x_tensor(self, x):
        """Append zero rows to ``x`` up to ``dataset_infos["max_frame_nans"]`` rows.

        The computed length is passed to ``F.pad`` unchanged, including when
        ``x`` already has more rows than that (negative value).
        """
        pad_length = self.dataset_infos["max_frame_nans"] - x.shape[0]
        return F.pad(x, (0, 0, 0, pad_length), "constant", 0)

    def pad_y_string(self, y):
        """Right-pad ``y`` with "P" up to ``max_length_phrase + 2`` characters.

        The ``+ 2`` accounts for the "S" and "E" markers added by ``__call__``.
        A string that is already long enough is returned unchanged.
        """
        pad_length = self.dataset_infos["max_length_phrase"] + 2 - len(y)
        return y + "P" * pad_length

# Dataset

In [17]:
class DS(Dataset):
    """Dataset reading raw per-sequence parquet files and cleaning them on access."""

    def __init__(
        self,
        data_path = "./data/splited_data/",
    ):
        """
        Args:
            data_path: Directory prefix (with trailing separator) holding one
                ``<sequence_id>.parquet`` file per sequence.
        """
        super().__init__()

        self.data_path = data_path

        self.DC = DataCleaner()

        train_table = pd.read_csv("./data/train.csv")
        self.labels = train_table[["sequence_id", "phrase"]]
        self.labels_length = len(self.labels)

        # Read one file only to discover which columns describe a hand.
        first_sequence_id = self.labels.loc[0, 'sequence_id']
        column_data_example = pd.read_parquet(f"{data_path}{first_sequence_id}.parquet").columns
        self.usefull_columns = [name for name in column_data_example if "hand" in name]

    def __getitem__(self, index):
        """Load sequence ``index`` and return the cleaned ``(x, y)`` pair."""
        print(index, end="\r")
        row = self.labels.loc[index]
        sequence_id, y = row.to_list()
        parquet_path = f"{self.data_path}{sequence_id}.parquet"
        data = pd.read_parquet(parquet_path, columns=self.usefull_columns)

        return self.DC(data, y)

    def __len__(self):
        """Number of labelled sequences."""
        return self.labels_length

In [18]:
class DSProcessed(Dataset):
    """Dataset over pre-processed samples stored as one ``torch.save`` file each."""

    def __init__(
        self,
        data_path = "./data/processed_data/",
    ):
        """
        Args:
            data_path: Directory prefix (with trailing separator) containing
                the serialized samples.
        """
        super(DSProcessed, self).__init__()
        self.data_path = data_path
        # listdir returns entries in arbitrary order; sorting makes the
        # index -> sample mapping reproducible across runs and machines.
        self.files = sorted(listdir(data_path))

    def __getitem__(self, index):
        """Deserialize and return the sample stored in the ``index``-th file."""
        file_name = self.files[index]
        # NOTE: torch.load unpickles its input; only point this at trusted files.
        with open(self.data_path + file_name, "rb") as f: return torch.load(f)

    def __len__(self):
        """Number of files found in ``data_path``."""
        return len(self.files)

# Model

In [19]:
class Model(nn.Module):
    """Encoder-decoder transformer mapping landmark frames to character logits.

    Args:
        x_shape: Shape of an input batch; ``x_shape[1]`` is the (fixed) number
            of frames and ``x_shape[2]`` the number of features per frame.
        y_shape: Triple whose second entry plus one is the number of output
            classes and whose third entry is the length of the target
            sequence (the model emits ``y_shape[2] - 1`` positions).

    The layers, their names and their creation order are unchanged, so
    existing state dicts still load.
    """

    def __init__(
        self,
        x_shape,
        y_shape,
    ):
        super(Model, self).__init__()

        self.x_shape = x_shape
        self.y_shape = y_shape

        #Embedding

        self.l1 = nn.Linear(self.x_shape[2], 256, False)
        self.l2 = nn.Linear(256, 256, False)

        self.pe1 = nn.Parameter(torch.zeros((self.x_shape[1], 256)))

        # Encoder
        self.attn1 = nn.MultiheadAttention(256, 8, batch_first=True)
        self.ln1 = nn.LayerNorm(256)

        self.l4 = nn.Linear(256, 256)
        self.l3 = nn.Linear(256, 256)
        self.ln2 = nn.LayerNorm(256)

        self.attn2 = nn.MultiheadAttention(256, 8, batch_first=True)
        self.ln3 = nn.LayerNorm(256)

        self.l5 = nn.Linear(256, 256)
        self.l6 = nn.Linear(256, 256)
        self.ln4 = nn.LayerNorm(256)

        # Phrase Embedding

        self.emb1 = nn.Embedding(62, 256)

        # Decoder
        self.pe2 = nn.Parameter(torch.zeros((self.x_shape[1], 256)))

        self.attn3 = nn.MultiheadAttention(256, 8, batch_first=True)
        self.ln5 = nn.LayerNorm(256)

        self.attn4 = nn.MultiheadAttention(256, 8, batch_first=True)
        self.ln6 = nn.LayerNorm(256)

        self.l7 = nn.Linear(256, 256)
        self.l8 = nn.Linear(256, 256)
        self.ln7 = nn.LayerNorm(256)

        self.attn5 = nn.MultiheadAttention(256, 8, batch_first=True)
        self.ln8 = nn.LayerNorm(256)

        self.l9 = nn.Linear(256, 256)
        self.l10 = nn.Linear(256, 256)
        self.ln9 = nn.LayerNorm(256)

        # Classifier
        self.l11 = nn.Linear(256, self.y_shape[1]+1)

    def forward(self, x, y):
        """Run the model with teacher forcing.

        Args:
            x: Float tensor ``(batch, x_shape[1], x_shape[2])`` of frames.
            y: Tensor ``(batch, length)`` of character indices fed to the decoder.

        Returns:
            Unnormalised class logits of shape
            ``(batch, y_shape[2] - 1, y_shape[1] + 1)``. Position ``i`` only
            depends on ``y[:, :i + 1]``. Take ``argmax(2)`` for predictions or
            ``softmax(-1)`` for probabilities.
        """
        # Frames whose features sum to exactly zero are treated as padding.
        # Boolean key_padding_mask: True = "do not attend to this key".
        # (The former 0/1 float mask was *added* to the attention scores and was
        # constant along the key axis, so it had no effect after the softmax.)
        pad_mask = x.sum(2) == 0
        # A row with every key masked would produce NaNs; leave such rows unmasked.
        pad_mask = pad_mask & ~pad_mask.all(dim=1, keepdim=True)

        #Embedding
        x = self.l1(x)
        x = F.gelu(x)
        x = self.l2(x)

        x = x + self.pe1

        # Encoder
        _x = x
        x = self.attn1(x, x, x, key_padding_mask=pad_mask)[0]
        x = self.ln1(x + _x)

        _x = x
        x = self.l3(x)
        x = F.gelu(x)
        x = self.l4(x)
        x = self.ln2(x + _x)

        _x = x
        x = self.attn2(x, x, x, key_padding_mask=pad_mask)[0]
        x = self.ln3(x + _x)

        _x = x
        x = self.l5(x)
        x = F.gelu(x)
        x = self.l6(x)
        enc_out = self.ln4(x + _x)

        # Decoder
        y = y.to(torch.int64)
        # Pad the phrase with index 61 up to the length of pe2.
        y = F.pad(y, (0,self.x_shape[1] - y.shape[1],0,0), "constant", 61)
        y = self.emb1(y)

        x = y + self.pe2

        # Boolean causal mask: True above the diagonal = future positions are hidden.
        seq_len = x.shape[1]
        causal_mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=x.device),
            diagonal=1,
        )

        _x = x
        x = self.attn3(x, x, x, attn_mask=causal_mask)[0]
        x = self.ln5(x + _x)

        ###

        # Cross-attention: every decoder position may look at every real frame,
        # so only the frame padding mask applies here (no causal mask).
        _x = x
        x = self.attn4(x, enc_out, enc_out, key_padding_mask=pad_mask)[0]
        x = self.ln6(x + _x)

        _x = x
        x = self.l7(x)
        x = F.gelu(x)
        x = self.l8(x)
        x = self.ln7(x + _x)

        _x = x
        x = self.attn5(x, enc_out, enc_out, key_padding_mask=pad_mask)[0]
        x = self.ln8(x + _x)

        _x = x
        x = self.l9(x)
        x = F.gelu(x)
        x = self.l10(x)
        x = self.ln9(x + _x)

        # Keep only the positions that have a next-character target.
        x = x[:,:self.y_shape[2]-1,:]

        # Classifier. Return logits: nn.CrossEntropyLoss applies log-softmax
        # itself, and the former softmax normalised over the sequence axis
        # (dim 1) instead of the class axis.
        return self.l11(x)

In [20]:
class Accuracy():
    """Mean per-sequence character accuracy, ignoring selected target tokens."""

    def __init__(self, ignored_tokens=(60, 61)):
        """
        Args:
            ignored_tokens: Target indices excluded from scoring. The defaults
                are the two indices the original implementation filtered out
                (presumably the end and padding tokens; confirm against the
                character map).
        """
        self.ignored_tokens = tuple(ignored_tokens)

    def __call__(self, pred, target):
        """Compute the accuracy of a batch.

        Args:
            pred: Tensor ``(batch, position, class)`` of scores.
            target: Tensor ``(batch, position, class)`` of one-hot targets.

        Returns:
            0-dim tensor: the mean over sequences of the fraction of correctly
            predicted, non-ignored characters. Sequences whose target holds
            only ignored tokens are skipped (they used to turn the whole
            result into NaN); if nothing can be scored the result is 0.
        """
        pred = pred.argmax(2)
        target = target.argmax(2)

        scores = []
        for p, t in zip(pred, target):
            keep = torch.ones_like(t, dtype=torch.bool)
            for token in self.ignored_tokens:
                keep &= t != token
            t = t[keep]
            if t.numel() == 0:
                continue
            # Ignored tokens trail the phrase, so the kept targets line up
            # with the first len(t) predictions.
            p = p[:len(t)]
            scores.append((p == t).float().mean())

        if not scores:
            return torch.zeros((), device=pred.device)
        return torch.stack(scores).mean()
        

# Config

In [21]:
BATCH_SIZE = 16
EPOCH = 30
LR = 1e-3
# Use the GPU when one is visible, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

In [22]:
# Pre-processed samples, shuffled and batched by a single worker process.
train_ds = DSProcessed()
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)

# Pull one batch; its shapes are used below to size the model.
x, y = next(iter(train_dl))

# Loss, metric and TensorBoard logger shared by the training loop.
loss_fc = nn.CrossEntropyLoss()
accuracy_fc = Accuracy()
writer = SummaryWriter("runs/v1")

In [23]:
# Display the shapes of the sample batch drawn above (frames, then phrase indices).
x.shape, y.shape

(torch.Size([16, 807, 63]), torch.Size([16, 45]))

In [24]:
# The model is sized from the sample batch: y_shape = (batch, 61, phrase length).
model = Model(x.shape, (y.shape[0], 61, y.shape[1])).to(DEVICE)
optim = torch.optim.Adam(model.parameters(), lr=LR)
# NOTE(review): 1e-4 is the plateau *threshold* (fifth positional argument),
# not a minimum learning rate; pass min_lr=... if a floor was intended.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optim,
    mode="min",
    factor=0.9,
    patience=200,
    threshold=1e-4,
)

In [25]:
# Total number of parameters, with "_" as thousands separator.
f"{sum(tensor.numel() for _, tensor in model.named_parameters()):_}"

'2_373_438'

In [26]:
def train(
    dl,
    model,
    loss_fc,
    accuracy_fc,
    optim,
    device,
    epoch,
    writer,
    scheduler,
):
    """Train ``model`` for one pass over ``dl``, logging every batch.

    Args:
        dl: Iterable of ``(x, y)`` batches; must support ``len``.
        model: Module called as ``model(x, y)`` returning
            ``(batch, position, class)`` scores.
        loss_fc: Loss taking ``(input, target)`` with classes on dim 1.
        accuracy_fc: Callable ``(pred, target) -> scalar``.
        optim: Optimizer over the model parameters.
        device: Device the batches are moved to.
        epoch: Zero-based epoch number, used for the logging step.
        writer: TensorBoard ``SummaryWriter``.
        scheduler: Scheduler stepped with the batch loss after every batch.
    """
    size = len(dl)
    model.train()
    for batch, (x, y) in enumerate(dl):
        x, y = x.to(device), y.to(device)
        # Targets are the input phrase shifted by one (next-character prediction).
        target = one_hot(y[:,1:], 62)

        pred = model(x, y)
        # The loss expects the class axis on dim 1, but pred/target are
        # (batch, position, class). Merging batch and position gives (N, class)
        # so each position is scored over the classes rather than the other
        # way round.
        loss = loss_fc(pred.flatten(0, 1), target.flatten(0, 1))

        # Clear gradients *before* backward so stale gradients left by an
        # earlier (e.g. interrupted) run cannot leak into this step.
        optim.zero_grad()
        loss.backward()
        optim.step()

        loss = loss.item()
        accuracy = accuracy_fc(pred.detach(), target)
        lr = optim.param_groups[0]['lr']

        scheduler.step(loss)

        # Fractional epoch scaled to an integer step for TensorBoard.
        step = int((epoch + (batch/size))*1e5)
        writer.add_scalar("loss", loss, step)
        writer.add_scalar("accuracy", accuracy, step)
        writer.add_scalar("learnig_rate", lr, step)
        print(f"    [{batch+1:>5d}/{size:>5d}] | loss: {loss:>7f} | accuracy: {accuracy:.2%} | lr: {lr}{' '*20}", end="\r")

# Training

In [27]:
%%script false --no-raise-error
# The cell magic above hands this cell to the `false` command, so the loop
# below is skipped on "Run All"; remove the magic to actually train.
# Anomaly detection is enabled around the whole run to help locate NaN/inf
# gradients; it adds overhead to every backward pass.
with torch.autograd.set_detect_anomaly(True):
    for e in range(EPOCH):
        print(f"\n{'-'*10} epoch: {e+1} {'-'*10}")
        train(train_dl, model, loss_fc, accuracy_fc, optim, DEVICE, e, writer, scheduler)

# Testing

In [28]:
# Keep one iterator so that re-running the next cell yields a new batch each time.
iter_dl = iter(train_dl)

In [29]:
x, y = next(iter_dl)

# Build the converter once (each CTP() re-reads the JSON map) and skip autograd
# bookkeeping: this is inference only.
ctp = CTP()
with torch.no_grad():
    pred = model(x.to(DEVICE), y.to(DEVICE))

# First line: ground truth without its leading start token; second line: prediction.
print(ctp.list_pred_to_string(y[0].int().tolist())[1:])
print(ctp.list_pred_to_string(pred.to("cpu").argmax(2)[0].tolist()))

4032 spidel roadEPPPPPPPPPPPPPPPPPPPPPPPPPPP
_s~ug5x.~d9.5q.udcPPPPPPPPPPPPPPPPPPPPPPPPPP


In [30]:
#assert False

# Save / Load Model

In [31]:
%%script false --no-raise-error
# Disabled by the cell magic above so "Run All" does not overwrite a checkpoint.
# NOTE(review): this writes "va1.torch" while the load cell reads "v1.torch";
# confirm which file name is intended.
with open("./models/va1.torch", "wb") as f: torch.save(model.state_dict(), f)

In [32]:
# map_location lets a checkpoint saved on a GPU be read on a machine without
# CUDA. load_state_dict then copies the values into the existing parameters.
with open("./models/v1.torch", "rb") as f: model.load_state_dict(torch.load(f, map_location=DEVICE))

# To TFLite

In [33]:
import onnx
from onnx_tf.backend import prepare

In [34]:
# Export runs on the CPU with the model in eval mode; the sample batch is
# moved to the same device because it is passed to the exporter below.
model = model.to("cpu").eval()
x, y = x.to("cpu"), y.to("cpu")

In [50]:
# Pass the example inputs as a positional tuple, the documented form of `args`;
# the order matches forward(x, y) and input_names.
torch.onnx.export(
    model,
    (x, y),
    "./models/v1.onnx",
    opset_version=14,
    input_names=["x", "y"],
    output_names=["output"],
)

verbose: False, log level: Level.ERROR



In [51]:
# Reload the exported graph and let the ONNX checker validate it.
onnx_model = onnx.load("./models/v1.onnx")
onnx.checker.check_model(onnx_model)

In [52]:
# NOTE(review): the recorded output of this cell is a
# BackendIsNotSupposedToImplementIt error ("Unsqueeze version 13 is not
# implemented") raised inside onnx_tf, so no TensorFlow graph is written yet.
prepare(onnx_model).export_graph("./models/v1.tf")

BackendIsNotSupposedToImplementIt: in user code:

    File "/home/guy/Desktop/Google-American-Sign-Language-Fingerspelling-Recognition/venv/lib/python3.10/site-packages/onnx_tf/backend_tf_module.py", line 99, in __call__  *
        output_ops = self.backend._onnx_node_to_tensorflow_op(onnx_node,
    File "/home/guy/Desktop/Google-American-Sign-Language-Fingerspelling-Recognition/venv/lib/python3.10/site-packages/onnx_tf/backend.py", line 347, in _onnx_node_to_tensorflow_op  *
        return handler.handle(node, tensor_dict=tensor_dict, strict=strict)
    File "/home/guy/Desktop/Google-American-Sign-Language-Fingerspelling-Recognition/venv/lib/python3.10/site-packages/onnx_tf/handlers/handler.py", line 61, in handle  *
        raise BackendIsNotSupposedToImplementIt("{} version {} is not implemented.".format(node.op_type, cls.SINCE_VERSION))

    BackendIsNotSupposedToImplementIt: Unsqueeze version 13 is not implemented.
