# Modelo BERTimbau para avaliação das proposições

In [None]:
import torch, os, evaluate, wandb,datetime
from sklearn.metrics import classification_report
import numpy as np
from sklearn.model_selection import train_test_split
from datasets import Dataset, load_dataset
import pandas as pd
from google.colab import drive
from sklearn.metrics import accuracy_score, recall_score, precision_score, roc_auc_score, \
                            classification_report, f1_score, precision_recall_fscore_support
from transformers import AutoTokenizer, AutoModelForSequenceClassification,\
                            Trainer,TrainingArguments, AutoConfig, EarlyStoppingCallback, IntervalStrategy
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'

## Carregar e organizar o dataset

In [None]:
# Seed handed to the training configuration further down.
RANDOM_SEED = 5151

# Path on the mounted drive that holds the training data.
BASE_PATH = "/content/drive/MyDrive/IA-AzMina/modelos/"

# Load every split of the dataset through `datasets.load_dataset`.
dataset = load_dataset("azmina/ementas_congresso")

In [None]:
# Keep only the columns that are used and rename the target column to "label",
# producing one pandas DataFrame per split (train / val / test).
cols = ["text", "label_desfavoravel"]
fix_columns = {"label_desfavoravel": "label"}

# The splits are read from `dataset`, the object returned by `load_dataset`.
# Test rows without 'textoInteiroTeor' are discarded before selecting columns.
df_test = dataset["test"].to_pandas().dropna(subset=['textoInteiroTeor'])[cols].rename(columns=fix_columns)
df_train = dataset["train"].to_pandas()[cols].rename(columns=fix_columns)
df_val = dataset["val"].to_pandas()[cols].rename(columns=fix_columns)

In [None]:
# Load the tokenizer and the model.
# The labels are sorted so that the id <-> label mapping does not depend on the
# order in which the labels happen to appear in the training split: index i
# always refers to the i-th smallest label value.
unique_labels = sorted(df_train.label.unique())

label2id = {str(label): int(i) for i, label in enumerate(unique_labels)}
id2label = {int(i): str(label) for i, label in enumerate(unique_labels)}

pretrainedmodel = 'neuralmind/bert-base-portuguese-cased'

# Sequence-classification head sized to the number of labels, with custom
# dropout probabilities for the hidden states and the attention weights.
model = AutoModelForSequenceClassification.from_pretrained(pretrainedmodel,
                                                           num_labels=len(unique_labels),
                                                           id2label=id2label,
                                                           label2id=label2id,
                                                           hidden_dropout_prob=0.07,
                                                           attention_probs_dropout_prob=0.07)

tokenizer = AutoTokenizer.from_pretrained(pretrainedmodel)

In [None]:
def tokenize_function(record, max_length=512):
    """Tokenize ``record['text']`` with the module-level ``tokenizer``.

    Every sequence is truncated and padded to exactly ``max_length`` tokens.

    Args:
        record: Mapping with a ``text`` entry (a single example or a batch).
        max_length: Target sequence length for truncation and padding.

    Returns:
        Whatever the tokenizer returns for the given text.
    """
    tokenizer_options = {
        "truncation": True,
        "padding": 'max_length',
        "max_length": max_length,
    }
    return tokenizer(record['text'], **tokenizer_options)

def _tokenize_batch(batch):
    """Tokenize one batch of records with a fixed length of 512 tokens."""
    return tokenize_function(batch, max_length=512)


# Convert each DataFrame into a `Dataset` and tokenize it batch by batch.
train = Dataset.from_pandas(df_train).map(_tokenize_batch, batched=True)
val = Dataset.from_pandas(df_val).map(_tokenize_batch, batched=True)
test = Dataset.from_pandas(df_test).map(_tokenize_batch, batched=True)

## Definindo as métricas de avaliação

In [None]:
# Load the four metrics used by `compute_metrics`, in a fixed order.
accuracy_metric, f1_metric, recall_metric, precision_metric = (
    evaluate.load(metric_name)
    for metric_name in ("accuracy", "f1", "recall", "precision")
)

def compute_metrics(eval_pred):
    """Compute accuracy and macro-averaged F1, recall and precision.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class of each row
            is the argmax of the logits over the last axis.

    Returns:
        A single dict merging the results of the four metrics.
    """
    logits, labels = eval_pred
    predicted_ids = np.argmax(logits, axis=-1)
    shared_args = {"predictions": predicted_ids, "references": labels}

    # Accuracy takes no averaging mode; the other three are macro-averaged.
    scores = dict(accuracy_metric.compute(**shared_args))
    for metric in (f1_metric, recall_metric, precision_metric):
        scores.update(metric.compute(average='macro', **shared_args))
    return scores

## Definição da função de perda

In [None]:

# Fonte: https://amaarora.github.io/posts/2020-06-29-FocalLoss.html


class FocalLoss(nn.Module):
    """Focal loss computed from per-class binary cross-entropy on one-hot targets.

    Args:
        alpha: Class weighting. A number ``a`` produces the weights
            ``[a, 1 - a]`` for classes 0 and 1; a sequence (or tensor) is used
            as-is and must hold one weight per class.
        gamma: Exponent of the ``(1 - pt)`` modulating factor.
        num_classes: Number of classes used to one-hot encode the targets.
    """

    def __init__(self, alpha=0.25, gamma=2, num_classes=2):
        super().__init__()
        if isinstance(alpha, (int, float)):
            alpha_weights = torch.tensor([alpha, 1 - alpha], dtype=torch.float32)
        else:
            alpha_weights = torch.as_tensor(alpha, dtype=torch.float32)
        # Stored as a buffer rather than forced onto CUDA, so the loss can be
        # created on machines without a GPU and follows `.to(device)`.
        self.register_buffer("alpha", alpha_weights)
        self.gamma = gamma
        self.num_classes = num_classes

    def forward(self, inputs, targets):
        """Return the mean focal loss.

        Args:
            inputs: Logits with one column per class.
            targets: Class indices, one per row of ``inputs``.

        Returns:
            Scalar tensor: the mean of the focal loss over every element.
        """
        targets = targets.type(torch.long)

        # One-hot encode the targets so they match the shape of the logits.
        targets_one_hot = F.one_hot(targets, num_classes=self.num_classes).float()

        # Element-wise binary cross-entropy, kept unreduced.
        bce_loss = F.binary_cross_entropy_with_logits(inputs, targets_one_hot, reduction='none')

        # exp(-BCE) recovers the probability assigned to the correct outcome.
        pt = torch.exp(-bce_loss)

        # Move the weights to the device of the logits once and keep them there.
        if self.alpha.device != inputs.device:
            self.alpha = self.alpha.to(inputs.device)

        # Pick the weight of each sample's true class; shape (N, 1) so that it
        # broadcasts across the class columns.
        at = self.alpha.gather(0, targets.reshape(-1)).view(-1, 1)

        focal = at * (1 - pt) ** self.gamma * bce_loss
        return focal.mean()



class CustomTrainer(Trainer):
    """Trainer that can optimise a caller-supplied loss computed on the logits.

    Args:
        focal_loss: Callable ``(logits, labels) -> loss``. When it is ``None``
            the loss computation of the parent ``Trainer`` is used instead.
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, focal_loss=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.focal_loss = focal_loss

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the training loss for one batch.

        Extra keyword arguments are accepted so that the method keeps working
        when the caller passes arguments this override does not use.

        Returns:
            ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        # Without a custom loss there is nothing to call: defer to the parent.
        if self.focal_loss is None:
            return super().compute_loss(model, inputs, return_outputs=return_outputs, **kwargs)

        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")
        loss = self.focal_loss(logits, labels)
        return (loss, outputs) if return_outputs else loss

# Loss instance handed to the trainer.
focal_loss_fn = FocalLoss(
    gamma=2,
    alpha=0.25,
)

## Treinamento do modelo

In [None]:
batch_size = 64
# NOTE: `eval_steps` / `save_steps` only take effect when the corresponding
# strategy is "steps"; with the "epoch" strategies configured below,
# evaluation and checkpointing happen once per epoch.
eval_steps = len(train) // batch_size * 3  # raise the multiplier to evaluate less often

# Name under which the run is reported to Weights & Biases; the timestamp keeps
# repeated runs distinguishable.
wandb_run_name = f"bertimbau-focal-loss-{datetime.datetime.now():%Y%m%d-%H%M%S}"

training_args = TrainingArguments(
    output_dir=".",
    eval_strategy="epoch",
    eval_steps=eval_steps,
    save_strategy="epoch",
    save_steps=eval_steps*2,
    warmup_steps=150,
    logging_steps=10,
    learning_rate=1e-5,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=19,
    weight_decay=0.02,
    seed=RANDOM_SEED,
    save_total_limit=1,
    report_to="wandb",
    # The best checkpoint is chosen by the "f1" value from `compute_metrics`
    # and reloaded once training finishes.
    metric_for_best_model="f1",
    run_name=wandb_run_name,
    load_best_model_at_end = True,
    optim="adamw_torch"
)

# Stop training after 3 evaluations in a row without improvement.
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

# Everything the trainer needs: model, configuration, data, loss and metrics.
trainer_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": train,
    "eval_dataset": val,
    "focal_loss": focal_loss_fn,
    "compute_metrics": compute_metrics,
    "callbacks": [early_stopping],
}
trainer = CustomTrainer(**trainer_kwargs)

trainer.train()

## Avaliação no conjunto de teste

In [None]:
# Metrics of the trained model on the held-out test split.
trainer.evaluate(eval_dataset=test)

In [None]:
# Predict the whole test split in batches through the trainer, so the inputs
# are exactly the tokenized examples that `trainer.evaluate(test)` scores.
test_output = trainer.predict(test)
predicted_ids = np.argmax(test_output.predictions, axis=-1)

eval_test = test.to_pandas()
eval_test["pred_label"] = [id2label[int(class_id)] for class_id in predicted_ids]
# Printed explicitly: a bare expression in the middle of a cell is not shown.
print(eval_test["pred_label"].value_counts())

# Map the gold labels through the same dictionary so both columns hold names.
eval_test["label_name"] = eval_test["label"].apply(lambda x: id2label[x])

print(classification_report(eval_test["label_name"], eval_test["pred_label"]))
