<h1 align="center">NER básico</h1>

Data Scientist.: Dr.Eddy Giusepe Chirinos Isidro

In [2]:
from transformers import BertTokenizerFast, DistilBertTokenizerFast, BatchEncoding, PreTrainedTokenizerFast, TrainingArguments, Trainer
from transformers import BertForTokenClassification, DistilBertForTokenClassification
from tokenizers import Encoding

import itertools
from typing import List, Any, Dict, Union, Set
import json
from pprint import pprint
import os
import gc
from dataclasses import dataclass
from tqdm import tqdm
from pathlib import Path
from pathlib import Path
import re

import torch
from torch.utils.data import Dataset

from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import numpy as np
import numpy

In [3]:
FORMATO_NER_IOB: str = 'IOB'
FORMATO_NER_BILOU: str = 'BILOU'
IGNORE_LABEL_MODEL_ID: int=-100
IGNORE_LABEL: str='[IGNORE]'

GDRIVE_PATH:str = '/home/eddygiusepe/Imagens/Eddy_codigos/NLP_Transformers/NER_BERT_Deep_Learning/basic_NER_BERT'
DATASET_DDI_TRAIN:str = os.path.join(GDRIVE_PATH, 'dataset', 'ddi_train.json')
DATASET_DDI_TEST:str = os.path.join(GDRIVE_PATH, 'dataset', 'ddi_test.json')
DATASET_WNUT:str = os.path.join(GDRIVE_PATH, 'dataset', 'wnut17train.conll')


MODEL_BASE: str = 'distilbert-base-cased'
MODEL_TRAINED_PATH: str = os.path.join(GDRIVE_PATH, 'model', 'base-ner')
MODEL_TRAINED_LOG: str = os.path.join(MODEL_TRAINED_PATH, 'trainer.log')
LABEL_OUTPUT_PATH: str = os.path.join(MODEL_TRAINED_PATH, 'labelset.txt') 
MODEL_TRAINED_WNUT_PATH: str = os.path.join(GDRIVE_PATH, 'model', 'base-ner-wnut')
MODEL_TRAINED_WNUT_LOG: str = os.path.join(MODEL_TRAINED_WNUT_PATH, 'trainer.log')
LABEL_OUTPUT_WNUT_PATH: str = os.path.join(MODEL_TRAINED_WNUT_PATH, 'labelset.txt') 

PUNCTUATION_LIST = [ ',','.',':',';']

CONTROL_TOKENS = ['[PAD]', '[SEP]', '[CLS]' ]

     

In [4]:
@dataclass
class TrainingBatchExample:
    batch_encoding: BatchEncoding
    labels: List[List[int]]

In [5]:
class NERDataset(torch.utils.data.Dataset):
    def __init__(self, batch: Union[BatchEncoding, TrainingBatchExample], labels: List[List[int]]=None):
        _encodings: BatchEncoding = None
        _labels: List[List[int]] = []

        if isinstance(batch, TrainingBatchExample):
            _encodings = batch.batch_encoding
            _labels = batch.labels
        else:
            _encodings = batch
            _labels = labels

        if ("offset_mapping" in _encodings):
            _encoding.pop("offset_mapping")
        self.encodings = _encodings
        self.labels = _labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)
     

In [6]:
class LabelNER:
    """
        Classe preparada para conter os labels de um treinamento/predição de NER.
        Os labels podem vir sem preparação, ou seja, anotações onde somente o nome do label é informado e não estão presentes os prefixos (IOB ou BILOU).

        load_from_list e load_from_annotations devem ser utilizados no treinamento e o label set deve ser gravado utilizando save().
        No caso de predição ou carga para teste utilizar o load para carregar um label set previamente utilizado.
     
    """

    def __str__(self):
            return f"{len(self.labels_to_id)} labels {str(self.labels_to_id)}"

    def __len__(self):
        return len(self.labels_to_id)

    def __init__(self):
        self.labels_to_id = {}
        self.ids_to_label = {}
        
    def get_label_list(self):
        return self.labels_to_id.keys()

    def get_id_list(self):
        return self.ids_to_label.keys()

    def load_from_complete_list(self, labels: List[str]) -> None:
        '''
            Args: 
                labels(:obj:`List[str]`):
                Lista de labels completa, esperado conter O(utside) e os prefixos de cada entidade.
            
            Carrega a lista de labels "as is" sem tratamento.
        '''
        self.labels_to_id: Dict = {label: id for id, label in enumerate(labels)}
        self.ids_to_label:Dict = {id: label for label, id in self.labels_to_id.items()}
        self._finaliza_carga_labels()

    def load_from_simple_list(self, labels: List[str], ner_label_format: str=FORMATO_NER_IOB) -> None:
        '''
            Args: 
                labels(:obj:`List[str]`):
                    Lista de labels sem prefixo e não contendo o tipo O(utside).
                ner_label_format(:obj:`str`, `optional`, defaults to `"IOB"`):
                    Formato para classificação dos tokens de uma entidade - IOB ou BILOU
            
            Inclui o tipo O(utside) e faz a permutação entre labels e os prefixos do formato informado
        '''
        self.labels_to_id["O"] = 0
        self.ids_to_label[0] = "O"
        num = 0  # in case there are no labels
        prefix_list: str = "BI" if ner_label_format == FORMATO_NER_IOB else "BILU"

        for _num, (label, s) in enumerate(itertools.product(labels, prefix_list)):
            num = _num + 1  # skip 0
            l = f"{s}-{label}"
            self.labels_to_id[l] = num
            self.ids_to_label[num] = l
        
        self._finaliza_carga_labels()

    def load_from_file(self, input_file_path:str) -> None:
        '''
            Args: 
                labelset_file(:obj:`str`):
                    Nome do arquivo contendo a lista de labels previamente gravado. Muito importante ter sido gravado por essa classe ou ter a garantia que o arquivo está com os labels na ordem correta.
            
            Carrega o arquivo com os labels ordenados. O arquivo pode ser construido manualmente, mas deve conter um label por linha, na ordem utilizada para treinar o modelo, já que essa ordem foi criada na 
            extração ou carga dos labels para o treinamento do modelo.
        '''
        with open(input_file_path, 'r' ) as label_file:
            for ind, label in enumerate(label_file):
                label = label.strip('\n')
                self.labels_to_id[label] = ind
                self.ids_to_label[ind] = label

    def _finaliza_carga_labels(self) -> None:
        ''' 
            Adicionar o label de ignorar wordpiece para os casos em que o modelo será treinado nesse formato
        '''
        self.labels_to_id[IGNORE_LABEL] = IGNORE_LABEL_MODEL_ID
        self.ids_to_label[IGNORE_LABEL_MODEL_ID] = IGNORE_LABEL

    def save(self, output_file_path: str):
        with open(output_file_path, 'w' ) as label_file:
            for label in self.labels_to_id:
                label_file.write(label)
                label_file.write('\n')    


    def convert_label_list_to_id_list(self, lista: List[str]) -> List[int]:
        """
            Converte uma lista de labels nos respectivos id`s. Para processamento no modelo essa conversão precisará ser realizada
        """
        return list(map(self.labels_to_id.get, lista))

    def convert_id_list_to_label_list(self, lista: List[int]) -> List[str]:
        """
            Converte uma lista de id`s nos respectivos labels. Para compreensão do resultado retornado pelo modelo essa conversão será necessária.
        """
        return list(map(self.ids_to_label.get, lista))  

     

In [7]:
def align_tokens(batch_encoding :BatchEncoding, 
                list_annotations: Union[List[List[Dict]],List[List[str]]], 
                is_span_annotations: bool=True, 
                ignore_word_piece: bool = False,
                ner_label_format: str=FORMATO_NER_IOB,
                label_ner: LabelNER=None) -> TrainingBatchExample:

    """

    Alinhamento dos tokens e respectivos labels para tratar a disparidade gerada pelos Tokenizadores Bert quando geram Worpieces.
    As anotações foram criadas levando em conta palavras, mas os tokenizadores incluem partes de palavras (wordpieces), exemplo:
    Starbucks --> Star, ##bu, ##cks. A anotação está mapeada para uma entrada de B-Location, contudo três entradas deverão ser tratadas.

    Args:
        batch_encoding (:obj:`BatchEncoding`):
            Batch encoding previamente tokenizado (input_ids, attention_mask). 
            Contém lista de Encodings, ou seja, cada sentença é um encoding e o BatchEncoding contém todos eles.
            O modelo é preparado para receber esse tipo de dado, por isso sua estrutura será preservada.
        
        is_span_annotations(:obj:`bool`, `optional`, defaults to `True`):
            Informa se a lista de anotações está no formato span ou se cada token já está classificado com seu respectivo label.
            Se for span então o parametro list_annotations será do tipo List[List[Dict]]
            Os spans são entradas com inicio e fim de uma faixa de caracteres onde classificados com a entidadade informada. A classificação nesse caso é somente da entidade, não é esperado prefixo IOB ou BILOU.
                
            Se NÃO for span então o parametro list_annotations será do tipo List[List[int]]
            No caso de tokens já classificados (lista de string) espera-se receber os prefixos IOB ou BILOU.

            Ambos os casos a classificação existente está por token, o alinhamento resolverá o problema de alinhamento token --> wordpiece.

        ignore_word_piece(:obj:`bool`, `optional`, defaults to `False`):
            Informa o que fazer com os tokens wordpiece. Se eles forem ignorados então serão treinados com o label -100 (ignorados), caso contrário serão tratados como tokens normais,
            podendo receber I- ou L- (formato BILOU)

        ner_label_format(:obj:`bool`, `optional`, defaults to `"IOB"`):
            Formato de classificação de labels, espera o formato IOB ou BILOU

        label_ner(:obj:`LabelNER`):
            Objeto da classe LabelNER que contém os labels e seus respectivos id's. Será utilizada para converter os labels de string para seus respectivos identificadores.

        Returns:
            :obj:`TrainingBatchExamples`:
                TrainingBatchExamples contendo o BatchEncoding passado e os labels alinhados

    """

    list_aligned_labels: List[List[int]] = []

    for ind_encoding in range(len(batch_encoding.encodings)):
        encoding: Encoding = batch_encoding[ind_encoding]
        annotations = list_annotations[ind_encoding]

        if is_span_annotations:
            aligned_labels_str: List[str] = align_tokens_from_span_annotations(encoding,
                                                                               annotations,
                                                                               ignore_word_piece,
                                                                               ner_label_format)
        else:
           aligned_labels_str: List[str] = align_tokens_from_token_tags(encoding,
                                                                        annotations,
                                                                        ignore_word_piece)     
           
        list_aligned_labels.append(label_ner.convert_label_list_to_id_list(aligned_labels_str))

    training_batch = TrainingBatchExample(batch_encoding, list_aligned_labels)
    
    return training_batch

     

In [8]:
def align_tokens_from_span_annotations(encoding: Encoding, 
                                       annotations: List[Dict],                                     
                                       ignore_word_piece: bool = False,
                                       ner_label_format: str=FORMATO_NER_IOB):
    """

    Construção do array de labels de acordo com as anotações por span, tratando alinhamento de tokens com wordpieces.
    
    Args:
        encoding (:obj:`Encoding`):
            Enconding da sentença previamente tokenizada (input_ids, attention_mask)            
        
        annotations(:obj:`List[Dict]`)
            Lista de spans das entidades, ou seja, cada entrada do dicionário contém o inicio e fim de uma entidade que pode conter uma ou mais palavras.
            A função espera receber anotações com as seguintes entradas:
                - start - inicio do span
                - end - fim do span
                - label - label do span. Não é esperado que o label contenha os prefixos IOB ou BILOU, somente o nome do label.

        ignore_word_piece(:obj:`bool`, `optional`, defaults to `False`):
            Informa o que fazer com os tokens wordpiece. Se eles forem ignorados então serão treinados com o label -100 (ignorados), caso contrário serão tratados como tokens normais,
            podendo receber I- ou L- (formato BILOU)

        ner_label_format(:obj:`bool`, `optional`, defaults to `"IOB"`):
            Formato de classificação de labels, espera o formato IOB ou BILOU

        Returns:
            :obj:`List[str]`:
                labels alinhados com os tokens, ignorando ou incluindo nos wordpieces na classificação (parametro ignore_word_piece).

    """

    
    # Inicialização com BILOU e ajusta caso necessário
    unique_prefix: str='U'
    begin_prefix: str='B'
    inside_prefix: str='I'
    last_prefix: str='L'

    if ner_label_format == FORMATO_NER_IOB:
        unique_prefix = 'B'
        last_prefix = 'I'

    tokens = encoding.tokens
    #print(tokens)
    aligned_labels: List[str] = ["O"] * len(tokens)  # Make a list to store our labels the same length as our tokens
    for anno in annotations:
        annotation_token_ix_set = (set())  # A set that stores the token indices of the annotation
        for char_ix in range(anno["start"], anno["end"]):
            token_ix = encoding.char_to_token(char_ix)
            if token_ix is not None:
                if not tokens[token_ix] in PUNCTUATION_LIST: # alguns datasets incluem caracteres de pontuação na entidade por falha nas anotações
                    annotation_token_ix_set.add(token_ix)

        if len(annotation_token_ix_set) == 1:
            # If there is only one token
            token_ix = annotation_token_ix_set.pop()
            aligned_labels[token_ix] = f"{unique_prefix}-{anno['label']}"

        else:
            last_token_in_anno_ix = len(annotation_token_ix_set) - 1
            for num, token_ix in enumerate(sorted(annotation_token_ix_set)):
                ignorar_token: bool=False

                #tratamento worpiece
                if ignore_word_piece:
                    if token_ix > 1: #[CLS] e o primeiro token não serão wordpieces com certeza e o offets deles compromete a lógica [CLS] (0,0) [1o token] (0,1)
                        offset_ini = encoding.offsets[token_ix][0]
                        offset_end_anterior = encoding.offsets[token_ix-1][1]
                        if offset_ini == offset_end_anterior:
                            ignorar_token = True
                            aligned_labels[token_ix] = IGNORE_LABEL        

                if not ignorar_token:
                    if num == 0:
                        prefix = begin_prefix
                    elif num == last_token_in_anno_ix:
                        prefix = last_prefix
                    else:
                        prefix = inside_prefix
                    aligned_labels[token_ix] = f"{prefix}-{anno['label']}"
    return aligned_labels
     

In [9]:
def align_tokens_from_token_tags(encoding: Encoding, tag_list: List[str], ignore_word_piece: bool = False):
    """
        Anotações por tokens são feitas em arquivos já tokenizados, não são strings, são linhas de tokens e classificação. O offset para identificação de wordpiece 
        deve ser tratado diferente das strings com anotações por span. Tokens que possuem o primeiro item do offset > 0 e o segundo item diferente de zero são wordpiece 
        (a segunda parte da crítica seria para o [CLS] ou [SEP]).
        Somente o formato IOB será tratado nessa função de alinhamento.

        Args:
        encoding (:obj:`Encoding`):
            Enconding da sentença previamente tokenizada (input_ids, attention_mask)            
        
        tag_list(:obj:`List[str]`)
            Lista com os labels por token. Não possui o tratamento do worpiece. len(label_list) <= len(enconding.tokens)

        ignore_word_piece(:obj:`bool`, `optional`, defaults to `False`):
            Informa o que fazer com os tokens wordpiece. Se eles forem ignorados então serão treinados com o label -100 (ignorados), caso contrário serão tratados como tokens normais,
            podendo receber I- ou L- (formato BILOU)
        
        Returns:
            :obj:`List[str]`:
                labels alinhados com os tokens, ignorando ou incluindo nos wordpieces na classificação (parametro ignore_word_piece).
    """

    aligned_labels: List[str] = ["O"] * len(encoding.tokens)
    ind_label: int = 0
    last_token_label: str = None

    for ind_token, token_input_id in enumerate(encoding.ids):
        offsets = encoding.offsets
        if ind_token == 0: #ignora o [CLS]
            continue
        if (offsets[ind_token][0] > 0):
            if last_token_label is not None:
                if ignore_word_piece:
                    aligned_labels[ind_token] = IGNORE_LABEL
                else:
                    aligned_labels[ind_token] = last_token_label.replace("B-","I-") 
        else:
            # tratamento dos [PAD]`s que vão além da lista de labels
            label: str = tag_list[ind_label] if ind_label < len(tag_list) else "O"
            aligned_labels[ind_token] = label
            last_token_label = label if label != "O" else None                    
            ind_label = ind_label + 1
    
    return aligned_labels
     

In [10]:
# Liberar Recursos da GPU
def destroy_model(model: Union[DistilBertForTokenClassification,BertForTokenClassification]):
    del model
    gc.collect()
    torch.cuda.empty_cache()

def destroy_tokenizer(tokenizer: Union[BertTokenizerFast, DistilBertTokenizerFast]):
    del tokenizer
    gc.collect()
    torch.cuda.empty_cache()
     

In [11]:
def prepara_NER_prediction_for_metrics(tokenizer: Union[BertTokenizerFast, DistilBertTokenizerFast], 
                                       batch_encoding: BatchEncoding, 
                                       test_labels: numpy.ndarray, 
                                       pred_labels: numpy.ndarray,
                                       ignorar_wordpiece: bool):
    """
        Ajusta da predição realizada pelo NER para avaliação das métricas, tratando wordpieces caso necessário.
        Os seguintes tokens de controle devem ser ignorados: [PAD], [SEP], [CLS]. Ignorar representa atribuir o valor correto à predição, 
        já que a predição que eventualmente foi predita será ignorada na métrica do teste e no uso ordinário.

        Args:
            tokenizer (:obj:`Union[BertTokenizerFast,DistilBertTokenizerFast]`):
                Tokenizer utiilzado no modelo e que tokenizou o batch_encoding.

            batch_encoding (:obj:`BatchEncoding`):
                Batch enconding das sentenças tokenizadas utilizando o tokenizer passado por parametro. Será usado caso os wordpieces sejam ignorados.
            
            test_labels (:obj:`numpy.ndarray(int)`):
                Array com os valores corretos dos labels.
            
            pred_labels (:obj:`numpy.ndarray(int)`):
                Array com os valores inferido pelo modelo.

            ignorar_wordpiece (:obj:`bool`):
                Caso afirmativo os wordpieces serão ignorados e receberão o valor correto, já que o importante seria sua primeira parte, que identifica o token

            
        Returns:
            :obj:`numpy.ndarray(int)`:
                Vetor de predição preparado para aplicação de métricas.
    """
    control_encodings:BatchEncoding = tokenizer(CONTROL_TOKENS)
    # [CLS] e [SEP] são adicionados pelo tokenizador gerando 3 * numero de tokens de controle, por isso foi gerado um set
    ignored_tokens = set([token_id for control_sent in control_encodings.input_ids for token_id in control_sent])
    
    for ind_encoding in range(len(batch_encoding.encodings)):
        encoding:Encoding = batch_encoding[ind_encoding]
        
        for ind_token,input_id in enumerate(encoding.ids):
            if input_id in ignored_tokens:
                if pred_labels[ind_encoding][ind_token] != test_labels[ind_encoding][ind_token]:
                    #print(input_id,' [', ind_encoding, ',', ind_token, '] -', pred_labels[ind_encoding][ind_token], '--> ', test_labels[ind_encoding][ind_token] )
                    pred_labels[ind_encoding][ind_token] = test_labels[ind_encoding][ind_token]


        if ignorar_wordpiece:
            for ind_token, offset in enumerate(encoding.offsets):
                if ind_token <=1:
                    continue
                offset_ini = encoding.offsets[ind_token][0]
                offset_end_anterior = encoding.offsets[ind_token-1][1]
                if offset_ini == offset_end_anterior:
                    pred_labels[ind_encoding,ind_token] = test_labels[ind_encoding,ind_token]

    return pred_labels
     

In [12]:
# Retorna os tensores que estão em uso na memória da GPU

def get_gpu_memory_status():
    total = (torch.cuda.get_device_properties(0).total_memory)/(1024 **2)
    reserved = (torch.cuda.memory_reserved(0))/(1024 **2)
    allocated = (torch.cuda.memory_allocated(0))/(1024 **2)
    return f"Total: {total:.2f} | Reserved: {reserved:.2f} | Allocated: {allocated:.2f}"

def print_gpu_use():
    result = []
    for tracked_object in gc.get_objects():
        if torch.is_tensor(tracked_object):
            shape = tracked_object.shape
            result.append({
                'name': type(tracked_object).__name__,
                '1d': len(shape)==1,
                '2d': len(shape)==2,
                'nrows': shape[0] if (len(shape) > 0) else None,
                'ncols': shape[1] if (len(shape) > 1) else None,
                'gpu': tracked_object.is_cuda,
                'pinned': tracked_object.is_pinned()
            })
        
    d = pd.DataFrame(result)
    d.groupby('name')['gpu', 'pinned', '1d', '2d'].sum()
    print(d)




In [13]:
def plot_grafico_trainer_loss(trainer_loss: numpy.ndarray, validation_loss: numpy.ndarray):
    # Use plot styling from seaborn.
    sns.set(style='darkgrid')

    # Increase the plot size and font size.
    sns.set(font_scale=1.5)
    plt.rcParams["figure.figsize"] = (12,6)

    # Plot the learning curve.
    plt.plot(trainer_loss[:,0],trainer_loss[:,1] , label="training loss")
    plt.plot(validation_loss[:,0], validation_loss[:,1] , label="validation loss")

    # Label the plot.
    plt.title("Learning curve")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.show()
     


# 1. Treinamento com Anotações

In [14]:
# carregamento e alteração do campo para ficar similar ao esperado pela função que faz os alinhamentos
label_set: Set[str] = set()
with open(DATASET_DDI_TRAIN) as json_ds_file:
    raw = json.load(json_ds_file)
    for example in raw:
        # our simple implementation expects the label to be called label, so we adjust the original data
        for anno in example["annotations"]:
            anno["label"] = anno["tag"]
            label_set.add(anno["label"])

label_list: List[str] = list(label_set)
     

FileNotFoundError: [Errno 2] No such file or directory: '/home/eddygiusepe/Imagens/Eddy_codigos/NLP_Transformers/NER_BERT_Deep_Learning/basic_NER_BERT/dataset/ddi_train.json'