# Yumi Chatbot R&D


In [None]:
import os
from typing import Any, Dict, Iterable, Optional

import tiktoken
from dotenv import load_dotenv
from langchain.schema.output_parser import StrOutputParser
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import (
    BaseChatMessageHistory,
    InMemoryChatMessageHistory,
)
from langchain_core.globals import set_debug
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import (
    RunnableLambda,
    RunnablePassthrough,
    RunnableSerializable,
)
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_mistralai import ChatMistralAI
from langchain_openai import ChatOpenAI

from prompts.gen_prompts import GENERAL_PROMPT, RAG_PROMPT
from rag_pinecone import basic_retriever

set_debug(True)
load_dotenv()

## LLM Settings


In [None]:
# API keys are read from the process environment.
google_api_key = os.getenv("GOOGLE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
# NOTE(review): this value is read but never passed to ChatMistralAI below;
# the client presumably resolves its key from its own environment variable.
# Confirm the variable name here matches what the library expects.
mistralai_api_key = os.getenv("MISTRALAI_API_KEY")

# Pass the key through unchanged. Formatting it with an f-string would turn a
# missing key (None) into the literal string "None", hiding the
# misconfiguration behind a non-empty but invalid key.
gemini_llm = ChatGoogleGenerativeAI(
    google_api_key=google_api_key,  # type: ignore
    model="gemini-pro",
)
mistral_llm = ChatMistralAI(model="mistral-large-latest")
openai_llm = ChatOpenAI(openai_api_key=openai_api_key, model="gpt-4o")

## Memory


In [None]:
# In-memory registry of chat histories, keyed by session id. It is filled
# lazily by get_session_history and lives only as long as this process.
memory_store: Dict[str, BaseChatMessageHistory] = {}

In [None]:
def summarize_memory(
    stored_session: InMemoryChatMessageHistory,
) -> InMemoryChatMessageHistory:
    """Collapse a session's chat history into a single summary message.

    The stored messages are sent to the Gemini model followed by an
    instruction to summarise them. The history is then cleared and the
    model's reply is stored as its only message.

    Args:
        stored_session: History to summarise. It is modified in place.

    Returns:
        The same ``stored_session`` object, now holding only the summary.
    """
    instruction = (
        "user",
        "The mesages above are from an AI/Human chat session. You need to distill the above chat messages into a single summary message. Include as many specific details as you can. But be sure that it is done in a way that is concise and easy to understand as it will be used to summarize the chat history and used as reference later by the AI",
    )
    prompt = ChatPromptTemplate.from_messages(
        [MessagesPlaceholder(variable_name="history"), instruction]
    )
    summarizer = prompt | gemini_llm
    summarizer = summarizer.with_config(config={"run_name": "sumarize_memory"})

    # Summarise before clearing, so the history is only wiped once the
    # model call has returned.
    summary = summarizer.invoke({"history": stored_session.messages})
    stored_session.clear()
    stored_session.add_message(summary)
    return stored_session

In [None]:
def check_memory_token_size(
    messages: Iterable[Any], max_tokens: int = 100
) -> bool:
    """Report whether the messages' combined token count exceeds a limit.

    Tokens are counted with tiktoken's ``cl100k_base`` encoding.

    Args:
        messages: Chat messages to measure; each item must expose a
            ``content`` attribute. Pass the message list itself
            (e.g. ``history.messages``), not the history object.
        max_tokens: Token budget. Defaults to 100, the threshold that was
            previously hard-coded.

    Returns:
        True when the total token count is strictly greater than
        ``max_tokens``, otherwise False.
    """
    encoding = tiktoken.get_encoding("cl100k_base")
    total_tokens = 0
    for message in messages:
        content = message.content
        # encode() only accepts str. Message content is presumably a plain
        # string here, but stringify anything else rather than crash.
        if not isinstance(content, str):
            content = str(content)
        # Chat text is user-supplied: treat special-token look-alikes such
        # as "<|endoftext|>" as ordinary text instead of raising.
        total_tokens += len(encoding.encode(content, disallowed_special=()))
    return total_tokens > max_tokens


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for a session, creating it on first use.

    An existing history is replaced by a single summary message when it
    holds more than six messages and check_memory_token_size reports that
    it is over the token limit.

    Args:
        session_id: Key identifying the conversation in ``memory_store``.

    Returns:
        The stored (possibly just-summarised) history for ``session_id``.
    """
    if session_id in memory_store:
        history: InMemoryChatMessageHistory = memory_store[session_id]
        # The cheap length check runs first; tokens are only counted for
        # histories that are long enough to be worth summarising.
        needs_summary = len(history.messages) > 6 and check_memory_token_size(
            history.messages
        )
        return summarize_memory(history) if needs_summary else history

    # First time this session is seen: register an empty history.
    fresh_history = ChatMessageHistory()
    memory_store[session_id] = fresh_history
    return fresh_history

## Basic Runnables


In [None]:
def baisc_conversation(
    query: Dict[str, Any], config: Optional[Dict[str, Dict[str, Any]]] = None
) -> str:
    """Run one turn of the general-purpose chat chain with session memory.

    Args:
        query: Prompt inputs; the user's message is read from the
            ``"query"`` key.
        config: Runnable config forwarded to ``invoke``. Callers in this
            file pass ``{"configurable": {"session_id": ...}}``.
            NOTE(review): the history wrapper presumably needs a session
            id, so the ``None`` default is unlikely to be usable. Confirm.

    Returns:
        The chain's output, a string because the chain ends in
        ``StrOutputParser``.
    """
    conversation_chain = GENERAL_PROMPT | openai_llm | StrOutputParser()
    # get_session_history supplies (and may summarise) the per-session
    # history that is injected under the "history" prompt variable.
    chain_with_history = RunnableWithMessageHistory(
        conversation_chain,
        get_session_history,
        input_messages_key="query",
        history_messages_key="history",
    )
    return chain_with_history.invoke(query, config)


# Example call against session "def234". The return value is deliberately
# left as the cell's trailing expression rather than assigned to a name.
baisc_conversation(
    {
        "query": "Can you kindly list all the cities we spoke about?",
    },
    config={"configurable": {"session_id": "def234"}},
)

In [None]:
# Inspect session "def234": message count, whether it is over the token
# limit, and the raw messages. Raises KeyError if that session has not been
# created yet.
print(len(memory_store["def234"].messages))
print(check_memory_token_size(memory_store["def234"].messages))
print(memory_store["def234"].messages)

## RAG Testing


In [None]:
# docs = await basic_retriever(query="Who is Alis?")
# print(len(docs))
# print(docs[0].page_content)

In [None]:
retriever_runnable = RunnableLambda(basic_retriever)
basic_convo = RAG_PROMPT | openai_llm | StrOutputParser()

with_message_history = RunnableWithMessageHistory(
    basic_convo,
    get_session_history,
    input_messages_key="query",
    history_messages_key="history",
)
query = "Who is Alis?"
chain = (
    {"context": retriever_runnable, "query": RunnablePassthrough()}
    | with_message_history
    | StrOutputParser()
)

await chain.ainvoke("Who is Alis?", config={"configurable": {"session_id": "def234"}})

In [None]:
async def basic_rag_conversation(
    query: str, config: Dict[str, Dict[str, Any]]
) -> str:
    """Answer a question using retrieved context and session chat history.

    Args:
        query: The user's question. It is handed to ``basic_retriever`` to
            produce the prompt's ``"context"`` and is also forwarded
            unchanged as the prompt's ``"query"`` input.
        config: Runnable config forwarded to ``ainvoke``. Callers in this
            file pass ``{"configurable": {"session_id": ...}}``.

    Returns:
        The chain's output, a string because the chain ends in
        ``StrOutputParser``.
    """
    rag_chain = RAG_PROMPT | openai_llm | StrOutputParser()
    rag_with_history = RunnableWithMessageHistory(
        rag_chain,
        get_session_history,
        input_messages_key="query",
        history_messages_key="history",
    )
    chain = (
        {
            "context": RunnableLambda(basic_retriever),
            "query": RunnablePassthrough(),
        }
        | rag_with_history
        # NOTE(review): rag_chain already ends in StrOutputParser, so this
        # trailing parser looks redundant. Confirm before removing it.
        | StrOutputParser()
    )
    return await chain.ainvoke(query, config)


# Calling the async function only creates a coroutine; nothing runs until it
# is awaited in the print below.
response = basic_rag_conversation(
    "Who is Alis?", {"configurable": {"session_id": "def234"}}
)

print(await response)

## Agent Testing


In [None]:
from langchain.tools.retriever import create_retriever_tool
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.prebuilt import chat_agent_executor

from llm import baisc_conversation, basic_rag_conversation, get_weather

In [None]:
# NOTE(review): create_retriever_tool presumably expects a retriever object
# as its first argument, but basic_rag_conversation is an async function
# taking (query, config). Confirm this works at tool-call time; a
# function-based tool wrapper is likely what is needed here.
retriever_tool = create_retriever_tool(
    basic_rag_conversation,
    "Basic_Rag_Retriever",
    "A tool used for basic retrival of PDF documents from a Vectorstore in Pincone.",
)
# NOTE(review): same concern as above. get_weather comes from the `llm`
# module and, judging by its name, is not a retriever. The imported `tool`
# helper is unused and may have been the intended wrapper. TODO confirm.
weather_tool = create_retriever_tool(
    get_weather,
    "Weather_Tool",
    "A tool used to get the weather for a specific location.",
)

# Tools made available to the agent.
tools = [retriever_tool, weather_tool]

# model_with_tools = openai_llm.bind_tools(tools)
# response = model_with_tools.invoke(
#     [HumanMessage(content="According to the document, who is Alis Landale?")]
# )
# response.content
# Build a tool-calling agent around the OpenAI model and ask it one question.
agent_executor = chat_agent_executor.create_tool_calling_executor(openai_llm, tools)
response = await agent_executor.ainvoke(
    {"messages": [HumanMessage(content="What is the weather in New York?")]},
    config={"configurable": {"session_id": "abc123"}},
)

# Trailing expression: shows the message list from the agent's result.
response["messages"]