## Hybrid Retrieval and Agent Integration - PoC - 3
Here, we send the subgraph's full `messages` history back to the parent graph. The goal is for the parent graph to be able to answer follow-up questions such as:

- Give me the code used to generate your statistics.
- What methodology did you use to generate your results?
- Which filters did you apply?

Since the current data-analyst subgraph reports back only the final answer (not the intermediate code and steps), the parent graph cannot answer follow-up questions like the ones shown above.

Let's get started.

In [1]:
import os
import io
import sys
import json
import traceback
import regex as re
import pandas as pd
from pyjstat import pyjstat
from textwrap import dedent
from pyjstat import pyjstat
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
from typing import Dict, List, Any, Literal, Annotated

from langgraph.types import Command
from langchain_core.tools import tool
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage, ToolMessage
from langgraph.prebuilt import InjectedState
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.messages.ai import AIMessageChunk
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.postgres import PostgresSaver


In [2]:
from pathlib import Path
import os

# Presumably this notebook lives two directories below the project root; switch
# the working directory there so `src.*` imports and relative paths such as
# "cache/" resolve from the root.
root = Path.cwd().parents[1]
os.chdir(root)

from src.helpers.json_stat_archive_db import JSONStatArchiveDB

In [3]:
# Three Gemini clients (`llm_low` / `llm_med` / `llm_high`) that differ only in
# the model name; every other setting is shared.
_GEMINI_COMMON_KWARGS = dict(
    temperature=0.5,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

llm_low = ChatGoogleGenerativeAI(model="gemini-2.0-flash-lite", **_GEMINI_COMMON_KWARGS)
llm_med = ChatGoogleGenerativeAI(model="gemini-2.5-flash", **_GEMINI_COMMON_KWARGS)
llm_high = ChatGoogleGenerativeAI(model="gemini-2.5-pro", **_GEMINI_COMMON_KWARGS)

In [4]:
def _run_python_safely(code: str) -> Dict[str, Any]:
    """
    Minimal sandbox runner. In production: isolate with subprocess, container, time & mem limits.
    Returns {"stdout": str, "error": {"type":, "message":, "trace":}} on failure.
    """
    stdout_capture = io.StringIO()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    sys.stdout = stdout_capture
    sys.stderr = stdout_capture  # co-mingle
    globals_dict = {"__name__": "__main__"}
    try:
        exec(code, globals_dict, globals_dict)
        out = stdout_capture.getvalue()
        return {"stdout": out}
    except Exception as e:
        err = {"type": e.__class__.__name__, "message": str(e), "trace": traceback.format_exc()}
        return {"stdout": stdout_capture.getvalue(), "error": err}
    finally:
        sys.stdout, sys.stderr = old_stdout, old_stderr

@tool(name_or_callable="python_code_executor", parse_docstring=True)
def python_code_executor(
    code: str,
    description: str,
    state: Annotated[dict, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    """
    Executes the given Python code and returns the output.
    
    Args:
        code (str): The Python code to execute.
        description (str): A short description of the code being executed.
    
    Returns:
        Command: Appends a ToolMessage with the output (or the error) and the updated `report` log.
    """
    # NOTE: with parse_docstring=True the summary line and Args above form the
    # tool description the LLM sees, so keep them stable.
    result = _run_python_safely(code)
    failed = "error" in result

    # Provenance entry for provenance_tool. On failure `result` holds the error
    # dict (type/message/trace), not a str as ReportModel.result declares.
    current_report = {
        "task": description,
        "code": code,
        "result": result.get("error", "") if failed else result.get("stdout", ""),
    }
    # Build a new list instead of appending to the injected state's list, so the
    # state object LangGraph passed in is never mutated in place.
    reports = [*(state.get("report") or []), current_report]

    content = f"Error executing code: {result['error']}" if failed else result["stdout"]
    return Command(
        update={
            "messages": [
                ToolMessage(
                    content=content,
                    tool_call_id=tool_call_id,
                    name="python_code_executor",
                )
            ],
            "report": reports,
        }
    )

# The analyst subgraph has exactly one tool: the Python executor.
TOOLS = [python_code_executor]

# Bind the executor to the medium model; `allowed_function_names` lists every
# tool in TOOLS (currently only "python_code_executor").
llm_med_with_code_exec_tool = llm_med.bind_tools(
    TOOLS,
    allowed_function_names=[t.name for t in TOOLS],
)

In [5]:
class ReportModel(BaseModel):
    """Represents a report generated by the data-analysis agent."""

    # One provenance entry: what was attempted, the code that ran, and its output.
    task: Annotated[str, Field(description="The task to be performed by the data-analysis agent.")]
    code: Annotated[
        str,
        Field(description="The code to be executed by the data-analysis agent that performs the requested task."),
    ]
    result: Annotated[str, Field(description="The result of code execution.")]

class State(TypedDict):
    """State for the analyst agent."""

    # NOTE: TypedDict keys cannot declare defaults (PEP 589). The earlier
    # `= Field(default=...)` assignments were plain class attributes that never
    # populated the state. The nodes already supply their own fallbacks via
    # `state.get(...)`, so the former descriptions are kept here as comments.

    # The messages for the agent; `add_messages` appends updates rather than replacing.
    messages: Annotated[list[BaseMessage], add_messages]
    # The ID of the table being queried.
    table_id: str
    # The question asked by the user.
    question: str
    # The rough analysis plan for the agent to follow.
    analysis_plan: str
    # The context (table profile) for the agent to use.
    context: str
    # Number of analyst_agent turns so far (read as state.get("iters", 0)).
    iters: int
    # Provenance log written by python_code_executor (entries are dicts with the
    # ReportModel keys).
    report: List[ReportModel]


In [6]:
# Shared reader for the local JSON-stat archive; hybrid_retrieval_tool uses it to
# materialise tables as CSV files. The meaning of `compression_level=12` is
# defined in src.helpers.json_stat_archive_db -- TODO confirm.
cso_archive_reader = JSONStatArchiveDB(compression_level=12)

In [7]:
# System prompt for the analyst subgraph (first message sent by analyst_agent).
# Plain string: the previous `f` prefix had no placeholders (ruff F541).
SYSTEM_PROMPT_ANALYST = dedent(
    """\
        # ROLE: I am a Data Analyst Agent that has access to Python-shell tool/function - `python_code_executor(code: str, description: str)`.

        # INSTRUCTIONS:
            - I call the `python_code_executor` tool to analyse the data.
            - Once I feel I know enough, I give a crisp and concise answer to the user's question.

        # NOTE:
            - The python-script should import necessary libraries (pandas, numpy, os, pathlib, etc) to read the CSV file and perform data manipulation 
            - The python-script use `print` statements for printing any statistics that you need to fetch.
            - In a single tool-call to `python_code_executor`, I ask for a single statistic to be fetched.
            - I keep the commentary limited in this step.
            - Once I have enough statistics to answer the user's question, I give a crisp and concise answer to user's question, with proper data backing it up.
        
        # WARNINGS:
            - For tool-calls to `python_code_executor` tool, only send the python code as `code` parameter
            - Do not include any visualizations or plots in the code.
            - In case I get reported back with any errors in executing the python code, I should make necessary corrections and call the python_code_executor tool to re-run the code.
            - The "Analysis-Plan" provided to me is a rough plan, I should adapt it as per the data available in the table.
        
        # TIPS:
            - For high cardinality columns, consider using simple keyword based filtering (like `str.contains('abc|xyz')`). Also consider items / categories related to said keywords.
    """
)

def analyst_agent(state: State) -> dict:
    """
    Run one turn of the data-analyst tool-calling loop.

    The model sees the analyst system prompt, the table context plus analysis
    plan, the user question and the subgraph's conversation so far. It either
    emits a `python_code_executor` call or a final answer. From the 11th turn on
    the model is invoked without tools, so it has to answer.

    Args:
        state (State): Subgraph state; reads `question`, `analysis_plan`,
            `context`, `messages`, `iters` and `report`.

    Returns:
        dict: State update with `messages` (one AIMessage), `iters`, `context`
        (the raw context, unchanged) and `report` (unchanged).
    """
    # Read the fallbacks up-front so the error path can always build a response.
    # The old code assigned these inside `try`, so an early failure raised
    # UnboundLocalError from the `except` block.
    iters = state.get("iters", 0) + 1
    context = state.get("context")
    report = state.get("report", [])

    try:
        question = state["question"]
        analysis_plan = state["analysis_plan"]
        old_messages = state.get("messages", [])

        # Keep the formatted prompt local. Writing it back to state["context"]
        # (as before) re-wrapped it on every turn, nesting "CONTEXT:" blocks and
        # duplicating the analysis plan.
        prompt_context = f"CONTEXT:\n{context}\n\nANALYSIS PLAN:\n{analysis_plan}"
        msgs = [
            SystemMessage(content=SYSTEM_PROMPT_ANALYST, name="analyst_agent"),
            SystemMessage(content=prompt_context, name="analyst_agent"),
            HumanMessage(content=question, name="analyst_agent"),
            *old_messages,
        ]

        if iters <= 10:
            res = llm_med_with_code_exec_tool.invoke(msgs)
            if not isinstance(res, AIMessage):
                res = AIMessage("Error generating code. Please try again.")
        else:
            # Iteration cap reached: call the model without tools to force a final answer.
            res = llm_med.invoke(msgs)
    except Exception as e:
        res = AIMessage(f"Error occurred during analysis: {str(e)}")

    return {"messages": [res], "iters": iters, "context": context, "report": report}


In [8]:
def has_tool_calls(msg: "AIMessage") -> bool:
    """
    Check whether a message carries any tool / function call.

    Looks at the parsed ``tool_calls`` list and at the raw provider payload in
    ``additional_kwargs`` (``function_call`` or ``tool_calls``). Attribute access
    is defensive, so objects missing these attributes simply return False.

    Args:
        msg (AIMessage): The AI message to check.

    Returns:
        bool: True if the message has tool calls, False otherwise.
    """
    # Truthiness treats None and [] alike. The old `!= []` test reported a call
    # for tool_calls=None.
    if getattr(msg, "tool_calls", None):
        return True
    # `or {}` guards against additional_kwargs=None, which used to raise.
    extra = getattr(msg, "additional_kwargs", None) or {}
    return bool(extra.get("function_call") or extra.get("tool_calls"))


In [9]:
tool_node = ToolNode(TOOLS)

# Analyst subgraph: START -> analyst_agent <-> tools. `tools_condition` routes
# analyst_agent to "tools" when its last message has tool calls, else to END.
analyst_graph_builder = StateGraph(State)
for _name, _runnable in (("analyst_agent", analyst_agent), ("tools", tool_node)):
    analyst_graph_builder.add_node(_name, _runnable)

analyst_graph_builder.add_edge(START, "analyst_agent")
analyst_graph_builder.add_conditional_edges("analyst_agent", tools_condition)
analyst_graph_builder.add_edge("tools", "analyst_agent")

analyst_graph = analyst_graph_builder.compile()

In [10]:
# table_ids = ["PCA21", "PCA23", "PCA22"]

# res = analyst_graph.batch(
#     [{
#         "table_id": table_ids[0],
#         "question": "What's the issue here?",
#         "context": "context",
#         "analysis_plan": "analysis_plan"
#     },
#     {
#         "table_id": table_ids[1],
#         "question": "What's the issue here?",
#         "context": "context",
#         "analysis_plan": "analysis_plan"
#     },
#     {
#         "table_id": table_ids[2],
#         "question": "What's the issue here?",
#         "context": "context",
#         "analysis_plan": "analysis_plan"
#     }]
# )
# print("Done")

## Fixing context-isolation problem

state:
    - messages: List[str]
    - relevant_docs: List[Dict]
        - table_id: str
        - context: str
        - analysis_plan: str
    - question: str
    - iter: int
         
reviewer-agent
- has access to the data-cso hybrid-retrieval tool: given a user prompt, it may choose to call this tool. The tool updates `relevant_docs` in state and returns a ToolMessage containing the context for every table it retrieved. [CHANGE: the planner has been merged into this tool, so there are no longer two separate tools.]
- [OBSOLETE; ignore] It also has access to the planner tool, which is basically an agent. It takes in the `messages`, creates a low-level plan for each data-source analysis and updates `relevant_docs` with the plan. It also returns a ToolMessage containing the plan for all tables.
- finally, it has access to the data-analyst subgraph as a tool. It can make multiple tool-calls to this tool. The tool node containing this subgraph will call the subgraph with the `table_id`, `question`, `analysis_plan`, and the `context`.

Giving this level of autonomy may have the following effect:
- Pros: context-isolation problem may be fixed
- Cons: redundant recursive loops, uncertainty in making tool-calls / malformed tool-calls, hallucinations, or even premature answering

In [11]:
# Structured-output schema for the planner LLM (one instance per table). There
# is deliberately no class docstring: pydantic would turn it into the schema
# description sent to the model.
class AnalysisPlanSubModel(BaseModel):
    table_id: Annotated[str, Field(description="The table-ID.")]
    analysis_plan: Annotated[
        List[str],
        Field(description="The low-level analysis plan for the table-ID. Contains a list of steps."),
    ]

# Wrapper holding plans for several tables in one response; not referenced in
# the visible code (planning uses AnalysisPlanSubModel per table) -- verify
# before removing.
class AnalysisPlanModel(BaseModel):
    responses: Annotated[
        List[AnalysisPlanSubModel],
        Field(description="List of analysis plans for the table-IDs."),
    ]

# Graph State Models
class RelevantTablesModel(BaseModel):
    """
    Model representing a relevant table for the analysis.
    """

    # Shape of each entry hybrid_retrieval_tool writes into
    # ParentState["relevant_tables_metadata"] (stored there as plain dicts).
    table_id: Annotated[str, Field(description="The ID of the relevant table.")]
    context: Annotated[str, Field(description="The statistical data profile of the table.")]
    analysis_plan: Annotated[
        List[str],
        Field(description="The low-level analysis plan for the table. Contains a list of steps."),
    ]


class ParentState(TypedDict):
    """State for the parent graph."""

    # NOTE: TypedDict keys cannot declare defaults (PEP 589). The earlier
    # `= Field(default=...)` assignments were plain class attributes that never
    # populated the state. Where a fallback matters, the code reads these keys
    # with `state.get(..., fallback)`.

    # Chat history for the parent graph; `add_messages` appends updates.
    messages: Annotated[list[BaseMessage], add_messages]
    # Latest user question (set by reviewer_agent from the newest HumanMessage).
    question: str
    # reviewer_agent turn counter used to cap tool-calling loops.
    iter: int
    # Tables found by hybrid_retrieval_tool: dicts with `table_id`, `context`
    # and `analysis_plan` (the RelevantTablesModel fields).
    relevant_tables_metadata: List[RelevantTablesModel]
    # table_id -> provenance entries accumulated by data_analyst_tool.
    reports: Dict[str, List[ReportModel]]

In [12]:
from src.helpers.hybrid_retrieval import HybridRetrieval


# Project hybrid retriever used by hybrid_retrieval_tool via `retriever.search`.
# `top_k_stage_1` / `top_k_stage_2` presumably bound a broad first pass and a
# narrower second pass -- TODO confirm in src.helpers.hybrid_retrieval.
retriever = HybridRetrieval(top_k_stage_1=200, top_k_stage_2=20)


def _create_table_analysis(df: pd.DataFrame, table_id: str) -> dict:
    """
    Analyzes the table and returns a dictionary with the analysis results.

    Args:
        df (pd.DataFrame): The DataFrame containing the CSO data.

    Returns:
        dict: A dictionary containing the analysis results.
    """
    csv_fp = f"cache/{table_id}.csv"
    try:
        table_shape = df.shape
        table_sample = pd.concat([df.head(5), df.tail(5)]) if len(df) > 10 else df
        
        table_info_df = pd.DataFrame({
            "columns": df.columns,
            "dtypes": [str(df[col].dtype) for col in df.columns],
            "nunique": [df[col].nunique() if df[col].nunique() <= 50 else '>50' for col in df.columns],
            "nulls": [df[col].isnull().sum() for col in df.columns]
        })

        context_list = [
            f"**Table ID:** {table_id}",
            f"- **CSV File Path**: {csv_fp}",
            f"- **Table Shape**: {table_shape}",
            "- **Table Info**:",
            table_info_df.to_string(index=False),
            "- **Table Sample (first and last 5 rows)**:",
            table_sample.to_string(index=False),
        ]

    except Exception as e:
        context_list = None

    return context_list


_CSV_CACHE_DIR = "cache/"
_JSONSTAT_ARCHIVE_PATH = "artifacts/cso_bkp/cso_archive/jsonstat_archive.sqlite"

# Planner prompt, sent once per table by _plan_tables. The orphan "```" line
# that used to close a never-opened fence has been removed.
_PLANNER_SYSTEM_PROMPT = dedent(
    """\
    # ROLE: I am a planner agent.

    # RETURN FORMAT (Pydantic):
        - table_id: str = Field(description="The ID of the table.")
        - analysis_plan: list[str] = Field(description="The low-level analysis plan for the table-ID. Contains a list of steps.")

    # INSTRUCTIONS:
    - Create a high-level plan for the data-analyst agent to carry out its analysis step-by-step.
    - Be concise, do not go over 3-4 steps.
    """
)


def _load_table_dataframe(table_id: str, csv_fp: str) -> "pd.DataFrame | None":
    """
    Return ``table_id`` as a DataFrame, using ``csv_fp`` as an on-disk cache.

    On a cache miss the table is read from the JSON-stat archive (as before, the
    last dataset the reader yields wins) and written to ``csv_fp``. Returns
    ``None`` when the archive yields nothing for ``table_id``.
    """
    if os.path.exists(csv_fp):
        return pd.read_csv(csv_fp)

    # Start from None. The old inline loop silently reused the previous table's
    # `df` (or raised UnboundLocalError on the first table) when the archive
    # had no entry for this table.
    df = None
    for _, ds, _ in cso_archive_reader.read(_JSONSTAT_ARCHIVE_PATH, table_id=table_id, with_labels=True):
        df = pyjstat.from_json_stat(ds)[0]
    if df is None:
        return None

    os.makedirs(os.path.dirname(csv_fp) or ".", exist_ok=True)  # cache dir may not exist yet
    df.to_csv(csv_fp, index=False)
    return df


def _plan_tables(user_prompt: str, table_ids: List[str], contexts: Dict[str, str]) -> Dict[str, List[str]]:
    """
    Ask the planner LLM for a short analysis plan per table in one batched call.

    Plans are keyed by the table ID that was sent, not the ID the model echoes
    back, so a mis-echoed ID can no longer silently drop a table. A missing
    structured response becomes an empty plan.
    """
    inputs = [
        [
            SystemMessage(content=_PLANNER_SYSTEM_PROMPT, name="planner_agent"),
            HumanMessage(content=f"Question : {user_prompt}\n\n Context:\n{contexts[table_id]}", name="user"),
        ]
        for table_id in table_ids
    ]
    plans = llm_med.with_structured_output(AnalysisPlanSubModel).batch(inputs)
    # Runnable.batch returns outputs in input order, so zip pairs them correctly.
    return {
        table_id: list(plan.analysis_plan) if plan is not None else []
        for table_id, plan in zip(table_ids, plans)
    }


@tool("hybrid_retrieval_tool", parse_docstring=True)
def hybrid_retrieval_tool(
    user_prompt: str,
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    """
    Tool to invoke the hybrid retrieval and return relevant table IDs.
    
    Args:
        user_prompt (str): The user's question or prompt.

    Returns:
        Command: The command to update the chat with the relevant table IDs.
    """
    # The summary and Args above are the LLM-facing tool description; keep them stable.
    # Pipeline: retrieve candidate table IDs -> load and profile each table ->
    # plan per table -> write `relevant_tables_metadata` into the parent state.
    relevant_tables_ids = retriever.search(query=user_prompt) or []

    contexts_dict: Dict[str, str] = {}
    for table_id in relevant_tables_ids:
        df = _load_table_dataframe(table_id, f"{_CSV_CACHE_DIR}{table_id}.csv")
        if df is None:
            continue  # not present in the archive
        context_list = _create_table_analysis(df, table_id)
        if context_list is None:
            continue  # profiling failed; "\n".join(None) would raise TypeError
        contexts_dict[table_id] = "\n".join(context_list)

    if not contexts_dict:
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content="No relevant tables found.",
                        tool_call_id=tool_call_id,
                        name="relevant_tables_metadata",
                    )
                ]
            }
        )

    # Usable tables only, in retrieval order, de-duplicated.
    table_ids = list(contexts_dict)
    plans = _plan_tables(user_prompt, table_ids, contexts_dict)
    relevant_tables_metadata = [
        {
            "table_id": table_id,
            "context": contexts_dict[table_id],
            "analysis_plan": plans[table_id],
        }
        for table_id in table_ids
    ]

    return Command(
        update={
            "relevant_tables_metadata": relevant_tables_metadata,
            "messages": [
                ToolMessage(
                    # Report only tables that data_analyst_tool can actually analyse.
                    content=f"Found {len(table_ids)} relevant tables. The table-IDs are: {', '.join(table_ids)}",
                    tool_call_id=tool_call_id,
                    name="relevant_tables_metadata",
                )
            ],
        }
    )

@tool("data_analyst_tool", parse_docstring=True)
def data_analyst_tool(
    table_ids: List[str],
    state: Annotated[dict, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
):
    """
    Tool to invoke the data analyst agent and return the final analysis.
    
    Args:
        table_ids (List[str]): A list containing table-IDs of the tables to analyze.
        state (dict): The current state of the conversation.
        tool_call_id (str): The ID of the tool call.

    Returns:
        Command: The command to update the chat with the data analyst's response.
    """
    # The summary and Args above are the LLM-facing tool description; keep them stable.
    # Runs the analyst subgraph once per requested, already-retrieved table
    # (in parallel via `batch`). It appends each run's provenance log to
    # `reports` and returns every table's final answer in one ToolMessage.
    question = state["question"]
    # .get: the model may call this tool before hybrid_retrieval_tool has run.
    relevant_tables_metadata = state.get("relevant_tables_metadata") or []
    # Copy so the injected state's dict is never mutated in place.
    reports_dict = dict(state.get("reports") or {})

    requested = set(table_ids)
    batch = [
        {
            "table_id": meta["table_id"],
            "question": question,
            "context": meta["context"],
            "analysis_plan": "\n".join(meta["analysis_plan"]),
        }
        for meta in relevant_tables_metadata
        if meta["table_id"] in requested
    ]

    if not batch:
        # Previously this produced an empty ToolMessage, giving the model no
        # hint about why nothing was analysed.
        retrieved = ", ".join(meta["table_id"] for meta in relevant_tables_metadata) or "none"
        return Command(
            update={
                "messages": [
                    ToolMessage(
                        content=(
                            "No analysis was run: none of the requested table-IDs "
                            f"({', '.join(table_ids)}) are among the retrieved tables ({retrieved}). "
                            "Call hybrid_retrieval_tool first or pass retrieved table-IDs."
                        ),
                        tool_call_id=tool_call_id,
                        name="data_analyst_tool",
                    )
                ]
            }
        )

    responses = analyst_graph.batch(batch)

    sections = []
    for item, response in zip(batch, responses):
        table_id = item["table_id"]
        # `report` may be absent if the subgraph never reached a node that sets it.
        reports_dict[table_id] = reports_dict.get(table_id, []) + (response.get("report") or [])
        sections.append(f"### Analysis for Table ID: {table_id}\n")
        # Gemini may return list-of-parts content, which would break "\n".join.
        # _to_text (defined in a later cell) is resolved at call time.
        sections.append(_to_text(response["messages"][-1].content))
        sections.append("")

    return Command(
        update={
            "messages": [
                ToolMessage(
                    content="\n".join(sections),
                    tool_call_id=tool_call_id,
                    name="data_analyst_tool",
                )
            ],
            "reports": reports_dict,
        }
    )

@tool("provenance_tool", parse_docstring=True) #, return_direct=True)
def provenance_tool(
    table_id: str,
    state: Annotated[dict, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
):
    """
    Tool to retrieve the provenance information (analysis report containing low-level code used for generating analysis) for a specific table.

    Args:
        table_id (str): The ID of the table to retrieve provenance information for.
        state (dict): The current state of the conversation.
        tool_call_id (str): The ID of the tool call.

    Returns:
        Command: The command to update the chat with the provenance information.
    """
    # The summary and Args above are the LLM-facing tool description; keep them stable.
    # Renders the `reports[table_id]` log (task / code / output per
    # python_code_executor call) as Markdown.
    reports_dict = state.get("reports") or {}
    report = reports_dict.get(table_id) or []

    if not report:
        # Previously an empty ToolMessage; tell the model explicitly instead.
        report_text = (
            f"No provenance information found for table-ID {table_id}. "
            "Run data_analyst_tool on it first."
        )
    else:
        report_text_list = []
        for i, entry in enumerate(report, start=1):
            report_text_list.append(f"### Task {i}: {entry['task']}\n")
            report_text_list.append(f"**Code:**\n```python\n{entry['code']}\n```")
            report_text_list.append(f"**Output:**\n```python\n{entry['result']}\n```")
            report_text_list.append("")
        report_text = "\n".join(report_text_list)

    return Command(
        update={
            "messages": [
                ToolMessage(
                    content=report_text,
                    tool_call_id=tool_call_id,
                    name="provenance_tool",
                )
            ]
        }
    )

# Tools exposed to the parent (reviewer) agent: retrieval + planning, per-table
# analysis via the analyst subgraph, and provenance lookup for follow-up questions.
NEW_TOOLS = [hybrid_retrieval_tool, data_analyst_tool, provenance_tool]
llm_med_with_tools_poc = llm_med.bind_tools(NEW_TOOLS)

  from .autonotebook import tqdm as notebook_tqdm


In [13]:
def _to_text(content) -> str:
    # Handles str, list[str], list[dict-like parts], or anything else.
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict):
                # Gemini often uses {"type":"text","text": "..."} parts
                pieces.append(part.get("text", ""))
            else:
                # Some wrappers expose objects with a .text attr
                pieces.append(getattr(part, "text", str(part)))
        return "\n".join(pieces)
    return str(content)

def _coerce_ai_to_text(ai: AIMessage) -> AIMessage:
    """
    Return a copy of ``ai`` whose ``content`` is flattened to a plain string.

    ``model_copy`` keeps every other field (``additional_kwargs``,
    ``response_metadata``, ``id``, ``name``, ``usage_metadata``, ...). The
    previous hand-built AIMessage dropped ``id``, ``name`` and ``usage_metadata``
    despite intending to keep useful metadata.
    """
    return ai.model_copy(update={"content": _to_text(ai.content)})


# System prompt for the parent (reviewer) agent. Fixed the duplicated "all all".
reviewer_system_prompt = dedent(
    """\
        # GOAL: I'm a helpful assistant. My job is to answer the user-question using all the facts available.

        # INSTRUCTIONS:
        - I have the following tools available for my tasks:
            1. hybrid_retrieval_tool: To retrieve relevant table IDs from CSO-DATA based on user queries.
            2. data_analyst_tool: To run the analysis on the retrieved tables.
            3. provenance_tool: To fetch the provenance information (analysis report containing low-level code used for generating analysis) for a specific table.
        - Once I have the relevant data from tables, I will invoke the `data_analyst_tool` with the list of all relevant table_ids.
        - Once I have the results from the analysis, I will answer the user's question in detail, using only the insights gained from the analysis.
        - I will cite the sources of my information (table-IDs) against each fact.
        - For follow up questions:
            1. If the user explicitly asks a follow-up question for the code / filters / methodology used in generating the insights, I will retrieve the provenance information by calling the `provenance_tool`, and only then answer the user follow-up question
            2. If the user asks for additional analysis or a different perspective on the data, I will re-run the analysis with the new parameters and provide the updated results.

        # RETURN FORMAT:
        - The final answer containing text / tables should be in Markdown format
    """
)

def reviewer_agent(state: ParentState):
    """Run one parent-graph reviewer turn and return a partial state update.

    Prepends ``reviewer_system_prompt`` to ``state["messages"]`` and invokes the
    tool-bound model.

    Returns:
        A dict that always contains:

        - ``iter``: the per-question turn counter.
        - ``messages``: a single new message.

        It also contains ``question`` when the last message is a
        ``HumanMessage``.

    Once the counter exceeds 12, the model is not called at all. A
    "Max iterations reached" message is returned instead; it carries no tool
    calls, so the graph's ``tools_condition`` edge ends the run.
    """
    max_iter = 12
    response = {}
    n_iter = state.get("iter", 0) + 1
    messages = state["messages"]
    last_message = messages[-1]

    if isinstance(last_message, HumanMessage):
        # Reset the counter for a new user question.
        # NOTE(review): the reset fires only when the text differs from the
        # stored question, so re-asking the same question on a persisted thread
        # keeps the old count. It also resets to 0 (not 1), which gives
        # follow-ups one more turn than the first question. Confirm intent.
        previous_question = state.get("question", None)
        if previous_question is not None and last_message.content != previous_question:
            n_iter = 0
        response["question"] = last_message.content

    response["iter"] = n_iter

    if n_iter > max_iter:
        # Check the budget before invoking the model, so no LLM call is made
        # whose reply would be discarded.
        response["messages"] = [AIMessage("Max iterations reached, ending...")]
        return response

    res = llm_med_with_tools_poc.invoke(
        [SystemMessage(content=reviewer_system_prompt, name="reviewer_agent")] + messages,
        # NOTE(review): `response_mime_type` is not a RunnableConfig key.
        # LangChain moves unknown config keys into `configurable`/metadata (it
        # shows up in the streamed chunk metadata), so this likely never reaches
        # Gemini. If plain-text output is required, set it on the model instead.
        # Verify before relying on it.
        config={"response_mime_type": "text/plain"},
    )

    if isinstance(res, AIMessage) and not has_tool_calls(res):
        # Final answers are flattened to string content. Tool-call messages are
        # passed through unchanged.
        res = _coerce_ai_to_text(res)
    response["messages"] = [res]

    return response

In [14]:
# Parent graph: reviewer_agent <-> tools loop, persisted in memory.
new_graph_builder = StateGraph(ParentState)
new_tool_node = ToolNode(NEW_TOOLS)

# Nodes: the LLM reviewer and the prebuilt tool executor.
new_graph_builder.add_node("reviewer_agent", reviewer_agent)
new_graph_builder.add_node("tools", new_tool_node)

# Edges: start at the reviewer; tools_condition sends tool-calling replies to
# the "tools" node and everything else to END; tool results go back to the reviewer.
new_graph_builder.add_edge(START, "reviewer_agent")
new_graph_builder.add_conditional_edges("reviewer_agent", tools_condition)
new_graph_builder.add_edge("tools", "reviewer_agent")

checkpointer = InMemorySaver()
app = new_graph_builder.compile(checkpointer=checkpointer)

In [36]:
question = "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"

chunks = []

# Postgres connection string. Can be overridden with the DB_URI environment
# variable; the literal below is only the local-development fallback.
DB_URI = os.environ.get("DB_URI", "postgresql://myuser:mypass@localhost:5432/mydatabase")
with (
    PostgresStore.from_conn_string(DB_URI) as store,
    PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):
    # Create the tables the store and checkpointer need.
    store.setup()
    checkpointer.setup()

    # Compile into a separate name. The Postgres connections are closed when
    # this `with` block exits. Rebinding the notebook-level `app` would leave
    # later cells calling a graph whose checkpointer is no longer usable.
    pg_app = new_graph_builder.compile(
        checkpointer=checkpointer,
        store=store,
    )

    config = {"configurable": {"thread_id": "5", "user_id": 1}}

    # stream_mode="messages" yields (namespace, (message_chunk, metadata)) tuples.
    # With subgraphs=True, chunks from nested graphs carry a non-empty namespace.
    for chunk in pg_app.stream(
        {"messages": [HumanMessage(content=question, name="user")]},
        config=config,
        subgraphs=True,
        stream_mode="messages",
    ):
        chunks.append(chunk)
        # Echo only parent-graph tokens. Chunks are fragments of one message,
        # so print them without newlines to keep the text readable.
        if not chunk[0]:
            text = chunk[1][0]
            if isinstance(text, AIMessageChunk) and text.content:
                print(text.content, end="", flush=True)
    print()


In [37]:
# Load a thread's persisted state from Postgres (uses DB_URI from the
# streaming cell above).
with (
    PostgresStore.from_conn_string(DB_URI) as store,
    PostgresSaver.from_conn_string(DB_URI) as checkpointer,
):
    store.setup()
    checkpointer.setup()

    # NOTE(review): this reads thread "4", while the streaming cell above
    # writes to thread "5". Confirm which thread is meant.
    config = {"configurable": {"thread_id": "4", "user_id": 1}}

    # Compile into a local name. The connections close when this `with` block
    # exits, so rebinding the notebook-level `app` would leave later cells with
    # an unusable checkpointer.
    pg_app = new_graph_builder.compile(
        checkpointer=checkpointer,
        store=store,
    )

    x = pg_app.get_state(config)

In [35]:
# Checkpointed channel values (e.g. the message history) of the inspected thread.
x.values

{'messages': [HumanMessage(content='detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland', additional_kwargs={}, response_metadata={}, name='user', id='cbba1982-f61e-4d06-bb7e-69396ebb8427'),
  AIMessage(content='', additional_kwargs={'function_call': {'name': 'hybrid_retrieval_tool', 'arguments': '{"user_prompt": "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"}'}}, response_metadata={'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': []}, id='run--03b0a902-89ce-43ee-8d3a-52caf21f0d34', tool_calls=[{'name': 'hybrid_retrieval_tool', 'args': {'user_prompt': 'detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland'}, 'id': '7a1df3aa-3681-4e67-bf58-eefbccb13726', 'type': 'tool_call'}], usage_metadata={'input_tokens': 531, 'output_tokens': 106, 'total_tokens': 637, 'input_token_details': {'cache_read': 0}, 'output

In [41]:
# Raw (namespace, (message_chunk, metadata)) tuples captured from the Postgres-backed stream.
chunks

[((),
  (AIMessageChunk(content='', additional_kwargs={'function_call': {'name': 'hybrid_retrieval_tool', 'arguments': '{"user_prompt": "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"}'}}, response_metadata={'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': []}, id='run--8182ae85-245b-4ac2-ba75-64101927df64', tool_calls=[{'name': 'hybrid_retrieval_tool', 'args': {'user_prompt': 'detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland'}, 'id': '71b20d20-70bb-4776-92bd-2767415585eb', 'type': 'tool_call'}], usage_metadata={'input_tokens': 531, 'output_tokens': 137, 'total_tokens': 668, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 100}}, tool_call_chunks=[{'name': 'hybrid_retrieval_tool', 'args': '{"user_prompt": "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"}', 'id': '

In [16]:
import uuid

# Alternative questions tried during development; only the live assignment is used.
# question = "QUESTION: Give me the breakdown of renewable energy resources as a percentage share and absolute numbers in 2022 in ireland."
# question = "QUESTION: How's the energy production in ireland over the years? Do a detailed comparison of renewable vs non-renewable share of energy"
# question = "QUESTION: How's the wind energy production over the years?"
# question = "QUESTION: Did the Ukraine war have any impact on the energy production in ireland??"
# question = "QUESTION: How has nuclear energy done over the years in ireland?"
# question = "QUESTION: What's the impact of climate change on energy production in ireland?"
# question = "QUESTION: How's the energy production in ireland over 2021-2025? Do a detailed comparison of renewable vs non-renewable share of energy"
# question = "QUESTION: What's the total agricultural area in Ireland? Answer in detail."
question = "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"
# question = "detailed statistics on lignin based products and related items production in prodcom data for ireland"
# question = "Population growth in ireland over the last 2 decades?"
# question = "What are the production volume and value for agriculture sector. Within the agriculture sector, what is the share of different sector w.r.t volume and value."
# question = "Analyse the BIIESGE documents for me, and report the findings."

# `app` is compiled with a checkpointer, and LangGraph refuses to run a
# checkpointed graph without a thread_id. A fresh thread per run keeps this
# invocation independent of earlier runs.
invoke_config = {"configurable": {"thread_id": str(uuid.uuid4())}}
res = app.invoke(
    {"messages": [HumanMessage(content=question, name="user")]},
    config=invoke_config,
)

I0000 00:00:1756065468.646763 2340568 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers
                                                     

['Prodcom Sales 2018 (Euro Thousand)' 'Prodcom Sales 2018 (Volume)']


In [19]:
# Text of the last message (the reviewer's final answer) from the invoke() run.
res["messages"][-1].content

"Here are the detailed statistics on cosmetics, toiletries, and related items production in Prodcom data for Ireland, based on the provided analyses:\n\n**2022 (Source: PCA22)**\n*   **Prodcom Sales:** 21,625.0\n*   **Prodcom Sales (Volume):** 3,830,078.0\n    *   *Products included:* cosmetic, toilet, perfume, soap, shampoo, make-up, fragrance, dental, and oral hygiene.\n\n**2019 (Source: PCA18)**\n*   **Prodcom Sales (Euro Thousand):**\n    *   **Powder-puffs and pads for the application of cosmetics or toilet preparations:** -100,000,000 Euro Thousand (sum and mean)\n    *   **Brushes for the application of cosmetics:** 4,132,561,000 Euro Thousand (sum and mean)\n*   **Prodcom Sales (Volume):**\n    *   **Powder-puffs and pads for the application of cosmetics or toilet preparations:** 0 (sum and mean)\n    *   **Brushes for the application of cosmetics:** 19,045 (sum and mean)\n\n**2018 (Source: PCA17)**\n*   **Prodcom Sales:** 10,711 Euro Thousand\n*   **Prodcom Sales (Volume):** 0

In [None]:
import uuid

question_follow_up = "Can you re-do the analysis after removing the negative values, and re-report the results?"

# The previous history and artefacts are passed explicitly, so run on a fresh
# thread. The checkpointed `app` still requires a thread_id in the config.
# NOTE(review): assumes ParentState declares `relevant_tables_metadata` and
# `reports` and that the first run populated them. Confirm.
follow_up_config = {"configurable": {"thread_id": str(uuid.uuid4())}}
res_follow_up = app.invoke(
    {
        "messages": res["messages"] + [HumanMessage(content=question_follow_up)],
        "relevant_tables_metadata": res["relevant_tables_metadata"],
        "reports": res["reports"],
    },
    config=follow_up_config,
)

In [None]:
# Text of the last message produced by the follow-up run.
res_follow_up["messages"][-1].content

In [None]:
# Full state returned by the follow-up invoke().
res_follow_up

## Streaming Output

In [15]:
question = "detailed statistics on cosmetics, toileteries and related items production in prodcom data for ireland"
# question = "Can you provide the code behind your analysis?"

chunks_dump = []

config = {"configurable": {"thread_id": "1"}}

# stream_mode="messages" yields (namespace, (message_chunk, metadata)) tuples.
# With subgraphs=True, chunks from nested graphs carry a non-empty namespace.
for chunk in app.stream(
    {"messages": [HumanMessage(content=question, name="user")]},
    config=config,
    subgraphs=True,
    stream_mode="messages",
):
    chunks_dump.append(chunk)
    # Echo only parent-graph tokens (empty namespace). Chunks are fragments of
    # one message, so print them without newlines to keep the text readable.
    if not chunk[0]:
        text = chunk[1][0]
        if isinstance(text, AIMessageChunk) and text.content:
            print(text.content, end="", flush=True)
print()

I0000 00:00:1756069090.100994 2366349 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers
                                                     

Total value for products containing 'cosmetics': 0.0


I0000 00:00:1756069779.136134 2366223 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers


In [17]:
# Replay the parent-graph tokens captured in `chunks_dump`. Fragments are
# concatenated instead of each one being printed on its own line.
# Assumes `chunks_dump` holds (namespace, (message_chunk, metadata)) tuples,
# as collected by the streaming cell.
for chunk in chunks_dump:
    if not chunk[0]:
        text = chunk[1][0]
        if isinstance(text, AIMessageChunk) and text.content:
            print(text.content, end="")
print()

In [41]:
# Content of the last streamed item.
# NOTE(review): in the recorded run this printed the whole final answer at
# once, presumably the complete message returned by reviewer_agent being
# emitted after its token chunks. Verify.
print(chunks_dump[-1][1][0].content)

Based on the Prodcom data for Ireland, here are the detailed statistics on the production of cosmetics, toiletries, and related items:

**Overall Sales and Production Values:**
*   **Total Prodcom Sales Value:** 1096.0 (PCA23)
*   **Total Prodcom Sales Volume:** 0.0 (PCA23)

**General Production Statistics:**
Across various categories of cosmetics, toiletries, and related items, the data exhibits significant anomalies, primarily large negative values (often -100,000,000 or -99,999,999.0) and zero medians, which likely represent suppressed or missing data.

*   **Table PCA22 (General Production Data):**
    *   Count: 26 entries
    *   Mean Value: -3.45e+07
    *   Standard Deviation: 4.86e+07
    *   Minimum Value: -1.00e+08
    *   25th Percentile: -1.00e+08
    *   50th Percentile (Median): 0.00e+00
    *   75th Percentile: 0.00e+00
    *   Maximum Value: 2.89e+06
    *   *Note: The prevalence of large negative values and a zero median suggests data anomalies or specific codes for m

In [30]:
# Number of items captured from the stream.
len(chunks_dump)

77

In [28]:
# Ad-hoc inspection; the index is specific to the recorded run. There it was a
# subgraph (analyst_agent) token chunk, identified by its non-empty namespace.
chunks_dump[-27]

(('tools:dfb61db2-64d0-4e4a-8b9e-43934ec6c190', '3'),
 (AIMessageChunk(content=' were filtered out.', additional_kwargs={}, response_metadata={'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': []}, id='run--f4576a73-db20-4e2d-aa19-b4a8b6750443', usage_metadata={'output_token_details': {'reasoning': 0}, 'input_token_details': {'cache_read': 0}, 'output_tokens': 4, 'total_tokens': 4, 'input_tokens': 0}),
  {'thread_id': '1',
   'langgraph_step': 7,
   'langgraph_node': 'analyst_agent',
   'langgraph_triggers': ('branch:to:analyst_agent',),
   'langgraph_path': ('__pregel_pull', 'analyst_agent'),
   'langgraph_checkpoint_ns': 'tools:dfb61db2-64d0-4e4a-8b9e-43934ec6c190|3|analyst_agent:6e6c98fa-1fec-6276-9c71-11b46d5d3eee',
   'checkpoint_ns': 'tools:dfb61db2-64d0-4e4a-8b9e-43934ec6c190',
   'ls_provider': 'google_genai',
   'ls_model_name': 'gemini-2.5-flash',
   'ls_model_type': 'chat',
   'ls_temperature': 0.5}))

In [27]:
# Ad-hoc inspection; the index is specific to the recorded run. There it was the
# ToolMessage carrying the per-table analysis back to the parent graph.
chunks_dump[-26]

((),
 (ToolMessage(content="### Analysis for Table ID: PCA23\n\nFor cosmetics, toiletries, and related items in the Prodcom data for Ireland:\n\n*   **Total Prodcom Sales Value:** 1096.0\n*   **Total Prodcom Sales Volume:** 0.0\n\n### Analysis for Table ID: PCA22\n\nThe production data for cosmetics, toiletries, and related items in Ireland shows the following statistics:\n\n*   **Count:** 26 entries\n*   **Mean Value:** -3.45e+07\n*   **Standard Deviation:** 4.86e+07\n*   **Minimum Value:** -1.00e+08\n*   **25th Percentile:** -1.00e+08\n*   **50th Percentile (Median):** 0.00e+00\n*   **75th Percentile:** 0.00e+00\n*   **Maximum Value:** 2.89e+06\n\nThe presence of large negative values (down to -1.00e+08) and a median of 0.00e+00 suggests that there might be data anomalies or specific codes used for missing/invalid data within the dataset. It's recommended to investigate the meaning of these negative values for a more accurate interpretation.\n\n### Analysis for Table ID: PCA18\n\nThe

In [29]:
# Ad-hoc inspection; the index is specific to the recorded run. There it was the
# first reviewer_agent token chunk of the final answer.
chunks_dump[-25]

((),
 (AIMessageChunk(content='Based on the Prodcom data for Ireland, here are the detailed statistics on the production of cosmetics, toiletries, and related items:\n\n**Overall Sales and Production Values:**\n*   **Total Prodcom Sales Value:** 1096.0', additional_kwargs={}, response_metadata={'safety_ratings': []}, id='run--18e82cda-20bf-4057-a806-3b2ab4df5a0a', usage_metadata={'input_tokens': 1806, 'output_tokens': 336, 'total_tokens': 2142, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 285}}),
  {'thread_id': '1',
   'langgraph_step': 5,
   'langgraph_node': 'reviewer_agent',
   'langgraph_triggers': ('branch:to:reviewer_agent',),
   'langgraph_path': ('__pregel_pull', 'reviewer_agent'),
   'langgraph_checkpoint_ns': 'reviewer_agent:c300e5ad-d370-2dc1-807f-7f57d666d5bd',
   'checkpoint_ns': 'reviewer_agent:c300e5ad-d370-2dc1-807f-7f57d666d5bd',
   'response_mime_type': 'text/plain',
   'ls_provider': 'google_genai',
   'ls_model_name': 'gemini-2.5-

In [None]:
# Content of the second-to-last streamed item (in the recorded run, the final
# token fragment of the answer).
print(chunks_dump[-2][1][0].content)

99999.0) and zero medians. This makes a complete and detailed analysis challenging without further clarification on the meaning of these codes.


In [42]:
# NOTE(review): the recorded output below is a {'tools': ...} dict, i.e. an
# updates-style chunk from an earlier run. The current streaming cell collects
# (namespace, (message_chunk, metadata)) tuples, so this output is stale.
chunks_dump[1]

{'tools': {'relevant_tables_metadata': [{'table_id': 'PCA23',
    'context': '**Table ID:** PCA23\n- **CSV File Path**: cache/PCA23.csv\n- **Table Shape**: (7950, 4)\n- **Table Info**:\n  columns  dtypes nunique  nulls\nStatistic  object       2      0\n     Year   int64       1      0\n  Product  object     >50      0\n    value float64     >50      0\n- **Table Sample (first and last 5 rows)**:\n                  Statistic  Year                                                                                     Product       value\n         Prodcom Sales 2023  2023 07101010 Iron ores and concentrates. Non-agglomerated (excluding roasted iron pyrites) (kg)         0.0\n         Prodcom Sales 2023  2023     07101020 Iron ores and concentrates. Agglomerated (excluding roasted iron pyrites) (kg) -99999999.0\n         Prodcom Sales 2023  2023                                                  07291100 Copper ores and concentrates (kg)         0.0\n         Prodcom Sales 2023  2023          

In [43]:
# NOTE(review): the recorded output is a {'reviewer_agent': ...} dict from an
# earlier updates-style run, not the tuple format the current streaming cell collects.
chunks_dump[2]

{'reviewer_agent': {'messages': [AIMessage(content='', additional_kwargs={'function_call': {'name': 'data_analyst_tool', 'arguments': '{"table_ids": ["PCA23", "PCA22", "PCA18", "PCA17", "PCA15"]}'}}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': []}, id='run--aed208c2-24b5-432e-81cb-dc8f4cb88592-0', tool_calls=[{'name': 'data_analyst_tool', 'args': {'table_ids': ['PCA23', 'PCA22', 'PCA18', 'PCA17', 'PCA15']}, 'id': 'ac79a172-6bc4-4f46-ae03-9f4984a9c468', 'type': 'tool_call'}], usage_metadata={'input_tokens': 617, 'output_tokens': 142, 'total_tokens': 759, 'input_token_details': {'cache_read': 604}, 'output_token_details': {'reasoning': 100}})],
  'iter': 2}}

In [45]:
# NOTE(review): the recorded output is a {'reviewer_agent': ...} dict from an
# earlier updates-style run, not the tuple format the current streaming cell collects.
chunks_dump[4]

{'reviewer_agent': {'messages': [AIMessage(content='Here are the detailed statistics on cosmetics, toiletries, and related items production in Prodcom data for Ireland, based on the provided analyses:\n\n### Prodcom Data for 2023 (Table ID: PCA23)\n\nIn 2023, the production data for cosmetics, toiletries, and related items in Ireland shows varying sales figures. Notably, many product categories reported zero sales in both Euro and volume.\n\n| Product | Statistic | Value |\n|:---|:---|:---|\n| 13991900 Powder-puffs and pads for the application of cosmetics or toilet preparations (p/st) | Prodcom Sales 2023 | 0.0 |\n| 13991900 Powder-puffs and pads for the application of cosmetics or toilet preparations (p/st) | Prodcom Sales 2023 (Volume) | 0.0 |\n| 20412020 Anionic organic surface-active agents (excluding soap) (kg) | Prodcom Sales 2023 | 13820.0 |\n| 20412020 Anionic organic surface-active agents (excluding soap) (kg) | Prodcom Sales 2023 (Volume) | 30832763.0 |\n| 20412050 Non-ionic