In [None]:
# create_load.ipynb

# --- 1. Connect to PostgreSQL ---
import getpass
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import pandas as pd

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment so later cells can reuse it.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

user = "postgres"
password = os.environ["PGPASSWORD"]
host = "127.0.0.1"
port = "5432"
database = "music"

# URL.create escapes special characters (such as "@") in the password, so the
# raw password is passed in and no manual percent-encoding is needed.
_url = URL.create(
    "postgresql+psycopg",
    username=user,
    password=password,
    host=host,
    port=int(port),
    database=database,
)
# DATABASE_URL stays a plain string, as before.
DATABASE_URL = _url.render_as_string(hide_password=False)
engine = create_engine(_url)

# Open one connection to prove the settings work, then hand it back to the
# pool instead of leaving it checked out.
with engine.connect():
    pass
print("Connected to database successfully!")


In [None]:
from sqlalchemy import text, MetaData
metadata = MetaData()

# Create the whole schema in ONE transaction.
# engine.begin() commits when the block exits normally and rolls back on an
# exception. A plain engine.connect() does not autocommit under SQLAlchemy 2.x,
# so the CREATE TABLE statements would be rolled back when the connection is
# closed. Tables are listed parents-first so every FOREIGN KEY target exists.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS artist (
            artist_id VARCHAR(30) PRIMARY KEY,
            name VARCHAR(150)
        );

        CREATE TABLE IF NOT EXISTS album (
            album_id VARCHAR(30) PRIMARY KEY,
            title VARCHAR(150),
            artist_id VARCHAR(30),
            FOREIGN KEY (artist_id) REFERENCES artist(artist_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS genre (
            genre_id VARCHAR(30) PRIMARY KEY,
            name VARCHAR(50)
        );

        CREATE TABLE IF NOT EXISTS media_type (
            media_type_id VARCHAR(30) PRIMARY KEY,
            name VARCHAR(30)
        );

        CREATE TABLE IF NOT EXISTS track (
            track_id VARCHAR(30) PRIMARY KEY,
            name VARCHAR(250),
            album_id VARCHAR(30),
            media_type_id VARCHAR(30),
            genre_id VARCHAR(30),
            composer VARCHAR(250),
            milliseconds BIGINT,
            bytes INTEGER,
            unit_price NUMERIC,
            FOREIGN KEY (album_id) REFERENCES album(album_id) ON DELETE CASCADE,
            FOREIGN KEY (genre_id) REFERENCES genre(genre_id) ON DELETE CASCADE,
            FOREIGN KEY (media_type_id) REFERENCES media_type(media_type_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS playlist (
            playlist_id VARCHAR(30) PRIMARY KEY,
            name VARCHAR(50)
        );

        CREATE TABLE IF NOT EXISTS playlist_track (
            playlist_id VARCHAR(30),
            track_id VARCHAR(50),
            PRIMARY KEY (playlist_id, track_id),
            FOREIGN KEY (playlist_id) REFERENCES playlist(playlist_id) ON DELETE CASCADE,
            FOREIGN KEY (track_id) REFERENCES track(track_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS employee (
            employee_id VARCHAR(30) PRIMARY KEY,
            last_name VARCHAR(50),
            first_name VARCHAR(50),
            title VARCHAR(250),
            reports_to VARCHAR(30),
            levels VARCHAR(10),
            birth_date TIMESTAMP,
            hire_date TIMESTAMP,
            address VARCHAR(120),
            city VARCHAR(50),
            state VARCHAR(50),
            country VARCHAR(30),
            postal_code VARCHAR(30),
            phone VARCHAR(30),
            fax VARCHAR(30),
            email VARCHAR(30),
            FOREIGN KEY (reports_to) REFERENCES employee(employee_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS customer (
            customer_id VARCHAR(30) PRIMARY KEY,
            first_name VARCHAR(30),
            last_name VARCHAR(30),
            company VARCHAR(150),
            address VARCHAR(250),
            city VARCHAR(30),
            state VARCHAR(30),
            country VARCHAR(30),
            postal_code VARCHAR(30),
            phone VARCHAR(30),
            fax VARCHAR(30),
            email VARCHAR(30),
            support_rep_id VARCHAR(30),
            FOREIGN KEY (support_rep_id) REFERENCES employee(employee_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS invoice (
            invoice_id VARCHAR(30) PRIMARY KEY,
            customer_id VARCHAR(30),
            invoice_date TIMESTAMP,
            billing_address VARCHAR(120),
            billing_city VARCHAR(30),
            billing_state VARCHAR(30),
            billing_country VARCHAR(30),
            billing_postal VARCHAR(30),
            total DOUBLE PRECISION,
            FOREIGN KEY (customer_id) REFERENCES customer(customer_id) ON DELETE CASCADE
        );

        CREATE TABLE IF NOT EXISTS invoice_line (
            invoice_line_id VARCHAR(30) PRIMARY KEY,
            invoice_id VARCHAR(30),
            track_id VARCHAR(30),
            unit_price NUMERIC,
            quantity INTEGER,
            FOREIGN KEY (invoice_id) REFERENCES invoice(invoice_id) ON DELETE CASCADE,
            FOREIGN KEY (track_id) REFERENCES track(track_id) ON DELETE CASCADE
        );
    """))

print("Tables Created")

In [None]:
!pip install psycopg2-binary


In [None]:
import getpass
import os
import pandas as pd
import psycopg2
from psycopg2 import sql, extras

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment so later cells can reuse it.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

# Keyword arguments for psycopg2.connect().
DB = dict(
    host="127.0.0.1",
    dbname="music",
    user="postgres",
    password=os.environ["PGPASSWORD"],
    port=5432
)

# Folder that holds the CSV files. Set DMQL_DATA_DIR to run the notebook on
# another machine; the original location remains the default.
DIR = os.environ.get("DMQL_DATA_DIR", "/Users/sathw/Desktop/DMQL-Project")

# Table name -> CSV file name inside DIR.
FILES = {
    "artist": "artist.csv",
    "album": "album.csv",
    "genre": "genre.csv",
    "media_type": "media_type.csv",
    "track": "track.csv",
    "playlist": "playlist.csv",
    "playlist_track": "playlist_track.csv",
    "employee": "employee.csv",
    "customer": "customer.csv",
    "invoice": "invoice.csv",
    "invoice_line": "invoice_line.csv"
}

# Table name -> number of leading CSV columns to load (must match the number
# of columns of the target table, because rows are inserted positionally).
COLS = {
    "artist": 2, "album": 3, "genre": 2, "media_type": 2,
    "track": 9, "playlist": 2, "playlist_track": 2,
    "employee": 16, "customer": 13, "invoice": 9, "invoice_line": 5
}
BIGINT_MAX = 9_223_372_036_854_775_807
def safe_int(x):
    """Convert a raw CSV cell to an int that fits a PostgreSQL BIGINT.

    Parameters:
        x: the cell value, normally a string or None.

    Returns:
        The integer value, or None when the cell is empty, is the
        spreadsheet error marker "#VALUE!", or cannot be parsed as a number.
        Decimal text such as "12.7" is truncated towards zero.

    Raises:
        ValueError: if the parsed integer lies outside the signed 64-bit
            range [-2**63, 2**63 - 1].
    """
    if x in (None, "", "#VALUE!"):
        return None
    try:
        try:
            # Parse integer text exactly. Going through float() first would
            # silently round anything above 2**53.
            v = int(x)
        except ValueError:
            # Not a plain integer literal ("12.0", "1e3"): fall back to float.
            v = int(float(x))
    except (TypeError, ValueError, OverflowError):
        # Unparseable text, NaN or infinity -> treat as missing.
        return None
    # BIGINT is asymmetric: its minimum is -(BIGINT_MAX + 1).
    if not -BIGINT_MAX - 1 <= v <= BIGINT_MAX:
        raise ValueError("BIGINT overflow")
    return v
def read_df(path, n):
    """Load the first ``n`` columns of a CSV file as text.

    Every column is read with ``dtype=str``; cells that pandas flags as
    missing are replaced by ``None`` via ``DataFrame.where``.

    Parameters:
        path: location of the CSV file.
        n: number of leading columns to keep.

    Returns:
        A DataFrame holding the first ``n`` columns of the file.
    """
    frame = pd.read_csv(path, dtype=str, na_filter=True)
    present = pd.notnull(frame)
    leading = frame.iloc[:, :n]
    return leading.where(present, None)
def batch_insert(cur, tbl, df):
    """Insert every row of ``df`` into table ``tbl`` in batches.

    Values are bound positionally (no column list), one ``%s`` placeholder per
    DataFrame column, and rows that hit a conflict are skipped
    (``ON CONFLICT DO NOTHING``).

    Parameters:
        cur: an open psycopg2 cursor.
        tbl: target table name; quoted as an SQL identifier.
        df: DataFrame whose column order matches the table's column order.
    """
    column_count = df.shape[1]
    value_slots = ", ".join("%s" for _ in range(column_count))
    template = sql.SQL(
        "INSERT INTO {} VALUES (" + value_slots + ") ON CONFLICT DO NOTHING"
    )
    query = template.format(sql.Identifier(tbl)).as_string(cur)
    rows = df.values.tolist()
    # page_size=1000: rows are sent to the server in groups of 1000.
    extras.execute_batch(cur, query, rows, page_size=1000)
from contextlib import closing


def _nullable_ints(column):
    """Apply safe_int to a column, keeping ints as ints and blanks as None."""
    # Built as an object Series on purpose: Series.apply() re-infers the dtype,
    # so an integer column containing None would come back as float64 with NaN,
    # and NaN cannot be inserted into an INTEGER/BIGINT column.
    return pd.Series(
        [safe_int(value) for value in column],
        index=column.index,
        dtype=object,
    )


# Parents before children so every FOREIGN KEY target is already loaded.
load_order = [
    "artist", "album", "genre", "media_type", "track",
    "playlist", "playlist_track", "employee", "customer",
    "invoice", "invoice_line"
]

# closing() actually closes the connection; psycopg2's own `with conn` only
# ends the transaction. The inner `with conn` makes TRUNCATE + load a single
# transaction: it commits only if every table loads and rolls back otherwise.
# (psycopg2 >= 2.9 opens a transaction inside `with conn` even when autocommit
# is on, so autocommit is deliberately not set here.)
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            print("✅ Connected to database")
            cur.execute("""
                TRUNCATE
                  invoice_line, invoice,
                  customer, employee,
                  playlist_track, playlist,
                  track, media_type, genre,
                  album, artist
                RESTART IDENTITY CASCADE;
            """)
            print(" All tables truncated.")

            for tbl in load_order:
                path = os.path.join(DIR, FILES[tbl])
                df = read_df(path, COLS[tbl])
                if tbl == "track":
                    df = df.drop_duplicates(subset=["track_id"]).copy()
                    df["milliseconds"] = _nullable_ints(df["milliseconds"])
                    df["bytes"]        = _nullable_ints(df["bytes"])
                elif tbl == "invoice_line":
                    df["quantity"] = _nullable_ints(df["quantity"])
                elif tbl == "playlist_track":
                    df = df.drop_duplicates(subset=["playlist_id", "track_id"])
                try:
                    batch_insert(cur, tbl, df)
                except Exception as e:
                    # Report which table broke, then abort: later tables depend
                    # on this one, and continuing would end with a false
                    # "loaded" message.
                    print(f" Failed on {tbl}: {e}")
                    raise
                print(f" {tbl}: {len(df)} rows processed")

# Only reached when every table was loaded and the transaction committed.
print("\n All tables are loaded \n")


In [None]:
import getpass
import os
from contextlib import closing

import psycopg2

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

DB = dict(
    host="127.0.0.1",
    dbname="music",
    user="postgres",
    password=os.environ["PGPASSWORD"],
    port=5432
)

# Move invoice_line.unit_price into its own table keyed by track_id.
# closing() closes the connection (psycopg2's `with conn` does not), and the
# inner `with conn` runs all four statements as one transaction, so a failure
# leaves the schema exactly as it was.
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            print("Starting Milestone 2 decomposition…")
            cur.execute("DROP TABLE IF EXISTS track_price;")
            # track_id must be VARCHAR(30) like track.track_id: PostgreSQL
            # rejects a foreign key between BIGINT and VARCHAR columns.
            cur.execute("""
                CREATE TABLE track_price (
                    track_id   VARCHAR(30) PRIMARY KEY
                                REFERENCES track(track_id),
                    unit_price NUMERIC
                );
            """)
            # A track sold at two different prices violates the primary key;
            # the transaction then rolls back instead of dropping the column.
            cur.execute("""
                INSERT INTO track_price(track_id, unit_price)
                SELECT DISTINCT track_id, unit_price
                  FROM invoice_line
                 WHERE track_id IS NOT NULL
                   AND unit_price IS NOT NULL;
            """)
            cur.execute("ALTER TABLE invoice_line DROP COLUMN unit_price;")
            print("  • track_price created and invoice_line.unit_price dropped")


In [None]:
from contextlib import closing

# Move track.unit_price into media_type_pricing(media_type_id, unit_price).
# Uses `psycopg2` and `DB` from the previous cell.
# closing() closes the connection (psycopg2's `with conn` does not), and the
# inner `with conn` runs drop/create/insert/alter as one transaction, so a
# failure cannot leave the column dropped without the new table populated.
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            print(" Starting Milestone 2 decomposition…")

            cur.execute("DROP TABLE IF EXISTS media_type_pricing;")
            cur.execute("""
                CREATE TABLE media_type_pricing (
                    media_type_id VARCHAR(30),
                    unit_price    NUMERIC,
                    PRIMARY KEY(media_type_id, unit_price)
                );
            """)
            cur.execute("""
                INSERT INTO media_type_pricing(media_type_id, unit_price)
                SELECT DISTINCT media_type_id, unit_price
                  FROM track
                 WHERE media_type_id IS NOT NULL
                   AND unit_price IS NOT NULL;
            """)
            cur.execute("ALTER TABLE track DROP COLUMN unit_price;")
            print("  Created and dropped")

In [None]:
import getpass
import os
from contextlib import closing

import psycopg2
from psycopg2 import errors

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

DB = dict(
    host="127.0.0.1",
    dbname="music",
    user="postgres",
    password=os.environ["PGPASSWORD"],
    port=5432
)

# Move the invoice.billing_* columns into customer_billing, keyed by customer.
# closing() closes the connection (psycopg2's `with conn` does not), and the
# inner `with conn` runs every statement as one transaction.
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            # Add the primary key only if customer has none. Trying to add a
            # second one raises InvalidTableDefinition (not DuplicateObject)
            # and would abort the surrounding transaction.
            cur.execute("""
                SELECT 1
                  FROM pg_constraint
                 WHERE conrelid = 'customer'::regclass
                   AND contype = 'p';
            """)
            if cur.fetchone() is None:
                cur.execute("""
                    ALTER TABLE customer
                      ADD CONSTRAINT customer_pkey PRIMARY KEY (customer_id);
                """)
            cur.execute("DROP TABLE IF EXISTS customer_billing;")

            # customer_id must be VARCHAR(30) like customer.customer_id:
            # PostgreSQL rejects a foreign key between BIGINT and VARCHAR.
            cur.execute("""
                CREATE TABLE customer_billing (
                    customer_id     VARCHAR(30) PRIMARY KEY
                                      REFERENCES customer(customer_id),
                    billing_address VARCHAR(120),
                    billing_city    VARCHAR(30),
                    billing_state   VARCHAR(30),
                    billing_country VARCHAR(30),
                    billing_postal_code  VARCHAR(30)
                );
            """)
            # The source column on invoice is named billing_postal.
            # A customer with two different billing addresses violates the
            # primary key; the transaction then rolls back.
            cur.execute("""
                INSERT INTO customer_billing (
                    customer_id,
                    billing_address,
                    billing_city,
                    billing_state,
                    billing_country,
                    billing_postal_code
                )
                SELECT DISTINCT
                    customer_id,
                    billing_address,
                    billing_city,
                    billing_state,
                    billing_country,
                    billing_postal
                  FROM invoice
                 WHERE customer_id IS NOT NULL;
            """)

            cur.execute("""
                ALTER TABLE invoice
                  DROP COLUMN billing_address,
                  DROP COLUMN billing_city,
                  DROP COLUMN billing_state,
                  DROP COLUMN billing_country,
                  DROP COLUMN billing_postal;
            """)

            print("customer_billing created, data migrated, invoice.billing_* dropped")


In [None]:
import getpass
import os
from contextlib import closing

import psycopg2
from psycopg2 import errors

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

DB = dict(
    host="127.0.0.1",
    dbname="music",
    user="postgres",
    password=os.environ["PGPASSWORD"],
    port=5432
)

# Move employee.city/state/country into location, keyed by postal_code.
# closing() closes the connection (psycopg2's `with conn` does not), and the
# inner `with conn` runs every statement as one transaction.
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS location;")
            # Column widths mirror employee (city and state are VARCHAR(50)
            # there), so no value can be too long for its new column.
            cur.execute("""
                CREATE TABLE location (
                    postal_code VARCHAR(30) PRIMARY KEY,
                    city        VARCHAR(50),
                    state       VARCHAR(50),
                    country     VARCHAR(30)
                );
            """)
            # Two employees sharing a postal code but not a city/state/country
            # violate the primary key; the transaction then rolls back.
            cur.execute("""
                INSERT INTO location (postal_code, city, state, country)
                SELECT DISTINCT
                    postal_code,
                    city,
                    state,
                    country
                  FROM employee
                 WHERE postal_code IS NOT NULL;
            """)
            cur.execute("""
                ALTER TABLE employee
                  DROP COLUMN city,
                  DROP COLUMN state,
                  DROP COLUMN country;
            """)

            cur.execute("""
                ALTER TABLE employee
                  ADD FOREIGN KEY (postal_code)
                  REFERENCES location(postal_code);
            """)

            print(" location created, employee.city/state/country dropped, FK in place")


In [None]:
import getpass
import os
from contextlib import closing

import psycopg2

# The password is never stored in the notebook: it is taken from the standard
# libpq variable PGPASSWORD, or asked for once and cached in this kernel's
# environment.
if "PGPASSWORD" not in os.environ:
    os.environ["PGPASSWORD"] = getpass.getpass("PostgreSQL password: ")

# Same "music" database as every other cell of this notebook; the customer
# table that is altered below is created and loaded there.
DB = dict(
    host="127.0.0.1",
    dbname="music",
    user="postgres",
    password=os.environ["PGPASSWORD"],
    port=5432
)

# Move customer.city/state/country into customer_location, keyed by
# postal_code. closing() closes the connection (psycopg2's `with conn` does
# not), and the inner `with conn` runs every statement as one transaction.
with closing(psycopg2.connect(**DB)) as conn:
    with conn:
        with conn.cursor() as cur:
            cur.execute("DROP TABLE IF EXISTS customer_location;")
            cur.execute("""
                CREATE TABLE customer_location (
                    postal_code VARCHAR(30) PRIMARY KEY,
                    city        VARCHAR(30),
                    state       VARCHAR(30),
                    country     VARCHAR(30)
                );
            """)

            # Two customers sharing a postal code but not a city/state/country
            # violate the primary key; the transaction then rolls back.
            cur.execute("""
                INSERT INTO customer_location (postal_code, city, state, country)
                SELECT DISTINCT
                    postal_code,
                    city,
                    state,
                    country
                  FROM customer
                 WHERE postal_code IS NOT NULL;
            """)

            cur.execute("""
                ALTER TABLE customer
                  DROP COLUMN city,
                  DROP COLUMN state,
                  DROP COLUMN country;
            """)

            cur.execute("""
                ALTER TABLE customer
                  ADD FOREIGN KEY (postal_code)
                  REFERENCES customer_location(postal_code);
            """)

            print(" customer_location created, customer.city/state/country dropped, FK in place")
