### Configure LangChain and LangSmith

In [1]:
import os

# Read the file and load the keys
token_file = open('../token.txt', 'r')
# Read the first line
open_ai_token_line = token_file.readline()

if open_ai_token_line:
    open_ai_key = open_ai_token_line.split('=')[1].strip()
    print(f'Open AI Key  is loaded')

# Read the second line
langsmith_token_line = token_file.readline()

if langsmith_token_line:
    langsmith_key = langsmith_token_line.split('=')[1].strip()
    print(f'Langsmith Key is loaded')


if langsmith_key is not None:
    # Configure LangSmith
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
    os.environ["LANGCHAIN_PROJECT"] = "MULTI-AGENT"
    os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
    os.environ["LANGCHAIN_API_KEY"] = langsmith_key

Open AI Key  is loaded
Langsmith Key is loaded


### Tools

#### Ecore parser

In [3]:
from langchain.agents import Tool

#PyEcore
from pyecore.resources import ResourceSet, URI

class EcoreParser:

    def __init__(self):
        self.resource_set = ResourceSet()
        self.classes_per_metamodel = {}
    
    def check_ecore_class(self, ecore_name: str, class_to_test: str):
        ecore_path = f'../temp/{ecore_name}.ecore'
        if ecore_path not in self.classes_per_metamodel:
            self.classes_per_metamodel[ecore_path] = []
        if class_to_test in self.classes_per_metamodel[ecore_path]:
            return True
        #register metamodel
        resource_path = self.resource_set.get_resource(URI(ecore_path))
        content = resource_path.contents[0]
        if content is None:
            return False
        if content.nsURI is None:
            return False
        if  content.nsURI not in self.resource_set.metamodel_registry:
            self.resource_set.metamodel_registry[content.nsURI] = content
        #check class
        for element in resource_path.contents.contents:    
            class_name = element.eClass.name
            if class_name == class_to_test:
                if self.classes_per_metamodel[ecore_path] is None:
                    self.classes_per_metamodel[ecore_path].append(class_name)
                return True        
        return False
    
from langchain.pydantic_v1 import BaseModel, Field

from langchain.agents import tool

class CheckInput(BaseModel):
    metamodel_name: str = Field(description="Name of the metamodel to be checked")
    class_to_test: str = Field(description="Class to be tested  and check if it exists in the metamodel")

class CheckEcoreClassTool():
    """
    CheckEcoreClassTool class.
    """

    @staticmethod
    @tool("check_ecore_class", args_schema=CheckInput)
    def check_ecore_class(metamodel_name: str, class_to_test: str) -> bool:
        """
        Run the tool synchronously.

        Parameters
        ----------
        metamodel_name : str
            The path to the Ecore file.
        class_to_test : str
            The class to test.

        Returns
        -------
        bool
            True if the Ecore class is found, False otherwise.
        """
        parser = EcoreParser()
        return parser.check_ecore_class(metamodel_name, class_to_test)
    
    def get_tool(self) -> Tool:
        """
        Get the tool.

        Returns
        -------
        Tool
            The tool.
        """
        return Tool(
            func=CheckEcoreClassTool.check_ecore_class,
            name="check_ecore_class",
            description="Useful for when you need to verify if a given class exists in a specific metamodel",
        )
    

#### PlantUML Reader

In [4]:
import os
from langchain.text_splitter import CharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.agents import Tool
from langchain.chains import RetrievalQA

class MetamodelReaderToolCreator:

    def create_reader_tool(self, document=None):
        """
        Create reader tool for the given document.

        Parameters
        ----------
        document : list[dict], optional
            A dictionary representing the document. It should have a 'name' key
            representing the name of the document and a 'content' key representing the content of the document,
            by default None

        Returns
        -------
        Tool
            A Tool object representing the retrieval tool created for the document.
        """  

        return Tool(
                name=f'{document["name"]}_Reader',
                description=f"useful when you want to analyze or get attributes from the PlantUML metamodel {document['name']}",
                func=(lambda _: document['content'].strip()),
        )
    
reader_tools_creator = MetamodelReaderToolCreator()

meta1_rel_path = '../temp/Book.txt'
meta1_file = open(meta1_rel_path, 'r')
meta2_rel_path = '../temp/Publication.txt'
meta2_file = open(meta2_rel_path, 'r')

document1 = {"name": f"{meta1_rel_path.split('/')[-1].split('.')[0]}", "content": meta1_file.read()}
document2 = {"name": f"{meta2_rel_path.split('/')[-1].split('.')[0]}", "content": meta2_file.read()}

##### Tools for all agents

In [5]:
checkEcoreClassTool = CheckEcoreClassTool()
tools = [checkEcoreClassTool.get_tool()]
tools.append(reader_tools_creator.create_reader_tool(document1))
tools.append(reader_tools_creator.create_reader_tool(document2))

tools

[Tool(name='check_ecore_class', description='Useful for when you need to verify if a given class exists in a specific metamodel', func=StructuredTool(name='check_ecore_class', description='check_ecore_class(metamodel_name: str, class_to_test: str) -> bool - Run the tool synchronously.\n\n        Parameters\n        ----------\n        metamodel_name : str\n            The path to the Ecore file.\n        class_to_test : str\n            The class to test.\n\n        Returns\n        -------\n        bool\n            True if the Ecore class is found, False otherwise.', args_schema=<class '__main__.CheckInput'>, func=<function CheckEcoreClassTool.check_ecore_class at 0x000001C6CAF30FE0>)),
 Tool(name='Book_Reader', description='useful when you want to analyze or get attributes from the PlantUML metamodel Book', func=<function MetamodelReaderToolCreator.create_reader_tool.<locals>.<lambda> at 0x000001C6B9373F60>),
 Tool(name='Publication_Reader', description='useful when you want to anal

### Plan and execute

##### Template

In [6]:
# Set up the base template
template = """
Given the user inputs, create a plan to solve it. Each plan should comprise an action from the following {num_tools} types:

{tool_descriptions}

{num_tools}. join(): Collects and combines results from prior actions.

 - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.

 - join should always be the last action in the plan, and will be called in two scenarios:

   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.

   (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:

 - Each action described above contains input/output types and description.

    - You must strictly adhere to the input and output types for each action.

    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.

 - Each action in the plan should strictly be one of the above types. Follow the Python conventions for each action.

 - Each action MUST have a unique ID, which is strictly increasing.

 - Inputs for actions can either be constants or outputs from preceding actions. In the latter case, use the format $id to denote the ID of the previous action whose output will be the input.

 - Always call join as the last action in the plan. Say '<END_OF_PLAN>' after you call join

 - Ensure the plan maximizes parallelizability.

 - Only use the provided action types. If a query cannot be addressed using these, invoke the join action for the next steps.

 - Never introduce new actions other than the ones provided."""
template

"\nGiven the user inputs, create a plan to solve it. Each plan should comprise an action from the following {num_tools} types:\n\n{tool_descriptions}\n\n{num_tools}. join(): Collects and combines results from prior actions.\n\n - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.\n\n - join should always be the last action in the plan, and will be called in two scenarios:\n\n   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.\n\n   (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:\n\n - Each action described above contains input/output types and description.\n\n    - You must strictly adhere to the input and output types for each action.\n\n    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.\n\n - Each action in the plan should strictly be one of 

In [8]:
from langchain.prompts import PromptTemplate

prompt = PromptTemplate.from_template(template)
print(prompt.pretty_print())


Given the user inputs, create a plan to solve it. Each plan should comprise an action from the following [33;1m[1;3m{num_tools}[0m types:

[33;1m[1;3m{tool_descriptions}[0m

[33;1m[1;3m{num_tools}[0m. join(): Collects and combines results from prior actions.

 - An LLM agent is called upon invoking join() to either finalize the user query or wait until the plans are executed.

 - join should always be the last action in the plan, and will be called in two scenarios:

   (a) if the answer can be determined by gathering the outputs from tasks to generate the final response.

   (b) if the answer cannot be determined in the planning phase before you execute the plans. Guidelines:

 - Each action described above contains input/output types and description.

    - You must strictly adhere to the input and output types for each action.

    - The action descriptions contain the guidelines. You MUST strictly follow those guidelines when you use the actions.

 - Each action in the pl

In [21]:
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    # Each worker node will be given a name and some tools.
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor

In [22]:
def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}

In [23]:
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

members = ["Researcher", "Coder"]
system_prompt = (
    "You are a supervisor tasked with managing a conversation between the"
    " following workers:  {members}. Given the following user request,"
    " respond with the worker to act next. Each worker will perform a"
    " task and respond with their results and status. When finished,"
    " respond with FINISH."
)
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "Next",
                "anyOf": [
                    {"enum": options},
                ],
            }
        },
        "required": ["next"],
    },
}
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            " Or should we FINISH? Select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(members))

llm = ChatOpenAI(model="gpt-3.5-turbo", openai_api_key=open_ai_key)

supervisor_chain = (
    prompt
    | llm.bind_functions(functions=[function_def], function_call="route")
    | JsonOutputFunctionsParser()
)

In [24]:
import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be added to the current states
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next
    next: str


research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.")
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
code_agent = create_agent(
    llm,
    [python_repl_tool],
    "You may generate safe python code to analyze data and generate charts using matplotlib.",
)
code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_chain)

In [25]:
for member in members:
    # We want our workers to ALWAYS "report back" to the supervisor when done
    workflow.add_edge(member, "supervisor")
# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
# Finally, add entrypoint
workflow.set_entry_point("supervisor")

graph = workflow.compile()
conditional_map

{'Researcher': 'Researcher', 'Coder': 'Coder', 'FINISH': '__end__'}

In [26]:
for s in graph.stream(
    {
        "messages": [
            HumanMessage(content="Code hello world and print it to the terminal")
        ]
    }
):
    if "__end__" not in s:
        print(s)
        print("----")

{'supervisor': {'next': 'Coder'}}
----
{'Coder': {'messages': [HumanMessage(content='The code "Hello, World!" has been printed to the terminal.', name='Coder')]}}
----
{'supervisor': {'next': 'FINISH'}}
----
