In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

from torchtext.legacy import datasets
from torchtext.legacy.data import Field, LabelField, BucketIterator
from torch.nn.utils.rnn import pad_sequence, pad_packed_sequence, pack_padded_sequence

from sklearn.metrics import confusion_matrix, classification_report

import numpy as np
import random

RANDOM_SEED = 42

# Seed every random number generator used in this notebook (Python's `random`,
# NumPy, PyTorch on CPU and PyTorch on all CUDA devices) with the same value.
for _seed_rng in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_rng(RANDOM_SEED)

In [None]:
# Review text: tokenized with spaCy and lower-cased
TEXT = Field(tokenize='spacy', lower=True)
# Sentiment label: stored as a 64-bit integer
LABEL = LabelField(dtype=torch.int64)

# Download the IMDB dataset and load its train/test splits
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

downloading aclImdb_v1.tar.gz


100%|██████████| 84.1M/84.1M [00:02<00:00, 30.1MB/s]


In [None]:
# Show the number of training and test instances
print(f"Number of training examples: {len(train_data.examples)}")
print(f"Number of testing examples: {len(test_data.examples)}")

# Show the first training instance as an example (its attribute dictionary)
print(train_data.examples[0].__dict__)

Number of training examples: 25000
Number of testing examples: 25000
{'text': ['it', "'s", 'a', 'very', 'nice', 'movie', 'and', 'i', 'would', 'definitely', 'recommend', 'it', 'to', 'everyone', '.', 'but', 'there', 'are', '2', 'minus', 'points', ':', '-', 'the', 'level', 'of', 'the', 'stories', 'has', 'a', 'large', 'spectrum', '.', 'some', 'of', 'the', 'scenes', 'are', 'very', 'great', 'and', 'some', 'are', 'just', 'boring', '.', '-', 'a', 'lot', 'of', 'stories', 'are', 'not', 'self', '-', 'contained', '(', 'if', 'you', 'compare', 'to', 'f.e', '.', 'coffee', 'and', 'cigarettes', ',', 'where', 'each', 'story', 'has', 'a', 'point', ',', 'a', 'message', ',', 'a', 'punchline', 'or', 'however', 'you', 'wanna', 'call', 'it', ')', 'but', 'well', ',', 'most', 'stories', 'are', 'really', 'good', ',', 'some', 'are', 'great', 'and', 'overall', 'it', "'s", 'one', 'of', 'the', 'best', 'movies', 'this', 'year', 'for', 'sure!<br', '/><br', '/>annoying', ',', 'that', 'i', 'have', 'to', 'fill', '10', 'l

In [None]:
# Build the vocabularies from the training split
TEXT.build_vocab(
    train_data,
    max_size=10000,
    min_freq=5,
    vectors="glove.6B.100d",  # attach pre-trained word embeddings
)
LABEL.build_vocab(train_data, min_freq=5)

print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}")
print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}")

.vector_cache/glove.6B.zip: 862MB [02:40, 5.38MB/s]                           
100%|█████████▉| 399999/400000 [00:15<00:00, 26152.29it/s]


Unique tokens in TEXT vocabulary: 10002
Unique tokens in LABEL vocabulary: 2


In [None]:
# Select the device on which the model will be trained (GPU when available)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

BATCH_SIZE = 64

# Training and test iterators; batches are created directly on `device`
train_iterator, test_iterator = BucketIterator.splits(
    (train_data, test_data),
    batch_size=BATCH_SIZE,
    device=device,
)

In [None]:
# Model definition
class BiGRU(nn.Module):
  """Bidirectional GRU classifier: embedding -> dropout -> BiGRU -> FC(64) -> ReLU -> FC.

  Args:
    input_size: number of features fed to the GRU. Must equal ``emb_dim``
      because the GRU consumes the embedding output directly.
    vocab_size: size of the vocabulary (rows of the embedding matrix).
    output_dim: number of output classes (2 for [positive, negative]).
    emb_dim: dimension of each word embedding.
    hidden_dim: hidden size of the GRU, per direction.
    n_layers: number of stacked GRU layers.
    dropout_rate: dropout probability applied to the embedded tokens.

  Raises:
    ValueError: if ``input_size`` differs from ``emb_dim``.
  """

  def __init__(self, input_size, vocab_size, output_dim, emb_dim, hidden_dim, n_layers, dropout_rate):
    super(BiGRU, self).__init__()

    # Fail fast: a mismatch would otherwise only surface as an opaque shape
    # error inside the GRU on the first forward pass.
    if input_size != emb_dim:
      raise ValueError(
          f"input_size ({input_size}) must equal emb_dim ({emb_dim}): "
          "the GRU is fed the embedding output directly")

    self.n_layers = n_layers
    self.hidden_dim = hidden_dim
    self.input_size = input_size

    self.embedding = nn.Embedding(vocab_size, emb_dim)
    self.gru = nn.GRU(input_size=input_size, hidden_size=hidden_dim,
                      num_layers=n_layers, bidirectional=True)
    # The classifier head receives the forward and backward final states
    # concatenated, hence hidden_dim * 2 input features.
    self.fc1 = nn.Linear(hidden_dim * 2, 64)
    self.fc2 = nn.Linear(64, output_dim)

    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(dropout_rate)

  def forward(self, text):
    """Return class logits for a batch of token-id sequences.

    Args:
      text: integer tensor of token ids laid out as (seq_len, batch); the GRU
        is built with the default ``batch_first=False``.

    Returns:
      Tensor of shape (batch, output_dim) with unnormalized class scores.
    """
    embedded = self.dropout(self.embedding(text))

    # NOTE(review): padded positions are not packed or masked, so the forward
    # final state may include padding steps. Consider pack_padded_sequence with
    # the true lengths if the iterator can provide them.
    _, hn = self.gru(embedded)

    # hn is (n_layers * 2, batch, hidden_dim), ordered layer by layer with the
    # forward direction before the backward one. Only the LAST layer's two
    # states feed the classifier; concatenating every layer would produce
    # 2 * n_layers * hidden_dim features and break fc1 when n_layers > 1.
    hn = torch.cat([hn[-2], hn[-1]], dim=-1)

    output = self.fc1(hn)
    output = self.fc2(self.relu(output))

    return output

In [None]:
# Hyperparameter initialization
EMBBEDING_DIM = 100  # must match the pre-trained vectors attached to TEXT.vocab
INPUT_SIZE = EMBBEDING_DIM  # the GRU consumes the embedding output, so both sizes must agree
VOCAB_SIZE = len(TEXT.vocab)
OUTPUT_DIM = len(LABEL.vocab)
HID_DIM = 128
N_LAYERS = 1
DROPOUT_RATE = 0.10
LEARNING_RATE = 1e-3

# Instantiate the model
model = BiGRU(INPUT_SIZE, VOCAB_SIZE, OUTPUT_DIM, EMBBEDING_DIM, HID_DIM, N_LAYERS, DROPOUT_RATE).to(device)

# Load the pre-trained word embeddings into the embedding layer.
# copy_ broadcasts silently, so check the shape explicitly first.
assert tuple(TEXT.vocab.vectors.shape) == (VOCAB_SIZE, EMBBEDING_DIM), (
    f"pre-trained vectors have shape {tuple(TEXT.vocab.vectors.shape)}, "
    f"expected {(VOCAB_SIZE, EMBBEDING_DIM)}")
with torch.no_grad():
    model.embedding.weight.copy_(TEXT.vocab.vectors)

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Learning-rate decay when the monitored loss stops improving (optional)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3, verbose=True)

# Loss function
criterion = nn.CrossEntropyLoss()

In [None]:
# Count how many predictions match the ground-truth labels
def sum_correct(preds, labels):
  """Compare arg-max predictions against labels.

  Args:
    preds: array of per-class scores; the class axis is axis 1.
    labels: NumPy array of ground-truth class ids.

  Returns:
    A tuple ``(n_correct, predicted_classes, flat_labels)`` where the last two
    are flattened 1-D arrays and ``n_correct`` is how many positions agree.
  """
  flat_labels = labels.flatten()
  predicted_classes = np.argmax(preds, axis=1).reshape(-1)
  matches = predicted_classes == flat_labels
  return np.sum(matches), predicted_classes, flat_labels

# Training routine: runs one epoch over `iterator`
def train(model, iterator, optimizer=None, criterion=None, clip=1, scheduler=None):
    """Train `model` for one epoch and return the mean batch loss.

    Args:
        model: the network to train; called as ``model(batch.text)``.
        iterator: iterable of batches exposing ``.text`` and ``.label``.
        optimizer: optimizer to step. When omitted, the notebook-level
            ``optimizer`` is looked up at call time (not at definition time),
            so re-creating the optimizer in another cell is picked up without
            re-running this cell.
        criterion: loss function ``criterion(output, target)``. When omitted,
            the notebook-level ``criterion`` is looked up at call time.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
        scheduler: optional LR scheduler whose ``step`` accepts a metric
            (e.g. ``ReduceLROnPlateau``); it is stepped once with the mean
            loss. When omitted, the notebook-level ``scheduler`` is used if
            one exists.

    Returns:
        Mean loss over the batches as a float (NaN if `iterator` is empty).

    Raises:
        KeyError: if `optimizer` or `criterion` is omitted and no notebook-level
            variable of that name exists.
    """
    # The parameters shadow the notebook globals of the same name, so omitted
    # arguments are resolved through globals() at call time.
    notebook_ns = globals()
    if optimizer is None:
        optimizer = notebook_ns['optimizer']
    if criterion is None:
        criterion = notebook_ns['criterion']
    if scheduler is None:
        scheduler = notebook_ns.get('scheduler')

    # Inputs must live on the same device as the model's weights.
    first_param = next(model.parameters(), None)
    model_device = None if first_param is None else first_param.device

    model.train()
    epoch_loss = 0.0
    total_correct = 0
    total_count = 0
    n_batches = 0

    for batch in iterator:
        src, trg = batch.text, batch.label
        if model_device is not None:
            src = src.to(model_device)
            trg = trg.to(model_device)

        optimizer.zero_grad()
        output = model(src)
        loss = criterion(output, trg)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        epoch_loss += loss.item()
        # .item() keeps the running count a plain Python int instead of a device tensor.
        total_correct += (output.argmax(1) == trg).sum().item()
        total_count += len(trg)
        n_batches += 1

    nan = float('nan')
    accuracy = total_correct / total_count if total_count else nan
    mean_loss = epoch_loss / n_batches if n_batches else nan

    print(f'Train accuracy: {accuracy:.6f}')
    # Do not feed a NaN metric to the scheduler when there was nothing to train on.
    if scheduler is not None and n_batches:
        scheduler.step(mean_loss)
    return mean_loss  # mean loss over the epoch

In [None]:
# Training loop: one call to train() per epoch
total_epoch = 5
for epoch in range(total_epoch):
    result = train(model, train_iterator)
    print(f'Epoch {epoch + 1} / {total_epoch}, Mean loss: {result:.6f}')

Train accuracy: 0.749280
Epoch 1 / 5, Mean loss: 0.493229
Train accuracy: 0.897120
Epoch 2 / 5, Mean loss: 0.262159
Train accuracy: 0.930400
Epoch 3 / 5, Mean loss: 0.183681
Train accuracy: 0.951160
Epoch 4 / 5, Mean loss: 0.137071
Train accuracy: 0.967400
Epoch 5 / 5, Mean loss: 0.097001


In [None]:
# Evaluate the trained model on the test set
total_correct = 0
total_count = 0
model_prediction = []  # one list of predicted class ids per batch
ground_truth = []      # one list of true class ids per batch

# Switch to inference mode: without this, dropout stays active (train() leaves
# the model in training mode) and the measured test accuracy is distorted.
model.eval()

with torch.no_grad():
  for batch in test_iterator:
    src = batch.text.to(device)
    trg = batch.label.to(device)
    output = model(src)

    # Move the predictions and the labels to the CPU as NumPy arrays
    output = output.detach().cpu().numpy()
    label_ids = trg.to('cpu').numpy()

    # Accumulate the number of correct predictions
    correct, predictions, labels = sum_correct(output, label_ids)
    model_prediction.append(predictions.tolist())
    ground_truth.append(labels.tolist())
    total_correct += correct
    total_count += len(trg)

print(f'Test accuracy: {(total_correct/total_count):.6f}')

Test accuracy: 0.893600


In [None]:
# Classification report
def _flatten_batches(batches):
  """Flatten a list of per-batch lists; entries that are already scalars are kept.

  Accepting already-flat input makes this cell safe to re-run: the names below
  are overwritten with flat lists, and flattening those again would otherwise
  fail with "'int' object is not iterable".
  """
  return [item
          for batch in batches
          for item in (batch if isinstance(batch, list) else [batch])]

model_prediction = _flatten_batches(model_prediction)
ground_truth = _flatten_batches(ground_truth)
print(classification_report(ground_truth, model_prediction, labels=[0, 1], digits=4))

              precision    recall  f1-score   support

           0     0.9069    0.8772    0.8918     12500
           1     0.8811    0.9100    0.8953     12500

    accuracy                         0.8936     25000
   macro avg     0.8940    0.8936    0.8936     25000
weighted avg     0.8940    0.8936    0.8936     25000

