# Laboratorio 2
## Pipeline de Machine Learning para Clasificación de Sentimientos con Transformers

En este laboratorio, realizarás una clasificación de sentimiento de reseñas de películas mediante el modelo de transformadores. Como parte de este laboratorio, implementarás un análisis exploratorio de datos para obtener conocimientos sobre la data y su distribución. Posteriormente, los modelos requieren un tokenizer que implementarás personalmente. A continuación, programarás el elemento integral del modelo de transformador, la Mecánica de Atención. Finalmente, combinarás todo y entrenarás al modelo en la tarea de clasificación binaria.

## Tutorial: Cómo activar la GPU gratuita en Google Colab

Para este laboratorio, se recomienda usar la GPU gratuita de Google Colab para acelerar el entrenamiento. Sigue estos pasos:

### Pasos para activar la GPU:

1. **Abre tu notebook en Google Colab**
   - Sube este archivo a Google Drive
   - Haz clic derecho sobre el archivo → "Abrir con" → "Google Colaboratory"

2. **Cambia el tipo de entorno de ejecución**
   - En el menú superior, ve a: **Entorno de ejecución** → **Cambiar tipo de entorno de ejecución**
   - (O en inglés: **Runtime** → **Change runtime type**)

3. **Selecciona la GPU**
   - En "Acelerador por hardware" (Hardware accelerator), selecciona T4 GPU
   - Haz clic en **Guardar** (Save)

4. **Verifica que la GPU esté activa**
   - Ejecuta la siguiente celda para confirmar que tienes acceso a la GPU

### Notas importantes:
- La GPU gratuita tiene límites de uso diario (aproximadamente 12-15 horas por día)
- Si se desconecta, deberás volver a ejecutar todas las celdas
- Guarda tu progreso frecuentemente

In [None]:
# Verificar que la GPU está disponible
import torch
if torch.cuda.is_available():
    print(f"✓ GPU detectada: {torch.cuda.get_device_name(0)}")
    print(f"  Memoria total: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
else:
    print("✗ No se detectó GPU. Por favor, activa la GPU siguiendo los pasos del tutorial anterior.")

## Instalación de Dependencias

Primero, instalaremos las bibliotecas necesarias para este laboratorio.

In [None]:
!pip install datasets

## Imports y Setup Inicial

Importamos todas las bibliotecas necesarias y configuramos las semillas aleatorias para reproducibilidad.

In [None]:
# Initial Setup
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
import random
import numpy as np
from datasets import load_dataset
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter

# Set random seeds for reproducibility
random_seed = 42
torch.manual_seed(random_seed)
random.seed(random_seed)
np.random.seed(random_seed)

# Check if CUDA is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

## Análisis Exploratorio de Datos (EDA)

### 1.- Carga de Datos

Cargaremos el dataset IMDB que contiene reseñas de películas con sentimientos positivos y negativos.

In [None]:
# Load the dataset
dataset = load_dataset('imdb')

texts = dataset['train']['text'] + dataset['test']['text']
labels = dataset['train']['label'] + dataset['test']['label']

print(f"Total de textos: {len(texts)}")
print(f"Etiquetas únicas: {np.unique(labels)}")

### 2.- Visualización de Distribuciones

Analizaremos la distribución de longitudes de las reseñas y la distribución de las etiquetas (positivo/negativo).

In [None]:
def word_count(text):
    return len(text.split())

review_lengths = [word_count(text) for text in texts]

# Plot the distribution of review lengths
plt.figure(figsize=(12, 6))
sns.histplot(review_lengths, bins=30, kde=True)
plt.title('Gráfico de distribución de longitudes de reseñas')
plt.xlabel('Cantidad de palabras')
plt.ylabel('Frecuencia')
plt.show()

# Plot the distribution of labels
label_counts = Counter(labels)
plt.figure(figsize=(6, 6))
sns.barplot(x=list(label_counts.keys()), y=list(label_counts.values()))
plt.title('Gráfico de distribución de etiquetas')
plt.xlabel('Etiqueta')
plt.ylabel('Conteo')
plt.xticks(ticks=[0, 1], labels=['Negativo', 'Positivo'])
plt.show()

average_positive_length = np.mean([review_lengths[i] for i in range(len(labels)) if labels[i] == 1])
average_negative_length = np.mean([review_lengths[i] for i in range(len(labels)) if labels[i] == 0])

print(f"Promedio de longitud de reseñas positivas: {average_positive_length:.2f} palabras")
print(f"Promedio de longitud de reseñas negativas: {average_negative_length:.2f} palabras")

## Tareas

### 1: Implementación del Tokenizador

- [ ] Ampliar el tokenizador para que construya un vocabulario de 30.000 palabras y fragmentos de palabra (libro capítulo 12.5).
- [ ] Mostrar el tamaño del vocabulario a lo largo de las iteraciones del algoritmo.

In [None]:
# Tokenizer Implementation
class SimpleTokenizer:
    def __init__(self):
        """
        Inicializa el tokenizador con las variables necesarias para construir el vocabulario.
        El vocabulario contiene los tokens especiales y su código, mientras que la reversa_vocabulario
        contiene el código de cada token como clave y el token mismo como valor.
        """
        self.special_tokens = {'[PAD]': 0, '[UNK]': 1} # Tokens especiales con sus códigos
        self.vocab = {'[PAD]': 0, '[UNK]': 1} # Vocabulario inicializado con tokens especiales
        self.reverse_vocab = {0: '[PAD]', 1: '[UNK]'} # Reversa vocabulario inicializado

    def tokenize(self, text):
        """
        Convierte el texto a una lista de tokens al convertirlo a minúscula y separarla por espacios.
        """
        tokens = text.lower().split()
        return tokens

    def build_vocab(self, texts):
        """
        Crea el vocabulario a partir de la lista de textos proporcionados.
        Se agrega cada token nuevo al vocabulario y se actualiza la reversa_vocabulario correspondiente.
        """
        # TODO Ampliar el tokenizador para que construya un vocabulario de 30.000 palabras y fragmentos de palabra (libro capítulo 12.5).
        index = len(self.vocab)
        for text in texts:
            tokens = self.tokenize(text)
            for token in tokens:
                if token not in self.vocab:
                    self.vocab[token] = index
                    self.reverse_vocab[index] = token
                    index += 1

    def encode(self, text, max_length=128):
        """
        Convierte el texto a una lista de IDs de tokens utilizando el vocabulario.
        Si la longitud de los IDs es menor que la máxima longitud, se agregan [PAD] repetidos para alcanzar la longitud deseada.
        """
        # TODO Ampliar el tokenizador para que construya un vocabulario de 30.000 palabras y fragmentos de palabra (libro capítulo 12.5).
        tokens = self.tokenize(text)
        token_ids = [self.vocab.get(token, self.vocab['[UNK]']) for token in tokens]
        if len(token_ids) < max_length:
            token_ids += [self.vocab['[PAD]']] * (max_length - len(token_ids))
        else:
            token_ids = token_ids[:max_length]
        return token_ids

    def decode(self, token_ids):
        """
        Convierte la lista de IDs de tokens a un texto utilizando la reversa_vocabulario.
        """
        return ' '.join(self.reverse_vocab.get(id, '[UNK]') for id in token_ids)

    @property
    def vocab_size(self):
        return len(self.vocab)

simple_tokenizer = SimpleTokenizer()
simple_tokenizer.build_vocab(texts)
print(f"Tamaño del vocabulario: {simple_tokenizer.vocab_size}")
if simple_tokenizer.vocab_size >= 29900 or simple_tokenizer.vocab_size <= 30100:
    print(f"Vocabulary is incorrect: {simple_tokenizer.vocab_size}")

### Prueba del Tokenizador

Verificaremos que el tokenizador funciona correctamente codificando y decodificando un texto de ejemplo.

In [None]:
print("Ejemplo de codificación:")
print(simple_tokenizer.encode("The movie was maravilloso"))
print("\nEjemplo de decodificación:")
print(simple_tokenizer.decode(simple_tokenizer.encode("The movie was maravilloso")))

In [None]:
print(f"Imprimir 15 de la totalidad del vocabulario de tamaño {simple_tokenizer.vocab_size}")
for idx, key in enumerate(simple_tokenizer.vocab):
    print(f"palabra o fragmento correspondiente '{key}' con su token {simple_tokenizer.vocab[key]}")
    if idx == 15:
        break

## Preparación del Conjunto de Datos

Este conjunto de datos se ha reducido a 1500 desde 50.000 para que puedas desarrollar en una instancia de **Google Colab con GPU**. Con la GPU activada, el entrenamiento será mucho más rápido. Si usas CPU, también será posible pero tomará más tiempo.

Crearemos los conjuntos de entrenamiento, validación y prueba.

In [None]:
# Implementación del conjunto de datos IMDB
class IMDBDataset(Dataset):
    def __init__(self, tokenizer, max_length=128, seed=42):
        dataset = load_dataset('imdb')
        texts, labels = dataset['train']['text'] + dataset['test']['text'], dataset['train']['label'] + dataset['test']['label']
        data = list(zip(texts, labels))
        random.Random(seed).shuffle(data)
        self.texts, self.labels = zip(*data)
        self.texts = self.texts[:1500]
        self.labels = self.labels[:1500]
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        inputs = self.tokenizer.encode(text, max_length=self.max_length)
        return torch.tensor(inputs, dtype=torch.long), label

# longitud de secuencia
max_length = 128
dataset = IMDBDataset(simple_tokenizer, max_length=max_length)

# Dividir el conjunto de datos en conjuntos de entrenamiento, validación y prueba
train_size = 800
val_size = 200
test_size = 500
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size], generator=torch.Generator().manual_seed(random_seed))

# DataLoader para batchear
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

print(f"Tamaños de los conjuntos - Entrenamiento: {len(train_dataset)}, Validación: {len(val_dataset)}, Prueba: {len(test_dataset)}")

## Modelo

### 2: Mecanismo de Atención

- [ ] Capa de atención unidireccional **SingleHeadAttention** con matrices de Query, Key y Value $Q_q, Q_k, Q_v$ (libro capítulo 12.2)
- [ ] **Ajustado** Punto de dot producto de escala (libro capítulo 12.3.2)

In [None]:
class ScaledDotProductAttention(nn.Module):
    """
    Implementación de la atención unidireccional escalada por punto de escala, 
    utilizada en las transformadoras.
    
    - Utiliza la función dot producto escalada para calcular los pesos de atención.
    - Aplica la función softmax para obtener las probabilidades de atención.
    - Multiplica las probabilidades de atención con el valor correspondiente para obtener el output.
    """
    def __init__(self, dim_model):
        super(ScaledDotProductAttention, self).__init__()
        # TODO

    def forward(self, query, key, value):
        """
        Procesar los inputs y obtener el output.
        
        - Calcula las matrices de dot producto entre query y key.
        - Divide las matrices de dot producto por la escala.
        - Aplica la función softmax para obtener las probabilidades de atención.
        - Multiplica las probabilidades de atención con el valor correspondiente para obtener el output.
        
        Returns:
            output (tensor): El output de la capa de atención.
            attn_weights (tensor): Las probabilidades de atención.
        """
        # TODO
        return output, attn_weights

class SingleHeadAttention(nn.Module):
    """
    Implementación de la atención unidireccional con varias capas lineales, 
    utilizada en las transformadoras.
    
    - Utiliza varias capas lineales para calcular las matrices de query, key y value.
    - Aplica la capa de atención escalada por punto de escala para obtener las probabilidades de atención.
    - Multiplica las probabilidades de atención con el valor correspondiente para obtener el output final.
    """
    def __init__(self, dim_model):
        super(SingleHeadAttention, self).__init__()
        # TODO

    def forward(self, x):
        """
        Procesar los inputs y obtener el output.
        
        - Calcula las matrices de query, key y value utilizando varias capas lineales.
        - Aplica la capa de atención escalada por punto de escala para obtener las probabilidades de atención.
        - Multiplica las probabilidades de atención con el valor correspondiente para obtener el output final.
        
        Returns:
            output (tensor): El output final de la capa de atención.
        """
        # TODO
        output = 0
        return output

### 3: Capa del Transformer

- [ ] Unir los elementos de la capa SingleHeadAttention, Block de normalización por capa (nn.LayerNorm) y el bloque de forward feedforward (nn.Linear) con una capa de normalización final (libro capítulo 12.4)
- [ ] No olvidar las conexiones residuales

En general, se recomienda utilizar ReLU y además técnicas de regularización para mejorar la generalización del modelo a los datos de prueba (como Dropout).

In [None]:
class TransformerEncoderLayer(nn.Module):
    """
    Una capa única en la arquitectura del codificador de transformer.
    
    Esta clase define un paso adelante que aplica atención autoregresa, seguido de dos capas lineales y normalización de capa.
    """
    def __init__(self, dim_model, dim_feedforward):
        super(TransformerEncoderLayer, self).__init__()
        # TODO

    def forward(self, src):
        """
        Paso adelante a través de la capa del codificador de transformer.
        
        Parámetros:
            src (Tensor): Tensor de entrada a ser procesado por la capa, torch.Size([8, 128, 32])
        
        Regresa:
            Tensor: Tensor de salida después de aplicar la atención autoregresa y el FFN.
        """
        # TODO
        return src

### Arquitectura del Modelo

El **SentimentAnalysisModel** básico se proporciona. Este modelo combina todos los componentes anteriores para crear un clasificador de sentimientos.

In [None]:
class SentimentAnalysisModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super(SentimentAnalysisModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.encoder = TransformerEncoderLayer(embed_dim, hidden_dim)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, src):
        src = self.embedding(src)
        encoded_output = self.encoder(src)
        pooled_output = encoded_output.mean(dim=1)
        return self.fc(pooled_output)

# Puedes experimentar con diferentes parámetros hiperbásicos, como la embedding o el dimensión oculta.
# Tal como puedes ajustar los parámetros de aprendizaje para mejorar el rendimiento del modelo.
embed_dim = 32
hidden_dim = 128
num_classes = 2
vocab_size = simple_tokenizer.vocab_size
model = SentimentAnalysisModel(vocab_size, embed_dim, hidden_dim, num_classes)

# Test with synthetic data
fake_data = torch.randint(0, vocab_size, (8, max_length))
fake_output = model(fake_data)
assert fake_output.shape == (8, num_classes), "Output shape is incorrect"
print(f"Modelo creado exitosamente. Shape de salida: {fake_output.shape}")
model

## Fase de Entrenamiento

### 4: Entrenamiento

- [ ] Implementar el mecanismo de detección temprana o otra monitoreo del progreso de aprendizaje
- [ ] Experimentar con tamaños más grandes de modelos y comparar el rendimiento obtenido con la cantidad de parámetros requeridos

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


# Bucle de entrenamiento con validación
for epoch in range(1): # TODO aumentar el número de épocas para entrenar modelos más potentes
    model.train()
    total_loss = 0
    for texts, labels in train_loader:
        optimizer.zero_grad()
        output = model(texts)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch: {epoch+1}, Training Loss: {total_loss/len(train_loader):.4f}")

    # Bucle de validación
    # TODO Implementar el mecanismo de detección temprana o otra monitoreo del progreso de aprendizaje
    model.eval()
    val_correct, val_total = 0, 0
    val_loss = 0
    with torch.no_grad():
        for texts, labels in val_loader:
            outputs = model(texts)
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()
            loss = criterion(outputs, labels)
            val_loss += loss.item()
    val_accuracy = 100 * val_correct / val_total
    print(f'Validation Loss: {val_loss/len(val_loader):.4f}, Validation Accuracy: {val_accuracy:.2f}%')

## Análisis de Resultados

### 5: Evaluación en el Conjunto de Test

- [ ] Comparar el rendimiento del modelo con muestras de prueba de diferentes longitudes
- [ ] Bonus: alcanzar una precisión superior al 90%. Es posible que debas incrementar el numero de parámetros y que debas implementar la atención múltiple AutoAtención y multicapa para lograr esto (libro capítulo 12.3.3).

In [None]:
from sklearn.metrics import confusion_matrix
# Test loop
model.eval()
test_correct, test_total = 0, 0
y_true = []
y_pred = []
with torch.no_grad():
    for texts, labels in test_loader:
        outputs = model(texts)
        _, predicted = torch.max(outputs, 1)
        y_pred.append(predicted[0])
        y_true.append(labels[0])
        
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()
test_accuracy = 100 * test_correct / test_total
print(f'Test Accuracy: {test_accuracy:.2f}%')
print("\nMatriz de confusión:")
print(confusion_matrix(y_pred, y_true))

## Despliegue

### 6: Pruebas con Ejemplos Personalizados

- [ ] Intenta crear nuevos ejemplos donde el modelo falla al clasificar correctamente el sentimiento.
- [ ] Experimenta, describe y explica qué ocurre cuando intentas clasificar una crítica de cine en español utilizando tu modelo de transformador.

In [None]:
review = "The movie was amazing"
inputs = simple_tokenizer.encode(review, max_length=max_length)
inputs = torch.tensor([inputs], dtype=torch.long)

model.eval()
with torch.no_grad():
    outputs = model(inputs)
    _, predicted_class = torch.max(outputs, 1)

class_labels = {0: "Negative", 1: "Positive"}
predicted_label = class_labels[predicted_class.item()]
print(f"The predicted class for the text '{review}' is: {predicted_label}")

## Conclusiones

Analiza los resultados obtenidos:

- ¿Qué tipo de reseñas fueron más difíciles de clasificar para el modelo? ¿A qué crees que se debe?
- ¿Qué mejoras podrías proponer para el modelo o el proceso de entrenamiento?
- ¿Qué sucede cuando pruebas el modelo con texto en español?