In [1]:
import mediapipe as mp
import cv2

from tokenizers import Tokenizer
from tokenizers.models import Unigram
from tokenizers.trainers import UnigramTrainer
from tokenizers.processors import TemplateProcessing

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset

from transformers import T5ForConditionalGeneration
from transformers import PreTrainedTokenizerFast
from transformers import Trainer, TrainingArguments

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation

import os
import glob

2024-07-08 23:52:42.964387: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-08 23:52:43.071374: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
MAX_FRAMES = 512
MAX_TOKENS = 512

In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# landmark functions

In [4]:
mp_holistic = mp.solutions.holistic
















In [5]:
# requirements
# face
# - lips 
# [0, 267, 269, 270, 409, 291, 375, 321, 405, 314, 17, 84, 181, 91, 146, 61, 185, 40, 39, 37] outside
# [13, 312, 311, 310, 415, 308, 324, 318, 402, 317, 14, 87, 178, 88, 95, 78, 191, 80, 81, 82] inside
# - left eye 
# [386, 387, 388, 466, 263, 249, 390, 373, 374, 380, 381, 382, 362, 398, 384, 385]
# - right eye
# [159, 160, 161, 246, 33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158]
# - left eyebrow
# [336, 296, 334, 293, 300, 285, 295, 282, 283, 276]
# - right eyebrow
# [107, 66, 105, 63, 70, 55, 65, 52, 53, 46]
# - face outline
# [10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288, 397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136, 172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109]

# pose
# [0, 8, 7]
# [11, 13, 15]
# [12, 14, 16]
# [23, 24]
# - nose
# - left ear
# - right ear
# - left shoulder
# - right shoulder
# - left elbow
# - right elbow
# - left wrist
# - right wrist
# - left hip
# - right hip

# hands
# (all)



INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [6]:
len([13, 312, 311, 310, 415, 308, 324, 318, 402, 317, 14, 87, 178, 88, 95, 78, 191, 80, 81, 82])

20

In [7]:
161 * 3

483

In [8]:
lips_outside = [0, 267, 269, 270, 409, 291, 375, 321, 405, 314, 17, 84, 181, 91, 146, 61, 185, 40, 39, 37]
lips_inside = [13, 312, 311, 310, 415, 308, 324, 318, 402, 317, 14, 87, 178, 88, 95, 78, 191, 80, 81, 82]
left_eye = [386, 387, 388, 466, 263, 249, 390, 373, 374, 380, 381, 382, 362, 398, 384, 385]
right_eye = [159, 160, 161, 246, 33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158]
left_eyebrow = [336, 296, 334, 293, 300, 285, 295, 282, 283, 276]
right_eyebrow = [107, 66, 105, 63, 70, 55, 65, 52, 53, 46]
face_outline = [10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288, 397, 365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136, 172, 58, 132, 93, 234, 127, 162, 21, 54, 103, 67, 109]

only_face = []
only_face.extend(lips_inside)
only_face.extend(left_eye)
only_face.extend(right_eye)
only_face.extend(left_eyebrow)
only_face.extend(right_eyebrow)
only_face.extend(face_outline)

In [9]:
only_pose = [0, 8, 7, 11, 13, 15, 12, 14, 16, 23, 24]

In [10]:
len(only_face)

108

In [11]:
len(only_pose)

11

In [72]:
def resize_and_pad(landmarks, original_width, original_height, target_width=854, target_height=480):
    """
    Resize landmarks to fit the height of the target size, center them,
    and add padding to reach the target width.
    """
    # Calculate scaling factor based on height
    scale = target_height / original_height
    
    # Calculate new width after scaling
    new_width = int(original_width * scale)
    
    # Resize landmarks
    landmarks_resized = landmarks.copy()
    landmarks_resized[0::3] = landmarks_resized[0::3] * scale  # x coordinates
    landmarks_resized[1::3] = landmarks_resized[1::3] * scale  # y coordinates
    
    # Calculate padding
    pad_left = (target_width - new_width) // 2
    
    # Add padding to x coordinates
    landmarks_resized[0::3] += pad_left
    
    return landmarks_resized

In [77]:
def extract_keypoints(video_path, mp_holistic=mp_holistic, target_width=854, target_height=480):
    cap = cv2.VideoCapture(video_path)
    frames = []

    holistic = mp_holistic.Holistic(static_image_mode=False, min_detection_confidence=0.5)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            
        results = holistic.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        #print(results.face_landmarks)
        if results.pose_landmarks is None:
            print("No pose landmarks")
        if results.face_landmarks is None:
            print("No face landmarks")
        if results.left_hand_landmarks is None:
            print("No left hand landmarks")
        if results.right_hand_landmarks is None:
            print("No right hand landmarks")
        
            
        # Extract keypoints (customize based on your needs)
        pose = np.array([[res.x, res.y, res.z] for res in results.pose_landmarks.landmark])[only_pose].flatten() if results.pose_landmarks else np.zeros(11*4)
        face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark])[only_face].flatten() if results.face_landmarks else np.zeros(108*3)
        lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
        rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)

        frame_landmarks = np.concatenate([
            pose, 
            face,
            lh, 
            rh])

        # Denormalize landmarks
        frame_landmarks[0::3] *= original_width  # x coordinates
        frame_landmarks[1::3] *= original_height  # y coordinates

        frame_landmarks = resize_and_pad(frame_landmarks, original_width, original_height, target_width, target_height)

        # Normalize landmarks to the target size
        frame_landmarks[0::3] /= target_width  # x coordinates
        frame_landmarks[1::3] /= target_height  # y coordinates
            
        frames.append(frame_landmarks)
    
    cap.release()
    return np.array(frames)

In [13]:
def interpolate_keypoints(keypoints1, keypoints2, num_frames):
    return np.array([keypoints1 + (keypoints2 - keypoints1) * i / (num_frames - 1) for i in range(num_frames)])

In [14]:
def concat_keypoints(videos_keypoints):
    new_keypoints = videos_keypoints[0]
    for i in range(1, len(videos_keypoints)):
        new_keypoints = np.concatenate([new_keypoints, interpolate_keypoints(videos_keypoints[i-1][-1], videos_keypoints[i][0], 10), videos_keypoints[i]])

    return new_keypoints

In [15]:
# convert keypoints back to xyz
def convert_to_xyz(keypoints):
    return keypoints.reshape((keypoints.shape[0], -1, 3))

In [102]:
def random_rotation(landmarks, max_angle=5):
    angle = np.random.uniform(-max_angle, max_angle)
    rad = np.radians(angle)
    c, s = np.cos(rad), np.sin(rad)
    rotation_matrix = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
    reshaped = landmarks.reshape(-1, 161, 3)
    rotated = np.dot(reshaped, rotation_matrix)
    return rotated.reshape(landmarks.shape)

def random_scale(landmarks, scale_range=(0.9, 1.1)):
    scale = np.random.uniform(*scale_range)
    return landmarks * scale

def random_translation(landmarks, max_translation=0.1):
    # Generate translation for each x, y, z
    translation = np.random.uniform(-max_translation, max_translation, size=3)
    
    # Reshape landmarks to (num_frames, 161, 3)
    reshaped = landmarks.reshape(-1, 161, 3)
    
    # Apply translation
    translated = reshaped + translation
    
    # Reshape back to original shape
    return translated.reshape(landmarks.shape)

def add_noise(landmarks, noise_level=0.0001):
    noise = np.random.normal(0, noise_level, landmarks.shape)
    return landmarks + noise

def random_frame_dropout(landmarks, max_dropout_ratio=0.1):
    num_frames = landmarks.shape[0]
    num_dropout = int(num_frames * np.random.uniform(0, max_dropout_ratio))
    dropout_indices = np.random.choice(num_frames, num_dropout, replace=False)
    landmarks[dropout_indices] = 0
    return landmarks

def augment_landmarks(landmarks):
    landmarks = random_rotation(landmarks)
    landmarks = random_scale(landmarks)
    landmarks = random_translation(landmarks)
    landmarks = add_noise(landmarks)
    landmarks = random_frame_dropout(landmarks)
    return landmarks

In [78]:
video1_keypoints = extract_keypoints("sign_language_translate/dataset/videos/โชคดี.mp4")
video2_keypoints = extract_keypoints("sign_language_translate/dataset/videos/สวัสดี.mp4")





























In [61]:
video1_keypoints.shape

(27, 483)

In [18]:
last_frame_video1 = video1_keypoints[-1]
first_frame_video2 = video2_keypoints[0]

In [19]:
new_video = concat_keypoints([video1_keypoints, video2_keypoints, video1_keypoints])

In [103]:
test = augment_landmarks(video1_keypoints)

In [104]:
xyz = convert_to_xyz(test)

In [21]:
# # draw points in 3d space
# def update(i):
#     ax.clear()
#     ax.set_xlim(0, 854)
#     ax.set_ylim(-1, 1)
#     ax.set_zlim(0, 480)
#     ax.scatter(854 - xyz[i, :, 0] * 854, xyz[i, :, 2], 480 - xyz[i, :, 1] * 480, s=1)

# fig = plt.figure()
# ax = fig.add_subplot(111, projection='3d')
# ani = animation.FuncAnimation(fig, update, frames=len(xyz), interval=1000)

# writervideo = animation.FFMpegWriter(fps=14) 
# ani.save("sign_language_translate/ani.mp4", writer=writervideo) 
# plt.close() 


In [105]:
# draw points in 2d space
def update(i):
    ax.clear()
    ax.set_xlim(0, 854)
    ax.set_ylim(0, 480)
    ax.scatter(854 - xyz[i, :, 0] * 854, 480 - xyz[i, :, 1] * 480, s=1)

fig = plt.figure()
ax = fig.add_subplot(111)
ani = animation.FuncAnimation(fig, update, frames=len(xyz), interval=1000)

writervideo = animation.FFMpegWriter(fps=14)
ani.save("sign_language_translate/ani2d2.mp4", writer=writervideo)
plt.close()


# convert videos to landmark data and save

In [109]:
video_files_paths = glob.glob("sign_language_translate/dataset/videos/*.mp4")
video_files_paths[:5]

['sign_language_translate/dataset/videos/สวัสดี.mp4',
 'sign_language_translate/dataset/videos/โชคดี.mp4']

In [110]:
# extract keypoints from all videos and save as npy
save_path = "sign_language_translate/dataset/keypoints"
os.makedirs(save_path, exist_ok=True)

for video_path in video_files_paths:
    video_keypoints = extract_keypoints(video_path)
    np.save(os.path.join(save_path, os.path.basename(video_path).replace(".mp4", ".npy")), video_keypoints)



In [115]:
# test load keypoints
video_keypoints = np.load("sign_language_translate/dataset/keypoints/โชคดี.npy")
video_keypoints.shape

(27, 483)

# load dataset

In [23]:
csv_dataset = pd.read_csv("sign_language_translate/test.csv")
csv_dataset

Unnamed: 0,sign,thai
0,สวัสดี โชคดี,สวัสดีครับ
1,โชคดี สวัสดี,โชคดีครับ


# train tokenizer

In [133]:
tokenizer_trainer = UnigramTrainer(special_tokens=["<pad>", "</s>", "<unk>"])

In [134]:
tokenizer = Tokenizer(Unigram())

In [135]:
# train tokenizer
tokenizer.train_from_iterator(csv_dataset["thai"].to_list(), trainer=tokenizer_trainer)





In [136]:
tokenizer.enable_padding(pad_id=tokenizer.token_to_id("<pad>"), pad_token="<pad>", length=MAX_TOKENS)
tokenizer.post_processor = TemplateProcessing(
    single="$A </s>",
    special_tokens=[
        ("</s>", tokenizer.token_to_id("</s>")),
    ],
)

In [137]:
# save tokenizer
tokenizer.save("sign_language_translate/tokenizer-unigram.json")

# load tokenizer

In [24]:
tokenizer = PreTrainedTokenizerFast(tokenizer_file="sign_language_translate/tokenizer-unigram.json")

In [25]:
vocab_size = tokenizer.vocab_size
vocab_size

13

In [149]:
a = tokenizer.encode("สวัสดี", padding="max_length", max_length=MAX_TOKENS, return_tensors="pt")

In [152]:
a.shape

torch.Size([1, 512])

In [153]:
tokenizer.decode(a[0])

'ส ว ั ส ด ี </s> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad> <pad

# dataset loader

In [167]:
def pad_landmarks(landmarks, max_frames=512, pad_value=0):
    if landmarks.shape[0] < max_frames:
        padding = torch.full((max_frames - landmarks.shape[0], landmarks.shape[1]), pad_value)
        padded_seq = torch.cat([landmarks, padding], dim=0)
        attention_mask = torch.cat([torch.ones(landmarks.shape[0]), torch.zeros(max_frames - landmarks.shape[0])])
    else:
        padded_seq = landmarks[:max_frames]
        attention_mask = torch.ones(max_frames)

    return padded_seq, attention_mask

In [194]:
class CustomDataset(Dataset):
    def __init__(self, dataset, tokenizer, augmentation=True, max_frames=MAX_FRAMES, max_tokens=MAX_TOKENS, keypoints_path="sign_language_translate/dataset/keypoints"):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.augmentation = augmentation
        self.max_frames = max_frames
        self.max_tokens = max_tokens
        self.keypoints_path = keypoints_path

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        sign, thai = self.dataset.iloc[idx]
        
        sign = sign.split(" ")
        keypoints_files = [os.path.join(self.keypoints_path, f"{sign_word}.npy") for sign_word in sign]
        keypoints = [np.load(file) for file in keypoints_files]
        
        # concat all keypoints
        keypoints = concat_keypoints(keypoints)

        # augment keypoints
        if self.augmentation:
            keypoints = augment_landmarks(keypoints)

        # pad keypoints
        keypoints, attention_mask = pad_landmarks(torch.tensor(keypoints, dtype=torch.float32), max_frames=self.max_frames)

        thai = self.tokenizer.encode(thai, padding="max_length", max_length=self.max_tokens, return_tensors="pt")[0]

        return {
            "landmarks": keypoints,
            "attention_mask": attention_mask,
            "labels": thai,
        }


In [195]:
dataset = CustomDataset(csv_dataset, tokenizer)

In [196]:
dataset[0]["attention_mask"].dtype

torch.float32

# define model

In [110]:
class Sign2ThaiT5(nn.Module):
    def __init__(self, t5_model="t5-small", vocab_size=vocab_size, input_dim=483, ):
        super().__init__()
        self.t5 = T5ForConditionalGeneration.from_pretrained(t5_model)
        self.t5.resize_token_embeddings(vocab_size)

        self.projection = nn.Linear(input_dim, self.t5.config.d_model, bias=False)


    def forward(self, landmarks, attention_mask=None, decoder_input_ids=None, labels=None):
        # Input shape: (batch_size, frames_len, landmarks_xyz)
        # Project landmarks to T5's d_model dimension
        projected_landmarks = self.projection(landmarks)
        
        # Pass the projected landmarks to T5
        outputs = self.t5(inputs_embeds=projected_landmarks,
                          attention_mask=attention_mask,
                          decoder_input_ids=decoder_input_ids,
                          labels=labels)
        
        return outputs
    
    def generate(self, landmarks, attention_mask, **generate_kwargs):
        projected_landmarks = self.projection(landmarks)
        
        return self.t5.generate(inputs_embeds=projected_landmarks, attention_mask=attention_mask, **generate_kwargs)


In [111]:
model = Sign2ThaiT5()



In [112]:
model.to(device)

Sign2ThaiT5(
  (t5): T5ForConditionalGeneration(
    (shared): Embedding(13, 512)
    (encoder): T5Stack(
      (embed_tokens): Embedding(13, 512)
      (block): ModuleList(
        (0): T5Block(
          (layer): ModuleList(
            (0): T5LayerSelfAttention(
              (SelfAttention): T5Attention(
                (q): Linear(in_features=512, out_features=512, bias=False)
                (k): Linear(in_features=512, out_features=512, bias=False)
                (v): Linear(in_features=512, out_features=512, bias=False)
                (o): Linear(in_features=512, out_features=512, bias=False)
                (relative_attention_bias): Embedding(32, 8)
              )
              (layer_norm): T5LayerNorm()
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (1): T5LayerFF(
              (DenseReluDense): T5DenseActDense(
                (wi): Linear(in_features=512, out_features=2048, bias=False)
                (wo): Linear(in_features=2048, ou

In [123]:
#model(torch.randn((1, 512, 483), dtype=torch.float32).to(device), decoder_input_ids=torch.randint(0, 1, (1, 512)).to(device))

Seq2SeqLMOutput(loss=None, logits=tensor([[[-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565],
         [-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565],
         [-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565],
         ...,
         [-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565],
         [-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565],
         [-19.2501, -16.5643, -17.9939,  ..., -14.8157, -12.0182, -12.5565]]],
       device='cuda:0', grad_fn=<UnsafeViewBackward0>), past_key_values=((tensor([[[[-0.4865, -2.3323, -1.1428,  ..., -2.5693, -1.7539, -0.5693],
          [-0.4865, -2.3323, -1.1428,  ..., -2.5693, -1.7539, -0.5693],
          [-0.4865, -2.3323, -1.1428,  ..., -2.5693, -1.7539, -0.5693],
          ...,
          [-0.4865, -2.3323, -1.1428,  ..., -2.5693, -1.7539, -0.5693],
          [-0.4865, -2.3323, -1.1428,  ..., -2.5693, -1.7539, -0.5693],
          [-0.4865, -2.3323, -1.1428,  

In [179]:
training_args = TrainingArguments(
    output_dir="sign_language_translate/results/sign2thai",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir="./logs",
)

In [197]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
)

trainer.train()

Step,Training Loss


TrainOutput(global_step=3, training_loss=22.38366953531901, metrics={'train_runtime': 0.9215, 'train_samples_per_second': 6.511, 'train_steps_per_second': 3.256, 'total_flos': 0.0, 'train_loss': 22.38366953531901, 'epoch': 3.0})