In [None]:
ARQUIVO_DESTINO_GRAFO = 'results/grafo_interativo_v2.html'
PLANILHA_CONTAGEM_TOKENS = 'results/planilha_tokens_total.xlsx'

PROMPT_FILE_1 = "prompts/prompt_2.txt"
PROMPT_FILE_2 = "prompts/prompt.txt"

ANALYSED_FILE_PATH = "legislation/L14133.html"

SPACY_MODEL_NAME = "pt_core_news_lg"
EMBEDING_MODEL_ID = "ricardo-filho/bert-base-portuguese-cased-nli-assin-2"
LANGUAGE_MODEL_ID = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"

QUERY_INICIAL = "licitação"


In [2]:
import torch
import textwrap
import spacy
import re
import os
import pandas as pd
import numpy as np
import nltk
import openpyxl
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from tqdm.auto import tqdm
from sentence_transformers import SentenceTransformer, util
from pyvis.network import Network
from nltk.util import ngrams
from nltk.corpus import stopwords
from nltk import word_tokenize
from collections import Counter
from bs4 import BeautifulSoup
tqdm.pandas()

# !python -m spacy download pt_core_news_lg

# !pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def roman_to_int(roman):
    roman_numerals = {
        'I': 1, 'IV': 4, 'V': 5, 'IX': 9, 'X': 10, 'XL': 40, 'L': 50,
        'XC': 90, 'C': 100, 'CD': 400, 'D': 500, 'CM': 900, 'M': 1000
    }
    i, num = 0, 0
    while i < len(roman):
        if i + 1 < len(roman) and roman[i:i+2] in roman_numerals:
            num += roman_numerals[roman[i:i+2]]
            i += 2
        else:
            num += roman_numerals[roman[i]]
            i += 1
    return num


In [4]:
def text_formatter(text: str) -> str:

    cleaned_text = re.sub(r"\.+", ".", text)
    cleaned_text = re.sub(r"\ +", " ", cleaned_text)
    cleaned_text = (
        cleaned_text
        .replace("\t", " ")
        .replace("\n", " ")
        .replace("º", "o - ")
        .replace("", " ")
        .replace("", "")
        .replace(" ", " ")
        .replace("§§", "parágrafos")
        .replace("§", "parágrafo")
        .replace("art.", "artigo")
        .replace("Artigo", "artigo")
        .replace("Art.", "artigo")
        .replace("arts.", "artigos")
        .replace("Parágrafo único.", "parágrafo 1o -")
        .replace(" .", ".")
        .strip()
    )
    cleaned_text = re.sub(r"\.+", ".", cleaned_text)
    cleaned_text = re.sub(r"\ +", " ", cleaned_text)
    cleaned_text = re.sub(r"artigo (\d+)\.", r"artigo \1o - ", cleaned_text)
    cleaned_text = re.sub(r"parágrafo (\d+)\.", r"parágrafo \1o - ", cleaned_text)
    cleaned_text = re.sub(r"^([IVXLCDM]+)\s*-\s*", lambda m:  "inciso " + str(roman_to_int(m.group(1))) + "o - ", cleaned_text, flags=re.M)
    cleaned_text = re.sub(r"([IVXLCDM]+)\s*-\s*", lambda m: "inciso " + str(roman_to_int(m.group(1))) + "o", cleaned_text)
    cleaned_text = re.sub(r"^([a-zA-Z])\)\s*", r"alinea \1o - ", cleaned_text, flags=re.M)

    cleaned_text = cleaned_text.replace("desta Lei", "Da Lei de Licitações e Contratos Administrativos")
    cleaned_text = cleaned_text.replace("Esta Lei", "A Lei de Licitações e Contratos Administrativos")
    cleaned_text = cleaned_text.replace("esta Lei", "A Lei de Licitações e Contratos Administrativos")

    if ("VETADO" in cleaned_text):
        return ""

    return cleaned_text


In [5]:
def open_and_read_html(html_path: str) -> list[dict]:

    if not os.path.exists(html_path):
        raise FileNotFoundError(f"Arquivo não encontrado: {html_path}")

    with open(html_path, "r", encoding="latin-1") as file:
        soup = BeautifulSoup(file, "html.parser", from_encoding="latin-1")

    [x.extract() for x in soup.findAll('blockquote')]

    sections_and_texts = []

    paragraphs = [p for p in soup.find_all("p") if p.get("align") != "center"]


    for paragraph in tqdm(paragraphs, desc="Processing HTML Sections"):
        text = paragraph.get_text()
        text = text_formatter(text)

        if text:
            sections_and_texts.append(text)

    return sections_and_texts


sections_and_texts = open_and_read_html(html_path=ANALYSED_FILE_PATH)


  [x.extract() for x in soup.findAll('blockquote')]
Processing HTML Sections: 100%|██████████| 1476/1476 [00:00<00:00, 30092.27it/s]


In [6]:

def consolidar_trechos_hierarquicos(sections_and_texts):


    def limpar_texto(texto):
        # Remove "artigo X -", "inciso X -" e "alinea X -"
        texto = re.sub(r'\b(artigo|parágrafo|inciso)\s*\d+[a-z]*\s*-\s*', '', texto, flags=re.IGNORECASE)
        # Remove "alinea X -"
        texto = re.sub(r'\balinea\s*[a-z]+\s*-\s*', '', texto, flags=re.IGNORECASE)
        # Remove espaços extras
        texto = re.sub(r'\s+', ' ', texto).strip()
        return texto


    def extrair_trecho(texto):
        # Expressão regular para encontrar os padrões de "artigo", "parágrafo", "inciso" e "alínea"
        padrao = r'(artigo \d+o|parágrafo \d+o|inciso \d+o|alinea [a-z])'

        # Encontrar todas as ocorrências do padrão no texto
        ocorrencias = re.findall(padrao, texto)

        # Unir as ocorrências em uma string com o formato desejado
        resultado = ' - '.join(ocorrencias)

        return resultado


    total_list = []
    articles_list = []
    current_section = []
    sentence_groups = []
    artigo_caput_holder = ""
    core_holder = ["", "", "", ""]


    for item in sections_and_texts:
        item_string = str(item)
        # Verifica se o texto contém "Art."
        if (item_string.startswith("artigo")):
            core_holder[0] = item_string
            core_holder[1] = "caput"
            for valor in range(0+1, 4):
                core_holder[valor] = ""
        elif (item_string.startswith("parágrafo")):
            core_holder[1] = item_string
            for valor in range(0+2, 4):
                core_holder[valor] = ""
        elif (item_string.startswith("inciso")):
            core_holder[2] = item_string
            for valor in range(0+3, 4):
                core_holder[valor] = ""
        elif (item_string.startswith("alinea")):
            core_holder[3] = item_string
            # if current_section:
            #     articles_list.append(current_section)  # Salva a seção atual
            # current_section = []  # Inicia uma nova seção
        if (core_holder[0]):
            return_string = core_holder[0]
            if (core_holder[1]):
                return_string = return_string+" "+core_holder[1]
            if (core_holder[2]):
                return_string = return_string+" "+core_holder[2]
            if (core_holder[3]):
                return_string = return_string+" "+core_holder[3]

            if (not (return_string.endswith(":"))):

                texto = limpar_texto(return_string)
                reference = extrair_trecho(return_string)

                articles_list.append({"info_reference": reference, "info_result": texto})

    return pd.DataFrame(articles_list).drop_duplicates('info_reference').drop_duplicates('info_result')

grouped_sentences = consolidar_trechos_hierarquicos(sections_and_texts)





In [7]:
class EmbeddingModelSingleton:
    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = SentenceTransformer(
                EMBEDING_MODEL_ID, 
                device="cuda")
        return cls._instance

In [8]:
def generate_embeddings(grouped_sentences,column):

    embedding_model = EmbeddingModelSingleton.get_instance()
    
    return grouped_sentences.progress_apply(lambda x: embedding_model.encode(x[column]), axis=1)
 
def save_embeddings_to_csv(grouped_sentences):
    
    grouped_sentences['embedding'] = generate_embeddings(grouped_sentences,'info_result')
    
    return grouped_sentences

grouped_sentences_embedding = save_embeddings_to_csv(grouped_sentences)

100%|██████████| 1217/1217 [00:12<00:00, 93.74it/s]


In [9]:
def generate_ngrams(grouped_sentences):

    nltk.download('punkt')
    nltk.download('punkt_tab')
    nltk.download('stopwords')

    stop_words = set(stopwords.words('portuguese'))

    texto = " ".join(grouped_sentences['info_result'].to_list())
    tokens = word_tokenize(texto.lower())

    tokens = [word for word in tokens if word.isalnum() and word not in stop_words]

    for index,match_type in enumerate(['Bigrama','Trigrama','Quadrigrama']):

        ngram = list(ngrams(tokens, index+2))

        df_ngrams = pd.DataFrame(Counter(ngram).items(), columns=[match_type, 'Frequência'])

        df_ngrams = df_ngrams.sort_values(by='Frequência', ascending=False)

        print(f"\n{match_type} mais comuns:\n", df_ngrams)

generate_ngrams(grouped_sentences)

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Augusto_IN3\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\Augusto_IN3\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Augusto_IN3\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!



Bigrama mais comuns:
                             Bigrama  Frequência
1           (licitações, contratos)         492
0                 (lei, licitações)         466
2      (contratos, administrativos)         466
154                 (deste, artigo)         206
269                  (caput, deste)         150
...                             ...         ...
15439                    (108, lei)           1
15440                  (1993, data)           1
15442                  (2011, após)           1
15443            (após, decorridos)           1
15444               (decorridos, 2)           1

[15469 rows x 2 columns]

Trigrama mais comuns:
                                        Trigrama  Frequência
1      (licitações, contratos, administrativos)         466
0                  (lei, licitações, contratos)         466
297                      (caput, deste, artigo)         150
429                     (fins, lei, licitações)         110
325               (obras, serviços, engenharia)    

In [10]:
def generate_morph_df(grouped_sentences):

    spacy_nlp = spacy.load(SPACY_MODEL_NAME)

    morfologia_list = []

    # Process each sentence
    for sentence in grouped_sentences['info_result']:
        doc = spacy_nlp(str(sentence).lower())
        for token in doc:
            result = {
                'text': token.text, 
                'lemma': token.lemma_, 
                'pos': token.pos_, 
                'dep': token.dep_, 
                'headText': token.head.text, 
                'headLemma': token.head.lemma_,
                'headDep': token.head.dep_, 
                'headPOS': token.head.pos_, 
                'TokenChildren': [child for child in token.children]} 

            morfologia_list.append(result)

    return pd.DataFrame(morfologia_list)

morph_df = generate_morph_df(grouped_sentences)


In [11]:
def generate_network_graph(lemma_list,morph_df):


    children_list = (morph_df[morph_df['headLemma'].isin(lemma_list)])[['text', 'lemma', 'dep', 'headText', 'headDep', 'pos', 'headLemma']].drop_duplicates()
    children_list = children_list[(children_list['pos'].isin(['VERB']))].head(20)
    return_list = children_list

    campo_1 = 'lemma'
    campo_2 = 'headLemma'

    return_list = return_list[[campo_1, campo_2]].drop_duplicates()

    # Criando o grafo interativo
    net = Network(height='1200px', width='100%', directed=False)

    # Processando o texto linha por linha
    for _, linha in return_list.iterrows():
        net.add_node(linha[campo_1])
        net.add_node(linha[campo_2])
        net.add_edge(linha[campo_1], linha[campo_2])

    # Gerando o arquivo HTML
    net.write_html(ARQUIVO_DESTINO_GRAFO)

lemma_list = ['licitação', 'licitante']

generate_network_graph(lemma_list,morph_df)

In [12]:
def generate_morph_count(morph_df):
    morph_df['count'] = 1

    morph_df = morph_df[~(morph_df['pos'].isin(['CCONJ', 'DET', 'NUM', 'ADP', 'PUNCT', 'SCONJ', 'PRON']))]

    resulting_morph = morph_df.groupby(['lemma', 'pos']).agg(
        distinct_text=('text', lambda x: ', '.join(x.unique())),
        count=('count', 'sum')
    ).reset_index().sort_values('count', ascending=False)

    resulting_morph.to_excel(PLANILHA_CONTAGEM_TOKENS, index=False)

generate_morph_count(morph_df)

In [13]:
def print_wrapped(text, wrap_length=80):
    wrapped_text = textwrap.fill(text, wrap_length)
    print(wrapped_text)


In [14]:
def retrieve_relevant_resources(query: str,
                                embeddings_list: list,
                                n_resources_to_return: int = 5,
                                print_time: bool = False):
    

    embeddings = torch.tensor(np.array(embeddings_list), dtype=torch.float32).to('cuda')

    embedding_model = EmbeddingModelSingleton.get_instance()

    query_embedding = embedding_model.encode(query, convert_to_tensor=True)

    dot_scores = util.dot_score(query_embedding, embeddings)[0]

    scores, indices = torch.topk(input=dot_scores,
                                 k=n_resources_to_return)

    return scores, indices.tolist()

In [15]:
def setup_llm_model():


    tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=LANGUAGE_MODEL_ID)
    
    quantization_config = BitsAndBytesConfig(load_in_4bit=True,bnb_4bit_compute_dtype=torch.float16)

    llm_model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=LANGUAGE_MODEL_ID,
                                                    torch_dtype=torch.float16,
                                                    quantization_config=quantization_config,
                                                    low_cpu_mem_usage=True,
                                                    attn_implementation="sdpa")

    return {'tokenizer':tokenizer,'llm_model':llm_model}

llm_model_dict = setup_llm_model()

Loading checkpoint shards: 100%|██████████| 2/2 [00:25<00:00, 12.95s/it]


In [16]:
def prompt_formatter(query: str,
                     tokenizer,
                     context_items: list | str = None) -> str:
    
    if isinstance(context_items, list):    
        context = "- "+"\n- ".join(context_items)
        prompt_file = PROMPT_FILE_1

    else:
        context = context_items
        prompt_file = PROMPT_FILE_2
        

    with open(prompt_file, 'r',encoding='utf-8') as file:
        base_prompt = "\n"+file.read()+"\n"

    base_prompt = base_prompt.format(context=context, query=query)

    dialogue_template = [
        {"role": "user",
        "content": base_prompt}
    ]

    prompt = tokenizer.apply_chat_template(conversation=dialogue_template,
                                        tokenize=False,
                                        add_generation_prompt=True)

    inputs = tokenizer(prompt, return_tensors="pt", padding=True)

    attention_mask = inputs["attention_mask"].to('cuda')
    
    return (prompt, attention_mask)

In [17]:


def ask(query,
        llm_model_dict,
        grouped_sentences_embedding=None,
        context=None,
        temperature=0.7,
        max_new_tokens=512,
        n_resources_to_return=5):


    if(not context):

        scores, indices = retrieve_relevant_resources(query=query,n_resources_to_return=n_resources_to_return,embeddings_list=grouped_sentences_embedding["embedding"].tolist())

        context_items = [grouped_sentences_embedding.iloc[i]["info_result"] for i in indices]

    else:
        context_items = context
    
    
    prompt, attention_mask = prompt_formatter(query=query,tokenizer = llm_model_dict['tokenizer'],context_items=context_items)
        

    print(f"[PROMPT]\n")
    print(prompt)

    prompt_tokens =  llm_model_dict['tokenizer'](prompt, return_tensors="pt").to('cuda')

    prompt_tokens = prompt_tokens["input_ids"]

    prompt_length = len(prompt_tokens[0])

    print("Total prompt length:", prompt_length)

    start_index = prompt_tokens.shape[-1]

    output =  llm_model_dict['llm_model'].generate(prompt_tokens,
                                max_new_tokens=max_new_tokens+prompt_length,
                                pad_token_id=llm_model_dict['tokenizer'].eos_token_id,
                                attention_mask=attention_mask
                                # num_return_sequences=2,
                                # do_sample=True,
                                # temperature=temperature,
                                # top_p=0.95,
                                # top_k=50s
                                )

    generation_output = output

    print(len(output[0][start_index:]))
    generation_text = llm_model_dict['tokenizer'].decode(output[0][start_index:], skip_special_tokens=True)

    print(f"[RESULTADO]\n")
    print(generation_text)

    # print(generation_text)

    return generation_text, context

In [None]:

answer, context = ask(query=QUERY_INICIAL,
                      llm_model_dict=llm_model_dict,
                      grouped_sentences_embedding=grouped_sentences_embedding,
                      temperature=0.8,
                      n_resources_to_return=5,
                      max_new_tokens=1024
                      )

answer2, context2 = ask(query=QUERY_INICIAL,
                        llm_model_dict=llm_model_dict,
                        context=answer.split("</think>")[1],
                        temperature=0.8,
                        max_new_tokens=2048
                        )

[PROMPT]

<｜begin▁of▁sentence｜><｜User｜>
### Quem é você:
- Você é um modelo de linguagem avançado especializado em extrair informações de texto para modelagem conceitual. 
- Sua tarefa é analisar o texto fornecido e extrair as informações disponíveis **exclusivamente do contexto apresentado**.

### Instruções:
1. Identifique todas as informações **explícitas ou implicitamente relacionadas** ao termo "licitação" no texto abaixo.
2. Liste cada informação relevante no formato:
- licitação - [Informação específica]
3. **Não inclua suposições ou conhecimentos externos**.
4. Caso o termo não seja mencionado ou não haja informações relacionadas, retorne exatamente: "sem informações encontradas".

### Exemplo Orientativo:
Texto: "Os contratos devem seguir normas específicas e são supervisionados pelos gestores."
Termo: "contratos"
Resposta: 
- contratos - Devem seguir normas específicas
- contratos - São supervisionados pelos gestores

### Sua tarefa
Com base no texto abaixo, faça a extração d