# Imports

In [1]:
import json
from itertools import chain
from operator import itemgetter
from typing import List, Literal, Optional, Union

import chromadb
import nltk
from dotenv import load_dotenv
from IPython.display import Markdown
from langchain.retrievers import EnsembleRetriever
from langchain_chroma import Chroma
from langchain_community.retrievers import BM25Retriever
from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate
from langchain_core.runnables import Runnable, RunnableLambda
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_mistralai.chat_models import ChatMistralAI
from langsmith import traceable
from nltk.tokenize import word_tokenize
from pydantic import BaseModel, Field

# Initialization

In [2]:
load_dotenv()  # For MISTRAL_API_KEY and LangSmith tracing
nltk.download("punkt_tab")  # Tokenizer for BM-25 retriever

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /home/deniskirbaba/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [3]:
settings = {
    "llm": {"model_name": "mistral-large-latest", "temperature": 0.7, "top_p": 0.9},
    "emb_model": {"model_name": "deepvk/USER-bge-m3"},
    "query": {
        "n_rephrase": 2,
        "n_hyde": 2,
    },
    "retriever": {"k_simil": 10, "k_mmr": 10, "k_bm25": 10, "k_codex": 10},
}

In [4]:
# Load models
llm = ChatMistralAI(**settings["llm"])
emb_model = HuggingFaceEmbeddings(**settings["emb_model"])

# Query classification and analysis

In [5]:
with open("prompts/system_prompt_1.txt") as f:
    system_prompt_1 = f.read()

with open("prompts/examples_1.json") as f:
    examples_1 = json.load(f)

example_prompt_1 = ChatPromptTemplate.from_messages(
    [
        ("user", "{input}"),
        ("assistant", "{output}"),
    ]
)

few_shot_prompt_1 = FewShotChatMessagePromptTemplate(
    example_prompt=example_prompt_1,
    examples=examples_1,
)

prompt_1 = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt_1),
        few_shot_prompt_1,
        ("user", "{question}"),
    ]
)

In [6]:
class RAGQueries(BaseModel):
    """
    Данные, сформированные на основе запроса пользователя для поиска релевантных документов по юридической практике в векторной БД
    Необходимы для уточнения запроса и повышения релевантности выдачи
    """

    keyinfo: str = Field(description="Ключевая информация")
    rephrase: List[str] = Field(
        ...,
        description=f"{settings["query"]["n_rephrase"]} различных запросов в векторную базу данных, содержащую судебные дела",
    )
    hyde: List[str] = Field(
        ...,
        description=f"{settings["query"]["n_hyde"]} различных описаний реалистичных документа юридической практики",
    )
    codex: Optional[
        Literal["Гражданский кодекс", "Уголовный кодекс", "Административный кодекс"]
    ] = Field(
        default=None,
        description="Фильтр для уточнения правового поля запроса, используемый при обращении к векторной базе данных, если возможно определить",
    )


class ChitChatResponse(BaseModel):
    """
    Ответ на запрос пользователя, который не предполагает юридической консультации.
    """

    response: str = Field(..., description="Ответ на общий или неюридический запрос пользователя.")


class FirstResponse(BaseModel):
    """
    Итоговый результат обработки запроса пользователя.
    В зависимости от характера запроса результат может быть представлен в двух форматах:
    - RAGQueries: используется для юридических запросов. Содержит структурированные данные для обращения к векторной базе с целью поиска релевантных документов.
    - ChitChatResponse: применяется для общих или неюридических запросов. Содержит текстовый ответ на запрос пользователя.
    """

    response: Union[RAGQueries, ChitChatResponse] = Field(
        ...,
        description="""Итоговый результат обработки запроса пользователя. 
    Форматы ответа:
    - RAGQueries: для юридических запросов, содержит структурированные данные для поиска документов в векторной базе.
    - ChitChatResponse: для общих запросов, содержит текстовый ответ.""",
    )


In [7]:
llm_1 = llm.with_structured_output(FirstResponse)
chain_1 = prompt_1 | llm_1

# Retrieval and final generation

In [8]:
@traceable
def create_queries(inputs: dict) -> dict:
    """
    Creates queries for retriever:
        1. Original user query
        2. n_rephrase queries with key info + rephrase
        3. n_hyde queries with key info + rephrase

    Params:
        `inputs` - output from previous LangChain Runnable in chain
    """
    original_query: str = inputs["question"]
    llm_processed: RAGQueries = inputs["response_1"].response

    queries = [original_query]
    for s in chain(llm_processed.rephrase, llm_processed.hyde):
        queries.append(llm_processed.keyinfo + "\n" + s)
    return {"queries": queries, "codex_filter": llm_processed.codex}

In [9]:
# Set up ChromaDB with legal practices docs
# Note: this DB should already be initialized
persistent_client = chromadb.PersistentClient(path="../legal_practice_db/vector_storage")
legal_practices_store = Chroma(
    client=persistent_client,
    collection_name="legal_practices",
    embedding_function=emb_model,
)

In [10]:
def create_retrievers(legal_practices_store: Chroma, retriever_settings: dict) -> dict:
    """
    Creates 4 retrievers, which will be ensembled later.

    Params:
        `legal_practices_store` - ChromaDB object of legal practices data
        `retriever_settings` - settings for retrievers
    """
    retrievers = {}
    retrievers["simil"] = legal_practices_store.as_retriever(
        search_type="similarity",
        search_kwargs={
            "k": retriever_settings["k_simil"],
        },
    )

    retrievers["mmr"] = legal_practices_store.as_retriever(
        search_type="mmr",
        search_kwargs={
            "k": retriever_settings["k_mmr"],
            "fetch_k": 25,
            "lambda_mult": 0.5,
        },
    )

    data_for_bm25 = legal_practices_store.get(include=["documents", "metadatas"])
    retrievers["bm25"] = BM25Retriever.from_texts(
        texts=data_for_bm25["documents"],
        metadatas=data_for_bm25["metadatas"],
        preprocess_func=word_tokenize,
        k=retriever_settings["k_bm25"],
    )

    retrievers["codex_filter"] = legal_practices_store.as_retriever(
        search_type="similarity_score_threshold",
        search_kwargs={
            "k": retriever_settings["k_codex"],
            "score_threshold": 0.5,
            # add at runtime: "filter": {"codex": {"$eq": "?"}},
        },
    )
    return retrievers

In [11]:
legal_practices_retrievers = create_retrievers(
    legal_practices_store=legal_practices_store, retriever_settings=settings["retriever"]
)

legal_practices_ensemble_retriever = EnsembleRetriever(
    retrievers=list(legal_practices_retrievers.values()),
    tags=list(legal_practices_retrievers.keys()),
    weights=[0.25, 0.25, 0.25, 0.25],
    c=60,
)

In [12]:
@traceable
def batch_query(inputs) -> dict:
    """
    Makes batch invoke of legal_practices_ensemble_retriever for each query.
    Also set the filter for codex (if it's not None).

    Params:
        `inputs` - output from previous LangChain Runnable in chain
    """
    # Add/remove codex filter for retriever
    codex_poss_values = ["А", "АГ", "АУ", "АГУ", "Г", "ГУ", "У"]
    if inputs["codex_filter"] and inputs["codex_filter"][0] in "АГУ":
        legal_practices_ensemble_retriever.retrievers[3].search_kwargs["filter"] = {
            "codex": {"$in": [val for val in codex_poss_values if inputs["codex_filter"][0] in val]}
        }
    else:
        legal_practices_ensemble_retriever.retrievers[3].search_kwargs.pop("filter", None)

    # Batch retrieve
    relevant_docs = legal_practices_ensemble_retriever.batch(inputs["queries"])

    return {"relevant_docs": relevant_docs, "orig_query": inputs["queries"][0]}

In [13]:
@traceable
def prepare_docs(inputs) -> dict:
    """
    Post-processing of retrieved relevant docs.
    Steps:
        1. Picks up the most relevant docs for each query w/o duplicates
        2. Add `theme` from metadata of doc to its content
        3. Merges all docs in one string

    Params:
        `inputs` - output from previous LangChain Runnable in chain
    """
    relevant_docs = inputs["relevant_docs"]
    # Pick top document for each query without duplicates
    seen_uids = set()
    top_docs = []
    for query_docs in relevant_docs:  # relevant_docs is a list of lists (docs per query)
        for doc in query_docs:
            uid = doc.metadata.get("uid")
            if uid not in seen_uids:
                seen_uids.add(uid)
                top_docs.append(doc)
                break

    # Merge `theme` into `page_content`
    for doc in top_docs:
        theme = doc.metadata.get("theme", "")
        doc.page_content = f"{theme}\n{doc.page_content}"

    # Merge all docs
    context = "\n\n".join(doc.page_content for doc in top_docs)

    return {"context": context, "orig_query": inputs["orig_query"]}

In [14]:
with open("prompts/system_prompt_2.txt") as f:
    system_prompt_2 = f.read()

prompt_2 = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt_2),
        ("user", "{orig_query}\n===\n{context}"),
    ]
)

In [15]:
class ResponseWithLegalDocs(BaseModel):
    """
    Модель для структурированного ответа с юридическими документами.
    """

    text_response: str = Field(
        description="Полный текст юридического ответа на запрос пользователя."
    )
    legal_docs: List[str] = Field(
        description="Список названий юридических документов, использованных для формирования ответа.",
    )

In [16]:
llm_2 = llm.with_structured_output(ResponseWithLegalDocs)

chain_2 = (
    RunnableLambda(create_queries)
    | RunnableLambda(batch_query)
    | RunnableLambda(prepare_docs)
    | prompt_2
    | llm_2
)

# Final chain

In [17]:
@traceable
def route(inputs) -> str | Runnable:
    """
    Swithes behavior of pipeline based on query classification:
        1. If LLM classify query as chit-chat - we just return the text response without doing RAG
        2. Otherwise, if query belongs to legal field - execute `chain_2` for RAG

    Params:
        `inputs` - output from previous LangChain Runnable in chain
    """
    if isinstance(inputs["response_1"].response, ChitChatResponse):
        return inputs["response_1"].response.response
    else:
        return chain_2

In [18]:
final_chain = {"response_1": chain_1, "question": itemgetter("question")} | RunnableLambda(route)

In [19]:
query = "Я ехал на велосипеде, упал и поцарапал машину, что делать и что мне будет?"
response = final_chain.invoke({"question": query})

Failed to use model_dump to serialize <class 'langchain_core.runnables.base.RunnableSequence'> to JSON: PydanticSerializationError(Unable to serialize unknown type: <class 'langchain_core.runnables.base.RunnableLambda'>)


In [20]:
Markdown(response.text_response)

Ваш случай относится к дорожно-транспортному происшествию (ДТП), в котором вы, будучи велосипедистом, повредили автомобиль. Важно понимать, что в таких ситуациях применяются нормы Кодекса Российской Федерации об административных правонарушениях (КоАП РФ) и Правил дорожного движения Российской Федерации (ПДД РФ).

1. **Обязанности участников ДТП**: Согласно пункту 2.5 ПДД РФ, водитель транспортного средства обязан соблюдать осторожность и быть внимательным к другим участникам движения, включая велосипедистов. В вашем случае, если вы не соблюдали правила движения, это может быть расценено как административное правонарушение.

2. **Ответственность за причинение вреда**: В соответствии с частью 1 статьи 12.24 КоАП РФ, нарушение ПДД, повлекшее причинение легкого или средней тяжести вреда здоровью, влечет административную ответственность. В вашем случае, если вы повредили автомобиль, это может быть квалифицировано как нарушение, повлекшее материальный ущерб.

3. **Действия после ДТП**: Вам необходимо остановиться, включить аварийную сигнализацию и выставить знак аварийной остановки. Также обязательно сообщить о ДТП в полицию и дождаться прибытия сотрудников. Важно зафиксировать все обстоятельства происшествия, включая фотографии и контактные данные свидетелей.

4. **Возмещение ущерба**: Владелец поврежденного автомобиля имеет право требовать от вас возмещения ущерба. Это может быть сделано через страховую компанию, если у вас есть полис ОСАГО, или через суд, если страхования нет.

**Рекомендации**:
- Оставайтесь на месте происшествия и вызовите полицию.
- Зафиксируйте все обстоятельства ДТП.
- Обратитесь в свою страховую компанию для урегулирования вопроса о возмещении ущерба.
- Если у вас нет страховки, будьте готовы к возмещению ущерба в судебном порядке.

**Использованные документы**:
- Кодекс Российской Федерации об административных правонарушениях (КоАП РФ)
- Правила дорожного движения Российской Федерации (ПДД РФ)
- Статья 12.24 КоАП РФ
- Пункт 2.5 ПДД РФ

In [21]:
response.legal_docs

['Кодекс Российской Федерации об административных правонарушениях (КоАП РФ)',
 'Правила дорожного движения Российской Федерации (ПДД РФ)',
 'Статья 12.24 КоАП РФ',
 'Пункт 2.5 ПДД РФ']