In [1]:
import re
from langchain_core import documents
from langchain_community.document_loaders import TextLoader

In [2]:
class CustomTextSplitter:
    def __init__(self, target_chunk_size=1000):
        
        self.target_chunk_size = target_chunk_size
        self.source = ''
        self.chunks = []
        self.current_chunk = ''
        self.dcts = []
        self.bullet_name = ''
        self.prev_bullet_name = ''
        self.mode_list = ['bullet', 'line', 'space']
        self.sep_dict = {'line': '\n', 'space': ' '}
        self.link_dict = {'bullet': '\n', 'line': '\n', 'space': ' '}
        
    def is_bullet(self, line):
        # Проверка, начинается ли строка с буллита
        return re.match(r'^\d+(\.\d+)*\.', line.strip())
    
    def make_chunks(self, input_text, mode):
        link = self.link_dict[mode] 
        if len(input_text) == 0:
            return
        if len(self.current_chunk) > 0:
            # Если текущий чанк не пуст, проверяем, поместится ли туда текущий фрагмент
            new_chunk = self.current_chunk + link + input_text
            if len(new_chunk) <= self.target_chunk_size:
                self.current_chunk = new_chunk
            else:
                # Если не помещается, сначала сохраняем текущий чанк, потом 
                # в текущий чанк записываем новый фрагмент
                self.chunks.append(self.current_chunk)
                dct = documents.base.Document(self.current_chunk)
                dct.metadata = {'source': self.source}
                # В текущей реализации в метаданные пишется имя последнего буллита чанка
                if mode == 'bullet':
                    dct.metadata['bullet'] = self.prev_bullet_name
                else:
                    dct.metadata['bullet'] = self.bullet_name
                self.dcts.append(dct)
                self.current_chunk = ''
                if len(input_text) <= self.target_chunk_size:
                    self.current_chunk = input_text
                else:
                    # Текст не помещается в чанк целиком. Тогда определяем параметры нового 
                    # разбиения и вызываем рекурсивно этот же метод.
                    next_mode, separator = self.prepare_for_new_split(mode)
                    if next_mode is None:
                        return
                    else:
                        for item in input_text.split(separator):
                            self.make_chunks(item, next_mode)
                        # После завершения разбиения сохраняем текущий чанк, если он не пуст.    
                        if self.current_chunk != '':
                            self.chunks.append(self.current_chunk)
                            dct = documents.base.Document(self.current_chunk)
                            # В текущей реализации в метаданные пишется имя последнего буллита чанка
                            dct.metadata = {'source': self.source,'bullet': self.bullet_name}
                            self.dcts.append(dct)
                            self.current_chunk = ''
        elif len(input_text) <= self.target_chunk_size:
                self.current_chunk = input_text
        else:
            # Текст не помещается в чанк целиком. Тогда определяем параметры нового 
            # разбиения и вызываем рекурсивно этот же метод.            
            next_mode, separator = self.prepare_for_new_split(mode)
            if next_mode is None:
                return
            else:
                for item in input_text.split(separator):
                    self.make_chunks(item, next_mode)
                # После завершения разбиения сохраняем текущий чанк, если он не пуст. 
                if self.current_chunk != '':
                    self.chunks.append(self.current_chunk)
                    dct = documents.base.Document(self.current_chunk)
                    # В текущей реализации в метаданные пишется имя последнего буллита чанка
                    dct.metadata = {'source': self.source,'bullet': self.bullet_name}
                    self.dcts.append(dct)
                    self.current_chunk = ''
    
    def prepare_for_new_split(self, current_mode):
        # Определяем параметры разбиения
            next_mode_index = self.mode_list.index(current_mode) + 1
            if next_mode_index < len(self.mode_list):
                next_mode = self.mode_list[next_mode_index]
                next_separator = self.sep_dict[next_mode]
                return next_mode, next_separator
            else:
                return None, None
            
    def split_text(self, text, source):
        bullet_chunks = []
        current_bullet_chunk = ''
        self.source = source
        # Разбиваем исходный текст на куски, каждый из которых (кроме первого) 
        # начинается с буллита
        lines = text.split("\n")
        for line in lines:
            if len(line) > 0:
                if self.is_bullet(line):
                    if len(current_bullet_chunk) > 0:
                        bullet_chunks.append(current_bullet_chunk)
                    current_bullet_chunk = line      
                elif len(current_bullet_chunk) > 0:
                    current_bullet_chunk = current_bullet_chunk + '\n' + line
                else:
                    current_bullet_chunk = line     
        bullet_chunks.append(current_bullet_chunk)
        # Для каждого буллита сохраняем его имя (=номер) и вызываем метод разбивки на чанки
        for bc in bullet_chunks:
            if len(bc) > 0:
                self.prev_bullet_name = self.bullet_name
                if self.is_bullet(bc):
                    self.bullet_name = re.match(r'^\d+(\.\d+)*\.', bc.strip()).group()
                else:
                    self.bullet_name = bc.split()[0]
                self.make_chunks(bc, 'bullet')
                
        return self.dcts
    

In [3]:
loader = TextLoader("./положение.txt", encoding = 'UTF-8')
text_documents = loader.load()

In [4]:
# Разбиение текста на чанки. Лимит чанка по умолчанию =1000. Если нужен другой, 
# задаём его с помощью target_chunk_size=... при создании splitter

splitter = CustomTextSplitter()
test_chunks = splitter.split_text(text_documents[0].page_content, text_documents[0].metadata['source'])
test_chunks

[Document(page_content='УТВЕРЖДЕНО решением Совета директоров ПАО «Компания 1» от 29 марта 2019 № ПТ-0102/14 ПОЛОЖЕНИЕ о закупках товаров, работ, услуг ПАО «Компания 1» Оглавление\n1. ОБЩИЕ ПОЛОЖЕНИЯ 5\n1.1. Предмет и цели регулирования 5\n1.2. Термины и определения 7\n1.3. Центральный орган управления закупками (ЦОУЗ) 15\n1.4. Закупочная комиссия, порядок создания, функционирования и полномочия 17\n1.5. Требования к участникам закупки 19\n1.6. Требования к описанию предмета закупки 21\n1.7. Требования к информационному обеспечению закупок 22\n2. ПЛАНИРОВАНИЕ ЗАКУПОК 23\n3. ПРЕДКВАЛИФИКАЦИЯ. РЕЕСТР ПОТЕНЦИАЛЬНЫХ УЧАСТНИКОВ ЗАКУПОК 23\n4. ОПРЕДЕЛЕНИЕ НАЧАЛЬНОЙ (МАКСИМАЛЬНОЙ) ЦЕНЫ ДОГОВОРА (ПРЕДМЕТА ЗАКУПКИ) 27\n5. ОРГАНИЗАЦИЯ ПРОВЕДЕНИЯ ЗАКУПОК 40\n6. СПОСОБЫ ЗАКУПОК И УСЛОВИЯ ИХ ПРИМЕНЕНИЯ 42\n7. ПОРЯДОК ПОДГОТОВКИ И ОСУЩЕСТВЛЕНИЯ КОНКУРЕНТНЫХ ЗАКУПОК 46\n7.1. Общий порядок подготовки и проведения конкурентных закупок 46\n7.2. Извещение об осуществлении конкурентной закупки 48\n7.3. До