### import libraries and set variables

In [1]:
import os
import base64
import requests
from openai import OpenAI
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import (
    BaseMessage,
    HumanMessage,
    ToolMessage,
)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langgraph.graph import END, StateGraph, START
from langchain_core.tools import tool
import functools
from langchain_core.messages import AIMessage
import operator
from typing import Sequence , List
from langchain_openai import ChatOpenAI
import json
from langchain_core.messages import ToolMessage

from langgraph.checkpoint import *
from langgraph.checkpoint.base import empty_checkpoint

from langgraph.checkpoint.sqlite import SqliteSaver

import re

os.environ["LANGSMITH_API_KEY"] = "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"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "tutor agents"
os.environ["OPENAI_API_KEY"] = "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"
GPT_MODEL = "gpt-4-turbo"
client = OpenAI()
llm = ChatOpenAI(model=GPT_MODEL)

### Memory Utility Function

In [2]:
def clear_memory(memory: BaseCheckpointSaver, thread_id: str) -> None:
    checkpoint = empty_checkpoint()
    memory.put(config={"configurable": {"thread_id": thread_id}}, checkpoint=checkpoint, metadata={})

memory = SqliteSaver.from_conn_string("checkpoints.sqlite")

In [8]:
''' global list '''
#! very important list
global_prompts_list =[]
agent_activation_order = []
assert len(global_prompts_list) == len(agent_activation_order), 'should have as many topics as tutor lessons!'

### Create necessary function for agent creation

In [3]:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    sender: str

In [4]:
# Helper function to create a node for a given agent
def agent_node(state, agent, name):
    result = agent.invoke(state)
    # We convert the agent output into a format that is suitable to append to the global state
    if isinstance(result, ToolMessage):
        pass
    else:
        result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)
    return {
        "messages": [result],
        # Since we have a strict workflow, we can
        # track the sender so we know who to pass to next.
        "sender": name,
    }

In [5]:
def set_agent_prompt(agentName: str, tools) -> str:
    ''' function to determine custom prompt based on the agent type (only 3 atm)
    need to add tools available to prompts
    '''
    try:
        if agentName == 'tracker':
            return (
                "system",
                "You are the tracker,"
                "Your role is to supervise the role of the tutoring agents."
                "1. call your start_signal tool BEFORE ANYTHING YOU DO"
                "2. IF start_signal responds with no_more_lessons, then"
                #"you NEED to call the start_signal tool"
                " OUTPUT ONLY kill_process"
                f"your tools are {[tool.name for tool in tools]}"
            )
        elif agentName == 'orchestrator':
            return ( #! change this because we will have a different tool to write the output of the llm in lists
                "system",
                "your are the orchestrator"
                "please state your role clearly"
                "steps to follow for your task:"
                "1. retrieve the content using your tool getChunks "
                "2. do not analyse the content"
                "once your tool has been called output 'continue'"
                # "-GROUP BY kapitel"
                # "-then GROUP BY thema"
                # "-then GROUP BY kategorie"
                # "-for each group YOU MUSY CHOOSE the agent that is most present"
                # "-KEEP THE ORDER of the contents"
                # "STRUCTURE of ouput:"
                # "1. list of Agent names (your options are: conversational, reader, listener, questionAnswering and GrammarSummary)"
                # "2. list raw content of ONLY the inhalt of the group"
                # "template to use as an example:"
                # "['conversational', 'reader', ...]"
                # "['raw content for conversational', 'raw content for reader',...]"
                # "OUTPUT ONLY THE TWO LISTS"
                f"your tools are {[tool.name for tool in tools]}"
            )
        elif agentName == 'communicator':
            return (
                "system",
                "you are the communicator"
                "you communicate in a friendly way with the user"
                "your role is to give user recommendations about learning"
                "based on the user's profile and the user progress" #there will be a function to access this
                "Please state your role clearly"
                f"your tools are {[tool.name for tool in tools]}"
            )
    except:
        raise 'agentName is incorrect'   

In [6]:
''' FUNCTION TO CREATE THEE FOLLOWING AGENTS: TRACKER, COMMUNICATOR, ORCHESTRATOR'''
''' the system_message var is used for the general personality of the agent, determined during its creation '''

def create_agent(agentName: str, llm, tools, system_message: str): #* for tracker, communicator and orchestor
    """Create main agent."""
    prompt = ChatPromptTemplate.from_messages(
        [
            (
            set_agent_prompt(agentName=agentName, tools=tools)
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
    return prompt | llm.bind_tools(tools)

In [7]:
def create_tutor_agent(agentName: str, llm, tools, system_message: str): #* for converastional, listener etc...
    """Create the dfferents tutor agent."""
    prompt = ChatPromptTemplate.from_messages(
        [
            (
            "system",
            "you are the {agent_name} agent."
            " IGNORE THE PREVIOUS CHAT HISTORY, you do not know the lesson content a the beggining of your chat"
            "follow those steps:"
            "1. FIRST ALWAYS MAKE A FUNCTION CALL getTutorPrompt to retrieve the lesson content"
            "2. START INTERACTING with the user, "
            "do not stop interacting with the user unless:"
            "-they understood all the lesson material or"
            "-they explicitely said they want to stop"
            "3. if you covered all the lesson material,"
            "call the create_progress_report tool" #! add prompt for the report generation (give template to follow)
            "after recieving the report, write REPORT DONE"
            f"your tools are {[tool.name for tool in tools]}"
            "{system_message}"),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    prompt = prompt.partial(agent_name=agentName)
    prompt = prompt.partial(system_message=system_message)

    return prompt | llm.bind_tools(tools)

### Create Communicator node and it's tools

**communicator planning:**
1. communicator reads the user profile, user progress file and curriculum
2. commnuicator generates a learning recommendation 
3. communicator outputs recommendation to the users

2 options: 

4.1. users accepts 

    1. generate report of who is user, what user already learned, what user wants to learn

    2. output instructions to orchestrator

4.2 users rejects

    1. commnuicator asks user to specify what they want or don't want to learn

    2. communicator regenerates new recommendation and outputs it to the user

    3. update the user preferences in the user profile file
    
    4. go to step 3.1

Available tools:
tool to read all files and send content back to communicator
tool to update user preferences 

### Create orchestrator node and it's tools

In [9]:
@tool
def getChunks(text) -> str:
    ''' use an llm to seperate the texts '''
    #print('seperating chunks...')
    with open('data/all_contents.txt', 'r') as file:
        text = file.read()

    prompt = '''steps to follow to seprate the text:
            -GROUP BY kapitel
            -then GROUP BY thema
            -then GROUP BY kategorie
            -for each group YOU MUSY CHOOSE the agent that is most present
            -KEEP THE ORDER of the contents
            STRUCTURE of ouput:
            1. list of Agent names (your options are: conversational, reader, listener, questionAnswering and GrammarSummary)
            2. list raw content of ONLY the inhalt of the group 
            DO NOT PUT COMMAS , IN THE RAW CONTENT
            template to use as an example:
            [conversational, reader, ...]
            ['raw content for conversational', 'raw content for reader',...]
            OUTPUT ONLY THE TWO LISTS
            '''
    try:
        response = client.chat.completions.create(
            model= GPT_MODEL,
            messages=[
                {"role": "system", "content": prompt},  # IN THEORY this prompt works (could use 1.few shots or 2.fine tuning for larger datasets)
                {"role": "user", "content": text}
            ],
            temperature=0.0,
        )
        llm_output = response.choices[0].message.content # == ' list 1\n list2 '
        matches = re.findall(r'\[([^]]+)\]', llm_output)
        agents = matches[0].split(',')
        contents = matches[1].split(',')
        # print(type(list1), list1)
        # print(type(list2), list2)
        global global_prompts_list 
        global_prompts_list = contents

        global agent_activation_order
        agent_activation_order = agents
        return 'continue'
    except Exception as e:
        print("Unable to generate ChatCompletion response")
        print(f"Exception: {e}")
        return e

In [10]:
orchestrator_agent = create_agent(
    agentName='orchestrator', 
    llm=llm, 
    tools=[getChunks], 
    system_message="You are the orchester agent, you job is to get the content chunks regrouped by similar goals and agent and provide the sequence of work for this agents " #* to be redefined later
    )

In [11]:
orchestrator_node = functools.partial(agent_node, agent=orchestrator_agent, name='orchestrator')

### Create tracker agent node and it's tools

In [12]:
''' TOOLS FOR THE TRACKER 
'''
@tool
def start_signal() -> str:
    ''' function to determine what agent need to be woken up
    ex: agentName <-> the ID present in the shared list between orchestrator and tracker
    output -> "conversational" <-> used by the router to know where to go in the graph
    '''
    if len(agent_activation_order) != 0: #if there are agents left to execute
        agent = agent_activation_order[0]
        assert type(agent) == str, f'type of agent incorrect (should be string but got {type(agent)})'
        #print(agent)
        return agent
    else: #list is empty here
        return 'no_more_lesson'

In [13]:
tracker_agent = create_agent(
    agentName='tracker', 
    llm=llm, 
    tools=[start_signal], 
    system_message="You are the tracker agent, you job is to track agent tutors and to create reports for user progress" #* to be redefined later
    )

In [14]:
tracker_node = functools.partial(agent_node, agent=tracker_agent, name='tracker')

### Tutor tools

In [15]:
''' tool function for all tutor agents to retrieve their prompts 
this function will probably need some tuning and tests to ensure there are no problems coming from the orchester
'''
@tool 
def getTutorPrompt() -> str:
    ''' return the first element of the global prompts list '''
    prompt = global_prompts_list[0] # -> 'Content' -> retrive content
    assert type(prompt) == str, 'prompt is not a string...'
    print(f'BEFORE RETRIEVAL: \ncontents = {global_prompts_list} \nagents = {agent_activation_order}')
    global_prompts_list.pop(0)
    agent_name = agent_activation_order[0]
    print(f"Retrieved content {prompt} for agent {agent_name}")
    agent_activation_order.pop(0)
    print(f'AFTER RETRIEVAL: \ncontents = {global_prompts_list} \nagents = {agent_activation_order}')
    return agent_name + ' lesson: ' + prompt #what is gonna be returned to the tutor agent

@tool
def create_progress_report(reportAndFeedback=None) -> str: 
    ''' write all the reports in file 'address' 
    THIS TOOL DOES NOT AFFECT THE PROCESS IN ANY WAY, SINCE THE LEARNING PROCESS ONLY STOPS WHEN THERE ARE NO MORE AGENTS
    convert reportandfeedback in json format
    write/add to global user_progress file this string
    return str -> Updated user progress file

    user_progress_file structure:
    #this 
    {
        "UserID": "id", 
        "Kapitel": "1",
        "Thema": "Moien",
        "agent": "conversational",
        "goals": "explain basic greetings",
        "feedback": "the user learned ......"
    }

    '''
    print('Tool not writing anything yet but called properly!') #debugging tool to see something happenend
    return'FINAL REPORT placeholder for the actual report'

### Create Conversational agent node

In [16]:
conversational_agent = create_tutor_agent(
    agentName='conversational',
    llm=llm,
    tools=[getTutorPrompt, create_progress_report],
    system_message="You are the conversational agent, your job is to teach the lesson to the student through conversation" # add 'in Luxembourgish' for final tests
)

In [17]:
conversational_node = functools.partial(agent_node, agent=conversational_agent, name='conversational')

### Create Reader agent node

In [18]:
reader_agent = create_tutor_agent(
    agentName='reader',
    llm=llm,
    tools=[getTutorPrompt, create_progress_report],
    system_message="You are the reader agent, your job is to teach the lesson to the student through reading comprehension" # add 'in Luxembourgish' for final tests
)

In [19]:
reader_node = functools.partial(agent_node, agent=reader_agent, name='reader')

### Create Listening agent node

### Create Question Answering agent node

### Create GrammaSummary agent node

### Define the edge logic

In [20]:
from langgraph.prebuilt import ToolNode

tracker_tools = [start_signal]
tutor_tools = [getTutorPrompt, create_progress_report]
orchestrator_tools = [getChunks]
tracker_tool_node = ToolNode(tracker_tools)
tutor_tool_node = ToolNode(tutor_tools)
orchestrator_tool_node = ToolNode(orchestrator_tools)

In [21]:
# Either agent can decide to end
from typing import Literal

# define the router function
def router_tracker(state) -> Literal["call_tool", "kill_process"]:
    print('-- tracker router --')
    messages = state["messages"]
    last_message = messages[-1]
    # depends only on the start_signal output
    if 'kill_process' in last_message.content:
        return "kill_process"
    else:
        return "call_tool" #to force start_signal call
    
def route_to_tutor(state) -> Literal["conversational", "no_more_lesson"]:
    print('-- tracker_call_tool router --')
    messages = state["messages"]
    print('testing start_signal output: ', messages[-1].content)
    return [messages[-1].content]

def router_tutor(state) -> Literal["call_tool", "continue", "FINAL REPORT"]:
    ''' BUG: sometimes the tutor starts the convo without calling its tool, fix asap @urgent'''
    #? create seperate router for each tutor agent or generalise it? -should be able to generalize
    print('-- tutor agent router --')
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "call_tool"
    if "REPORT DONE" in last_message.content:
        return "FINAL REPORT"
    elif "USER TURN":
        print('AI ASSISTANT: ', last_message.content)
        user_input = input()
        print('YOU: ', user_input)
        #add this to messages
        last_message.content += " user response: " + user_input
        return "continue" 
    return "call_tool" #forcing call tool (sometimes agent starts conversation anyways, need to fix that)

def route_back_to_tutor(state) -> Literal['conversational', 'reader']:
    print('-- tutor call tool router --')
    messages = state["messages"]
    last_message = messages[-1]
    if 'FINAL REPORT' and 'reader' in last_message.content:
        return 'reader'
    if 'FINAL REPORT' and 'conversational' in last_message.content:
        return 'conversational'
    if 'conversational' in last_message.content: #? might have to change for multiple tutors
        return 'conversational'
    if 'reader' in last_message.content:
        return 'reader'
    
def orchestrator_router(state) -> Literal['continue_to_tracker', 'call_tool']:
    print('-- orchestrator router --')
    messages = state['messages']
    last_message = messages[-1]
    if 'continue' in last_message.content: #if both tools have been called go to the tracker
        # clear_memory(memory=memory, thread_id="123456", thread_ts="123456")
        return 'continue_to_tracker'
    if last_message.tool_calls: #otherwise call your tool
        return 'call_tool'
    

In [22]:
## add Graph Nodes
workflow = StateGraph(AgentState)
workflow.add_node("orchestrator", orchestrator_node)
workflow.add_node("tracker", tracker_node)
workflow.add_node("conversational", conversational_node)
workflow.add_node("reader", reader_node)

workflow.add_node('orchestrator_call_tool', orchestrator_tool_node)
workflow.add_node("tracker_call_tool", tracker_tool_node)
workflow.add_node("tutor_call_tool", tutor_tool_node)

In [23]:
## add conditional edges
workflow.add_conditional_edges(
    "orchestrator",
    orchestrator_router,
    {"call_tool": "orchestrator_call_tool", "continue_to_tracker": "tracker"}
)

workflow.add_edge(
    "orchestrator_call_tool",
    "orchestrator"
)

workflow.add_conditional_edges(
    "tracker",
    router_tracker,
    {"call_tool": "tracker_call_tool", "kill_process": END},
)

workflow.add_conditional_edges(
    "conversational",
    router_tutor,
    {"call_tool": "tutor_call_tool", "FINAL REPORT": "tracker", "continue": "conversational"},) #

workflow.add_conditional_edges(
    "reader",
    router_tutor,
    {"call_tool": "tutor_call_tool", "FINAL REPORT": "tracker", "continue": "reader"}
)

workflow.add_conditional_edges(
    "tracker_call_tool",
    route_to_tutor,
    {"conversational": "conversational", "reader": "reader", "no_more_lesson": "tracker"}
)

workflow.add_conditional_edges(
    #! for mutliple agents: USE LAMBDA EXPRESSION TO ALWAYS SEND MSG BACK TO SENDER + no need to send back agent type in get prompt
    "tutor_call_tool",
    route_back_to_tutor,
    {'conversational': 'conversational', 'reader':'reader'}
)


In [24]:
workflow.add_edge(START, "orchestrator")

graph = workflow.compile() #checkpointer=memory

In [25]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

<IPython.core.display.Image object>

In [26]:
for s in graph.stream( 
        {
            "messages": [
                HumanMessage(content="Orchestrator, please start with your task(s)")
            ]
        }, {"recursion_limit": 100}
    ):
        if "__end__" not in s:
            print(s)
            print("----")

-- orchestrator router --
{'orchestrator': {'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_l2JRULOHpPZcZISADroEHhYN', 'function': {'arguments': '{"text":"The complexity of modern software systems and the methods used to ensure their reliability vary widely. One common approach is through the use of software testing, which aims to detect defects before the product is deployed. Another method is software verification, which involves checking that software meets specified requirements. Both methods are crucial for the development of dependable software systems."}', 'name': 'getChunks'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 79, 'prompt_tokens': 111, 'total_tokens': 190}, 'model_name': 'gpt-4-turbo-2024-04-09', 'system_fingerprint': 'fp_486730399b', 'finish_reason': 'tool_calls', 'logprobs': None}, name='orchestrator', id='run-150f41e0-22e6-476e-9fa6-c2c1511d78b4-0', tool_calls=[{'name': 'getChunks', 'args': {'text

KeyError: ' listening'