@inproceedings{souza2020bertimbau, author = {F{'a}bio Souza and Rodrigo Nogueira and Roberto Lotufo}, title = {{BERT}imbau: pretrained {BERT} models for {B}razilian {P}ortuguese}, booktitle = {9th Brazilian Conference on Intelligent Systems, {BRACIS}, Rio Grande do Sul, Brazil, October 20-23 (to appear)}, year = {2020} }

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install nbstripout
!nbstripout /content/drive/MyDrive/2025/tcc-final/fine_tuning_Bertimbau_version2_24_07.ipynb

In [None]:
!pip install transformers evaluate accelerate
import pandas as pd
import numpy as np
from sklearn.model_selection import StratifiedKFold
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    EarlyStoppingCallback # Importa explicitamente para o callback
)
import torch
from torch.utils.data import Dataset as TorchDataset
import seaborn as sns
import matplotlib.pyplot as plt
import sys
import subprocess
import time
import evaluate
start_total_time = time.time()

# **1. Preparação do Dataset para K-Fold**

Iniciamos carregando o dataset completo de denúncias. Diferente do método anterior, aqui não faremos uma divisão fixa. Em vez disso, usaremos a ***técnica de Validação Cruzada (K-Fold)***, que nos permite treinar e validar o modelo várias vezes em subconjuntos diferentes do nosso dataset, obtendo uma avaliação de desempenho mais confiável e robusta.

In [None]:
# Carregar dataset já processado
file_path = '/content/drive/MyDrive/2025/tcc-final/denuncias_balanceadas.xlsx'
df = pd.read_excel(file_path)

print(f"Dataset carregado. Total de registros: {len(df)}")
print("Primeiras 5 linhas do dataset:")
print(df.head())
print("\nInformações do dataset:")
print(df.info())

# **2. Processando o Texto com o BERTimbau**

Assim como na abordagem anterior, esta etapa é crucial para preparar o texto. Usamos o ***tokenizer*** do modelo ***BERTimbau*** para converter as denúncias em um formato numérico compreensível pelo modelo. Também criamos classes personalizadas ***(CustomDataset)*** e um *** DataCollator*** para otimizar a forma como os dados são alimentados ao modelo durante o treinamento e a avaliação.


In [None]:
model_path = "neuralmind/bert-base-portuguese-cased"
tokenizer = AutoTokenizer.from_pretrained(model_path)

def tokenize_function(batch):
    return tokenizer(batch["texto"], truncation=True, max_length=512, padding='longest', return_tensors='pt')

# Mapeamento de Labels: Essencial para o modelo e métricas
id2label = {0: "invasao_domicilio", 1: "violencia_fisica"}
label2id = {v: k for k, v in id2label.items()}

# Custom Dataset (para trabalhar com tensores PyTorch diretamente)
class CustomDataset(TorchDataset): # Certifique-se que 'Dataset' foi importado corretamente
    def __init__(self, encodings, labels):
        self.encodings = encodings # Agora será um dicionário de tensores (input_ids, attention_mask)
        self.labels = labels       # Agora será uma lista de labels (ou Series com índice reiniciado)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long) # Acesso direto pelo índice
        return item

    def __len__(self):
        return len(self.labels)

# Data Collator para padding dinâmico durante o treinamento
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# **3. Métricas de Desempenho**

Para avaliar o desempenho do nosso modelo de forma completa, definimos uma série de métricas. Além da ***acurácia*** (a proporção de acertos), calculamos o ***AUC-ROC*** (que mede a capacidade de distinção entre as classes), e as métricas de ***Precisão, Recall e F1-Score***. O cálculo dessas métricas para cada classe e a média (***macro***) nos dão uma visão detalhada do desempenho do modelo, especialmente em datasets com classes desbalanceadas.

In [None]:
precision_metric = evaluate.load("precision")
recall_metric = evaluate.load("recall")
f1_metric = evaluate.load("f1")
accuracy_metric = evaluate.load("accuracy")
auc_metric = evaluate.load("roc_auc")

def compute_metrics(eval_pred):
    try:
        predictions, labels = eval_pred
        probabilities = np.exp(predictions - np.max(predictions, axis=-1, keepdims=True)) / np.sum(np.exp(predictions - np.max(predictions, axis=-1, keepdims=True)), axis=-1, keepdims=True)
        positive_class_probs = probabilities[:, 1]

        # Classe prevista (ID da maior probabilidade)
        preds = np.argmax(predictions, axis=1)

        # Labels das classes, conforme definido no notebook
        class_labels = [label2id["invasao_domicilio"], label2id["violencia_fisica"]]

        # --- CÁLCULO DAS MÉTRICAS ---
        acc = accuracy_metric.compute(predictions=preds, references=labels)["accuracy"]
        auc = auc_metric.compute(prediction_scores=positive_class_probs, references=labels)["roc_auc"]

        f1_results_by_class = f1_metric.compute(predictions=preds, references=labels, average=None, labels=class_labels)["f1"]
        f1_macro = f1_metric.compute(predictions=preds, references=labels, average="macro")["f1"]

        precision_results_by_class = precision_metric.compute(predictions=preds, references=labels, average=None, labels=class_labels)["precision"]
        precision_macro = precision_metric.compute(predictions=preds, references=labels, average="macro")["precision"]

        recall_results_by_class = recall_metric.compute(predictions=preds, references=labels, average=None, labels=class_labels)["recall"]
        recall_macro = recall_metric.compute(predictions=preds, references=labels, average="macro")["recall"]

        # --- RETORNO DO DICIONÁRIO COMPLETO DE MÉTRICAS ---
        return {
            "accuracy": round(acc, 4),
            "auc": round(auc, 4),
            "f1_invasao": round(f1_results_by_class[0], 4),
            "f1_violencia": round(f1_results_by_class[1], 4),
            "precision_invasao": round(precision_results_by_class[0], 4),
            "precision_violencia": round(precision_results_by_class[1], 4),
            "recall_invasao": round(recall_results_by_class[0], 4),
            "recall_violencia": round(recall_results_by_class[1], 4),
            "precision_macro": round(precision_macro, 4),
            "recall_macro": round(recall_macro, 4),
            "f1_macro": round(f1_macro, 4)
        }
    except Exception as e:
        print(f"Erro no cálculo de métricas: {str(e)}")
        # Em caso de erro, retorna valores padrão para não quebrar o treinamento
        return {
            "accuracy": 0.0,
            "auc": 0.0,
            "f1_invasao": 0.0,
            "f1_violencia": 0.0,
            "precision_invasao": 0.0,
            "precision_violencia": 0.0,
            "recall_invasao": 0.0,
            "recall_violencia": 0.0,
            "precision_macro": 0.0,
            "recall_macro": 0.0,
            "f1_macro": 0.0
        }

# **4. Validação Cruzada (K-Fold) com Early Stopping**

Esta é a etapa central do nosso projeto. Utilizamos a ***Validação Cruzada com 5 folds*** para treinar o modelo. Isso significa que o dataset é dividido em 5 partes: o modelo é treinado em 4 delas e validado na 5ª. Este processo se repete 5 vezes, garantindo que todas as denúncias sejam usadas na validação e nos dando uma estimativa mais precisa do desempenho geral do modelo. O ***Early Stopping*** é uma ferramenta crucial para evitar o overfitting, interrompendo o treinamento se o modelo parar de melhorar.

In [None]:
# Essas listas armazenarão o histórico de métricas de CADA fold para a plotagem.
all_fold_train_losses = []
all_fold_eval_losses = []
all_fold_eval_accuracies = []

# ---------------------------------------------------------------------
# Parâmetros de Treinamento
training_args = TrainingArguments(
    output_dir="./bertimbau-denuncias-cv", # Diretório de saída para K-Fold
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=1e-5,
    num_train_epochs=30, # Aumentado para permitir que Early Stopping atue
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss", # Monitorar a perda de validação
    greater_is_better=False, # Menor perda é melhor
    logging_dir="./logs-cv",
    logging_strategy="epoch",
    disable_tqdm=False,
    report_to="none",
    fp16=True, # Treinamento com precisão mista
    seed=42, # Semente para reprodutibilidade
    gradient_accumulation_steps=1,
    remove_unused_columns=True,
    label_names=["labels"]
)

# Preparar dados para K-Fold
all_texts = df['texto'].tolist()
all_labels_mapped = df['classe'].map(label2id).tolist() # Labels já mapeadas para IDs numéricos

N_SPLITS = 5
skf = StratifiedKFold(n_splits=N_SPLITS, shuffle=True, random_state=42)

# Lista para armazenar as métricas de teste de cada fold
all_fold_test_metrics = []
print(f"\n🚀 Iniciando Treinamento com K-Fold Cross-Validation ({N_SPLITS} folds)....\n")

# Loop através dos folds
for fold, (train_index, test_index) in enumerate(skf.split(all_texts, all_labels_mapped)):
    print(f"\n==================== INICIANDO FOLD {fold+1}/{N_SPLITS} ====================")

    # Dividir os dados para o fold atual
    train_fold_texts = [all_texts[i] for i in train_index]
    train_fold_labels = [all_labels_mapped[i] for i in train_index]
    test_fold_texts = [all_texts[i] for i in test_index]
    test_fold_labels = [all_labels_mapped[i] for i in test_index]

    # Tokenizar os dados para o fold atual
    train_fold_encodings = tokenize_function({"texto": train_fold_texts})
    test_fold_encodings = tokenize_function({"texto": test_fold_texts})

    # Criar CustomDatasets para o fold atual
    train_dataset = CustomDataset(train_fold_encodings, train_fold_labels)
    eval_dataset_for_trainer = CustomDataset(test_fold_encodings, test_fold_labels)

    # Re-instanciar o modelo para cada fold
    model = AutoModelForSequenceClassification.from_pretrained(
        model_path,
        num_labels=2,
        id2label=id2label,
        label2id=label2id
    )

    # Congelar camadas do BERT (exceto pooler e classificador)
    for name, param in model.named_parameters():
        if 'classifier' not in name and 'pooler' not in name:
            param.requires_grad = False

    print(f"⚙️ Número de parâmetros treináveis no FOLD {fold+1}: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

    # Re-instanciar o Trainer para cada fold
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset_for_trainer,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3, early_stopping_threshold=1e-3)],
        compute_metrics=compute_metrics,
        data_collator=data_collator
    )

    # Treinar o modelo para o fold atual
    try:
        train_result = trainer.train()

        # Salvar o best model do fold atual
        output_dir_fold = f"/content/drive/MyDrive/2025/tcc-final/resultados_kfold/fold-{fold+1}"
        trainer.save_model(output_dir_fold)
        print(f"✅ Modelo salvo em: {output_dir_fold}")

        # Perform evaluation on the test set
        evaluation_output = trainer.evaluate(eval_dataset_for_trainer)

        # Extract predictions and true labels
        predictions = trainer.predict(eval_dataset_for_trainer).predictions
        predicted_labels = np.argmax(predictions, axis=1)
        true_labels = eval_dataset_for_trainer.labels

        # Map predicted labels back to original class names
        predicted_class_names = [id2label[label] for label in predicted_labels]
        true_class_names = [id2label[label] for label in true_labels]

        # Create a DataFrame with predictions
        predictions_df = pd.DataFrame({
            "texto": test_fold_texts,
            "classe_verdadeira": true_class_names,
            "classe_prevista": predicted_class_names
        })

        # Save predictions to Excel
        predictions_output_path = f"/content/drive/MyDrive/2025/tcc-final/resultados_kfold/fold_{fold+1}_predictions.xlsx"
        predictions_df.to_excel(predictions_output_path, index=False)
        print(f"✅ Previsões salvas em: {predictions_output_path}")

        # --- LÓGICA SIMPLIFICADA DE COLETA DE MÉTRICAS ---
        logs = trainer.state.log_history

        # Listas temporárias para as métricas deste fold
        fold_train_losses = []
        fold_eval_losses = []
        fold_eval_accuracies = []

        # Agora é simples: 1 log por época para cada métrica
        for log_entry in logs:
            if 'loss' in log_entry and 'eval_loss' not in log_entry:
                fold_train_losses.append(log_entry['loss'])
            if 'eval_loss' in log_entry:
                fold_eval_losses.append(log_entry['eval_loss'])
                fold_eval_accuracies.append(log_entry['eval_accuracy'])

        # 🔍 DIAGNÓSTICO: Verificar se está tudo alinhado
        print(f"\n📊 FOLD {fold+1} - Métricas coletadas:")
        print(f"   • Train losses: {len(fold_train_losses)} épocas")
        print(f"   • Eval losses: {len(fold_eval_losses)} épocas")
        print(f"   • Eval accuracies: {len(fold_eval_accuracies)} épocas")

        # ⚠️ SEGURANÇA: Garantir mesmo tamanho (caso haja alguma inconsistência)
        min_length = min(len(fold_train_losses), len(fold_eval_losses))
        if len(fold_train_losses) != len(fold_eval_losses):
            print(f"   ⚠️ AVISO: Tamanhos diferentes! Ajustando para {min_length} épocas")
            fold_train_losses = fold_train_losses[:min_length]
            fold_eval_losses = fold_eval_losses[:min_length]
            fold_eval_accuracies = fold_eval_accuracies[:min_length]

        # Armazenar as curvas de métricas deste fold
        all_fold_train_losses.append(fold_train_losses)
        all_fold_eval_losses.append(fold_eval_losses)
        all_fold_eval_accuracies.append(fold_eval_accuracies)
        # --------------------------------------------------------

        print(f"\n📊 Métricas finais de treino para FOLD {fold+1}:")
        print(f"   Loss: {train_result.metrics.get('train_loss', 'N/A'):.4f}")
        print(f"   Tempo total: {train_result.metrics.get('train_runtime', 'N/A'):.2f}s")

        # Evaluate on test set
        print(f"🧪 Avaliação no conjunto de teste/validação do FOLD {fold+1}...")
        fold_test_metrics = trainer.evaluate(eval_dataset_for_trainer)
        all_fold_test_metrics.append(fold_test_metrics)
        print(f"   Resultados do FOLD {fold+1}: {fold_test_metrics}")

    except Exception as e:
        import traceback
        print(f"❌ Erro durante o treinamento do FOLD {fold+1}:")
        print(traceback.format_exc())
        all_fold_test_metrics.append({
            'eval_loss': float('nan'),
            'eval_accuracy': 0.0,
            'eval_auc': 0.0,
            'eval_f1_invasao': 0.0,
            'eval_f1_violencia': 0.0,
            'eval_runtime': float('nan'),
            'eval_samples_per_second': 0.0,
            'eval_steps_per_second': 0.0
        })

# --- CÓDIGO DE PLOTAGEM SIMPLIFICADO ---
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 6))

# --- Gráfico 1: Perda de Treino por Época ---
for i, train_loss_history in enumerate(all_fold_train_losses):
    epochs = range(1, len(train_loss_history) + 1)
    ax1.plot(epochs, train_loss_history,
             marker='x', linestyle='--', label=f'Fold {i+1}')

ax1.set_xlabel("Época")
ax1.set_ylabel("Loss")
ax1.set_title("Perda de Treino por Época (Todos os Folds)")
ax1.legend()
ax1.grid(True)

# --- Gráfico 2: Perda de Validação por Época ---
for i, eval_loss_history in enumerate(all_fold_eval_losses):
    epochs = range(1, len(eval_loss_history) + 1)
    ax2.plot(epochs, eval_loss_history,
             marker='o', linestyle='-', label=f'Fold {i+1}')

ax2.set_xlabel("Época")
ax2.set_ylabel("Loss")
ax2.set_title("Perda de Validação por Época (Todos os Folds)")
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

# **5. RESULTADOS FINAIS DA VALIDAÇÃO CRUZADA**

In [None]:
print("\n\n==================== RESULTADOS FINAIS K-FOLD CROSS-VALIDATION ====================")

if all_fold_test_metrics:
    # Coletar todas as métricas de todos os folds
    df_metrics = pd.DataFrame(all_fold_test_metrics)

    # Calcular médias e desvios padrão
    avg_metrics = df_metrics.mean(numeric_only=True).round(4).to_dict()
    std_metrics = df_metrics.std(numeric_only=True).round(4).to_dict()

    print("Métricas Médias (e Desvio Padrão) em todos os Folds:")
    for metric, avg_value in avg_metrics.items():
        # Excluir métricas de tempo/desempenho por segundo da apresentação principal se desejar
        if 'runtime' not in metric and 'samples_per_second' not in metric and 'steps_per_second' not in metric:
            print(f"{metric}: {avg_value} (± {std_metrics.get(metric, 0)})")
else:
    print("Nenhum resultado de fold foi coletado. Ocorreram erros em todos os folds.")

print("\n✅ K-Fold Cross-Validation Concluído.")

# **6. SALVAR MODELO (Considerações Pós K-Fold)**

In [None]:
criterio = "eval_accuracy"  # ou "eval_auc", "eval_f1_invasao", etc.
best_fold_idx = int(np.argmax([m.get(criterio, float('-inf')) for m in all_fold_test_metrics]))
print(f"🏆 Melhor modelo: Fold {best_fold_idx+1} com {criterio} = {all_fold_test_metrics[best_fold_idx][criterio]:.4f}")