<a href="https://colab.research.google.com/github/doodle294/Vizuara_Assignment3_Tanush/blob/main/VizuaraTanushllm_assignment_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import sqlite3
import os
import re
import httpx

In [None]:
os.environ['GEMINI_API_KEY'] = ""

In [None]:
import google.generativeai as genai

class Agent:
    def __init__(self, client: genai, system: str):

        self.client = client
        self.system = system
        self.messages = []


        self.model = self.client.GenerativeModel(
            "gemini-2.5-flash-lite",
            system_instruction=self.system
        )

    def __call__(self, message: str):

        if message:
            self.messages.append({"role": "user", "content": message})

        result = self.execute()
        self.messages.append({"role": "model", "content": result})
        return result

    def execute(self):

        formatted_messages = []
        for message in self.messages:
            formatted_messages.append({"role": message["role"], "parts": [{"text": message["content"]}]})


        response = self.model.generate_content(formatted_messages)

        return response.text

In [None]:
conn = sqlite3.connect('movie_db.db')
cursor = conn.cursor()
print("Database created successfully!")

Database created successfully!


In [None]:
# --- START: Clean up existing tables (NEW CODE) ---
# Safely drop tables to prevent "already exists" errors
cursor.execute("DROP TABLE IF EXISTS movie_actors;")
cursor.execute("DROP TABLE IF EXISTS actors;")
cursor.execute("DROP TABLE IF EXISTS movies;")
# --- END: Clean up existing tables ---


# Create Movies table
create_movies_table_sql = """
CREATE TABLE movies (
    movie_id INTEGER PRIMARY KEY,
    title TEXT NOT NULL,
    release_year INTEGER,
    genre TEXT,
    rating REAL
);
"""
cursor.execute(create_movies_table_sql)

# Create Actors table
create_actors_table_sql = """
CREATE TABLE actors (
    actor_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL
);
"""
cursor.execute(create_actors_table_sql)

# Create Junction Table for Many-to-Many relationship
create_movie_actors_table_sql = """
CREATE TABLE movie_actors (
    movie_id INTEGER,
    actor_id INTEGER,
    role TEXT,
    PRIMARY KEY (movie_id, actor_id),
    FOREIGN KEY (movie_id) REFERENCES movies (movie_id),
    FOREIGN KEY (actor_id) REFERENCES actors (actor_id)
);
"""
cursor.execute(create_movie_actors_table_sql)

print("Tables created: movies, actors, movie_actors")

Tables created: movies, actors, movie_actors


In [None]:
# Insert Movie Data
movies_data = [
    (1, 'Inception', 2010, 'Sci-Fi', 8.8),
    (2, 'Pulp Fiction', 1994, 'Crime', 8.9),
    (3, 'The Dark Knight', 2008, 'Action', 9.0),
    (4, 'Interstellar', 2014, 'Sci-Fi', 8.6),
    (5, 'Fight Club', 1999, 'Drama', 8.8)
]
insert_movies_sql = "INSERT INTO movies VALUES (?, ?, ?, ?, ?);"
cursor.executemany(insert_movies_sql, movies_data)

# Insert Actor Data
actors_data = [
    (101, 'Leonardo DiCaprio'),
    (102, 'Christian Bale'),
    (103, 'Brad Pitt'),
    (104, 'Uma Thurman'),
    (105, 'Matthew McConaughey')
]
insert_actors_sql = "INSERT INTO actors VALUES (?, ?);"
cursor.executemany(insert_actors_sql, actors_data)

# Insert Movie_Actors Data (Mapping actors to movies)
movie_actors_data = [
    (1, 101, 'Protagonist'), # Inception - DiCaprio
    (3, 102, 'Batman'),      # Dark Knight - Bale
    (3, 101, 'Supporting'),  # Dark Knight - DiCaprio (for demonstration/joining)
    (5, 103, 'Narrator'),    # Fight Club - Pitt
    (2, 104, 'Mia Wallace'), # Pulp Fiction - Thurman
    (4, 105, 'Cooper')       # Interstellar - McConaughey
]
insert_movie_actors_sql = "INSERT INTO movie_actors VALUES (?, ?, ?);"
cursor.executemany(insert_movie_actors_sql, movie_actors_data)

conn.commit()
print("Data insertion complete.")

Data insertion complete.


In [None]:
select_movies_sql = "SELECT * FROM movies;"
cursor.execute(select_movies_sql)
print("Data in 'movies' table:")
for row in cursor.fetchall():
    print(row)

select_actors_sql = "SELECT * FROM actors;"
cursor.execute(select_actors_sql)
print("\nData in 'actors' table:")
for row in cursor.fetchall():
    print(row)

select_movie_actors_sql = "SELECT * FROM movie_actors;"
cursor.execute(select_movie_actors_sql)
print("\nData in 'movie_actors' table:")
for row in cursor.fetchall():
    print(row)

Data in 'movies' table:
(1, 'Inception', 2010, 'Sci-Fi', 8.8)
(2, 'Pulp Fiction', 1994, 'Crime', 8.9)
(3, 'The Dark Knight', 2008, 'Action', 9.0)
(4, 'Interstellar', 2014, 'Sci-Fi', 8.6)
(5, 'Fight Club', 1999, 'Drama', 8.8)

Data in 'actors' table:
(101, 'Leonardo DiCaprio')
(102, 'Christian Bale')
(103, 'Brad Pitt')
(104, 'Uma Thurman')
(105, 'Matthew McConaughey')

Data in 'movie_actors' table:
(1, 101, 'Protagonist')
(3, 102, 'Batman')
(3, 101, 'Supporting')
(5, 103, 'Narrator')
(2, 104, 'Mia Wallace')
(4, 105, 'Cooper')


In [None]:
prompt = """
You are a helpful assistant that runs in a loop of Thought, Action, PAUSE, Observation.
Your goal is to answer questions about a SQL database.
At the end of the loop, when you have the answer, you output a FINAL ANSWER.

Use Thought to describe your step-by-step reasoning about the question.
Use Action to run one of the actions available to you. Your output must stop after the Action, and you will return PAUSE.
Observation will be the result of running that action.

Your available actions are:

list_tables:
Lists all tables in the database.
e.g. Action: list_tables{}

describe_table:
Returns the schema (columns and types) for a specific table.
e.g. Action: describe_table{"table_name": "users"}

query_database:
Executes a single, safe, read-only SQL SELECT query.
e.g. Action: query_database{"query": "SELECT COUNT(*) FROM users"}

**Important Rules:**
1.  **Always** check the schema first. Before running `query_database`, you must use `list_tables` and `describe_table` to know the table and column names.
2.  Your `Action` output must be the tool name followed by a valid JSON object for its arguments.
3.  You can **only** execute `SELECT` queries.

Example session:

Question: How many users are there?
Thought: I need to find a table that contains user information. I will start by listing all available tables.
Action: list_tables{}
PAUSE

You will be called again with this:

Observation: ['users', 'orders']

You then output:

Thought: The 'users' table seems correct. I should check its columns to confirm before querying it.
Action: describe_table{"table_name": "users"}
PAUSE

You will be called again with this:

Observation: [("user_id", "INTEGER"), ("username", "TEXT"), ("email", "TEXT")]

You then output:

Thought: The 'users' table has a 'user_id' and 'username', so it is the correct table. I can now count the total number of rows to answer the question.
Action: query_database{"query": "SELECT COUNT(*) FROM users"}
PAUSE

You will be called again with this:

Observation: [(5,)]

You then output:

Thought: The query returned a count of 5. This is the final answer.
FINAL ANSWER: There are 5 users.
""".strip()

In [None]:
import re
import json
action_re = re.compile(r'^Action: (\w+)({.*})$', re.IGNORECASE)


known_actions = {
    "list_tables": list_tables,
    "describe_table": describe_table,
    "query_database": query_database,
}

def query(question, max_turns=5):
    i = 0
    genai.configure(api_key="")
    bot = Agent(client=genai, system=prompt)
    next_prompt = question

    print(f"Question: {question}\n")

    while i < max_turns:
        i += 1
        result = bot(next_prompt)
        print(result)


        if "FINAL ANSWER:" in result:
            return

        actions = [action_re.search(a) for a in result.split('\n') if action_re.search(a)]

        if actions:
            action, json_input_str = actions[0].groups()

            if action.lower() not in known_actions:
                print(f"Observation: Unknown action '{action}'.")
                next_prompt = f"Observation: Unknown action '{action}'. Please use a known tool."
                continue

            print(f" -- running {action} with {json_input_str}")

            try:

                action_args = json.loads(json_input_str)
            except json.JSONDecodeError:
                print(f"Observation: Invalid JSON format for action.")
                next_prompt = "Observation: Invalid JSON. Please provide a valid JSON object for the action."
                continue


            try:
                observation = known_actions[action.lower()](**action_args)
                print(f"Observation: {observation}")
                next_prompt = f"Observation: {observation}"
            except Exception as e:
                print(f"Observation: Error running tool: {e}")
                next_prompt = f"Observation: Error: {e}"

        else:
            print("No action found, stopping.")
            return

In [None]:
def list_tables():
  try:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    table_list = [table[0] for table in tables]
    return table_list

  except Exception as e:
    return f"Error listing tables: {e}"


def describe_table(table_name:str):
  safe_tables = list_tables()
  if table_name not in safe_tables:
    return f"Error: Table '{table_name}' does not exist."

  try:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info('{table_name}');")
        columns_info = cursor.fetchall()

        if not columns_info:
            return f"Error: Table '{table_name}' does not exist or has no columns."

        schema = [(col[1], col[2]) for col in columns_info]
        return schema

  except Exception as e:
    return f"Error describing {table_name} : {e}"


def is_safe_select(query: str):

    query_lower = query.strip().lower()

    if not query_lower.startswith('select'):
        return False

    forbidden_keywords = [
        'insert', 'update', 'delete', 'drop', 'alter', 'create',
        'truncate', 'grant', 'revoke', 'attach', 'detach'
    ]

    for keyword in forbidden_keywords:
        if re.search(r'\b' + keyword + r'\b', query_lower):
            return False

    return True

def query_database(query: str):

    if not is_safe_select(query):

        return f"Error: Query is not a safe, read-only SELECT statement."

    try:

        cursor = conn.cursor()


        cursor.execute(query)


        results = cursor.fetchall()


        headers = [desc[0] for desc in cursor.description]


        return {"columns": headers, "rows": results}

    except Exception as e:

        return f"Error querying database: {e}"

In [None]:
query("List the titles of all movies that star Leonardo DiCaprio and were released after the year 2000.")

Question: List the titles of all movies that star Leonardo DiCaprio and were released after the year 2000.

Thought: I need to find movie titles, actor information, and release dates. I should start by listing the tables to see which tables might contain this information.
Action: list_tables{}
PAUSE
 -- running list_tables with {}
Observation: ['movies', 'actors', 'movie_actors']
Thought: I have three tables: 'movies', 'actors', and 'movie_actors'.
'movies' likely contains movie titles and release years.
'actors' likely contains actor names.
'movie_actors' is probably a linking table between movies and actors.

I need to:
1. Find the actor ID for 'Leonardo DiCaprio' from the 'actors' table.
2. Find movie IDs associated with that actor ID from the 'movie_actors' table.
3. Get the movie titles and release years for those movie IDs from the 'movies' table.
4. Filter the results to include only movies released after 2000.

First, I'll check the schema for the 'actors' table to find how act

In [None]:
query("What are the genres in the database and how many movies are in each?")

Question: What are the genres in the database and how many movies are in each?

Thought: I need to find the genres and count the number of movies associated with each genre. I'll start by listing the tables to see which tables might contain genre and movie information.
Action: list_tables{}
PAUSE

 -- running list_tables with {}
Observation: ['movies', 'actors', 'movie_actors']
Thought: The available tables are 'movies', 'actors', and 'movie_actors'. It's likely that 'movies' contains genre information or there's a separate table for genres. I should inspect the 'movies' table schema first.
Action: describe_table{"table_name": "movies"}
PAUSE

 -- running describe_table with {"table_name": "movies"}
Observation: [('movie_id', 'INTEGER'), ('title', 'TEXT'), ('release_year', 'INTEGER'), ('genre', 'TEXT'), ('rating', 'REAL')]
Thought: The 'movies' table has a 'genre' column, which is exactly what I need. Now I can write a query to count the number of movies for each genre.
Action: query_d

In [None]:
query("List all movies released before 2005 that have a rating greater than 8.8.")

Question: List all movies released before 2005 that have a rating greater than 8.8.

Thought: I need to find information about movies, their release year, and their ratings.
First, I need to see what tables are available in the database.
Action: list_tables{}
PAUSE

 -- running list_tables with {}
Observation: ['movies', 'actors', 'movie_actors']
Thought: The 'movies' table is the most likely place to find movie titles, release years, and ratings. I should inspect its schema.
Action: describe_table{"table_name": "movies"}
PAUSE

 -- running describe_table with {"table_name": "movies"}
Observation: [('movie_id', 'INTEGER'), ('title', 'TEXT'), ('release_year', 'INTEGER'), ('genre', 'TEXT'), ('rating', 'REAL')]
Thought: The 'movies' table has 'release_year' and 'rating' columns, which are exactly what I need to answer the question.
I need to select the 'title' of movies where 'release_year' is less than 2005 AND 'rating' is greater than 8.8.
Action: query_database{"query": "SELECT title F