<h1 style="font-size:200%; font-family:cursive; color:white;">1. Import Required Libraries & Dataset</h1>

In [1]:
import numpy as np
import pandas as pd
import os
import torch
import torch.nn as nn
from sklearn.metrics import classification_report
from transformers import AutoModel, BertTokenizer

# Select the compute device: use the GPU when CUDA is available and fall back
# to the CPU otherwise, so the notebook still runs on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Check if PyTorch is using the GPU

In [None]:
# Display the CUDA status. The per-device queries are only issued when CUDA is
# available; on a CPU-only machine their slots show None instead.
(
    torch.cuda.is_available(),
    torch.cuda.device_count(),
    torch.cuda.current_device() if torch.cuda.is_available() else None,
    torch.cuda.device(0) if torch.cuda.is_available() else None,
    torch.cuda.get_device_name(0) if torch.cuda.is_available() else None,
    device,
)

In [4]:
# Read only the "translation" and "id" columns of how2sign.csv.
# NOTE(review): the path is absolute and machine-specific; the commented lines
# below point at the same file on other machines.
df_videos = pd.read_csv("/app/AI-Module/Resources/Datasets/how2sign.csv", usecols=["translation", "id"])
#df_videos = pd.read_csv("C:/Users/48519558/Desktop/SignAI-ML/AI-Module/Resources/Datasets/how2sign.csv", usecols=["translation", "id"])
#df_videos = pd.read_csv("D:/SignAI-ML/AI-Module/Resources/Datasets/how2sign.csv", usecols=["translation", "id"])

In [5]:
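# Disabled one-off preprocessing: parses the "points" column of the raw CSV and
# saves one `<index>.npy` file per row holding the pair (points array, index).
# df_videos = pd.read_csv("D:/Sprinfil/Dataset/how2sign.csv")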
# df_videos = df_videos.sort_values(["id"]).reset_index().drop(["index"], axis=1)
# df_videos.head()
# import ast
# points = df_videos["points"]
# for index, point in enumerate(points):
#     print(index)
#     points.at[index] = ast.literal_eval(point)
# type(points.iloc[0])
# df_videos["points"] = points
# df_videos.head()
# for index, item in enumerate(points):
#     item = np.array(item)
#     print(index, item.shape)
#     item_with_index = np.array((item, index), dtype=object)
#     points.iloc[index]=item
#     np.save(f"{index}.npy", item_with_index)
# df_videos["points"] = points
# df_videos.head()

In [None]:
# Directory that holds the per-clip point files; every entry found in it is
# turned into a "<folder>/<entry>" path.
puntos_folder = "/app/AI-Module/Resources/Datasets/Points"
#puntos_folder = "C:/Users/48519558/Desktop/SignAI-ML/AI-Module/Resources/Datasets/Points"
#puntos_folder = "D:/SignAI-ML/AI-Module/Resources/Datasets/Points"
files = [f"{puntos_folder}/{entry}" for entry in os.listdir(puntos_folder)]
len(files)

In [7]:
def load_points(files):
    """Lazily load the saved (points, id) records, one per file.

    Args:
        files: Iterable of paths to ``.npy`` files. Element 0 of each stored
            array is the points array and element 1 is its id.

    Yields:
        Tuples ``(points, id)`` where ``points`` has been cast to float16.
    """
    for path in files:
        # NOTE(review): allow_pickle=True unpickles the file contents; only
        # load files from a trusted source.
        record = np.load(path, allow_pickle=True)
        yield record[0].astype(np.float16), record[1]

# Accumulate the loaded arrays and their ids in two parallel lists.
puntos_list = []
ids_list = []

for item, index in load_points(files):
    puntos_list.append(item)
    ids_list.append(index)

# One row per loaded file: the array under 'points', its id under 'id'.
df_puntos = pd.DataFrame({
    'points': puntos_list,
    'id': ids_list
})
# Order the rows by id and discard the pre-sort index.
df_puntos = df_puntos.sort_values(["id"]).reset_index(drop=True)

In [8]:
# Attach the point arrays to their rows by "id"; the inner join keeps only the
# ids that are present in both frames.
df_videos = df_videos.merge(df_puntos, on="id", how="inner")

***

In [None]:
# Largest length (first axis) among all 'points' arrays.
max_len = df_videos['points'].map(len).max()
print(max_len)
df_videos.head()

In [10]:
def add_padding(max_frames, point_series: pd.Series):
    """Pad every array in ``point_series`` with trailing rows up to ``max_frames``.

    Each padding row is filled with -1, except every fourth column starting at
    column 3, which is set to 0. Arrays that already have ``max_frames`` rows
    or more are left untouched.

    Args:
        max_frames: Target number of rows (first axis) for every array.
        point_series: Series whose values are 2-D arrays. Elements are
            addressed by position, so any index is accepted.

    Returns:
        The same Series object, updated in place, with the short arrays
        replaced by their padded versions.
    """
    for pos in range(len(point_series)):
        frames = point_series.iat[pos]
        missing = max_frames - len(frames)
        if missing <= 0:
            continue
        # The row width comes from the data itself rather than a fixed constant.
        padding = np.full((missing, frames.shape[1]), -1, dtype=np.float16)
        padding[:, 3::4] = 0
        point_series.iat[pos] = np.concatenate((frames, padding), axis=0)
    return point_series

In [None]:
# Pad every 'points' array to max_len rows so they all share one length.
df_videos['points'] = add_padding(max_len, df_videos['points'])

In [12]:
from sklearn.model_selection import train_test_split
# Single-column frames: inputs ('points') and targets ('translation').
X_df = df_videos['points'].to_frame()
y_df = df_videos['translation'].to_frame()
# Fixed seed so both splits below are reproducible.
seed = 31991
# First hold out 100/500 = 20% of all rows as the test set ...
X_train, X_test, y_train, y_test = train_test_split(X_df, y_df, test_size=(100/500), random_state=seed)
# ... then 100/400 = 25% of the remainder as validation, i.e. a 60/20/20
# train/validation/test split overall.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=(100/400), random_state=seed)

In [None]:
# Preview the first rows of every split.
X_train.head(), y_train.head(), X_val.head(), y_val.head(), X_test.head(), y_test.head()

In [None]:
# Sanity check: inputs and targets of each split must have matching row counts.
X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape

In [None]:
import gc
# Drop the notebook-level references to the intermediates that are no longer
# needed once the splits exist. pop(..., None) tolerates names that are already
# gone, so the cell can be re-run safely.
globals().pop("df_videos", None)
globals().pop("puntos_list", None)
globals().pop("ids_list", None)
globals().pop("df_puntos", None)
globals().pop("X_df", None)
# `item` is the loop variable left behind by the point-loading loop.
globals().pop("item", None)
globals().pop("y_df", None)
# Run a collection pass now that the references are gone.
gc.collect()


<h1 style="font-size:200%; font-family:cursive; color:white;">3. Import BERT-base-uncased</h1>

In [None]:
# Load the pretrained 'bert-base-uncased' model and its tokenizer. The
# tokenizer is used below to encode the target sentences.
# NOTE(review): `bert` is not referenced again in the visible part of this
# notebook, and the model class defined later creates its own T5 tokenizer —
# confirm whether this model is still needed.
bert = AutoModel.from_pretrained('bert-base-uncased')
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [17]:
def _encode_translations(sentences):
    """Tokenize a list of sentences into a padded, truncated batch of PyTorch tensors."""
    return tokenizer.batch_encode_plus(
        sentences,
        padding=True,
        truncation=True,
        return_tensors="pt"
    )

# Encode the target sentences of the training, validation and test splits.
tokens_label_train = _encode_translations(y_train['translation'].tolist())
tokens_label_val = _encode_translations(y_val['translation'].tolist())
tokens_label_test = _encode_translations(y_test['translation'].tolist())

<u><h2 style="font-size:170%; font-family:cursive;">What is the maximum sequence length of the input?</h2></u>

<p style="font-size:150%; font-family:verdana;">The maximum sequence length of the input = 512</p>

<h1 style="font-size:200%; font-family:cursive; color:white;">5. List to Tensors</h1>

In [18]:
def create_attention_mask_from_points(tensor: torch.Tensor) -> torch.Tensor:
    """Build an int8 mask, shaped like ``tensor``, that zeroes padded points.

    The trailing values are read as consecutive groups of four. A group equal
    to (-1, -1, -1, 0) is treated as padding and gets 0 in all four of its mask
    entries; every other entry is 1.

    Args:
        tensor: Input whose first two dimensions are kept and whose remaining
            values are regrouped in fours.

    Returns:
        A new int8 tensor on the same device and with the same shape as ``tensor``.
    """
    batch, frames = tensor.size(0), tensor.size(1)
    # Regroup the trailing values so each point is one row of four numbers.
    groups = tensor.view(batch, frames, -1, 4)

    # A point is padding when its first three values are -1 and the fourth is 0.
    is_padding = (groups[..., :3] == -1).all(dim=-1) & (groups[..., 3] == 0)

    # Spread the per-point flag over its four values and restore the input shape.
    keep = (~is_padding).to(torch.int8).unsqueeze(-1)
    return keep.expand(-1, -1, -1, 4).reshape(tensor.size())


In [None]:
# convert lists to tensors
def _points_to_tensors(points_frame):
    """Stack the 'points' column into one float16 tensor and build its mask.

    Args:
        points_frame: DataFrame whose 'points' column holds one array per row;
            the arrays must share a shape so they can be stacked.

    Returns:
        ``(sequences, mask)``: the stacked float16 tensor and the int8 mask
        that create_attention_mask_from_points derives from it.
    """
    stacked = np.array(points_frame['points'].tolist(), dtype=np.float16)
    # from_numpy shares the array's memory, so no second copy of the data is made.
    sequences = torch.from_numpy(stacked)
    return sequences, create_attention_mask_from_points(sequences)


def _drop_globals(*names):
    """Remove the given notebook-level names (if present), then run the garbage collector."""
    for name in names:
        globals().pop(name, None)
    return gc.collect()


gc.collect()

# Training split
train_seq, train_mask = _points_to_tensors(X_train)
train_y = tokens_label_train['input_ids']
_drop_globals("X_train", "y_train", "tokens_label_train")

# Validation split
val_seq, val_mask = _points_to_tensors(X_val)
val_y = tokens_label_val['input_ids']
_drop_globals("X_val", "y_val", "tokens_label_val")

# Test split
test_seq, test_mask = _points_to_tensors(X_test)
test_y = tokens_label_test['input_ids']
_drop_globals("X_test", "y_test", "tokens_label_test")

<h1 style="font-size:200%; font-family:cursive; color:white;">6. Data Loader</h1>

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# Number of samples per batch for both loaders.
batch_size = 32

# Bundle the training tensors so each item is (sequence, mask, label ids).
train_data = TensorDataset(train_seq, train_mask, train_y)
# The dataset keeps its own references; drop the notebook-level names.
globals().pop("train_seq", None)
globals().pop("train_mask", None)
globals().pop("train_y", None)
gc.collect()
# Random sampler: training samples are drawn in shuffled order.
train_sampler = RandomSampler(train_data)

# DataLoader for the training set.
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)
# The loader references the dataset and sampler; drop the notebook-level names.
globals().pop("train_data", None)
globals().pop("train_sampler", None)
gc.collect()

# Bundle the validation tensors in the same (sequence, mask, label ids) layout.
val_data = TensorDataset(val_seq, val_mask, val_y)
globals().pop("val_seq", None)
globals().pop("val_mask", None)
globals().pop("val_y", None)
gc.collect()

# Sequential sampler: validation samples are visited in their stored order.
val_sampler = SequentialSampler(val_data)

# DataLoader for the validation set.
val_dataloader = DataLoader(val_data, sampler = val_sampler, batch_size=batch_size)
globals().pop("val_data", None)
globals().pop("val_sampler", None)
gc.collect()

<h1 style="font-size:200%; font-family:cursive; color:white;">7. Model Architecture</h1>

In [21]:
# Debug stop: uncomment the next line to raise a ValueError and halt "Run All" at this cell.
#float("sfgldiubjknstrgflbjhk")

In [None]:
import torch
from torch import nn
from transformers import T5Tokenizer, T5ForConditionalGeneration

class FloatToTextModel(nn.Module):
    """Wraps a T5 model so that a sequence of floats can be mapped to text.

    Only the first vector along dimension 1 of each sample is used: it is
    written out as a space-separated string of numbers, tokenized with the T5
    tokenizer and passed to the T5 model.
    """

    def __init__(self, model_name="t5-small"):
        """Load the tokenizer and the conditional-generation model named ``model_name``."""
        super(FloatToTextModel, self).__init__()
        self.tokenizer = T5Tokenizer.from_pretrained(model_name)
        self.model = T5ForConditionalGeneration.from_pretrained(model_name)

    def float_to_text(self, float_sequence):
        """Convert a sequence of floats to a space-separated string."""
        return " ".join(map(str, float_sequence))

    def _encode_first_frame(self, sample):
        """Tokenize the first vector of ``sample`` and return its ids on the model's device."""
        # detach() and cpu() make the conversion to NumPy work for tensors
        # that track gradients or live on the GPU.
        first_frame = sample[0, :].detach().cpu().numpy()
        input_text = self.float_to_text(first_frame)
        input_ids = self.tokenizer(input_text, return_tensors="pt", padding=True, truncation=True).input_ids
        return input_ids.to(self.model.device)

    def forward(self, float_sequence, labels=None):
        """Run every sample of the batch through the model independently.

        Args:
            float_sequence: Input tensor indexed as (batch, step, feature),
                e.g. (batch_size, 2537, 2172).
            labels: Optional token ids of the expected output, one row per
                sample, used by the model to compute the loss.

        Returns:
            A list with one model output per sample; each output includes the
            loss when ``labels`` is given.
        """
        outputs = []
        for i in range(float_sequence.shape[0]):
            input_ids = self._encode_first_frame(float_sequence[i])

            sample_labels = None
            if labels is not None:
                # One label row per sample, on the same device as the inputs.
                sample_labels = labels[i].unsqueeze(0).to(input_ids.device)

            outputs.append(self.model(input_ids=input_ids, labels=sample_labels))

        return outputs

    def generate(self, float_sequence):
        """Generate one text per sample of the batch.

        Switches the module to evaluation mode before generating.

        Args:
            float_sequence: Input tensor indexed as (batch, step, feature).

        Returns:
            A list with the decoded text generated for each sample.
        """
        self.eval()
        generated_texts = []

        for i in range(float_sequence.shape[0]):
            input_ids = self._encode_first_frame(float_sequence[i])

            # No gradients are needed for generation.
            with torch.no_grad():
                outputs = self.model.generate(input_ids)

            # Decode the generated ids back to text.
            output_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            generated_texts.append(output_text)

        return generated_texts

# Usage example
if __name__ == "__main__":
    model = FloatToTextModel()
    float_sequence = torch.randn((2, 2537, 2172))  # example with batch_size = 2

    # Generate text
    generated_texts = model.generate(float_sequence)
    for idx, text in enumerate(generated_texts):
        print(f"Texto generado para el batch {idx}: {text}")

    # Forward pass with labels (training-style call).
    # input_ids already has shape (1, seq_len); repeating it along dim 0 gives
    # one label row per sample, i.e. (2, seq_len).
    labels = model.tokenizer("hola mundo", return_tensors="pt").input_ids.repeat(2, 1)
    outputs = model(float_sequence, labels=labels)
    for idx, output in enumerate(outputs):
        print(f"Pérdida para el batch {idx}: {output.loss.item()}")


<h1 style="font-size:200%; font-family:cursive; color:white;">8. Fine-Tune</h1>

In [None]:
import wandb
# Authenticate with Weights & Biases before anything is logged.
# NOTE(review): presumably prompts for an API key when no credentials are
# cached, which would block an unattended "Run All" — confirm.
wandb.login()

In [24]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def calculate_bleu(preds, references):
    """Return one smoothed sentence-level BLEU score per (prediction, reference) pair.

    Pairs are formed with zip, so surplus items of the longer argument are
    ignored. Each reference is handed to sentence_bleu as the only reference
    for its prediction.

    Args:
        preds: Iterable of predictions.
        references: Iterable of references, aligned with ``preds``.

    Returns:
        A list with the score of every pair, in input order.
    """
    # Smoothing method 4 avoids zero scores for short sentences.
    smoothing = SmoothingFunction().method4
    return [
        sentence_bleu([reference], hypothesis, smoothing_function=smoothing)
        for hypothesis, reference in zip(preds, references)
    ]

def decode_predictions(predictions, tokenizer):
    """Convert sequences of token ids back to text.

    Args:
        predictions: Iterable of token-id sequences.
        tokenizer: Object whose ``decode(ids, skip_special_tokens=True)``
            turns one id sequence into a string.

    Returns:
        A list with the decoded string of every sequence, in input order.
    """
    return [
        tokenizer.decode(token_ids, skip_special_tokens=True)
        for token_ids in predictions
    ]

In [25]:
def train(
        model: torch.nn.Module,
        train_loader: DataLoader,
        optimizer: torch.optim.Optimizer,
        criterion: torch.nn.Module,
        device: torch.device,
        epoch: int):
    """Run one training epoch.

    Args:
        model: Called as ``model(inputs)``; its output is used as logits whose
            last dimension is the vocabulary and whose other dimensions
            flatten to one row per label token.
        train_loader: Yields ``(inputs, mask, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(flat_logits, flat_labels)``.
        device: Device the batch tensors are moved to.
        epoch: Epoch number, used for console and W&B logging only.

    Returns:
        ``(epoch_loss, train_bleu)``: the mean batch loss and the mean
        sentence-level BLEU over all sequences, both as floats.
    """
    model.train()
    running_loss = 0.0
    total = 0
    correct = 0

    # One token-id list per sequence: calculate_bleu scores sequence pairs,
    # so predictions and references must not be flattened across the batch.
    all_preds = []
    all_refs = []

    for batch_idx, (inputs, mask, labels) in enumerate(train_loader):
        # NOTE(review): `mask` is moved to the device but never given to the model.
        inputs, mask, labels = inputs.to(device), mask.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)  # (batch_size, seq_length, vocab_size)
        flat_logits = outputs.view(-1, outputs.size(-1))
        loss = criterion(flat_logits, labels.view(-1))

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Running metrics (token-level accuracy over every label position)
        running_loss += loss.item()
        total += labels.numel()
        _, predicted = torch.max(flat_logits, 1)
        correct += (predicted == labels.view(-1)).sum().item()

        # Keep predictions and references grouped per sequence for BLEU
        all_preds.extend(predicted.view_as(labels).tolist())
        all_refs.extend(labels.tolist())

        # Console log
        if batch_idx % 100 == 0:
            print(f'Epoch [{epoch}], Step [{batch_idx}/{len(train_loader)}], Loss: {loss.item():.4f}')

        # W&B log
        wandb.log({
            "epoch": epoch,
            "batch_idx": batch_idx,
            "loss": loss.item(),
            "accuracy_batch": 100 * correct / total
        })

    epoch_loss = running_loss / len(train_loader)
    epoch_accuracy = 100 * correct / total

    # Average the per-sequence BLEU scores into one number for the epoch
    bleu_scores = calculate_bleu(all_preds, all_refs)
    train_bleu = sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0.0

    print(f"Epoch [{epoch}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    return epoch_loss, train_bleu

def evaluate(
        model: torch.nn.Module,
        val_loader: DataLoader,
        criterion: torch.nn.Module,
        device: torch.device,
        epoch: int):
    """Evaluate the model on a validation loader without updating weights.

    Args:
        model: Called as ``model(inputs)``; its output is used as logits whose
            last dimension is the vocabulary and whose other dimensions
            flatten to one row per label token.
        val_loader: Yields ``(inputs, mask, labels)`` batches.
        criterion: Loss called as ``criterion(flat_logits, flat_labels)``.
        device: Device the batch tensors are moved to.
        epoch: Epoch number, used for the console summary only.

    Returns:
        ``(epoch_loss, valid_bleu)``: the mean batch loss and the mean
        sentence-level BLEU over all sequences, both as floats.
    """
    model.eval()
    running_loss = 0.0
    total = 0
    correct = 0

    # One token-id list per sequence: calculate_bleu scores sequence pairs,
    # so predictions and references must not be flattened across the batch.
    all_preds = []
    all_refs = []

    with torch.no_grad():
        for batch_idx, (inputs, mask, labels) in enumerate(val_loader):
            # NOTE(review): `mask` is moved to the device but never given to the model.
            inputs, mask, labels = inputs.to(device), mask.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            flat_logits = outputs.view(-1, outputs.size(-1))
            loss = criterion(flat_logits, labels.view(-1))

            # Running metrics (token-level accuracy over every label position)
            running_loss += loss.item()
            total += labels.numel()
            _, predicted = torch.max(flat_logits, 1)
            correct += (predicted == labels.view(-1)).sum().item()

            # Keep predictions and references grouped per sequence for BLEU
            all_preds.extend(predicted.view_as(labels).tolist())
            all_refs.extend(labels.tolist())

            # Console log
            if batch_idx % 100 == 0:
                print(f'Eval Step [{batch_idx}/{len(val_loader)}], Loss: {loss.item():.4f}')

            # W&B log
            wandb.log({
                "eval_batch_idx": batch_idx,
                "eval_loss": loss.item(),
                "eval_accuracy_batch": 100 * correct / total
            })

    epoch_loss = running_loss / len(val_loader)
    epoch_accuracy = 100 * correct / total

    # Average the per-sequence BLEU scores into one number for the epoch
    bleu_scores = calculate_bleu(all_preds, all_refs)
    valid_bleu = sum(bleu_scores) / len(bleu_scores) if bleu_scores else 0.0

    print(f"Validation Epoch [{epoch}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")

    return epoch_loss, valid_bleu


In [None]:
# Trackers for the best model seen so far.
# NOTE(review): best_bleu_score is initialised here but never read or updated
# in this cell.
best_valid_loss = float('inf')
best_bleu_score = 0

# Per-epoch history of loss and BLEU.
train_losses = []
valid_losses = []
train_bleu_scores = []
valid_bleu_scores = []

# Hyperparameter record.
# NOTE(review): `epochs`, `optimizer` and `cross_entropy` are not defined in the
# visible part of this notebook — they must exist in the kernel before this
# cell runs. This also rebinds `wandb.config` to a plain dict; confirm that
# this is how the run configuration is meant to be recorded.
wandb.config = {
    "epochs": epochs,
    "batch_size": batch_size,
    "learning_rate": optimizer.param_groups[0]['lr']
}

# Train and evaluate once per epoch.
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}/{epochs}")

    # Train the model
    train_loss, train_bleu = train(model, train_dataloader, optimizer, cross_entropy, device, epoch)
    
    # Evaluate the model
    valid_loss, valid_bleu = evaluate(model, val_dataloader, cross_entropy, device, epoch)

    # Checkpoint whenever the validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')
        print(f"Modelo guardado en epoch {epoch + 1} con loss de validación {valid_loss:.4f}")

    # Record loss and BLEU for both phases.
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_bleu_scores.append(train_bleu)
    valid_bleu_scores.append(valid_bleu)

    # Epoch summary. The `:.4f` format requires train_bleu and valid_bleu to be
    # single numbers.
    print(f"Training Loss: {train_loss:.4f} | Training BLEU: {train_bleu:.4f}")
    print(f"Validation Loss: {valid_loss:.4f} | Validation BLEU: {valid_bleu:.4f}")
    
    # Epoch-level metrics for W&B (epoch is logged 1-based here).
    wandb.log({
        "epoch": epoch + 1,
        "train_loss": train_loss,
        "train_bleu": train_bleu,
        "valid_loss": valid_loss,
        "valid_bleu": valid_bleu
    })

# Close the W&B run.
wandb.finish()

In [None]:
# Load the weights of the best checkpoint. map_location remaps the stored
# tensors onto the active device, so a checkpoint written on a GPU can also be
# restored on a CPU-only machine.
path = 'best_model.pt'
model.load_state_dict(torch.load(path, map_location=device))

<h1 style="font-size:200%; font-family:cursive; color:white;">9. Make Predictions</h1>

In [None]:
# get predictions for test data
# The model is called with the sequences only, exactly as train() and
# evaluate() do: the second positional parameter of FloatToTextModel.forward is
# `labels`, so the attention mask must not be passed in that position.
# NOTE(review): the .detach() call below assumes the model returns a single
# tensor — confirm against the model actually being evaluated.
model.eval()
with torch.no_grad():
    preds = model(test_seq.to(device))
    preds = preds.detach().cpu().numpy()

In [None]:
# model's performance
# Take the arg-max over the last (class / vocabulary) axis so that both (N, C)
# and (N, T, C) score arrays are handled, then flatten labels and predictions
# so they are compared element by element.
preds = np.argmax(preds, axis=-1)
print(classification_report(np.asarray(test_y).reshape(-1), preds.reshape(-1)))