# Classificação de tokens

## Carrega bibliotecas

In [1]:
import os
import csv
import json

import torch
import numpy as np
import pandas as pd

import pprint

from datasets import Dataset
from sklearn.metrics import roc_auc_score, accuracy_score

from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer, EvalPrediction

## Define arquivo de parâmetros

* Se existir um arquivo em `learning/instance/parameters.json`, usa este arquivo;
* Se não existir, usa as definições-padrão abaixo

In [17]:
import os
current_path = os.path.abspath('.')

if os.path.basename(current_path) != 'learning':
    raise FileNotFoundError('Execute este notebook a partir da pasta \'learning\'!')

parameters_path = os.path.join(current_path, 'instance', 'parameters.json')

if os.path.exists(parameters_path):
    status = f'usando arquivo de parâmetros em {parameters_path}'
    with open(parameters_path, 'r', encoding='utf-8') as read_file:
        parameters = json.load(read_file)
else:
    status = (f'arquivo com parâmetros não encontrado em {parameters_path}'
              f'usando definições do próprio notebook')
    
    parameters = {
        "model_name": "neuralmind/bert-base-portuguese-cased",
        "num_train_epochs": 3,
        "use_cpu": False,
        "repo_owner": "COPLIN-UFSM",
        "repo_name": "nlp-data",
        "remote_dataset_path": 'data/token_classification/input/annotated.jsonl',
        "local_dataset_path": r'C:\Users\henry\Projects\COPLIN-UFSM\nlp\learning\instance\annotated.jsonl',
        "input_column": "text",
        "val_size": 0.2,
        "output_dir": "instance/models",
        "output_model_name": "student-token-classification",
        "batch_size": 128,
        "optim": "adamw_torch",
        "problem_type": "token_classification",
        "max_length": 128,
        "class_name": ["positive", "negative"],
        "auto_find_batch_size": True,
        "push_to_hub": False,
        "github_access_token": None
    }

print(f'\n{status}')


usando arquivo de parâmetros em C:\Users\henry\Projects\COPLIN-UFSM\nlp\learning\instance\parameters.json


## Carrega dataset

* Se o dataset não estiver presente, armazena-o nos arquivos temporários do computador
* Se estiver presente, verifica o MD5 para garantir que é o mesmo armazenado remotamente
* Se estiver presente e não for o mesmo remoto, baixa novamente

In [16]:
import numpy as np
import pandas as pd
import requests
from io import StringIO

def get_dataset(parameters):
    def __do_request__(path):
        return requests.get(
            f'https://api.github.com/repos/{owner}/{repo}/contents/{path}',
            headers={
                'accept': 'application/vnd.github.v3.raw',
                'authorization': f'token {token}'
            }
        )
    
    # parâmetros da requisição
    owner = parameters['repo_owner']
    repo = parameters['repo_name']
    local_dataset_path = parameters['local_dataset_path']
    remote_dataset_path = parameters['remote_dataset_path']
    md5_remote_path = remote_dataset_path[:remote_dataset_path.index('.')] + '.md5'
    md5_local_path = local_dataset_path[:local_dataset_path.index('.')] + '.md5'
    token = parameters['github_access_token'] 
    
    # solicita o md5 do dataset antes
    md5_remote_contents = __do_request__(md5_remote_path).text
    if os.path.exists(md5_local_path):
        with open(md5_local_path, 'r') as read_file:
            md5_local_contents = read_file.read()
    else:
        md5_local_contents = None
    
    print(f' local md5: {md5_local_contents}')
    print(f'remote md5: {md5_remote_contents}')
        
    if md5_local_contents != md5_remote_contents:
        print('md5 não compatível; Lendo dataset do repositório do GitHub')
        # escreve md5 remoto
        with open(md5_local_path, 'w') as write_file:
            write_file.write(md5_remote_contents)
      
        # converte texto em dataframe
        df = pd.read_json(StringIO(__do_request__(remote_dataset_path).text), lines=True)
        # escreve dataframe em disco
        with open(os.path.join('instance', parameters['local_dataset_path']), 'w', encoding='utf-8') as write_file:
            for i, row in df.iterrows():
                write_file.write(row.to_json(force_ascii=False) + '\n')    
    else:
        # lê dataframe do disco
        print('md5 compatível; Lendo dataset do disco local')
        df = pd.read_json(parameters['local_dataset_path'], lines=True)
        
    return df

df = get_dataset(parameters)
display(df)

 local md5: fafc3427ee648e28a253f214a2976c43
remote md5: fafc3427ee648e28a253f214a2976c43
md5 compatível; Lendo dataset do disco local


Unnamed: 0,id,text,ID_REFEICAO,Avaliação Variedade e Qualidade,Avaliação Atendimento,Avaliação Organização e Limpeza,label,Comments
0,29875,o feijão possui muitos gases devido ao método ...,15269107,3,5,5,"[[0, 111, comida/bebida], [0, 111, negativo]]",[]
1,29876,poucos legumes e o arroz poderia estar um pouc...,15269042,3,5,5,"[[0, 59, comida/bebida], [0, 59, negativo], [6...",[]
2,29877,"a comida estava boa e com a marmita do almoço,...",15268993,5,5,5,"[[0, 63, comida/bebida], [0, 63, positivo]]",[]
3,29878,muito arroz. já esta virando desperdício.,15269191,3,3,3,"[[0, 41, negativo], [0, 41, organização]]",[]
4,29879,"a batata poderia ser mais cozida, estava crua ...",15268862,4,5,5,"[[0, 51, negativo], [0, 52, comida/bebida], [5...",[]
...,...,...,...,...,...,...,...,...
10923,40798,sou vegana e tinha ovo como opção de proteína,17905614,1,4,3,"[[0, 45, negativo], [0, 45, outras dietas]]",[]
10924,40799,"por favor quando servirem lentilha, sirvam fei...",17897771,2,5,5,"[[0, 56, comida/bebida], [0, 56, negativo], [0...",[]
10925,40800,"execelente café, o cafe poderia ser disponibil...",17622301,5,5,5,"[[0, 16, comida/bebida], [0, 17, positivo], [1...",[]
10926,40801,faltou sal na galinha.,17625116,5,5,5,"[[0, 22, comida/bebida], [0, 22, negativo]]",[]


## Define funções de cálculo de métricas

In [None]:
def multi_label_metrics(predictions, labels, threshold=0.5):
    """
    Calcula métricas de um modelo multi-rótulo. Adaptação do código-fonte de
        https://jesusleal.io/2021/04/21/Longformer-multilabel-classification/
    """
    # first, apply sigmoid on predictions which are of shape (batch_size, num_labels)
    sigmoid = torch.nn.Sigmoid()
    probs = sigmoid(torch.Tensor(predictions))
    # next, use threshold to turn them into integer predictions
    y_pred = np.zeros(probs.shape)
    y_pred[np.where(probs >= threshold)] = 1
    # finally, compute metrics
    y_true = labels
    roc_auc = roc_auc_score(y_true, y_pred, average='micro')
    accuracy = accuracy_score(y_true, y_pred)
    # return as dictionary
    metrics = {
       'roc_auc': roc_auc,
       'accuracy': accuracy
    }
    return metrics


def compute_metrics(p: EvalPrediction) -> dict:
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    result = multi_label_metrics(
        predictions=preds,
        labels=p.label_ids
    )
    return result

In [None]:
def get_dataset_metadata(df: pd.DataFrame, class_labels: str | list) -> tuple:
    """
    Coleta metadados de um DataFrame.

    :param df: DataFrame para o qual os metadados serão coletados
    :param class_labels: Um dos dois: o nome da coluna no DataFrame com os rótulos, ou uma lista com o nome dos rótulos
    :return: Uma tupla com os seguintes itens: dataframe (pd.DataFrame), número de rótulos (int), lista com rótulos
        (list), nome da coluna com o atributo classe (str), label2id (dict), id2label (dict)
    """

    if isinstance(class_labels, str):
        labels = sorted(df[class_labels].unique())  # type: list
    elif isinstance(class_labels, list):
        labels = sorted(class_labels)
    else:
        raise TypeError('Tipo desconhecido para o parâmetro class_name (deve ser str ou list)')

    label2id = {k: i for i, k in enumerate(labels)}
    id2label = {i: k for k, i in label2id.items()}
    num_labels = len(labels)

    return num_labels, labels, label2id, id2label


def preprocess_data(
        dataset: Dataset, tokenizer: BertTokenizer,
        label2id: dict, input_column: str, max_length: int
):
    # take a batch of texts

    text = dataset[input_column]
    # encode them
    encoding = tokenizer(text, padding='max_length', truncation=True, max_length=max_length)
    # add labels
    labels_batch = {k: dataset[k] for k in dataset if k in list(label2id.keys())}
    # create numpy array of shape (batch_size, num_labels)
    labels_matrix = np.zeros((len(text), len(label2id)))
    # fill numpy array
    for label, idx in label2id.items():
        labels_matrix[:, idx] = labels_batch[label]

    encoding['label'] = labels_matrix.tolist()

    return encoding


def tokenize_and_get_metadata(
        df: pd.DataFrame, tokenizer: BertTokenizer, input_column: str, classe_name: str | list,
        max_length: int, batch_size: int = 8
):
    num_labels, labels, label2id, id2label = get_dataset_metadata(df, classe_name)

    dataset = Dataset.from_pandas(df)

    tokenized = dataset.map(
        lambda x: preprocess_data(x, tokenizer, label2id, input_column, max_length),
        batched=True, batch_size=batch_size
    )
    tokenized.set_format('torch')
    return tokenized, num_labels, labels, label2id, id2label


def tokenize_datasets(tokenizer: BertTokenizer, original_sets: dict, parameters: dict) -> tuple:
    tokenized_sets = {}
    num_labels = None
    labels = None
    label2id = None
    id2label = None
    for name in original_sets.keys():
        tokenized_sets[name], num_labels, labels, label2id, id2label = tokenize_and_get_metadata(
            original_sets[name], tokenizer, parameters['input_column'], parameters['class_name'],
            parameters['max_length'], parameters['batch_size']
        )

    return tokenized_sets, num_labels, labels, label2id, id2label


def do_train_model(
    tokenizer: BertTokenizer, parameters: dict, tokenized_sets: dict, num_labels: int, id2label: dict, label2id: dict
) -> tuple:
    

    return tokenizer, trainer


def evaluate_on_test_set(trainer: Trainer, test_set=None) -> dict:
    """
    Avalia desempenho do modelo no conjunto de teste, se este existir

    :param trainer: Treinador do modelo
    :param test_set: Conjunto de teste
    :return: Um dicionário com as métricas de desempenho
    """
    if test_set is not None:
        res = trainer.evaluate(test_set)
        print('Resultados no conjunto de teste:')
        for k, v in res.items():
            print(f'{k}: {v}')
        return res
    return {}


def main(parameters, original_sets) -> None:
    device = get_device(parameters['use_cpu'])
    print(f'using {device} as device')

    tokenizer = BertTokenizer.from_pretrained(parameters['model_name'], do_lower_case=False)  # type: BertTokenizer
    tokenized_sets, num_labels, labels, label2id, id2label = tokenize_datasets(tokenizer, original_sets, parameters)
    tokenizer, trainer = do_train_model(tokenizer, parameters, tokenized_sets, num_labels, id2label, label2id)
    tokenizer.save_pretrained(os.path.join(parameters['output_dir'], parameters['output_model_name']))
    trainer.save_model(os.path.join(parameters['output_dir'], parameters['output_model_name']))
    evaluate_on_test_set(trainer, tokenized_sets['test'])

## Pega dispositivo onde o treinamento será executado

In [None]:
def get_device(use_cpu: bool = False) -> str:
    if not use_cpu:
        device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    else:
        device = 'cpu'

    return device

## Treina modelo

In [None]:
# carrega um modelo pré-treinado com uma camada totalmente conectada nova no fim do modelo
model = BertForSequenceClassification.from_pretrained(
    parameters['model_name'], num_labels=num_labels, id2label=id2label, label2id=label2id,
    problem_type=parameters['problem_type']
)

training_args = TrainingArguments(
    output_dir=parameters['output_dir'],
    eval_strategy='epoch',
    save_strategy='epoch',
    save_total_limit=1,
    num_train_epochs=parameters['num_train_epochs'],
    # use_cpu=parameters['use_cpu'],
    optim=parameters['optim'],
    load_best_model_at_end=True,
    auto_find_batch_size=parameters['auto_find_batch_size'],
)

trainer = Trainer(
    model=model,
    tokenizer=tokenizer,
    args=training_args,
    train_dataset=tokenized_sets['train'],
    eval_dataset=tokenized_sets['val'],
    compute_metrics=compute_metrics,
)

trainer.train()