<a href="https://colab.research.google.com/github/aadi22346/LLM-assignment-3/blob/main/llm_assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
import sqlite3
import os
import re

In [12]:
import google.generativeai as genai

class Agent:
    def __init__(self, client: genai, system: str):

        self.client = client
        self.system = system
        self.messages = []

        self.model = self.client.GenerativeModel(
            "gemini-2.5-flash-lite",
            system_instruction=self.system
        )

    def __call__(self, message: str):

        if message:
            self.messages.append({"role": "user", "content": message})

        result = self.execute()
        self.messages.append({"role": "model", "content": result})
        return result

    def execute(self):

        formatted_messages = []
        for message in self.messages:
            formatted_messages.append({"role": message["role"], "parts": [{"text": message["content"]}]})

        response = self.model.generate_content(formatted_messages)

        return response.text

In [3]:
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
print("Database created successfully!")

Database created successfully!


In [4]:
create_users_table_sql = """
CREATE TABLE users (
    user_id INTEGER PRIMARY KEY,
    username TEXT,
    email TEXT
);
"""

cursor.execute(create_users_table_sql)

create_orders_table_sql = """
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY,
    user_id INTEGER,
    product_name TEXT,
    quantity INTEGER,
    FOREIGN KEY (user_id) REFERENCES users (user_id)
);
"""

cursor.execute(create_orders_table_sql)

<sqlite3.Cursor at 0x79b226fafec0>

In [5]:
users_data = [
    ('Alice', 'alice@example.com'),
    ('Bob', 'bob@example.com'),
    ('Charlie', 'charlie@example.com'),
    ('David', 'david@example.com'),
    ('Eve', 'eve@example.com'),
]

insert_users_sql = "INSERT INTO users (username, email) VALUES (?, ?);"
cursor.executemany(insert_users_sql, users_data)

orders_data = [
    (1, 'Laptop', 1),
    (1, 'Keyboard', 1),
    (2, 'Mouse', 2),
    (3, 'Monitor', 1),
    (4, 'Webcam', 1),
    (4, 'Headphones', 1),
    (5, 'Printer', 1),
]

insert_orders_sql = "INSERT INTO orders (user_id, product_name, quantity) VALUES (?, ?, ?);"
cursor.executemany(insert_orders_sql, orders_data)

conn.commit()

In [6]:
select_users_sql = "SELECT * FROM users;"
cursor.execute(select_users_sql)
users_data = cursor.fetchall()
print("Data in 'users' table:")
for row in users_data:
    print(row)

select_orders_sql = "SELECT * FROM orders;"
cursor.execute(select_orders_sql)
orders_data = cursor.fetchall()
print("\nData in 'orders' table:")
for row in orders_data:
    print(row)

Data in 'users' table:
(1, 'Alice', 'alice@example.com')
(2, 'Bob', 'bob@example.com')
(3, 'Charlie', 'charlie@example.com')
(4, 'David', 'david@example.com')
(5, 'Eve', 'eve@example.com')

Data in 'orders' table:
(1, 1, 'Laptop', 1)
(2, 1, 'Keyboard', 1)
(3, 2, 'Mouse', 2)
(4, 3, 'Monitor', 1)
(5, 4, 'Webcam', 1)
(6, 4, 'Headphones', 1)
(7, 5, 'Printer', 1)


In [7]:
prompt = """
You are a helpful assistant that runs in a loop of Thought, Action, PAUSE, Observation.
Your goal is to answer questions about a SQL database.
At the end of the loop, when you have the answer, you output a FINAL ANSWER.

Use Thought to describe your step-by-step reasoning about the question.
Use Action to run one of the actions available to you. Your output must stop after the Action, and you will return PAUSE.
Observation will be the result of running that action.

Your available actions are:

list_tables:
Lists all tables in the database.
e.g. Action: list_tables{}

describe_table:
Returns the schema (columns and types) for a specific table.
e.g. Action: describe_table{"table_name": "users"}

query_database:
Executes a single, safe, read-only SQL SELECT query.
e.g. Action: query_database{"query": "SELECT COUNT(*) FROM users"}

**Important Rules:**
1.  **Always** check the schema first. Before running `query_database`, you must use `list_tables` and `describe_table` to know the table and column names.
2.  Your `Action` output must be the tool name followed by a valid JSON object for its arguments.
3.  You can **only** execute `SELECT` queries.

Example session:

Question: How many users are there?
Thought: I need to find a table that contains user information. I will start by listing all available tables.
Action: list_tables{}
PAUSE

You will be called again with this:

Observation: ['users', 'orders']

You then output:

Thought: The 'users' table seems correct. I should check its columns to confirm before querying it.
Action: describe_table{"table_name": "users"}
PAUSE

You will be called again with this:

Observation: [("user_id", "INTEGER"), ("username", "TEXT"), ("email", "TEXT")]

You then output:

Thought: The 'users' table has a 'user_id' and 'username', so it is the correct table. I can now count the total number of rows to answer the question.
Action: query_database{"query": "SELECT COUNT(*) FROM users"}
PAUSE

You will be called again with this:

Observation: [(5,)]

You then output:

Thought: The query returned a count of 5. This is the final answer.
FINAL ANSWER: There are 5 users.
""".strip()

In [10]:
import re
import json
action_re = re.compile(r'^Action: (\w+)({.*})$', re.IGNORECASE)


known_actions = {
    "list_tables": list_tables,
    "describe_table": describe_table,
    "query_database": query_database,
}

def query(question, max_turns=5):
    i = 0
    genai.configure(api_key="A*************************M")
    bot = Agent(client=genai, system=prompt)
    next_prompt = question

    print(f"Question: {question}\n")

    while i < max_turns:
        i += 1
        result = bot(next_prompt)
        print(result)


        if "FINAL ANSWER:" in result:
            return

        actions = [action_re.search(a) for a in result.split('\n') if action_re.search(a)]

        if actions:
            action, json_input_str = actions[0].groups()

            if action.lower() not in known_actions:
                print(f"Observation: Unknown action '{action}'.")
                next_prompt = f"Observation: Unknown action '{action}'. Please use a known tool."
                continue

            print(f" -- running {action} with {json_input_str}")

            try:

                action_args = json.loads(json_input_str)
            except json.JSONDecodeError:
                print(f"Observation: Invalid JSON format for action.")
                next_prompt = "Observation: Invalid JSON. Please provide a valid JSON object for the action."
                continue


            try:
                observation = known_actions[action.lower()](**action_args)
                print(f"Observation: {observation}")
                next_prompt = f"Observation: {observation}"
            except Exception as e:
                print(f"Observation: Error running tool: {e}")
                next_prompt = f"Observation: Error: {e}"

        else:
            print("No action found, stopping.")
            return

In [9]:
def list_tables():
  try:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    table_list = [table[0] for table in tables]
    return table_list

  except Exception as e:
    return f"Error listing tables: {e}"


def describe_table(table_name:str):
  safe_tables = list_tables()
  if table_name not in safe_tables:
    return f"Error: Table '{table_name}' does not exist."

  try:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info('{table_name}');")
        columns_info = cursor.fetchall()

        if not columns_info:
            return f"Error: Table '{table_name}' does not exist or has no columns."

        schema = [(col[1], col[2]) for col in columns_info]
        return schema

  except Exception as e:
    return f"Error describing {table_name} : {e}"


def is_safe_select(query: str):

    query_lower = query.strip().lower()

    if not query_lower.startswith('select'):
        return False

    forbidden_keywords = [
        'insert', 'update', 'delete', 'drop', 'alter', 'create',
        'truncate', 'grant', 'revoke', 'attach', 'detach'
    ]

    for keyword in forbidden_keywords:
        if re.search(r'\b' + keyword + r'\b', query_lower):
            return False

    return True

def query_database(query: str):

    if not is_safe_select(query):

        return f"Error: Query is not a safe, read-only SELECT statement."

    try:

        cursor = conn.cursor()


        cursor.execute(query)


        results = cursor.fetchall()


        headers = [desc[0] for desc in cursor.description]


        return {"columns": headers, "rows": results}

    except Exception as e:

        return f"Error querying database: {e}"

In [13]:
query("How many users are in the database?")

Question: How many users are in the database?

Thought: I need to find the table that contains user information and then count the number of users. I will start by listing all the tables in the database.
Action: list_tables{}
PAUSE
 -- running list_tables with {}
Observation: ['users', 'orders']
Thought: The 'users' table seems like the most relevant table for counting users. I should inspect its schema to confirm and to see how users are represented.
Action: describe_table{"table_name": "users"}
PAUSE
 -- running describe_table with {"table_name": "users"}
Observation: [('user_id', 'INTEGER'), ('username', 'TEXT'), ('email', 'TEXT')]
Thought: The 'users' table has a 'user_id' column, which is suitable for identifying and counting distinct users. I can now query the database to get the count of users.
Action: query_database{"query": "SELECT COUNT(user_id) FROM users"}
PAUSE
 -- running query_database with {"query": "SELECT COUNT(user_id) FROM users"}
Observation: {'columns': ['COUNT(us

In [14]:
query("How many orders are in the database?")

Question: How many orders are in the database?

Thought: I need to find out how many orders are in the database. I should start by listing all the tables to see if there's an 'orders' table.
Action: list_tables{}
PAUSE
 -- running list_tables with {}
Observation: ['users', 'orders']
Thought: The 'orders' table likely contains the order information. I should inspect its schema to confirm.
Action: describe_table{"table_name": "orders"}
PAUSE
 -- running describe_table with {"table_name": "orders"}
Observation: [('order_id', 'INTEGER'), ('user_id', 'INTEGER'), ('product_name', 'TEXT'), ('quantity', 'INTEGER')]
Thought: The 'orders' table has columns like 'order_id' and 'product_name', which indicates it's the correct table to count orders. I will now count the number of rows in the 'orders' table.
Action: query_database{"query": "SELECT COUNT(*) FROM orders"}
PAUSE
 -- running query_database with {"query": "SELECT COUNT(*) FROM orders"}
Observation: {'columns': ['COUNT(*)'], 'rows': [(7,)

In [15]:
query("What is the most ordered item")

Question: What is the most ordered item

Thought: I need to find out which item is ordered the most. This likely involves looking at order details and item information. I'll start by listing the tables to see what's available.
Action: list_tables{}
PAUSE

 -- running list_tables with {}
Observation: ['users', 'orders']
Thought: The 'orders' table seems relevant. I need to inspect its schema to understand how order items are represented.
Action: describe_table{"table_name": "orders"}
PAUSE

 -- running describe_table with {"table_name": "orders"}
Observation: [('order_id', 'INTEGER'), ('user_id', 'INTEGER'), ('product_name', 'TEXT'), ('quantity', 'INTEGER')]
Thought: The 'orders' table contains 'product_name' and 'quantity'. To find the most ordered item, I should sum the quantities for each product and then find the product with the maximum sum.
Action: query_database{"query": "SELECT product_name, SUM(quantity) AS total_quantity FROM orders GROUP BY product_name ORDER BY total_quantit