Primer BETO para clasificacion de frase en insultos,saludos, preguntas personales, preguntas economicas, preguntas de proceso, respuestas para cada tipo.

In [None]:
# mount drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:

%pip install transformers torch pandas scikit-learn numpy

Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizerFast
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import json
import os

# --- Configuración ---
ARCHIVO_DATASET = 'drive/MyDrive/PLN_Segundo_Momento/dataset_completo.csv'
MODELO_BERT = 'dccuchile/bert-base-spanish-wwm-uncased' # Modelo BETO
OUTPUT_DIR = 'drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases' # Directorio para guardar el modelo
MAX_LEN = 128 # Longitud máxima de secuencia para BERT
BATCH_SIZE = 16 # Tamaño del lote para entrenamiento
EPOCHS = 4      # Número de épocas de entrenamiento (ajustar según sea necesario)
LEARNING_RATE = 2e-5 # Tasa de aprendizaje para fine-tuning

# --- Carga y Preprocesamiento del Dataset ---
print(f"Cargando dataset desde: {ARCHIVO_DATASET}")
try:
    df = pd.read_csv(ARCHIVO_DATASET)
    print(f"Dataset cargado con {len(df)} filas.")
    print("Columnas encontradas:", df.columns.tolist())

    # Asegurarse que las columnas necesarias existen
    if 'Frase' not in df.columns or 'Categoria' not in df.columns:
        raise ValueError("El CSV debe contener las columnas 'Frase' y 'Categoria'")

    # Eliminar filas con valores nulos en Frase o Categoria si existen
    df.dropna(subset=['Frase', 'Categoria'], inplace=True)
    df = df[df['Frase'].astype(str).str.strip() != ''] # Eliminar frases vacías
    print(f"Dataset después de limpiar nulos/vacíos: {len(df)} filas.")

    if df.empty:
        raise ValueError("El dataset está vacío después de la limpieza.")

    print("\nDistribución inicial de categorías:")
    print(df['Categoria'].value_counts())

except FileNotFoundError:
    print(f"Error: No se encontró el archivo {ARCHIVO_DATASET}")
    exit()
except Exception as e:
    print(f"Error al cargar o procesar el dataset: {e}")
    exit()

# --- Codificar las Etiquetas (Categorías) ---
# Convierte las etiquetas de texto ("Insulto", "Saludo") en números (0, 1, ...)
label_encoder = LabelEncoder()
df['label'] = label_encoder.fit_transform(df['Categoria'])
num_labels = len(label_encoder.classes_)
print(f"\nSe encontraron {num_labels} categorías únicas.")

# Guardar el mapeo de label a id y viceversa (IMPORTANTE para después)
label2id = {label: i for i, label in enumerate(label_encoder.classes_)}
id2label = {i: label for i, label in enumerate(label_encoder.classes_)}

# Crear directorio de salida si no existe
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Guardar los mapeos
map_file_path = os.path.join(OUTPUT_DIR, 'label_mappings.json')
with open(map_file_path, 'w', encoding='utf-8') as f:
    json.dump({'label2id': label2id, 'id2label': id2label}, f, ensure_ascii=False, indent=4)
print(f"Mapeos de etiquetas guardados en: {map_file_path}")

# --- Dividir en Entrenamiento y Validación ---
# Usamos una división estratificada para mantener la proporción de categorías
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['Frase'].tolist(),
    df['label'].tolist(),
    test_size=0.2, # 20% para validación
    random_state=42,
    stratify=df['label'] # Importante para clases desbalanceadas
)

print(f"\nDatos divididos:")
print(f"  - Entrenamiento: {len(train_texts)} frases")
print(f"  - Validación: {len(val_texts)} frases")

# --- Tokenización ---
print(f"\nCargando tokenizador para el modelo: {MODELO_BERT}")
tokenizer = BertTokenizerFast.from_pretrained(MODELO_BERT)

# Tokenizar los datasets
# padding='max_length' -> Rellena secuencias cortas hasta MAX_LEN
# truncation=True -> Corta secuencias largas a MAX_LEN
# return_tensors='pt' -> Devuelve tensores de PyTorch
train_encodings = tokenizer(train_texts, truncation=True, padding='max_length', max_length=MAX_LEN)
val_encodings = tokenizer(val_texts, truncation=True, padding='max_length', max_length=MAX_LEN)

# --- Crear Clase de Dataset para PyTorch ---
class FrasesDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        # 'input_ids', 'token_type_ids', 'attention_mask' son generados por el tokenizador
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx]) # Añade la etiqueta numérica
        return item

    def __len__(self):
        return len(self.labels)

# Crear instancias del Dataset
train_dataset = FrasesDataset(train_encodings, train_labels)
val_dataset = FrasesDataset(val_encodings, val_labels)

# --- Crear DataLoaders ---
# Permiten cargar datos en lotes (batches) de forma eficiente
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

print("\n¡Preparación de datos completada!")

Cargando dataset desde: drive/MyDrive/PLN_Segundo_Momento/dataset_completo.csv
Dataset cargado con 15780 filas.
Columnas encontradas: ['Frase', 'Categoria']
Dataset después de limpiar nulos/vacíos: 15780 filas.

Distribución inicial de categorías:
Categoria
Respuesta Empleados            1500
Respuesta Cartera              1500
Respuesta Categoria Empresa    1500
Respuesta Deudas               1500
Respuesta Ganancias            1500
Respuesta Nombre               1500
Insulto                        1500
Respuesta Activos              1500
Saludo                         1500
Pregunta Económica             1123
Pregunta Personal               488
Pregunta sobre Proceso          425
Respuesta Sector                244
Name: count, dtype: int64

Se encontraron 13 categorías únicas.
Mapeos de etiquetas guardados en: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases/label_mappings.json

Datos divididos:
  - Entrenamiento: 12624 frases
  - Validación: 3156 frases

Cargando tokeniz

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/310 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/248k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/486k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/134 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/650 [00:00<?, ?B/s]


¡Preparación de datos completada!


In [None]:
from transformers import BertForSequenceClassification, get_linear_schedule_with_warmup
from torch.optim import AdamW
import torch
from tqdm.auto import tqdm # Barra de progreso
from sklearn.metrics import accuracy_score, f1_score

# --- Cargar Modelo Pre-entrenado con Cabeza de Clasificación ---
print(f"\nCargando modelo pre-entrenado: {MODELO_BERT}")
# BertForSequenceClassification ya incluye BERT + una capa lineal para clasificación
# Le decimos cuántas etiquetas de salida debe tener (num_labels)
model = BertForSequenceClassification.from_pretrained(
    MODELO_BERT,
    num_labels=num_labels # El número de categorías que encontramos antes
)

# --- Configurar Dispositivo (GPU si está disponible, si no CPU) ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device) # Mover el modelo al dispositivo
print(f"\nUsando dispositivo: {device}")
if device.type == 'cpu':
    print("ADVERTENCIA: Entrenar en CPU será significativamente más lento. Se recomienda GPU.")

# --- Optimizador y Planificador de Tasa de Aprendizaje ---
# AdamW es un optimizador común para Transformers
optimizer = AdamW(model.parameters(), lr=LEARNING_RATE)

# El planificador ajusta la tasa de aprendizaje durante el entrenamiento
num_training_steps = len(train_loader) * EPOCHS
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0, # Puedes ajustar esto, e.g., 10% de los pasos
    num_training_steps=num_training_steps
)

print("\nConfiguración del modelo y optimizador lista.")


Cargando modelo pre-entrenado: dccuchile/bert-base-spanish-wwm-uncased


pytorch_model.bin:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at dccuchile/bert-base-spanish-wwm-uncased and are newly initialized: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight', 'classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]


Usando dispositivo: cuda

Configuración del modelo y optimizador lista.


In [None]:
print("\n--- Iniciando Entrenamiento ---")

best_val_accuracy = 0.0 # Para guardar el mejor modelo

for epoch in range(EPOCHS):
    print(f"\n--- Época {epoch + 1}/{EPOCHS} ---")

    # --- Fase de Entrenamiento ---
    model.train() # Poner el modelo en modo entrenamiento
    total_train_loss = 0
    progress_bar_train = tqdm(train_loader, desc="Entrenando", leave=False)

    for batch in progress_bar_train:
        optimizer.zero_grad() # Limpiar gradientes anteriores

        # Mover datos del lote al dispositivo (GPU/CPU)
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        # token_type_ids no suele ser necesario para clasificación de una sola secuencia

        # Pasar datos por el modelo
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

        # El modelo devuelve la pérdida (loss) directamente cuando se proporcionan 'labels'
        loss = outputs.loss
        total_train_loss += loss.item()

        # Backpropagation: Calcular gradientes
        loss.backward()

        # Actualizar pesos del modelo
        optimizer.step()

        # Actualizar tasa de aprendizaje
        lr_scheduler.step()

        progress_bar_train.set_postfix({'loss': loss.item()})

    avg_train_loss = total_train_loss / len(train_loader)
    print(f"Pérdida media de entrenamiento: {avg_train_loss:.4f}")

    # --- Fase de Validación ---
    model.eval() # Poner el modelo en modo evaluación (desactiva dropout, etc.)
    total_val_loss = 0
    all_preds = []
    all_labels = []
    progress_bar_val = tqdm(val_loader, desc="Validando", leave=False)

    # No necesitamos calcular gradientes durante la validación
    with torch.no_grad():
        for batch in progress_bar_val:
            # Mover datos del lote al dispositivo
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Pasar datos por el modelo
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

            loss = outputs.loss
            total_val_loss += loss.item()

            # Obtener las predicciones (logits) y encontrar la clase predicha (índice con mayor valor)
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=-1)

            # Guardar predicciones y etiquetas verdaderas para calcular métricas después
            all_preds.extend(predictions.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            progress_bar_val.set_postfix({'loss': loss.item()})

    avg_val_loss = total_val_loss / len(val_loader)
    # Calcular métricas de validación
    val_accuracy = accuracy_score(all_labels, all_preds)
    # Usamos 'weighted' para F1-score por si las clases están desbalanceadas
    val_f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Pérdida media de validación: {avg_val_loss:.4f}")
    print(f"Accuracy de validación: {val_accuracy:.4f}")
    print(f"F1-Score (Weighted) de validación: {val_f1:.4f}")

    # --- Guardar el mejor modelo basado en accuracy de validación ---
    if val_accuracy > best_val_accuracy:
        print(f"¡Mejora en Accuracy de Validación encontrada ({best_val_accuracy:.4f} -> {val_accuracy:.4f})! Guardando modelo...")
        best_val_accuracy = val_accuracy
        # Guarda el modelo, tokenizador y configuración en el directorio especificado
        model.save_pretrained(OUTPUT_DIR)
        tokenizer.save_pretrained(OUTPUT_DIR)
        print(f"Modelo y tokenizador guardados en: {OUTPUT_DIR}")
    else:
        print(f"No hubo mejora en Accuracy de Validación respecto a la mejor ({best_val_accuracy:.4f}).")


print("\n--- Entrenamiento Completado ---")
print(f"El mejor modelo (basado en accuracy de validación) se guardó en: {OUTPUT_DIR}")


--- Iniciando Entrenamiento ---

--- Época 1/4 ---


Entrenando:   0%|          | 0/789 [00:00<?, ?it/s]

Pérdida media de entrenamiento: 0.2202


Validando:   0%|          | 0/198 [00:00<?, ?it/s]

Pérdida media de validación: 0.0165
Accuracy de validación: 0.9978
F1-Score (Weighted) de validación: 0.9978
¡Mejora en Accuracy de Validación encontrada (0.0000 -> 0.9978)! Guardando modelo...
Modelo y tokenizador guardados en: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases

--- Época 2/4 ---


Entrenando:   0%|          | 0/789 [00:00<?, ?it/s]

Pérdida media de entrenamiento: 0.0128


Validando:   0%|          | 0/198 [00:00<?, ?it/s]

Pérdida media de validación: 0.0102
Accuracy de validación: 0.9987
F1-Score (Weighted) de validación: 0.9987
¡Mejora en Accuracy de Validación encontrada (0.9978 -> 0.9987)! Guardando modelo...
Modelo y tokenizador guardados en: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases

--- Época 3/4 ---


Entrenando:   0%|          | 0/789 [00:00<?, ?it/s]

Pérdida media de entrenamiento: 0.0055


Validando:   0%|          | 0/198 [00:00<?, ?it/s]

Pérdida media de validación: 0.0074
Accuracy de validación: 0.9994
F1-Score (Weighted) de validación: 0.9994
¡Mejora en Accuracy de Validación encontrada (0.9987 -> 0.9994)! Guardando modelo...
Modelo y tokenizador guardados en: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases

--- Época 4/4 ---


Entrenando:   0%|          | 0/789 [00:00<?, ?it/s]

Pérdida media de entrenamiento: 0.0030


Validando:   0%|          | 0/198 [00:00<?, ?it/s]

Pérdida media de validación: 0.0069
Accuracy de validación: 0.9994
F1-Score (Weighted) de validación: 0.9994
No hubo mejora en Accuracy de Validación respecto a la mejor (0.9994).

--- Entrenamiento Completado ---
El mejor modelo (basado en accuracy de validación) se guardó en: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases


In [None]:
import torch
from transformers import BertForSequenceClassification, BertTokenizerFast
import json
import os

# --- Configuración (Asegúrate que coincida con lo guardado) ---
MODELO_GUARDADO_DIR = 'drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases' # Directorio donde se guardó el modelo
MAX_LEN = 128 # Debe ser el mismo MAX_LEN usado en el entrenamiento

# --- Cargar Mapeo de Etiquetas ---
map_file_path = os.path.join(MODELO_GUARDADO_DIR, 'label_mappings.json')
try:
    with open(map_file_path, 'r', encoding='utf-8') as f:
        mappings = json.load(f)
    id2label = {int(k): v for k, v in mappings['id2label'].items()} # Convertir keys a int
    label2id = mappings['label2id']
    num_labels = len(id2label)
    print(f"Mapeos de etiquetas cargados desde: {map_file_path}")
except FileNotFoundError:
    print(f"Error: No se encontró el archivo de mapeo '{map_file_path}'. Asegúrate de haber entrenado y guardado el modelo.")
    exit()
except Exception as e:
    print(f"Error al cargar el mapeo de etiquetas: {e}")
    exit()

# --- Cargar Modelo y Tokenizador Guardados ---
print(f"\nCargando modelo y tokenizador desde: {MODELO_GUARDADO_DIR}")
try:
    model_cargado = BertForSequenceClassification.from_pretrained(MODELO_GUARDADO_DIR)
    tokenizer_cargado = BertTokenizerFast.from_pretrained(MODELO_GUARDADO_DIR)
    print("¡Modelo y tokenizador cargados exitosamente!")
except Exception as e:
    print(f"Error al cargar el modelo o tokenizador desde '{MODELO_GUARDADO_DIR}': {e}")
    print("Asegúrate de que el directorio exista y contenga los archivos del modelo guardado.")
    exit()

# --- Configurar Dispositivo ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_cargado.to(device)
model_cargado.eval() # Poner el modelo en modo evaluación SIEMPRE para inferencia
print(f"\nModelo cargado y listo para predicciones en dispositivo: {device}")

Mapeos de etiquetas cargados desde: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases/label_mappings.json

Cargando modelo y tokenizador desde: drive/MyDrive/PLN_Segundo_Momento/modelo_clasificador_frases
¡Modelo y tokenizador cargados exitosamente!

Modelo cargado y listo para predicciones en dispositivo: cuda


In [None]:
def predecir_categoria(texto, modelo, tokenizer, device, id_to_label_map, max_len):
    """Predice la categoría de una frase usando el modelo cargado."""
    if not texto or not isinstance(texto, str) or texto.strip() == "":
        return "Texto inválido."

    # 1. Tokenizar el texto de entrada
    inputs = tokenizer(
        texto,
        return_tensors='pt', # Devolver tensores PyTorch
        max_length=max_len,
        padding='max_length', # Rellenar hasta max_len
        truncation=True # Truncar si es más largo
    )

    # 2. Mover los tensores al dispositivo correcto (GPU/CPU)
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    # 3. Realizar la predicción (SIN calcular gradientes)
    with torch.no_grad():
        outputs = modelo(input_ids, attention_mask=attention_mask)

    # 4. Obtener los logits (puntuaciones crudas para cada clase)
    logits = outputs.logits

    # 5. Encontrar la clase con la puntuación más alta (el índice predicho)
    predicted_class_id = torch.argmax(logits, dim=-1).item()

    # 6. Mapear el ID predicho de vuelta a la etiqueta de texto (nombre de la categoría)
    categoria_predicha = id_to_label_map.get(predicted_class_id, "Categoría Desconocida")

    # Opcional: Obtener probabilidades (aplicando Softmax a los logits)
    # probabilities = torch.softmax(logits, dim=-1).cpu().numpy()[0]
    # probability = probabilities[predicted_class_id]
    # return f"{categoria_predicha} (Prob: {probability:.4f})"

    return categoria_predicha

# --- Bucle de Chat Simple ---
print("\n--- Chat Simple ---")
print("Escribe una frase para clasificarla o 'salir' para terminar.")

while True:
    try:
        frase_usuario = input("Tú: ")
        if frase_usuario.lower() in ['salir', 'exit', 'quit', 'terminar']:
            print("Chatbot: ¡Adiós!")
            break

        categoria = predecir_categoria(
            frase_usuario,
            model_cargado,
            tokenizer_cargado,
            device,
            id2label, # Usamos el mapeo de ID a Label
            MAX_LEN
        )
        print(f"Chatbot (predicción): {categoria}")

    except KeyboardInterrupt: # Manejar Ctrl+C
        print("\nChatbot: ¡Adiós!")
        break
    except Exception as e:
        print(f"Chatbot: Ocurrió un error inesperado: {e}")


--- Chat Simple ---
Escribe una frase para clasificarla o 'salir' para terminar.
Chatbot (predicción): Saludo
Chatbot (predicción): Insulto
Chatbot (predicción): Insulto
Chatbot (predicción): Respuesta Empleados
Chatbot (predicción): Insulto
Chatbot (predicción): Respuesta Nombre
Chatbot (predicción): Saludo
Chatbot (predicción): Saludo
Chatbot (predicción): Pregunta Personal
Chatbot (predicción): Insulto
Chatbot (predicción): Pregunta sobre Proceso
Chatbot (predicción): Respuesta Deudas
Chatbot (predicción): Pregunta sobre Proceso
Chatbot (predicción): Respuesta Empleados
Tú: salir


## 2. MODELO BERTO PARA NER

In [None]:
%pip install transformers torch pandas scikit-learn numpy datasets seqeval

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting seqeval
  Downloading seqeval-1.2.2.tar.gz (43 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.6/43.6 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloa

In [None]:
import json
import os
import torch
import numpy as np
from datasets import Dataset, DatasetDict, Features, Value, ClassLabel, Sequence
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    TrainingArguments,
    Trainer,
    DataCollatorForTokenClassification
)
from seqeval.metrics import classification_report, f1_score
from seqeval.scheme import IOB2 # Usaremos el esquema IOB2 (similar a BIO)

# --- Configuración ---
ARCHIVO_DATASET_JSONL = 'drive/MyDrive/PLN_Segundo_Momento/dataset_ner.jsonl' # TU archivo de entrada
MODELO_BERT_NER = 'dccuchile/bert-base-spanish-wwm-uncased' # Modelo BETO (o BERTIN)
OUTPUT_DIR_NER = 'drive/MyDrive/PLN_Segundo_Momento/modelo_ner_frases' # Directorio para guardar el modelo NER
MAX_LEN_NER = 128 # Longitud máxima de secuencia para BERT
BATCH_SIZE_NER = 8 # Lotes más pequeños suelen ser mejores para NER
EPOCHS_NER = 5      # NER puede requerir más épocas que clasificación
LEARNING_RATE_NER = 3e-5
LABEL_ALL_SUBWORDS = False # Estrategia para etiquetar subpalabras (False es común)

# --- 1. Carga y Preparación de Datos ---

def cargar_datos_jsonl(ruta_archivo):
    """Carga datos desde un archivo JSON Lines."""
    datos = []
    try:
        with open(ruta_archivo, 'r', encoding='utf-8') as f:
            for linea in f:
                try:
                    datos.append(json.loads(linea))
                except json.JSONDecodeError:
                    print(f"Advertencia: Omitiendo línea mal formada: {linea.strip()}")
        print(f"Cargados {len(datos)} ejemplos desde {ruta_archivo}")
        if not datos:
            raise ValueError("No se cargaron datos válidos del archivo.")
        return datos
    except FileNotFoundError:
        print(f"Error: No se encontró el archivo {ruta_archivo}")
        exit()
    except Exception as e:
        print(f"Error inesperado al cargar {ruta_archivo}: {e}")
        exit()

def crear_dataset_hf(datos_cargados):
    """Convierte los datos cargados a un Dataset de Hugging Face."""
    # Extraer textos y entidades
    textos = [ejemplo['text'] for ejemplo in datos_cargados]
    # Extraer las entidades en el formato esperado por `datasets`
    # Necesitamos convertir [start, end, label] a [{'start': start, 'end': end, 'label': label}]
    ner_tags_list = []
    all_labels_set = set()
    for ejemplo in datos_cargados:
        tags_ejemplo = []
        if 'entities' in ejemplo and ejemplo['entities']:
            for start, end, label in ejemplo['entities']:
                # Validación simple
                if start >= end or start < 0 or end > len(ejemplo['text']):
                     print(f"Advertencia: Entidad inválida omitida en '{ejemplo['text']}': {[start, end, label]}")
                     continue
                tags_ejemplo.append({'start': start, 'end': end, 'label': label})
                all_labels_set.add(label)
        ner_tags_list.append(tags_ejemplo)

    if not all_labels_set:
        print("Error: No se encontraron etiquetas de entidad válidas en los datos.")
        exit()

    # Crear la estructura de Features para el Dataset
    # Mapeamos las etiquetas de string a ClassLabel
    label_list = sorted(list(all_labels_set))
    features = Features({
        'id': Value('string'),
        'text': Value('string'),
        'ner_tags': Sequence({ # Lista de diccionarios por cada entidad
            'start': Value('int32'),
            'end': Value('int32'),
            'label': ClassLabel(names=label_list) # Mapea string a int automáticamente
        })
    })

    # Crear el Dataset
    hf_dataset = Dataset.from_dict(
        {
            "id": [str(i) for i in range(len(textos))],
            "text": textos,
            "ner_tags": ner_tags_list,
        },
        features=features
    )

    print("\nDataset de Hugging Face creado.")
    print("Características (Features):", hf_dataset.features)
    # El ClassLabel ya nos da el mapeo numérico
    print("Etiquetas de entidad encontradas:", label_list)
    return hf_dataset, label_list

# Cargar los datos
datos_originales = cargar_datos_jsonl(ARCHIVO_DATASET_JSONL)
dataset_hf, label_list = crear_dataset_hf(datos_originales)

# Obtener las etiquetas BIO
# Necesitamos etiquetas B-LABEL, I-LABEL para cada label original, más 'O'
bio_label_list = ["O"] + [f"B-{lbl}" for lbl in label_list] + [f"I-{lbl}" for lbl in label_list]
label2id = {label: i for i, label in enumerate(bio_label_list)}
id2label = {i: label for i, label in enumerate(bio_label_list)}
num_labels_ner = len(bio_label_list)

print(f"\nNúmero total de etiquetas BIO: {num_labels_ner}")
print("Mapeo label2id (BIO):", label2id)

# Crear directorio de salida si no existe
os.makedirs(OUTPUT_DIR_NER, exist_ok=True)
# Guardar los mapeos BIO (muy importantes para después)
map_file_path_ner = os.path.join(OUTPUT_DIR_NER, 'ner_label_mappings.json')
with open(map_file_path_ner, 'w', encoding='utf-8') as f:
    json.dump({'label2id': label2id, 'id2label': id2label}, f, ensure_ascii=False, indent=4)
print(f"Mapeos de etiquetas NER guardados en: {map_file_path_ner}")


# --- 2. Tokenización y Alineación de Etiquetas ---
print(f"\nCargando tokenizador NER para: {MODELO_BERT_NER}")
tokenizer_ner = AutoTokenizer.from_pretrained(MODELO_BERT_NER)

def tokenize_and_align_labels(examples):
    """Tokeniza texto y alinea las etiquetas NER con los tokens/subtokens."""
    # Tokenizar los textos. `is_split_into_words=False` porque damos frases completas.
    # `truncation=True`, `padding='max_length'` para asegurar longitud uniforme.
    # `max_length=MAX_LEN_NER`
    # `return_offsets_mapping=True` es CLAVE para obtener los spans de caracteres de cada token.
    tokenized_inputs = tokenizer_ner(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=MAX_LEN_NER,
        return_offsets_mapping=True,
        is_split_into_words=False # Importante: le pasamos frases
    )

    labels = []
    # Iterar sobre cada ejemplo (frase) en el lote
    for i, ner_tags_ejemplo in enumerate(examples["ner_tags"]):
        # Obtener los IDs de palabra correspondientes a cada token
        # word_ids nos dice a qué palabra original pertenece cada token/subtoken
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        # Obtener los mapeos de offset (inicio_char, fin_char) para los tokens
        offset_mapping = tokenized_inputs["offset_mapping"][i]

        # Crear una lista de etiquetas inicializada con 'O' (o su ID) para esta frase
        # Usamos -100 como etiqueta especial para tokens especiales ([CLS], [SEP]) y subtokens subsiguientes
        # que no queremos incluir en la pérdida.
        previous_word_idx = None
        label_ids = [-100] * len(word_ids) # Inicializar con ignore_index

        # Ordenar las entidades por su posición inicial para procesarlas correctamente
        ner_tags_ejemplo.sort(key=lambda x: x['start'])

        # Índice para rastrear la entidad actual que estamos buscando
        current_tag_idx = 0
        for token_idx, word_idx in enumerate(word_ids):
            # Si es un token especial ([CLS], [SEP], padding) o no tiene word_id, asignar -100
            if word_idx is None:
                continue

            # Si estamos en un nuevo word_id (primer subtoken de una palabra)
            if word_idx != previous_word_idx:
                # Resetear la etiqueta por defecto a 'O' para esta palabra
                current_label_id = label2id["O"]

                # Buscar si esta palabra (token) está dentro de alguna entidad
                # Iteramos sobre las entidades restantes desde current_tag_idx
                for tag_idx in range(current_tag_idx, len(ner_tags_ejemplo)):
                    tag = ner_tags_ejemplo[tag_idx]
                    token_start, token_end = offset_mapping[token_idx]

                    # Comprobar si el inicio de este token cae dentro del span de la entidad
                    # y si el token no es solo padding (end > start)
                    if token_start >= tag['start'] and token_start < tag['end'] and token_end > token_start:
                        # ¡Coincidencia! Este token es el inicio o parte de la entidad
                        label_str = id2label[tag['label']] # Obtener 'NOMBRE_EMPRESA', etc.
                        current_label_id = label2id[f"B-{label_str}"] # Asignar B-LABEL
                        # Marcar que ya procesamos/encontramos esta entidad (parcialmente)
                        # Avanzamos current_tag_idx solo si el token actual está completamente DENTRO
                        # de la entidad o si es la última entidad. En la práctica,
                        # esta lógica puede ser compleja. Una forma más simple es
                        # seguir buscando si el *siguiente* token cae en la misma entidad.
                        # La lógica aquí es básica: si encontramos el inicio, lo etiquetamos B-.
                        break # Encontramos la etiqueta para esta palabra, salimos del bucle interno
                    # Si el inicio del token ya pasó el final de la entidad actual,
                    # avanzamos al siguiente tag para la próxima palabra.
                    elif token_start >= tag['end']:
                         current_tag_idx += 1 # Ya no buscaremos este tag
                    # Si el token empieza antes que la entidad actual, seguimos buscando
                    # la entidad actual con los siguientes tokens.

                label_ids[token_idx] = current_label_id

            # Si es un subtoken subsiguiente de la misma palabra
            elif word_idx == previous_word_idx:
                 # Aplicar la estrategia LABEL_ALL_SUBWORDS
                 if LABEL_ALL_SUBWORDS:
                      # Si la etiqueta anterior era B- o I-, la mantenemos como I-
                      # Si era O, la mantenemos como O.
                      # Necesitamos la etiqueta asignada al token anterior *de esta palabra*
                      # Esto se complica, más fácil es usar la etiqueta de la primera parte
                      # y si esa era B-, poner I-
                      if label_ids[token_idx-1] != -100:
                           prev_label_id = label_ids[token_idx-1]
                           # Si la etiqueta previa era B-Algo, convertirla a I-Algo
                           if id2label[prev_label_id].startswith("B-"):
                                corresponding_i_label = id2label[prev_label_id].replace("B-", "I-")
                                if corresponding_i_label in label2id:
                                    label_ids[token_idx] = label2id[corresponding_i_label]
                                else: # Si no existe I- (poco probable), mantener B- o poner O? Poner -100 es seguro.
                                     label_ids[token_idx] = -100
                           # Si la etiqueta previa era I-Algo, mantenerla I-Algo
                           elif id2label[prev_label_id].startswith("I-"):
                                label_ids[token_idx] = prev_label_id
                           else: # Si era O, mantener O (-100 por defecto ya lo hace)
                                label_ids[token_idx] = label2id["O"] # O asignar -100
                      else:
                          label_ids[token_idx] = -100 # Si el token anterior fue ignorado, ignorar este tambien

                 else: # Estrategia común: ignorar subtokens subsiguientes
                    label_ids[token_idx] = -100

            previous_word_idx = word_idx # Actualizar el índice de la palabra anterior

        labels.append(label_ids)

    # Añadir las etiquetas procesadas al diccionario de salida
    tokenized_inputs["labels"] = labels
    return tokenized_inputs


print("\nTokenizando y alineando etiquetas...")
# Aplicar la función a todo el dataset usando .map() - es eficiente
tokenized_dataset = dataset_hf.map(
    tokenize_and_align_labels,
    batched=True, # Procesar en lotes
    remove_columns=dataset_hf.column_names # Eliminar columnas antiguas
)

print("Ejemplo de datos procesados (primer ejemplo):")
print("Tokens:", tokenizer_ner.convert_ids_to_tokens(tokenized_dataset[0]['input_ids']))
print("Labels:", [id2label.get(lbl_id, lbl_id) for lbl_id in tokenized_dataset[0]['labels']]) # Muestra N/A si no está en id2label

# Dividir en entrenamiento y validación (ej. 90/10)
train_val_split = tokenized_dataset.train_test_split(test_size=0.1, seed=42)
final_datasets = DatasetDict({
    'train': train_val_split['train'],
    'validation': train_val_split['test']
})
print("\nDatasets divididos para entrenamiento/validación:")
print(final_datasets)


# --- 3. Carga del Modelo NER ---
print(f"\nCargando modelo NER pre-entrenado: {MODELO_BERT_NER}")
model_ner = AutoModelForTokenClassification.from_pretrained(
    MODELO_BERT_NER,
    num_labels=num_labels_ner, # Número de etiquetas BIO
    id2label=id2label,       # Mapeo ID -> Label (para inferencia)
    label2id=label2id        # Mapeo Label -> ID (para inferencia)
)

# --- 4. Configuración del Entrenamiento ---
print("\nConfigurando el entrenamiento...")
training_args_ner = TrainingArguments(
    output_dir=OUTPUT_DIR_NER,
    num_train_epochs=EPOCHS_NER,
    per_device_train_batch_size=BATCH_SIZE_NER,
    per_device_eval_batch_size=BATCH_SIZE_NER,
    learning_rate=LEARNING_RATE_NER,
    weight_decay=0.01,
    evaluation_strategy="epoch", # Evaluar después de cada época
    save_strategy="epoch",       # Guardar después de cada época
    load_best_model_at_end=True, # Cargar el mejor modelo al final
    metric_for_best_model="eval_f1", # Usar F1-score de validación para decidir el mejor
    greater_is_better=True,
    logging_dir=f"{OUTPUT_DIR_NER}/logs",
    logging_steps=50, # Loggear cada 50 pasos
    push_to_hub=False # No subir al Hub de Hugging Face
)

# Data Collator: se encarga de crear los lotes correctamente (padding dinámico si es necesario)
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer_ner)

# Métrica de Evaluación (usando seqeval)
def compute_metrics(p):
    predictions, labels = p
    # predictions son logits, necesitamos el argmax para obtener los IDs predichos
    predictions = np.argmax(predictions, axis=2)

    # Convertir IDs a etiquetas BIO strings, ignorando -100
    true_labels = [
        [id2label[l] for l in label if l != -100]
        for label in labels
    ]
    true_predictions = [
        [id2label[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # Calcular métricas usando seqeval
    report = classification_report(true_labels, true_predictions, output_dict=True, mode='strict', scheme=IOB2)

    # Extraer métricas clave (ej. F1 general ponderado)
    results = {
        "precision": report["weighted avg"]["precision"],
        "recall": report["weighted avg"]["recall"],
        "f1": report["weighted avg"]["f1-score"],
        "accuracy": report["weighted avg"]["precision"], # seqeval no da accuracy simple, usamos precision como proxy o calculamos aparte si es necesario
    }
    # Imprimir reporte completo para inspección durante el entrenamiento
    # print("\n--- Seqeval Report ---")
    # print(classification_report(true_labels, true_predictions, mode='strict', scheme=IOB2))
    # print("--- End Report ---")
    return results


# --- 5. Entrenamiento ---
trainer = Trainer(
    model=model_ner,
    args=training_args_ner,
    train_dataset=final_datasets["train"],
    eval_dataset=final_datasets["validation"],
    tokenizer=tokenizer_ner,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

print("\n--- Iniciando Entrenamiento NER ---")
trainer.train()
print("--- Entrenamiento NER Completado ---")

# --- 6. Guardar el Mejor Modelo y Tokenizador ---
# El Trainer ya guarda el mejor modelo si `load_best_model_at_end=True`
# y `save_strategy` está activa. Forzamos un guardado final por si acaso.
print(f"\nGuardando el modelo final entrenado en: {OUTPUT_DIR_NER}")
trainer.save_model(OUTPUT_DIR_NER)
tokenizer_ner.save_pretrained(OUTPUT_DIR_NER)
# El mapeo de etiquetas ya se guardó antes.

print("¡Modelo NER y tokenizador guardados exitosamente!")

# --- 7. Cargar Modelo y Tokenizador Guardados (para inferencia) ---
print(f"\n--- Cargando modelo NER guardado desde: {OUTPUT_DIR_NER} ---")

# Re-cargar mapeos
map_file_path_ner = os.path.join(OUTPUT_DIR_NER, 'ner_label_mappings.json')
try:
    with open(map_file_path_ner, 'r', encoding='utf-8') as f:
        mappings_ner = json.load(f)
    id2label_cargado = {int(k): v for k, v in mappings_ner['id2label'].items()}
    label2id_cargado = mappings_ner['label2id']
    print("Mapeos de etiquetas NER cargados.")
except Exception as e:
    print(f"Error al cargar mapeos NER: {e}")
    exit()

# Cargar modelo y tokenizador
try:
    model_cargado_ner = AutoModelForTokenClassification.from_pretrained(OUTPUT_DIR_NER)
    tokenizer_cargado_ner = AutoTokenizer.from_pretrained(OUTPUT_DIR_NER)
    print("Modelo y tokenizador NER cargados exitosamente.")
except Exception as e:
    print(f"Error al cargar modelo/tokenizador NER: {e}")
    exit()

# Configurar dispositivo y modo evaluación
device_ner = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_cargado_ner.to(device_ner)
model_cargado_ner.eval()
print(f"Modelo NER listo para inferencia en dispositivo: {device_ner}")

# --- 8. Función de Predicción y Chat Simple ---

def predecir_ner(texto, modelo, tokenizer, device, id_to_label_map, max_len):
    """Predice entidades NER en un texto dado."""
    if not texto or not isinstance(texto, str) or texto.strip() == "":
        return [], "Texto inválido."

    # 1. Tokenizar y obtener offsets
    inputs = tokenizer(
        texto,
        return_tensors='pt',
        max_length=max_len,
        padding='max_length',
        truncation=True,
        return_offsets_mapping=True # Necesario para mapear tokens a texto original
    )
    offset_mapping = inputs.pop("offset_mapping").squeeze().tolist() # Quitar y convertir a lista

    # 2. Mover a dispositivo
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # 3. Inferencia
    with torch.no_grad():
        outputs = modelo(**inputs)

    # 4. Obtener predicciones (IDs)
    predictions = torch.argmax(outputs.logits, dim=-1).squeeze().tolist()

    # 5. Reconstruir entidades desde tokens/predicciones/offsets
    entities = []
    current_entity = None

    for i, pred_id in enumerate(predictions):
        # Ignorar tokens especiales y de padding (basado en offset)
        start_char, end_char = offset_mapping[i]
        if start_char == end_char: # Token especial o padding
             # Si estábamos construyendo una entidad, la cerramos aquí
            if current_entity:
                entities.append(current_entity)
                current_entity = None
            continue

        label_name = id_to_label_map.get(pred_id, "O")

        if label_name.startswith("B-"):
            # Si había una entidad abierta, la guardamos
            if current_entity:
                entities.append(current_entity)
            # Empezamos una nueva entidad
            current_entity = {
                "text": texto[start_char:end_char],
                "label": label_name[2:], # Quitar el prefijo "B-"
                "start": start_char,
                "end": end_char
            }
        elif label_name.startswith("I-"):
            # Si estamos dentro de una entidad y la etiqueta coincide (o es continuación)
            if current_entity and current_entity["label"] == label_name[2:]:
                # Extendemos el texto y el índice final
                # ¡Cuidado! Esto asume que los tokens son contiguos. El offset es más seguro.
                current_entity["text"] = texto[current_entity["start"]:end_char]
                current_entity["end"] = end_char
            else:
                # Caso raro: I- sin B- o con label diferente. Cerramos la anterior si existe.
                if current_entity:
                    entities.append(current_entity)
                current_entity = None # O podrías intentar empezar una nueva entidad con I-? Mejor ignorar.

        elif label_name == "O":
            # Si encontramos 'O' y había una entidad abierta, la cerramos
            if current_entity:
                entities.append(current_entity)
                current_entity = None

    # Asegurarse de guardar la última entidad si el texto termina con ella
    if current_entity:
        entities.append(current_entity)

    # Devolver la lista de entidades encontradas y el texto original
    return entities, texto


# --- Bucle de Chat Simple NER ---
print("\n--- Chat Simple NER ---")
print("Escribe una frase para extraer entidades o 'salir' para terminar.")

while True:
    try:
        frase_usuario = input("Tú: ")
        if frase_usuario.lower() in ['salir', 'exit', 'quit', 'terminar']:
            print("Chatbot NER: ¡Adiós!")
            break

        entidades_encontradas, texto_original = predecir_ner(
            frase_usuario,
            model_cargado_ner,
            tokenizer_cargado_ner,
            device_ner,
            id2label_cargado, # Usamos el mapeo ID -> Label cargado
            MAX_LEN_NER
        )

        print("Chatbot NER:")
        if entidades_encontradas:
            print("  Entidades encontradas:")
            for ent in entidades_encontradas:
                print(f"    - Texto: '{ent['text']}'")
                print(f"      Label: {ent['label']}")
                print(f"      Posición: ({ent['start']}, {ent['end']})")
        elif texto_original != "Texto inválido.":
            print("  No se encontraron entidades conocidas en la frase.")
        else:
            print(f"  {texto_original}") # Mensaje de error si el texto fue inválido

    except KeyboardInterrupt:
        print("\nChatbot NER: ¡Adiós!")
        break
    except Exception as e:
        print(f"Chatbot NER: Ocurrió un error inesperado: {e}")
        import traceback
        traceback.print_exc() # Imprimir traceback para depuración