In [7]:
import spacy
import random
from spacy.util import minibatch, compounding
from pathlib import Path

In [8]:
nlp = spacy.load("pt_core_news_sm")

In [9]:

def train_model(data_dict, iterations):
    nlp = spacy.blank("pt")

    nlp.add_pipe("ner", name="ner", last=True)

    for label in data_dict.keys():
        nlp.get_pipe("ner").add_label(label)

    train_data = []
    for label, examples in data_dict.items():
        for text, annotations in examples:
            train_data.append((text, annotations))

    nlp.begin_training()
    for itn in range(iterations):
        random.shuffle(train_data)
        losses = {}
        
        batches = minibatch(train_data, size=compounding(4.0, 32.0, 1.001))
        for batch in batches:
            texts, annotations = zip(*batch)
            example_batch = []
            for text, annotation in zip(texts, annotations):
                doc = nlp.make_doc(text)
                example = spacy.training.example.Example.from_dict(doc, annotation)
                example_batch.append(example)
            nlp.update(example_batch, losses=losses)
        
        print("Iteration:", itn+1, "Loss:", losses)

    return nlp

def save_model(model, output_dir):
    output_dir = Path(output_dir)
    if not output_dir.exists():
        output_dir.mkdir()
    model_path = output_dir / "NER_model"
    model.to_disk(model_path)
    print("Model saved to:", model_path)


#### Exemplo de dados dos parâmetros:

In [29]:
data_dict = entidades = {
    "BOMBA": [
    ("Bomba centrífuga", {"entities": [(0, 5, "BOMBA")]}),
    ]}

#### Preparando os dados

In [3]:
import pandas as pd
import json
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from spacy.lang.pt.stop_words import STOP_WORDS
from nltk.corpus import wordnet
from nltk.stem.snowball import SnowballStemmer
from re import sub
from nltk import download
download('punkt')
download('stopwords')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\herik\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\herik\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [1]:
def create_json(json_path, content):
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(content, f, indent=4)


def load_json(file):
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    return data

def load_df(file):
    df = pd.read_excel(file)
    return df

def remove_punct(text):
    text = sub(r"[!#$%&'()*+,-./:;<=>?@[^_`{|}~]+", ' ',text)
    text = sub(r'\s+', ' ',text)
    return text

def extract_keywords(text):
    tokens = word_tokenize(text)
    keywords = []
    for word in tokens:
        word = word.lower()
        if word not in stopwords.words('portuguese') or word.lower() not in STOP_WORDS:
            keywords.append(word)
    keywords = ' '.join(keywords)
    return keywords

def preprocess_stem(text):
    stemmer = SnowballStemmer("portuguese")
    stems = []
    tokens = word_tokenize(text)
    for token in tokens:
        stems.append(stemmer.stem(token))
    stems = ' '.join(stems)
    return str(stems)

def remove_accent(text):
    text = sub('[áàãâä]', 'a', sub('[éèêë]', 'e', sub('[íìîï]', 'i', sub('[óòõôö]', 'o', sub('[úùûü]', 'u', text)))))
    text = sub(r'\s+', ' ',text)
    return text

def preprocess(text):
    text = remove_punct(text)
    text = extract_keywords(text)
    text = preprocess_stem(text)
    text = remove_accent(text)
    return text

In [3]:
import re
import pandas as pd

def encontrar_palavras(texto, frases):
    resultado = []
    
    for frase in frases:
        padrao = re.compile(r'\b{}\b'.format(re.escape(frase)))
        correspondencias = padrao.finditer(texto)
        
        for correspondencia in correspondencias:
            dicionario = {
                "text": frase,
                "startc": correspondencia.start(),
                "endc": correspondencia.end() - 1,
                "startp": len(re.findall(r'\b\w+\b', texto[:correspondencia.start()])),
                "endp": (len(re.findall(r'\b\w+\b', texto[:correspondencia.start()])) + len(frase.split())) - 1
            }
            resultado.append(dicionario)
    return resultado

In [4]:
data = load_json(r'..\documents\entidades2.json')
df = load_df('..\documents\classes.xlsx')

In [10]:
new_data = []
words = []
for idx, row in df.iterrows():
    classe = idx + 1
    try:
        if data[idx]['classe'] == classe:
            list_words = row['keywords'].split(',')
            for i in range(len(list_words)):
                list_words[i] = preprocess(list_words[i]).strip()
            words.append(list_words)
    except IndexError:
        break

new_dict = {}
for i in range(len(data)):
    list_tuple = []
    for texto in data[i]["texts"]:
        texto = preprocess(texto).strip()
        list_find_word = encontrar_palavras(texto,words[i])
        list_find = []
        for j in list_find_word:
            list_word_found = [j['startc'],j['endc']+1,str(data[i]['classe'])]
            list_find.append(list_word_found)
        list_tuple.append((texto, {"entities": list_find}))
    new_dict[str(data[i]['classe'])] = list_tuple
        # dict_ = encontrar_palavras(texto,words)
        # new_dict[f'texto{cont}'] = texto
        # new_dict['entities'] = dict_
        # cont += 1
        # new_data.append(new_dict)
        # print(new_data)
    create_json(r'..\documents\entidades3-classify.json',new_dict)

#### Treinando o modelo

In [34]:
def corrigir_sobreposicao_entidades(data_dict):
    for label, examples in data_dict.items():
        for i in range(len(examples)):
            entities = examples[i][1]["entities"]
            entities_sorted = sorted(entities, key=lambda x: x[1] - x[0], reverse=True)
            entities_filtered = []
            for j in range(len(entities_sorted)):
                entity = entities_sorted[j]
                entity_start = entity[0]
                entity_end = entity[1]
                entity_label = entity[2]
                
                entity_exists = False
                for existing_entity in entities_filtered:
                    if (existing_entity[0] <= entity_start <= existing_entity[1]) or (existing_entity[0] <= entity_end <= existing_entity[1]):
                        entity_exists = True
                        break
                
                if not entity_exists:
                    entities_filtered.append(entity)
            
            data_dict[label][i][1]["entities"] = entities_filtered
    
    return data_dict

def verificar_intervalos(lista):
    spans = []
    for intervalo_atual in lista:
        start, end, label = intervalo_atual
        span_atual = (start, end, label)
        overlap = False
        for span_existente in spans:
            if span_existente[0] <= start < span_existente[1] or span_existente[0] < end <= span_existente[1]:
                overlap = True
                break
        if not overlap:
            spans.append(span_atual)
    lista_corrigida = [[start, end, label] for start, end, label in spans]
    return lista_corrigida

def prepara(dict_):
    for i in dict_.values():
        for y in i:
            for j in y[1].values():
                new_value = verificar_intervalos(j)
                y[1]['entities'] = new_value
    return dict_

In [35]:
data_train = load_json(r'..\documents\entidades3-classify.json')

# data_dict = corrigir_sobreposicao_entidades(data_train)
data_dict = prepara(data_train)

iterations = 100
model = train_model(data_dict, iterations)
output_dir = r"model"
save_model(model, output_dir)



Iteration: 1 Loss: {'ner': 1536.3399661156807}
Iteration: 2 Loss: {'ner': 419.8025585498161}
Iteration: 3 Loss: {'ner': 156.15117192780036}
Iteration: 4 Loss: {'ner': 101.82290648591245}
Iteration: 5 Loss: {'ner': 115.35646462280877}
Iteration: 6 Loss: {'ner': 79.60682365473156}
Iteration: 7 Loss: {'ner': 43.26848835395674}
Iteration: 8 Loss: {'ner': 41.29418882552955}
Iteration: 9 Loss: {'ner': 67.43128388256821}
Iteration: 10 Loss: {'ner': 56.58299198088355}
Iteration: 11 Loss: {'ner': 35.968468110855575}
Iteration: 12 Loss: {'ner': 37.99214003382879}
Iteration: 13 Loss: {'ner': 16.801876325791074}
Iteration: 14 Loss: {'ner': 28.310037982854542}
Iteration: 15 Loss: {'ner': 50.227962798081805}
Iteration: 16 Loss: {'ner': 43.660531579426326}
Iteration: 17 Loss: {'ner': 38.59656687821507}
Iteration: 18 Loss: {'ner': 31.883199426954974}
Iteration: 19 Loss: {'ner': 21.192720807780358}
Iteration: 20 Loss: {'ner': 16.18484542013092}
Iteration: 21 Loss: {'ner': 18.0075120131861}
Iteration: 2

#### Testando o modelo

In [5]:
def classifier(texto):
    diretorio_modelo = r'NER_model'
    nlp = spacy.load(diretorio_modelo)
    doc = nlp(texto)
    labels = [entidade.label_ for entidade in doc.ents]
    return labels


In [11]:
data_teste = load_json(r'..\documents\entidades2_teste.json')

acertos = 0
erros = 0
for i in data_teste:
    for j in i['texts']:
        text = preprocess(j)
        result = classifier(text)
        try:
            if str(result[0]) == str(i['classe']):
                acertos += 1
            else:
                erros += 1
        except IndexError:
            erros += 1


print('acertou:',acertos)
print('errou:',erros)
print('precisão:',(acertos/(acertos+erros))*100)

acertou: 472
errou: 60
precisão: 88.7218045112782
