# Setup and Imports

In [9]:
!pip3 install -r ../requirements.txt

Collecting duckdb (from -r ../requirements.txt (line 4))
  Downloading duckdb-1.2.1-cp312-cp312-win_amd64.whl.metadata (995 bytes)
Downloading duckdb-1.2.1-cp312-cp312-win_amd64.whl (11.4 MB)
   ---------------------------------------- 0.0/11.4 MB ? eta -:--:--
   ---------------------------------------- 0.1/11.4 MB 2.6 MB/s eta 0:00:05
    --------------------------------------- 0.2/11.4 MB 2.1 MB/s eta 0:00:06
   - -------------------------------------- 0.4/11.4 MB 2.4 MB/s eta 0:00:05
   - -------------------------------------- 0.4/11.4 MB 2.5 MB/s eta 0:00:05
   - -------------------------------------- 0.6/11.4 MB 2.2 MB/s eta 0:00:05
   -- ------------------------------------- 0.6/11.4 MB 2.3 MB/s eta 0:00:05
   -- ------------------------------------- 0.8/11.4 MB 2.3 MB/s eta 0:00:05
   --- ------------------------------------ 1.1/11.4 MB 2.6 MB/s eta 0:00:04
   ---- ----------------------------------- 1.2/11.4 MB 2.6 MB/s eta 0:00:04
   ---- ----------------------------------- 1


[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import os
import json
import duckdb
from pydantic import BaseModel, Field
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import ToolNode
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display
from typing_extensions import TypedDict

# Agentic Pipeline

In [2]:
# Setting the local dataset directory and table names.
DATASET_DIR = "../data/choc_ai_dataset"  
TABLES = ["menu", "order", "order_item"]

In [3]:
# Connecting to DuckDB (in-memory; you can also persist to a file if needed)
conn = duckdb.connect(database=':memory:', read_only=False)

# Loading CSV files as tables if they don't already exist.
for table in TABLES:
    table_name = table  # Using table name directly
    csv_path = os.path.join(DATASET_DIR, f"{table}.csv")

    # For, debugging: Print absolute path and check if file exists.
    abs_path = os.path.abspath(csv_path)
    print(f"Looking for file at: {abs_path}")
    if not os.path.exists(csv_path):
        print(f"File {abs_path} does not exist. Check your directory structure.")
        continue
    
    # Fetching all tables and filter for our table name.
    existing_tables = [row[0] for row in conn.execute("SHOW TABLES").fetchall()]
    
    if table_name in existing_tables:
        print(f"Table {table_name} already exists. Skipping load.")
    else:
        # DuckDB can automatically detect CSV schema via read_csv_auto.
        conn.execute(f'CREATE TABLE "{table_name}" AS SELECT * FROM read_csv_auto(\'{csv_path}\')')
        print(f"Loaded table {table_name} from {csv_path}.")
        print()

def get_schema() -> str:
    """Retrieves and returns the DuckDB schema for all tables as a JSON string."""
    schema_info = {}
    
    # Listing all tables.
    tables = conn.execute("SHOW TABLES").fetchall()
    
    for (table,) in tables:
        # Getting table info using PRAGMA.
        columns = conn.execute(f"PRAGMA table_info('{table}')").fetchall()
        
        # Each row: (cid, name, type, notnull, dflt_value, pk)
        schema_info[table] = [{"name": col[1], "type": col[2]} for col in columns]
    
    return json.dumps(schema_info, indent=2)

Looking for file at: c:\Users\user\Documents\Learning\Development\agentic-workflow\projects\agentic-workflow-for-BigQuery-data-using-langraph-and-gemini\data\choc_ai_dataset\menu.csv
Loaded table menu from ../data/choc_ai_dataset\menu.csv.

Looking for file at: c:\Users\user\Documents\Learning\Development\agentic-workflow\projects\agentic-workflow-for-BigQuery-data-using-langraph-and-gemini\data\choc_ai_dataset\order.csv
Loaded table order from ../data/choc_ai_dataset\order.csv.

Looking for file at: c:\Users\user\Documents\Learning\Development\agentic-workflow\projects\agentic-workflow-for-BigQuery-data-using-langraph-and-gemini\data\choc_ai_dataset\order_item.csv
Loaded table order_item from ../data/choc_ai_dataset\order_item.csv.



In [4]:
@tool
def execute_query_tool(query: str) -> str:
    """
    Execute a SQL query against DuckDB and return the results as a JSON string.
    """
    try:
        # Executing the query and fetch all rows.
        result = conn.execute(query).fetchall()
        
        # Getting column names from the cursor description.
        columns = [desc[0] for desc in conn.description]
        
        # Combining rows with column names.
        data = [dict(zip(columns, row)) for row in result]
    
        return json.dumps(data, indent=2)
    
    except Exception as e:
        error_message = f"DuckDB Error: {str(e)}"
    
        return json.dumps({"error": error_message})

In [5]:
class SubmitFinalAnswer(BaseModel):
    """Represents the final answer submitted by the agent."""
    final_answer: str = Field(..., description="The final answer to submit to the user")

In [6]:
MODEL = "gpt-3.5-turbo"
# Ensure that your OpenAI API key is set in the environment variable OPENAI_API_KEY,
# or pass it directly here using openai_api_key="YOUR_API_KEY"
data_llm_with_tools = ChatOpenAI(model_name=MODEL, openai_api_key=os.getenv("OPENAI_API_KEY")).bind_tools([execute_query_tool, SubmitFinalAnswer])

In [7]:
# Sample system message; note the schema placeholder will be replaced dynamically.
sys_message_template = """
You are an expert in answering questions about the local dataset stored in DuckDB.

Your job is to execute the relevant SQL statements against the tables to get the best answer.
The user is only interested in seeing the final result.

1. If the user request is reasonable and compatible with the schema, YOU MUST FIRST call the `execute_query_tool`
   to get the result.
   When generating the SQL query:
   - Use meaningful aliases for column names.
   - Limit results to 5 rows (unless specified). Order results for clarity.
   - Select only necessary columns; avoid SELECT *.
   - Use valid DuckDB SQL (no escape characters).
   - Use only SELECT statements (no DML).

2. Call the `execute_query_tool` to execute the generated SQL query. If the query fails, analyze the error message and attempt to correct the SQL.
   If correction is not possible, inform the user of the error and its likely cause.

3. Only once you have the result from DuckDB, call the `SubmitFinalAnswer` tool to present the final results to the user and terminate the conversation.

You will use the following schema for all queries and all SQL must conform to this schema:
{schema}

EXAMPLE:
If a user asks: "Which is the most expensive item on the menu?"
You should:
1. Call execute_query_tool to execute SQL such as:
   SELECT menu_name, menu_price AS price FROM menu ORDER BY menu_price DESC LIMIT 1
2. Then call SubmitFinalAnswer to respond to the user with the result.
"""

In [8]:
class State(TypedDict):
    """Defines the workflow state."""
    messages: list[AnyMessage]
    dataset_schema: str

In [9]:
memory = MemorySaver()

In [None]:
def get_schema_node(state: dict) -> dict:
    """Retrieves the schema from DuckDB and stores it in the state."""
    if not state.get("dataset_schema"):
        schema = get_schema()
    
        return {"dataset_schema": schema, "messages": [AIMessage(content="Schema retrieved from DuckDB.")]}
    
    return {"messages": [AIMessage(content="Schema retrieved from memory.")]}

# def data_chatbot_node(state):
#     schema = state["dataset_schema"]
#     messages = [SystemMessage(content=sys_message_template.format(schema=schema))] + state["messages"]

#     # Ensuring the message sequence is correct
#     for i in range(len(messages) - 1):

#         if hasattr(messages[i], 'role') and messages[i].role == 'tool_calls':

#             if not hasattr(messages[i + 1], 'role') or messages[i + 1].role != 'tool':
#                 raise ValueError("A 'tool' message must follow a 'tool_calls' message.")

#     response = data_llm_with_tools.invoke(messages)

#     return {"messages": [response]}

def data_chatbot_node(state):
    schema = state["dataset_schema"]
    
    # Creating a new list with just the system message to start
    filtered_messages = [SystemMessage(content=sys_message_template.format(schema=schema))]
    
    # Processing the existing messages to ensure proper tool call structure
    state_messages = state["messages"]
    
    # Looping through the state messages and build a properly structured sequence
    tool_calls_message = None
    
    for msg in state_messages:
        
        # If we find a message with tool_calls, store it and wait for the corresponding tool response
        if hasattr(msg, 'tool_calls') and msg.tool_calls:
            tool_calls_message = msg
            filtered_messages.append(msg)
        
        # If this is a tool response message, make sure it follows a tool_calls message
        elif hasattr(msg, 'role') and msg.role == 'tool':
        
            # Only add tool messages if they have a preceding tool_calls message
            if tool_calls_message is not None:
                filtered_messages.append(msg)
                tool_calls_message = None  # Reset after adding the pair
        else:
            # For regular messages (human, AI), just add them normally
            filtered_messages.append(msg)
    
    # Using the filtered and properly structured messages
    try:
        response = data_llm_with_tools.invoke(filtered_messages)
        return {"messages": [response]}
    except Exception as e:
        
        # Logging the error and provide a fallback response
        print(f"Error in data_chatbot_node: {e}")
        # Print filtered messages for debugging
        for i, msg in enumerate(filtered_messages):
            print(f"Message {i}: {type(msg).__name__} - {msg}")
        
        # Returning a fallback AI message
        return {"messages": [AIMessage(content="I encountered an error processing your request. Please try again with a different query.")]}

def get_state(state: dict) -> str:
    """Determines the next step in the workflow."""
    last_message = state["messages"][-1]
    
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
    
        if any(call["name"] == "execute_query_tool" for call in last_message.tool_calls):
            return "execute_sql"

    return END

In [None]:
# Redefining the workflow graph with improved message handling
workflow = StateGraph(State)
workflow.add_node("get_schema", get_schema_node)
workflow.add_node("data_chatbot", data_chatbot_node)

# Using a custom function to handle the tool execution and ensure proper message formatting
def execute_sql_with_formatting(state):
    """Execute SQL and ensure proper message formatting for the response"""
    messages = state["messages"]
    last_message = messages[-1]
    
    if not hasattr(last_message, 'tool_calls'):
        # If there's no tool call in the last message, return early
        return {"messages": messages + [AIMessage(content="No valid tool call found.")]}
    
    # Finding the execute_query_tool call
    for tool_call in last_message.tool_calls:
        
        if tool_call["name"] == "execute_query_tool":
        
            # Executing the query
            query = tool_call["args"]["query"]
            try:
                result = execute_query_tool(query)
                
                # Creating a properly formatted tool response
                from langchain_core.messages import ToolMessage
                
                tool_message = ToolMessage(
                    content=result,
                    tool_call_id=tool_call["id"],
                    name="execute_query_tool"
                )
                return {"messages": messages + [tool_message]}
            except Exception as e:
                return {"messages": messages + [AIMessage(content=f"Error executing query: {str(e)}")]}
    
    # If no execute_query_tool call was found
    return {"messages": messages + [AIMessage(content="No execute_query_tool call found.")]}

workflow.add_node("execute_sql", execute_sql_with_formatting)
workflow.add_edge(START, "get_schema")
workflow.add_edge("get_schema", "data_chatbot")
workflow.add_conditional_edges("data_chatbot", get_state, ["execute_sql", END])
workflow.add_edge("execute_sql", "data_chatbot")

# Compiling the workflow graph
data_chatbot_graph = workflow.compile(checkpointer=memory)

In [30]:
config = {"configurable": {"thread_id": "101"}, "recursion_limit": 20}

# Driver Loop

In [None]:
while True:
    user_input = input("User (q/Q to quit): ")
    if user_input in {"q", "Q"}:
        break
    for output in data_chatbot_graph.stream(
        {"messages": [HumanMessage(content=user_input)]},
        config=config,
        stream_mode="updates"
    ):
        last_message = next(iter(output.values()))["messages"][-1]
        last_message.pretty_print()



Schema retrieved from memory.
Tool Calls:
  execute_query_tool (call_lYMuxdPSAtMIc2sW1ACHb9aF)
 Call ID: call_lYMuxdPSAtMIc2sW1ACHb9aF
  Args:
    query: SELECT * FROM menu LIMIT 5
Name: execute_query_tool

[
  {
    "menu_id": 15,
    "menu_name": "Lavender Sea Salt Chocolate",
    "menu_description": "A decadent blend of rich dark chocolate infused with aromatic lavender, topped with a sprinkle of delicate sea salt.",
    "menu_size": "1 pc",
    "menu_price": 5.5,
    "menu_allergy_info": "Contains: Milk, Soy, Gluten. May contain traces of nuts.",
    "menu_image_gcs_filename": "gs://data-analytics-golden-demo/chocolate-ai/v1/Synthetic-Data-Generation-Menu/15.png",
    "menu_image_http_url": "https://storage.googleapis.com/data-analytics-golden-demo/chocolate-ai/v1/Synthetic-Data-Generation-Menu/15.png",
    "menu_image_prompt": "Imagine a single, perfectly formed chocolate square, dusted with a light sprinkling of sea salt. The chocolate is a deep, rich brown, almost black, and ha

  result = execute_query_tool(query)


Tool Calls:
  SubmitFinalAnswer (call_Z3dni21VBIE7wUJMQVBWys6f)
 Call ID: call_Z3dni21VBIE7wUJMQVBWys6f
  Args:
    final_answer: Here are 5 menu items:
1. Lavender Sea Salt Chocolate - $5.5
2. Tour de France Commemorative Chocolate - Modern - $14.99
3. Tour de France Commemorative Chocolate - Modern - $14.99
4. Tour de France Commemorative Chocolate - Steampunk - $14.99
5. Parisian Chocolate Easter Egg - $24.99

Schema retrieved from memory.
Tool Calls:
  execute_query_tool (call_jWPDCKx7tjF8GQTIGmklQLT1)
 Call ID: call_jWPDCKx7tjF8GQTIGmklQLT1
  Args:
    query: SELECT menu_name, menu_price AS price FROM menu ORDER BY menu_price DESC LIMIT 1
Name: execute_query_tool

[
  {
    "menu_name": "Five Spice Chocolate Celebration Cake",
    "price": 75.0
  }
]
Tool Calls:
  SubmitFinalAnswer (call_eoIGqKcUZxgzQJ2i22ymTz0B)
 Call ID: call_eoIGqKcUZxgzQJ2i22ymTz0B
  Args:
    final_answer: The most expensive item on the menu is the Five Spice Chocolate Celebration Cake priced at $75.0
