In [12]:
from crewai import Agent, Task, Crew
from crewai_tools import Tool
import json
import sqlite3
from typing import Dict, Any
import os

#########################
# Setup Data and Paths
#########################

BASE_DIR = "./database/"
db_map = {}

for db_id in os.listdir(BASE_DIR):
    db_dir_path = os.path.join(BASE_DIR, db_id)
    if os.path.isdir(db_dir_path):
        # Try to find a .sqlite file or .db file that matches the db_id
        sqlite_path = os.path.join(db_dir_path, f"{db_id}.sqlite")
        db_path = os.path.join(db_dir_path, f"{db_id}.db")

        if os.path.exists(sqlite_path):
            db_map[db_id] = sqlite_path
        elif os.path.exists(db_path):
            db_map[db_id] = db_path
        else:
            # If neither file is found, you can skip or handle error
            print(f"No .sqlite or .db file found for {db_id}, skipping...")
            
# Assume we have a JSON schema file for the database(s).
schema_path = "./schema_info.json"
with open(schema_path, 'r') as f:
    schema_info = json.load(f)



In [25]:
class SchemaInfoTool(Tool):
    def __init__(self):
        super().__init__(
            name="schema_info_tool",
            description="Return the schema_info for a given db_id, along with question and reasoning_type.",
            func=self.run
        )

    def run(self, question: str, db_id: str, reasoning_type: str) -> Dict[str, Any]:
        db_schema = schema_info.get(db_id, {})
        return {
            "schema_info": db_schema,
            "db_id": db_id,
            "reasoning_type": reasoning_type,
            "question": question
        }


In [30]:
class SQLExecutionTool(Tool):
    def __init__(self):
        super().__init__(
            name="sql_execution_tool",
            description="Execute the provided SQL query on the specified db_id and return the results.",
            func=self.run
        )

    def run(self, sql_query: str, db_id: str) -> Dict[str, Any]:
        if db_id not in db_map:
            return {"error": f"No database found for db_id={db_id}", "sql_query": sql_query, "db_id": db_id}

        db_path = db_map[db_id]
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        results = []
        columns = []
        try:
            cursor.execute(sql_query)
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description] if cursor.description else []
        except Exception as e:
            print(f"Error executing query: {e}")
        finally:
            conn.close()

        return {
            "columns": columns,
            "rows": results,
            "db_id": db_id,
            "sql_query": sql_query
        }

In [31]:
#########################
# Define Agents
#########################


# 1. Knowledge Agent
knowledge_agent = Agent(
    name="knowledge_agent",
    role="System",
    goal="Provide schema information and other database knowledge.",
    backstory="This agent holds database schema details.",
    tools=[SchemaInfoTool()],
    llm="gpt-4o-mini"  # or None if no LLM
)

# The description will instruct the agent to use the schema_info_tool.
knowledge_task = Task(
    description=(
        "You are a system agent. The user provides a question, db_id, and reasoning_type. "
        "Use the schema_info_tool to retrieve the schema information for the given db_id."
    ),
    expected_output="Schema info and context",
    agent=knowledge_agent,
    parameters={
        "question": "{question}",
        "db_id": "{db_id}",
        "reasoning_type": "{reasoning_type}"
    }
)



In [32]:
# 2. Schema Linking Agent: Identifies relevant tables/columns.
schema_linking_agent = Agent(
    name="schema_linking_agent",
    role="System",
    goal="Link user query to database schema elements.",
    backstory="This agent uses the schema info to find relevant tables/columns."
)

schema_linking_task = Task(
    description=(
        "Given the schema_info and the user question, identify relevant tables and columns. "
        "Return linked_tables and linked_columns as JSON."
    ),
    expected_output="Linked schema elements",
    agent=schema_linking_agent,
    parameters={
        "schema_info": knowledge_task,
        "question": "{question}",
        "db_id": "{db_id}",
        "reasoning_type": "{reasoning_type}"
    }
)


In [33]:

# 3. SQL Generation Agent
sql_generation_agent = Agent(
    name="sql_generation_agent",
    role="System",
    goal="Generate a correct SQL query from the question and schema links.",
    backstory="This agent uses reasoning to produce an SQL query.",
    llm="gpt-4o-mini"
)

sql_generation_task = Task(
    description=(
        "Given the linked_tables and linked_columns, and the question, generate a valid SQL query."
    ),
    expected_output="SQL query string",
    agent=sql_generation_agent,
    parameters={
        "linked_tables": schema_linking_task,
        "linked_columns": schema_linking_task,
        "question": "{question}",
        "db_id": "{db_id}",
        "reasoning_type": "{reasoning_type}"
    }
)





In [34]:
# 4. SQL Execution Agent
sql_execution_agent = Agent(
    name="sql_execution_agent",
    role="System",
    goal="Execute the SQL query and return the results.",
    backstory="This agent runs the SQL against the specified database.",
    tools=[SQLExecutionTool()]
)

sql_execution_task = Task(
    description=(
        "Use the sql_execution_tool to run the given SQL query on db_id and return the results."
    ),
    expected_output="Query results",
    agent=sql_execution_agent,
    parameters={
        "sql_query": sql_generation_task,
        "db_id": "{db_id}",
        "reasoning_type": "{reasoning_type}",
        "question": "{question}"
    }
)

In [37]:
#########################
# Create Crew and Run
#########################

crew = Crew(
    agents=[
        knowledge_agent,
        schema_linking_agent,
        sql_generation_agent,
        sql_execution_agent
    ],
    tasks=[
        knowledge_task,
        schema_linking_task,
        sql_generation_task,
        sql_execution_task
    ]
)

# Example usage:
db_id = "concert_singer"
reasoning_type = "- + C"
question = 'List the name and age of each singer at the time of the release of the song "Gentleman".'

result = crew.kickoff(
    inputs={
        "question": question,
        "db_id": db_id,
        "reasoning_type": reasoning_type
    }
)


print("---------------------------------------------------")
print("Final Result:")
print(result)

2024-12-19 06:35:14,592 - 140634424218048 - llm.py-llm:170 - ERROR: LiteLLM call failed: litellm.AuthenticationError: AuthenticationError: OpenAIException - The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable
2024-12-19 06:35:14,598 - 140634424218048 - llm.py-llm:170 - ERROR: LiteLLM call failed: litellm.AuthenticationError: AuthenticationError: OpenAIException - The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable
2024-12-19 06:35:14,602 - 140634424218048 - llm.py-llm:170 - ERROR: LiteLLM call failed: litellm.AuthenticationError: AuthenticationError: OpenAIException - The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable




LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.



LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.



LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'.



AuthenticationError: litellm.AuthenticationError: AuthenticationError: OpenAIException - The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable