In [1]:
import os
import pandas as pd
import numpy as np

In [2]:
class tvb_hksl_split_parser():
    def __init__(self, file: str):
        self.file = file
        self.train_info = pd.read_csv(self.file, delimiter="|") 
        # extend the dataframe with extracted information
        self.train_info["glosses_tokenized"] = self.train_info["glosses"].str.split(' ')
        # self.train_info["date"] = self.train_info["id"].str.split('/').apply(lambda x: x[0])
        self.train_info["frames"] = self.train_info["id"].str.split('/').apply(lambda x: x[1])
        self.train_info["length"] = self.train_info["frames"].str.split('-').apply(lambda x: int(x[1]) - int(x[0]) + 1)
        # add <START> and <END> tokens to the glosses
        # self.train_info["glosses_tokenized"] = self.train_info["glosses_tokenized"].apply(lambda x: ["<START>"] + x + ["<END>"])
        # self.train_info["glosses_length"] = self.train_info["glosses_tokenized"].apply(lambda x: len(x))
        

    def get_train_id(self) -> pd.Series:
        if os.name == "nt": # for windows system only
            return self.train_info["id"].str.replace("/", "\\")
        return self.train_info["id"]

    def get_train_glosses_tokenized(self) -> pd.Series:
        return self.train_info["glosses_tokenized"]

    def get_max_length(self) -> int:
        return self.train_info["length"].max()

    # def get_max_glosses_length(self) -> int:
    #     return self.train_info["glosses_length"].max()

    # def pad_train_glosses_tokenized(self, max_length: int) -> pd.Series:
    #     self.train_info["glosses_tokenized"] = self.train_info["glosses_tokenized"].apply(lambda x: x + ["<END>"] * (max_length - len(x)))
    #     self.train_info["glosses_length"] = self.train_info["glosses_tokenized"].apply(lambda x: len(x))
    #     return self.train_info["glosses_tokenized"]
    
    # def get_word_dict(self) -> dict:
    #     word_dict = {}
    #     for tokens in self.train_info["glosses_tokenized"]:
    #         for token in tokens:
    #             if token not in word_dict:
    #                 word_dict[token] = len(word_dict)
    #     return word_dict

In [3]:
train_parser = tvb_hksl_split_parser(r"F:\dataset\tvb-hksl-news\split\train.csv")
test_parser = tvb_hksl_split_parser(r"F:\dataset\tvb-hksl-news\split\test.csv")
dev_parser = tvb_hksl_split_parser(r"F:\dataset\tvb-hksl-news\split\dev.csv")

# make a word dictionary
word_dict = {}
word_dict["<END>"] = len(word_dict)
word_dict["<START>"] = len(word_dict)
word_dict["<X>"] = len(word_dict)
word_dict["<BAD>"] = len(word_dict)
word_dict["<MUMBLE>"] = len(word_dict)
word_dict["<STOP>"] = len(word_dict)
# word_dict["<UNK>"] = len(word_dict)

for parser in [train_parser, test_parser, dev_parser]:
    for glosses in parser.get_train_glosses_tokenized():
        for word in glosses:
            if word not in word_dict:
                word_dict[word] = len(word_dict)

reverse_word_dict = {v: k for k, v in word_dict.items()}

In [4]:
keypoint_directory = r"F:\dataset\tvb-hksl-news\keypoints_mediapipe"

In [5]:
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.functional import log_softmax
from torch.nn import CTCLoss


# Define the dataset class
class SignLanguageDataset(Dataset):
    def __init__(self, parser:tvb_hksl_split_parser, keypoint_directory, word_dict):
        self.parser = parser
        self.keypoint_directory = keypoint_directory
        self.word_dict = word_dict
        self.ids = self.parser.get_train_id()
        self.glosses = self.parser.get_train_glosses_tokenized()

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, idx):
        keypoint_path = os.path.join(self.keypoint_directory, self.ids.iloc[idx] + ".npy")
        keypoints = np.load(keypoint_path)
        glosses = [self.word_dict[word] for word in self.glosses.iloc[idx]]
        return torch.tensor(keypoints, dtype=torch.float32), torch.tensor(glosses, dtype=torch.long)

In [6]:
train_dataset = SignLanguageDataset(train_parser, keypoint_directory, word_dict)
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=lambda x: x)

In [7]:
# Isolated

# class SignLanguageModel(nn.Module):
#     def __init__(self, input_dim, hidden_dim, output_dim):
#         super(SignLanguageModel, self).__init__()
#         self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=2, batch_first=True, bidirectional=True)
#         self.fc = nn.Linear(hidden_dim * 2, output_dim)

#     def forward(self, x):
#         x, _ = self.lstm(x)
#         x = self.fc(x)
#         return log_softmax(x, dim=2)

In [8]:
# Continuous

class SignLanguageModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(SignLanguageModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=1, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x, _ = self.lstm(x)
        x = self.fc1(x)
        x = self.fc2(x)
        return log_softmax(x, dim=2)

In [9]:
# Initialize the model, loss function, and optimizer
input_dim = 1662  # Number of keypoint features
hidden_dim = 256
output_dim = len(word_dict)
model = SignLanguageModel(input_dim, hidden_dim, output_dim)
criterion = CTCLoss(blank=word_dict["<END>"])
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [10]:
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for batch in train_loader:
        keypoints, glosses = zip(*batch)
        keypoints = torch.nn.utils.rnn.pad_sequence(keypoints, batch_first=True)
        glosses = torch.nn.utils.rnn.pad_sequence(glosses, batch_first=True, padding_value=word_dict["<END>"])

        optimizer.zero_grad()
        outputs = model(keypoints)
        input_lengths = torch.full(size=(outputs.size(0),), fill_value=outputs.size(1), dtype=torch.long)
        target_lengths = torch.tensor([len(g) for g in glosses], dtype=torch.long)
        loss = criterion(outputs.permute(1, 0, 2), glosses, input_lengths, target_lengths)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss / len(train_loader)}")

Epoch 1/10, Loss: 3.3066746407469187
Epoch 2/10, Loss: 2.822948678093062
Epoch 3/10, Loss: 2.7538540626025916
Epoch 4/10, Loss: 2.7410398635251747
Epoch 5/10, Loss: 2.702391983633615
Epoch 6/10, Loss: 2.6910918984331187
Epoch 7/10, Loss: 2.677604852753084
Epoch 8/10, Loss: 2.6742056435588095
Epoch 9/10, Loss: 2.676973118801802
Epoch 10/10, Loss: 2.6865136101613243


In [11]:
# Save the model
torch.save(model.state_dict(), "ctc_model_continuous.pth")

In [12]:
# Practical test
model.eval()

def predict_and_split(keypoints):
    # predict the glosses and splitting index of each gloss in keypoints
    keypoints = torch.tensor(keypoints, dtype=torch.float32).unsqueeze(0)
    outputs = model(keypoints)
    outputs = outputs.squeeze(0).detach().numpy()
    glosses = []
    split_indices = []
    current_gloss = []
    for i, output in enumerate(outputs):
        if output.argmax() == word_dict["<END>"]:
            glosses.append(current_gloss)
            split_indices.append(i)
            current_gloss = []
        else:
            current_gloss.append(reverse_word_dict[output.argmax()])
    return glosses, split_indices

[['香港'], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [],

In [None]:
# Testing sample: 2020-07-21/017386-017665
# raw: D S E 文 考試 明天 說 這 四 十 百分比 入 香港 本地 全部 香港 大 七 <BAD> 考試 五 部分 五 <BAD> 星 星
# original: 文憑試明日放榜 超過四成日校考生 考獲入讀本地大學的成績 七人考到七科5
# interpreted: D+S+E+文+考試(=香港中學文憑考試) 明天 公佈 這 四十+百分比(=四成) 入 香港 本地 全部 香港 大 七 BAD-SEGMENT 考試 五 部份 五 BAD-SEGMENT 星 星

test_keypoints = np.load(r"F:\dataset\tvb-hksl-news\keypoints_mediapipe\2020-07-21\017386-017665.npy")
test_gloss, test_split = predict_and_split(test_keypoints)
print(test_gloss)
print(test_split)