In [4]:
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from spacy.lang.pt.stop_words import STOP_WORDS
from nltk.corpus import wordnet
from nltk.stem.snowball import SnowballStemmer
from nltk.stem import WordNetLemmatizer
from re import sub
from nltk import download
# Fetch the NLTK resources named below (tokenizer models and stop-word lists).
# NOTE(review): this notebook also uses WordNetLemmatizer and
# wordnet.synsets(..., lang="por"); confirm whether the 'wordnet' / 'omw-1.4'
# corpora must be downloaded here as well for a fresh environment.
download('punkt')
download('stopwords')

import json
import os
from re import compile, findall, escape
import re
import pandas as pd

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\herik\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\herik\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [8]:
def remove_num(text):
    """Delete every run of digits from ``text`` and squeeze whitespace.

    Each whitespace run that remains is collapsed to a single space. Leading
    and trailing spaces are not trimmed.
    """
    without_digits = sub(r'\d+', '', text)
    return sub(r'\s+', ' ', without_digits)

def remove_punct(text):
    """Replace ASCII punctuation in ``text`` with spaces and squeeze whitespace.

    Every run of punctuation characters becomes one space, then each
    whitespace run is collapsed to a single space. Leading and trailing
    spaces are not trimmed.

    The character class covers the full ASCII punctuation set. The previous
    pattern left out the double quote, the backslash and the closing square
    bracket, and only matched the hyphen because ``,-.`` happened to form a
    range that contains it.
    """
    text = sub(r"""[!"#$%&'()*+,\-./:;<=>?@\[\\\]^_`{|}~]+""", ' ', text)
    text = sub(r'\s+', ' ', text)
    return text

def extract_keywords(text):
    """Lower-case the tokens of ``text`` and drop Portuguese stop words.

    The text is split with ``word_tokenize``. A token is discarded when it
    appears in either the NLTK Portuguese stop-word list or spaCy's
    ``STOP_WORDS``. The surviving tokens are returned joined by single spaces.
    """
    # Build the combined set once per call: stopwords.words() was previously
    # re-evaluated for every token, and the two lists were combined with `or`,
    # which kept any word missing from just one of them.
    stop_words = set(stopwords.words('portuguese')) | set(STOP_WORDS)
    keywords = []
    for word in word_tokenize(text):
        word = word.lower()
        if word not in stop_words:
            keywords.append(word)
    return ' '.join(keywords)

def get_synonyms(text):
    """Return the Portuguese WordNet lemma names for every token of ``text``.

    Tokens come from ``word_tokenize``. For each token, every synset found
    with ``lang="por"`` contributes all of its Portuguese lemma names.
    The result is a flat list in lookup order and may contain duplicates.
    """
    return [
        lemma.name()
        for token in word_tokenize(text)
        for synset in wordnet.synsets(token, lang="por")
        for lemma in synset.lemmas(lang="por")
    ]

def remove_accent(text):
    """Strip diacritics from ``text`` and squeeze whitespace.

    Accented letters are replaced by their base letter (``á`` -> ``a``,
    ``Ó`` -> ``O``, ``ç`` -> ``c``). Each whitespace run is then collapsed to
    a single space.

    The previous implementation listed lowercase accented vowels only, so
    uppercase letters and characters such as ``ç`` or ``ñ`` passed through
    untouched.
    """
    import unicodedata  # local import keeps this block self-contained

    # NFD splits each accented letter into base letter + combining mark(s);
    # dropping the marks (category "Mn") leaves the bare letter.
    decomposed = unicodedata.normalize('NFD', text)
    stripped = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
    # Recompose whatever NFD split that was not an accent.
    text = unicodedata.normalize('NFC', stripped)
    text = sub(r'\s+', ' ', text)
    return text

def preprocess_lemma(text):
    """Tokenize ``text`` and return the lemmatized tokens joined by spaces.

    Each token produced by ``word_tokenize`` is passed through
    ``WordNetLemmatizer.lemmatize`` with its default arguments.

    NOTE(review): WordNetLemmatizer is presumably backed by the English
    WordNet; confirm it has the intended effect on Portuguese input.
    """
    lemmatizer = WordNetLemmatizer()
    return ' '.join(lemmatizer.lemmatize(token) for token in word_tokenize(text))

def preprocess_stem(text):
    """Tokenize ``text`` and return the stemmed tokens joined by spaces.

    Each token produced by ``word_tokenize`` is reduced with a Portuguese
    ``SnowballStemmer``.
    """
    stemmer = SnowballStemmer("portuguese")
    return ' '.join(stemmer.stem(token) for token in word_tokenize(text))

def preprocess(text, tipo=None):
    """Run the text-normalisation pipeline on ``text``.

    Steps, in order: remove punctuation, remove digits, drop stop words
    (which also lower-cases the tokens), optionally lemmatize or stem, and
    finally strip accents.

    Parameters
    ----------
    text : str
        Raw input text.
    tipo : str or None
        ``'lemma'`` applies ``preprocess_lemma``, ``'stem'`` applies
        ``preprocess_stem``; any other value skips that step.
    """
    cleaned = extract_keywords(remove_num(remove_punct(text)))
    if tipo == 'lemma':
        cleaned = preprocess_lemma(cleaned)
    elif tipo == 'stem':
        cleaned = preprocess_stem(cleaned)
    return remove_accent(cleaned)

def load_txt(path):
    """Read a UTF-8 text file and return its non-blank lines.

    Every line is stripped of surrounding whitespace; lines that are empty
    after stripping are left out.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [line for line in stripped_lines if line != '']

In [9]:
def create_json(json_path, content):
    """Append ``content`` to the JSON list stored at ``json_path``.

    If the file exists it must contain a JSON array: it is loaded, ``content``
    is appended and the array is written back with 4-space indentation.
    If the file does not exist it is created holding ``[content]``.
    """
    if os.path.isfile(json_path):
        with open(json_path, 'r+', encoding='utf-8') as f:
            data = json.load(f)
            data.append(content)
            f.seek(0)
            json.dump(data, f, indent=4)
            # The file is rewritten in place; cut off whatever remains of the
            # old content if the new dump turned out shorter (for example when
            # the previous file was formatted with more whitespace).
            f.truncate()
    else:
        with open(json_path, 'w', encoding='utf-8') as f:
            data = [content]
            json.dump(data, f, indent=4)

def check_intervals(lst):
    """Drop spans that overlap an earlier, already accepted span.

    ``lst`` is an iterable of ``(start, end, label)`` triples. Spans are
    visited in order; a span is kept only if it does not overlap any span
    kept before it. Touching spans (one ends where the next starts) do not
    count as overlapping.

    Returns a list of ``[start, end, label]`` lists in their original order.
    """
    kept = []
    for start, end, label in lst:
        clashes = any(
            kept_start <= start < kept_end      # starts inside a kept span
            or kept_start < end <= kept_end     # ends inside a kept span
            # Strictly encloses a kept span: neither endpoint falls inside
            # it, so the two checks above cannot detect this case.
            or (start < kept_start and kept_end < end)
            for kept_start, kept_end, _ in kept
        )
        if not clashes:
            kept.append((start, end, label))
    return [[start, end, label] for start, end, label in kept]

def find_words(text, find_tokens):
    """Locate whole-word occurrences of each token in ``text``.

    Parameters
    ----------
    text : str
        Text to search.
    find_tokens : iterable of str
        Words or multi-word phrases to look for. They are matched literally
        (regex metacharacters are escaped) between word boundaries.

    Returns
    -------
    list of dict
        One dict per match with the keys ``text`` (the token),
        ``start_index`` / ``end_index`` (character offsets, end inclusive) and
        ``start_position`` / ``end_position`` (word offsets, end inclusive).
    """
    result = []
    for token in find_tokens:
        # An empty or whitespace-only token would compile to a pattern that
        # matches at word boundaries or single spaces all over the text and
        # produce entries whose end lies before their start.
        if not token.strip():
            continue
        pattern = compile(r'\b{}\b'.format(escape(token)))
        for match in pattern.finditer(text):
            # Word offset of the match = number of words preceding it.
            start_position = len(findall(r'\b\w+\b', text[:match.start()]))
            result.append({
                "text": token,
                "start_index": match.start(),
                "end_index": match.end() - 1,
                "start_position": start_position,
                "end_position": start_position + len(token.split()) - 1,
            })
    return result

def preparing_data(df, column_label, column_keyword, path, labels_with_texts):
    """Build entity-annotated examples per label and append them to a JSON file.

    For every row of ``df`` the ``column_keyword`` cell is split on ``,``,
    ``;`` and ``.``; each piece is normalised with ``preprocess(..., 'lemma')``.
    For entry ``i`` of ``labels_with_texts``, each of its ``"texts"`` is
    normalised with ``preprocess`` and searched for the keywords of row ``i``.
    Matches become ``[start, end, label]`` entities (``end`` exclusive), and
    overlapping entities are removed with ``check_intervals``.

    The resulting ``{label: [(text, {"entities": [...]}), ...]}`` mapping is
    appended once to the JSON list at ``path`` through ``create_json``.
    Nothing is written when ``labels_with_texts`` is empty.

    NOTE(review): rows of ``df`` and entries of ``labels_with_texts`` are
    paired by position; confirm both are in the same order.
    NOTE(review): a one-argument ``preparing_data(path)`` defined later in
    this notebook reuses this name and replaces this function once that
    definition runs.
    """
    words = []
    for _, row in df.iterrows():
        # The separators must sit in a character class: a bare `.` in the
        # alternation matches every character, which made the split return
        # nothing but empty strings.
        pieces = re.split(r'[,;.]', row[column_keyword])
        keywords = [preprocess(piece, 'lemma').strip() for piece in pieces]
        # Keywords that are empty after normalisation cannot be searched for.
        words.append([keyword for keyword in keywords if keyword])

    dict_train = {}
    for i, entry in enumerate(labels_with_texts):
        label = str(entry[column_label])
        examples = []
        for text in entry["texts"]:
            text = preprocess(text).strip()
            entities = [
                # find_words reports an inclusive end index; +1 makes it exclusive.
                [found['start_index'], found['end_index'] + 1, label]
                for found in find_words(text, words[i])
            ]
            examples.append((text, {"entities": check_intervals(entities)}))
        dict_train[label] = examples

    # Write a single entry holding every label. Previously the write sat
    # inside the loop above, so the file received one ever-growing copy of
    # the mapping per label.
    if dict_train:
        create_json(path, dict_train)



def criar_dataframe(classe, lista):
    """Return a DataFrame with one row per item of ``lista``.

    The ``samples`` column holds the items and the ``labels`` column repeats
    ``classe`` on every row.
    """
    repeated_label = [classe for _ in range(len(lista))]
    return pd.DataFrame({'labels': repeated_label, 'samples': lista})

def concat_df(list_df):
    """Stack the DataFrames in ``list_df`` on top of each other.

    Rows are concatenated along axis 0 and each frame keeps its own index
    values.
    """
    return pd.concat(list_df, axis=0)

# (exclusive upper bound of the file counter, label) pairs, in ascending order.
# A file at position `count` gets the first label whose bound exceeds `count`.
_LABEL_UPPER_BOUNDS = (
    (10, 'bombas'),
    (19, 'rolamentos'),
    (31, 'válvulas'),
    (43, 'acionamentos por corrente'),
    (46, 'caixas de engrenagens'),
    (54, 'Sistemas de óleo lubrificante'),
    (70, 'Acionamentos por correia em V'),
    (74, 'Sistemas de ventiladores'),
    (79, 'Purgadores de vapor'),
    (105, 'Motores elétricos'),
    (107, 'Contatos elétricos'),
    (110, 'Disjuntores elétricos de caixa moldada'),
    (113, 'Circuito magnético'),
    (116, 'Circuito dielétrico'),
)


def preparing_data(path):
    """Load every file in ``path`` into one labelled DataFrame.

    Each directory entry is read with ``load_txt``; its non-blank lines become
    the ``samples`` column and the label chosen from the file's position in
    the listing becomes the ``labels`` column. The per-file frames are
    combined with ``concat_df``.

    Raises
    ------
    ValueError
        If the directory holds more entries than the label table covers.
        Such files used to silently inherit the previous file's label.

    NOTE(review): the label boundaries assume the files are enumerated in
    sorted name order; confirm this matches how the files were numbered.
    NOTE(review): this definition reuses the name of the five-argument
    ``preparing_data`` defined earlier in this notebook and replaces it.
    """
    list_df = []
    # os.listdir returns entries in arbitrary order; sort them so the
    # position-based labelling is reproducible across machines.
    for count, nome_arquivo in enumerate(sorted(os.listdir(path))):
        label = next(
            (name for bound, name in _LABEL_UPPER_BOUNDS if count < bound),
            None,
        )
        if label is None:
            raise ValueError(
                f'No label defined for file #{count} ({nome_arquivo!r}); '
                f'only {_LABEL_UPPER_BOUNDS[-1][0]} files are mapped.'
            )
        # os.path.join replaces the hard-coded backslash separator.
        lines = load_txt(os.path.join(path, nome_arquivo))
        list_df.append(criar_dataframe(label, lines))
    return concat_df(list_df)
        

In [10]:
# Build the labelled samples table from the problem-description text files.
# NOTE(review): this is the one-argument preparing_data(path); the
# five-argument function of the same name is no longer reachable at this point.
a = preparing_data(r'..\database\problems\problemas txt')
# Bare last expression: the DataFrame is rendered as this cell's output.
a

Unnamed: 0,labels,samples
0,bombas,A bomba está superaquecendo.
1,bombas,A bomba apresenta temperatura elevada.
2,bombas,A bomba está esquentando demais.
3,bombas,O motor da bomba está superaquecido.
4,bombas,A bomba está piscando e emitindo calor excessivo.
...,...,...
195,Circuito dielétrico,Motor elétrico não entra em operação após ser ...
196,Circuito dielétrico,Falha no arranque do motor elétrico
197,Circuito dielétrico,Motor não liga mesmo após receber energia
198,Circuito dielétrico,Dificuldade em acionar o motor elétrico para a...


In [7]:
!python -m spacy download "pt_core_news_sm"
from spacy.util import minibatch, compounding
from spacy import blank, training, load
from pathlib import Path
import random
# Load the Portuguese pipeline fetched by the download command at the top of this cell.
# NOTE(review): train_model builds its own blank("pt") pipeline; confirm where
# this module-level `nlp` is actually used.
nlp = load("pt_core_news_sm")

You should consider upgrading via the 'c:\Users\Semeq\Desktop\Chatbot\.venv\Scripts\python.exe -m pip install --upgrade pip' command.


Collecting pt-core-news-sm==3.5.0
  Downloading https://github.com/explosion/spacy-models/releases/download/pt_core_news_sm-3.5.0/pt_core_news_sm-3.5.0-py3-none-any.whl (13.0 MB)
Installing collected packages: pt-core-news-sm
Successfully installed pt-core-news-sm-3.5.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('pt_core_news_sm')


In [8]:
def save_model(model, path_model):
    """Serialize ``model`` into the directory ``path_model``.

    The directory is created first, including any missing parent
    directories, and then ``model.to_disk`` is called with it.

    Parameters
    ----------
    model
        Any object exposing ``to_disk(path)``.
    path_model : str or Path
        Target directory.
    """
    path_model = Path(path_model)
    # parents=True lets nested output paths work; exist_ok=True replaces the
    # separate exists() check and avoids failing if the directory appears
    # between the check and the creation.
    path_model.mkdir(parents=True, exist_ok=True)
    model.to_disk(path_model)
    print("Model saved to:", path_model)

def train_model(data_dict, epochs, path_model):
    """Train a blank Portuguese NER pipeline and save it to ``path_model``.

    Parameters
    ----------
    data_dict : dict
        Maps each entity label to a sequence of ``(text, annotations)`` pairs.
        Every key is registered as a NER label and all pairs are pooled into
        one training set. ``annotations`` is handed to ``Example.from_dict``.
    epochs : int
        Number of passes over the shuffled training set.
    path_model : str or Path
        Directory passed to ``save_model`` after training.

    The accumulated losses are printed after every epoch.
    """
    nlp = blank("pt")
    ner = nlp.add_pipe("ner", name="ner", last=True)
    for label in data_dict.keys():
        ner.add_label(label)

    train_data = [
        (text, annotations)
        for examples in data_dict.values()
        for text, annotations in examples
    ]

    # spaCy v3 renamed begin_training() to initialize(); the old name only
    # forwards to it while emitting a deprecation warning.
    optimizer = nlp.initialize()
    for itn in range(epochs):
        random.shuffle(train_data)
        losses = {}
        for batch in minibatch(train_data, size=compounding(4.0, 32.0, 1.001)):
            example_batch = [
                training.example.Example.from_dict(nlp.make_doc(text), annotation)
                for text, annotation in batch
            ]
            nlp.update(example_batch, sgd=optimizer, losses=losses)
        print("Epoch:", itn+1, "Loss:", losses)
    save_model(nlp, path_model)