In [44]:
import os
from typing import (
    List,
    Any,
    AsyncGenerator,
    Coroutine,
    Tuple,
    Optional,
    TypedDict,
    Annotated,
    Literal,
    Union,
)
from IPython.display import display, Image

from dotenv import load_dotenv
from langchain_core.messages import AIMessage, SystemMessage, BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langchain_core.tools import StructuredTool, render_text_description
from langchain_community.tools import TavilySearchResults
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import Row, RowMapping
from langgraph.graph import StateGraph, START, END

from agents.consts import (
    GUIDANCE_PROMPT,
    DISCREPANCY_TEMPLATE,
    SUPERVISOR_TEMPLATE,
    FEEDBACK_TEMPLATE,
    GUIDANCE_TEMPLATE,
)
from agents.llm_callback import CustomLlmTrackerCallback
from db.db import get_session
from db.models import Grade, UserSkills, User, Skill
from service.service import BaseService
from utils.common import convert_agent_msg_to_llm_message

In [45]:
# Load variables from a .env file into the process environment before
# anything below reads its configuration.
load_dotenv()

# Web-search tool; guidance_agent hands it to its ReAct agent.
search = TavilySearchResults()

# Connection settings for the chat model, read from the OPENAI_* variables.
# Each value is None when the corresponding variable is not set.
LITE_LLM_API_KEY = os.environ.get("OPENAI_API_KEY")
LITE_LLM_URL = os.environ.get("OPENAI_BASE_URL")
LITE_MODEL = os.environ.get("OPENAI_MODEL")

# Callback attached to the model built in provide_guidance.
custom_callback = CustomLlmTrackerCallback("guidance")

In [46]:
from agents.dto import AgentMessage, ChatMessage
import operator
from langgraph.graph import add_messages


async def find_current_grade_for_user_and_skill(
    user_id: int, skill_id: int
) -> UserSkills:
    """
    Utilize to find current expertise and grading level with user id and skill_id
    :param user_id: users id
    :param skill_id: skill id
    :return:
    """
    # NOTE(review): the docstring above is intentionally left verbatim. This
    # coroutine is wrapped with StructuredTool.from_function in
    # discrepancy_agent, which presumably uses the docstring as the tool
    # description shown to the model - confirm before rewording it.
    async for session in get_session():
        service: BaseService[UserSkills, int, Any, Any] = BaseService(
            UserSkills, session
        )
        matches = await service.list_all(
            filters={
                "user_id": user_id,
                "skill_id": skill_id,
            }
        )
        if len(matches) == 0:
            raise Exception(f"No user_skills found for user_id {user_id}")
        # Only the first matching row is used.
        record = matches[0]
        # Await the related objects, in the same order as before, so they are
        # loaded on the returned row.
        for relation in ("user", "skill", "grade"):
            await getattr(record.awaitable_attrs, relation)
        return record


# Identifiers of the grade under review. discrepancy_agent reads these three
# fields to fill in its prompt.
class DiscrepancyValues(BaseModel):
    grade_id: int  # passed to the discrepancy prompt as "current_grade"
    skill_id: int  # passed to the discrepancy prompt as "skill_id"
    user_id: int  # passed to the discrepancy prompt as "user_id"


# Container stored under the "guidance" key of SupervisorState. The nodes in
# this notebook only pass it through unchanged.
class GuidanceValue(BaseModel):
    # NOTE(review): add_messages is attached as annotation metadata here, but
    # this model is a value inside the state, not the graph's state schema -
    # presumably the reducer is never applied to this field; confirm.
    messages: Annotated[list, add_messages]


class SupervisorState(TypedDict):
    """
    State schema handed to ``StateGraph`` and shared by every node.

    ``next_steps``, ``messages`` and ``chat_messages`` are annotated with
    ``operator.add``, so the list a node returns for one of these keys is
    concatenated onto the list already in the state. ``discrepancy`` and
    ``guidance`` carry no reducer annotation.
    """

    # Ids of the user/skill/grade being checked by discrepancy_agent.
    discrepancy: DiscrepancyValues
    # Passed through unchanged by the nodes in this notebook.
    guidance: GuidanceValue
    # Routing decisions from supervisor_agent; next_step reads the last one.
    next_steps: Annotated[List[str], operator.add]
    # Agent outputs; supervisor_agent joins their contents into its scratchpad.
    messages: Annotated[List[AgentMessage], operator.add]
    # The user-facing conversation; entries are read via "role" and "message".
    chat_messages: Annotated[List[ChatMessage], operator.add]


async def discrepancy_agent(state: SupervisorState) -> SupervisorState:
    """
    Graph node that asks a ReAct agent to explain the difference between the
    grade stored for a user/skill pair and the grade supplied in the state.

    The agent is given two tools: one that looks up the stored ``UserSkills``
    row and one that lists the available grades.

    :param state: current graph state; ``state["discrepancy"]`` supplies the
        ``user_id``, ``skill_id`` and ``grade_id`` placed into the prompt.
    :return: a state update whose ``messages`` entry holds the agent's last
        message wrapped in an ``AgentMessage`` with role ``"discrepancy"``
        (an empty list when the agent returned no messages).
    """
    discrepancy_callback = CustomLlmTrackerCallback("discrepancy_agent")
    # NOTE(review): StructuredTool.from_function names its sync parameter
    # `func`; the `function=` keyword below is presumably ignored and only
    # `coroutine=` takes effect - confirm against the installed version.
    tools = [
        StructuredTool.from_function(
            function=find_current_grade_for_user_and_skill,
            coroutine=find_current_grade_for_user_and_skill,
        ),
        StructuredTool.from_function(
            function=get_grades_or_expertise,
            coroutine=get_grades_or_expertise,
        ),
    ]
    model = ChatOpenAI(
        temperature=0,
        max_tokens=300,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
        callbacks=[discrepancy_callback],
    )

    prompt_template = ChatPromptTemplate.from_template(DISCREPANCY_TEMPLATE)
    prompt = await prompt_template.ainvoke(
        input={
            "user_id": state["discrepancy"].user_id,
            "skill_id": state["discrepancy"].skill_id,
            "current_grade": state["discrepancy"].grade_id,
        }
    )
    print(f"\n\nDISCREPANCY AGAIN PROMPT\n {prompt}")
    agent = create_react_agent(model=model, tools=tools)
    response = await agent.ainvoke(prompt)
    print(f"\n\nDISCREPANCY AGAIN RESPONSE\n {response}")

    # Only the agent's last message is reported back to the supervisor.
    msg = []
    if "messages" in response and len(response["messages"]) > 0:
        msg = [
            AgentMessage(
                message=response["messages"][-1],
                role="discrepancy",
            )
        ]

    # next_steps and chat_messages are operator.add channels: whatever is
    # returned here is appended to the existing list. Returning the current
    # lists (as before) appended each list to itself on every pass, so this
    # node contributes empty deltas for the channels it does not change.
    return {
        "discrepancy": state["discrepancy"],
        "guidance": state["guidance"],
        "next_steps": [],
        "messages": msg,
        "chat_messages": [],
    }

In [47]:
import re
from langgraph.types import interrupt


async def supervisor_agent(state: SupervisorState) -> SupervisorState:
    """
    Graph node that decides which agent runs next.

    The chat history is rendered as ``Question:`` (role ``"ai"``) and
    ``Answer:`` (role ``"human"``) lines, and the contents of all agent
    messages collected so far are joined into a scratchpad. Both are placed
    into ``SUPERVISOR_TEMPLATE`` and sent to the model. The reply is searched
    for a ``Call: <agent>`` line; when none is found the run is routed to
    ``"finish"``.

    :param state: current graph state; reads ``chat_messages`` and
        ``messages``.
    :return: a state update that appends the routing decision to
        ``next_steps`` and the model reply (role ``"supervisor"``) to
        ``messages``.
    """
    prompt_template = ChatPromptTemplate.from_template(SUPERVISOR_TEMPLATE)

    # Chat entries with any other role are left out of the discussion.
    prefixes = {"human": "Answer", "ai": "Question"}
    msgs = [
        f"{prefixes[msg['role']]}: {msg['message']}"
        for msg in state["chat_messages"]
        if msg["role"] in prefixes
    ]
    prompt_msgs = "\n".join(msgs)
    scratchpad_msgs_str = "\n".join(m.message.content for m in state["messages"])
    prompt = await prompt_template.ainvoke(
        {"discussion": prompt_msgs, "agent_scratchpad": scratchpad_msgs_str}
    )
    print(f"\n\nSUPERVISOR AGENT PROMPT\n {prompt}")

    model = ChatOpenAI(
        temperature=0,
        max_tokens=300,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
        # Cut generation off before the model invents its own observation.
        stop=["\nObserve:"],
    )
    response = await model.ainvoke(prompt)
    print(f"\n\nSUPERVISOR AGENT RESPONSE\n {response}")
    content = response.content

    # The pattern requires a newline before "Call:", so a reply that starts
    # with "Call:" on its very first line is treated as a finish.
    match = re.search(r"\nCall: (discrepancy|guidance|feedback|grading)", content)
    if match:
        print("\n\n\nIS MATCHING THIS\n\n\n")
        next_steps = [match.group(1)]
    else:
        next_steps = ["finish"]
        print("\n\n\nFINISHHHHHHH\n\n\n")

    msg = AgentMessage(
        message=response,
        role="supervisor",
    )

    # chat_messages is an operator.add channel. Returning the current list
    # (as before) appended the whole conversation to itself on every
    # supervisor pass, so an empty delta is returned instead.
    return {
        "discrepancy": state["discrepancy"],
        "guidance": state["guidance"],
        "next_steps": next_steps,
        "messages": [msg],
        "chat_messages": [],
    }


async def grading_agent(state: SupervisorState) -> SupervisorState:
    """
    Graph node that asks the model to confirm the user's expertise level from
    the chat history.

    The chat history is rendered as ``Question:`` (role ``"ai"``) and
    ``Answer:`` (role ``"human"``) lines and inserted into the prompt below.

    :param state: current graph state; reads ``chat_messages``.
    :return: a state update that appends the model reply to ``messages`` as an
        ``AgentMessage`` with role ``"grade"``.
    """
    system_msg = """
        Based on the provided discussion your job is to confirm the level of expertise of the user!
        If you are not sure that the grade or expertise is clearly recognizable please let the the user know
        If you're certain state explicitly which expertise is correct for the user!

        Discussion:
        {discussion}
        """
    prompt_template = ChatPromptTemplate.from_template(system_msg)

    # Chat entries with any other role are left out of the discussion.
    prefixes = {"human": "Answer", "ai": "Question"}
    msgs = [
        f"{prefixes[msg['role']]}: {msg['message']}"
        for msg in state["chat_messages"]
        if msg["role"] in prefixes
    ]

    prompt = await prompt_template.ainvoke(input={"discussion": "\n".join(msgs)})
    model = ChatOpenAI(
        temperature=0,
        max_tokens=300,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
    )
    response = await model.ainvoke(prompt)
    print(f"\n\nGRADING AGENT RESPONSE\n {response}")
    msg = AgentMessage(message=response, role="grade")

    # next_steps and chat_messages are operator.add channels. Returning the
    # current lists (as before) appended each list to itself, so this node
    # contributes empty deltas for the channels it does not change.
    return {
        "discrepancy": state["discrepancy"],
        "guidance": state["guidance"],
        "next_steps": [],
        "messages": [msg],
        "chat_messages": [],
    }


async def evasion_detector_agent(state: SupervisorState) -> SupervisorState:
    """
    Placeholder node for detecting whether the user is dodging the question.

    Only the prompt template is built so far; no model is called and the
    state is left as it is.

    :param state: current graph state.
    :return: a state update that changes nothing.
    """
    # The placeholder is named so the template can be filled by keyword; the
    # previous anonymous "{}" could not be supplied through a dict of
    # template variables.
    prompt_template = ChatPromptTemplate.from_template(
        """
        From provided discussion, check whether the user is evading to answer a provided question?

        Discussion:
        {discussion}

        Respond in the following format:
        Observe: Your answer
        """
    )
    # next_steps, messages and chat_messages are operator.add channels, so
    # returning the incoming state would append each list to itself. Empty
    # deltas leave them untouched.
    accumulating = ("next_steps", "messages", "chat_messages")
    return {
        key: ([] if key in accumulating else value) for key, value in state.items()
    }


async def feedback_agent(state: SupervisorState) -> SupervisorState:
    """
    Graph node that produces a follow-up for the user and then pauses the
    graph until the user replies.

    The agent messages collected so far are converted to LLM messages, sent
    to the model behind ``FEEDBACK_TEMPLATE``, and the graph is interrupted
    with a fixed "please provide additional feedback" payload.

    NOTE(review): LangGraph documents that a node is re-run from its first
    line when an interrupt is resumed, so the model call above the
    ``interrupt`` is presumably executed a second time on resume - confirm.

    :param state: current graph state; reads ``messages`` and
        ``chat_messages``.
    :return: a state update that appends the model reply to ``messages``
        (role ``"feedback"``) and appends the model reply plus the user's
        resumed answer to ``chat_messages``.
    """
    model = ChatOpenAI(
        temperature=0,
        max_tokens=300,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
    )
    msgs = convert_agent_msg_to_llm_message(state["messages"])
    print(f"\n\nFEEDBACK AGENT PROMPT\n {msgs}")
    print(f"\n\nFEEDBACK AGENT PROMPT\n {state['chat_messages'][-1]}")
    prompt_template = ChatPromptTemplate.from_messages(
        [SystemMessage(FEEDBACK_TEMPLATE)] + msgs
    )
    prompt = await prompt_template.ainvoke(input={})
    feedback_response = await model.ainvoke(prompt)

    interrupt_val = {
        "answer_to_revisit": "Please provide additional feedback",
    }
    print(f"\n\nFEEDBACK AGENT\n {interrupt_val}")
    print(f"\n\nFEEDBACK RESPONSE\n {feedback_response}")
    # Execution stops here until the run is resumed; `value` is whatever the
    # caller resumed with.
    value = interrupt(
        interrupt_val,
    )

    # supervisor_agent reads messages via `.message.content` and
    # chat_messages via ["role"] / ["message"], so both channels must receive
    # entries of those shapes. Previously a raw dict and an AIMessage were
    # written, which broke the supervisor on its next pass.
    # NOTE(review): role "feedback" mirrors the node name - confirm that
    # AgentMessage and convert_agent_msg_to_llm_message accept it.
    chat_updates = [{"message": feedback_response.content, "role": "ai"}]
    if value is not None:
        # Record the user's reply so the supervisor can see it; it was
        # previously discarded.
        human_text = value if isinstance(value, str) else str(value)
        chat_updates.append({"message": human_text, "role": "human"})

    # next_steps is an operator.add channel; an empty delta avoids appending
    # the list to itself.
    return {
        "discrepancy": state["discrepancy"],
        "guidance": state["guidance"],
        "next_steps": [],
        "messages": [AgentMessage(message=feedback_response, role="feedback")],
        "chat_messages": chat_updates,
    }


async def guidance_agent(state: SupervisorState) -> SupervisorState:
    """
    Graph node that runs a ReAct agent with the web-search tool to guide the
    user.

    ``GUIDANCE_TEMPLATE`` is filled with a text description of the tools, the
    first chat message as ``context`` and the most recent chat message as
    ``answer``.

    :param state: current graph state; ``chat_messages`` must contain at
        least one entry.
    :return: a state update that appends the agent's last message to
        ``messages`` as an ``AgentMessage`` with role ``"guidance"``.
    """
    print("\n\n\nENTERING GUIDANCE\n\n\n")
    tools = [search]
    template = ChatPromptTemplate.from_template(GUIDANCE_TEMPLATE)
    prompt = await template.ainvoke(
        input={
            "tools": render_text_description(tools),
            "context": state["chat_messages"][0]["message"],
            "answer": state["chat_messages"][-1]["message"],
        }
    )
    model = ChatOpenAI(
        temperature=0,
        max_tokens=300,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
    )
    print(f"\n\nGUIDANCE AGENT PROMPT\n {prompt}")
    agent = create_react_agent(model=model, tools=tools)
    agent_response = await agent.ainvoke(prompt)
    print(f"\n\nGUIDANCE AGENT RESPONSE\n {agent_response}")
    msg = AgentMessage(
        message=agent_response["messages"][-1],
        role="guidance",
    )

    # next_steps and chat_messages are operator.add channels. Returning the
    # current lists (as before) appended each list to itself, so this node
    # contributes empty deltas for the channels it does not change.
    return {
        "discrepancy": state["discrepancy"],
        "guidance": state["guidance"],
        "next_steps": [],
        "messages": [msg],
        "chat_messages": [],
    }


async def finish(state: SupervisorState) -> SupervisorState:
    """
    Terminal graph node; it makes no changes to the state.

    ``next_steps``, ``messages`` and ``chat_messages`` are ``operator.add``
    channels, so returning the incoming state unchanged would append each of
    those lists to itself. Empty lists are returned for them instead, and
    every other key is passed through as is.

    :param state: current graph state.
    :return: a state update that leaves the graph state as it was.
    """
    accumulating = ("next_steps", "messages", "chat_messages")
    return {
        key: ([] if key in accumulating else value) for key, value in state.items()
    }


async def next_step(
    state: SupervisorState,
) -> Literal["guidance", "feedback", "discrepancy", "finish", "grading"]:
    """
    Routing function for the supervisor's conditional edge.

    Looks at the most recent entry of ``state["next_steps"]`` and returns the
    matching node name. An empty list, or an entry that is not one of the
    four agent names, routes to ``"finish"``.

    :param state: current graph state; only ``next_steps`` is read.
    :return: name of the node to run next.
    """
    pending = state["next_steps"]
    if len(pending) == 0:
        return "finish"
    latest = pending[-1]
    for route in ("guidance", "discrepancy", "feedback", "grading"):
        if latest == route:
            return route
    return "finish"

In [48]:
# Structured result yielded by provide_guidance. The agent's text is parsed
# into this model with model_validate_json; when parsing fails,
# provide_guidance builds a fallback instance that carries the raw text in
# `message`.
class GuidanceHelperStdOutput(BaseModel):
    has_user_answered: bool = Field(
        description="Whether the user has correctly answered the topic at hand"
    )
    expertise_level: str = Field(
        description="The expertise user has self evaluated himself with"
    )
    expertise_id: int = Field(description="The expertise or grade ID")
    # The only field with a default; all others are required.
    is_more_categories_answered: bool = Field(
        description="if multiple categories have been selected", default=False
    )
    should_admin_be_involved: bool = Field(
        description="Whether the admin should be involved if user is evading the topic or fooling around"
    )
    message: str = Field(description="Message to send to the user")


async def get_grades_or_expertise() -> List[str]:
    """
    Useful tool to retrieve current grades or expertise level grading system
    :return: List of json representing those grades and all their fields
    """
    # NOTE(review): the docstring above is intentionally left verbatim. This
    # coroutine is wrapped with StructuredTool.from_function, which
    # presumably uses the docstring as the tool description - confirm before
    # rewording it.
    #
    # The return annotation now says List[str]: each grade is serialised
    # with model_dump_json(), so callers receive JSON strings rather than
    # Grade objects.
    async for session in get_session():
        service: BaseService[Grade, int, Any, Any] = BaseService(Grade, session)
        all_db_grades = await service.list_all()
        return [grade.model_dump_json() for grade in all_db_grades]


async def get_current_grade_for_user(
    skill_id: int, user_id: int
) -> None | Row[Any] | RowMapping | Any:
    """
    Useful tool to expertise level or grade for specific skill for a user
    :param skill_id: id of the skill user is looking (ex. Java Development, DevOPS etc...)
    :param user_id: id of the user
    :return: UserSkill explaining the expertise level for specific user and skill
    """
    # NOTE(review): the docstring above is intentionally left verbatim. This
    # coroutine is wrapped with StructuredTool.from_function in
    # provide_guidance, which presumably uses the docstring as the tool
    # description - confirm before rewording it.
    async for session in get_session():
        service: BaseService[UserSkills, int, Any, Any] = BaseService(
            UserSkills, session
        )
        rows = await service.list_all(
            filters={
                "skill_id": skill_id,
                "user_id": user_id,
            }
        )
        # First matching row, or None when the user has no entry for the skill.
        return rows[0] if len(rows) != 0 else None


async def is_valid_response_for_guidance(
    chunk: dict,
) -> Tuple[bool, Optional[AIMessage]]:
    """
    Decide whether a streamed agent chunk carries a usable model answer.

    A chunk qualifies when it has an ``"agent"`` entry whose ``"messages"``
    list ends with an ``AIMessage`` with non-empty content.

    :param chunk: one item from the agent's stream; it is read as a mapping
        (``chunk["agent"]["messages"]``).
    :return: ``(True, message)`` for a usable answer, otherwise
        ``(False, None)``.
    """
    if "agent" not in chunk or "messages" not in chunk["agent"]:
        return False, None
    messages = chunk["agent"]["messages"]
    # An empty list has no last element; treat it as "nothing to report"
    # rather than letting the [-1] lookup raise IndexError.
    if not messages:
        return False, None
    latest = messages[-1]
    if isinstance(latest, AIMessage) and latest.content != "":
        return True, latest
    return False, None


async def strip_unnecessary_chars(llm_str: AIMessage) -> str:
    """
    Remove Markdown code-fence markers from a model reply.

    Every "```json" marker is removed first, then every remaining "```";
    all other text, including surrounding whitespace, is kept as is.

    :param llm_str: message whose ``content`` is cleaned.
    :return: the content without fence markers.
    """
    text = llm_str.content
    for fence in ("```json", "```"):
        text = text.replace(fence, "")
    return text


async def provide_guidance(
    msgs: List[str],
    user: User,
    skill: Skill,
) -> AsyncGenerator[GuidanceHelperStdOutput, Any]:
    """
    Stream guidance for a user by running a ReAct agent over the conversation.

    The agent is given tools for listing grades and for looking up the user's
    current grade. Each streamed chunk that carries a non-empty model answer
    is stripped of code-fence markers and parsed as
    ``GuidanceHelperStdOutput``. When the text does not validate, a fallback
    object is yielded with the raw text in ``message``.

    :param msgs: conversation so far; ``msgs[0]`` is also passed as
        ``context``, so the list must not be empty.
    :param user: user the guidance is for.
    :param skill: skill being discussed.
    :return: async generator of ``GuidanceHelperStdOutput`` items.
    """
    model = ChatOpenAI(
        temperature=0,
        model=LITE_MODEL,
        api_key=LITE_LLM_API_KEY,
        base_url=LITE_LLM_URL,
        streaming=True,
        verbose=True,
        callbacks=[custom_callback],
    )
    tools = [
        StructuredTool.from_function(
            function=get_grades_or_expertise,
            coroutine=get_grades_or_expertise,
        ),
        StructuredTool.from_function(
            function=get_current_grade_for_user,
            coroutine=get_current_grade_for_user,
        ),
    ]
    intermediate_steps = []

    system_msg = GUIDANCE_PROMPT
    agent = create_react_agent(model=model, tools=tools)
    # NOTE(review): besides "messages", extra keys are passed in the input
    # below; presumably the prebuilt agent ignores keys that are not part of
    # its state - confirm.
    async for chunk in agent.astream(
        {
            "messages": [SystemMessage(system_msg)] + msgs,
            "tools": render_text_description(tools),
            "context": msgs[0],
            "intermediate_steps": intermediate_steps,
            "user": user,
            "skill": skill,
        }
    ):
        print("PROVIDE FEEDBACK", chunk)
        # Both helpers are coroutines and must be awaited; without `await`
        # the tuple unpack below fails on a coroutine object.
        (is_valid, msg_content) = await is_valid_response_for_guidance(chunk)
        if is_valid:
            content = await strip_unnecessary_chars(msg_content)
            try:
                ch = GuidanceHelperStdOutput.model_validate_json(content)
                yield ch
            except ValidationError:
                # The model did not return the expected JSON; pass its text
                # through as a plain message.
                yield GuidanceHelperStdOutput(
                    has_user_answered=False,
                    expertise_level="",
                    expertise_id=0,
                    should_admin_be_involved=False,
                    message=content,
                )

In [60]:
state_graph = StateGraph(SupervisorState)

state_graph.add_node("supervisor", supervisor_agent)
state_graph.add_node("discrepancy", discrepancy_agent)
state_graph.add_node("guidance", guidance_agent)
state_graph.add_node("feedback", feedback_agent)
state_graph.add_node("grading", grading_agent)
state_graph.add_node("finish", finish)
state_graph.add_edge(START, "supervisor")
state_graph.add_conditional_edges("supervisor", next_step)
state_graph.add_edge("discrepancy", "supervisor")
state_graph.add_edge("guidance", "supervisor")
state_graph.add_edge("grading", "supervisor")
state_graph.add_edge("feedback", "supervisor")
state_graph.add_edge("finish", END)

graph = state_graph.compile()


state_vals = SupervisorState(
    discrepancy=DiscrepancyValues(skill_id=1, user_id=1, grade_id=7),
    guidance=GuidanceValue(messages=[]),
    next_steps=[],
    messages=[],
    chat_messages=[
        {
            "message": """
                Expertise Levels in Backup and Recovery
                Welcome! In this discussion, we will explore the various expertise levels related to the skill of Backup and Recovery. Understanding these levels will help you select the appropriate expertise that aligns with your current knowledge and experience.

                Here are the available expertise grades:

                Not Informed: You have no prior knowledge of the topic.
                Informed Basics: You understand the basic concepts of Backup and Recovery.
                Informed in Details: You have a deeper understanding of the strategies involved.
                Practice and Lab Examples: You can apply your knowledge through practical examples and lab work.
                Production Maintenance: You are capable of maintaining backup systems in a production environment.
                Production from Scratch: You can set up backup systems from the ground up.
                Educator/Expert: You possess extensive knowledge and can teach others about Backup and Recovery.
                Consider your current level of understanding and experience to choose the expertise that best fits you. This will enhance your learning and engagement in our discussion!
                """,
            "role": "ai",
        },
        {
            "message": """
                Hmmm what are you referring when talking about backup and recovery
                """,
            "role": "human",
        },
    ],
)

async for chunk in graph.astream(state_vals):
    print(chunk)



SUPERVISOR AGENT PROMPT
 messages=[HumanMessage(content="\n        You are supervising multiple agents doing their job. You distribute the tasks to them to solve the problem stated in the discussion (Question and Answer).\n        When the user has clearly identified itself with specific grade or expertise level provide a Finish Answer!\n        The identification or clarification of expertise level must be only for one grade, the user must not be between two or more grades or expertise level.\n        It has to be precisely one expertise level!\n        While this identification is not yet clear, always ask user for additional feedback!\n        Utilize grading and experience levels provided in the first question, do not utilize any other under no circumstances!\n        Stay on the topic of the discussion, warn the user if the topic is diverging!\n        Always validate and check discrepancies about users experience/grade level when you find out which grade is it!\n        You hav

In [None]:
# Call the guidance node directly, outside the graph, with the sample state
# and print the update it returns.
response = await guidance_agent(state_vals)
print(response)

In [None]:
# Show the compiled graph's structure inline as a Mermaid-rendered PNG.
display(Image(graph.get_graph().draw_mermaid_png()))