In [None]:
import pandas as pd
import numpy as np
import requests
from dotenv import load_dotenv
import os
from openai import OpenAI
import json 

In [None]:
from IPython.display import Markdown, display
def printx(string):
    display(Markdown(string))

In [None]:
# configs
load_dotenv()
api_key = os.getenv('YANDEX_API_KEY')
folder_id = os.getenv('FOLDER_ID')

### Определение класса агента

Данный класс способен принимать на вход различные инструменты в виде python функций, а также создавать и пополнять векторную базу данных для настройки RAG.

In [None]:
import io

class Agent():

    def __init__(self,
            name,
            instruction, 
            tools = [], search_content = [], 
            model = model,
            response_format = None
            ):
        self.user_sessions = {}
        self.name = name
        self.instruction = instruction
        self.model = model
        self.tool_map = { x.__name__ : x for x in tools if issubclass(x, BaseModel) }
        self.tools = [
            self._create_tool_annot(x) for x in tools
        ]
        self.response_format = response_format
        self.vector_store = None
        if search_content:
            i=0
            self.vector_store = client.vector_stores.create(name=f'rag_store_{self.name}')
            for c in search_content:
                f = client.files.create(
                        purpose="assistants",
                        file = (f'rag_{self.name}_{i}.txt',io.BytesIO(c.encode("utf-8")),'text/markdown'))
                client.vector_stores.files.create(file_id=f.id, vector_store_id=self.vector_store.id)
                print(f" + Uploading rag_{self.name}_{i}.txt as id={f.id} to store={self.vector_store.id}")
                i+=1
            self.tools.append({
                "type" : "file_search",
                "vector_store_ids" : [self.vector_store.id],
                "max_num_results" : 5,
            })
            
    def _create_tool_annot(self, x):
        if issubclass(x, BaseModel):
            return {
                "type": "function",
                "name": x.__name__,
                "description": x.__doc__,
                "parameters": x.model_json_schema(),
            }
        else:
            return x

    def __call__(self, message, session_id='default',return_raw=False):
        s = self.user_sessions.get(session_id,{ 'previous_response_id' : None, 'history' : [] })
        s['history'].append({ 'role': 'user', 'content': message })
        txt = None
        if self.response_format:
            txt = {
                "format" : {
                    "type" : "json_schema",
                    "name" : "struct_out",
                    "schema" : self.response_format.model_json_schema()
                }
            }
        res = client.responses.create(
            model = self.model,
            store = True,
            tools = self.tools,
            instructions = self.instruction,
            previous_response_id = s['previous_response_id'],
            input = message,
            text = txt
        )
        # Обрабатываем вызов локальных инструментов
        tool_calls = [item for item in res.output if item.type == "function_call"]
        if tool_calls:
            s['history'].append({ 'role' : 'func_call', 'content' : res.output_text })
            out = []
            for call in tool_calls:
                print(f" + Обрабатываем: {call.name} ({call.arguments})")
                try:
                    fn = self.tool_map[call.name]
                    obj = fn.model_validate(json.loads(call.arguments))
                    result = obj.process(session_id)
                except Exception as e:
                    result = f"Ошибка: {e}"
                #print(f" + Результат: {result}")
                out.append({
                    "type": "function_call_output",
                    "call_id": call.call_id,
                    "output": result
                })
                res = client.responses.create(
                    model=self.model,
                    input=out,
                    tools=self.tools,
                    previous_response_id=res.id,
                    store=True
                )
        # MCP Approval Requests
        mcp_approve = [ item for item in res.output if item.type == "mcp_approval_request"]
        if mcp_approve:
            res = client.responses.create(
                model=self.model,
                previous_response_id=res.id,
                tools = self.tools,
                input=[{
                    "type": "mcp_approval_response",
                    "approve": True,
                    "approval_request_id": m.id
                }
                for m in mcp_approve
                ])
        s['previous_response_id'] = res.id
        s['history'].append({ 'role' : 'assistant', 'content' : res.output_text })
        self.user_sessions[session_id] = s
        if return_raw:
            return res
        if self.response_format:
            return self.response_format.model_validate_json(res.output_text)
        else:
            return res.output_text

    def history(self, session_id='default'):
        return self.user_sessions[session_id]['history']

### Агент-консультант СЛФ

В примере будет использоваться каталог СЛФ для создания агента, который способен ориентироваться в каталоге. Вместо файла каталога можно использоваться другие документы: нормативные акты, учебники, датасеты

In [None]:
# сохраняем файл, который будем использовать для RAG, в формате markdown
catalog = pd.read_parquet('./nomen.parquet')
catalog.to_markdown('nomen.md')

In [None]:
# здесь указывается путь к файлу с каталогом в формате markdown
with open("./nomen.md", encoding="utf-8") as f:
    food_wine = f.readlines()
header = food_wine[:2]
chunk_size = 1000 * 3  # approx 1000 tokens * 3 char/token
docs = []
s = header.copy()
for x in food_wine[2:]:
    s.append(x)
    if len("".join(s)) > chunk_size:
        docs.append("".join(s))
        s = header.copy()

In [None]:
prompt = f"""
Ты - консультант на складе поставщика продуктов для сегмента HoReCa, который должен советовать клиентам продукты и отвечать 
на вопросы по ассортименту продуктов, используя имеющуюся у тебя информацию. Если в каталоге нет указанного продукта
 - напиши, что продукт отсутствует в каталоге. Не придумывай ничего!
"""

consultant = Agent(
    'consultant',
    search_content=docs,
    instruction=prompt) 

In [None]:
printx(consultant("Что ты умеешь?"))

In [None]:
printx(consultant("Посоветуй продукты из каталога, которые можно использовать для приготовления блюда 'Борщ'"))