In [1]:
from dotenv import load_dotenv
load_dotenv()
import logging
from bs4 import BeautifulSoup
import html2text
import httpx
import yaml
import json
from pydantic import Field, BaseModel
from langgraph.graph import MessagesState, StateGraph, START, END
from typing import List
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.prompts import PromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langgraph.types import Send
import operator
from typing import Annotated
from typing import NamedTuple



# Configure logging for the notebook: INFO and above, written to the console.
# Switch the level to logging.DEBUG for detailed logs; append a
# logging.FileHandler("scraper.log") to the handler list to also log to a file.
_console_handler = logging.StreamHandler()
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[_console_handler],
)

# Module-level logger.
logger = logging.getLogger(__name__)


def fetch_documents(url: str) -> str:
    """Fetch a documentation page and return its main content as Markdown.

    Every ``<a>`` and ``<img>`` element is removed before conversion. The main
    content is taken from the ``div`` whose class is
    ``theme-doc-markdown markdown`` (LangChain docs layout), falling back to
    the first ``<article>`` element (LangGraph docs layout).

    Args:
        url (str): The URL of the document to fetch.

    Returns:
        str: The markdownified text of the document. An empty string is
        returned when neither content container is present, and a message
        starting with ``"Encountered an HTTP error: "`` is returned when the
        request fails.
    """
    try:
        # The context manager closes the client (and its connection pool)
        # even if the request raises.
        with httpx.Client(follow_redirects=True, timeout=10) as httpx_client:
            response = httpx_client.get(url)
            response.raise_for_status()
            # Raw bytes: lets BeautifulSoup perform its own encoding detection.
            html_content = response.content
    except (httpx.HTTPStatusError, httpx.RequestError) as e:
        return f"Encountered an HTTP error: {str(e)}"

    soup = BeautifulSoup(html_content, 'html.parser')

    # Links are removed first, then images are searched again, so an image
    # nested inside an already-removed link is never touched twice.
    for a_tag in soup.find_all('a'):
        a_tag.decompose()
    for img_tag in soup.find_all('img'):
        img_tag.decompose()

    target_div = soup.find('div', class_="theme-doc-markdown markdown")  # langchain
    if not target_div:
        target_div = soup.find('article')  # langgraph

    if not target_div:
        # Keep the declared ``str`` return type; callers interpolate the result
        # into prompts, where a bare ``None`` would render as the text "None".
        logging.getLogger(__name__).warning("No documentation content found at %s", url)
        return ""

    return html2text.html2text(str(target_div))
    
    
# LangGraph tutorial pages describing the candidate agent architectures.
_LANGGRAPH_TUTORIALS_ROOT = "https://langchain-ai.github.io/langgraph/tutorials"
agent_architecture_urls = [
    f"{_LANGGRAPH_TUTORIALS_ROOT}/{tutorial_page}"
    for tutorial_page in (
        "multi_agent/multi-agent-collaboration",
        "multi_agent/agent_supervisor",
        "multi_agent/hierarchical_agent_teams",
        "plan-and-execute/plan-and-execute",
        "self-discover/self-discover",
    )
]


# System prompt for the requirement-gathering step. The tool name at the end
# must match the class bound as a tool (AgentInstructions), otherwise the model
# is told to call a tool that does not exist.
template = """Your job is to get information from a user about what kind of agent they wish to build.

You should get the following information from them:

- What the objective of the agent is
- Various usecases of the agent
- Examples of the usage of the agent: input query and expected output from the agent

If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess.

After you are able to discern all the information, call the tool AgentInstructions """

In [2]:
class AgentInstructions(BaseModel):
    """Instructions on how to build the Agent"""

    # This model is bound to the LLM as a tool in requirement_analysis_node;
    # the field descriptions below are part of what the model is shown.
    objective: str = Field(
        description="What is the primary objective of the agent",
    )
    usecases: List[str] = Field(
        description="What are the various responsibilities of the agent which it needs to fulfill",
    )
    examples: str = Field(
        description="What are some examples of the usage of the agent (input query and expected output from the agent) ?",
    )

class ArchEvaluationReport(BaseModel):
    """Class to represent the architecture evaluation report"""

    # Used as the structured-output schema in evaluate_against_architecture.
    name: str = Field(
        description="Name of the agent architecture being evaluated",
    )
    highlights: str = Field(
        description="Concise summary of the architecture in 5 lines",
    )
    # best_architecture ranks reports by this score (highest wins).
    evaluation_score: int = Field(
        description="evaluation of suitability of the agentic_architecture against user requirements from 1-10, 1 being least relevant to 10 being most relevant",
    )
    justification: str = Field(
        description="Justification for the score",
    )
    # Passed to the code-generation prompt as the implementation suggestion.
    tailored_design: str = Field(
        description="Tailored design using the architecture",
    )
    
class ArchEvaluationWithUrl(NamedTuple):
    """Pairs an architecture evaluation report with the URL it was produced for."""
    # URL of the architecture documentation page that was evaluated.
    url: str
    # Structured evaluation generated for that page.
    report: ArchEvaluationReport

class AgentBuilderState(MessagesState):
    """State of the main agent-builder graph (message history plus the keys below).

    Keys are plain annotations: ``Field("...")`` was previously assigned to each
    key, but pydantic's ``Field`` takes the *default value* as its first
    positional argument (not a description), and a ``MessagesState`` subclass is
    a TypedDict-style schema that has no per-key defaults. The descriptions are
    kept as comments instead.
    """

    # The requirement analysis generated by the model.
    agent_instructions: AgentInstructions
    # List of agent architectures suggested in the map-reduce step. The
    # operator.add reducer concatenates the single-item lists returned by each
    # evaluate_against_architecture run.
    arch_evaluation_reports: Annotated[List[ArchEvaluationWithUrl], operator.add]
    # The agent architecture best suited to the above requirements.
    best_agent_architecture: ArchEvaluationWithUrl
    # The JSON code generated.
    json_code: str
    # The Python code generated.
    python_code: str

class ArchitectureEvaluationState(MessagesState):
    """Per-branch input state for evaluating one architecture URL.

    Keys are plain annotations: pydantic's ``Field`` takes a default value as
    its first positional argument (not a description) and has no effect on a
    TypedDict-style state, so the former ``Field("...")`` assignments are
    replaced by comments.
    """

    # The requirement analysis generated by the model.
    agent_instructions: AgentInstructions
    # URL of the agent architecture to evaluate against.
    url: str

# Chat model shared by every node in this notebook.
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    streaming=True,
)

def requirement_analysis_node(state: AgentBuilderState):
    """Ask the LLM to gather requirements, capturing them once it calls the tool.

    The model is invoked with the system ``template`` followed by the
    conversation so far, with ``AgentInstructions`` bound as a tool.

    Returns:
        dict: Always contains the model response under ``"messages"``. When
        the response carries at least one tool call, the arguments of the
        first call are parsed into ``AgentInstructions`` and returned under
        ``"agent_instructions"`` as well.
    """
    conversation = [SystemMessage(content=template)] + state["messages"]
    response = llm.bind_tools([AgentInstructions]).invoke(conversation)

    update = {"messages": [response]}
    if response.tool_calls:
        # Construct the final answer from the arguments of the first tool call.
        first_call = response.tool_calls[0]
        update["agent_instructions"] = AgentInstructions(**first_call["args"])
    return update


In [10]:
# Prompt used to score one architecture documentation page against the user's
# requirements. Placeholders: agent_architecture, objective, responsibilities,
# examples.
# NOTE(review): the deliverables listed here do not line up exactly with the
# ArchEvaluationReport schema the response is parsed into (e.g. "2-3 Lines" of
# highlights here vs. "5 lines" in the schema; "Concise Identifier" and
# "Feature Summary" have no matching field) -- confirm which is intended.
ARCH_EVALUATION_PROMPT = PromptTemplate.from_template(
    """
You are tasked with assessing the provided agentic architecture documentation to determine its relevance and applicability to the user requirements outlined below.

Inputs:

- Agentic Architecture Documentation:
{agent_architecture}

- User Requirements:
  - Objectives: {objective}
  - Use Cases: {responsibilities}
  - Examples: {examples}


Deliverables:
Your evaluation should be presented in the following structured format:

- Architecture Name:
Provide the name of the agent architecture being assessed.
- Concise Identifier:
Suggest a brief, descriptive name for the architecture that encapsulates its purpose.
- Key Highlights (2-3 Lines):
Identify and summarize the most significant aspects or features of the architecture.
- Feature Summary:
Offer a detailed overview of the architecture's unique elements, functionalities, and capabilities.
- Relevance Score (1-10):
Assign a score based on the alignment between the architecture's features and the user's requirements (1 = minimally relevant, 10 = highly relevant).
- Score Justification:
Provide a clear and concise rationale (5-10 sentences) for the relevance score, highlighting how specific features match—or fail to match—the user's objectives, use cases, and examples.
- Implementation Proposal:
Outline a tailored approach for leveraging the architecture to meet the user’s requirements. Be specific and actionable, addressing how it can fulfill the stated objectives and responsibilities.
"""
)


# Prompt used by agent_kernel_builder to generate LangGraph StateGraph code for
# the selected architecture. Placeholders: agent_architecture_name, objective,
# responsibilities, examples, agent_architecture (documentation text) and
# agent_tailored (implementation suggestion from the evaluation report).
AGENT_KERNEL_PROMPT = PromptTemplate.from_template(
    """
        You are tasked with designing a langgraph StateGraph object that implements the {agent_architecture_name} architecture tailored to meet the user requirements outlined below.
        <Requirements>
        Objectives: {objective}
        usecases: {responsibilities}
        examples: {examples}
        </Requirements>

        <Documentation for {agent_architecture_name}>
        {agent_architecture}
        </Documentation for {agent_architecture_name}>
        
        Suggestion on how to implement the architecture in a way that meets the user requirements:
        {agent_tailored}
        
        Output needs to be a compiled StateGraph object.
        
        Important to Note:
        * Do not hallucinate when writing StateGraph related code, refer to the documentation provided.
        * Understand the concept of the architecture, and refer to the examples of code. Now generate your own code tailored to requirements.
    """)

In [11]:
def agent_kernel_builder(state: AgentBuilderState):
    """Build the agent kernel using the best architecture.

    Fetches the documentation page of the selected architecture, fills
    ``AGENT_KERNEL_PROMPT`` with it together with the user's requirements and
    the tailored design from the evaluation report, and asks the LLM to
    generate the code.

    Args:
        state: Graph state; reads ``best_agent_architecture`` and
            ``agent_instructions``.

    Returns:
        dict: A status ``AIMessage`` under ``"messages"`` and the raw LLM
        response content under ``"python_code"``.
    """
    # Named `selected` so it does not shadow the sibling `best_architecture` node function.
    selected: ArchEvaluationWithUrl = state["best_agent_architecture"]
    agent_instructions: AgentInstructions = state["agent_instructions"]
    report: ArchEvaluationReport = selected.report

    kernel_prompt = AGENT_KERNEL_PROMPT.format(
        objective=agent_instructions.objective,
        responsibilities=agent_instructions.usecases,
        examples=agent_instructions.examples,
        agent_tailored=report.tailored_design,
        agent_architecture_name=report.name,
        agent_architecture=fetch_documents(selected.url),
    )
    response = llm.invoke([SystemMessage(content=kernel_prompt)])

    # Return the generated agent kernel as the output
    return {
        "messages": [AIMessage(content="Generated agent kernel code!")],
        "python_code": response.content,
    }

In [12]:

def architecture_evaluation_map_node(state: AgentBuilderState):
    """Fan out one ``evaluate_against_architecture`` run per architecture URL.

    Returns:
        list: One ``Send`` per entry of ``agent_architecture_urls``, each
        carrying the gathered ``agent_instructions`` and that entry's ``url``.
    """
    fan_out = []
    for architecture_url in agent_architecture_urls:
        branch_input = {
            "agent_instructions": state["agent_instructions"],
            "url": architecture_url,
        }
        fan_out.append(Send("evaluate_against_architecture", branch_input))
    return fan_out

def route_state(state: AgentBuilderState):
    """Pick the next step from the type of the most recent message.

    Returns:
        ``"add_tool_message"`` when the last message is an ``AIMessage`` with
        tool calls, ``"requirement_analysis"`` when it is a ``HumanMessage``,
        and ``END`` otherwise.
    """
    latest = state["messages"][-1]
    if isinstance(latest, AIMessage) and latest.tool_calls:
        return "add_tool_message"
    if isinstance(latest, HumanMessage):
        return "requirement_analysis"
    return END

def evaluate_against_architecture(state: ArchitectureEvaluationState):
    """Score a single architecture page against the gathered requirements.

    The page at ``state["url"]`` is fetched, inserted into
    ``ARCH_EVALUATION_PROMPT`` along with the user's objective, use cases and
    examples, and the LLM is asked for an ``ArchEvaluationReport``.

    Returns:
        dict: A progress ``AIMessage`` under ``"messages"`` and a one-element
        list holding the ``(url, report)`` pair under
        ``"arch_evaluation_reports"``.
    """
    instructions: AgentInstructions = state["agent_instructions"]
    url: str = state["url"]

    structured_llm = llm.with_structured_output(ArchEvaluationReport)
    evaluation_prompt = ARCH_EVALUATION_PROMPT.format(
        agent_architecture=fetch_documents(url),
        objective=instructions.objective,
        responsibilities=instructions.usecases,
        examples=instructions.examples,
    )
    report: ArchEvaluationReport = structured_llm.invoke(
        [SystemMessage(content=evaluation_prompt)]
    )

    progress_note = AIMessage(content=f"Evaluated architecture {url}, arch_name: {report.name}")
    return {
        "messages": [progress_note],
        "arch_evaluation_reports": [ArchEvaluationWithUrl(url, report)],
    }

def best_architecture(state: AgentBuilderState):
    """Select the best architecture based on the evaluation reports.

    Args:
        state: Graph state; reads ``arch_evaluation_reports``.

    Returns:
        dict: A status ``AIMessage`` under ``"messages"`` and the report with
        the highest ``evaluation_score`` under ``"best_agent_architecture"``.
        On ties the earliest report in the list wins.

    Raises:
        ValueError: If there are no evaluation reports to choose from.
    """
    arch_reports: List[ArchEvaluationWithUrl] = state["arch_evaluation_reports"]
    if not arch_reports:
        raise ValueError("No architecture evaluation reports available to select from.")

    # max() returns the first maximal element, matching the previous
    # stable descending sort followed by taking element [0], without sorting.
    top_ranked = max(arch_reports, key=lambda entry: entry.report.evaluation_score)

    print("found the best architecture")

    # Return the best architecture as the output
    return {
        "messages": [AIMessage(content="Best architecture selected!")],
        "best_agent_architecture": top_ranked,
    }

In [13]:

# Prompt that asks the LLM to describe generated StateGraph code as JSON
# (nodes + edges). Placeholders: documents, code_snippet.
# The edge example uses "__END__" so that it agrees with rule 5 below.
# NOTE(review): the rules and the examples still disagree on some key names
# ("Schema_info" vs "schema_info", "routing_conditions" vs "routing_condition"),
# and the example "output_schema" value and the /"/"/" sequences look garbled.
# They are left as-is because the consumer of this JSON is not visible here --
# confirm the expected keys before normalizing.
CODE_TO_JSON_PROMPT = PromptTemplate.from_template("""
You are tasked with converting the following stategraph comPilation code into a JSON. 

Contextual documents for understanding code:
{documents}

The code is as follows:
{code_snippet}

OUTPUT: Explaination and JSON. Do not include any code blocks. Seperate the JSON and explaination blocks and ensure that there is an explaination for each line of JSON produced but keep the blocks seperated.
Each Output JSON will have a nodes sections containing all the nodes and an edges section

Please follow:
1. Produce the explaination first and then the JSON after it. DO not produce the JSON first. 
2. For any conditional edges, please include all the nodes that the source of a conditional edge can reach as part of the explaination.
3. Any Edge entry in the JSON can only be conditional(mention conditional: true) if the source for that edge acts as a source for multiple edges. If you cannot point to atleast 2 targets for 1 source, then that source will not have any conditional edges
4. A source can have any number of targets. Please write the explaination for each source node to target node edge
5. Please ensure that the JSON starts with __START__ node and __END__ node with the correct edges from and to them
6. Ensure all elements in the nodes sections of the output json contain the following fields: Schema_info, input_schema, output_schema, description, function_name. Please do not return any entries in the nodes without these fields and these fields can't be empty
7. Ensure all elements in the edges sections of the output json contain the following fields: source, target, routing_conditions, conditional. Please do not return any entries in the edges without these fields and they can't be empty
8. Every node should be a part of atleast one edge, Please ensure this is followed


Example output JSON for a node:
    "code_node":{{
        "schema_info": /"/"/"CodeWriterState:
      type: TypedDict
      fields:
      - name: user_query
        type: str
      - name: execution_result
        type: str/"/"/",
    "input_schema": "CodeWriterState",
    "output_schema":"RequiremenCodeWriterStatetAnalysisState",
    "description":"This node analyzes the user_query, if the query is to write a code, it will make a tool call to run the proposed code. This node returns command object",
    "function_name": "code_step"
    }}

Example output JSON for an edge:
edge:{{ source: "abc", target: "cde", routing_condition: "if abc made a tool call then go to cde", "conditional": true}}
edge:{{ source: "abc", target: "xyz", routing_condition: "if abc made an interupt to a human then go to xyz", "conditional": true}}
edge:{{ source: "xyz", target: "__END__", routing_condition: "no nodes to go after xyz, we have our final output for this path", "conditional": false}}
""")

In [14]:
def code_to_json_node(state: AgentBuilderState):
    """Convert the generated code to JSON.

    The LangGraph low-level concepts page is fetched as context and passed,
    together with ``state["python_code"]``, to the LLM through
    ``CODE_TO_JSON_PROMPT``.

    Returns:
        dict: A status ``AIMessage`` under ``"messages"`` and the raw LLM
        response content under ``"json_code"``.
    """
    glossary_url = "https://langchain-ai.github.io/langgraph/concepts/low_level/"
    conversion_prompt = CODE_TO_JSON_PROMPT.format(
        code_snippet=state["python_code"],
        documents=fetch_documents(glossary_url),
    )
    llm_reply = llm.invoke([SystemMessage(content=conversion_prompt)])

    # Return the JSON code as the output
    status_message = AIMessage(content="Generated JSON code!")
    return {
        "messages": [status_message],
        "json_code": llm_reply.content,
    }



In [15]:
# Create the graph and register the node functions defined in earlier cells.
workflow = StateGraph(AgentBuilderState)
for node_name, node_fn in (
    ("requirement_analysis", requirement_analysis_node),
    ("evaluate_against_architecture", evaluate_against_architecture),
    ("best_architecture", best_architecture),
    ("agent_kernel_builder", agent_kernel_builder),
    ("code_to_json", code_to_json_node),
):
    workflow.add_node(node_name, node_fn)

@workflow.add_node
def add_tool_message(state: AgentBuilderState):
    """Acknowledge the tool call(s) made by the last AI message.

    One ``ToolMessage`` is emitted per tool call so that every
    ``tool_call_id`` in the assistant message has a matching tool response;
    chat APIs reject a history in which a tool call is left unanswered.

    Returns:
        dict: The acknowledgement messages under ``"messages"``.
    """
    pending_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content="Requirements generated!",
                tool_call_id=tool_call["id"],
            )
            for tool_call in pending_calls
        ]
    }


# Wire the nodes together (listed in execution order).
workflow.add_edge(START, "requirement_analysis")
workflow.add_conditional_edges("requirement_analysis", route_state, ["add_tool_message", "requirement_analysis", END])
# Map step: fan out one evaluation per architecture URL.
workflow.add_conditional_edges("add_tool_message", architecture_evaluation_map_node, ["evaluate_against_architecture"])
workflow.add_edge("evaluate_against_architecture", "best_architecture")
workflow.add_edge("best_architecture", "agent_kernel_builder")
workflow.add_edge("agent_kernel_builder", "code_to_json")
workflow.add_edge("code_to_json", END)

# Compile with a checkpointer so the conversation is persisted per thread_id.
# Without it every stream() call starts from an empty state, and the
# requirement-gathering step never sees the earlier turns of the dialogue.
infograph = workflow.compile(checkpointer=MemorySaver())

In [8]:
import uuid

# Canned inputs replayed when stdin is unavailable (non-interactive runs).
# The final "q" ends the loop.
cached_human_responses = ["hi!", "rag prompt", "1 rag, 2 none, 3 no, 4 no", "red", "q"]
cached_response_index = 0
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
while True:
    try:
        user = input("User (q/Q to quit): ")
    except Exception:
        # input() failed (e.g. no stdin): fall back to the next canned response.
        # `Exception` rather than a bare `except` so Ctrl-C / kernel interrupt
        # still stops the loop instead of being swallowed.
        user = cached_human_responses[cached_response_index]
        cached_response_index += 1
    print(f"User (q/Q to quit): {user}")
    if user in {"q", "Q"}:
        print("AI: Byebye")
        break
    output = None
    for output in infograph.stream(
        {"messages": [HumanMessage(content=user)]}, config=config, stream_mode="updates"
    ):
        # Each update maps a node name to that node's state update.
        last_message = next(iter(output.values()))["messages"][-1]
        last_message.pretty_print()

    # "code_to_json" is the last node before END, so an update from it means
    # the whole pipeline has finished.
    if output and "code_to_json" in output:
        print("Done!")
        

User (q/Q to quit): q
AI: Byebye
