## create database

### create user table

In [2]:
# import sqlite3
# import json

# # Connect to SQLite database (creates the file if it doesn't exist)
# conn = sqlite3.connect('procore_db.sqlite')
# cursor = conn.cursor()

# # Create the users table
# cursor.execute("""
# CREATE TABLE IF NOT EXISTS users (
#     id INTEGER PRIMARY KEY,
#     address TEXT,
#     avatar TEXT,
#     business_id TEXT,
#     business_phone TEXT,
#     business_phone_extension INTEGER,
#     city TEXT,
#     contact_id INTEGER,
#     country_code TEXT,
#     created_at TEXT,
#     email_address TEXT,
#     email_signature TEXT,
#     employee_id TEXT,
#     erp_integrated_accountant BOOLEAN,
#     fax_number TEXT,
#     first_name TEXT,
#     initials TEXT,
#     is_active BOOLEAN,
#     is_employee BOOLEAN,
#     job_title TEXT,
#     last_activated_at TEXT,
#     last_login_at TEXT,
#     last_name TEXT,
#     mobile_phone TEXT,
#     name TEXT,
#     notes TEXT,
#     origin_id TEXT,
#     origin_data TEXT,
#     state_code TEXT,
#     updated_at TEXT,
#     welcome_email_sent_at TEXT,
#     zip TEXT,
#     work_classification_id INTEGER,
#     permission_template TEXT, -- JSON field
#     company_permission_template TEXT, -- JSON field
#     vendor TEXT, -- JSON field
#     role TEXT,
#     verified_employee BOOLEAN
# )
# """)

# # Commit and close connection
# conn.commit()
# conn.close()

# print("Database and table created successfully!")
import sqlite3
import json

# Connect to SQLite database (creates the file if it doesn't exist)
conn = sqlite3.connect('procore_db.sqlite')
cursor = conn.cursor()

# Drop existing table if you want to recreate it (uncomment if needed)
cursor.execute("DROP TABLE IF EXISTS users")

# Create the users table with only email_address and last_name as required
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
  id INTEGER PRIMARY KEY,
  email_address TEXT NOT NULL,  -- Required field
  last_name TEXT NOT NULL,      -- Required field
  address TEXT NULL,
  avatar TEXT NULL,
  business_id TEXT NULL,
  business_phone TEXT NULL,
  business_phone_extension INTEGER NULL,
  city TEXT NULL,
  contact_id INTEGER NULL,
  country_code TEXT NULL,
  created_at TEXT NULL,
  email_signature TEXT NULL,
  employee_id TEXT NULL,
  erp_integrated_accountant BOOLEAN NULL,
  fax_number TEXT NULL,
  first_name TEXT NULL,
  initials TEXT NULL,
  is_active BOOLEAN NULL,
  is_employee BOOLEAN NULL,
  job_title TEXT NULL,
  last_activated_at TEXT NULL,
  last_login_at TEXT NULL,
  mobile_phone TEXT NULL,
  name TEXT NULL,
  notes TEXT NULL,
  origin_id TEXT NULL,
  origin_data TEXT NULL,
  state_code TEXT NULL,
  updated_at TEXT NULL,
  welcome_email_sent_at TEXT NULL,
  zip TEXT NULL,
  work_classification_id INTEGER NULL,
  permission_template TEXT NULL,        -- JSON field
  company_permission_template TEXT NULL, -- JSON field
  vendor TEXT NULL,                     -- JSON field
  role TEXT NULL,
  verified_employee BOOLEAN NULL
)
""")

# Commit and close connection
conn.commit()
conn.close()

print("Database and table created successfully!")

Database and table created successfully!


## Building Agent

### Connecting to database

In [9]:
from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///procore_db.sqlite")
print(db.dialect)
print(db.get_usable_table_names())
db.run("SELECT * FROM users LIMIT 10;")

sqlite
['users']


''

### handle errors

In [10]:
from typing import Any

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda, RunnableWithFallbacks
from langgraph.prebuilt import ToolNode


def create_tool_node_with_fallback(tools: list) -> RunnableWithFallbacks[Any, dict]:
    """
    Create a ToolNode with a fallback to handle errors and surface them to the agent.
    """
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }

### SQL tools (get tables and get schemas)

In [11]:

from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import ChatOpenAI

toolkit = SQLDatabaseToolkit(db=db, llm=ChatOpenAI())
tools = toolkit.get_tools()

list_tables_tool = next(tool for tool in tools if tool.name == "sql_db_list_tables")
get_schema_tool = next(tool for tool in tools if tool.name == "sql_db_schema")

print(list_tables_tool.invoke(""))

print(get_schema_tool.invoke("users"))

users

CREATE TABLE users (
	id INTEGER, 
	address TEXT, 
	avatar TEXT, 
	business_id TEXT, 
	business_phone TEXT, 
	business_phone_extension INTEGER, 
	city TEXT, 
	contact_id INTEGER, 
	country_code TEXT, 
	created_at TEXT, 
	email_address TEXT, 
	email_signature TEXT, 
	employee_id TEXT, 
	erp_integrated_accountant BOOLEAN, 
	fax_number TEXT, 
	first_name TEXT, 
	initials TEXT, 
	is_active BOOLEAN, 
	is_employee BOOLEAN, 
	job_title TEXT, 
	last_activated_at TEXT, 
	last_login_at TEXT, 
	last_name TEXT, 
	mobile_phone TEXT, 
	name TEXT, 
	notes TEXT, 
	origin_id TEXT, 
	origin_data TEXT, 
	state_code TEXT, 
	updated_at TEXT, 
	welcome_email_sent_at TEXT, 
	zip TEXT, 
	work_classification_id INTEGER, 
	permission_template TEXT, 
	company_permission_template TEXT, 
	vendor TEXT, 
	role TEXT, 
	verified_employee BOOLEAN, 
	PRIMARY KEY (id)
)

/*
3 rows from users table:
id	address	avatar	business_id	business_phone	business_phone_extension	city	contact_id	country_code	created_at	email_a

### create query tool

In [18]:
from langchain_core.tools import tool


@tool
def db_query_tool(query: str) -> str:
    """
    Execute a SQL query against the database and get back the result.
    If the query is not correct, an error message will be returned.
    If an error is returned, rewrite the query, check the query, and try again.
    """
    result = db.run_no_throw(query)
    if not result:
        return "Error: Query failed. Please rewrite your query and try again."
    return result


print(db_query_tool.invoke("SELECT * FROM users LIMIT 10;"))

Error: Query failed. Please rewrite your query and try again.


In [26]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

from langchain_core.prompts import ChatPromptTemplate

query_check_system = """You are a SQL expert with a strong attention to detail.
Double check the SQLite query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins

If there are any of the above mistakes, rewrite the query. If there are no mistakes, just reproduce the original query.

You will call the appropriate tool to execute the query after running this check."""

query_check_prompt = ChatPromptTemplate.from_messages(
    [("system", query_check_system), ("placeholder", "{messages}")]
)
query_check = query_check_prompt | llm.bind_tools(
    [db_query_tool], tool_choice="required"
)

query_check.invoke({"messages": [("user", "SELECT users whom names starts with n;")]})

AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_RXDe0UCQrJpthVTh64FQ0OTm', 'function': {'arguments': '{"query":"SELECT * FROM users WHERE name LIKE \'n%\'"}', 'name': 'db_query_tool'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 224, 'total_tokens': 246, 'completion_tokens_details': {'audio_tokens': 0, 'reasoning_tokens': 0, 'accepted_prediction_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-70266031-6c79-44c6-b3b9-713041ecfc3c-0', tool_calls=[{'name': 'db_query_tool', 'args': {'query': "SELECT * FROM users WHERE name LIKE 'n%'"}, 'id': 'call_RXDe0UCQrJpthVTh64FQ0OTm', 'type': 'tool_call'}], usage_metadata={'input_tokens': 224, 'output_tokens': 22, 'total_tokens': 246, 'input_token_details': {'audio': 0, 'cache_rea

In [None]:
from typing import Annotated, Literal

from langchain_core.messages import AIMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import AnyMessage, add_messages

# Define the state for the agent
class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]


# Define a new graph
workflow = StateGraph(State)

# Add a node for the first tool call
def first_tool_call(state: State) -> dict[str, list[AIMessage]]:
    return {
        "messages": [
            AIMessage(
                content="",
                tool_calls=[
                    {
                        "name": "sql_db_list_tables",
                        "args": {},
                        "id": "tool_abcd123",
                    }
                ],
            )
        ]
    }


def model_check_query(state: State) -> dict[str, list[AIMessage]]:
    """
    Use this tool to double-check if your query is correct before executing it.
    """
    return {"messages": [query_check.invoke({"messages": [state["messages"][-1]]})]}

workflow.add_node("first_tool_call", first_tool_call)
     

# Add nodes for the first two tools
workflow.add_node(
    "list_tables_tool", create_tool_node_with_fallback([list_tables_tool])
)
workflow.add_node("get_schema_tool", create_tool_node_with_fallback([get_schema_tool]))

# Add a node for a model to choose the relevant tables based on the question and available tables
model_get_schema = llm.bind_tools(
    [get_schema_tool]
)
workflow.add_node(
    "model_get_schema",
    lambda state: {
        "messages": [model_get_schema.invoke(state["messages"])],
    },
)

     

# Describe a tool to represent the end state
class SubmitFinalAnswer(BaseModel):
    """Submit the final answer to the user based on the query results."""

    final_answer: str = Field(..., description="The final answer to the user")


# Add a node for a model to generate a query based on the question and schema
query_gen_system = """You are a SQL expert with a strong attention to detail.

Given an input question, output a syntactically correct SQLite query to run, then look at the results of the query and return the answer.

DO NOT call any tool besides SubmitFinalAnswer to submit the final answer.

When generating the query:

Output the SQL query that answers the input question without a tool call.

Unless the user specifies a specific number of examples they wish to obtain, always limit your query to at most 5 results.
You can order the results by a relevant column to return the most interesting examples in the database.
Never query for all the columns from a specific table, only ask for the relevant columns given the question.

If you get an error while executing a query, rewrite the query and try again.

If you get an empty result set, you should try to rewrite the query to get a non-empty result set.
NEVER make stuff up if you don't have enough information to answer the query... just say you don't have enough information.

If you have enough information to answer the input question, simply invoke the appropriate tool to submit the final answer to the user.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the database."""
query_gen_prompt = ChatPromptTemplate.from_messages(
    [("system", query_gen_system), ("placeholder", "{messages}")]
)
query_gen = query_gen_prompt | llm.bind_tools(
    [SubmitFinalAnswer]
)


def query_gen_node(state: State):
    message = query_gen.invoke(state)

    # Sometimes, the LLM will hallucinate and call the wrong tool. We need to catch this and return an error message.
    tool_messages = []
    if message.tool_calls:
        for tc in message.tool_calls:
            if tc["name"] != "SubmitFinalAnswer":
                tool_messages.append(
                    ToolMessage(
                        content=f"Error: The wrong tool was called: {tc['name']}. Please fix your mistakes. Remember to only call SubmitFinalAnswer to submit the final answer. Generated queries should be outputted WITHOUT a tool call.",
                        tool_call_id=tc["id"],
                    )
                )
    else:
        tool_messages = []
    return {"messages": [message] + tool_messages}


workflow.add_node("query_gen", query_gen_node)

# Add a node for the model to check the query before executing it
workflow.add_node("correct_query", model_check_query)

# Add node for executing the query
workflow.add_node("execute_query", create_tool_node_with_fallback([db_query_tool]))


# Define a conditional edge to decide whether to continue or end the workflow
def should_continue(state: State) -> Literal[END, "correct_query", "query_gen"]:
    messages = state["messages"]
    last_message = messages[-1]
    # If there is a tool call, then we finish
    if getattr(last_message, "tool_calls", None):
        return END
    if last_message.content.startswith("Error:"):
        return "query_gen"
    else:
        return "correct_query"


# Specify the edges between the nodes
workflow.add_edge(START, "first_tool_call")
workflow.add_edge("first_tool_call", "list_tables_tool")
workflow.add_edge("list_tables_tool", "model_get_schema")
workflow.add_edge("model_get_schema", "get_schema_tool")
workflow.add_edge("get_schema_tool", "query_gen")
workflow.add_conditional_edges(
    "query_gen",
    should_continue,
)
workflow.add_edge("correct_query", "execute_query")
workflow.add_edge("execute_query", "query_gen")

# Compile the workflow into a runnable
app = workflow.compile()
     

from IPython.display import Image, display
from langchain_core.runnables.graph import MermaidDrawMethod

display(
    Image(
        app.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)

## SQLDatabase Toolkit

In [27]:
import sqlite3

import requests
from langchain_community.utilities.sql_database import SQLDatabase
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool


# def get_engine_for_chinook_db():
#     """Pull sql file, populate in-memory database, and create engine."""
#     url = "https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql"
#     response = requests.get(url)
#     sql_script = response.text

#     connection = sqlite3.connect(":memory:", check_same_thread=False)
#     connection.executescript(sql_script)
#     return create_engine(
#         "sqlite://",
#         creator=lambda: connection,
#         poolclass=StaticPool,
#         connect_args={"check_same_thread": False},
#     )


# engine = get_engine_for_chinook_db()

# db = SQLDatabase(engine)

from langchain_community.utilities import SQLDatabase

db = SQLDatabase.from_uri("sqlite:///procore_db.sqlite")
print(db.dialect)
print(db.get_usable_table_names())
db.run("SELECT * FROM users LIMIT 10;")

sqlite
['users']


''

In [30]:
from langchain_openai import ChatOpenAI
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit

llm = ChatOpenAI()
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

In [38]:
toolkit.get_tools()

[QuerySQLDataBaseTool(description="Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.", db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x00000153F179E990>),
 InfoSQLDatabaseTool(description='Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3', db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x00000153F179E990>),
 ListSQLDatabaseTool(db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x00000153F179E990>),
 QuerySQLCheckerTool(description='Use this tool to 

In [42]:
from langchain_community.tools.sql_database.tool import (
    InfoSQLDatabaseTool,
    ListSQLDatabaseTool,
    QuerySQLCheckerTool,
    QuerySQLDataBaseTool,
)

from langchain import hub

prompt_template = hub.pull("langchain-ai/sql-agent-system-prompt")

assert len(prompt_template.messages) == 1
print(prompt_template.input_variables)

system_message = prompt_template.format(dialect="SQLite", top_k=5)



['dialect', 'top_k']


In [43]:
from langgraph.prebuilt import create_react_agent

agent_executor = create_react_agent(
    llm, toolkit.get_tools(), state_modifier=system_message
)

In [44]:
example_query = "Which country's customers spent the most?"

events = agent_executor.stream(
    {"messages": [("user", example_query)]},
    stream_mode="values",
)
for event in events:
    event["messages"][-1].pretty_print()


Which country's customers spent the most?
Tool Calls:
  sql_db_list_tables (call_dDKyzZtkJRYQwH71XgtxtV0I)
 Call ID: call_dDKyzZtkJRYQwH71XgtxtV0I
  Args:
Name: sql_db_list_tables

users
Tool Calls:
  sql_db_schema (call_sHEU8BHnjeYpE9RnjqDjw6AK)
 Call ID: call_sHEU8BHnjeYpE9RnjqDjw6AK
  Args:
    table_names: users
Name: sql_db_schema


CREATE TABLE users (
	id INTEGER, 
	address TEXT, 
	avatar TEXT, 
	business_id TEXT, 
	business_phone TEXT, 
	business_phone_extension INTEGER, 
	city TEXT, 
	contact_id INTEGER, 
	country_code TEXT, 
	created_at TEXT, 
	email_address TEXT, 
	email_signature TEXT, 
	employee_id TEXT, 
	erp_integrated_accountant BOOLEAN, 
	fax_number TEXT, 
	first_name TEXT, 
	initials TEXT, 
	is_active BOOLEAN, 
	is_employee BOOLEAN, 
	job_title TEXT, 
	last_activated_at TEXT, 
	last_login_at TEXT, 
	last_name TEXT, 
	mobile_phone TEXT, 
	name TEXT, 
	notes TEXT, 
	origin_id TEXT, 
	origin_data TEXT, 
	state_code TEXT, 
	updated_at TEXT, 
	welcome_email_sent_at TEXT, 

In [45]:
example_query = "Who are the top 3 best selling artists?"

events = agent_executor.stream(
    {"messages": [("user", example_query)]},
    stream_mode="values",
)
for event in events:
    event["messages"][-1].pretty_print()


Who are the top 3 best selling artists?
Tool Calls:
  sql_db_query (call_WRUE2YUfcjqavc7nPJdMXr6B)
 Call ID: call_WRUE2YUfcjqavc7nPJdMXr6B
  Args:
    query: SELECT artist_name, SUM(quantity) AS total_sold FROM sales GROUP BY artist_name ORDER BY total_sold DESC LIMIT 3
Name: sql_db_query

Error: (sqlite3.OperationalError) no such table: sales
[SQL: SELECT artist_name, SUM(quantity) AS total_sold FROM sales GROUP BY artist_name ORDER BY total_sold DESC LIMIT 3]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
Tool Calls:
  sql_db_list_tables (call_1VMkMRpW8c7a1aHCmgzSui9O)
 Call ID: call_1VMkMRpW8c7a1aHCmgzSui9O
  Args:
Name: sql_db_list_tables

users
Tool Calls:
  sql_db_schema (call_2XILVdEgKp5dCqXyzKHOMi0a)
 Call ID: call_2XILVdEgKp5dCqXyzKHOMi0a
  Args:
    table_names: users
Name: sql_db_schema


CREATE TABLE users (
	id INTEGER, 
	address TEXT, 
	avatar TEXT, 
	business_id TEXT, 
	business_phone TEXT, 
	business_phone_extension INTEGER, 
	city TEXT, 
	contact_id INTE