Certifique-se de ter instalado as seguintes dependências:
- pandas
- torch
- transformers
- pytorch-lightning

## Introdução
Este projeto consiste em um sistema para treinamento e avaliação de modelos de processamento de linguagem natural (PLN) utilizando diferentes arquiteturas de modelos de linguagem pré-treinados, como BERT, RoBERTa e Longformer. O sistema é projetado para realizar tarefas específicas de classificação ou regressão em dados de notas médicas, utilizando técnicas avançadas de pré-processamento, treinamento e avaliação de modelos.


# Argumentos do Script
O script principal aceita os seguintes argumentos de linha de comando:

--CAMINHO_COORTE: Caminho para o arquivo CSV contendo os dados da coorte a serem processados.

--TIPO_MODELO: Tipo de modelo a ser utilizado para treinamento e avaliação. Opções disponíveis incluem bert, roberta e longformer.

--max_epochs: Número máximo de épocas para treinamento do modelo (padrão: 10).

--learning_rate: Taxa de aprendizado para o otimizador AdamW (padrão: 0.001).

--batch_size: Tamanho do lote para o DataLoader durante o treinamento (padrão: 32).

In [None]:
import os
from collections import defaultdict
from typing import List, Tuple, Union

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader
from transformers import LongformerTokenizerFast, RobertaTokenizerFast, BertTokenizerFast, AutoTokenizer
from tqdm import tqdm

from deduper import Simple_deduper, dedup_note_df
from utils import CATEGORIAS_NOTAS, _args4dedup, _task2target, _cohort2sets, _task2rawfile, _name4dedup


class ConjuntoDedup(Dataset):
    def __init__(self, args, stay_df, deduper, raw_text_df=None, deduped_data=None, tokenizer=None, token2id=None):
        """
        Inicializa o conjunto de dados para modelagem com PyTorch.

        Args:
            args (Namespace): Argumentos do programa.
            stay_df (pd.DataFrame): DataFrame contendo os dados da coorte.
            deduper (Simple_deduper): Objeto deduplicador.
            raw_text_df (pd.DataFrame, optional): DataFrame com texto bruto.
            deduped_data (dict, optional): Dados deduplicados.
            tokenizer (AutoTokenizer, optional): Tokenizer para modelos BERT.
            token2id (dict, optional): Mapeamento de token para ID.

        """
        super().__init__()
        self.tarefa = args.TAREFA
        self.deduper = deduper
        self.tokenizer = tokenizer
        self.token2id = token2id

        self.dedup_args = _args4dedup(args)
        self.skip_dedup = args.skip_dedup
        self.skip_style = args.skip_style  # cabeça, cauda, cabeça-cauda se pular dedup
        self.drop_type = args.drop_type  # padrão sem descarte ''

        self.max_length = args.max_length
        self.max_note_length = args.max_note_length
        self.bert_max_length = args.bert_max_length

        self.max_sent_num = args.max_sent_num
        self.max_doc_num = args.max_doc_num

        self.silent = args.silent
        self.use_hierarch = 'hier' in args.TIPO_MODELO
        self.use_sent_hier = args.TIPO_MODELO == 'hier3'

        # carregar dados com sent mantido: (pt-doc-sent-token)
        self.sent_kept = True

        if raw_text_df is not None:
            assert deduped_data is None, "Não alimente duas fontes de dados juntas"
            self.data, self.text_lens = self.carregar_dados(stay_df, raw_text_df)
        else:
            assert raw_text_df is None, "Não alimente duas fontes de dados juntas"
            self.data, self.text_lens = self.carregar_deduped_data(stay_df, deduped_data)

    def carregar_dados(self, stay_df, raw_text_df):
        """
        Carrega os dados brutos e deduplica conforme necessário.

        Args:
            stay_df (pd.DataFrame): DataFrame da coorte.
            raw_text_df (pd.DataFrame): DataFrame com texto bruto.

        Returns:
            data: Dados carregados e deduplicados.
            lens: Comprimentos dos dados.

        """
        data, lens = [], []
        for _, r in tqdm(stay_df.iterrows(), disable=self.silent, total=stay_df.shape[0]):
            hadm = r['HADM_ID']
            target = r[_task2target(self.tarefa)]

            text_df = raw_text_df[raw_text_df.HADM_ID == hadm]
            notas = dedup_note_df(text_df, CATEGORIAS_NOTAS, self.deduper, **self.dedup_args)
            nota_parseada = self._parsear_notas(notas)

            texto_codificado, comprimento = self._tokenizar_texto(nota_parseada)
            data.append((texto_codificado, target, hadm))
            lens.append(comprimento)

        return data, lens

    def carregar_deduped_data(self, stay_df, hadm2deduped):
        """
        Carrega os dados deduplicados.

        Args:
            stay_df (pd.DataFrame): DataFrame da coorte.
            hadm2deduped (dict): Dicionário de dados deduplicados por HADM_ID.

        Returns:
            data: Dados carregados e deduplicados.
            lens: Comprimentos dos dados.

        """
        data, lens = [], []
        for _, r in tqdm(stay_df.iterrows(), disable=self.silent, total=stay_df.shape[0]):
            hadm = r['HADM_ID']
            target = r[_task2target(self.tarefa)]

            notas = hadm2deduped[hadm]
            nota_parseada = self._parsear_notas(notas)

            texto_codificado, comprimento = self._tokenizar_texto(nota_parseada)
            data.append((texto_codificado, target, hadm))
            lens.append(comprimento)

        return data, lens

    def _tokenizar_texto(self, nota_parseada):
        """
        Tokeniza o texto com base no tipo de modelo especificado.

        Args:
            nota_parseada (str or list): Nota ou lista de notas a serem tokenizadas.

        Returns:
            texto_codificado: Texto tokenizado.
            comprimento: Comprimento do texto tokenizado.

        """
        if self.tokenizer is not None:
            if isinstance(nota_parseada, list):
                nota_parseada = ' '.join(nota_parseada)
            assert isinstance(nota_parseada, str)
            texto_codificado = self.tokenizer(nota_parseada, max_length=self.bert_max_length, truncation=True, padding='max_length', return_token_type_ids=False)
            comprimento = np.array(texto_codificado['attention_mask']).sum()

        else:
            if not self.use_hierarch:
                notas_codificadas = [self._texto2id(nota, self.token2id) for nota in nota_parseada]
                if self.max_note_length > 0:
                    notas_codificadas = [nota[:self.max_note_length] for nota in notas_codificadas if len(nota) > 0]
                texto_tokenizado = [i for j in notas_codificadas for i in j]
                comprimento = len(texto_tokenizado[:self.max_length])

                min_len = min(self.max_length, len(texto_tokenizado))
                texto_codificado = np.zeros(self.max_length)
                texto_codificado[:min_len] = texto_tokenizado[:min_len]

            else:
                if self.max_doc_num > 0:
                    nota_parseada = nota_parseada[:self.max_doc_num]

                if not self.sent_kept or not self.use_sent_hier:
                    texto_codificado = [self._texto2id(nota, self.token2id) for nota in nota_parseada]
                    texto_codificado = [texto[:self.max_note_length] for texto in texto_codificado if len(texto) > 0]
                    comprimento = sum(len(t) for t in texto_codificado)
                else:
                    texto_codificado, comprimento = [], 0
                    for doc in nota_parseada:
                        doc_codificado = [self._texto2id(sent, self.token2id) for sent in doc]
                        texto_codificado.append(doc_codificado[:self.max_sent_num])
                        comprimento += sum(len(t) for t in doc_codificado)

        return texto_codificado, comprimento

    @staticmethod
    def _texto2id(texto, token2id):
        return [token2id[token.lower()] if token.lower() in token2id else token2id['<unk>'] for token in texto.split()]

    def _parsear_notas(self, notas):
        """
        Parseia as notas do paciente, filtrando categorias específicas se necessário.

        Args:
            notas (list): Lista de tuplas (categoria, texto).

        Returns:
            list: Lista de strings ou string única.

        """
        if self.drop_type != '':
            notas = self._drop_cat(notas, self.drop_type)

        notas = [t for _, t in notas]

        if self.use_hierarch:
            if not self.use_sent_hier:
                notas = [' '.join(sents) for sents in notas]
            return notas
        else:
            if self.sent_kept:
                notas = [' '.join(sents) for sents in notas]

            if self.skip_dedup:
                if self.skip_style == 'tail':
                    return notas[::-1]
                elif self.skip_style == 'headtail':
                    return self._merge_select(notas, self.max_length)
                else:
                    return notas
            else:
                return notas

    @staticmethod
    def _drop_cat(lista_notas, to_drop, categorias=None):
        """
        Remove categorias específicas das notas.

        Args:
            lista_notas (list): Lista de tuplas (categoria, nota).
            to_drop (str): Categorias a serem removidas, separadas por '+'.

        Returns:
            list: Lista filtrada de tuplas (categoria, nota).

        """
        if '+' in to_drop:
            drop = to_drop.split('+')
        else:
            drop = [to_drop]

        if categorias is None:
            notas_drop = [n for n in drop if n in categorias]
        else:
            notas_drop = [n for n in drop if n in CATEGORIAS_NOTAS]

        return [(cat, note) for cat, note in lista_notas if cat not in notas_drop]

    def _merge_select(self, tlist, max_length):
        """
        Combina as notas de cabeça e cauda.

        Args:
            tlist (list): Lista de notas.
            max_length (int): Comprimento máximo da lista.

        Returns:
            list: Lista combinada de notas.

        """
        outlist, count = [], 0
        while count < max_length and count < len(tlist):
            outlist.append(tlist[count])
            count += 1
            if count < max_length:
                outlist.append(tlist[-count])
                count += 1
        return outlist


class ModeloDedup(pl.LightningModule):
    def __init__(self, args, model):
        """
        Inicializa o modelo para modelagem com PyTorch Lightning.

        Args:
            args (Namespace): Argumentos do programa.
            model: Modelo PyTorch.

        """
        super().__init__()
        self.args = args
        self.model = model
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        inputs, targets = batch
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        return loss

    def validation_step(self, batch, batch_idx):
        inputs, targets = batch
        outputs = self.model(inputs)
        loss = self.criterion(outputs, targets)
        return {'val_loss': loss, 'val_preds': outputs, 'val_targets': targets}

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x['val_loss'] for x in outputs]).mean()
        val_preds = torch.cat([x['val_preds'] for x in outputs])
        val_targets = torch.cat([x['val_targets'] for x in outputs])
        val_acc = torch.sum(val_preds.argmax(dim=1) == val_targets).item() / len(val_targets)
        self.log('val_loss', avg_loss, on_epoch=True)
        self.log('val_acc', val_acc, on_epoch=True)

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.args.learning_rate)
        return optimizer


def treinar_modelo(args, train_loader, val_loader):
    """
    Função para treinar o modelo.

    Args:
        args (Namespace): Argumentos do programa.
        train_loader (DataLoader): DataLoader para conjunto de treino.
        val_loader (DataLoader): DataLoader para conjunto de validação.

    Returns:
        model: Modelo treinado.

    """
    model = construir_modelo(args)
    trainer = pl.Trainer(
        gpus=args.gpus,
        max_epochs=args.max_epochs,
        progress_bar_refresh_rate=1,
        weights_summary='full'
    )
    trainer.fit(model, train_loader, val_loader)
    return model


def construir_modelo(args):
    """
    Constrói o modelo com base nos argumentos fornecidos.

    Args:
        args (Namespace): Argumentos do programa.

    Returns:
        model: Modelo construído.

    """
    if args.TIPO_MODELO == 'bert':
        tokenizer = BertTokenizerFast.from_pretrained(args.NOME_MODELO)
        model = BertForSequenceClassification.from_pretrained(args.NOME_MODELO, num_labels=args.num_labels)
    elif args.TIPO_MODELO == 'roberta':
        tokenizer = RobertaTokenizerFast.from_pretrained(args.NOME_MODELO)
        model = RobertaForSequenceClassification.from_pretrained(args.NOME_MODELO, num_labels=args.num_labels)
    elif args.TIPO_MODELO == 'longformer':
        tokenizer = LongformerTokenizerFast.from_pretrained(args.NOME_MODELO)
        model = LongformerForSequenceClassification.from_pretrained(args.NOME_MODELO, num_labels=args.num_labels)
    else:
        raise ValueError(f"Tipo de modelo {args.TIPO_MODELO} não suportado.")

    conjunto_treino = ConjuntoDedup(args, args.dados_treino, args.deduper, tokenizer=tokenizer)
    conjunto_validacao = ConjuntoDedup(args, args.dados_validacao, args.deduper, tokenizer=tokenizer)

    loader_treino = DataLoader(conjunto_treino, batch_size=args.batch_size, shuffle=True, num_workers=args.num_workers)
    loader_validacao = DataLoader(conjunto_validacao, batch_size=args.batch_size, shuffle=False, num_workers=args.num_workers)

    model = ModeloDedup(args, model)
    return model


def principal(args):
    """
    Função principal para execução do código.

    Args:
        args (Namespace): Argumentos do programa.

    """
    # Carregar dados
    dados_texto_bruto = pd.read_csv(_task2rawfile(args))
    coorte = pd.read_csv(args.CAMINHO_COORTE)

    # Carregar modelo e iniciar treinamento
    modelo = treinar_modelo(args, loader_treino, loader_validacao)
    torch.save(modelo.state_dict(), args.CAMINHO_SALVAR_MODELO)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    # Definir argumentos do parser aqui
    args = parser.parse_args()

    principal(args)