# CoT-Structed_Output-Checklist-RAG

In [1]:
%pip install -q chromadb openai pydantic python-dotenv sentence_transformers

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from typing import List, Literal, Optional

import chromadb
import orjson
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, ValidationError

# Загрузка переменных окружения
load_dotenv()

# Настройка клиента Chroma
chroma_client = await chromadb.AsyncHttpClient(
    host=os.getenv("CHROMA_HOST"), port=os.getenv("CHROMA_PORT")
)

# Настройка клиента vLLM
vllm_client = AsyncOpenAI(
    base_url=os.getenv("VLLM_BASE_URL"), api_key=os.getenv("VLLM_API_KEY")
)

# Создание функции эмбеддинга
embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name=os.getenv("EMBEDDING_MODEL_NAME")
)

  from tqdm.autonotebook import tqdm, trange


In [3]:
# Модели для структурированного вывода
class RetrievalStrategy(BaseModel):
    query_type: Literal["semantic", "keyword", "hybrid"] = Field(
        ..., description="Тип поиска для извлечения документов"
    )
    top_k: int = Field(..., description="Количество документов для извлечения")
    filter_criteria: Optional[dict] = Field(
        None, description="Критерии фильтрации документов"
    )


class AnswerChecklist(BaseModel):
    is_question_understood: bool = Field(
        ..., description="Понятен ли вопрос пользователя?"
    )
    are_documents_relevant: bool = Field(
        ..., description="Релевантны ли найденные документы?"
    )
    is_answer_complete: bool = Field(
        ..., description="Полон ли ответ на вопрос?"
    )
    are_sources_cited: bool = Field(
        ..., description="Указаны ли источники информации?"
    )


class ResponseModel(BaseModel):
    retrieval_strategy: RetrievalStrategy
    answer_checklist: AnswerChecklist
    final_answer: str
    sources: List[str]


async def get_chroma_collection():
    return await chroma_client.get_or_create_collection(
        name=os.getenv("CHROMA_COLLECTION_NAME"),
        embedding_function=embedding_function,
    )


async def retrieve_documents(
    query: str, strategy: RetrievalStrategy
) -> List[dict]:
    collection = await get_chroma_collection()
    results = await collection.query(
        query_texts=[query],
        n_results=strategy.top_k,
        where=strategy.filter_criteria,
    )
    return [
        {"id": id, "content": doc, "metadata": meta}
        for id, doc, meta in zip(
            results["ids"][0], results["documents"][0], results["metadatas"][0]
        )
    ]


async def generate_response(query: str, documents: List[dict]) -> ResponseModel:
    context = "\n".join(
        [f"Document {i+1}: {doc['content']}" for i, doc in enumerate(documents)]
    )

    prompt = f"""Вопрос пользователя: {query}

Контекст из документов:
{context}

Пожалуйста, сформируйте ответ на вопрос пользователя, используя предоставленный контекст.
Ваш ответ должен быть строго в формате JSON и соответствовать следующей структуре:

{{
    "retrieval_strategy": {{
        "query_type": "semantic" | "keyword" | "hybrid",
        "top_k": <число>,
        "filter_criteria": <объект или null>
    }},
    "answer_checklist": {{
        "is_question_understood": true | false,
        "are_documents_relevant": true | false,
        "is_answer_complete": true | false,
        "are_sources_cited": true | false
    }},
    "final_answer": "<текст ответа>",
    "sources": ["<номер документа>", ...]
}}

Убедитесь, что все поля заполнены корректно и соответствуют типам данных."""

    response = await vllm_client.chat.completions.create(
        model="Qwen/Qwen2.5-32B-Instruct",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )

    try:
        response_data = orjson.loads(response.choices[0].message.content)
        return ResponseModel(**response_data)
    except (orjson.JSONDecodeError, ValidationError) as e:
        print(f"Ошибка при обработке ответа модели: {e}")
        print(f"Содержимое ответа: {response.choices[0].message.content}")
        # Возвращаем заглушку в случае ошибки
        return ResponseModel(
            retrieval_strategy=RetrievalStrategy(query_type="semantic", top_k=5),
            answer_checklist=AnswerChecklist(
                is_question_understood=False,
                are_documents_relevant=False,
                is_answer_complete=False,
                are_sources_cited=False
            ),
            final_answer="Извините, произошла ошибка при обработке ответа.",
            sources=[]
        )


async def rag_assistant(query: str) -> str:
    initial_strategy = RetrievalStrategy(query_type="semantic", top_k=5)
    documents = await retrieve_documents(query, initial_strategy)
    response = await generate_response(query, documents)

    final_response = f"""Ответ: {response.final_answer}

Источники информации: {', '.join(response.sources)}

Стратегия поиска: {response.retrieval_strategy.query_type}, top_k={response.retrieval_strategy.top_k}
Чек-лист:
- Вопрос понят: {'Да' if response.answer_checklist.is_question_understood else 'Нет'}
- Документы релевантны: {'Да' if response.answer_checklist.are_documents_relevant else 'Нет'}
- Ответ полный: {'Да' if response.answer_checklist.is_answer_complete else 'Нет'}
- Источники указаны: {'Да' if response.answer_checklist.are_sources_cited else 'Нет'}"""

    return final_response

In [5]:
query = "Расскажи про занятие номер 2, тип лекция, что за тема занятия?"
answer = await rag_assistant(query)
print(answer)

CancelledError: 