In [1]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

from sklearn.preprocessing import LabelEncoder
from torchtext.vocab import FastText


## Preparación de los datos

### Cargar dataset

In [2]:
# Cargar el dataset
df = pd.read_csv('workspace/IMDB Dataset.csv')

# Verificar las columnas y el tamaño
print(df.columns)
print(df.shape)


Index(['review', 'sentiment'], dtype='object')
(50000, 2)


### Preprocesamiento de texto

In [3]:
import re
from collections import defaultdict, Counter
from sklearn.preprocessing import LabelEncoder

def limpiar_texto(texto):
    # Convertir a minúsculas
    texto = texto.lower()
    # Eliminar caracteres no alfabéticos
    texto = re.sub(r'[^a-zA-Z\s]', '', texto)
    return texto

df['review'] = df['review'].apply(limpiar_texto)

# Tokenización simple
df['tokens'] = df['review'].apply(lambda x: x.split())

# Construir el vocabulario
counter = Counter()
for tokens in df['tokens']:
    counter.update(tokens)

# Definir el tamaño del vocabulario
vocab_size = 20000  # Puedes ajustar este valor

# Palabras más comunes
vocab = ['<PAD>', '<UNK>'] + [word for word, freq in counter.most_common(vocab_size - 2)]

# Mapeo de palabras a índices
word_to_idx = {word: idx for idx, word in enumerate(vocab)}

def tokens_a_indices(tokens, word_to_idx, max_len=500):
    indices = [word_to_idx.get(token, word_to_idx['<UNK>']) for token in tokens]
    if len(indices) < max_len:
        indices += [word_to_idx['<PAD>']] * (max_len - len(indices))
    else:
        indices = indices[:max_len]
    return indices

df['input'] = df['tokens'].apply(lambda x: tokens_a_indices(x, word_to_idx))

le = LabelEncoder()
df['label'] = le.fit_transform(df['sentiment'])  # positive:1, negative:0


## Preparar la matriz de embeddings

In [6]:
# Inicializar FastText
fasttext = FastText(language='en')

# Supongamos que ya tienes definido vocab y vocab_size
embedding_dim = fasttext.dim  # Obtiene la dimensión de los embeddings de FastText
embedding_matrix = np.zeros((vocab_size, embedding_dim))

for i, word in enumerate(vocab):
    if word in fasttext.stoi:
        embedding_matrix[i] = fasttext[word]
    else:
        embedding_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim, ))

### División del dataset

In [7]:
train_df, test_df = train_test_split(df, test_size=0.1, random_state=42, stratify=df['label'])

## Definir el modelo con embeddings preetrenados

In [8]:
class RNNClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, embedding_matrix):
        super(RNNClassifier, self).__init__()
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding.weight = nn.Parameter(torch.tensor(embedding_matrix, dtype=torch.float32))
        self.embedding.weight.requires_grad = False  # Establecer en False para no entrenar los embeddings
        
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, (hidden, cell) = self.lstm(embedded)
        out = self.dropout(hidden[-1])
        out = self.fc(out)
        return out


## Inicializar y entrenar el modelo

In [19]:
class IMDBDataset(Dataset):
    def __init__(self, dataframe):
        self.inputs = dataframe['input'].tolist()
        self.labels = dataframe['label'].tolist()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return torch.tensor(self.inputs[idx], dtype=torch.long), torch.tensor(self.labels[idx], dtype=torch.float)


In [23]:
# Parámetros del modelo
hidden_dim = 256
output_dim = 1
n_layers = 2
dropout = 0.5
bidirectional = True
dropout = 0.5
batch_size = 64
num_epochs = 10
learning_rate = 1e-3

model = RNNClassifier(vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, embedding_matrix)

# Definir la función de pérdida y el optimizador
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Mover el modelo y la pérdida a la GPU si está disponible
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = criterion.to(device)

# Preparar los datos
train_dataset = IMDBDataset(train_df)
test_dataset = IMDBDataset(test_df)

# K-Fold Cross Validation
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

# Dispositivo
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


## Entrenamiento del modelo

## Función de entrenamiento y evaluación

In [21]:
def train_epoch(model, dataloader, criterion, optimizer):
    model.train()
    epoch_loss = 0
    epoch_acc = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        lengths = (inputs != word_to_idx['<PAD>']).sum(1)
        optimizer.zero_grad()
        predictions = model(inputs, lengths)
        loss = criterion(predictions, labels)
        rounded_preds = torch.round(predictions)
        correct = (rounded_preds == labels).float()
        acc = correct.sum() / len(correct)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    return epoch_loss / len(dataloader), epoch_acc / len(dataloader)

def evaluate(model, dataloader, criterion):
    model.eval()
    epoch_loss = 0
    epoch_acc = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            lengths = (inputs != word_to_idx['<PAD>']).sum(1)
            predictions = model(inputs, lengths)
            loss = criterion(predictions, labels)
            rounded_preds = torch.round(predictions)
            correct = (rounded_preds == labels).float()
            acc = correct.sum() / len(correct)
            epoch_loss += loss.item()
            epoch_acc += acc.item()
    return epoch_loss / len(dataloader), epoch_acc / len(dataloader)


### Entrenamiento con K-fold

In [24]:
fold_results = {}

for fold, (train_ids, val_ids) in enumerate(kfold.split(train_dataset)):
    print(f'Fold {fold+1}')
    
    # Subset para este fold
    train_subsampler = torch.utils.data.Subset(train_dataset, train_ids)
    val_subsampler = torch.utils.data.Subset(train_dataset, val_ids)
    
    # DataLoaders
    train_loader = DataLoader(train_subsampler, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_subsampler, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    
    # Inicializar el modelo
    model = RNNClassifier(vocab_size, embed_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout)
    model.to(device)
    
    # Definir la pérdida y el optimizador
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    # Guardar pérdidas y precisiones
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []
    
    # Entrenamiento por épocas
    for epoch in range(num_epochs):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc = evaluate(model, val_loader, criterion)
        
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)
        val_losses.append(val_loss)
        val_accuracies.append(val_acc)
        
        print(f'Epoch {epoch+1}:')
        print(f'\tTrain Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {val_loss:.4f} |  Val. Acc: {val_acc*100:.2f}%')
    
    # Guardar resultados del fold
    fold_results[fold] = {
        'train_losses': train_losses,
        'train_accuracies': train_accuracies,
        'val_losses': val_losses,
        'val_accuracies': val_accuracies,
        'model_state': model.state_dict()
    }

# Visualizar resultados promedio
avg_train_losses = np.mean([fold_results[fold]['train_losses'] for fold in fold_results], axis=0)
avg_val_losses = np.mean([fold_results[fold]['val_losses'] for fold in fold_results], axis=0)
avg_train_acc = np.mean([fold_results[fold]['train_accuracies'] for fold in fold_results], axis=0)
avg_val_acc = np.mean([fold_results[fold]['val_accuracies'] for fold in fold_results], axis=0)

epochs_range = range(1, num_epochs+1)

plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(epochs_range, avg_train_losses, label='Train Loss')
plt.plot(epochs_range, avg_val_losses, label='Validation Loss')
plt.legend()
plt.title('Pérdida por Época')

plt.subplot(1,2,2)
plt.plot(epochs_range, avg_train_acc, label='Train Acc')
plt.plot(epochs_range, avg_val_acc, label='Validation Acc')
plt.legend()
plt.title('Precisión por Época')

plt.show()


Fold 1


NameError: name 'collate_fn' is not defined

In [16]:
# Guardar pérdidas y precisiones
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

# Entrenamiento por épocas
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer)
    val_loss, val_acc = evaluate(model, val_loader, criterion)

    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    print(f'Epoch {epoch+1}:')
    print(f'\tTrain Loss: {train_loss:.4f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {val_loss:.4f} |  Val. Acc: {val_acc*100:.2f}%')

    # Guardar resultados del fold
fold_results[fold] = {
    'train_losses': train_losses,
    'train_accuracies': train_accuracies,
    'val_losses': val_losses,
    'val_accuracies': val_accuracies,
    'model_state': model.state_dict()
}

# Visualizar resultados promedio
avg_train_losses = np.mean([fold_results[fold]['train_losses'] for fold in fold_results], axis=0)
avg_val_losses = np.mean([fold_results[fold]['val_losses'] for fold in fold_results], axis=0)
avg_train_acc = np.mean([fold_results[fold]['train_accuracies'] for fold in fold_results], axis=0)
avg_val_acc = np.mean([fold_results[fold]['val_accuracies'] for fold in fold_results], axis=0)

epochs_range = range(1, num_epochs+1)

plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(epochs_range, avg_train_losses, label='Train Loss')
plt.plot(epochs_range, avg_val_losses, label='Validation Loss')
plt.legend()
plt.title('Pérdida por Época')

plt.subplot(1,2,2)
plt.plot(epochs_range, avg_train_acc, label='Train Acc')
plt.plot(epochs_range, avg_val_acc, label='Validation Acc')
plt.legend()
plt.title('Precisión por Época')

plt.show()

NameError: name 'train_loader' is not defined

## Evaluación del conjunto de prueba

### Selección del mejor K-fold

In [None]:
best_fold = 0
best_model_state = fold_results[best_fold]['model_state']

### Cargar el mejor modelo

In [None]:
# Inicializar el modelo y cargar el estado
best_model = RNNClassifier(vocab_size, embed_dim, hidden_dim, output_dim, n_layers, bidirectional, dropout)
best_model.load_state_dict(best_model_state)
best_model.to(device)
best_model.eval()

### Crear dataloader de prueba

In [None]:
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        lengths = (inputs != word_to_idx['<PAD>']).sum(1)
        predictions = best_model(inputs, lengths)
        rounded_preds = torch.round(predictions)
        all_preds.extend(rounded_preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Cálculo de métricas
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
conf_matrix = confusion_matrix(all_labels, all_preds)

print(f'Precisión: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
print('Matriz de Confusión:')
print(conf_matrix)

### Visualización de la matriz de confisión

In [None]:
import seaborn as sns

plt.figure(figsize=(6,4))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=le.classes_, yticklabels=le.classes_)
plt.xlabel('Predicción')
plt.ylabel('Etiqueta Verdadera')
plt.title('Matriz de Confusión')
plt.show()