In [1]:
import os
from langchain.chat_models import init_chat_model
from dotenv import load_dotenv

# Load from env
load_dotenv()

model = init_chat_model("gpt-4o-mini")

In [5]:
from pathlib import Path
import sys
# append dev directory to sys.path
sys.path.insert(0, str(Path.cwd().parent / "agent"))

In [None]:
# Import the data exploration agent
from data_exploration_subagent import build_agent as build_data_exploration_agent
from langchain.tools import tool

# Build the data exploration agent
data_exploration_agent = build_data_exploration_agent()

# Wrap data_exploration_agent as a tool
@tool("database_exploration_agent", description="Use this agent to explore and query the art database. Provide a natural language question about the art data.")
def call_data_exploration_agent(query: str) -> str:
    """
    Call the database exploration agent to answer questions about the art database.
    
    Args:
        query: A natural language question about the art data
        
    Returns:
        The response from the data exploration agent
    """
    result = data_exploration_agent.invoke(
        {"messages": [{"role": "user", "content": query}]}
    )
    return result["messages"][-1].content

from image_qna_tool import build_image_qna_tool
image_qna_tool = build_image_qna_tool()


tools = [call_data_exploration_agent, image_qna_tool]


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


database_exploration_agent: Use this agent to explore and query the art database. Provide a natural language question about the art data.

image_qna_tool: Use this tool when you want to answer questions that needs visual information from an image. The image should be a URL or a local file path.



In [None]:
from langgraph.graph import MessagesState
from typing import List

# Reducer 
def merge_tool_results(a: dict, b: dict) -> dict:
    return {**a, **b}

# Custom State for data exploration
class PlanningState(MessagesState):
    base_plan: List[str] | None 
    feedback: str | None
    replan_count: int = 0
    max_replan_count: int = 5

In [None]:
planning_system_prompt = """
    You are an efficient task planner. Your job is to only plan and list tasks or subtasks in the most efficient and resourceful way.
    You are given a user query/task and a list of tools.
    
    list the tasks in the following format:
    1. task 1
    2. task 2
    3. task 3
    ...

    plan and list the tasks in a way that each task can be solved by one of these tools.

    {goals}

    tools:
    1. image_qna_tool: This tool equipped with a visual question answering model that can answer questions related to the images in the database.
    2. database_exploration_agent: This tool can answer questions related to the database but limited to the scope of the schema.

    tools_description:
    {tools}    
    """

replanning_system_prompt = """
    You are an efficient task planner. Your job is to replan tasks based on feedback from failures.
    
    Previous plan had issues:
    {feedback}
    
    Please generate a revised plan that avoids this issue or approaches the problem differently.
    Current plan before revision:
    {base_plan}

    tools_description:
    {tools}
    
    Return the plan in the same numbered format (1. step1, 2. step2, etc.)
    """

In [None]:
import re
# Helper functions
def format_tools_for_prompt(tools):
    """Format tool descriptions for the prompt."""
    lines = []
    for i, tool in enumerate(tools, start=1):
        lines.append(f"{i}. {tool.name}: {tool.description}")
    return "\n".join(lines)

def parse_plan(plan_text: str) -> list[str]:
    """
    Converts a numbered plan string into a list of steps.
    
    Args:
        plan_text: A string with numbered steps
        
    Returns:
        A list of step descriptions without numbers
    """
    # Split on line breaks and remove empty lines
    lines = [line.strip() for line in plan_text.split("\n") if line.strip()]
    
    steps = []
    for line in lines:
        # Remove the number + dot prefix (e.g., "1. ", "2. ")
        step = re.sub(r"^\d+\.\s*", "", line)
        steps.append(step)
    
    return steps

In [None]:
from langchain_core.tools import tool

# Create update_plan_tool
@tool("update_plan_tool", description="Use this tool to initialize or update the task plan. Provide a natural language instruction to update the plan.")
def update_plan_tool(goal: str, feedback: str, base_plan: List[str] | None, ) -> List[str]:
    """
    Initialize or update the task plan based on the given instruction. An intelligent model will be used to generate the plan.
    
    Args:
        goal: The goal of the task
        feedback: The feedback from the user or previous task
        base_plan: The current task planning list
        
    Returns:
        The updated task planning list
    """

    # Initialize the plan if it is not provided
    if base_plan is None:

        # Initialize the plan
        system_prompt = planning_system_prompt.format(
            goals=goal, tools=format_tools_for_prompt(tools)
        )
        system_message = {
            "role": "system", 
            "content": system_prompt
        }

        plan = model.invoke(
            [system_message]
        )

        plan = parse_plan(plan.content)
        return plan
    
    # Replan 
    system_prompt = replanning_system_prompt.format(
        feedback=feedback, base_plan="\n".join(base_plan), tools=format_tools_for_prompt(tools)
    )
    system_message = {
        "role": "system", 
        "content": system_prompt
    }

    plan = model.invoke(
        [system_message]
    )
    plan = parse_plan(plan.content) 

    return plan