# Assignment 3 - Migration from Relational to NoSQL

Student Name: Nirav Patel

Student ID: 40248940

Date: April 8, 2025

> The Python scripts shown in this file, which can also be found in the `scripts` folder, use environment variables.

> You may set the required variables as explained in the `README.md` if you wish to run the scripts.


We begin by providing the DDL used to create the tables for the first data source, the Star Wars API (SWAPI).

This file is contained in the `ddl` folder. Running this SQL code is NOT required, as the following Python code handles it.


In [None]:
-- Star Wars API (SWAPI) staging schema for the unnormalized database.
-- Drop tables if they exist: junction tables first, then entity tables.
-- CASCADE also removes any constraints that still depend on a dropped table.
DROP TABLE IF EXISTS people_films CASCADE;
DROP TABLE IF EXISTS people_species CASCADE;
DROP TABLE IF EXISTS people_starships CASCADE;
DROP TABLE IF EXISTS people_vehicles CASCADE;
DROP TABLE IF EXISTS films_species CASCADE;
DROP TABLE IF EXISTS films_starships CASCADE;
DROP TABLE IF EXISTS films_vehicles CASCADE;
DROP TABLE IF EXISTS films_planets CASCADE;

DROP TABLE IF EXISTS people CASCADE;
DROP TABLE IF EXISTS films CASCADE;
DROP TABLE IF EXISTS starships CASCADE;
DROP TABLE IF EXISTS vehicles CASCADE;
DROP TABLE IF EXISTS species CASCADE;
DROP TABLE IF EXISTS planets CASCADE;

-- Create tables: one per SWAPI resource. Every row keeps the resource's
-- SWAPI url (UNIQUE NOT NULL), which the junction tables below reference.
-- NOTE(review): numeric-looking fields (height, mass, diameter, ...) are
-- VARCHAR, presumably because SWAPI can return non-numeric values such as
-- "unknown" — confirm before casting them.
CREATE TABLE people (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    birth_year VARCHAR(20),
    eye_color VARCHAR(50),
    gender VARCHAR(20),
    hair_color VARCHAR(50),
    height VARCHAR(20),
    mass VARCHAR(20),
    skin_color VARCHAR(50),
    homeworld VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE films (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    episode_id INTEGER NOT NULL,
    opening_crawl TEXT,
    director VARCHAR(255),
    producer VARCHAR(255),
    release_date DATE,
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

-- MGLT is unquoted, so PostgreSQL stores the column name as lowercase "mglt".
CREATE TABLE starships (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    model VARCHAR(255),
    starship_class VARCHAR(255),
    manufacturer VARCHAR(255),
    cost_in_credits VARCHAR(50),
    length VARCHAR(50),
    crew VARCHAR(50),
    passengers VARCHAR(50),
    max_atmosphering_speed VARCHAR(50),
    hyperdrive_rating VARCHAR(50),
    MGLT VARCHAR(50),
    cargo_capacity VARCHAR(50),
    consumables VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE vehicles (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    model VARCHAR(255),
    vehicle_class VARCHAR(255),
    manufacturer VARCHAR(255),
    length VARCHAR(50),
    cost_in_credits VARCHAR(50),
    crew VARCHAR(50),
    passengers VARCHAR(50),
    max_atmosphering_speed VARCHAR(50),
    cargo_capacity VARCHAR(50),
    consumables VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE species (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    classification VARCHAR(255),
    designation VARCHAR(50),
    average_height VARCHAR(50),
    average_lifespan VARCHAR(50),
    eye_colors VARCHAR(255),
    hair_colors VARCHAR(255),
    skin_colors VARCHAR(255),
    language VARCHAR(255),
    homeworld VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE planets (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    diameter VARCHAR(50),
    rotation_period VARCHAR(50),
    orbital_period VARCHAR(50),
    gravity VARCHAR(50),
    population VARCHAR(50),
    climate VARCHAR(255),
    terrain VARCHAR(255),
    surface_water VARCHAR(50),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

-- Relationship Tables for Many-to-Many Relationships.
-- Each row links two resources by their SWAPI url. The composite primary key
-- rejects duplicate pairs, and ON DELETE CASCADE removes the link row when
-- either referenced resource is deleted.
CREATE TABLE people_films (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, film_url)
);

CREATE TABLE people_species (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    species_url VARCHAR(255) REFERENCES species(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, species_url)
);

CREATE TABLE people_starships (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    starship_url VARCHAR(255) REFERENCES starships(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, starship_url)
);

CREATE TABLE people_vehicles (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    vehicle_url VARCHAR(255) REFERENCES vehicles(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, vehicle_url)
);

CREATE TABLE films_species (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    species_url VARCHAR(255) REFERENCES species(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, species_url)
);

CREATE TABLE films_starships (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    starship_url VARCHAR(255) REFERENCES starships(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, starship_url)
);

CREATE TABLE films_vehicles (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    vehicle_url VARCHAR(255) REFERENCES vehicles(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, vehicle_url)
);

CREATE TABLE films_planets (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    planet_url VARCHAR(255) REFERENCES planets(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, planet_url)
);

We now provide the code used to fetch data for the desired entities from the API using Python.


In [None]:
import requests
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv()

# Connection parameters for the unnormalized database, read from the environment.
DB_PARAMS = {
    "dbname": os.getenv("UNNORMALIZED_DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT")
}

BASE_URL = "https://swapi.dev/api/"
SCHEMA_FILE = "ddl/01_starwars_api_tables.sql"

# Pre-bind both handles so the ``finally`` clause can tell whether they were
# ever opened. Without this, a failing connect() leaves ``conn``/``cursor``
# unbound and ``finally`` raises NameError, hiding the real error.
conn = None
cursor = None
try:
    # Connect to the PostgreSQL database
    conn = psycopg2.connect(**DB_PARAMS)
    cursor = conn.cursor()

    # Read the SQL schema file (drops and recreates all SWAPI tables)
    with open(SCHEMA_FILE, "r", encoding="utf-8") as file:
        sql_script = file.read()

    # Execute the whole script in one call and commit it
    cursor.execute(sql_script)
    conn.commit()

    print("Star Wars Schema executed successfully.")

except Exception as e:
    # Best-effort: report the failure and continue with the rest of the cell.
    print(f"Error executing schema: {e}")

finally:
    # Close the cursor and connection, but only the ones actually opened
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()

# Connect to PostgreSQL
def get_db_connection():
    """Open a fresh psycopg2 connection using the module-level ``DB_PARAMS``.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = psycopg2.connect(**DB_PARAMS)
    return connection

# Fetch data from SWAPI with pagination
def fetch_data(endpoint, timeout=30):
    """Fetch every record of a SWAPI collection, following pagination links.

    Args:
        endpoint: collection name appended to ``BASE_URL`` (e.g. "people").
        timeout: seconds to wait for each HTTP response. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        list of the ``"results"`` items gathered from every page. If a page
        answers with a non-200 status, the failure is printed and the records
        collected so far are returned.
    """
    url = BASE_URL + endpoint
    data = []
    while url:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            print(f"Failed to fetch {endpoint}: {response.status_code}")
            break
        json_data = response.json()
        data.extend(json_data["results"])
        # Follow the pagination link; the loop ends once "next" is falsy or
        # missing (presumably null on the last page).
        url = json_data.get("next")
    return data

# Insert data into database
def insert_data(table, data, columns):
    """Bulk-insert the scalar (non-relationship) fields of ``data`` into ``table``.

    Relationship fields (films, species, starships, vehicles, planets) are
    removed from ``columns``; those links are written in a second phase by
    ``insert_relationship_data``. Conflicting rows are skipped via
    ``ON CONFLICT DO NOTHING``. An insert error is printed and rolled back,
    not raised.

    Returns:
        ``(conn, table, data, columns)``: the still-open connection plus the
        unchanged inputs, so the caller can reuse the connection for the
        relationship phase and close it afterwards.
    """
    conn = get_db_connection()
    cur = conn.cursor()

    relationship_fields = ("films", "species", "starships", "vehicles", "planets")
    scalar_columns = [name for name in columns if name not in relationship_fields]

    def _cell(record, name):
        # Lists are rendered as a PostgreSQL array literal such as "{a,b}".
        if isinstance(record.get(name), list):
            return "{" + ",".join(record.get(name, [])) + "}"
        return record.get(name, None)

    query = f"""
        INSERT INTO {table} ({', '.join(scalar_columns)})
        VALUES %s
        ON CONFLICT DO NOTHING
    """
    rows = [[_cell(record, name) for name in scalar_columns] for record in data]

    try:
        print("Inserting non-relationship data into table:", table)
        execute_values(cur, query, rows)
        conn.commit()
    except Exception as e:
        print(f"Error inserting non-relationship data into {table}: {e}")
        conn.rollback()  # Leave the connection usable for the next phase
    finally:
        cur.close()

    return conn, table, data, columns

# (source table, relationship field) -> (junction table, owner column, target column).
# Only the pairs that have a junction table in the schema are listed.
_RELATIONSHIP_TABLES = {
    ("people", "films"): ("people_films", "person_url", "film_url"),
    ("people", "species"): ("people_species", "person_url", "species_url"),
    ("films", "species"): ("films_species", "film_url", "species_url"),
    ("people", "starships"): ("people_starships", "person_url", "starship_url"),
    ("films", "starships"): ("films_starships", "film_url", "starship_url"),
    ("people", "vehicles"): ("people_vehicles", "person_url", "vehicle_url"),
    ("films", "vehicles"): ("films_vehicles", "film_url", "vehicle_url"),
    ("films", "planets"): ("films_planets", "film_url", "planet_url"),
}

# Relationship fields, in the order they are processed (and logged).
_RELATIONSHIP_FIELDS = ("films", "species", "starships", "vehicles", "planets")


def insert_relationship_data(conn, table, data, columns):
    """Write the many-to-many link rows for one SWAPI collection.

    Must run after every collection's scalar rows exist: each junction table
    has foreign keys to both sides (e.g. people_films -> people.url and
    films.url). Links are batched into one ``execute_values`` call per
    junction table and committed together.

    Args:
        conn: open psycopg2 connection. It is committed on success and rolled
            back on error; the caller still owns it and must close it.
        table: collection the records came from. Only "people" and "films"
            have mapped relationship fields; other tables produce no rows.
        data: SWAPI records, each with a "url" key plus lists of linked urls
            under the relationship field names.
        columns: field names requested for ``table``. Only relationship fields
            listed here are processed.

    Raises:
        Any database error, re-raised after rolling the transaction back.
    """
    cur = conn.cursor()
    try:
        for field in _RELATIONSHIP_FIELDS:
            if field not in columns:
                continue
            mapping = _RELATIONSHIP_TABLES.get((table, field))
            if mapping is None:
                # No junction table exists for this (table, field) pair.
                continue
            junction, owner_col, target_col = mapping

            rows = []
            for record in data:
                for linked_url in record.get(field, []):
                    print(f"Inserting into {junction}: {owner_col}={record['url']} {target_col}={linked_url}")
                    rows.append((record["url"], linked_url))

            if rows:
                # Identifiers come from the constant mapping above, never from
                # external input, so formatting them into the SQL is safe.
                execute_values(
                    cur,
                    f"INSERT INTO {junction} ({owner_col}, {target_col}) VALUES %s",
                    rows,
                )

        # Commit the relationship data after all inserts
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()

def fetch_data_by_url(url, timeout=30):
    """Fetch a single SWAPI resource by its absolute URL.

    Args:
        url: full resource URL, e.g. a value taken from a record's "url" field.
        timeout: seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        ``response.json()`` on HTTP 200. On any other status, prints the
        failure and returns an empty list. The failure type differs from the
        success type, so callers should test truthiness, not type.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to fetch {url}: {response.status_code}")
        return []
    return response.json()

# Fetch and store Star Wars data
def main():
    """Fetch every SWAPI collection and load it into the unnormalized database.

    Phase 1 inserts each collection's scalar fields and keeps that
    collection's connection open. Phase 2 writes the many-to-many link rows;
    it waits for phase 1 because the junction tables reference rows from
    several collections. Every phase-1 connection is closed at the end, even
    if a fetch or insert raises.
    """
    tables = {
        "people": ["name", "birth_year", "eye_color", "gender", "hair_color", "height", "mass", "skin_color", "homeworld", "url", "created", "edited", "films", "species", "starships", "vehicles"],
        "films": ["title", "episode_id", "opening_crawl", "director", "producer", "release_date", "url", "created", "edited", "species", "starships", "vehicles", "planets"],
        "planets": ["name", "diameter", "rotation_period", "orbital_period", "gravity", "population", "climate", "terrain", "surface_water", "url", "created", "edited"],
        "species": ["name", "classification", "designation", "average_height", "average_lifespan", "eye_colors", "hair_colors", "skin_colors", "language", "homeworld", "url", "created", "edited"],
        "vehicles": ["name", "model", "vehicle_class", "manufacturer", "length", "cost_in_credits", "crew", "passengers", "max_atmosphering_speed", "cargo_capacity", "consumables", "url", "created", "edited"],
        "starships": ["name", "model", "starship_class", "manufacturer", "cost_in_credits", "length", "crew", "passengers", "max_atmosphering_speed", "hyperdrive_rating", "MGLT", "cargo_capacity", "consumables", "url", "created", "edited"]
    }

    # (conn, table, data, columns) tuples returned by insert_data.
    all_connections = []
    try:
        # Phase 1: Insert non-relationship data for all tables
        for table, columns in tables.items():
            print(f"Fetching {table}...")
            data = fetch_data(table)
            if not data:
                print(f"No data fetched for {table}.")
                continue
            print(f"Inserting {len(data)} records into {table}...")
            all_connections.append(insert_data(table, data, columns))

        # Phase 2: Insert relationship data for all tables
        for conn, table, data, columns in all_connections:
            print(f"Inserting relationship data for {table}...")
            insert_relationship_data(conn, table, data, columns)
    finally:
        # Close every phase-1 connection, even if an exception escaped above.
        for conn, *_ in all_connections:
            conn.close()

if __name__ == "__main__":
    main()


Star Wars Schema executed successfully.

Fetching people...
Inserting 82 records into people...
Inserting non-relationship data into table: people

Fetching films...
Inserting 6 records into films...
Inserting non-relationship data into table: films

Fetching planets...
Inserting 60 records into planets...
Inserting non-relationship data into table: planets

Fetching species...
Inserting 37 records into species...
Inserting non-relationship data into table: species

Fetching vehicles...
Inserting 39 records into vehicles...
Inserting non-relationship data into table: vehicles

Fetching starships...
Inserting 36 records into starships...
Inserting non-relationship data into table: starships
Inserting relationship data for people...
Inserting into people_films: person_url=https://swapi.dev/api/people/1/ film_url=https://swapi.dev/api/films/1/
Inserting into people_films: person_url=https://swapi.dev/api/people/1/ film_url=https://swapi.dev/api/films/2/
Inserting into people_films: person

We first provide the DDL script used to create the tables required for the data from the OMDB API.

This file is contained in the `ddl` folder. Running this SQL code is NOT required, as the following Python code handles it.


In [None]:
-- Drop the OMDB tables if they already exist (CASCADE also drops dependent constraints)
DROP TABLE IF EXISTS rating_providers CASCADE;
DROP TABLE IF EXISTS ratings CASCADE;

-- Table to store rating providers (e.g., Internet Movie Database, Rotten Tomatoes, Metacritic)
CREATE TABLE rating_providers (
    id SERIAL PRIMARY KEY,
    name TEXT UNIQUE NOT NULL
);

-- Table to store movie ratings: each row is one provider's rating of one film
CREATE TABLE ratings (
    id SERIAL PRIMARY KEY,
    -- Plain INT, because this is a foreign key to films(id), not an identity
    -- column. SERIAL would attach its own sequence as a default and silently
    -- invent film ids whenever an insert omitted the column.
    film_id INT NOT NULL,
    imdb_id TEXT NOT NULL,  -- IMDb ID for the movie
    rating_provider_id INT NOT NULL,
    rating_value TEXT NOT NULL,  -- Ratings may be in different formats (e.g., "8.4/10", "94%", "78/100")
    FOREIGN KEY (rating_provider_id) REFERENCES rating_providers(id) ON DELETE CASCADE,
    FOREIGN KEY (film_id) REFERENCES films(id) ON DELETE CASCADE
);


We continue with the Python script used to fetch data from the OMDB API.


In [None]:
import requests
import psycopg2
from dotenv import load_dotenv
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv()

# Database connection (unnormalized database)
conn = psycopg2.connect(
    dbname=os.getenv("UNNORMALIZED_DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT")
)
cur = conn.cursor()

# OMDB API endpoint (the API key is read from OMDB_API_KEY at request time)
OMDB_BASE_URL = "http://www.omdbapi.com/"
SCHEMA_FILE = "ddl/02_omdb_tables.sql"

# Separate, short-lived cursor for running the schema script.
cursor = None
try:
    cursor = conn.cursor()

    # Read the SQL schema file
    with open(SCHEMA_FILE, "r", encoding="utf-8") as file:
        sql_script = file.read()

    # Execute the SQL script
    cursor.execute(sql_script)
    conn.commit()

    print("OMDB Schema executed successfully.")

except Exception as e:
    # A failed statement leaves the transaction aborted, and every later
    # command on ``conn`` would then fail too. Roll back so the SELECT below
    # can still run.
    conn.rollback()
    print(f"Error executing schema: {e}")

finally:
    if cursor is not None:
        cursor.close()

# Fetch all films (id, title and release date)
print("Fetching films from database...")
cur.execute("SELECT id, title, release_date FROM films;")
films = cur.fetchall()
print(f"Retrieved {len(films)} films.")

# Function to get movie ratings from OMDB API
def fetch_movie_ratings(title, year, timeout=30):
    """Request OMDB data for the movie ``title`` released in ``year``.

    Args:
        title: film title, sent as OMDB's ``t`` query parameter.
        year: release year, sent as OMDB's ``y`` query parameter.
        timeout: seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        ``response.json()`` on HTTP 200; the caller still checks its
        ``"Response"`` field for a not-found result. Otherwise None, after
        printing the failing status code.
    """
    params = {
        "t": title,
        "y": year,
        "apikey": os.getenv("OMDB_API_KEY")
    }
    print(f"Requesting OMDB API for {title} ({year})...")
    response = requests.get(OMDB_BASE_URL, params=params, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to fetch data for {title} ({year}) - Status Code: {response.status_code}")
        return None
    print(f"Response received for {title} ({year})")
    return response.json()

# Load the rating providers already stored, keyed by name, so each provider
# is inserted at most once.
print("Fetching existing rating providers...")
cur.execute("SELECT id, name FROM rating_providers;")
existing_providers = {name: provider_id for provider_id, name in cur.fetchall()}
print(f"Retrieved {len(existing_providers)} rating providers.")

try:
    # Process each film
    for film_id, title, release_date in films:
        year = release_date.year if release_date else None
        if not year:
            print(f"Skipping {title} due to missing release year.")
            continue

        print(f"Fetching ratings for {title} ({year})")
        movie_data = fetch_movie_ratings(title, year)

        # A 200 body can still report failure via "Response": "False".
        if not movie_data or movie_data.get("Response") == "False":
            print(f"No data found for {title} ({year}).")
            continue

        # "N/A" is stored as a placeholder when OMDB returns no imdbID.
        imdb_id = movie_data.get("imdbID", "N/A")
        print(f"IMDb ID for {title} ({year}): {imdb_id}")

        print(f"Processing ratings for {title} ({year})")
        for rating in movie_data.get("Ratings", []):
            provider_name = rating["Source"]
            rating_value = rating["Value"]
            print(f"Found rating: {provider_name} - {rating_value}")

            # Ensure provider exists
            if provider_name not in existing_providers:
                print(f"Inserting new rating provider: {provider_name}")
                cur.execute("INSERT INTO rating_providers (name) VALUES (%s) RETURNING id;", (provider_name,))
                provider_id = cur.fetchone()[0]
                existing_providers[provider_name] = provider_id
                conn.commit()
            else:
                provider_id = existing_providers[provider_name]

            # Insert rating with imdb_id. film_id is passed as the integer read
            # from films.id, matching the integer film_id column.
            print(f"Inserting rating for {title}: IMDb ID {imdb_id}, Provider ID {provider_id}, Value {rating_value}")
            cur.execute(
                """
                INSERT INTO ratings (film_id, imdb_id, rating_provider_id, rating_value)
                VALUES (%s, %s, %s, %s);
                """,
                (film_id, str(imdb_id), provider_id, rating_value)
            )
            # Commit per rating so ratings already stored survive a later failure.
            conn.commit()
finally:
    # Release the cursor and connection even if the loop raised.
    print("Closing database connection...")
    cur.close()
    conn.close()
print("Ratings data populated successfully.")


After downloading the TMDB Movie dataset from Kaggle, the following DDL script was used to create the tables necessary for the data to be collected from TMDB.

This file is contained in the `ddl` folder. Running this SQL code is NOT required, as the following Python code handles it.


In [None]:
-- Delete the movie_metadata table if it exists
DROP TABLE IF EXISTS movie_metadata;

-- Table to store additional movie metadata (popularity and keywords) taken
-- from the TMDB dataset. The primary key is also the foreign key to
-- films(id), so each film has at most one metadata row.
CREATE TABLE movie_metadata (
    id INT PRIMARY KEY,
    imdb_id TEXT UNIQUE NOT NULL,
    popularity NUMERIC NOT NULL,
    overview VARCHAR(1000),  -- inserts with a longer overview are rejected by PostgreSQL
    runtime INTEGER,
    keywords TEXT NOT NULL,  -- NOTE(review): stored exactly as the CSV "keywords" cell; confirm its delimiter format
    FOREIGN KEY (id) REFERENCES films(id) ON DELETE CASCADE
);


We then use Python to read the downloaded `.csv` file and keep only the Star Wars movie data.


In [None]:
import csv
import psycopg2
from dotenv import load_dotenv
import os

# Load environment variables from the .env file
load_dotenv()

# Connect to the unnormalized database (UNNORMALIZED_DB_NAME)
conn = psycopg2.connect(
    dbname=os.getenv("UNNORMALIZED_DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT")
)
cur = conn.cursor()

SCHEMA_FILE = "ddl/03_tmdb_tables.sql"

# Separate, short-lived cursor for running the schema script.
cursor = None
try:
    cursor = conn.cursor()

    # Read the SQL schema file
    with open(SCHEMA_FILE, "r", encoding="utf-8") as file:
        sql_script = file.read()

    # Execute the SQL script
    cursor.execute(sql_script)
    conn.commit()

    print("TMDB Schema executed successfully.")

except Exception as e:
    # A failed statement leaves the transaction aborted, and every later
    # command on ``conn`` would then fail too. Roll back so the remaining
    # queries can still run.
    conn.rollback()
    print(f"Error executing schema: {e}")

finally:
    if cursor is not None:
        cursor.close()

def _parse_runtime(raw):
    """Convert a CSV runtime cell to an int for the INTEGER column, or None.

    Blank cells and text PostgreSQL would reject for INTEGER (e.g. "121.0" or
    non-numeric values) are converted here rather than aborting the insert
    transaction. Fractional values are truncated toward zero.
    """
    if raw is None or not raw.strip():
        return None
    try:
        return int(float(raw))
    except (ValueError, OverflowError):
        return None


# Path to the CSV file (resolved against the current working directory)
current_directory = os.getcwd()
csv_file_path = os.path.join(current_directory, "data", "tmdb_filtered_dataset.csv")

# Map each IMDb ID stored with the ratings to its film id. The ratings table
# holds one row per provider rating, so DISTINCT collapses repeated pairs.
print("Fetching IMDb IDs and corresponding film ids from ratings table...")
cur.execute("""
    SELECT DISTINCT r.imdb_id, f.id
    FROM ratings r
    JOIN films f ON r.film_id = f.id;
""")
imdb_to_film_id = {row[0]: row[1] for row in cur.fetchall()}
print(f"Retrieved {len(imdb_to_film_id)} IMDb IDs and film IDs.")

try:
    # Read CSV and insert relevant data. newline="" lets the csv module handle
    # line breaks embedded inside quoted fields (e.g. multi-line overviews).
    with open(csv_file_path, mode="r", encoding="utf-8", newline="") as csv_file:
        reader = csv.DictReader(csv_file)

        for row in reader:
            imdb_id = row.get("imdb_id")
            popularity = row.get("popularity")
            keywords = row.get("keywords")
            overview = row.get("overview")
            runtime = _parse_runtime(row.get("runtime"))

            # Skip rows with missing data or if imdb_id is not found in ratings table
            if not imdb_id or imdb_id not in imdb_to_film_id or not popularity or not keywords:
                continue

            film_id = imdb_to_film_id[imdb_id]

            print(f"Inserting film_id {film_id} (IMDb ID: {imdb_id}) with popularity {popularity} and keywords {keywords}")

            # Insert into the movie_metadata table, ensuring the film_id is used as a reference
            cur.execute(
                """
                INSERT INTO movie_metadata (id, imdb_id, popularity, keywords, overview, runtime)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO NOTHING;
                """,
                (film_id, imdb_id, popularity, keywords, overview, runtime)
            )
            conn.commit()
finally:
    # Release the cursor and connection even if reading or inserting raised.
    print("Closing database connection...")
    cur.close()
    conn.close()
print("Movie metadata populated successfully.")


After retrieving the initial set of data, and investigating the overall structure of the unnormalized database, we will be using the following DDL script to structure the normalized dataset.

This file is contained in the `ddl` folder. Running this SQL code is NOT required, as the following Python code handles it.


In [None]:
-- Normalized schema for the final database.
-- Drop tables if they exist (CASCADE also drops dependent constraints, so the
-- order of these statements does not matter).
DROP TABLE IF EXISTS people CASCADE;
DROP TABLE IF EXISTS films CASCADE;
DROP TABLE IF EXISTS starships CASCADE;
DROP TABLE IF EXISTS vehicles CASCADE;
DROP TABLE IF EXISTS species CASCADE;
DROP TABLE IF EXISTS planets CASCADE;

DROP TABLE IF EXISTS people_films CASCADE;
DROP TABLE IF EXISTS people_species CASCADE;
DROP TABLE IF EXISTS people_starships CASCADE;
DROP TABLE IF EXISTS people_vehicles CASCADE;
DROP TABLE IF EXISTS films_species CASCADE;
DROP TABLE IF EXISTS films_starships CASCADE;
DROP TABLE IF EXISTS films_vehicles CASCADE;
DROP TABLE IF EXISTS films_planets CASCADE;

DROP TABLE IF EXISTS rating_providers CASCADE;
DROP TABLE IF EXISTS ratings CASCADE;

DROP TABLE IF EXISTS keywords CASCADE;
DROP TABLE IF EXISTS movie_metadata CASCADE;

-- Create tables
CREATE TABLE people (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    birth_year VARCHAR(20),
    eye_color VARCHAR(50),
    gender VARCHAR(20),
    hair_color VARCHAR(50),
    height VARCHAR(20),
    mass VARCHAR(20),
    skin_color VARCHAR(50),
    homeworld VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

-- films now also carries the per-film TMDB/OMDB attributes directly.
CREATE TABLE films (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    episode_id INTEGER NOT NULL,
    opening_crawl TEXT,
    director VARCHAR(255),
    producer VARCHAR(255),
    release_date DATE,
    imdb_id VARCHAR(50),  -- Added IMDb ID
    overview VARCHAR(1000),
    runtime INTEGER,
    popularity NUMERIC,
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE starships (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    model VARCHAR(255),
    starship_class VARCHAR(255),
    manufacturer VARCHAR(255),
    cost_in_credits VARCHAR(50),
    length VARCHAR(50),
    crew VARCHAR(50),
    passengers VARCHAR(50),
    max_atmosphering_speed VARCHAR(50),
    hyperdrive_rating VARCHAR(50),
    MGLT VARCHAR(50),
    cargo_capacity VARCHAR(50),
    consumables VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE vehicles (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    model VARCHAR(255),
    vehicle_class VARCHAR(255),
    manufacturer VARCHAR(255),
    length VARCHAR(50),
    cost_in_credits VARCHAR(50),
    crew VARCHAR(50),
    passengers VARCHAR(50),
    max_atmosphering_speed VARCHAR(50),
    cargo_capacity VARCHAR(50),
    consumables VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE species (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    classification VARCHAR(255),
    designation VARCHAR(50),
    average_height VARCHAR(50),
    average_lifespan VARCHAR(50),
    eye_colors VARCHAR(255),
    hair_colors VARCHAR(255),
    skin_colors VARCHAR(255),
    language VARCHAR(255),
    homeworld VARCHAR(255),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

CREATE TABLE planets (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    diameter VARCHAR(50),
    rotation_period VARCHAR(50),
    orbital_period VARCHAR(50),
    gravity VARCHAR(50),
    population VARCHAR(50),
    climate VARCHAR(255),
    terrain VARCHAR(255),
    surface_water VARCHAR(50),
    url VARCHAR(255) UNIQUE NOT NULL,
    created TIMESTAMP,
    edited TIMESTAMP
);

-- Relationship Tables for Many-to-Many Relationships, keyed by the SWAPI url
-- of each side (not the numeric ids). Every url column has the same type as
-- the url column it references.
CREATE TABLE people_films (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, film_url)
);

CREATE TABLE people_species (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    species_url VARCHAR(255) REFERENCES species(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, species_url)
);

CREATE TABLE people_starships (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    starship_url VARCHAR(255) REFERENCES starships(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, starship_url)
);

CREATE TABLE people_vehicles (
    person_url VARCHAR(255) REFERENCES people(url) ON DELETE CASCADE,
    vehicle_url VARCHAR(255) REFERENCES vehicles(url) ON DELETE CASCADE,
    PRIMARY KEY (person_url, vehicle_url)
);

CREATE TABLE films_species (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    species_url VARCHAR(255) REFERENCES species(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, species_url)
);

CREATE TABLE films_starships (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    starship_url VARCHAR(255) REFERENCES starships(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, starship_url)
);

CREATE TABLE films_vehicles (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    vehicle_url VARCHAR(255) REFERENCES vehicles(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, vehicle_url)
);

CREATE TABLE films_planets (
    film_url VARCHAR(255) REFERENCES films(url) ON DELETE CASCADE,
    planet_url VARCHAR(255) REFERENCES planets(url) ON DELETE CASCADE,
    PRIMARY KEY (film_url, planet_url)
);

-- Table to store rating providers (e.g., Internet Movie Database, Rotten Tomatoes, Metacritic)
CREATE TABLE rating_providers (
    id SERIAL PRIMARY KEY,
    name TEXT UNIQUE NOT NULL
);

-- Table to store movie ratings
CREATE TABLE ratings (
    id SERIAL PRIMARY KEY,
    film_id INT NOT NULL,
    rating_provider_id INT NOT NULL,
    rating_value NUMERIC NOT NULL,  -- Rating value is now numeric (e.g., 8.4, 94, 78)
    FOREIGN KEY (rating_provider_id) REFERENCES rating_providers(id) ON DELETE CASCADE,
    FOREIGN KEY (film_id) REFERENCES films(id) ON DELETE CASCADE
);

-- Table to store keywords (each distinct keyword stored once)
CREATE TABLE keywords (
    id SERIAL PRIMARY KEY,
    keyword TEXT UNIQUE NOT NULL
);

-- Linking table between films and keywords: each row links one film to one keyword
CREATE TABLE movie_metadata (
    id SERIAL PRIMARY KEY,
    film_id INT NOT NULL,
    keyword_id INT NOT NULL,
    FOREIGN KEY (film_id) REFERENCES films(id) ON DELETE CASCADE,
    FOREIGN KEY (keyword_id) REFERENCES keywords(id) ON DELETE CASCADE
);


We use batch queries and inserts in Python to copy data from the unnormalized database into the normalized database.

1. **Separated Entities into Dedicated Tables**

   - Instead of storing all data in a single `films` table, individual tables were created for:
     - `people`, `planets`, `starships`, `vehicles`, `species`.

2. **Created Relationship (Junction) Tables**

   - Many-to-many relationships were **extracted** into separate linking tables:
     - `people_films`, `films_species`, `films_starships`, `films_vehicles`, etc.

3. **Extracted Ratings into a Separate Table**

   - `ratings` were moved to a new table with a **consistent rating scale** (0-100).
   - `rating_providers` table was introduced to avoid repeating provider names.

4. **Normalized Keywords for Movie Metadata**
   - `keywords` were extracted into a separate table to avoid duplication.
   - A linking table associates `films` with `keywords`.


In [None]:
import os
import re

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from psycopg2 import sql

# Pull the DB_* settings and both database names from the local .env file
load_dotenv()

# Both connections use the same server and credentials; only the database
# name differs between the source and the target.
_shared_conn_settings = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
}

# Source: the original, unnormalized Star Wars database
starwars_db_conn = psycopg2.connect(
    dbname=os.getenv("UNNORMALIZED_DB_NAME"), **_shared_conn_settings
)

# Target: the normalized database the migration writes into
normalized_db_conn = psycopg2.connect(
    dbname=os.getenv("NORMALIZED_DB_NAME"), **_shared_conn_settings
)

SCHEMA_FILE = "ddl/04_final_schema.sql"

# Create the normalized schema from the DDL file. A failure here must stop
# the migration: every later INSERT depends on these tables, and a failed
# statement leaves the connection's transaction aborted, so continuing would
# only produce confusing follow-on errors.
try:
    with open(SCHEMA_FILE, "r", encoding="utf-8") as file:
        sql_script = file.read()

    # The with-block closes the cursor once the script has run
    with normalized_db_conn.cursor() as cursor:
        cursor.execute(sql_script)
    normalized_db_conn.commit()
except Exception as e:
    # Roll back so the connection is usable again, report, then propagate
    normalized_db_conn.rollback()
    print(f"Error executing schema: {e}")
    raise

print("Normalized Schema executed successfully.")

# Create a cursor for both databases
starwars_db_cursor = starwars_db_conn.cursor()
normalized_db_cursor = normalized_db_conn.cursor()

# ============================================================
# PHASE 1: Migrating 'films' Data from starwars_db to normalized_starwars_db
# ============================================================

# Read every film plus its popularity and IMDb id. The join treats
# movie_metadata.id as the film id (Phase 5 relies on the same convention).
starwars_db_cursor.execute("""
    SELECT
        f.id, f.title, f.episode_id, f.opening_crawl, f.director, f.producer,
        f.release_date, f.url, f.created, f.edited, m.popularity, m.imdb_id
    FROM films f
    LEFT JOIN movie_metadata m ON f.id = m.id
""")
films = starwars_db_cursor.fetchall()

# Each fetched tuple is already in the INSERT's column order, so the rows are
# passed through unchanged. execute_batch sends them in pages rather than one
# round trip per film.
# NOTE(review): if films.id is a SERIAL column, explicitly inserted ids do not
# advance its sequence; reset it with setval() before inserting rows without an id.
psycopg2.extras.execute_batch(
    normalized_db_cursor,
    """
    INSERT INTO films (id, title, episode_id, opening_crawl, director, producer,
                       release_date, url, created, edited, popularity, imdb_id)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """,
    films,
)

# Commit phase 1; the connections stay open for the remaining phases
normalized_db_conn.commit()

# ============================================================
# End of PHASE 1
# ============================================================

# ============================================================
# PHASE 2: Migrating 'people', 'starships', 'vehicles', 'species', and 'planets' Data from starwars_db to normalized_starwars_db
# ============================================================

# Each entity table is copied column-for-column, so a single
# (table, columns) spec drives both the SELECT and the INSERT. The table
# and column names are fixed constants, never user input, so building the
# SQL text from them is safe. MGLT is deliberately left unquoted so that
# PostgreSQL folds it to `mglt`, exactly as the hand-written statements did.
_ENTITY_TABLES = (
    ("people", (
        "id", "name", "birth_year", "eye_color", "gender", "hair_color", "height",
        "mass", "skin_color", "homeworld", "url", "created", "edited",
    )),
    ("starships", (
        "id", "name", "model", "starship_class", "manufacturer", "cost_in_credits",
        "length", "crew", "passengers", "max_atmosphering_speed", "hyperdrive_rating",
        "MGLT", "cargo_capacity", "consumables", "url", "created", "edited",
    )),
    ("vehicles", (
        "id", "name", "model", "vehicle_class", "manufacturer", "length",
        "cost_in_credits", "crew", "passengers", "max_atmosphering_speed",
        "cargo_capacity", "consumables", "url", "created", "edited",
    )),
    ("species", (
        "id", "name", "classification", "designation", "average_height",
        "average_lifespan", "eye_colors", "hair_colors", "skin_colors", "language",
        "homeworld", "url", "created", "edited",
    )),
    ("planets", (
        "id", "name", "diameter", "rotation_period", "orbital_period", "gravity",
        "population", "climate", "terrain", "surface_water", "url", "created", "edited",
    )),
)

for _table, _columns in _ENTITY_TABLES:
    _column_list = ", ".join(_columns)
    _placeholders = ", ".join(["%s"] * len(_columns))
    starwars_db_cursor.execute(f"SELECT {_column_list} FROM {_table}")
    # Rows come back in _columns order, matching the INSERT placeholders
    psycopg2.extras.execute_batch(
        normalized_db_cursor,
        f"INSERT INTO {_table} ({_column_list}) VALUES ({_placeholders})",
        starwars_db_cursor.fetchall(),
    )

# Commit the transaction
normalized_db_conn.commit()

# ============================================================
# End of PHASE 2
# ============================================================

# ============================================================
# PHASE 3: Migrating Relationship (Junction) Tables
# ============================================================

# Junction rows are copied verbatim: they keep referencing entities by URL
# (no ids are extracted from the URLs here). Table and column names are fixed
# constants, so interpolating them into the SQL text is safe.
_JUNCTION_TABLES = (
    # (table, left URL column, right URL column)
    ("people_films", "person_url", "film_url"),
    ("people_species", "person_url", "species_url"),
    ("people_starships", "person_url", "starship_url"),
    ("people_vehicles", "person_url", "vehicle_url"),
    ("films_species", "film_url", "species_url"),
    ("films_starships", "film_url", "starship_url"),
    ("films_vehicles", "film_url", "vehicle_url"),
    ("films_planets", "film_url", "planet_url"),
)

for _table, _left, _right in _JUNCTION_TABLES:
    starwars_db_cursor.execute(f"SELECT {_left}, {_right} FROM {_table}")
    psycopg2.extras.execute_batch(
        normalized_db_cursor,
        f"INSERT INTO {_table} ({_left}, {_right}) VALUES (%s, %s)",
        starwars_db_cursor.fetchall(),
    )

# Commit the transaction
normalized_db_conn.commit()

# ============================================================
# End of PHASE 3
# ============================================================

# ============================================================
# PHASE 4: Migrating Ratings and Rating Providers Data from starwars_db to normalized_starwars_db
# ============================================================

# Step 1: Collect every distinct, non-null provider name from the source DB.
# Each fetched row is a 1-tuple holding the name.
_distinct_providers_sql = "SELECT DISTINCT name FROM rating_providers WHERE name IS NOT NULL"
starwars_db_cursor.execute(_distinct_providers_sql)
rating_providers = starwars_db_cursor.fetchall()

# Function to convert provider name to a valid column name
def sanitize_column_name(name):
    """Turn a rating-provider name into a ``<slug>_rating`` column name.

    The name is trimmed and lower-cased. Every run of non-word characters
    (spaces, hyphens, dots, parentheses, ...) becomes a single underscore,
    and leading/trailing underscores are dropped. The result therefore
    contains only word characters.

    >>> sanitize_column_name("Internet Movie Database")
    'internet_movie_database_rating'
    """
    slug = re.sub(r"\W+", "_", name.strip().lower()).strip("_")
    return slug + "_rating"

# Step 2: Add one NUMERIC rating column per provider to the `films` table.
# sql.Identifier quotes the generated column name, so an unusual provider
# name can never break (or inject into) the DDL statement.
for (provider_name,) in rating_providers:
    normalized_db_cursor.execute(
        sql.SQL("ALTER TABLE films ADD COLUMN IF NOT EXISTS {} NUMERIC").format(
            sql.Identifier(sanitize_column_name(provider_name))
        )
    )

# Commit to save the schema changes
normalized_db_conn.commit()


def _normalize_rating(raw_value):
    """Convert one raw provider rating to the common 0-100 scale.

    String forms (surrounding whitespace ignored):
      * ``"94%"`` -> 94.0
      * ``"8.4/10"``, ``"78/100"`` -> scaled by 100 / denominator (84.0, 78.0)
      * decimals such as ``"0.84"`` / ``"8.4"``: below 1 they are treated as a
        fraction (x100); up to 10 as a 0-10 score (x10); above 10 as already
        on the 0-100 scale
      * any other numeric string is used as-is
    Non-string values (e.g. numbers returned for a NUMERIC column) are kept
    as-is. The result is clamped to [0, 100].

    Returns None when the value is missing or cannot be parsed
    (e.g. ``"N/A"`` or a zero denominator).
    """
    if raw_value is None:
        return None

    value = raw_value
    if isinstance(raw_value, str):
        text = raw_value.strip()
        try:
            if '%' in text:
                value = float(text.strip('%'))
            elif '/' in text:
                numerator, denominator = text.split('/')
                # 100 / 10 == 10.0 and 100 / 100 == 1.0, so the common
                # x/10 and x/100 forms scale exactly as before
                value = float(numerator) * (100 / float(denominator))
            elif '.' in text:
                value = float(text)
                if value < 1:
                    value *= 100
                elif value <= 10:
                    value *= 10
            else:
                value = float(text)
        except (ValueError, ZeroDivisionError):
            return None

    # Clamp to range 0-100
    return min(max(value, 0), 100)


# Step 3: Fetch rating data from source
starwars_db_cursor.execute("""
    SELECT r.film_id, rp.name, r.rating_value
    FROM ratings r
    JOIN rating_providers rp ON r.rating_provider_id = rp.id
""")
ratings = starwars_db_cursor.fetchall()

# Step 4: Normalize each rating and store it in its provider's column on `films`
for film_id, provider_name, rating_value in ratings:
    normalized = _normalize_rating(rating_value)
    if provider_name is None or normalized is None:
        # No column exists for unnamed providers (Step 1 skips them), and an
        # unparseable value just leaves the column NULL instead of aborting
        # the whole migration.
        continue
    normalized_db_cursor.execute(
        sql.SQL("UPDATE films SET {} = %s WHERE id = %s").format(
            sql.Identifier(sanitize_column_name(provider_name))
        ),
        (normalized, film_id),
    )

# Final commit
normalized_db_conn.commit()

# ============================================================
# End of PHASE 4
# ============================================================

# ============================================================
# PHASE 5: Migrating Movie Metadata to Normalized Structure
# ============================================================

# Fetch all movie metadata from the old movie_metadata table. Its `id`
# column is used as the film id (the same convention as the Phase 1 join).
starwars_db_cursor.execute("""
    SELECT id, popularity, keywords, overview, runtime
    FROM movie_metadata
""")
movie_metadata_rows = starwars_db_cursor.fetchall()

# keyword text -> keywords.id, so each distinct keyword is looked up or
# inserted only once for the whole migration
keyword_ids = {}

for film_id, _popularity, keywords, overview, runtime in movie_metadata_rows:
    # Copy the descriptive fields onto the film (popularity was already
    # copied onto `films` in Phase 1)
    normalized_db_cursor.execute("""
        UPDATE films
        SET overview = %s, runtime = %s
        WHERE id = %s
    """, (overview, runtime, film_id))

    # Keywords arrive as one comma-separated string. Trim each entry, drop
    # blanks, and de-duplicate (keeping first-seen order) so a film is never
    # linked to the same keyword twice.
    stripped = (part.strip() for part in keywords.split(',')) if keywords else ()
    film_keywords = dict.fromkeys(part for part in stripped if part)

    for keyword in film_keywords:
        keyword_id = keyword_ids.get(keyword)
        if keyword_id is None:
            # Reuse an existing keyword row, otherwise create a new one
            normalized_db_cursor.execute("SELECT id FROM keywords WHERE keyword = %s", (keyword,))
            keyword_row = normalized_db_cursor.fetchone()
            if keyword_row is None:
                normalized_db_cursor.execute(
                    "INSERT INTO keywords (keyword) VALUES (%s) RETURNING id", (keyword,)
                )
                keyword_row = normalized_db_cursor.fetchone()
            keyword_id = keyword_ids[keyword] = keyword_row[0]

        # Link the film to the keyword in the normalized movie_metadata table
        normalized_db_cursor.execute("""
            INSERT INTO movie_metadata (film_id, keyword_id)
            VALUES (%s, %s)
        """, (film_id, keyword_id))

# Commit the changes to the normalized database
normalized_db_conn.commit()

# ============================================================
# End of PHASE 5
# ============================================================


# Release both cursors first, then both connections (source before target)
for _handle in (starwars_db_cursor, normalized_db_cursor, starwars_db_conn, normalized_db_conn):
    _handle.close()

print("Films table populated successfully in normalized_starwars_db!")


# Data Files and Scripts

Once the normalized database has been structured and populated with data, we now proceed to export the data into CSVs to be imported into our Neo4j database.


In [None]:
import os
from dotenv import load_dotenv
import psycopg2
import csv

# Pull NORMALIZED_DB_NAME and the DB_* settings from the .env file
load_dotenv()

# Only the normalized database is exported
_export_conn_settings = {
    "dbname": os.getenv("NORMALIZED_DB_NAME"),
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "host": os.getenv("DB_HOST"),
    "port": os.getenv("DB_PORT"),
}
conn = psycopg2.connect(**_export_conn_settings)

# Single cursor shared by the export helpers below
cursor = conn.cursor()

# Define a function to fetch data and write it to CSV
def export_to_csv(query, filename):
    """Run ``query`` on the shared ``cursor`` and write the result to ``filename``.

    The first CSV row holds the result's column names (taken from
    ``cursor.description``). The remaining rows are the query results. The
    parent directory of ``filename`` is created when missing, so a fresh
    checkout without an output folder does not fail with FileNotFoundError.
    """
    cursor.execute(query)
    rows = cursor.fetchall()
    header = [column[0] for column in cursor.description]

    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)
    print(f"Data exported to {filename}")

# Where every exported CSV is written
_EXPORT_DIR = 'migration-csv-tables'

# Entity tables are exported whole: every column, one CSV per table
for _table in ('films', 'people', 'starships', 'vehicles', 'species', 'planets'):
    export_to_csv(f"\nSELECT *\nFROM {_table};\n", f"{_EXPORT_DIR}/{_table}.csv")

# Lookup and link tables: only the listed columns are exported (the
# surrogate `id` of movie_metadata and ratings rows is left out)
for _columns, _table in (
    ("id, keyword", "keywords"),
    ("film_id, keyword_id", "movie_metadata"),
    ("id, name", "rating_providers"),
    ("film_id, rating_provider_id, rating_value", "ratings"),
):
    export_to_csv(f"\nSELECT {_columns} FROM {_table};\n", f"{_EXPORT_DIR}/{_table}.csv")

def export_aggregated_relation(query, filename):
    """Write the rows of an aggregated relationship query to a CSV file.

    Runs ``query`` on the shared ``cursor``, then writes a header row of the
    result's column names followed by every result row.
    """
    cursor.execute(query)
    result_rows = cursor.fetchall()
    column_names = [description[0] for description in cursor.description]
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        csv_writer = csv.writer(out_file)
        csv_writer.writerow(column_names)
        csv_writer.writerows(result_rows)
    print(f"Exported: {filename}")

# Each junction table is flattened to one CSV row per owning entity:
# (owner URL, comma-joined list of related URLs). The Neo4j import splits
# that list on ',' and stores it as an array property.
_aggregated_relations = (
    # (owner column, related column, list column alias, source table)
    ("person_url", "film_url", "films", "people_films"),
    ("person_url", "species_url", "species", "people_species"),
    ("person_url", "starship_url", "starships", "people_starships"),
    ("person_url", "vehicle_url", "vehicles", "people_vehicles"),
    ("film_url", "species_url", "species", "films_species"),
    ("film_url", "starship_url", "starships", "films_starships"),
    ("film_url", "vehicle_url", "vehicles", "films_vehicles"),
    ("film_url", "planet_url", "planets", "films_planets"),
)

for _owner, _related, _alias, _table in _aggregated_relations:
    export_aggregated_relation(
        f"\n    SELECT {_owner}, string_agg({_related}, ',') AS {_alias}\n"
        f"    FROM {_table}\n"
        f"    GROUP BY {_owner}\n",
        f"migration-csv-tables/{_table}_array.csv",
    )

# Films -> keywords needs joins: movie_metadata stores ids, not URLs or keyword text
export_aggregated_relation("""
    SELECT f.url AS film_url, string_agg(k.keyword, ',') AS keywords
    FROM movie_metadata mm
    JOIN films f ON mm.film_id = f.id
    JOIN keywords k ON mm.keyword_id = k.id
    GROUP BY f.url
""", "migration-csv-tables/films_keywords_array.csv")

# All exports are written; release the database resources
cursor.close()
conn.close()

print("All data exported successfully!")


We then proceed to write a script that enables us to make use of the exported CSV files and import into our Neo4j database.


In [None]:
from neo4j import GraphDatabase
import os
import csv
from dotenv import load_dotenv

# Pull the NEO4J_* settings from the .env file
load_dotenv()

# Folder holding the CSV files exported from PostgreSQL
CSV_DIR = "migration-csv-tables"

_neo4j_credentials = (os.getenv("NEO4J_USER"), os.getenv("NEO4J_PASSWORD"))
driver = GraphDatabase.driver(os.getenv("NEO4J_URI"), auth=_neo4j_credentials)

# Phase 1: Create nodes from CSVs with dynamic fields
def create_nodes(tx, label, csv_file):
    """Create one ``label`` node per row of ``CSV_DIR/csv_file``.

    Every CSV column becomes a node property. All values are strings, as
    read by ``csv.DictReader``. When the file has a ``url`` column, the node
    is MERGEd on ``url`` alone and then receives all the row's properties.
    Re-running the import therefore updates existing nodes instead of
    creating a duplicate whenever any other column changed. Without a
    ``url`` column, the node is MERGEd on its full property map.

    Args:
        tx: transaction whose ``run(query, **params)`` executes Cypher.
        label: node label. It is interpolated into the query text, so it
            must be a trusted identifier.
        csv_file: file name relative to ``CSV_DIR``.
    """
    with open(os.path.join(CSV_DIR, csv_file), newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames or []

        # The query text depends only on the label and header, so build it once
        if 'url' in headers:
            query = f"MERGE (n:{label} {{url: $url}}) SET n += $props"
            for row in reader:
                props = {k: row[k] for k in headers}
                tx.run(query, url=props['url'], props=props)
        else:
            prop_str = ", ".join(f"{k}: ${k}" for k in headers)
            query = f"MERGE (n:{label} {{ {prop_str} }})"
            for row in reader:
                tx.run(query, **{k: row[k] for k in headers})

# Phase 2: Add arrays as node attributes using dynamic detection
def add_array_property(tx, label, csv_file):
    """Attach a list property to existing ``label`` nodes from a two-column CSV.

    The CSV's first column holds a node ``url``. Its second column holds a
    comma-separated list. For each row, the node with that ``url`` gets a
    property named after the second column, set to the split list (an empty
    list when the cell is empty). Rows whose url matches no node change
    nothing, because the MATCH finds no node to SET.

    Raises:
        ValueError: if the CSV is empty or does not have exactly two columns.
    """
    with open(os.path.join(CSV_DIR, csv_file), newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        headers = reader.fieldnames
        # fieldnames is None for an empty file; report that as a bad CSV too
        if not headers or len(headers) != 2:
            raise ValueError(f"Expected exactly 2 columns in {csv_file}, got {headers}")
        id_field, array_field = headers

        # The query text depends only on the header, so build it once
        query = f"""
            MATCH (n:{label} {{url: $id}})
            SET n.{array_field} = $items
        """
        for row in reader:
            raw_items = row[array_field]
            tx.run(query, id=row[id_field], items=raw_items.split(",") if raw_items else [])

# (label, CSV file) pairs for the node import, in import order
_NODE_FILES = (
    ("Film", "films.csv"),
    ("Person", "people.csv"),
    ("Starship", "starships.csv"),
    ("Vehicle", "vehicles.csv"),
    ("Species", "species.csv"),
    ("Planet", "planets.csv"),
)

# (label, CSV file) pairs whose second column becomes an array property
_ARRAY_FILES = (
    ("Person", "people_films_array.csv"),
    ("Person", "people_species_array.csv"),
    ("Person", "people_starships_array.csv"),
    ("Person", "people_vehicles_array.csv"),
    ("Film", "films_species_array.csv"),
    ("Film", "films_starships_array.csv"),
    ("Film", "films_vehicles_array.csv"),
    ("Film", "films_planets_array.csv"),
    ("Film", "films_keywords_array.csv"),
)

try:
    with driver.session() as session:

        # ========= Phase 1: Import Nodes =========
        print("📥 Importing node data...")
        for label, csv_file in _NODE_FILES:
            session.execute_write(create_nodes, label, csv_file)
        print("✅ Nodes imported!")

        # ========= Phase 2: Add arrays =========
        # Must run after Phase 1: the array step MATCHes nodes by url
        print("🔗 Adding array attributes...")
        for label, csv_file in _ARRAY_FILES:
            session.execute_write(add_array_property, label, csv_file)
        print("✅ Array attributes added!")
finally:
    # Release the driver's connections even if an import step fails
    driver.close()

print("🚀 Neo4j import complete.")


# Queries

## A) Find the total number of films, total number of planets, total number of species in the database.


In [1]:
import os
from neo4j import GraphDatabase
from dotenv import load_dotenv

# Pull the NEO4J_* settings from the .env file
load_dotenv()

# Driver shared by the query cells below
_neo4j_uri = os.getenv("NEO4J_URI")
_neo4j_credentials = (os.getenv("NEO4J_USER"), os.getenv("NEO4J_PASSWORD"))
driver = GraphDatabase.driver(_neo4j_uri, auth=_neo4j_credentials)

In [None]:
// How many Film nodes exist
MATCH (film:Film)
RETURN count(*) AS total_films;

// How many Planet nodes exist
MATCH (planet:Planet)
RETURN count(*) AS total_planets;

// How many Species nodes exist
MATCH (sp:Species)
RETURN count(*) AS total_species;


In [5]:
QUERY_FILE = "queries/01_total_counts.cypher"

# The file holds several statements separated by ';' (no ';' inside a statement)
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    contents = query_file.read()
queries = [statement.strip() for statement in contents.split(';') if statement.strip()]

# Run each statement in turn and print every returned record
with driver.session() as session:
    for query in queries:
        for record in session.run(query):
            print(record)

<Record total_films=6>
<Record total_planets=60>
<Record total_species=37>


## B) Find all planets associated with a sample film.


In [None]:
// Follow the planet URLs stored on "A New Hope" to the matching Planet nodes
MATCH (f:Film {title: "A New Hope"})
UNWIND f.planets AS planet_url
MATCH (p:Planet {url: planet_url})
RETURN p.name AS planet_name;


In [6]:
QUERY_FILE = "queries/02_planets_for_sample_film.cypher"

# Load the Cypher query text
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    cypher = query_file.read()

# Execute it and print every returned record
with driver.session() as session:
    for record in session.run(cypher):
        print(record)

<Record planet_name='Tatooine'>
<Record planet_name='Alderaan'>
<Record planet_name='Yavin IV'>


## C) Find all films that were released after the year 1980 and have an IMDb rating of at least 5.


In [None]:
// Films released after 1980-01-01 with an IMDb rating of at least 5/10.
// Ratings are stored on a 0-100 scale, so 5/10 corresponds to 50.
// NOTE(review): the date test includes films released later in 1980 itself;
// compare against "1981-01-01" if "after the year 1980" should exclude 1980.
MATCH (f:Film)
WHERE toInteger(f.internet_movie_database_rating) >= 50
  AND f.release_date > "1980-01-01"
RETURN f.title, f.release_date, f.internet_movie_database_rating;


In [7]:
QUERY_FILE = "queries/03_films_after_1980_with_rating_5.cypher"

# Load the Cypher query text
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    cypher = query_file.read()

# Execute it and print every returned record
with driver.session() as session:
    for record in session.run(cypher):
        print(record)

<Record f.title='The Empire Strikes Back' f.release_date='1980-05-17' f.internet_movie_database_rating='87.0'>
<Record f.title='Return of the Jedi' f.release_date='1983-05-25' f.internet_movie_database_rating='83.0'>
<Record f.title='The Phantom Menace' f.release_date='1999-05-19' f.internet_movie_database_rating='65.0'>
<Record f.title='Attack of the Clones' f.release_date='2002-05-16' f.internet_movie_database_rating='66.0'>
<Record f.title='Revenge of the Sith' f.release_date='2005-05-19' f.internet_movie_database_rating='76.0'>


## D) Find all films with two vehicles of your choice. List films that may be associated with either of the vehicles (not necessarily both).


In [None]:
// Pick the two chosen vehicles first, then find every film whose vehicle
// URL list contains either one. Each film is returned with the chosen
// vehicles it features.
MATCH (v:Vehicle)
WHERE v.name IN ['Sith speeder', 'Koro-2 Exodrive airspeeder']
MATCH (f:Film)
WHERE v.url IN f.vehicles
RETURN f.title, f.release_date, COLLECT(v.name) AS vehicles;


In [8]:
QUERY_FILE = "queries/04_films_with_vehicles.cypher"

# Load the Cypher query text
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    cypher = query_file.read()

# Execute it and print every returned record
with driver.session() as session:
    for record in session.run(cypher):
        print(record)

<Record f.title='The Phantom Menace' f.release_date='1999-05-19' vehicles=['Sith speeder']>
<Record f.title='Attack of the Clones' f.release_date='2002-05-16' vehicles=['Koro-2 Exodrive airspeeder']>


## E) Find the film with largest number of keywords.


In [None]:
// Film with the largest number of keywords.
// A film without a keywords list counts as 0. size(null) is null, and
// nulls sort first in DESC order, so without coalesce such a film would win.
// Ties are broken by title so the result is deterministic.
MATCH (f:Film)
WITH f, coalesce(size(f.keywords), 0) AS num_keywords
ORDER BY num_keywords DESC, f.title
LIMIT 1
RETURN f.title, num_keywords;


In [9]:
QUERY_FILE = "queries/05_film_with_largest_keywords.cypher"

# Load the Cypher query text
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    cypher = query_file.read()

# Execute it and print every returned record
with driver.session() as session:
    for record in session.run(cypher):
        print(record)

<Record f.title='A New Hope' num_keywords=17>


## F) Build full text search index to query movie overview.


In [None]:
// Full-text index over Film.overview. IF NOT EXISTS makes re-running this file a no-op instead of an error.
CREATE FULLTEXT INDEX filmOverviewIndex IF NOT EXISTS FOR (n:Film) ON EACH [n.overview];

In [13]:
QUERY_FILE = "queries/06_create_fulltext_index.cypher"

# The file may hold several ';'-separated statements; run each one
with open(QUERY_FILE, 'r', encoding='utf-8') as file:
    queries = [q.strip() for q in file.read().split(';') if q.strip()]

with driver.session() as session:
    for query in queries:
        # consume() waits for the server's reply, so a failing statement
        # raises here instead of later, when the session closes
        session.run(query).consume()

print("✅ Fulltext index statement(s) executed.")

âœ… Fulltext index dropped if it existed, and created.


## G) Write a full text search query and search for some sample text of your choice.


In [None]:
// Top 10 films whose overview matches "galaxy", ranked by full-text relevance
CALL db.index.fulltext.queryNodes("filmOverviewIndex", "galaxy")
YIELD node, score
WITH node AS film, score
RETURN film.title AS title, film.overview AS overview, score
ORDER BY score DESC
LIMIT 10;


In [15]:
QUERY_FILE = "queries/07_fulltext_search_query.cypher"

# Load the Cypher query text
with open(QUERY_FILE, 'r', encoding='utf-8') as query_file:
    cypher = query_file.read()

# Execute it and print every returned record
with driver.session() as session:
    for record in session.run(cypher):
        print(record)

# Last query cell: release the driver's connections
driver.close()

<Record title='Attack of the Clones' overview='Following an assassination attempt on Senator PadmÃƒÂ© Amidala, Jedi Knights Anakin Skywalker and Obi-Wan Kenobi investigate a mysterious plot that could change the galaxy forever.' score=0.7931073307991028>
