## High-level steps of the automation workflow

1. The agent receives the name of a new data table from DuckDB.
2. Extract the new table's schema and a couple of sample rows from it.
3. Make sure all the needed fields are present, even if their names differ slightly.
4. Create separate nodes per functionality, so each kind of analysis is handled independently.
5. Extract the predefined analysis-table schemas given in the prompts.
6. Insert those analyses into the correctly named tables in the database.

In [1]:
import os

import duckdb
from dotenv import load_dotenv
from langchain_community.agent_toolkits import SQLDatabaseToolkit  # SQL agent toolkit
from langchain_community.utilities import SQLDatabase

# Spatial extension: INSTALL fetches it, LOAD enables it.
# NOTE(review): duckdb.sql() executes on duckdb's default in-memory connection,
# so the extension is loaded there, not in the file database that `duckdb_uri`
# points at (the one the SQL agent uses) — confirm this is intended.
duckdb.sql("INSTALL spatial;")
duckdb.sql("LOAD spatial;")

# SQLAlchemy URL of the on-disk DuckDB file handed to SQLDatabase.from_uri.
duckdb_uri = "duckdb:///test_duckdb.duckdb"

# Load variables from a local .env file into the process environment.
load_dotenv()

In [2]:
from langchain_openai import ChatOpenAI
# Chat model used by the SQL toolkit and the query-check chain.
llm = ChatOpenAI(model="gpt-4o-mini")

In [3]:
# LangChain SQL wrapper over the on-disk DuckDB file referenced by duckdb_uri.
db = SQLDatabase.from_uri(duckdb_uri)



In [4]:
# Supabase storage settings (used through DuckDB's S3 secret type), read from
# the environment; each name is None when its variable is unset.
BUCKET_URL, ENDPOINT_URL = (os.getenv(key) for key in ("BUCKET_URL", "ENDPOINT_URL"))

AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION = (
    os.getenv(key)
    for key in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_REGION")
)

In [5]:
def _sql_string_literal(value, env_var):
    """Quote *value* as a DuckDB string literal for the CREATE SECRET below.

    Args:
        value: The setting read from the environment (may be None).
        env_var: Name of the environment variable, used in the error message.

    Returns:
        The value wrapped in single quotes, with embedded quotes escaped.

    Raises:
        ValueError: If *value* is unset or empty; otherwise the text 'None'
            would silently be stored as the credential.
    """
    if not value:
        raise ValueError(f"Environment variable {env_var} is not set")
    # Standard SQL escaping: double any embedded single quote so the value
    # cannot terminate the literal early and corrupt the statement.
    return "'" + value.replace("'", "''") + "'"


# (Re)create the S3-type secret DuckDB uses to reach Supabase storage with
# path-style URLs.
# NOTE(review): duckdb.sql() targets duckdb's default in-memory connection, so
# this secret is not registered on the file database used by `db` — confirm.
duckdb.sql(
    f"""
    DROP SECRET IF EXISTS supabase_storage;
    CREATE SECRET supabase_storage (
        TYPE S3,
        KEY_ID {_sql_string_literal(AWS_ACCESS_KEY_ID, 'AWS_ACCESS_KEY_ID')},
        SECRET {_sql_string_literal(AWS_SECRET_ACCESS_KEY, 'AWS_SECRET_ACCESS_KEY')},
        ENDPOINT {_sql_string_literal(ENDPOINT_URL, 'ENDPOINT_URL')},
        REGION {_sql_string_literal(AWS_REGION, 'AWS_REGION')},
        URL_STYLE 'path'
    )
    """
)

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

In [6]:
# conn.execute(
#     f"""
#     create table restaurant_info as select * from st_read('{BUCKET_URL}')
#     """
# )

# get the llm agent up with the db 

In [7]:
# Sanity check: SQL dialect of the agent's database connection.
db.dialect

'duckdb'

In [8]:
# List the tables the SQLDatabase wrapper exposes to the agent.
db.get_usable_table_names()

['restaurant_info']

In [9]:
# Wrap the database and the LLM in LangChain's standard SQL tools.
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

In [10]:
# Materialize the toolkit's tools (query, schema, list-tables, query-checker).
tools = toolkit.get_tools()

In [11]:
# Display the available tools together with their descriptions.
tools

[QuerySQLDataBaseTool(description="Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use sql_db_schema to query the correct table fields.", db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7f6f40bc2110>),
 InfoSQLDatabaseTool(description='Input to this tool is a comma-separated list of tables, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling sql_db_list_tables first! Example Input: table1, table2, table3', db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7f6f40bc2110>),
 ListSQLDatabaseTool(db=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7f6f40bc2110>),
 QuerySQLCheckerTool(description='Use this tool to double check

In [15]:
def _require_tool(name):
    """Return the first tool in ``tools`` whose ``name`` equals *name*.

    Raises:
        LookupError: If no tool has that name. The message lists the
            available tool names, unlike a bare ``next()``, which raises an
            unexplained StopIteration.
    """
    found = next((tool for tool in tools if tool.name == name), None)
    if found is None:
        available = ", ".join(tool.name for tool in tools)
        raise LookupError(f"No tool named {name!r}; available tools: {available}")
    return found


# Pick out the toolkit tools the workflow uses, by their registered names.
list_tables_tool = _require_tool("sql_db_list_tables")
get_schema_tool = _require_tool("sql_db_schema")
db_query_tool = _require_tool("sql_db_query")

In [13]:
# Smoke-test the list-tables tool (invoked with an empty input).
print(list_tables_tool.invoke(""))

restaurant_info


In [14]:
# Print the schema (and sample rows, per the tool's description) the agent sees for the raw table.
print(get_schema_tool.invoke("restaurant_info"))


CREATE TABLE restaurant_info (
	reference INTEGER, 
	number INTEGER, 
	external_number VARCHAR, 
	branch_name VARCHAR, 
	branch_reference VARCHAR, 
	type VARCHAR, 
	source VARCHAR, 
	status VARCHAR, 
	original_order_reference VARCHAR, 
	business_date DATE, 
	subtotal FLOAT, 
	total_price INTEGER, 
	total_charge INTEGER, 
	coupon_code VARCHAR, 
	coupon_name VARCHAR, 
	discount_name VARCHAR, 
	discounts INTEGER, 
	guests INTEGER, 
	tax_exclusive_discount_amount FLOAT, 
	tax_exclusive_total_charges INTEGER, 
	rounding INTEGER, 
	total_taxes FLOAT, 
	created_at TIMESTAMP WITHOUT TIME ZONE, 
	created_by VARCHAR, 
	accepted_at VARCHAR, 
	opened_at TIMESTAMP WITHOUT TIME ZONE, 
	due_at VARCHAR, 
	closed_at TIMESTAMP WITHOUT TIME ZONE, 
	closed_by VARCHAR, 
	check_number INTEGER, 
	tags VARCHAR, 
	customer_name VARCHAR, 
	customer_dial_code VARCHAR, 
	customer_phone VARCHAR, 
	customer_address_delivery_zone_name VARCHAR, 
	customer_address_name VARCHAR, 
	customer_address_description VARCHAR,

## Building the nodes for each analytical aspect with LangGraph

In [16]:
from langchain_core.prompts import ChatPromptTemplate

In [17]:
# System instructions for a model that reviews a DuckDB query before it is run.
query_check_system = """You are a SQL expert with a strong attention to detail.
Double check the DuckDB query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins

If there are any of the above mistakes, rewrite the query. If there are no mistakes, just reproduce the original query.

You will call the appropriate tool to execute the query after running this check."""

# System message first, then the conversation passed in as `messages`
# (the "placeholder" entry is filled from that input variable).
query_check_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", query_check_system),
        ("placeholder", "{messages}"),
    ]
)

# Prompt -> chat model; the reply carries the checked (possibly rewritten) query.
query_check = query_check_prompt.pipe(llm)


## Define workflow

In [28]:
from typing import Annotated, Literal
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.messages import AIMessage
from langchain_openai import ChatOpenAI

from pydantic import BaseModel, Field
from typing_extensions import TypedDict

from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import AnyMessage, add_messages

In [26]:
class State(TypedDict):
    """Shared LangGraph state read and updated by the workflow's nodes."""

    # Name of the newly ingested raw table to analyse.
    table_name: str
    # Conversation history; the add_messages reducer merges node updates into
    # the list instead of overwriting it.
    messages: Annotated[list[AnyMessage], add_messages]
    # Presumably the SQL generated for the analytical table — not yet written
    # by any visible node.
    sql_query: str
    # Name of the analytical table the results were inserted into.
    analytical_table_inserted_in: str
    # Column-relevance JSON returned by `parse_question` under this key; it must
    # be declared here or LangGraph will not keep that node's update.
    parsed_question: dict


class OutState(TypedDict):
    """Presumed output schema of the workflow (not wired into the graph yet)."""

    # Name of the analytical table the results were inserted into.
    analytical_table_inserted_in: str

In [None]:
def parse_question(self, state=None) -> dict:
    """Ask the LLM which columns of a raw table can feed the daily-sales metric.

    LangGraph calls node functions with the graph state as the only positional
    argument, so ``parse_question(state)`` is the supported call. The legacy
    two-argument form ``parse_question(<ignored>, state)`` still works: when
    ``state`` is omitted, the first argument is treated as the state.

    Args:
        self: The graph state when called by LangGraph; ignored when ``state``
            is also given.
        state: The graph state in the legacy two-argument form. Must contain
            the key ``"table_name"``.

    Returns:
        ``{"parsed_question": <dict>}`` holding the JSON the model produced
        (expected keys ``is_relevant`` and ``relevant_tables``).

    Raises:
        KeyError: If the state has no ``"table_name"``.
        langchain_core.exceptions.OutputParserException: If the model's reply
            is not valid JSON.
    """
    if state is None:
        state = self
    raw_table = state["table_name"]
    # Schema text (CREATE TABLE plus sample rows) from the SQL toolkit.
    schema = get_schema_tool.invoke(raw_table)
    prompt = ChatPromptTemplate.from_messages([
        ("system", '''You are a senior Analytics Engineer specializing in creating optimized SQL queries that benefits the business different KPIs. 
Given a database table with data and database schema, identify columns that we can use to create the following metrics:
- Daily sales

Your response should be in the following JSON format:
{{
    "is_relevant": boolean,
    "relevant_tables": [
        {{
            "table_name": string,
            "columns": [string],
            "noun_columns": [string]
        }}
    ]
}}

'''),
        ("human", "===Database schema:\n{schema}\n\n===User raw_table:\n{raw_table}\n\nIdentify relevant tables and columns:")
    ])

    # prompt -> chat model -> JSON parser. The parser reads the AIMessage
    # content itself, so the raw model response never has to be unpacked.
    # The input keys must match the template variables {schema} and {raw_table}.
    chain = prompt | llm | JsonOutputParser()
    parsed_response = chain.invoke({"schema": schema, "raw_table": raw_table})
    return {"parsed_question": parsed_response}


In [27]:
# Graph builder whose nodes read and update the shared `State` schema.
workflow = StateGraph(State)

In [None]:
# Register the column-identification step as a graph node.
# NOTE(review): add_node needs a callable action; the original passed only a
# name, so the node had nothing to run. parse_question, which consumes the
# raw table name from the state, looks like the intended action — confirm.
workflow.add_node("raw_ingested_table", parse_question)