In [1]:
import re, random, math, os
from collections import Counter
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence
from datasets import load_dataset
from tqdm import tqdm

In [2]:
# --------------------------
# 0) General configuration
# --------------------------
SEED = 42

# Seed every random number generator used by the notebook.
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Device:", device)

# Main hyperparameters
BATCH_SIZE = 64
NUM_EPOCHS = 3
EMBED_DIM = 100
HIDDEN_DIM = 128
NUM_LAYERS = 1
BIDIR = False
DROPOUT = 0.2
MIN_FREQ = 5     # words with freq < MIN_FREQ are mapped to <unk>
MAX_LEN = 200    # per-sequence truncation/padding length
LR = 1e-3

Device: cpu


In [3]:
# --------------------------
# 1) Load the AG_NEWS dataset
# --------------------------
# Columns: "text" and "label" (label in [0..3])
ds = load_dataset("ag_news")
train_ds, test_ds = ds["train"], ds["test"]

# Human-readable class names, indexed by label id.
TARGET_NAMES = ["World", "Sports", "Business", "Sci/Tech"]
NUM_CLASSES = len(TARGET_NAMES)
print("Train:", len(train_ds), "| Test:", len(test_ds))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

data/train-00000-of-00001.parquet:   0%|          | 0.00/18.6M [00:00<?, ?B/s]

data/test-00000-of-00001.parquet:   0%|          | 0.00/1.23M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/120000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/7600 [00:00<?, ? examples/s]

Train: 120000 | Test: 7600


In [4]:
# --------------------------
# 2) Tokenizer + vocabulary
# --------------------------
# Simple word-level tokenization: lowercase, non-word characters dropped
_word_re = re.compile(r"\b\w+\b", re.UNICODE)
def tokenize(text: str):
    """Lowercase ``text`` and return its word tokens in order of appearance.

    A token is a maximal run of ``\\w`` characters (letters, digits,
    underscore); everything else acts as a separator and is discarded.
    """
    lowered = text.lower()
    return [match.group() for match in _word_re.finditer(lowered)]

# Build the vocabulary from the training split only
counter = Counter()
for ex in train_ds:
    counter.update(tokenize(ex["text"]))

# Special tokens come first; the rest are the tokens seen at least
# MIN_FREQ times, in first-seen order.
specials = ["<pad>", "<unk>"]
itos = list(specials)
itos.extend(tok for tok, c in counter.items() if c >= MIN_FREQ)
stoi = dict(zip(itos, range(len(itos))))
PAD_IDX, UNK_IDX = stoi["<pad>"], stoi["<unk>"]
print("Vocab size:", len(itos))

def text_to_ids(text: str):
    """Encode ``text`` as a 1-D ``torch.long`` tensor of vocabulary ids.

    Tokens missing from ``stoi`` are mapped to ``UNK_IDX``. Only the first
    ``MAX_LEN`` tokens are kept; shorter texts are NOT padded here.
    """
    # Truncating before the lookup yields the same ids as truncating after it.
    kept_tokens = tokenize(text)[:MAX_LEN]
    return torch.tensor([stoi.get(tok, UNK_IDX) for tok in kept_tokens], dtype=torch.long)

Vocab size: 27813


In [5]:
# --------------------------
# 3) Dataset and collate_fn
# --------------------------
class NewsDataset(Dataset):
    """Map-style dataset wrapping a split whose rows have "text" and "label" keys."""

    def __init__(self, hf_split):
        # The split is kept as-is; rows are encoded lazily in __getitem__.
        self.data = hf_split

    def __len__(self):
        """Number of rows in the wrapped split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for row ``idx``.

        ``token_ids`` is whatever ``text_to_ids`` produces for the row's text;
        ``label`` is the row's label converted to a plain ``int``.
        """
        row = self.data[int(idx)]
        target = int(row["label"])               # 0..3
        token_ids = text_to_ids(row["text"])
        return token_ids, target

def collate_fn(batch):
    """Collate ``(token_ids, label)`` pairs into fixed-width batch tensors.

    Args:
        batch: list of ``(ids_tensor, label_int)`` pairs.

    Returns:
        Tuple ``(padded, lengths, y)``: ``padded`` holds the sequences
        right-padded with ``PAD_IDX`` (or cut) to exactly ``MAX_LEN`` columns,
        ``lengths`` holds the sequence lengths (capped at ``MAX_LEN`` when the
        batch had to be cut), and ``y`` holds the labels as ``torch.long``.
    """
    token_seqs, targets = zip(*batch)
    seq_lengths = torch.tensor([len(seq) for seq in token_seqs], dtype=torch.long)

    # Pad to the longest sequence in the batch first, then adjust to MAX_LEN.
    batch_ids = nn.utils.rnn.pad_sequence(token_seqs, batch_first=True, padding_value=PAD_IDX)
    width = batch_ids.size(1)
    if width > MAX_LEN:
        batch_ids = batch_ids[:, :MAX_LEN]
        seq_lengths = torch.clamp(seq_lengths, max=MAX_LEN)
    elif width < MAX_LEN:
        filler = torch.full((batch_ids.size(0), MAX_LEN - width), PAD_IDX, dtype=torch.long)
        batch_ids = torch.cat([batch_ids, filler], dim=1)

    return batch_ids, seq_lengths, torch.tensor(targets, dtype=torch.long)

# Training batches are reshuffled on every pass; test batches keep the dataset order.
train_loader = DataLoader(
    NewsDataset(train_ds),
    collate_fn=collate_fn,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_loader = DataLoader(
    NewsDataset(test_ds),
    collate_fn=collate_fn,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [6]:
# --------------------------
# 4) LSTM model
# --------------------------
class LSTMClassifier(nn.Module):
    """Text classifier: embedding -> LSTM -> linear layer on the final hidden state.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding size (LSTM input size).
        hidden_dim: LSTM hidden size per direction.
        num_classes: number of output logits.
        num_layers: number of stacked LSTM layers.
        bidirectional: run the LSTM in both directions; the classifier input
            then has ``2 * hidden_dim`` features.
        dropout: dropout between stacked LSTM layers. It is only passed to the
            LSTM when ``num_layers > 1``; with a single layer it has no effect.
        pad_idx: index of the padding token, or ``None``. When given, its
            embedding row is zeroed.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes,
                 num_layers=1, bidirectional=False, dropout=0.0, pad_idx=None):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0.0
        )
        out_dim = hidden_dim * (2 if bidirectional else 1)
        self.fc = nn.Linear(out_dim, num_classes)

        # Xavier-uniform init for the embedding table. It overwrites every
        # row, so the padding row is zeroed again afterwards.
        nn.init.xavier_uniform_(self.embedding.weight)
        with torch.no_grad():
            if pad_idx is not None:
                self.embedding.weight[pad_idx].fill_(0)

    def forward(self, x, lengths):
        """Compute class logits for a batch of padded id sequences.

        Args:
            x: ``[B, T]`` tensor of token ids.
            lengths: ``[B]`` tensor with the number of real tokens per row.
                Zero-length rows (e.g. text that tokenizes to nothing) are
                treated as length 1, i.e. as a single padding token.

        Returns:
            ``[B, num_classes]`` tensor of logits.
        """
        emb = self.embedding(x)  # [B, T, E]

        # pack_padded_sequence raises on lengths <= 0, so an empty sequence
        # would crash the whole batch; clamp to at least one timestep.
        safe_lengths = lengths.cpu().clamp(min=1)
        packed = pack_padded_sequence(emb, safe_lengths, batch_first=True, enforce_sorted=False)
        _, (h_n, _) = self.lstm(packed)  # h_n: [L*(2 if bi), B, H]

        if self.lstm.bidirectional:
            # Last layer: forward direction is h_n[-2], backward is h_n[-1].
            last_hidden = torch.cat((h_n[-2], h_n[-1]), dim=1)  # [B, 2H]
        else:
            last_hidden = h_n[-1]  # [B, H]

        logits = self.fc(last_hidden)  # [B, C]
        return logits

# Build the classifier from the notebook hyperparameters and move it to the selected device.
model = LSTMClassifier(
    len(itos),
    EMBED_DIM,
    HIDDEN_DIM,
    NUM_CLASSES,
    num_layers=NUM_LAYERS,
    bidirectional=BIDIR,
    dropout=DROPOUT,
    pad_idx=PAD_IDX,
)
model = model.to(device)

# Cross-entropy over the logits, optimized with Adam at learning rate LR.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LR)


In [7]:
# --------------------------
# 5) Training
# --------------------------
# Select training mode explicitly: another cell may have left the model in
# eval mode (model.eval()), and re-running this cell must still train properly.
model.train()
for epoch in range(NUM_EPOCHS):
    print(f"Epoch [{epoch + 1}/{NUM_EPOCHS}]")
    # The progress bar uses the same 1-based epoch number as the line printed above.
    loop = tqdm(train_loader, total=len(train_loader), desc=f"Epoch {epoch + 1}/{NUM_EPOCHS}")
    for X, lengths, y in loop:
        X, lengths, y = X.to(device), lengths.to(device), y.to(device)
        optimizer.zero_grad()
        logits = model(X, lengths)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        # Show the loss of the current batch next to the progress bar.
        loop.set_postfix(loss=loss.item())

Epoch [1/3]


Epoch 0/3: 100%|██████████| 1875/1875 [07:54<00:00,  3.96it/s, loss=0.248]


Epoch [2/3]


Epoch 1/3: 100%|██████████| 1875/1875 [08:01<00:00,  3.89it/s, loss=0.181]


Epoch [3/3]


Epoch 2/3: 100%|██████████| 1875/1875 [08:05<00:00,  3.86it/s, loss=0.0715]


In [8]:
# --------------------------
# 6) Evaluation
# --------------------------
def check_accuracy(loader, model):
    """Return the classification accuracy of ``model`` over ``loader``, in percent.

    The model is switched to eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards, even if a batch raises.

    Args:
        loader: iterable yielding ``(X, lengths, y)`` batches.
        model: module called as ``model(X, lengths)`` that returns one score
            per class along dim 1.

    Returns:
        ``100 * correct / total`` as a float.

    Raises:
        ZeroDivisionError: if ``loader`` yields no samples.
    """
    was_training = model.training

    # Run the batches on the device the model's parameters live on. A model
    # without parameters leaves the batches where they already are.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    model.eval()
    correct, total = 0, 0
    try:
        with torch.no_grad():
            for X, lengths, y in loader:
                if target_device is not None:
                    X = X.to(target_device)
                    lengths = lengths.to(target_device)
                    y = y.to(target_device)
                logits = model(X, lengths)
                preds = logits.argmax(dim=1)
                correct += (preds == y).sum().item()
                total += y.size(0)
    finally:
        # Restore the mode the caller had, instead of forcing training mode.
        model.train(was_training)
    return 100.0 * correct / total
# Accuracy on both splits: the training split is evaluated first, then the test split.
train_acc, test_acc = (
    check_accuracy(split_loader, model) for split_loader in (train_loader, test_loader)
)

In [9]:
# Report both accuracies as percentages with two decimals.
print(f"Train accuracy: {train_acc:.2f}%")
print(f"Test  accuracy: {test_acc:.2f}%")

Train accuracy: 96.02%
Test  accuracy: 91.67%


In [10]:
# --------------------------
# 7) Example inference
# --------------------------
idx2label = dict(enumerate(TARGET_NAMES))
example_text = "The central bank raised interest rates to combat inflation."
model.eval()
with torch.no_grad():
    ids = text_to_ids(example_text)
    # Build the one-element batch with collate_fn so inference pads and
    # truncates exactly like the data loaders, instead of duplicating that
    # logic here. The label 0 is a placeholder and its output is discarded.
    x, length = collate_fn([(ids, 0)])[:2]

    logits = model(x.to(device), length.to(device))
    pred = logits.argmax(dim=1).item()
print("Ejemplo -> Predicción:", idx2label[pred])

Ejemplo -> Predicción: Business



Ejercicios:
1) Graficar learning curves.
2) Comparar performance para distinto número de capas LSTM.
3) Comparar performance para distintos modelos de redes neuronales recurrentes.