In [17]:
from dotenv import load_dotenv
import os
load_dotenv('../.env_api')
from rich.pretty import pprint
from operator import add

def ppprint(obj):
    pprint(obj, indent_guides=False)

os.environ["HTTP_PROXY"] = "http://localhost:9090"
os.environ["HTTPS_PROXY"] = "http://localhost:9090"
os.environ["REQUESTS_CA_BUNDLE"] = "/Users/tomek/Library/Application Support/com.proxyman.NSProxy-setapp/app-data/proxyman-ca.pem"
os.environ["SSL_CERT_FILE"] = "/Users/tomek/Library/Application Support/com.proxyman.NSProxy-setapp/app-data/proxyman-ca.pem"
from langsmith import Client
import os

from typing import List, Annotated, Any, Dict, Optional
from pydantic import BaseModel, Field

import instructor
from openai import OpenAI

import sys
from pathlib import Path


### Traditional helpers

In [18]:
def parse_function_definition(function_def: str) -> Dict[str, Any]:
    """Parse a function definition string to extract metadata including type hints."""
    result = {
        "name": "",
        "description": "",
        "parameters": {"type": "object", "properties": {}},
        "required": [],
        "returns": {"type": "string", "description": ""}
    }
    
    # Parse the function using AST
    tree = ast.parse(function_def.strip())
    if not tree.body or not isinstance(tree.body[0], ast.FunctionDef):
        return result
        
    func = tree.body[0]
    result["name"] = func.name
    
    # Extract docstring
    docstring = ast.get_docstring(func) or ""
    if docstring:
        # Extract description (first line/paragraph)
        desc_end = docstring.find('\n\n') if '\n\n' in docstring else docstring.find('\nArgs:')
        desc_end = desc_end if desc_end > 0 else docstring.find('\nParameters:')
        result["description"] = docstring[:desc_end].strip() if desc_end > 0 else docstring.strip()
        
        # Parse parameter descriptions
        param_descs = parse_docstring_params(docstring)
        
        # Extract return description
        if "Returns:" in docstring:
            result["returns"]["description"] = docstring.split("Returns:")[1].strip().split('\n')[0]
    
    # Extract parameters with type hints
    args = func.args
    defaults = args.defaults
    num_args = len(args.args)
    num_defaults = len(defaults)
    
    for i, arg in enumerate(args.args):
        if arg.arg == 'self':
            continue
            
        param_info = {
            "type": get_type_from_annotation(arg.annotation) if arg.annotation else "string",
            "description": param_descs.get(arg.arg, "")
        }
        
        # Check for default value
        default_idx = i - (num_args - num_defaults)
        if default_idx >= 0:
            param_info["default"] = ast.literal_eval(ast.unparse(defaults[default_idx]))
        else:
            result["required"].append(arg.arg)
        
        result["parameters"]["properties"][arg.arg] = param_info
    
    # Extract return type
    if func.returns:
        result["returns"]["type"] = get_type_from_annotation(func.returns)
    
    return result

def get_type_from_annotation(annotation) -> str:
    """Convert AST annotation to type string."""
    if not annotation:
        return "string"
    
    type_map = {
        'str': 'string',
        'int': 'integer', 
        'float': 'number',
        'bool': 'boolean',
        'list': 'array',
        'dict': 'object',
        'List': 'array',
        'Dict': 'object'
    }
    
    if isinstance(annotation, ast.Name):
        return type_map.get(annotation.id, annotation.id)
    elif isinstance(annotation, ast.Subscript) and isinstance(annotation.value, ast.Name):
        base_type = annotation.value.id
        return type_map.get(base_type, base_type.lower())
    
    return "string"

def parse_docstring_params(docstring: str) -> Dict[str, str]:
    """Extract parameter descriptions from docstring (handles both Args: and Parameters: formats)."""
    params = {}
    lines = docstring.split('\n')
    in_params = False
    current_param = None
    
    for line in lines:
        stripped = line.strip()
        
        # Check for parameter section start
        if stripped in ['Args:', 'Arguments:', 'Parameters:', 'Params:']:
            in_params = True
            current_param = None
        elif stripped.startswith('Returns:') or stripped.startswith('Raises:'):
            in_params = False
        elif in_params:
            # Parse parameter line (handles "param: desc" and "- param: desc" formats)
            if ':' in stripped and (stripped[0].isalpha() or stripped.startswith(('-', '*'))):
                param_name = stripped.lstrip('- *').split(':')[0].strip()
                param_desc = ':'.join(stripped.lstrip('- *').split(':')[1:]).strip()
                params[param_name] = param_desc
                current_param = param_name
            elif current_param and stripped:
                # Continuation of previous parameter description
                params[current_param] += ' ' + stripped
    
    return params


def get_tool_descriptions_from_node(tool_node):
    """Extract tool descriptions from the ToolNode object."""
    descriptions = []
    
    if hasattr(tool_node, 'tools_by_name'):
        tools_by_name = tool_node.tools_by_name
        
        for tool_name, tool in tools_by_name.items():
            function_string = inspect.getsource(globals()[tool_name])
            # function_string = inspect.getsource(getattr(tool_name))
            result = parse_function_definition(function_string)

            if result:
                descriptions.append(result)
    
    return descriptions if descriptions else "Could not extract tool descriptions"
def lc_messages_to_regular_messages(msg):

    if isinstance(msg, dict):
        
        if msg.get("role") == "user":
            return {"role": "user", "content": msg["content"]}
        elif msg.get("role") == "assistant":
            return {"role": "assistant", "content": msg["content"]}
        elif msg.get("role") == "tool":
            return {
                "role": "tool", 
                "content": msg["content"], 
                "tool_call_id": msg.get("tool_call_id")
            }
        
    elif isinstance(msg, AIMessage):

        result = {
            "role": "assistant",
            "content": msg.content
        }
        
        if hasattr(msg, 'tool_calls') and msg.tool_calls and len(msg.tool_calls) > 0 and not msg.tool_calls[0].get("name").startswith("functions."):
            result["tool_calls"] = [
                {
                    "id": tc["id"],
                    "type": "function",
                    "function": {
                        "name": tc["name"].replace("functions.", ""),
                        "arguments": json.dumps(tc["args"])
                    }
                }
                for tc in msg.tool_calls
            ]
            
        return result
    
    elif isinstance(msg, ToolMessage):

        return {"role": "tool", "content": msg.content, "tool_call_id": msg.tool_call_id}
    
    else:

        return {"role": "user", "content": str(msg)}


async def get_tool_descriptions_from_mcp_servers(mcp_servers: list[str]) -> list[dict]:

    tool_descriptions = []

    for mcp_server in mcp_servers:
        client = FastMCPClient(mcp_server)
        async with client:
            tools = await client.list_tools()
            for tool in tools:
                tool_description = {
                    'name': tool.name,
                    'description': tool.description.split("\n\n")[0],
                    'required': tool.inputSchema.get('required', []),
                    'returns': {
                        'type': 'string',
                        'description': tool.description.split("Returns:")[1].strip(),
                    },
                    'parameters': {
                        'type': 'object',
                        'properties': {}
                    },
                    'server': mcp_server,
                }
                property_descriptions = parse_docstring_params(tool.description)
                properties = tool.inputSchema.get('properties', {})

                for key, value in properties.items():
                    properties[key]['description'] = property_descriptions.get(key, '')

                tool_description['parameters']['properties'] = properties
                tool_descriptions.append(tool_description)

    return tool_descriptions

# MCP tool node

async def mcp_tool_node(state) -> str:
    tool_messages = []

    for i, tool_call in enumerate(state.mcp_tool_calls):
        client = FastMCPClient(tool_call.server)
        async with client:
            result = await client.call_tool(tool_call.name, tool_call.arguments)
            tool_message = ToolMessage(
                content=result,
                tool_call_id=f'call_{i}'
            )
            tool_messages.append(tool_message)

    return {
        'messages': tool_messages,
    }

### Coordinator agent

In [19]:
class MCPToolCall(BaseModel):
    name: str
    arguments: dict
    server: str

class ToolCall(BaseModel):
    name: str
    arguments: dict

class RAGUsedContext(BaseModel):
    id: str
    description: str
    
class Delegation(BaseModel):
    agent: str
    task: str = Field(default="")

class CoordinatorAgentResponse(BaseModel):
    next_agent: str
    plan: list[Delegation]
    final_answer: bool = Field(default=False)
    answer: str

class State(BaseModel):
    messages: Annotated[List[Any], add] = []
    answer: str = ""

    coordinator_iteration: int = Field(default=0)
    product_qa_iteration: int = Field(default=0)
    shopping_cart_iteration: int = Field(default=0)

    coordinator_final_answer: bool = Field(default=False)
    product_qa_final_answer: bool = Field(default=False)
    shopping_cart_final_answer: bool = Field(default=False)

    product_qa_available_tools: List[Dict[str, Any]] = []
    shopping_cart_available_tools: List[Dict[str, Any]] = []

    tool_calls: Optional[List[ToolCall]] = Field(default_factory=list)
    mcp_tool_calls: Optional[List[MCPToolCall]] = Field(default_factory=list)
    retrieved_context: List[RAGUsedContext] = Field(default_factory=list)
    
    user_id: str = ""
    cart_id: str = ""

    next_agent: str = ""
    plan: list[Delegation] = Field(default_factory=list)

    trace_id: str = ""

In [20]:
def coordinator_agent_node(state) -> dict:
    template = """
        You are a Coordinator Agent as part of a shopping assistant.

        Your role is to create plans for solving user queries and delegate the tasks accordingly.
        You will be given a conversation history, your task is to create a plan for solving the user's query.
        After the plan is created, you should output the next agent to invoke and the task to be performed by that agent.
        Once an agent finishes its task, you will be handed the control back, you should then review the conversation history and revise the plan.
        If there is a sequence of tasks to be performed by a single agent, you should combine them into a single task.

        The possible agents are:

        - product_qa_agent: The user is asking a question about a product. This can be a question about available products, their specifications, user reviews etc.
        - shopping_cart_agent: The user is asking to add or remove items from the shopping cart or questions about the current shopping cart.

        CRITICAL RULES:
        - If next_agent is "", final_answer MUST be false
        (You cannot delegate the task to an agent and return to the user in the same response)
        - If final_answer is true, next_agent MUST be ""
        (You must wait for agent results before returning to user)
        - If you need to call other agents before answering, set:
        next_agent="...", final_answer=false
        - After receiving agent results, you can then set:
        next_agent="", final_answer=true
        - One of the following has to be true:
        next_agent is "" and final_answer is true
        next_agent is not "" and final_answer is false

        Additional instructions:

        - Do not route to any agent if the user's query needs clarification. Clarify it yourself by setting next_agent to "" and final_answer to true.
        - Write the plan to the plan field.
        - Write the next agent to invoke to the next_agent field.
        - Once you have all the information needed to answer the user's query, you should set the final_answer field to True and output the answer to the user's query.
        - The final answer to the user query should be a comprehensive answer that explains the actions that were performed to answer the query.
        - Never set final_answer to true if the plan is not complete.
        - You should output the next_agent field as well as the plan field.
        """
    
    prompt = template

    messages = state.messages

    conversation = []

    for msg in messages:
        conversation.append(lc_messages_to_regular_messages(msg))

    client = instructor.from_openai(OpenAI(api_key=os.getenv("OPENAI_API_KEY")))

    response, raw_response = client.chat.completions.create_with_completion(
            model="gpt-4.1",
            response_model=CoordinatorAgentResponse,
            messages=[{"role": "system", "content": prompt}, *conversation],
            temperature=0,
    )


    return {
        # "messages": ai_message,
        "next_agent": response.next_agent,
        "plan": response.plan,
        "coordinator_final_answer": response.final_answer,
        "coordinator_iteration": state.coordinator_iteration + 1,
        "answer": response.answer,
        "trace_id": ''
    }

In [21]:
initial_state = State(messages=[    
    {"role": "user", "content": "What is the weather in Tokyo?"}
])

answer = coordinator_agent_node(initial_state)

In [22]:
pprint(answer)

In [23]:
ls_client = Client(api_key=os.getenv("LANGSMITH_API_KEY"))

In [24]:
def next_agent_evaluator(run, example):
    next_agent_match = run.outputs['next_agent'] == example.outputs['next_agent']
    final_answer_match = run.outputs['coordinator_final_answer'] == example.outputs['coordinator_final_answer']

    return all([next_agent_match, final_answer_match])

In [25]:
results = ls_client.evaluate(
    lambda x: coordinator_agent_node(State(messages=x['messages'])),
    data="coordinator-evaluation-dataset",
    evaluators = [
        next_agent_evaluator,
    ],
    experiment_prefix='coordinator-evaluation-dataset'
)

  from .autonotebook import tqdm as notebook_tqdm


View the evaluation results for experiment: 'coordinator-evaluation-dataset-73ef46ca' at:
https://smith.langchain.com/o/c5906f8c-2b0e-445e-8ba7-ea63d7432c8c/datasets/a8a58390-b576-4b77-8875-b5c42217c390/compare?selectedSessions=31862d6b-9e26-4c33-9318-75a0cfd1dd25




10it [00:27,  2.71s/it]


In [26]:
results

Unnamed: 0,inputs.messages,outputs.next_agent,outputs.plan,outputs.coordinator_final_answer,outputs.coordinator_iteration,outputs.answer,outputs.trace_id,error,reference.next_agent,reference.coordinator_final_answer,feedback.next_agent_evaluator,execution_time,example_id,id
0,"[{'role': 'user', 'content': 'Can you add two,...",product_qa_agent,[agent='product_qa_agent' task='Find two red t...,False,1,I will first look for two red tablets that are...,,,product_qa_agent,False,True,3.571108,27ab4cc7-0aeb-4fad-9cdd-fb8d4e21d282,0b75afe0-6712-4cd0-a9b9-02f5d6cf00ca
1,"[{'role': 'user', 'content': 'Can you help me ...",,[],True,1,Could you please clarify what specific help yo...,,,,True,True,2.235633,9dd03758-0d53-4596-aaf0-6b0916c520b0,934410b4-1f12-4a6c-a4ad-3a43010860a2
2,"[{'role': 'user', 'content': 'What kind of stu...",product_qa_agent,[agent='product_qa_agent' task='Provide an ove...,False,1,I will retrieve information about the types of...,,,,True,False,1.624167,177e9895-2a61-41db-918f-3aa7ab0328e5,db90969c-b13a-452a-a450-5d44a89986ed
3,"[{'role': 'user', 'content': 'Can you put the ...",product_qa_agent,[agent='product_qa_agent' task='Identify the i...,False,1,"To fulfill your request, I will first identify...",,,product_qa_agent,False,True,1.963981,3b283d96-275b-4dfb-a88b-5f936af4bbc7,fad60da9-be74-4298-9dbe-c1b28b5598dd
4,"[{'role': 'user', 'content': 'Can you find som...",shopping_cart_agent,"[agent='shopping_cart_agent' task=""Retrieve th...",False,1,"First, I will check which items are currently ...",,,shopping_cart_agent,False,True,6.036264,4e41acd2-24c7-4e9e-aee6-a20ebea06f57,f31ca4f4-a466-4d51-86b7-d16a630bf41c
5,"[{'role': 'user', 'content': 'Can you add the ...",product_qa_agent,[agent='product_qa_agent' task='Find the best ...,False,1,I will first find the best laptop bags availab...,,,product_qa_agent,False,True,1.977017,70fcc019-367e-4406-9a96-5ac13e450ea6,eff9bbda-3b4c-4b62-9a0d-7cdcfd6b5569
6,"[{'role': 'user', 'content': 'Can you add thos...",product_qa_agent,[agent='product_qa_agent' task='Identify which...,False,1,I need to clarify which earphones you are refe...,,,,True,False,2.158475,5fefc96c-34df-4856-b691-a72997264df5,264efafa-b808-4f9c-8fdc-631e6ef98739
7,"[{'role': 'user', 'content': 'Can you add an i...",shopping_cart_agent,"[agent='shopping_cart_agent' task=""Add the ite...",False,1,I will add the item with ID B09NLTDHQ6 to your...,,,shopping_cart_agent,False,True,1.14958,5bff47b3-d75b-46c7-8878-0c994fc2a002,c8df9a90-4f49-4890-92bd-1608ee45009f
8,"[{'role': 'user', 'content': 'Can I get some e...",product_qa_agent,[agent='product_qa_agent' task='Find available...,False,1,I will look up available earphones for you and...,,,product_qa_agent,False,True,2.856572,4700be65-c001-4875-bbef-850f84e24c38,24cd002c-ddd0-4277-942d-33c766246d44
9,"[{'role': 'user', 'content': 'What is the weat...",,[],True,1,"I'm sorry, but I can only assist with shopping...",,,product_qa_agent,True,False,3.001133,29cb4033-3d2b-4a29-a64a-a15fb482c795,bc2c0ef2-7bb6-4915-90f6-07a4d0df2bc2


In [27]:
results.experiment_name

'coordinator-evaluation-dataset-73ef46ca'

In [29]:
results_resp = ls_client.read_project(
    project_name=results.experiment_name,
    include_stats=True,
)

pprint(results_resp)    

In [31]:
results_resp.feedback_stats['next_agent_evaluator']['avg']

0.7

In [32]:
def next_agent_core_evaluator(run, example):
    next_agent_match = run.outputs['next_agent'] == example.outputs['next_agent']
    final_answer_match = run.outputs['coordinator_final_answer'] == example.outputs['coordinator_final_answer']
    return all([next_agent_match, final_answer_match])

def next_agent_evaluator_groq_llama_3_3_70b_versatile(run, example):
    return next_agent_core_evaluator(run, example)

evaluator = next_agent_evaluator_groq_llama_3_3_70b_versatile

evaluator.__name__

'next_agent_evaluator_groq_llama_3_3_70b_versatile'

In [33]:
str(evaluator)

'<function next_agent_evaluator_groq_llama_3_3_70b_versatile at 0x13bcda7a0>'