# Criar um Dataset Anotado Sintético com dados para avaliar LGPD


In [None]:
import pandas as pd

# Load the synthetic annotated dataset from CSV, print how many rows it has,
# and preview the first five rows.
df = pd.read_csv("./output/dataset_sintetico_0_50.csv")
print("Qnt: ", len(df))
df.head(5)

In [None]:
# One dict per CSV row (column name -> value). Later cells read the "texto"
# and "dados_sinteticos" keys of these records.
DADOS_FAKE = df.to_dict(orient="records")
DADOS_FAKE[:1]  # peek at the first record

## Carregando Dados


### Função para converter em Tokens e Input IDs


In [None]:
import re

# BIO tag set. A tag's position in this list is its integer label id, so "O"
# (outside any entity) is id 0 and each entity type contributes a B-/I- pair.
LABELS = ["O"] + [
    f"{prefix}-{entity}"
    for entity in (
        "NOME",
        "DATA",
        "ENDERECO",
        "CPF",
        "TELEFONE",
        "EMAIL",
        "DINHEIRO",
        "CEP",
    )
    for prefix in ("B", "I")
]


def split_text_like_conll(text):
    """Split *text* into CoNLL-style tokens.

    Every run of word characters becomes one token and every other
    non-whitespace character becomes a token of its own; whitespace is
    dropped.

    NOTE: the last alternative of the pattern (digits, separator, digits) can
    never win, because ``\\w+`` is tried first and already consumes the leading
    digits. Numbers such as "3.829,83" are therefore split at every separator,
    and "R$" is split into "R" and "$".
    """
    token_pattern = r"\w+|[^\w\s]|[0-9]+[.,][0-9]+"
    return [match.group(0) for match in re.finditer(token_pattern, text)]


def convert_words_to_labels_ids(sentence="", labels=LABELS, dados=None):
    """Tokenize *sentence* and tag the tokens that belong to known entities.

    Args:
        sentence: Text to tokenize with ``split_text_like_conll``.
        labels: Ordered tag list; a tag's position is its label id. Tokens that
            match no entity keep id 0.
        dados: Mapping from entity type (tag name without the "B-"/"I-" prefix)
            to the list of entity strings to look for. ``None`` means no
            entities.

    Returns:
        ``(words, labels_ids)``: the token list and a parallel list of label
        ids. Every occurrence of an entity's token sequence is tagged: the
        first token gets the "B-" id and the rest get the "I-" id. Entity types
        whose "B-" tag is not in *labels* are ignored. Later entries overwrite
        earlier ones when they overlap.
    """
    words = split_text_like_conll(sentence)
    labels_ids = [0] * len(words)

    for label_type, entries in (dados or {}).items():
        b_label = "B-" + label_type
        if b_label not in labels:
            continue
        # Resolve the ids once per entity type instead of once per matched token.
        b_id = labels.index(b_label)
        i_label = "I-" + label_type
        i_id = labels.index(i_label) if i_label in labels else None

        for entry in entries:
            entry_parts = split_text_like_conll(entry)
            size = len(entry_parts)
            if size == 0:
                # An empty or whitespace-only entry would "match" at every
                # position (and one past the end), tagging the whole sentence
                # and then raising IndexError.
                continue

            for start in range(len(words) - size + 1):
                if words[start : start + size] != entry_parts:
                    continue
                labels_ids[start] = b_id
                if i_id is not None:
                    labels_ids[start + 1 : start + size] = [i_id] * (size - 1)

    return words, labels_ids


# ===========

# Smoke test: a hand-written sentence plus its entities, keyed by entity type
# (tag name without the B-/I- prefix).
TEXT_LOADED = "Marinalva Bete Raz e Jorge Luiz receberam R$ 3.829,83 reais."
DADOS_LOADED = {
    "NOME": ["Marinalva Bete Raz", "Jorge Luiz"],
    "DINHEIRO": ["R$ 3.829,83"],
}

# Tokenize the sentence, then print the tokens followed by their label ids.
output_words, output_labels = convert_words_to_labels_ids(
    TEXT_LOADED, dados=DADOS_LOADED
)
print(output_words)
print(output_labels)

### Carregando o Modelo atual


In [None]:
from transformers import pipeline, AutoTokenizer
import torch


def get_transformer():
    """Build the token-classification ("ner") pipeline used as the base model.

    The tokenizer and the model are both loaded from the same checkpoint name.
    The pipeline is placed on device 0 when CUDA is available and on device -1
    otherwise.

    Returns:
        The object returned by ``transformers.pipeline``.
    """
    checkpoint = "pierreguillou/ner-bert-large-cased-pt-lenerbr"

    ner_tokenizer = AutoTokenizer.from_pretrained(
        checkpoint,
        max_length=512,
        model_max_length=512,
        truncation=True,
    )

    target_device = 0 if torch.cuda.is_available() else -1
    ner_pipeline = pipeline(
        "ner",
        tokenizer=ner_tokenizer,
        model=checkpoint,
        aggregation_strategy="first",
        device=target_device,
    )
    return ner_pipeline


# Build the NER pipeline once when this cell runs; it is reused by later cells.
TRANSFORMER_MODEL = get_transformer()

In [None]:
from utils.ModeloNerUtils import (
    ModeloNerUtils,
)

# Project helper constructed with the pipeline; model_fun calls its
# merge_all_models / remove_labels / merge_labels_from_to methods.
ModeloNerUtilsObj = ModeloNerUtils(transformer=TRANSFORMER_MODEL)


def model_fun(text):
    """Run the NER helper on *text* and normalize the resulting labels.

    The text goes through three ``ModeloNerUtilsObj`` calls in order:
    ``merge_all_models``, ``remove_labels`` (with the labels listed below) and
    ``merge_labels_from_to`` (with the mapping below, whose keys are this
    notebook's entity types).

    NOTE(review): the exact semantics of those helper methods live in
    ``utils.ModeloNerUtils`` and are not visible here — presumably the mapping
    renames the model's labels (values) to the notebook's labels (keys).

    Returns:
        Whatever ``merge_labels_from_to`` returns; later cells read the
        "entity_group" and "word" keys of each item.
    """
    labels_to_drop = [
        "LEGISLACAO",
        "ORGANIZACAO",
        "NUMERO",
        "JURISPRUDENCIA",
        "CNPJ",
    ]
    label_merges = {
        "NOME": ["PESSOA"],
        "DATA": ["TEMPO"],
        "ENDERECO": ["LOCAL"],
        "CPF": ["CPF"],
        "TELEFONE": ["TELEFONE"],
        "EMAIL": ["EMAIL"],
        "DINHEIRO": ["DINHEIRO"],
        "CEP": ["CEP"],
    }

    detected = ModeloNerUtilsObj.merge_all_models(text)
    kept = ModeloNerUtilsObj.remove_labels(detected, labels_to_drop)
    return ModeloNerUtilsObj.merge_labels_from_to(kept, label_merges)


# Run the full prediction path on the sample sentence and show the raw result.
resposta_gerada = model_fun(TEXT_LOADED)
resposta_gerada

In [None]:
def convert_transformer_to_dict(ents):
    """Group predicted entity strings by their label.

    Args:
        ents: Iterable of mappings, each with an "entity_group" key (the label)
            and a "word" key (the entity text).

    Returns:
        Dict mapping each label to the list of its distinct words, in the order
        they were first seen. Labels appear in first-seen order too.
    """
    grouped = {}
    for ent in ents:
        # The inner dict's keys act as an insertion-ordered set. This removes
        # duplicates in a single pass and keeps the output order deterministic,
        # unlike rebuilding list(set(...)) on every iteration.
        grouped.setdefault(ent["entity_group"], {})[ent["word"]] = None
    return {group: list(words) for group, words in grouped.items()}


# =========

# Same sample sentence, with the predicted entities grouped by label.
resp_model = convert_transformer_to_dict(model_fun(TEXT_LOADED))
resp_model

In [None]:
# End-to-end check: project the model's predicted entities back onto the
# tokenized sample sentence as (tokens, label_ids).
convert_words_to_labels_ids(
    TEXT_LOADED, dados=convert_transformer_to_dict(model_fun(TEXT_LOADED))
)

### Carregando os Dados Fake Anotados


In [8]:
# texto_teste_1_sintetico = {
#     "texto": "Marinalva Bete Raz e Jorge Luiz reclama por indenização no valor de R$ 82.662,00 e da-costamarcos-vinicius@example.net e CEP 85123-123. Com endereço no Trevo Alves, 453, Carlos Prates, 27929672 Farias/AP, mesmo assim.",
#     "dados_sinteticos": {
#         "NOME": ["Marinalva Bete Raz", "Jorge Luiz"],
#         "DINHEIRO": ["R$ 82.662,00"],
#         "CEP": ["85123-123"],
#         "EMAIL": ["da-costamarcos-vinicius@example.net"],
#         "ENDERECO": ["Trevo Alves, 453\\nCarlos Prates\\n27929672 Farias / AP"],
#     },
# }

# # ======
# print(
#     convert_words_to_labels_ids(
#         texto_teste_1_sintetico["texto"],
#         dados=texto_teste_1_sintetico["dados_sinteticos"],
#     )
# )

## Avaliação


### Rodando a Avaliação


In [None]:
# Model predictions: for every record, run the NER model on its text, group the
# predicted entities by label, and project them onto the tokenized text as
# (tokens, label_ids).
# NOTE(review): a record that raises is only reported and skipped, so this list
# can end up shorter than DADOS_FAKE and out of step with other lists that are
# later paired with it by position — confirm nothing was skipped before
# trusting the metrics.
DADOS_AVALIACAO_GERADOS = []

for item in DADOS_FAKE:
    try:
        from_model = convert_words_to_labels_ids(
            item["texto"], dados=convert_transformer_to_dict(model_fun(item["texto"]))
        )

        DADOS_AVALIACAO_GERADOS.append(from_model)
    except Exception as e:
        # Best effort: report the failing record and move on.
        print("Erro: ", e)
        print("Texto: ", item)

# ===========

print("\n\nQnt DADOS_AVALIACAO_GERADOS: ", len(DADOS_AVALIACAO_GERADOS))
# print("\nDADOS_AVALIACAO_GERADOS: ", DADOS_AVALIACAO_GERADOS[:2])

In [10]:
# Save locally DADOS_AVALIACAO_GERADOS and load after
# import pickle

# Save
# with open("./output/DADOS_AVALIACAO_GERADOS.pkl", "wb") as f:
#     pickle.dump(DADOS_AVALIACAO_GERADOS, f)

# Load
# with open("./output/DADOS_AVALIACAO_GERADOS.pkl", "rb") as f:
#     DADOS_AVALIACAO_GERADOS = pickle.load(f)

In [None]:
import json

def _parse_dados_sinteticos(raw):
    """Parse the stringified entity dict stored in a dataset record.

    The quote-swapping JSON parse is tried first, so every record that parsed
    before still yields the same result. It breaks whenever a value contains an
    apostrophe or a double quote; in that case the raw string is parsed as a
    Python literal instead.

    Raises:
        ValueError or SyntaxError: if the string is neither form.
    """
    import ast  # local import: only the fallback path needs it

    try:
        return json.loads(raw.replace("'", '"'))
    except json.JSONDecodeError:
        return ast.literal_eval(raw)


# Reference annotations: project each record's annotated entities onto its
# tokenized text as (tokens, label_ids).
# NOTE(review): a record that raises is only reported and skipped, so this list
# can end up shorter than DADOS_FAKE and out of step with other lists that are
# later paired with it by position — confirm nothing was skipped.
DADOS_AVALIACAO_SINTETICOS = []

for item in DADOS_FAKE:
    try:
        from_sint = convert_words_to_labels_ids(
            item["texto"],
            # The original notebook also kept a commented-out variant reading
            # item["dados_sinteticos_merged"] instead.
            dados=_parse_dados_sinteticos(item["dados_sinteticos"]),
        )

        DADOS_AVALIACAO_SINTETICOS.append(from_sint)
    except Exception as e:
        # Best effort: report the failing record and move on.
        print("Erro: ", e)
        print("Texto: ", item)

# ===========

print("\n\nQnt DADOS_AVALIACAO_SINTETICOS: ", len(DADOS_AVALIACAO_SINTETICOS))
# print("\nDADOS_AVALIACAO_SINTETICOS: ", DADOS_AVALIACAO_SINTETICOS[:2])

In [None]:
# Inverse lookup for the tag list: integer label id -> tag string.
ID2LABEL = dict(enumerate(LABELS))
ID2LABEL

https://huggingface.co/spaces/evaluate-metric/seqeval


In [13]:
# !pip install seqeval

In [None]:
from datasets import load_metric


def run_avaliacao(dados_sinteticos, dados_gerados, id2label=None):
    """Score predicted label sequences against reference ones with seqeval.

    Args:
        dados_sinteticos: Reference samples; element ``[1]`` of each sample is
            its list of label ids.
        dados_gerados: Predicted samples with the same layout, paired with the
            references by position.
        id2label: Mapping from label id to tag string.

    Returns:
        The result of the seqeval metric's ``compute`` call.

    When a prediction and its reference differ in length, a message is printed
    and the shorter sequence is padded with id 0 at the end. The padding is
    applied to private copies, so the caller's lists are never modified.

    NOTE(review): ``datasets.load_metric`` is deprecated in recent ``datasets``
    releases in favour of the separate ``evaluate`` package — confirm the
    pinned version still provides it.
    """
    id2label = {} if id2label is None else id2label
    metric = load_metric("seqeval", trust_remote_code=True)

    # Copy the label lists: padding them in place would silently alter the
    # caller's data, which later cells print and export.
    predictions = [list(d[1]) for d in dados_gerados]
    references = [list(d[1]) for d in dados_sinteticos]

    # Equalize the lengths of each prediction/reference pair.
    for ind, preds in enumerate(predictions):
        refs = references[ind]
        size_predictions = len(preds)
        size_references = len(refs)
        if size_predictions == size_references:
            continue

        print(
            f"Error Index {ind}: The number of predictions ({size_predictions}) is different from the number of references ({size_references})"
        )
        if size_predictions < size_references:
            preds.extend([0] * (size_references - size_predictions))
        else:
            refs.extend([0] * (size_predictions - size_references))

    # seqeval works on tag strings, not ids.
    predictions_labels = [[id2label[pred] for pred in preds] for preds in predictions]
    references_labels = [[id2label[ref] for ref in refs] for refs in references]

    return metric.compute(
        predictions=predictions_labels, references=references_labels, zero_division=0
    )


# ==========

# Toy check of run_avaliacao: three reference samples against three predictions.
# The last prediction has one label more than its reference, which exercises
# the length-mismatch padding path.
d_sin = [
    [["texto"], [1, 1, 1, 0, 3, 0, 6]],
    [["texto"], [1, 1, 0, 2, 2]],
    [["texto"], [1, 1, 0, 2, 2]],
]
d_ger = [
    [["texto"], [1, 1, 1, 0, 0, 0, 6]],
    [["texto"], [1, 1, 0, 0, 2]],
    [["texto"], [1, 1, 0, 2, 2, 0]],
]

run_avaliacao(d_sin, d_ger, ID2LABEL)

#### Avaliação F1 para o Artigo


In [None]:
# Score the model predictions against the reference annotations; the following
# cells read per-entity entries and the overall_* entries from this result.
resp = run_avaliacao(DADOS_AVALIACAO_SINTETICOS, DADOS_AVALIACAO_GERADOS, ID2LABEL)
resp

In [16]:
# Row order wanted for the entity types in the report.
input_json_order = [
    "NOME",
    "DATA",
    "ENDERECO",
    "CPF",
    "TELEFONE",
    "EMAIL",
    "DINHEIRO",
    "CEP",
]

# Rebuild the metrics dict with the entities in the order above...
input_json = resp
input_json_new = {key: input_json[key] for key in input_json_order}

# ...followed by the "overall" metrics at the end.
input_json_new.update(
    {
        overall_key: input_json[overall_key]
        for overall_key in (
            "overall_precision",
            "overall_recall",
            "overall_f1",
            "overall_accuracy",
        )
    }
)

# input_json_new

#### Tabela Latex


In [None]:
def gerar_tabela_latex_de_avaliacao(
    input_json, titulo="Avaliação do novo Modelo NER", round_num=2
):
    """Render evaluation metrics as a LaTeX ``table`` environment.

    Args:
        input_json: Metrics dict. Keys starting with "overall" are summary
            values; every other key is an entity whose value has "precision",
            "recall", "f1" and "number" entries.
        titulo: Text placed in the table's caption.
        round_num: Number of decimals passed to ``round`` for each metric.

    Returns:
        The table source as one string: a header, one row per entity in
        dict order, and a final "Overall" row (support shown as "-").
    """
    indent = "    "

    # Table preamble and column headings; the trailing indent precedes the
    # first body row.
    header_lines = [
        "\\begin{table}[H]",
        indent + "\\centering",
        indent + "\\caption{" + titulo + "}",
        indent + "\\label{tab:avaliacao_ner_com_correcoes}",
        indent + "\\begin{tabular}{|l|l|l|l|l|}",
        indent + "\\hline",
        indent
        + "\\textbf{Entity}  & \\textbf{Precision} & \\textbf{Recall} & \\textbf{F1-Score} & \\textbf{Support} \\\\",
        indent + "\\hline",
        indent,
    ]
    pieces = ["\n".join(header_lines)]

    # One row per entity; the overall_* summary entries are handled afterwards.
    for entity, scores in input_json.items():
        if entity.startswith("overall"):
            continue
        p_value = round(scores["precision"], round_num)
        r_value = round(scores["recall"], round_num)
        f_value = round(scores["f1"], round_num)
        n_value = scores["number"]
        pieces.append(f"{entity} & {p_value} & {r_value} & {f_value} & {n_value} \\\\\n")

    pieces.append("\\hline\n")

    # Summary row.
    total_p = round(input_json["overall_precision"], round_num)
    total_r = round(input_json["overall_recall"], round_num)
    total_f = round(input_json["overall_f1"], round_num)
    pieces.append(f"\\textbf{{Overall}} & {total_p} & {total_r} & {total_f} & - \\\\\n")

    pieces.append("\\hline\n" + indent + "\\end{tabular}\n" + indent + "\\end{table}")

    return "".join(pieces)


# Print the LaTeX table for the reordered metrics, rounding to three decimals.
print(gerar_tabela_latex_de_avaliacao(input_json_new, round_num=3))

## Gerando Dataset para Treino


In [None]:
# Save DADOS_AVALIACAO_SINTETICOS with pandas, in Parquet format.
# Raise the column display width to 200 for the preview shown by this cell.
pd.set_option("max_colwidth", 200)


# Each (tokens, label_ids) pair becomes one row with columns "texto" and "labels".
df_sint = pd.DataFrame(DADOS_AVALIACAO_SINTETICOS, columns=["texto", "labels"])
df_sint.to_parquet("./output/DADOS_AVALIACAO_SINTETICOS.parquet")
df_sint.head(2)

## Casos de Erro


In [19]:
# print(DADOS_AVALIACAO_GERADOS[8:9][0]["text"])

In [20]:
# print(
#     DADOS_AVALIACAO_GERADOS[8:9][0]["inputs_ids"],
#     len(DADOS_AVALIACAO_GERADOS[8:9][0]["inputs_ids"]),

# )



In [None]:
def run_compare(
    dados_sinteticos,
    dados_gerados,
    id2label=None,
    filter_by_labels=None,
    original_data=None,
    padding_show=5,
):
    """Print every token where prediction and reference disagree on a label of interest.

    Args:
        dados_sinteticos: Reference samples as ``(tokens, label_ids)`` pairs.
        dados_gerados: Predicted samples with the same layout, paired with the
            references by position.
        id2label: Mapping from label id to tag string.
        filter_by_labels: Entity types (tag names without the "B-"/"I-" prefix)
            to report. A mismatch is shown when either the predicted or the
            reference tag belongs to one of them.
        original_data: Optional list of records; when given, the sample header
            shows ``original_data[i]["texto"]`` instead of the joined tokens.
        padding_show: Size of the token window printed around each mismatch:
            up to ``padding_show`` tokens before it, then the token itself and
            up to ``padding_show - 1`` tokens after it.

    Returns:
        None. Everything is written to stdout, ending with the number of
        samples that had at least one reported mismatch (when non-zero).
    """
    id2label = {} if id2label is None else id2label
    filter_by_labels = [] if filter_by_labels is None else filter_by_labels

    predictions = [d[1] for d in dados_gerados]
    references = [d[1] for d in dados_sinteticos]

    qnt_total = 0
    for i, (pred, ref) in enumerate(zip(predictions, references)):
        if pred == ref:
            continue

        tokens = dados_sinteticos[i][0]
        is_first_time = True

        for j, (p, r) in enumerate(zip(pred, ref)):
            if p == r:
                continue

            # [2:] drops the "B-"/"I-" prefix; "O" becomes "" and never matches.
            pred_in_filter = id2label[p][2:] in filter_by_labels
            ref_in_filter = id2label[r][2:] in filter_by_labels
            if not (pred_in_filter or ref_in_filter):
                continue

            # Print the sample header once, before its first reported mismatch.
            if is_first_time:
                print("\n")
                print(f"============= Index {i} =============")

                if original_data is None:
                    print(f"Texto: {' '.join(tokens)}")
                else:
                    print(f"Texto: {original_data[i]['texto']}")
                is_first_time = False
                qnt_total += 1

            # Clamp the window start at 0: a negative start would be read as an
            # offset from the end of the list and yield an empty or wrong
            # excerpt for mismatches near the beginning of the sample.
            excerpt = tokens[max(0, j - padding_show) : j + padding_show]

            print("\n")
            print(f"Pred -> Index {j} - Ref: {id2label[p]}")
            print(f"Ref -> Index {j} - Ref: {id2label[r]}")
            print(f"Text: {excerpt}")
            print(f"Text junto: {' '.join(excerpt)}")

    if qnt_total > 0:
        print(f"\n\n ========= Quantidade Total: {qnt_total} =========\n\n")


# ==========


# Demo data for run_compare: two reference samples and two predicted samples
# over the same sixteen tokens. Every sample gets its own copy of the lists.
_demo_tokens = [
    "Marinalva", "Bete", "Raz", "e", "Jorge", "Luiz", "do", "cpf",
    "637", ".", "841", ".", "250", "-", "36", ".",
]
_demo_reference_ids = [1, 2, 2, 0, 1, 2, 0, 0, 7, 8, 8, 8, 8, 8, 8, 0]

d_sin = [
    [list(_demo_tokens), list(_demo_reference_ids)],
    [list(_demo_tokens), list(_demo_reference_ids)],
]
d_ger = [
    # Second name tagged B- instead of I-, and the CPF missed entirely.
    [list(_demo_tokens), [1, 2, 2, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
    # Spurious B-ENDERECO on "do".
    [list(_demo_tokens), [1, 2, 2, 0, 1, 2, 5, 0, 7, 8, 8, 8, 8, 8, 8, 0]],
]

run_compare(d_sin, d_ger, ID2LABEL, filter_by_labels=["CPF", "ENDERECO"])
# run_compare(d_sin, d_ger, ID2LABEL, filter_by_labels=["ENDERECO"])
# run_compare(
#     d_sin,
#     d_ger,
#     ID2LABEL,
#     filter_by_labels=["CPF", "ENDERECO"],
#     original_data=DADOS_FAKE,
# )

### Analisando os erros


#### CPF


In [None]:
# Print every CPF disagreement between the model and the reference annotations,
# showing the original record text and a small token window around each one.
# run_compare has no return statement, so resp_comp is None.
resp_comp = run_compare(
    DADOS_AVALIACAO_SINTETICOS,
    DADOS_AVALIACAO_GERADOS,
    ID2LABEL,
    filter_by_labels=["CPF"],
    original_data=DADOS_FAKE,
    padding_show=1,
)
resp_comp

In [None]:
def get_dados_sinteticos_from_index(dados=DADOS_FAKE, index=0):
    """Return one record's text together with its reference and predicted entities.

    Args:
        dados: List of records with "texto" and "dados_sinteticos" keys. The
            default is bound when this function is defined.
        index: Position of the record to inspect.

    Returns:
        ``(texto, reference_entities, predicted_entities)``. The reference is
        parsed from the record's "dados_sinteticos" string and the prediction
        comes from running the model on the text; both are grouped by label.

    Raises:
        ValueError or SyntaxError: if the annotation string cannot be parsed.
    """
    import ast  # local import: only the fallback parser needs it

    dados_ind = dados[index]
    texto = dados_ind["texto"]

    dados_sint_ind_prev = convert_transformer_to_dict(model_fun(texto))

    # The original notebook also kept a commented-out variant reading
    # dados_ind["dados_sinteticos_merged"] instead.
    raw_ref = dados_ind["dados_sinteticos"]
    try:
        dados_sint_ind_ref = json.loads(raw_ref.replace("'", '"'))
    except json.JSONDecodeError:
        # Swapping quotes breaks when a value itself contains an apostrophe or
        # a double quote; parse the untouched string as a Python literal.
        dados_sint_ind_ref = ast.literal_eval(raw_ref)

    return texto, dados_sint_ind_ref, dados_sint_ind_prev


# ===============
# Inspect record 21: its text, reference entities and predicted entities.
get_dados_sinteticos_from_index(index=21)

In [None]:
# Same inspection of record 21 as the previous cell.
get_dados_sinteticos_from_index(index=21)

In [25]:
# get_dados_sinteticos_from_index(index=601)

#### TELEFONE


In [None]:
# Print every TELEFONE disagreement between the model and the reference
# annotations, with the original record text and a small token window.
# run_compare has no return statement, so resp_comp is None.
resp_comp = run_compare(
    DADOS_AVALIACAO_SINTETICOS,
    DADOS_AVALIACAO_GERADOS,
    ID2LABEL,
    filter_by_labels=["TELEFONE"],
    original_data=DADOS_FAKE,
    padding_show=1,
)
resp_comp

In [27]:
# get_dados_sinteticos_from_index(index=139)

#### CEP


In [28]:
# resp_comp = run_compare(
#     DADOS_AVALIACAO_SINTETICOS,
#     DADOS_AVALIACAO_GERADOS,
#     ID2LABEL,
#     filter_by_labels=["CEP"],
#     original_data=DADOS_FAKE,
#     padding_show=1,
# )
# resp_comp

In [29]:
# get_dados_sinteticos_from_index(index=235)

#### ENDERECO


In [None]:
# Print every ENDERECO disagreement between the model and the reference
# annotations, with the original record text and a small token window.
# run_compare has no return statement, so resp_comp is None.
resp_comp = run_compare(
    DADOS_AVALIACAO_SINTETICOS,
    DADOS_AVALIACAO_GERADOS,
    ID2LABEL,
    filter_by_labels=["ENDERECO"],
    original_data=DADOS_FAKE,
    padding_show=1,
)
resp_comp

In [None]:
# Inspect record 6: its text, reference entities and predicted entities.
get_dados_sinteticos_from_index(index=6)

In [32]:
# get_dados_sinteticos_from_index(index=72)