In [2]:
!pip3 install langchain transformers pgvector pymongo -q

# План
- сделать текст сплиттер на каждом документе (страницах)
- посчитать embedings в chromadb

In [12]:
from typing import (
    AbstractSet,
    Any,
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    Literal,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
    cast,
)
import copy
import re
from enum import Enum

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from io import BytesIO, StringIO
from pydantic import BaseModel, Field

TS = TypeVar("TS", bound="TextSplitter")

class Language(str, Enum):
    """Enum of the programming languages."""

    CPP = "cpp"
    GO = "go"
    JAVA = "java"
    KOTLIN = "kotlin"
    JS = "js"
    TS = "ts"
    PHP = "php"
    PROTO = "proto"
    PYTHON = "python"
    RST = "rst"
    RUBY = "ruby"
    RUST = "rust"
    SCALA = "scala"
    SWIFT = "swift"
    MARKDOWN = "markdown"
    LATEX = "latex"
    HTML = "html"
    SOL = "sol"
    CSHARP = "csharp"
    COBOL = "cobol"
    C = "c"
    LUA = "lua"
    PERL = "perl"


def _split_text_with_regex(
    text: str, separator: str, keep_separator: bool
) -> List[str]:
    # Now that we have the separator, split the text
    if separator:
        if keep_separator:
            # The parentheses in the pattern keep the delimiters in the result.
            _splits = re.split(f"({separator})", text)
            splits = [_splits[i] + _splits[i + 1] for i in range(1, len(_splits), 2)]
            if len(_splits) % 2 == 0:
                splits += _splits[-1:]
            splits = [_splits[0]] + splits
        else:
            splits = re.split(separator, text)
    else:
        splits = list(text)
    return [s for s in splits if s != ""]


class Document(BaseModel):
    """Class for storing a piece of text and associated metadata."""

    page_content: str
    """String text."""
    metadata: dict = Field(default_factory=dict)
    """Arbitrary metadata about the page content (e.g., source, relationships to other
        documents, etc.).
    """
    type: Literal["Document"] = "Document"

    def __init__(self, page_content: str, **kwargs: Any) -> None:
        """Pass page_content in as positional or named arg."""
        super().__init__(page_content=page_content, **kwargs)

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """Return whether this class is serializable."""
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """Get the namespace of the langchain object."""
        return ["langchain", "schema", "document"]


class TextSplitter(ABC):
    """Interface for splitting text into chunks."""

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function: Callable[[str], int] = len,
        keep_separator: bool = False,
        add_start_index: bool = False,
        strip_whitespace: bool = True,
    ) -> None:
        """Create a new TextSplitter.

        Args:
            chunk_size: Maximum size of chunks to return
            chunk_overlap: Overlap in characters between chunks
            length_function: Function that measures the length of given chunks
            keep_separator: Whether to keep the separator in the chunks
            add_start_index: If `True`, includes chunk's start index in metadata
            strip_whitespace: If `True`, strips whitespace from the start and end of
                              every document
        """
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._keep_separator = keep_separator
        self._add_start_index = add_start_index
        self._strip_whitespace = strip_whitespace

    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        """Split text into multiple components."""

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Create documents from a list of texts."""
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            index = 0
            previous_chunk_len = 0
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])
                if self._add_start_index:
                    offset = index + previous_chunk_len - self._chunk_overlap
                    index = text.find(chunk, max(0, offset))
                    metadata["start_index"] = index
                    previous_chunk_len = len(chunk)
                new_doc = Document(page_content=chunk, metadata=metadata)
                documents.append(new_doc)
        return documents

    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
        """Split documents."""
        texts, metadatas = [], []
        for doc in documents:
            texts.append(doc.page_content)
            metadatas.append(doc.metadata)
        return self.create_documents(texts, metadatas=metadatas)

    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
        text = separator.join(docs)
        if self._strip_whitespace:
            text = text.strip()
        if text == "":
            return None
        else:
            return text

    def _merge_splits(self, splits: Iterable[str], separator: str) -> List[str]:
        # We now want to combine these smaller pieces into medium size
        # chunks to send to the LLM.
        separator_len = self._length_function(separator)

        docs = []
        current_doc: List[str] = []
        total = 0
        for d in splits:
            _len = self._length_function(d)
            if (
                total + _len + (separator_len if len(current_doc) > 0 else 0)
                > self._chunk_size
            ):
                if total > self._chunk_size:
                    logger.warning(
                        f"Created a chunk of size {total}, "
                        f"which is longer than the specified {self._chunk_size}"
                    )
                if len(current_doc) > 0:
                    doc = self._join_docs(current_doc, separator)
                    if doc is not None:
                        docs.append(doc)
                    # Keep on popping if:
                    # - we have a larger chunk than in the chunk overlap
                    # - or if we still have any chunks and the length is long
                    while total > self._chunk_overlap or (
                        total + _len + (separator_len if len(current_doc) > 0 else 0)
                        > self._chunk_size
                        and total > 0
                    ):
                        total -= self._length_function(current_doc[0]) + (
                            separator_len if len(current_doc) > 1 else 0
                        )
                        current_doc = current_doc[1:]
            current_doc.append(d)
            total += _len + (separator_len if len(current_doc) > 1 else 0)
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)
        return docs

    @classmethod
    def from_huggingface_tokenizer(cls, tokenizer: Any, **kwargs: Any) -> "TextSplitter":
        """Text splitter that uses HuggingFace tokenizer to count length."""
        try:
            from transformers import PreTrainedTokenizerBase

            if not isinstance(tokenizer, PreTrainedTokenizerBase):
                raise ValueError(
                    "Tokenizer received was not an instance of PreTrainedTokenizerBase"
                )

            def _huggingface_tokenizer_length(text: str) -> int:
                return len(tokenizer.encode(text))

        except ImportError:
            raise ValueError(
                "Could not import transformers python package. "
                "Please install it with `pip install transformers`."
            )
        return cls(length_function=_huggingface_tokenizer_length, **kwargs)

    @classmethod
    def from_tiktoken_encoder(
        cls: Type[TS],
        encoding_name: str = "gpt2",
        model_name: Optional[str] = None,
        allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
        disallowed_special: Union[Literal["all"], Collection[str]] = "all",
        **kwargs: Any,
    ) -> TS:
        """Text splitter that uses tiktoken encoder to count length."""
        try:
            import tiktoken
        except ImportError:
            raise ImportError(
                "Could not import tiktoken python package. "
                "This is needed in order to calculate max_tokens_for_prompt. "
                "Please install it with `pip install tiktoken`."
            )

        if model_name is not None:
            enc = tiktoken.encoding_for_model(model_name)
        else:
            enc = tiktoken.get_encoding(encoding_name)

        def _tiktoken_encoder(text: str) -> int:
            return len(
                enc.encode(
                    text,
                    allowed_special=allowed_special,
                    disallowed_special=disallowed_special,
                )
            )

        if issubclass(cls, TokenTextSplitter):
            extra_kwargs = {
                "encoding_name": encoding_name,
                "model_name": model_name,
                "allowed_special": allowed_special,
                "disallowed_special": disallowed_special,
            }
            kwargs = {**kwargs, **extra_kwargs}

        return cls(length_function=_tiktoken_encoder, **kwargs)

    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Transform sequence of documents by splitting them."""
        return self.split_documents(list(documents))


class RecursiveCharacterTextSplitter(TextSplitter):
    """Splitting text by recursively look at characters.

    Recursively tries to split by different characters to find one
    that works.
    """

    def __init__(
        self,
        separators: Optional[List[str]] = None,
        keep_separator: bool = True,
        is_separator_regex: bool = False,
        **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter."""
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks."""
        final_chunks = []
        # Get appropriate separator to use
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1 :]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return final_chunks

    def split_text(self, text: str) -> List[str]:
        return self._split_text(text, self._separators)

    @classmethod
    def from_language(
        cls, language: Language, **kwargs: Any
    ) -> "RecursiveCharacterTextSplitter":
        separators = cls.get_separators_for_language(language)
        return cls(separators=separators, is_separator_regex=True, **kwargs)

    @staticmethod
    def get_separators_for_language(language: Language) -> List[str]:
        return []

In [20]:
test_text = "расходов по операциям приобретенными ценными бумагами): 10.4.2 бухгалтерском учете кредитной организации заемщика данная операция отражается как приобретение заимствованных ценных бумаг в соответствии с главой 3 настоящего Положения Одновременно обязательство по возврату заимствованных ценных бумаг списывается сО счета & 91314 <Ценные бумаги, полученные по операциям, совершаемым на возвратной основеж случае если обязательство По возврату заимствованных ценных бумаг учтено на балансовом счете По учету привлеченных средств случае реализации заимствованных ценных бумаг); сумма денежных средств; предоставленных погашение займа, отражастся ПО дебету балансового счета По учету выбытия   (реализации) ценных бумаг; обязательство По возврату заимствованных ценных бумаг по кредиту балансового счета по учету выбытия (реализации) ценных бумаг. Одноврсменно сумма остатка (при сго наличии), образовавшаяся на счете & 61210 <Выбытие (реализация) ценных бумагъ, подлежит отнесению на счета & 70601 <Доходых или & 70606 <Расходыж (по символу доходов От операций приобретенными ценными бумагами или расходов По операциям приобретенными ценными бумагами) 10.5. Процентные расходы По операциям займа ценных бумаг процентные доходЫ От операций   займа ценных бумаг отражаются учетом следующего 10.5.1. Начисление и уплата процентов По договору займа ценных бумаг учитывается кредитной  организацией засмщиком на балансовых счетах по учету начисленных процентов (к уплате) ПО привлеченным средствам Начисление процентного расхода по договору займа ценных бумаг отражается бухгалтерской записью:"

In [24]:
splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=2048,
    chunk_overlap=409,
    length_function=len,
)

In [25]:
splits = splitter.split_text(test_text)

In [32]:
from pymongo import MongoClient

client = MongoClient(dsn)
materials = client["cbr"].get_collection("materials_copy")

In [33]:
res = materials.find({"invalid": False}).limit(1000)

In [40]:
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=2048,
    chunk_overlap=409,
    length_function=len,
)

In [37]:
from collections import namedtuple

material = namedtuple("material", ["doc", "page", "text", "src"])

docs = [
    material(obj["doc"], obj["page"], obj["text"], obj["src"]) for obj in
    res
]

In [41]:
_docs = [
    Document(page_content=doc.text, metadata={
        "src":doc.src
    })
    for doc in docs
]

In [42]:
result = text_splitter.split_documents(
    _docs,
)

In [43]:
result[0]

Document(page_content='ЦЕНТРАЛЬНЫЙ БАНК РОССИЙСКОЙ ФЕДЕРАЦИИ (БАНК РОССИИ) УК А 3 А НИЕ 11 онтября 2018г х\' 4930-У г Москва  ЗЮГТГРСНН РЕТИЩЦИ {ИЕЛЙЕ*ЕП МЕЛЕРЖШП ЗаевгмстРивОЁАе Рсгистрацнонкый ж 53109 24"92 [2щЖ внесенин изменений в Положение Банка России от 19 июня 2012 года Л: 383-П к0 правилах осуществления перевода денежных средствэ соответствии пунктом статьи Федерального закона От 10 июля 2002 года & 86-Ф3 <0 Центральном банке Российской Федерации (Банке России)э (Собрание законодательства Российской Федерации, 2002, ) 28, ст: 2790; 2003, ) 2, ст: 157; & 52, ст. 5032; 2004, & 27, с 2711; ) 31, ст. 3233; 2005, &\' 25, ст. 2426, &: 30, сТ: 3101; 2006, & 19, ст: 2061; & 25, ст. 2648; 2007, & 1, ст. 9, ст. 10; &е 10, Ст 1151; & 18, ст. 2117; 2008, & 42, СТ: 4696, сТ: 4699; &: 44, ст: 4982; & 52, ст: 6229, 6231; 2009, )\' 1, с: 25; &\' 29, СТ 3629, &\' 48, ст: 5731; 2010, &\' 45, ст: 5756; 2011, & 7, сТ 907; & 27, С: 3873; & 43, ст: 5973; Л 48, СТ: 6728; 2012, &\' 50, ст: 6954; &\' 

In [48]:
model = "Tochka-AI/ruRoPEBert-e5-base-2k"

In [53]:
from transformers import AutoTokenizer, AutoModel

model_name = "Tochka-AI/ruRoPEBert-e5-base-2k"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(
    model_name, trust_remote_code=True, attn_implementation="eager"
)

A new version of the following files was downloaded from https://huggingface.co/Tochka-AI/ruRoPEBert-e5-base-2k:
- modeling_rope_bert.py
. Make sure to double-check they do not contain any added malicious code. To avoid downloading new versions of the code file, you can pin a revision.


In [58]:
from langchain_community.embeddings import HuggingFaceEmbeddings

embedings = HuggingFaceEmbeddings(model_name=model_name)

No sentence-transformers model found with name Tochka-AI/ruRoPEBert-e5-base-2k. Creating a new one with MEAN pooling.
Some weights of BertModel were not initialized from the model checkpoint at Tochka-AI/ruRoPEBert-e5-base-2k and are newly initialized: ['bert.embeddings.position_embeddings.weight', 'bert.pooler.dense.bias', 'bert.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [59]:
from langchain.vectorstores import Chroma
chroma = Chroma.from_documents(
    embedding=embedings, documents=result, collection_name="test"
)


In [62]:
def retrieve_vector_db(query, db, n_results=3):
    return db.similarity_search(query, n_results)


query = "Как назначается временная администрация?"
retrieved_docs = retrieve_vector_db(query=query, db=chroma)

In [63]:
retrieved_docs

[Document(page_content='Предложение саморегулируемую организацию арбитражных управляющих, имеющую первый порядковый номер в Списке: 1.9. Банк   России должен направлять саморегулируемой организации арбитражных управляющих Прсдложение способом, предусмотренным пунктом 1.6 настоящего Указания. 1.10. Саморегулируемая организация арбитражных управляющих получившая Предложение имеющая намерение представить кандидатуры руководителя и членов временной администрации финансовой организации, должна направить Банк России уведомление представлении кандидатур руководителя и членов временной администрации финансовой организации (далее Уведомление саморегулируемой организации арбитражных управляющих ), включающее следующие сведения: наименование финансовой организации, которую назначается временная администрация; фамилия; Имя, отчество (при наличии) идентификационный номер налогоплательщика кандидатуры руководителя временной администрации; фамилия; ИМЯ, отчество (при наличии) идентификационный номер 

In [94]:
from langchain.llms import Ollama
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

ollama = Ollama(base_url="http://larek.tech:11434", model="llama2", temperature=0)

template = """
Отвечай только на русском. Если пишешь на другом языке, переводи его на русской.
Если не знаешь ответа, скажи что не знаешь ответа, не пробуй отвечать.
Я дам тебе три текста, из которых надо дать ответ на поставленный вопрос.
Также тебе надо оставить ссылку из источник.

Context:
источник {url1}:
{context1}

источник {url2}:
{context2}

источник {url3}:
{context3}

Вопрос: {question} на русском языке. Ответь на вопрос основываясь на данных документах
Развернутый ответ:
"""
prompt = PromptTemplate.from_template(template)
llm_chain = LLMChain(prompt=prompt, llm=ollama)

In [103]:
query = "Какое отношение к денежно-кредитной политике имеет управление ликвидностью банковского сектора и ставками денежного рынка, которое осуществляет Банк России?"
docs = retrieve_vector_db(query, chroma)

In [109]:
generated = llm_chain.run(
    context1=docs[0].page_content,
    url1=docs[0].metadata["src"],
    context2=docs[1].page_content,
    url2=docs[1].metadata["src"],
    context3=docs[2].page_content,
    url3=docs[2].metadata["src"],
    question=query,
)

In [110]:
print(generated)

Ответ: Управление ликвидностью банковского сектора и ставками денежного рынка, которое осуществляет Банк России, играет важную роль в регулировании денежно-кредитной политики в России.

Согласно документам, на сайте ЦБ РФ, управление ликвидностью банковского сектора и ставками денежного рынка является частью довольно широкого круга функций, которые выполняются Банком России для регулирования денежно-кредитной политики. В частности, это включает в себя:

1. Контроль выполнения кредитными организациями обязательных резервных требований, установленных Банком России;
2. Оценку и контроль рисков, связанных с операциями на финансовых рынках;
3. Разработку и применение методов и инструментов для мониторинга и регулирования денежно-кредитной политики;
4. Обучение и развитие кредитных организаций, а также увеличение их конкурентоспособности на рынке;
5. Решение задач по обеспечению стабильности финансовой системы России;
6. Разработка и применение методов и инструментов для мониторинга и регули

In [111]:
for doc in docs:
    print(doc.page_content, end="\n")

Советник первого заместителя (заместителя) Председателя Центрального банка Российской Федерации Советник главного аудитора Центрального банка Российской Федерации Дирсктор дспартамента Начальник департамента Руководитель службы Директор Университета Банка России Первый заместитель (заместитель) главного   бухгалтера   Центрального банка Российской Федерации первый заместитель (заместитель) директора Департамента бухгалтерского и отчстности Первый заместитель (заместитель) директора Департамента бухгалтерского учета отчетности, Департамента исследований прогнозирования Департамента наличного денежного обращения; Департамента   национальной   платежной системы,  Департамента допуска прекращения деятельности финансовых организаций; Департамента финансового оздоровления, Департамента корпоративных отношений Департамента обеспечения банковского   надзора; Департамента банковского регулирования; Департамента надзора за системно значимыми кредитными организациями; Департамента операций на фин