## SQL Generation Agent with DuckDB

In [20]:
!pip install langchain_community
!pip install langchain --upgrade


# Essential libraries
import os  # For setting environment variables
import openai  # For OpenAI API interactions
import pandas as pd  # For handling data in dataframes
import json  # For working with JSON data structures
import logging  # For adding logging functionality

# LangChain and related utilities
from langchain.agents import initialize_agent, Tool  # To initialize LangChain agents and tools
from langchain.agents.agent_toolkits import SQLDatabaseToolkit  # For SQL database toolkits
from langchain.chat_models import ChatOpenAI  # For OpenAI chat-based models
from langchain.prompts import PromptTemplate  # For prompt customization
from langchain.sql_database import SQLDatabase  # For SQL database connections




In [89]:
os.environ["LANGCHAIN_API_KEY"] = "YOUR_KEY"
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["OPENAI_API_KEY"] = "YOUR_KEY"

### DuckDB Loader Example

In [90]:
from langchain_community.document_loaders import DuckDBLoader

In [127]:
from langchain_community.document_loaders.duckdb_loader import DuckDBLoader
import duckdb

# Define the path to the CSV file and DuckDB database
file_path = "./custom_safety_occurrences_50.csv"
db_path = "./safety_occurrences.duckdb"

# Connect to DuckDB
con = duckdb.connect(database=db_path)  # Persistent DuckDB database

# Create a table for the safety occurrences
try:
    con.execute(f"DROP TABLE IF EXISTS safety_occurrences")
    con.execute(f"CREATE TABLE safety_occurrences AS SELECT * FROM read_csv_auto('{file_path}')")
    print("Table 'safety_occurrences' created successfully.")
except duckdb.BinderException:
    print("Table already exists. Skipping creation.")

# Verify table creation
try:
    tables = con.execute("SHOW TABLES").fetchall()
    print(f"Available tables: {tables}")
except Exception as e:
    print(f"Error listing tables: {e}")

# Use DuckDBLoader to load and query the data
query = "SELECT * FROM safety_occurrences"

loader = DuckDBLoader(query=query, database=db_path)

# Load the data
try:
    data = loader.load()
    print("Data loaded successfully.")
    print(data[:5])  # Display the first 5 rows for verification
except Exception as e:
    print(f"Error loading data: {e}")


Table 'safety_occurrences' created successfully.
Available tables: [('safety_occurrences',)]
Data loaded successfully.
[Document(metadata={}, page_content='event_type: Incident\naircraft_model: A320\noperation_type: Commercial\nphase_of_flight: Takeoff\nseverity: Low\nreported_by: ATC\noccurrence_date: 2025-01-01\nlocation: Heathrow\ncontributing_factors: Human Error\ncorrective_actions: Procedure Update'), Document(metadata={}, page_content='event_type: Accident\naircraft_model: B737\noperation_type: Cargo\nphase_of_flight: Climb\nseverity: Medium\nreported_by: Pilot\noccurrence_date: 2025-01-02\nlocation: JFK\ncontributing_factors: Weather\ncorrective_actions: Additional Training'), Document(metadata={}, page_content='event_type: Serious Incident\naircraft_model: A380\noperation_type: Private\nphase_of_flight: Cruise\nseverity: High\nreported_by: Ground Crew\noccurrence_date: 2025-01-03\nlocation: Changi\ncontributing_factors: Technical Failure\ncorrective_actions: Technical Inspecti

In [128]:
data

[Document(metadata={}, page_content='event_type: Incident\naircraft_model: A320\noperation_type: Commercial\nphase_of_flight: Takeoff\nseverity: Low\nreported_by: ATC\noccurrence_date: 2025-01-01\nlocation: Heathrow\ncontributing_factors: Human Error\ncorrective_actions: Procedure Update'),
 Document(metadata={}, page_content='event_type: Accident\naircraft_model: B737\noperation_type: Cargo\nphase_of_flight: Climb\nseverity: Medium\nreported_by: Pilot\noccurrence_date: 2025-01-02\nlocation: JFK\ncontributing_factors: Weather\ncorrective_actions: Additional Training'),
 Document(metadata={}, page_content='event_type: Serious Incident\naircraft_model: A380\noperation_type: Private\nphase_of_flight: Cruise\nseverity: High\nreported_by: Ground Crew\noccurrence_date: 2025-01-03\nlocation: Changi\ncontributing_factors: Technical Failure\ncorrective_actions: Technical Inspection'),
 Document(metadata={}, page_content='event_type: Incident\naircraft_model: B777\noperation_type: Commercial\nph

### Build the SQL Generation Agent

In [129]:
from typing_extensions import TypedDict


class State(TypedDict):
    question: str
    query: str
    result: str
    answer: str

In [130]:
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o")

In [131]:
from langchain import hub
query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")
assert len(query_prompt_template.messages) == 1
query_prompt_template.messages[0].pretty_print()



Given an input question, create a syntactically correct [33;1m[1;3m{dialect}[0m query to run to help find the answer. Unless the user specifies in his question a specific number of examples they wish to obtain, always limit your query to at most [33;1m[1;3m{top_k}[0m results. You can order the results by a relevant column to return the most interesting examples in the database.

Never query for all the columns from a specific table, only ask for a the few relevant columns given the question.

Pay attention to use only the column names that you can see in the schema description. Be careful to not query for columns that do not exist. Also, pay attention to which column is in which table.

Only use the following tables:
[33;1m[1;3m{table_info}[0m

Question: [33;1m[1;3m{input}[0m


### Define Write Query

In [136]:
def write_query(state):
    """Generate and clean SQL query to fetch information."""
    # Define the prompt template explicitly for this function
    query_prompt_template = """
    You are an expert SQL generator for the DuckDB database.
    The table schema is as follows:

    - event_type: The type of event, such as Incident, Accident, or Serious Incident.
    - aircraft_model: The model of the aircraft involved, such as A320 or B737.
    - operation_type: The type of operation, such as Commercial, Cargo, or Private.
    - phase_of_flight: The phase of flight when the event occurred, such as Takeoff, Climb, Cruise, Descent, or Landing.
    - severity: The severity of the event, such as Low, Medium, or High.
    - reported_by: The entity that reported the event, such as ATC, Pilot, Ground Crew, or Passenger.
    - occurrence_date: The date the event occurred, formatted as YYYY-MM-DD.
    - location: The location where the event occurred, such as an airport or geographic region.
    - contributing_factors: Factors that contributed to the event, such as Human Error, Weather, or Technical Failure.
    - corrective_actions: Actions taken to address the event, such as Procedure Update, Additional Training, or Technical Inspection.

    The table name is `safety_occurrences`.

    Given the question: "{input}", write a valid SQL query that works with this schema.
    Return only the SQL query, with no explanations.
    """

    # Generate the input prompt
    prompt = query_prompt_template.format(input=state["question"])

    # Use invoke to call the model
    response = llm.invoke(prompt)

    # Debugging: Print raw response object
    print(f"Raw LLM Response: {response}")

    # Extract content from the response
    if hasattr(response, "content"):
        query_text = response.content.strip()  # Extract the query text

        # Remove extraneous text if present (like "Query:" or SQL code blocks)
        if "```sql" in query_text:
            query_text = query_text.split("```sql")[1]
        if "```" in query_text:
            query_text = query_text.split("```")[0]
        query_text = query_text.strip()

        return {"query": query_text}
    else:
        raise ValueError("Unexpected response format. 'content' attribute not found.")


In [137]:
def execute_query(state):
    """Execute SQL query."""
    try:
        # Extract the query string from the state
        query = state.get("query")
        if not query:
            raise ValueError("No query found in the state.")

        # Debugging: Print the query being executed
        print(f"Executing Query: {query}")

        # Execute the query using the DuckDB connection
        result = con.execute(query).fetchall()
        return {"result": result}
    except Exception as e:
        print(f"Error executing query: {e}")
        return {"error": str(e)}


In [138]:
sql_query = write_query({"question": "Occurrences of type Incident involving a A320 model?"})
print(sql_query)
execute_query(sql_query)

Raw LLM Response: content="```sql\nSELECT *\nFROM safety_occurrences\nWHERE event_type = 'Incident' AND aircraft_model = 'A320';\n```" additional_kwargs={} response_metadata={'token_usage': {'completion_tokens': 27, 'prompt_tokens': 298, 'total_tokens': 325, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_4691090a87', 'finish_reason': 'stop', 'logprobs': None} id='run-284a7b0c-4c35-48f9-a1e4-124dcf6273c8-0'
{'query': "SELECT *\nFROM safety_occurrences\nWHERE event_type = 'Incident' AND aircraft_model = 'A320';"}
Executing Query: SELECT *
FROM safety_occurrences
WHERE event_type = 'Incident' AND aircraft_model = 'A320';


{'result': [('Incident',
   'A320',
   'Commercial',
   'Takeoff',
   'Low',
   'ATC',
   datetime.date(2025, 1, 1),
   'Heathrow',
   'Human Error',
   'Procedure Update'),
  ('Incident',
   'A320',
   'Commercial',
   'Cruise',
   'Low',
   'ATC',
   datetime.date(2025, 1, 13),
   'Changi',
   'Human Error',
   'Procedure Update'),
  ('Incident',
   'A320',
   'Commercial',
   'Landing',
   'Low',
   'ATC',
   datetime.date(2025, 1, 25),
   'Dubai',
   'Human Error',
   'Procedure Update'),
  ('Incident',
   'A320',
   'Commercial',
   'Climb',
   'Low',
   'ATC',
   datetime.date(2025, 2, 6),
   'JFK',
   'Human Error',
   'Procedure Update'),
  ('Incident',
   'A320',
   'Commercial',
   'Descent',
   'Low',
   'ATC',
   datetime.date(2025, 2, 18),
   'Frankfurt',
   'Human Error',
   'Procedure Update')]}

In [139]:
# Define an interactive cell
from IPython.display import display
from ipywidgets import widgets

# Create a text input widget
question_input = widgets.Text(
    value="",
    placeholder="Type your question here...",
    description="Question:",
    layout=widgets.Layout(width="90%"),
)

# Create an output widget to display results
output = widgets.Output()

# Define a callback to handle the interaction
def handle_query(change):
    output.clear_output()  # Clear previous output
    with output:
        try:
            # Fetch the SQL query using the user-provided question
            sql_query = write_query({"question": question_input.value})
            print(f"Generated SQL Query:\n{sql_query['query']}")

            # Execute the query and print the results (ensure execute_query is defined)
            execute_query(sql_query)
        except Exception as e:
            print(f"Error: {e}")

# Attach the callback to the input widget
question_input.observe(handle_query, names="value")

# Display the interactive widgets
display(question_input, output)


Text(value='', description='Question:', layout=Layout(width='90%'), placeholder='Type your question here...')

Output()