In [1]:
import openai
import os
import sqlite3
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, HumanMessagePromptTemplate, MessagesPlaceholder
from langchain.tools import Tool, StructuredTool
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain.schema import SystemMessage
from langchain.memory import ConversationBufferMemory
from pydantic.v1 import BaseModel
from typing import List, Union, Tuple


In [2]:
# Location of the SQLite file; set the DATABASE_PATH environment variable to override it.
db_path = os.environ.get("DATABASE_PATH", "db.sqlite")

In [3]:
# OpenAI API key used by the GPT chat model.
# Prefer the OPENAI_API_KEY environment variable so the secret is never saved or
# committed with the notebook; the literal below is only a fallback placeholder.

api_key = os.getenv("OPENAI_API_KEY", "*****")

In [4]:
# Chat model client for OpenAI, authenticated with api_key; the agent uses it as its LLM.

chat = ChatOpenAI(
    openai_api_key=api_key,
)

  warn_deprecated(


In [5]:
# Define a function to execute SQLite queries
def run_sqlite_query(query: str) -> Union[str, List[Tuple]]:
    """Execute a single SQL statement against the SQLite database at ``db_path``.

    A fresh connection is opened for every call, the transaction is committed
    once the statement has run, and the connection is always closed afterwards.

    Args:
        query: The SQL statement to execute.

    Returns:
        The fetched rows as a list of tuples (empty for statements that produce
        no rows), or a string starting with "The following error occurred:"
        when SQLite rejects the statement.
    """
    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` only commits or rolls back the transaction; it does not
        # close the connection, hence the explicit ``finally`` below.
        with conn:
            try:
                return conn.execute(query).fetchall()
            except sqlite3.Error as err:
                # Report any SQLite failure as text so the calling agent can
                # read the message and retry with a corrected query.
                return f"The following error occurred: {str(err)}"
    finally:
        conn.close()
    

In [6]:
# Argument schema for the run_query tool: one string field carrying the SQL text.

class RunQueryArgsSchema(BaseModel):
    query: str  # matches the query parameter of run_sqlite_query

In [7]:
# Expose run_sqlite_query to the agent as the "run_query" tool.

run_query_tool = Tool.from_function(
    func=run_sqlite_query,
    name="run_query",
    description="Run sqlite query",
    args_schema=RunQueryArgsSchema,
)

In [9]:
# Create a function to retrieve table names from a database

def list_tables():
    """Return the names of all tables in the database, one per line.

    Returns:
        A newline-separated string of table names. If the lookup query fails,
        the error message produced by run_sqlite_query is returned unchanged.
    """
    rows = run_sqlite_query("SELECT name FROM sqlite_master WHERE type='table';")
    if isinstance(rows, str):
        # run_sqlite_query reports failures as a plain string; iterating over it
        # would emit the message one character per line.
        return rows
    return "\n".join(row[0] for row in rows if row[0] is not None)
# Table names captured once when this cell runs; interpolated into the system prompt later.
tables = list_tables()

In [10]:
# Define a function for detailing table schemas

def describe_tables(table_names):
    """Return the stored CREATE statements for the given tables.

    Args:
        table_names: The table names to look up in ``sqlite_master``. A single
            name passed as a bare string is treated as a one-element list.

    Returns:
        The ``sql`` definitions of the matching tables joined by newlines
        (names that match no table contribute nothing). If the lookup query
        fails, the error message from run_sqlite_query is returned unchanged.
    """
    if isinstance(table_names, str):
        # Iterating a bare string would look up every character separately.
        table_names = [table_names]
    # The names are supplied by the language model, so escape embedded single
    # quotes (SQL doubles them) before splicing the values into the IN list.
    quoted_names = ", ".join(
        "'" + name.replace("'", "''") + "'" for name in table_names
    )
    rows = run_sqlite_query(
        f"SELECT sql FROM sqlite_master WHERE type='table' and name IN ({quoted_names});"
    )
    if isinstance(rows, str):
        # run_sqlite_query reports failures as a plain string; pass it through
        # instead of iterating over its characters.
        return rows
    return "\n".join(row[0] for row in rows if row[0] is not None)
    

In [11]:
# Construct a tool leveraging the describe_tables function for database introspection

# Argument schema for the table-description tool: the names of the tables to look up.
class DescribeTablesArgsSchema(BaseModel):
    table_names: List[str]  # matches the table_names parameter of describe_tables

# Tool that lets the agent look up table definitions before it writes SQL.
# The registered name matches the describe_tables function, so instructions that
# tell the model to use 'describe_tables' point at a tool that actually exists.
describe_tables_tool = Tool.from_function(
    name="describe_tables",
    description="Given a list of table names, returns the schema of those tables",
    func=describe_tables,
    args_schema=DescribeTablesArgsSchema
)


In [12]:
# Define a function for generating HTML reports based on query results

def write_report(filename, html):
    """Write an HTML document to disk, replacing any existing file.

    Args:
        filename: Path of the file to create or overwrite.
        html: The complete HTML text to store.

    Returns:
        None.
    """
    # NOTE(review): filename is chosen by the agent; consider restricting it to
    # a dedicated reports directory.
    # Pin the encoding: the platform default is not UTF-8 everywhere, and report
    # text with non-ASCII characters would otherwise fail or be garbled.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(html)

In [13]:
# Create a reporting tool that utilizes the write_report function for output formatting

# Argument schema for the report-writing tool; field names match write_report's parameters.
class WriteReportsArgsSchema(BaseModel):
    filename: str  # path of the file to write
    html: str  # HTML text to store in that file
    
# Two-argument tool the agent calls to save a report; arguments follow WriteReportsArgsSchema.
write_report_tool = StructuredTool.from_function(
    func=write_report,
    name="write_report",
    description="write html file to disk. Use this tools whenever someone asks for a report",
    args_schema=WriteReportsArgsSchema,
)

In [14]:
# Every tool the agent may call: SQL execution, schema lookup and report writing.

tools = [
    run_query_tool,
    describe_tables_tool,
    write_report_tool,
]

In [15]:
# Conversation memory; its contents are exposed under the "chat_history" key,
# which is the variable name the prompt's history placeholder reads.

memory = ConversationBufferMemory(
    return_messages=True,
    memory_key="chat_history",
)

In [16]:
# Prompt template that guides the agent: system instructions, prior conversation,
# the current question, and the agent's working notes for this turn.

prompt = ChatPromptTemplate(
    messages=[
        # A SystemMessage is a finished message, not a template, so its content is
        # sent as written; the table list must be interpolated here via f-string.
        SystemMessage(content=(
            "You are an AI that has access to a SQLite database.\n"
            f"The database has tables of: {tables}\n"
            "Do not make any assumptions about what tables or columns exist. "
            # Quote the tool's registered name so the instruction cannot drift from it.
            f"Instead, use the '{describe_tables_tool.name}' function"
        )),
        # Earlier turns of the conversation.
        MessagesPlaceholder(variable_name="chat_history"),
        # The user's current question.
        HumanMessagePromptTemplate.from_template("{input}"),
        # Working area the agent fills while handling the current question.
        MessagesPlaceholder(variable_name="agent_scratchpad")
    ]
)

In [17]:
# Build the function-calling agent from the chat model, the prompt and the tool list.

agent = OpenAIFunctionsAgent(
    tools=tools,
    prompt=prompt,
    llm=chat,
)

  warn_deprecated(


In [18]:
# Executor that drives the agent with the tool list and the conversation memory.

agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    handle_parsing_errors=True,
    # verbose=True,
)

In [19]:
# Smoke-test the agent with a short conversation. The questions run in order
# because later ones (e.g. "and how many products?") rely on the earlier context.

questions = [
    "How many users are in the database?",
    "and how many products?",
    "How many users bought more then 5 products?",
    "Generate a report about 5 most popular product and write it intoa file",
]
for question in questions:
    # invoke() replaces the deprecated direct call agent_executor(...); the
    # "input" key is the variable the prompt's human message template expects.
    response = agent_executor.invoke({"input": question})
    print(response["output"])

  warn_deprecated(


There are 2000 users in the database.
There are 4000 products in the database.
There are 120 users who have bought more than 5 products.
I have generated a report on the 5 most popular products and saved it as "popular_products_report.html".
