In [1]:
import os
import re
import json
import pickle

import fasttext
from fasttext import util
import shutil

import time
from tqdm import tqdm
import math

import numpy as np

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import KFold

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam, AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau

from transformers import AutoTokenizer, AutoModel

In [2]:
# Word entries, converted from the stored array to a plain Python list.
words = np.load('/home/sju/HyoJun/Creative_semester_system/words.npy').tolist()
# Landmark array; later cells index it as (sample, frame, landmark, coordinate).
y_data1 = np.load('/home/sju/HyoJun/Creative_semester_system/y_data1.npy')

In [19]:
class TestDataset(Dataset):
    """Inference dataset yielding ``(word, first_frame)`` pairs.

    ``landmarks`` is converted to a float32 tensor and indexed as
    ``[:, 0, :, :]``, so it must be 4-dimensional; only the first frame of
    each sample is exposed through ``__getitem__``.
    """

    def __init__(self, words, landmarks):
        """Store ``words`` and cache the first frame of every landmark sequence."""
        landmark_tensor = torch.tensor(landmarks, dtype=torch.float32)
        self.words = words
        self.landmarks = landmark_tensor
        # Frame 0 of every sample: (N, L, D)
        self.first_frame = landmark_tensor[:, 0, :, :]

    def __len__(self):
        """Number of items, taken from ``words`` (not from ``landmarks``)."""
        n_items = len(self.words)
        return n_items

    def __getitem__(self, idx):
        """Return the ``idx``-th word together with its first frame of shape (L, D)."""
        return self.words[idx], self.first_frame[idx]

In [20]:
class WordEmbeddingDecoderModel(nn.Module):
    """Transformer decoder that maps fastText word vectors to landmark frames.

    Word vectors are projected to ``hidden_size`` and used as the decoder
    memory; landmark frames (flattened to ``num_landmarks * dim``) are the
    decoder target sequence, decoded under a causal mask.
    """

    def __init__(self, ft_model_path, hidden_size, num_frames, num_landmarks, dim):
        """Build the model.

        Args:
            ft_model_path: Path passed to ``fasttext.load_model``.
            hidden_size: Transformer model width (``d_model``).
            num_frames: Stored on the instance; ``forward`` derives the actual
                sequence length from its input.
            num_landmarks: Number of landmarks per frame.
            dim: Number of coordinates per landmark.
        """
        super(WordEmbeddingDecoderModel, self).__init__()
        self.num_frames = num_frames
        self.num_landmarks = num_landmarks
        self.dim = dim
        self.hidden_size = hidden_size

        self.ft = fasttext.load_model(ft_model_path)
        self.embedding_dim = self.ft.get_dimension()

        self.encoder_proj = nn.Linear(self.embedding_dim, hidden_size)

        self.decoder_input_proj = nn.Linear(num_landmarks * dim, hidden_size)

        # NOTE: nn.TransformerDecoder clones the layer it is given, so this
        # attribute is not the one executed in forward(). It is still a
        # registered submodule whose `decoder_layer.*` entries are part of the
        # state_dict; keep it so existing checkpoints continue to load.
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model=hidden_size,
            nhead=8,
            dim_feedforward=hidden_size//2,
            dropout=0.3,
            activation='relu',
            batch_first=True
        )
        self.transformer_decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=8)

        self.fc = nn.Linear(hidden_size, num_landmarks * dim)

    def encode_text(self, batch_words, device='cuda'):
        """Embed a batch of word sequences and project them to ``hidden_size``.

        Args:
            batch_words: Iterable of sequences; every element of every
                sequence is passed to ``get_word_vector``. All sequences must
                have the same length so they stack into one array.
                NOTE(review): if an element is a bare ``str`` it is iterated
                character by character -- confirm callers pass the same
                structure that was used in training.
            device: Device the embeddings are moved to before projection.

        Returns:
            Tensor of shape (B, W, hidden_size).
        """
        # Stack into a single ndarray first: building a tensor straight from
        # nested lists of ndarrays is very slow and emits a UserWarning.
        stacked = np.array(
            [[self.ft.get_word_vector(w) for w in words] for words in batch_words]
        )
        embeddings = torch.from_numpy(stacked).to(device)

        return self.encoder_proj(embeddings)

    def forward(self, words: list[str], decoder_input: torch.Tensor, device: torch.device):
        """Decode landmark frames conditioned on ``words``.

        Args:
            words: Batch of word sequences, forwarded unchanged to
                ``encode_text``.
            decoder_input: Landmark frames of shape (B, T, L, D).
            device: Device on which the computation runs.

        Returns:
            Tensor of shape (B, T, num_landmarks, dim); position ``t`` only
            attends to decoder inputs ``0..t``.
        """
        memory = self.encode_text(words, device)  # (B, W, hidden_size)

        # Prepare decoder input
        decoder_input = decoder_input.to(device)  # (B, T, L, D)
        batch_size, num_frames = decoder_input.shape[:2]

        # reshape (not view) so non-contiguous inputs are accepted as well
        decoder_input_flat = decoder_input.reshape(batch_size, num_frames, -1)
        decoder_input_proj = self.decoder_input_proj(decoder_input_flat)  # (B, T, hidden_size)

        # Causal mask built directly on the target device: True above the
        # diagonal blocks attention to future frames.
        tgt_mask = torch.triu(
            torch.ones(num_frames, num_frames, device=device), diagonal=1
        ).bool()

        # Decode
        decoder_output = self.transformer_decoder(
            tgt=decoder_input_proj,
            memory=memory,
            tgt_mask=tgt_mask
        )  # (B, T, hidden_size)

        output = self.fc(decoder_output)  # (B, T, L*D)
        output = output.view(batch_size, num_frames, self.num_landmarks, self.dim)

        return output

In [21]:
# Model geometry: frame and landmark counts come from axes 1 and 2 of y_data1.
HIDDEN_SIZE = 768
NUM_FRAMES, NUM_LANDMARKS = y_data1.shape[1:3]
DIM = 2

# Optimisation settings.
EPOCHS = 100
BATCH_SIZE = 1
LR = 1e-6

In [None]:
# Build the network with the configured geometry and move it to the GPU.
model = WordEmbeddingDecoderModel(
    ft_model_path='/home/sju/HyoJun/Creative_semester_system/cc.ko.300.bin',
    hidden_size=HIDDEN_SIZE,
    num_frames=NUM_FRAMES,
    num_landmarks=NUM_LANDMARKS,
    dim=DIM
).cuda()

# Deserialize the checkpoint onto the CPU: without map_location the tensors are
# restored to whatever device they were saved from, which costs a second GPU
# copy or fails if that device is not present. load_state_dict then copies the
# values into the model's existing CUDA parameters.
model.load_state_dict(
    torch.load(
        "/home/sju/HyoJun/Creative_semester_system/Model/text2sign.pth",
        map_location="cpu",
    )
);

<All keys matched successfully>

In [23]:
# Index of the sample to run inference on.
n = 0

# Word entry and landmark sequence of the selected sample.
word, landmark = words[n], y_data1[n]

print(word)

['고민']


In [24]:
%%time

test_dataset = TestDataset(word, landmark[None, ...])
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

model.eval()

preds_list = []
for batch in test_dataloader:
    word, first_frame = batch
    word = word
    first_frame = first_frame.cuda()
    
    current_dec_input = first_frame.unsqueeze(1)   #.float()
    
    for step in range(NUM_FRAMES):  

        with torch.no_grad():
            output = model(word, current_dec_input, 'cuda')
            last_frame = output[:, -1, :, :]  # (B, L, D)
            last_frame_unsq = last_frame.unsqueeze(1)  # (B, 1, L, D)
            current_dec_input = torch.cat([current_dec_input, last_frame_unsq], dim=1)

    final_pred = current_dec_input[:, 1:, :, :]  # (batch, NUM_FRAMES, L, D)

    preds_list.append(final_pred.cpu())

preds = torch.cat(preds_list, dim=0).numpy()
print(preds.shape)

  embeddings = torch.tensor(batch_embeddings).to(device)


(1, 204, 137, 2)
CPU times: user 9.53 s, sys: 4.31 ms, total: 9.54 s
Wall time: 556 ms


In [27]:
# NOTE: unpickling runs arbitrary code from the file -- only load a scalers.pkl
# that comes from a trusted source.
with open('/home/sju/HyoJun/Creative_semester_system/scalers.pkl', 'rb') as scaler_file:
    scalers = pickle.load(scaler_file)

In [28]:
# `preds` only holds predictions for sample `n`, so it must be paired with the
# scaler(s) starting at index `n`; zipping against the full list would always
# apply scalers[0] regardless of which sample was selected.
# NOTE(review): assumes scalers[i] is the scaler fitted for y_data1[i] -- confirm.
selected_scalers = scalers[n:n + len(preds)]
if len(selected_scalers) != len(preds):
    raise IndexError(
        f"no scaler available for sample index {n} (have {len(scalers)} scalers)"
    )

y_data_restored = []

for arr_normalized, scaler in zip(preds, selected_scalers):
    # Scalers work on 2-D input: flatten to (frames * landmarks, D), invert,
    # then restore the original (frames, landmarks, D) layout.
    flat = arr_normalized.reshape(-1, arr_normalized.shape[-1])
    arr_restored = scaler.inverse_transform(flat).reshape(arr_normalized.shape)
    y_data_restored.append(arr_restored)

y_data_restored = np.array(y_data_restored)
print("Restored y_data shape:", y_data_restored.shape)

Restored y_data shape: (1, 204, 137, 2)


In [29]:
# Output file is numbered 1-based from the selected sample index.
save_path = f"/home/sju/HyoJun/Creative_semester_system/preds/sign_preds{n+1}.npy"

preds_inv = y_data_restored
print(preds_inv.shape)
np.save(save_path, preds_inv)

(1, 204, 137, 2)
