In [1]:
import re

import json

import os

In [2]:
DATA_PATH="data/"

In [3]:
json_normas_path = DATA_PATH + 'normas_full.json'

with open(json_normas_path, 'r') as file:
    normas_json = json.load(file)

normas = list(normas_json.values())

In [None]:
normas_json.keys()

In [None]:
normas_json['RIR2018.txt']

## Creating an hierarchical splitter

Using as base the code in https://github.com/langchain-ai/langchain/blob/master/libs/text-splitters/langchain_text_splitters/character.py

In [4]:
from typing import Any, List, Literal, Optional, Union
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [17]:
class HierachicalRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):

    def __init__(
        self,
        separators: Optional[List[str]] = None,
        keep_separator: Union[bool, Literal["start", "end"]] = True,
        is_separator_regex: bool = False,
        apply_chunk_size: Optional[int] = 0,        
        **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex
        self._apply_chunk_size = apply_chunk_size


    
    def _split_text_with_regex(
        self, text: str, separator: str
    ) -> List[str]:
        # Now that we have the separator, split the text

        separators = None
        
        if separator:
            if self._keep_separator:
                try:
                    # The parentheses in the pattern keep the delimiters in the result.
                    _splits = re.split(f"({separator})", text)
                    splits = (
                        ([_splits[i] + _splits[i + 1] for i in range(0, len(_splits) - 1, 2)])
                        if self._keep_separator == "end"
                        else ([_splits[i] + _splits[i + 1] for i in range(1, len(_splits), 2)])
                    )
                    if len(_splits) % 2 == 0:
                        splits += _splits[-1:]
                    splits = (
                        (splits + [_splits[-1]])
                        if self._keep_separator == "end"
                        else ([_splits[0]] + splits)
                    )

                    separators = ["preamble"] + [_splits[i].strip("\n").strip() for i in range(1, len(_splits), 2)]
                except IndexError as e:
                    print(e)
                    print(_splits)
            else:
                splits = re.split(separator, text)
        else:
            splits = list(text)
        return [s for s in splits if s != ""], separators



    def _split_text(self, text: str, separators: List[str], steps_to_apply_chunk_size: int) -> List[str]:

        # print(separators)
        # print(steps_to_apply_chunk_size)
        # print(self._length_function(text))
        
        """Split incoming text and return chunks."""
        final_chunks = {}
        # Get appropriate separator to use
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1 :]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)

        if steps_to_apply_chunk_size > 0 or self._length_function(text) > self._chunk_size:
            splits, separators_text = self._split_text_with_regex(text, _separator)
            
            # Does not merge small chunks to respect hierarchy
            if not new_separators:
                final_chunks = dict(zip(separators_text, splits))
            else:
                for i, s in enumerate(splits):
                    # print(f"new_separators={new_separators}, len(new_separators)={len(new_separators)}, len(separators)={len(separators)}")
                    other_info = self._split_text(s, new_separators, steps_to_apply_chunk_size - (len(separators) - len(new_separators)))
                    final_chunks[separators_text[i]] = other_info
        else:
            final_chunks = text

        return final_chunks


    def split_text(self, text: str) -> List[str]:
        """Split the input text into smaller chunks based on predefined separators.

        Args:
            text (str): The input text to be split.

        Returns:
            List[str]: A list of text chunks obtained after splitting.
        """
        return self._split_text(text, self._separators, self._apply_chunk_size)

In [60]:
spliting_hierarchy=[
    "\nLIVRO [IVX]+",
    "\nTÍTULO [IVX]+",
    "\nCAPÍTULO [IVX]+|Capítulo [IVX]+|\nCAPÍTULO ÚNICO",
    "\nSeção [IVX]+",
    "\nSubseção .+",
    # "\nArt\. \d+[º\. ]*|\n.\s+Art\. \d+[º\. ]*|\nArtigo único\.",
    "\nArt\. \d+[º\. ]*|\n.\s+Art\. \d+[º\. ]*",
    "\n§ \d+[º\. ]*",
    "\n[IVX]+[-\s]*",
]

In [61]:
hierarchical_text_splitter = HierachicalRecursiveCharacterTextSplitter(
    separators=spliting_hierarchy,
    is_separator_regex=True,
    chunk_size=1000,  # Tamanho máximo do chunk em caracteres
    chunk_overlap=100,  # Sobreposição entre chunks para contexto
    apply_chunk_size=6 # Separator index to start applying the chunk size check.
)

In [62]:
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,  # Tamanho máximo do chunk em caracteres
    chunk_overlap=100,  # Sobreposição entre chunks para contexto
)

#### Test the segmentation to some specific documents

In [None]:
chunks = hierarchical_text_splitter.split_text(normas_json['RIR2018.txt'])

In [None]:
chunks = hierarchical_text_splitter.split_text(normas_json['Lei nº 5.452.txt'])

In [26]:
chunks = hierarchical_text_splitter.split_text(normas_json['Ato Declaratório Cosar nº 47.txt'])

In [38]:
chunks = hierarchical_text_splitter.split_text(normas_json['Ato Declaratório Interpretativo SRF nº 2.txt'])

In [63]:
chunks = hierarchical_text_splitter.split_text(normas_json['Lei nº 9.096.txt'])

In [64]:
chunks

{'preamble': {'preamble': 'Presidência da República\nCasa Civil\nSubchefia para Assuntos Jurídicos\nLEI Nº 9.096, DE 19 DE SETEMBRO DE 1995\nTexto Compilado\nMensagem de veto\n(Vide Lei nº 9.259, de 1996)\n(Vide Lei nº 9.693, de 1998)\n(Vide Decreto nº 7.791, de 2012)\n(Vide ADI Nº 5.398)\n(Vide ADI Nº 6.230)\nDispõe sobre partidos políticos, regulamenta os arts. 17 e 14, § 3º,\ninciso V, da Constituição Federal.\nO VICE-PRESIDENTE DA REPÚBLICA no exercício do cargo de PRESIDENTE DA REPÚBLICA Faço saber que o Congresso Nacional decreta e\neu sanciono a seguinte Lei:'},
 'TÍTULO I': {'preamble': '\nTÍTULO I\nDisposições Preliminares',
  'Art. 1º': '\nArt. 1º O partido político, pessoa jurídica de direito privado, destina-se a assegurar, no interesse do regime democrático, a autenticidade do\nsistema representativo e a defender os direitos fundamentais definidos na Constituição Federal.\nParágrafo único. O partido político não se equipara às entidades paraestatais.                 (inclu

In [30]:
normas_json['Ato Declaratório Interpretativo SRF nº 2.txt']

'Multivigente  Vigente  Original  Relacional\nATO DECLARATÓRIO INTERPRETATIVO SRF Nº 2, DE 27 DE MARÇO DE 2007\n(Publicado(a) no DOU de 28/03/2007, seção , página 13)  \nDispõe sobre o tratamento tributário dos rendimentos\ndecorrentes de locação de partes comuns de condomínio\nedilício.\nO SECRETÁRIO DA RECEITA FEDERAL, no uso da atribuição que lhe confere o inciso III do art. 230 do\nRegimento Interno da Secretaria da Receita Federal, aprovado pela Portaria MF nº 30, de 25 de fevereiro de 2005, e\ntendo em vista o que consta no processo nº 10980.010644/2005-96, declara:\nArtigo único. Na hipótese de locação de partes comuns de condomínio edilício, será observado o seguinte:\nI - os rendimentos decorrentes serão considerados auferidos pelos condôminos, na proporção da parcela que\nfor atribuída a cada um, ainda que tais rendimentos sejam utilizados na composição do fundo de receitas do\ncondomínio, na redução da contribuição condominial ou para qualquer outro fim;\nII - o condômino es

### Check the extracted data hierarchy

In [None]:
len(chunks)

In [None]:
chunks.keys()

In [None]:
chunks

### Flatten the extracted hierarchy

In [43]:
def flatten_passages_hierarchy(passages_dictionary, current_path="", passages=[]):

    for item, value in passages_dictionary.items():
        item_path = current_path
        
        if item == "preamble":
            if type(value) == dict:
                flatten_passages_hierarchy(value, current_path, passages)

        if item_path != "":
            item_path += "_" + item
        else:
            item_path = item
        
        if type(value) == dict:
            flatten_passages_hierarchy(value, item_path, passages)
        else:
            passages.append({'path': item_path,
                             'passage': value})

In [65]:
passages = []
current_path = ""

In [66]:
flatten_passages_hierarchy(chunks, current_path, passages)

In [67]:
len(passages)

285

In [68]:
passages

[{'path': 'preamble',
  'passage': 'Presidência da República\nCasa Civil\nSubchefia para Assuntos Jurídicos\nLEI Nº 9.096, DE 19 DE SETEMBRO DE 1995\nTexto Compilado\nMensagem de veto\n(Vide Lei nº 9.259, de 1996)\n(Vide Lei nº 9.693, de 1998)\n(Vide Decreto nº 7.791, de 2012)\n(Vide ADI Nº 5.398)\n(Vide ADI Nº 6.230)\nDispõe sobre partidos políticos, regulamenta os arts. 17 e 14, § 3º,\ninciso V, da Constituição Federal.\nO VICE-PRESIDENTE DA REPÚBLICA no exercício do cargo de PRESIDENTE DA REPÚBLICA Faço saber que o Congresso Nacional decreta e\neu sanciono a seguinte Lei:'},
 {'path': 'preamble_preamble',
  'passage': 'Presidência da República\nCasa Civil\nSubchefia para Assuntos Jurídicos\nLEI Nº 9.096, DE 19 DE SETEMBRO DE 1995\nTexto Compilado\nMensagem de veto\n(Vide Lei nº 9.259, de 1996)\n(Vide Lei nº 9.693, de 1998)\n(Vide Decreto nº 7.791, de 2012)\n(Vide ADI Nº 5.398)\n(Vide ADI Nº 6.230)\nDispõe sobre partidos políticos, regulamenta os arts. 17 e 14, § 3º,\ninciso V, da Co

In [47]:
passages[0]

{'path': 'preamble',
 'passage': 'Multivigente  Vigente  Original  Relacional\nATO DECLARATÓRIO INTERPRETATIVO SRF Nº 2, DE 27 DE MARÇO DE 2007\n(Publicado(a) no DOU de 28/03/2007, seção , página 13)  \nDispõe sobre o tratamento tributário dos rendimentos\ndecorrentes de locação de partes comuns de condomínio\nedilício.\nO SECRETÁRIO DA RECEITA FEDERAL, no uso da atribuição que lhe confere o inciso III do art. 230 do\nRegimento Interno da Secretaria da Receita Federal, aprovado pela Portaria MF nº 30, de 25 de fevereiro de 2005, e\ntendo em vista o que consta no processo nº 10980.010644/2005-96, declara:\nArtigo único. Na hipótese de locação de partes comuns de condomínio edilício, será observado o seguinte:\nI - os rendimentos decorrentes serão considerados auferidos pelos condôminos, na proporção da parcela que\nfor atribuída a cada um, ainda que tais rendimentos sejam utilizados na composição do fundo de receitas do\ncondomínio, na redução da contribuição condominial ou para qualque

### Check passages tokens length

In [10]:
from transformers import AutoTokenizer, AutoModel
import torch

tokenizer = AutoTokenizer.from_pretrained("stjiris/bert-large-portuguese-cased-legal-tsdae")
model = AutoModel.from_pretrained("stjiris/bert-large-portuguese-cased-legal-tsdae")

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

BertModel(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(29794, 1024, padding_idx=0)
    (position_embeddings): Embedding(512, 1024)
    (token_type_embeddings): Embedding(2, 1024)
    (LayerNorm): LayerNorm((1024,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0-23): 24 x BertLayer(
        (attention): BertAttention(
          (self): BertSdpaSelfAttention(
            (query): Linear(in_features=1024, out_features=1024, bias=True)
            (key): Linear(in_features=1024, out_features=1024, bias=True)
            (value): Linear(in_features=1024, out_features=1024, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): BertSelfOutput(
            (dense): Linear(in_features=1024, out_features=1024, bias=True)
            (LayerNorm): LayerNorm((1024,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, 

In [22]:
import statistics
from scipy import stats
import numpy as np

def contar_tokens(chunk):
    tokens = tokenizer(chunk, return_tensors="pt", truncation=False)  # Tokeniza o texto sem truncar
    num_tokens = len(tokens['input_ids'][0])  # Conta o número de tokens
    return num_tokens

In [23]:
def passages_statistics(passages_list):
    num_tokens_por_chunk = [contar_tokens(passage) for passage in passages_list]

    desc_stats = stats.describe(num_tokens_por_chunk)
    
    mediana_tokens = statistics.median(num_tokens_por_chunk)
    max_token_passage = np.argmax(num_tokens_por_chunk)

    return({'max_tokens': desc_stats.minmax[1],
            'min_tokens': desc_stats.minmax[0],
            'mean_tokens': desc_stats.mean,
            'std_tokens': np.sqrt(desc_stats.variance),
            'skewness_tokens': desc_stats.skewness,
            'kurtosis_tokens': desc_stats.kurtosis,
            'median_tokens': statistics.median(num_tokens_por_chunk),
            'max_token_passage': np.argmax(num_tokens_por_chunk)})

In [None]:
passages_statistics([passage['passage'] for passage in passages])

In [None]:
passages[26]

### Now process (segmentation and flattening) all the referenced documents

In [13]:
from tqdm import tqdm

In [69]:
applied_splitter = []
passages_referred_docs = []

for filename, doc in tqdm(normas_json.items(), desc="Processing all referred legal documents"):
    # print(f"Processing file: {filename}")
    
    # Remove a extensão ".txt" do nome do arquivo
    doc_name = filename.replace(".txt", "")

    # First try applying the hierarchical splitter
    chunks = hierarchical_text_splitter.split_text(doc)
    passages = []
    current_path = ""

    flatten_passages_hierarchy(chunks, current_path, passages)
    
    # print(chunks.keys())
    
    total_statistics = passages_statistics([passage['passage'] for passage in passages])

    # print(total_statistics)

    #
    # Check if there are indications the hierarchical splitter did not work
    # verifying if there is any chunk longer than the maximum size.
    #
    
    if total_statistics['max_tokens'] > 1000:
        # For such document, apply a regular text splitter.
        
        # print(f">>> Applying regular text splitter in file {filename}.")

        chunks = text_splitter.split_text(doc)
        applied_splitter.append({'filename': filename,
                                 'splitter': 'text'})

        for passage in chunks:
            # Formata cada chunk com o nome do arquivo e o conteúdo
            passage_with_path = "{}: {}".format(doc_name, passage)
            passages_referred_docs.append(passage_with_path)    
    else:
        applied_splitter.append({'filename': filename,
                                 'splitter': 'hierarchical'})
    
        for passage in passages:
            # Formata cada chunk com o nome do arquivo e o conteúdo
            passage_with_path = "{}: {}".format(doc_name + "_" + passage['path'], passage['passage'])
            passages_referred_docs.append(passage_with_path)

Processing all referred legal documents: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 332/332 [00:13<00:00, 24.78it/s]


In [70]:
len(passages_referred_docs)

31467

In [75]:
passages_referred_docs[5]

'Ato Declaratório Interpretativo RFB nº 1_Art. 1º: \nArt. 1º No caso de pessoa física residente no País que adquire a condição de não residente, para fins de\naplicação do regime especial de tributação aplicável ao investidor estrangeiro não residente em país com tributação\nfavorecida nos termos do art. 24 da Lei nº 9.430, de 27 de dezembro de 1996, deverá o responsável tributário:\nI - exigir da pessoa física residente no País que adquire a condição de não residente a comprovação de que\napresentou a Comunicação de Saída Definitiva do País à Secretaria da Receita Federal do Brasil; e\nII - reter e recolher o imposto sobre a renda incidente sobre os rendimentos auferidos até o dia anterior ao da\naquisição da condição de não residente.'

In [76]:
passages_referred_docs[345]

'Decreto nº 3.000_LIVRO I_TÍTULO IV_CAPÍTULO III_Seção VII_Subseção I_Art. 58._preamble: \nArt. 58.  Considera-se atividade rural (Lei nº 8.023, de 12 de abril de 1990, art. 2º, Lei nº 9.250, de 1995, art.17, e Lei nº 9.430, de 1996, art. 59):'

In [77]:
passages_referred_docs[4985]

'Instrução Normativa RFB nº 2.172_CAPÍTULO VIII_Art. 18.: \nArt. 18. Será declarada nula a inscrição no CPF em que for constatada fraude.'

#### Check data statistics

In [71]:
passages_statistics(passages_referred_docs)

{'max_tokens': 1033,
 'min_tokens': 13,
 'mean_tokens': 138.0960053389265,
 'std_tokens': 96.19933497733604,
 'skewness_tokens': 2.2777788816370266,
 'kurtosis_tokens': 10.53901833375771,
 'median_tokens': 107,
 'max_token_passage': 13112}

In [72]:
passages_referred_docs[13112]

'Lei nº 13.097_CAPÍTULO I_Seção I_Art. 1º_§ 12. ..............................................................................................................................: \n§ 12. ..............................................................................................................................\n....................................................................................................................................................\nXL - produtos classificados no Ex 01 do código 8503.00.90 da Tipi.\n......................................................................................................................................... ” (NR)\n“Art. 28. ........................................................................................................................\n....................................................................................................................................................\nXXXVII - produtos classificados no Ex 01 d

In [73]:
import pickle

In [74]:
with open(DATA_PATH + "chunks_normas_new.pkl", "wb") as output_file:
    pickle.dump(passages_referred_docs, output_file, pickle.HIGHEST_PROTOCOL)