In [5]:
import sqlite3
import requests
from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool
from langchain.chat_models import init_chat_model
import os
from dotenv import load_dotenv
from crewai_tools import ScrapeWebsiteTool, SerperDevTool, BaseTool
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.tools.sql_database.tool import  InfoSQLDatabaseTool,ListSQLDatabaseTool,QuerySQLCheckerTool,QuerySQLDataBaseTool  
from langchain_community.utilities import SQLDatabase
from crewai_tools import tool
from crewai import Agent, Task, Crew, Process, LLM
load_dotenv()

def get_engine_for_chinook_db():
    """Pull sql file, populate in-memory database, and create engine."""
    url = "https://raw.githubusercontent.com/lerocha/chinook-database/master/ChinookDatabase/DataSources/Chinook_Sqlite.sql"
    response = requests.get(url)
    sql_script = response.text

    connection = sqlite3.connect(":memory:", check_same_thread=False)
    connection.executescript(sql_script)
    return create_engine(
        "sqlite://",
        creator=lambda: connection,
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )


engine = get_engine_for_chinook_db()

db = SQLDatabase(engine)

In [None]:
# LLM

API_KEY=os.getenv("GOOGLE_API_KEY", GOOGLE_API_KEY)
llm = LLM(model="gemini/gemini-2.0-flash",provider="google", api_key=API_KEY)

In [7]:
# --- Tool Definitions for CrewAI ---

# Tool for the Query Writer Agent: List available tables
@tool("List Database Tables")
def list_tables_tool():
    """
    Returns a list of table names available in the database.
    Use this tool to know which tables you can query.
    Input: None
    """
    # No input needed for ListSQLDatabaseTool's run method
    return ListSQLDatabaseTool(db=db).invoke("")

# Tool for the Query Writer Agent: Get schema of specific tables
@tool("Get Table Schema")
def get_schema_tool(table_names: str):
    """
    Returns the schema (columns and their types) for the specified table(s).
    Provide table names as a comma-separated string.
    Use this tool to understand the structure of tables before writing a query.
    Input: A string containing comma-separated table names (e.g., 'users, orders').
    """
    return InfoSQLDatabaseTool(db=db).invoke(table_names)

# Tool for the Query Validator/Executor Agent: Check SQL query syntax
# @tool("Check SQL Query Syntax")
# def check_query_tool(sql_query: str)-> str:
#     """
#     Use this tool to double check if your query is correct before executing it. Always use this
#     tool before executing a query with `execute_sql`.
#     """
#     return QuerySQLCheckerTool(db=db, llm=llm).invoke({"query": sql_query}) # it didn't work with gemini and gemini already perform well

# Tool for the Query Validator/Executor Agent: Execute SQL query
@tool("Execute SQL Query")
def execute_query_tool(sql_query: str):
    """
    Executes a previously validated SQL query against the database.
    Use this ONLY after the query has been checked by the 'Check SQL Query Syntax' tool.
    Input: A string containing the SQL query to be executed.
    Returns the query results or an error message if execution fails.
    """
    # Note: Using QuerySQLDataBaseTool (with capital B in DataBase)
    executor_tool = QuerySQLDataBaseTool(db=db)
    return executor_tool.invoke(sql_query)



In [8]:
### Agents ###

# --- Define Agents with Tools To be Used ---

schema_agent = Agent(
    role="Database Schema Expert",
    goal="Interpret and explain the database schema.",
    backstory="You understand the structure of the database, including tables, schemas, and relationships.",
    tools=[list_tables_tool,get_schema_tool],
    verbose=True,
    llm = llm,
    allow_delegation=False
)

query_builder_agent = Agent(
    role="SQL Architect",
    goal="Construct precise SQL queries based on the user query and schema_agent output.",
    backstory="You use natural language, database schema, and business rules to generate valid and optimized SQL queries.",
    verbose=True,
    llm=llm,
    allow_delegation=False
)

executor_agent = Agent(
    role="Query Runner",
    goal="Execute SQL and return clear, structured answers.",
    backstory="You take SQL queries, run them on the database, and format the results in human-readable form.",
    tools=[execute_query_tool],
    verbose=True,
    llm=llm,
    allow_delegation=False
)

In [9]:
### TASKS ###

# --- Define Tasks with Expected Outputs ---

task_1 = Task(
    description= "Analyze the database schema and identify which tables and columns are relevant"
    "to the user query {input}. Refine the user query to include those details.",
    agent=schema_agent,
    expected_output=(
        "A concise summary identifying the database table and column(s) needed to answer the user question. "
        "This should clearly state the target table and the mechanism "
    )
)

task_2 = Task(
    description=(
        "Based on the schema analysis summary provided by the previous task, construct the precise SQL query "
        "needed to ansswer the user query {input} in the database. Ensure the query is syntactically correct. "
        "validate syntax before finalizing."
    ),
    agent=query_builder_agent,
    # Context is implicitly passed between tasks in CrewAI if they are sequential
    expected_output=(
        "A single, ready-to-execute SQL query string that accurately reflects the requirement to answer the user question "
        "based on the identified table and criteria. "
    )
)

task_3 = Task(
    description=(
        "Take the provided SQL query string. Execute this query against the database using the 'Execute SQL Query' tool. "
        "Retrieve the raw result (which should be a count). "
        "Format this numerical result into a clear, friendly, natural language sentence that directly answers the original user query: '{input}'."
    ),
    agent=executor_agent,
    expected_output=(
        "A final, human-readable sentence stating the numbers and info needed "
        "The sentence should directly answer the initial question. "
    )
)



crew = Crew(agents=[schema_agent,query_builder_agent,executor_agent],tasks=[task_1,task_2,task_3],verbose=True)
user_question = 'How many Albums we have?'
result = crew.kickoff(inputs={"input": user_question})
print("🔍 Final Answer:\n", result)

[1m[95m# Agent:[00m [1m[92mDatabase Schema Expert[00m
[95m## Task:[00m [92mAnalyze the database schema and identify which tables and columns are relevantto the user query How many Albums we have?. Refine the user query to include those details.[00m


[1m[95m# Agent:[00m [1m[92mDatabase Schema Expert[00m
[95m## Thought:[00m [92mI need to explore the database to identify the table containing album information and then determine the relevant columns for counting the albums.[00m
[95m## Using tool:[00m [92mList Database Tables[00m
[95m## Tool Input:[00m [92m
"{}"[00m
[95m## Tool Output:[00m [92m
Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track[00m


[1m[95m# Agent:[00m [1m[92mDatabase Schema Expert[00m
[95m## Thought:[00m [92mThought:The 'Album' table seems like the most relevant table for answering the question "How many Albums we have?". Now, I need to get the schema of the 'Album' table to c