In [3]:
import os
import pathlib
import pandas as pd
from snowflake.snowpark import Session
from dotenv import load_dotenv

load_dotenv()

# Snowflake connection parameters from env variables
snowflake_params = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA"),  # Raw schema for stage
}

new_database = snowflake_params["database"]
raw_schema = snowflake_params["schema"]
cleaned_schema = "ERD_SCHEMA_CLEANED"
final_schema = "ERD_SCHEMA_STAR"

# This is the NEW FINAL TRANSFORMED stage exclusively for the star schema transformed data
final_stage = "FINAL_TRANSFORMED_STAGE"
cleaning_stage = "DATA_CLEANING_STAGE"

# Utility function to generate incremental surrogate keys

def generate_surrogate_keys(df, key_name="id"):
    df = df.copy()
    df[key_name] = range(1, len(df) + 1)
    return df

# === Dim_Location transformation ===
def create_dim_location(df_invoice: pd.DataFrame, df_customer: pd.DataFrame) -> pd.DataFrame:
    # Extract distinct location attributes from Invoice and Customer billing info
    loc_invoice = df_invoice[["BILLINGCITY", "BILLINGSTATE", "BILLINGCOUNTRY", "BILLINGPOSTALCODE"]].drop_duplicates()
    loc_invoice.columns = ["CITY", "STATE", "COUNTRY", "POSTALCODE"]
    loc_customer = df_customer[["CITY", "STATE", "COUNTRY", "POSTALCODE"]].drop_duplicates()
    loc_all = pd.concat([loc_invoice, loc_customer], ignore_index=True).drop_duplicates().reset_index(drop=True)

    # Assign surrogate integer keys
    loc_all = generate_surrogate_keys(loc_all, "LOCATION_ID")
    return loc_all

# === FactSales transformation ===
def create_fact_sales(
    df_invoiceline: pd.DataFrame,
    df_invoice: pd.DataFrame,
    df_track: pd.DataFrame,
    df_album: pd.DataFrame,
    df_artist: pd.DataFrame,
    df_customer: pd.DataFrame,
    df_employee: pd.DataFrame,
    df_playlisttrack: pd.DataFrame,
    df_dim_location: pd.DataFrame,
) -> pd.DataFrame:
    # Join InvoiceLine to Invoice
    fact = df_invoiceline.merge(df_invoice, on="INVOICEID", how="left", suffixes=("", "_inv"))

    # Join to Track
    fact = fact.merge(df_track, left_on="TRACKID", right_on="TRACKID", how="left", suffixes=("", "_trk"))

    # Ensure ALBUMID columns are int
    fact['ALBUMID'] = pd.to_numeric(fact['ALBUMID'], errors='coerce').fillna(0).astype(int)
    df_album['ALBUMID'] = pd.to_numeric(df_album['ALBUMID'], errors='coerce').fillna(0).astype(int)

    # Join Track -> Album
    fact = fact.merge(df_album, left_on="ALBUMID", right_on="ALBUMID", how="left", suffixes=("", "_alb"))

    # Ensure ARTISTID columns are int
    fact['ARTISTID'] = pd.to_numeric(fact['ARTISTID'], errors='coerce').fillna(0).astype(int)
    df_artist['ARTISTID'] = pd.to_numeric(df_artist['ARTISTID'], errors='coerce').fillna(0).astype(int)

    # Join Album -> Artist
    fact = fact.merge(df_artist, left_on="ARTISTID", right_on="ARTISTID", how="left", suffixes=("", "_art"))

    # Ensure CUSTOMERID columns are int
    fact['CUSTOMERID'] = pd.to_numeric(fact['CUSTOMERID'], errors='coerce').fillna(0).astype(int)
    df_customer['CUSTOMERID'] = pd.to_numeric(df_customer['CUSTOMERID'], errors='coerce').fillna(0).astype(int)

    # Join InvoiceLine/Invoice -> Customer
    fact = fact.merge(df_customer, left_on="CUSTOMERID", right_on="CUSTOMERID", how="left", suffixes=("", "_cust"))

    # Ensure SUPPORTREPID and EMPLOYEEID columns are int
    fact['SUPPORTREPID'] = pd.to_numeric(fact.get('SUPPORTREPID', 0), errors='coerce').fillna(0).astype(int)
    df_employee['EMPLOYEEID'] = pd.to_numeric(df_employee['EMPLOYEEID'], errors='coerce').fillna(0).astype(int)

    # Join Customer -> Employee (SupportRep)
    fact = fact.merge(df_employee, left_on="SUPPORTREPID", right_on="EMPLOYEEID", how="left", suffixes=("", "_emp"))

    # Ensure TRACKID columns are int for playlisttrack join
    fact['TRACKID'] = pd.to_numeric(fact['TRACKID'], errors='coerce').fillna(0).astype(int)
    df_playlisttrack['TRACKID'] = pd.to_numeric(df_playlisttrack['TRACKID'], errors='coerce').fillna(0).astype(int)

    # Join PlaylistTrack on TrackID to get Playlist_id
    fact = fact.merge(df_playlisttrack, left_on="TRACKID", right_on="TRACKID", how="left", suffixes=("", "_plt"))

    # Join with Dim_Location on city/state/country/postalcode to get Location_id
    fact = fact.merge(
        df_dim_location,
        left_on=["BILLINGCITY", "BILLINGSTATE", "BILLINGCOUNTRY", "BILLINGPOSTALCODE"],
        right_on=["CITY", "STATE", "COUNTRY", "POSTALCODE"],
        how="left",
    )

    # Calculate Total_amount
    fact["TOTAL_AMOUNT"] = fact["UNITPRICE"] * fact["QUANTITY"]

    # Select and rename columns per star schema
    fact_sales = fact[
        [
            "INVOICELINEID",        # InvoiceLine_id (PK)
            "LOCATION_ID",          # Location_id (FK)
            "INVOICEID",            # Invoice_id (FK)
            "TRACKID",              # Track_id (FK)
            "CUSTOMERID",           # Customer_id (FK)
            "EMPLOYEEID",           # Employee_id (FK)
            "MEDIATYPEID",          # MediaType_id (FK)
            "INVOICEDATE",          # Invoice_date
            "UNITPRICE",            # Unit_price
            "QUANTITY",             # Quantity
            "TOTAL_AMOUNT",         # Total_amount
            "PLAYLISTID",           # Playlist_id (FK)
            "ALBUMID",              # Album_id (FK)
            "ARTISTID",             # Artist_id (FK)
            "MILLISECONDS",         # Milliseconds
            "BYTES",                # Bytes
        ]
    ].copy()

    # Rename columns to match star schema
    fact_sales.rename(
        columns={
            "INVOICELINEID": "INVOICELINE_ID",
            "INVOICEID": "INVOICE_ID",
            "TRACKID": "TRACK_ID",
            "CUSTOMERID": "CUSTOMER_ID",
            "EMPLOYEEID": "EMPLOYEE_ID",
            "MEDIATYPEID": "MEDIATYPE_ID",
            "INVOICEDATE": "INVOICE_DATE",
            "UNITPRICE": "UNIT_PRICE",
            "QUANTITY": "QUANTITY",
            "TOTAL_AMOUNT": "TOTAL_AMOUNT",
            "PLAYLISTID": "PLAYLIST_ID",
            "ALBUMID": "ALBUM_ID",
            "ARTISTID": "ARTIST_ID",
            "MILLISECONDS": "MILLISECONDS",
            "BYTES": "BYTES",
        },
        inplace=True
    )

    # Convert dates to datetime
    fact_sales["INVOICE_DATE"] = pd.to_datetime(fact_sales["INVOICE_DATE"], errors="coerce")

    return fact_sales
# === Snowflake interaction functions for final STAR schema ===

def create_final_stage(session):
    # Create a NEW STAGE specifically for the final transformed STAR schema data
    sql = f'''
        CREATE STAGE IF NOT EXISTS "{new_database}"."{raw_schema}"."{final_stage}"
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
    '''
    print("Creating final transformed stage if not exists...")
    session.sql(sql).collect()

def create_star_schema_table(session, table_name, create_sql):
    print(f"Creating star schema table {final_schema}.{table_name} if not exists...")
    session.sql(create_sql).collect()

def export_to_csv(df, table_name):
    filename = f"{table_name}_star.csv"
    df.to_csv(filename, index=False)
    print(f"Exported {table_name} star schema data to {filename}")
    return filename

def remove_file_from_stage(session, filename):
    # Remove from the NEW final transformed stage
    remove_sql = f"REMOVE @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{filename}.gz"
    print(f"Removing old staged file {filename}.gz from final transformed stage...")
    session.sql(remove_sql).collect()

def upload_to_stage(session, csv_file):
    csv_path = pathlib.Path(csv_file).resolve().as_posix()
    filename = pathlib.Path(csv_file).name
    remove_file_from_stage(session, filename)
    put_sql = f"PUT 'file://{csv_path}' @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\" AUTO_COMPRESS=TRUE"
    print(f"Uploading {csv_path} to final transformed stage {new_database}.{raw_schema}.{final_stage} ...")
    res = session.sql(put_sql).collect()
    print("PUT result:", res)

def copy_into_star_table(session, table_name, csv_file):
    copy_sql = f'''
        COPY INTO "{final_schema}"."{table_name.upper()}"
        FROM @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{csv_file}.gz
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1)
        ON_ERROR = 'CONTINUE'
    '''
    print(f"Copying data into {final_schema}.{table_name.upper()} from final transformed stage file {csv_file}.gz ...")
    res = session.sql(copy_sql).collect()
    print("COPY INTO result:", res)

def truncate_star_table(session, table_name):
    truncate_sql = f'TRUNCATE TABLE "{final_schema}"."{table_name.upper()}"'
    print(f"Truncating table {final_schema}.{table_name.upper()} before loading fresh data...")
    session.sql(truncate_sql).collect()



# === Main ETL pipeline extension for FINAL TRANSFORMED STAR schema ===
import os

def main_star_schema():
    with Session.builder.configs(snowflake_params).create() as session:
        create_final_stage(session)  # Create the NEW final transformed stage

        base_path = os.path.join(os.getcwd(), "ERD_cleaned") + os.sep

        # Load cleaned CSVs needed from ERD_cleaned folder
        df_customer = pd.read_csv(base_path + "Customer_cleaned.csv")
        df_employee = pd.read_csv(base_path + "Employee_cleaned.csv")
        df_artist = pd.read_csv(base_path + "Artist_cleaned.csv")
        df_album = pd.read_csv(base_path + "Album_cleaned.csv")
        df_invoice = pd.read_csv(base_path + "Invoice_cleaned.csv")
        df_invoiceline = pd.read_csv(base_path + "InvoiceLine_cleaned.csv")
        df_track = pd.read_csv(base_path + "Track_cleaned.csv")
        df_playlisttrack = pd.read_csv(base_path + "PlaylistTrack_cleaned.csv")

        # rest of your code unchanged...
        print("Creating Dim_Location...")
        df_dim_location = create_dim_location(df_invoice, df_customer)

        print("Creating FactSales...")
        df_fact_sales = create_fact_sales(
            df_invoiceline,
            df_invoice,
            df_track,
            df_album,
            df_artist,
            df_customer,
            df_employee,
            df_playlisttrack,
            df_dim_location,
        )

        # Export and load Dim_Location
        loc_csv = export_to_csv(df_dim_location, "Dim_Location")
        upload_to_stage(session, loc_csv)
        create_star_schema_table(session, "Dim_Location", f"CREATE TABLE IF NOT EXISTS \"{final_schema}\".\"Dim_Location\" (LOCATION_ID INT PRIMARY KEY, CITY STRING, STATE STRING, COUNTRY STRING, POSTALCODE STRING)")
        truncate_star_table(session, "Dim_Location")
        copy_into_star_table(session, "Dim_Location", loc_csv)

        # Export and load FactSales
        fact_csv = export_to_csv(df_fact_sales, "FactSales")
        upload_to_stage(session, fact_csv)
        create_star_schema_table(session, "FactSales", 
            f"CREATE TABLE IF NOT EXISTS \"{final_schema}\".\"FactSales\" ("
            "INVOICELINE_ID INT PRIMARY KEY, LOCATION_ID INT, INVOICE_ID INT, TRACK_ID INT, CUSTOMER_ID INT, "
            "EMPLOYEE_ID INT, MEDIATYPE_ID INT, INVOICE_DATE DATE, UNIT_PRICE FLOAT, QUANTITY INT, TOTAL_AMOUNT FLOAT, "
            "PLAYLIST_ID INT, ALBUM_ID INT, ARTIST_ID INT, MILLISECONDS INT, BYTES INT)"
        )
        truncate_star_table(session, "FactSales")
        copy_into_star_table(session, "FactSales", fact_csv)

        print("Final transformed star schema tables loaded successfully.")

if __name__ == "__main__":
    main_star_schema()


Creating final transformed stage if not exists...
Creating Dim_Location...
Creating FactSales...
Exported Dim_Location star schema data to Dim_Location_star.csv
Removing old staged file Dim_Location_star.csv.gz from final transformed stage...
Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/Dim_Location_star.csv to final transformed stage CHINOOK_DATABASE.ERD_SCHEMA.FINAL_TRANSFORMED_STAGE ...
PUT result: [Row(source='Dim_Location_star.csv', target='Dim_Location_star.csv.gz', source_size=2042, target_size=1248, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
Creating star schema table ERD_SCHEMA_STAR.Dim_Location if not exists...


SnowparkSQLException: (1304): 01bc6a21-0001-2863-0001-824a000aa7d2: 002003 (02000): SQL compilation error:
Schema 'CHINOOK_DATABASE.ERD_SCHEMA_STAR' does not exist or not authorized.

In [10]:
import os
import pathlib
import pandas as pd
from datetime import datetime
from snowflake.snowpark import Session
from dotenv import load_dotenv

load_dotenv()

snowflake_params = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA"),
}

new_database = snowflake_params["database"]
raw_schema = snowflake_params["schema"]
cleaned_schema = "ERD_SCHEMA_CLEANED"
final_schema = "ERD_SCHEMA_STAR"
final_stage = "FINAL_TRANSFORMED_STAGE"
cleaning_stage = "DATA_CLEANING_STAGE"

def generate_surrogate_keys(df, key_name="id"):
    df = df.copy()
    df[key_name] = range(1, len(df) + 1)
    return df

# Dim Date
def create_dim_date(df_invoice: pd.DataFrame) -> pd.DataFrame:
    unique_dates = pd.to_datetime(df_invoice['INVOICEDATE'].dropna().drop_duplicates().reset_index(drop=True))
    df_date = pd.DataFrame({'DATE': unique_dates})
    df_date['DATE_ID'] = range(1, len(df_date) + 1)
    df_date['DAY'] = df_date['DATE'].dt.day
    df_date['WEEK_DAY'] = df_date['DATE'].dt.day_name()
    df_date['IS_WEEKEND'] = df_date['WEEK_DAY'].isin(['Saturday', 'Sunday'])
    df_date['IS_HOLIDAY'] = False  # Placeholder; can be extended with real holiday data
    df_date['MONTH_NAME'] = df_date['DATE'].dt.month_name()
    df_date['MONTH_NUMBER'] = df_date['DATE'].dt.month
    df_date['QUARTER'] = df_date['DATE'].dt.quarter
    df_date['YEAR'] = df_date['DATE'].dt.year
    cols = ['DATE_ID', 'DATE', 'DAY', 'WEEK_DAY', 'IS_HOLIDAY', 'IS_WEEKEND', 'MONTH_NAME', 'MONTH_NUMBER', 'QUARTER', 'YEAR']
    return df_date[cols]

# Dim Location (from customer only)
def create_dim_location(df_customer: pd.DataFrame) -> pd.DataFrame:
    loc = df_customer[['CITY', 'STATE', 'COUNTRY', 'POSTALCODE']].drop_duplicates().reset_index(drop=True)
    loc = generate_surrogate_keys(loc, 'LOCATION_ID')
    return loc

# Dim Album Artist
def create_dim_album_artist(df_album: pd.DataFrame, df_artist: pd.DataFrame) -> pd.DataFrame:
    # Convert ARTISTID to int in both DataFrames for consistent join
    df_album['ARTISTID'] = pd.to_numeric(df_album['ARTISTID'], errors='coerce').fillna(0).astype(int)
    df_artist['ARTISTID'] = pd.to_numeric(df_artist['ARTISTID'], errors='coerce').fillna(0).astype(int)

    df = df_album.merge(df_artist, on="ARTISTID", how="left", suffixes=("", "_art"))
    df = df.rename(columns={"ALBUMID": "ALBUM_ID", "TITLE": "TITLE", "ARTISTID": "ARTIST_ID", "NAME": "ARTIST_NAME"})
    df = df[["ALBUM_ID", "TITLE", "ARTIST_ID", "ARTIST_NAME"]].drop_duplicates()
    return df

# Dim Track
def create_dim_track(df_track: pd.DataFrame, df_genre: pd.DataFrame, df_mediatype: pd.DataFrame) -> pd.DataFrame:
    df = df_track.merge(df_genre, on="GENREID", how="left", suffixes=("", "_gen"))
    df = df.merge(df_mediatype, on="MEDIATYPEID", how="left", suffixes=("", "_med"))
    df = df.rename(columns={
        "TRACKID": "TRACK_ID",
        "NAME": "TRACK_NAME",
        "GENREID": "GENRE_ID",
        "NAME_gen": "GENRE_NAME",
        "MEDIATYPEID": "MEDIATYPE_ID",
        "NAME_med": "MEDIA_TYPE_NAME",
        "ALBUMID": "ALBUM_ID",
    })
    return df[["TRACK_ID", "TRACK_NAME", "ALBUM_ID", "MEDIATYPE_ID", "MEDIA_TYPE_NAME", "GENRE_ID", "GENRE_NAME"]].drop_duplicates()

# Dim Playlist Track
def create_dim_playlist_track(df_playlisttrack: pd.DataFrame, df_playlist: pd.DataFrame) -> pd.DataFrame:
    df = df_playlisttrack.merge(df_playlist, on="PLAYLISTID", how="left", suffixes=("", "_plt"))
    df = df.rename(columns={"PLAYLISTID": "PLAYLIST_ID", "NAME": "PLAYLIST_NAME", "TRACKID": "TRACK_ID"})
    return df[["PLAYLIST_ID", "PLAYLIST_NAME", "TRACK_ID"]].drop_duplicates()

# Fact Sales
def create_fact_sales(
    df_invoiceline, df_invoice, df_track, df_album, df_artist,
    df_customer, df_employee, df_playlisttrack, df_dim_location,
):
    def fix_int_col(df, col):
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
    for df_, cols in [
        (df_invoiceline, ['INVOICEID', 'TRACKID', 'CUSTOMERID']),
        (df_invoice, ['INVOICEID']),
        (df_track, ['TRACKID', 'ALBUMID', 'GENREID', 'MEDIATYPEID']),
        (df_album, ['ALBUMID', 'ARTISTID']),
        (df_artist, ['ARTISTID']),
        (df_customer, ['CUSTOMERID', 'SUPPORTREPID']),
        (df_employee, ['EMPLOYEEID']),
        (df_playlisttrack, ['PLAYLISTID', 'TRACKID']),
        (df_dim_location, ['LOCATION_ID']),
    ]:
        for col in cols:
            fix_int_col(df_, col)

    fact = df_invoiceline.merge(df_invoice, on="INVOICEID", how="left", suffixes=("", "_inv"))
    fact = fact.merge(df_track, on="TRACKID", how="left", suffixes=("", "_trk"))
    fact = fact.merge(df_album, on="ALBUMID", how="left", suffixes=("", "_alb"))
    fact = fact.merge(df_artist, on="ARTISTID", how="left", suffixes=("", "_art"))
    fact = fact.merge(df_customer, on="CUSTOMERID", how="left", suffixes=("", "_cust"))
    fact = fact.merge(df_employee, left_on="SUPPORTREPID", right_on="EMPLOYEEID", how="left", suffixes=("", "_emp"))
    fact = fact.merge(df_playlisttrack, on="TRACKID", how="left", suffixes=("", "_plt"))
    fact = fact.merge(
        df_dim_location,
        left_on=["BILLINGCITY", "BILLINGSTATE", "BILLINGCOUNTRY", "BILLINGPOSTALCODE"],
        right_on=["CITY", "STATE", "COUNTRY", "POSTALCODE"],
        how="left",
    )

    fact["TOTAL_AMOUNT"] = fact["UNITPRICE"] * fact["QUANTITY"]

    fact_sales = fact[
        [
            "INVOICELINEID", "LOCATION_ID", "INVOICEID", "TRACKID", "CUSTOMERID", "EMPLOYEEID",
            "MEDIATYPEID", "INVOICEDATE", "UNITPRICE", "QUANTITY", "TOTAL_AMOUNT",
            "PLAYLISTID", "ALBUMID", "ARTISTID", "MILLISECONDS", "BYTES",
        ]
    ].copy()

    fact_sales.rename(columns={
        "INVOICELINEID": "INVOICELINE_ID",
        "LOCATION_ID": "LOCATION_ID",
        "INVOICEID": "INVOICE_ID",
        "TRACKID": "TRACK_ID",
        "CUSTOMERID": "CUSTOMER_ID",
        "EMPLOYEEID": "EMPLOYEE_ID",
        "MEDIATYPEID": "MEDIATYPE_ID",
        "INVOICEDATE": "INVOICE_DATE",
        "UNITPRICE": "UNIT_PRICE",
        "QUANTITY": "QUANTITY",
        "TOTAL_AMOUNT": "TOTAL_AMOUNT",
        "PLAYLISTID": "PLAYLIST_ID",
        "ALBUMID": "ALBUM_ID",
        "ARTISTID": "ARTIST_ID",
        "MILLISECONDS": "MILLISECONDS",
        "BYTES": "BYTES",
    }, inplace=True)
    fact_sales["INVOICE_DATE"] = pd.to_datetime(fact_sales["INVOICE_DATE"], errors="coerce")
    return fact_sales

# Snowflake interaction functions

def create_final_schema(session):
    sql = f'CREATE SCHEMA IF NOT EXISTS "{new_database}"."{final_schema}"'
    print(f"Creating final star schema {final_schema} if not exists...")
    session.sql(sql).collect()

def create_final_stage(session):
    sql = f'''
        CREATE STAGE IF NOT EXISTS "{new_database}"."{raw_schema}"."{final_stage}"
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY = '"' SKIP_HEADER = 1)
    '''
    print("Creating final transformed stage if not exists...")
    session.sql(sql).collect()

def create_star_schema_table(session, table_name, create_sql):
    print(f"Creating star schema table {final_schema}.{table_name} if not exists...")
    session.sql(create_sql).collect()

def export_to_csv(df, table_name):
    folder = "ERD_cleaned"
    os.makedirs(folder, exist_ok=True)
    filename = os.path.join(folder, f"{table_name}_star.csv")
    df.to_csv(filename, index=False)
    print(f"Exported {table_name} star schema data to {filename}")
    return filename

def remove_file_from_stage(session, filename):
    remove_sql = f"REMOVE @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{filename}.gz"
    print(f"Removing old staged file {filename}.gz from final transformed stage...")
    session.sql(remove_sql).collect()

def upload_to_stage(session, csv_file):
    csv_path = pathlib.Path(csv_file).resolve().as_posix()
    filename = pathlib.Path(csv_file).name
    remove_file_from_stage(session, filename)
    put_sql = f"PUT 'file://{csv_path}' @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\" AUTO_COMPRESS=TRUE"
    print(f"Uploading {csv_path} to final transformed stage {new_database}.{raw_schema}.{final_stage} ...")
    res = session.sql(put_sql).collect()
    print("PUT result:", res)

def copy_into_star_table(session, table_name, csv_file):
    copy_sql = f'''
        COPY INTO "{final_schema}"."{table_name.upper()}"
        FROM @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{csv_file}.gz
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1)
        ON_ERROR = 'CONTINUE'
    '''
    print(f"Copying data into {final_schema}.{table_name.upper()} from final transformed stage file {csv_file}.gz ...")
    res = session.sql(copy_sql).collect()
    print("COPY INTO result:", res)

def truncate_star_table(session, table_name):
    truncate_sql = f'TRUNCATE TABLE "{final_schema}"."{table_name.upper()}"'
    print(f"Truncating table {final_schema}.{table_name.upper()} before loading fresh data...")
    session.sql(truncate_sql).collect()

def main_star_schema():
    with Session.builder.configs(snowflake_params).create() as session:
        create_final_schema(session)

        create_final_stage(session)

        base_path = os.path.join(os.getcwd(), "ERD_cleaned") + os.sep

        df_customer = pd.read_csv(base_path + "Customer_cleaned.csv")
        df_employee = pd.read_csv(base_path + "Employee_cleaned.csv")
        df_artist = pd.read_csv(base_path + "Artist_cleaned.csv")
        df_album = pd.read_csv(base_path + "Album_cleaned.csv")
        df_invoice = pd.read_csv(base_path + "Invoice_cleaned.csv")
        df_invoiceline = pd.read_csv(base_path + "InvoiceLine_cleaned.csv")
        df_track = pd.read_csv(base_path + "Track_cleaned.csv")
        df_playlisttrack = pd.read_csv(base_path + "PlaylistTrack_cleaned.csv")
        df_genre = pd.read_csv(base_path + "Genre_cleaned.csv")
        df_mediatype = pd.read_csv(base_path + "MediaType_cleaned.csv")

        print("Creating Dim_Date...")
        df_dim_date = create_dim_date(df_invoice)
        print("Creating Dim_Location...")
        df_dim_location = create_dim_location(df_customer)
        print("Creating Dim_Album_Artist...")
        df_dim_album_artist = create_dim_album_artist(df_album, df_artist)
        print("Creating Dim_Track...")
        df_dim_track = create_dim_track(df_track, df_genre, df_mediatype)
        print("Creating Dim_Playlist_Track...")
        df_dim_playlist_track = create_dim_playlist_track(df_playlisttrack, pd.read_csv(base_path + "Playlist_cleaned.csv"))

        print("Creating FactSales...")
        df_fact_sales = create_fact_sales(
            df_invoiceline,
            df_invoice,
            df_track,
            df_album,
            df_artist,
            df_customer,
            df_employee,
            df_playlisttrack,
            df_dim_location,
        )

        # Export, upload, create, truncate, and copy for each dimension and fact table

        def process_table(df, table_name, create_sql):
            csv_file = export_to_csv(df, table_name)
            upload_to_stage(session, csv_file)
            create_star_schema_table(session, table_name, create_sql)
            truncate_star_table(session, table_name)#Continuing and completing your pipeline with dimension table creation, exports, uploads, and loading to Snowflake:


def upload_to_stage(session, csv_file):
    csv_path = pathlib.Path(csv_file).resolve().as_posix()
    filename = pathlib.Path(csv_file).name
    remove_file_from_stage(session, filename)
    put_sql = f"PUT 'file://{csv_path}' @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\" AUTO_COMPRESS=TRUE"
    print(f"Uploading {csv_path} to final transformed stage {new_database}.{raw_schema}.{final_stage} ...")
    res = session.sql(put_sql).collect()
    print("PUT result:", res)
def copy_into_star_table(session, table_name, csv_file):
    copy_sql = f'''
        COPY INTO "{final_schema}"."{table_name.upper()}"
        FROM @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{csv_file}.gz
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1)
        ON_ERROR = 'ABORT_STATEMENT'
    '''
    print(f"Copying data into {final_schema}.{table_name.upper()} from final transformed stage file {csv_file}.gz ...")
    res = session.sql(copy_sql).collect()
    print("COPY INTO result:", res)

def truncate_star_table(session, table_name):
    truncate_sql = f'TRUNCATE TABLE "{final_schema}"."{table_name.upper()}"'
    print(f"Truncating table {final_schema}.{table_name.upper()} before loading fresh data...")
    session.sql(truncate_sql).collect()
def set_schema(session, database: str, schema: str):
    use_sql = f'USE SCHEMA "{database}"."{schema}"'
    print(f"Setting current schema to {database}.{schema} ...")
    session.sql(use_sql).collect()

        

def main_star_schema():
    with Session.builder.configs(snowflake_params).create() as session:
        create_final_schema(session)  # Ensure final star schema exists
        set_schema(session, new_database, final_schema)  # <--- ADD THIS LINE
        create_final_stage(session)   # Ensure final stage exists

        base_path = os.path.join(os.getcwd(), "ERD_cleaned") + os.sep

        df_customer = pd.read_csv(base_path + "Customer_cleaned.csv")
        df_employee = pd.read_csv(base_path + "Employee_cleaned.csv")
        df_artist = pd.read_csv(base_path + "Artist_cleaned.csv")
        df_album = pd.read_csv(base_path + "Album_cleaned.csv")
        df_invoice = pd.read_csv(base_path + "Invoice_cleaned.csv")
        df_invoiceline = pd.read_csv(base_path + "InvoiceLine_cleaned.csv")
        df_track = pd.read_csv(base_path + "Track_cleaned.csv")
        df_playlisttrack = pd.read_csv(base_path + "PlaylistTrack_cleaned.csv")
        df_genre = pd.read_csv(base_path + "Genre_cleaned.csv")
        df_mediatype = pd.read_csv(base_path + "MediaType_cleaned.csv")
        df_playlist = pd.read_csv(base_path + "Playlist_cleaned.csv")

        # Create Dimensions
        print("Creating Dim_Date...")
        df_dim_date = create_dim_date(df_invoice)
        dim_date_csv = export_to_csv(df_dim_date, "Dim_Date")
        upload_to_stage(session, dim_date_csv)
        create_star_schema_table(session, "Dim_Date",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."Dim_Date" (
                DATE_ID INT PRIMARY KEY,
                DATE DATE,
                DAY INT,
                WEEK_DAY STRING,
                IS_HOLIDAY BOOLEAN,
                IS_WEEKEND BOOLEAN,
                MONTH_NAME STRING,
                MONTH_NUMBER INT,
                QUARTER INT,
                YEAR INT
            )
            """
        )
        truncate_star_table(session, "Dim_Date")
        copy_into_star_table(session, "Dim_Date", dim_date_csv)

        print("Creating Dim_Location...")
        df_dim_location = create_dim_location(df_customer)
        loc_csv = export_to_csv(df_dim_location, "Dim_Location")
        upload_to_stage(session, loc_csv)
        create_star_schema_table(session, "Dim_Location",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."Dim_Location" (
                LOCATION_ID INT PRIMARY KEY,
                CITY STRING,
                STATE STRING,
                COUNTRY STRING,
                POSTALCODE STRING
            )
            """
        )
        truncate_star_table(session, "Dim_Location")
        copy_into_star_table(session, "Dim_Location", loc_csv)

        print("Creating Dim_Album_Artist...")
        df_dim_album_artist = create_dim_album_artist(df_album, df_artist)
        album_artist_csv = export_to_csv(df_dim_album_artist, "Dim_Album_Artist")
        upload_to_stage(session, album_artist_csv)
        create_star_schema_table(session, "Dim_Album_Artist",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."Dim_Album_Artist" (
                ALBUM_ID INT PRIMARY KEY,
                TITLE STRING,
                ARTIST_ID INT,
                ARTIST_NAME STRING
            )
            """
        )
        truncate_star_table(session, "Dim_Album_Artist")
        copy_into_star_table(session, "Dim_Album_Artist", album_artist_csv)

        print("Creating Dim_Track...")
        df_dim_track = create_dim_track(df_track, df_genre, df_mediatype)
        track_csv = export_to_csv(df_dim_track, "Dim_Track")
        upload_to_stage(session, track_csv)
        create_star_schema_table(session, "Dim_Track",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."Dim_Track" (
                TRACK_ID INT PRIMARY KEY,
                TRACK_NAME STRING,
                ALBUM_ID INT,
                MEDIATYPE_ID INT,
                MEDIA_TYPE_NAME STRING,
                GENRE_ID INT,
                GENRE_NAME STRING
            )
            """
        )
        truncate_star_table(session, "Dim_Track")
        copy_into_star_table(session, "Dim_Track", track_csv)

        print("Creating Dim_Playlist_Track...")
        df_dim_playlist_track = create_dim_playlist_track(df_playlisttrack, df_playlist)
        playlist_track_csv = export_to_csv(df_dim_playlist_track, "Dim_Playlist_Track")
        upload_to_stage(session, playlist_track_csv)
        create_star_schema_table(session, "Dim_Playlist_Track",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."Dim_Playlist_Track" (
                PLAYLIST_ID INT,
                PLAYLIST_NAME STRING,
                TRACK_ID INT
            )
            """
        )
        truncate_star_table(session, "Dim_Playlist_Track")
        copy_into_star_table(session, "Dim_Playlist_Track", playlist_track_csv)

        print("Creating FactSales...")
        df_fact_sales = create_fact_sales(
            df_invoiceline,
            df_invoice,
            df_track,
            df_album,
            df_artist,
            df_customer,
            df_employee,
            df_playlisttrack,
            df_dim_location,
        )
        fact_csv = export_to_csv(df_fact_sales, "FactSales")
        upload_to_stage(session, fact_csv)
        create_star_schema_table(session, "FactSales",
            f"""
            CREATE TABLE IF NOT EXISTS "{final_schema}"."FactSales" (
                INVOICELINE_ID INT PRIMARY KEY,
                LOCATION_ID INT,
                INVOICE_ID INT,
                TRACK_ID INT,
                CUSTOMER_ID INT,
                EMPLOYEE_ID INT,
                MEDIATYPE_ID INT,
                INVOICE_DATE DATE,
                UNIT_PRICE FLOAT,
                QUANTITY INT,
                TOTAL_AMOUNT FLOAT,
                PLAYLIST_ID INT,
                ALBUM_ID INT,
                ARTIST_ID INT,
                MILLISECONDS INT,
                BYTES INT
            )
            """
        )
        truncate_star_table(session, "FactSales")
        copy_into_star_table(session, "FactSales", fact_csv)

        print("Final transformed star schema tables loaded successfully.")

if __name__ == "__main__":
    main_star_schema()


Creating final star schema ERD_SCHEMA_STAR if not exists...
Setting current schema to CHINOOK_DATABASE.ERD_SCHEMA_STAR ...
Creating final transformed stage if not exists...
Creating Dim_Date...
Exported Dim_Date star schema data to ERD_cleaned\Dim_Date_star.csv
Removing old staged file Dim_Date_star.csv.gz from final transformed stage...
Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/ERD_cleaned/Dim_Date_star.csv to final transformed stage CHINOOK_DATABASE.ERD_SCHEMA.FINAL_TRANSFORMED_STAGE ...
PUT result: [Row(source='Dim_Date_star.csv', target='Dim_Date_star.csv.gz', source_size=19415, target_size=2928, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
Creating star schema table ERD_SCHEMA_STAR.Dim_Date if not exists...
Truncating table ERD_SCHEMA_STAR.DIM_DATE before loading fresh data...


SnowparkSQLException: (1304): 01bc6a52-0001-2784-0001-824a00095c2a: 002003 (42S02): SQL compilation error:
Table 'CHINOOK_DATABASE.ERD_SCHEMA_STAR.DIM_DATE' does not exist or not authorized.

In [11]:
import os
import pandas as pd

base_path = os.path.join(os.getcwd(), "ERD_cleaned") + os.sep

df_customer = pd.read_csv(base_path + "Customer_cleaned.csv")
df_employee = pd.read_csv(base_path + "Employee_cleaned.csv")
df_artist = pd.read_csv(base_path + "Artist_cleaned.csv")
df_album = pd.read_csv(base_path + "Album_cleaned.csv")
df_invoice = pd.read_csv(base_path + "Invoice_cleaned.csv")
df_invoiceline = pd.read_csv(base_path + "InvoiceLine_cleaned.csv")
df_track = pd.read_csv(base_path + "Track_cleaned.csv")
df_playlisttrack = pd.read_csv(base_path + "PlaylistTrack_cleaned.csv")
df_genre = pd.read_csv(base_path + "Genre_cleaned.csv")
df_mediatype = pd.read_csv(base_path + "MediaType_cleaned.csv")
df_playlist = pd.read_csv(base_path + "Playlist_cleaned.csv")

print("Files loaded successfully")

# Test CSV exports
def export_to_csv(df, table_name):
    folder = "ERD_cleaned"
    os.makedirs(folder, exist_ok=True)
    filename = os.path.join(folder, f"{table_name}_star.csv")
    df.to_csv(filename, index=False)
    print(f"Exported {table_name} star schema data to {filename}")
    return filename

export_to_csv(df_customer, "Dim_Customer")
export_to_csv(df_artist, "Dim_Artist")
export_to_csv(df_album, "Dim_Album")
export_to_csv(df_invoice, "Fact_Invoice")


Files loaded successfully
Exported Dim_Customer star schema data to ERD_cleaned\Dim_Customer_star.csv
Exported Dim_Artist star schema data to ERD_cleaned\Dim_Artist_star.csv
Exported Dim_Album star schema data to ERD_cleaned\Dim_Album_star.csv
Exported Fact_Invoice star schema data to ERD_cleaned\Fact_Invoice_star.csv


'ERD_cleaned\\Fact_Invoice_star.csv'

In [18]:
import os
import pathlib
from snowflake.snowpark import Session
from dotenv import load_dotenv

load_dotenv()
snowflake_params = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA"),  # Raw schema for stage
}

# Use the correct schema for star schema
raw_schema = snowflake_params["schema"]
final_schema = os.getenv("SNOWFLAKE_STAR_SCHEMA", "ERD_SCHEMA_STAR")
final_stage = "FINAL_TRANSFORMED_STAGE"

def upload_to_stage(session, csv_file):
    csv_path = pathlib.Path(csv_file).resolve().as_posix()
    filename = pathlib.Path(csv_file).name
    print(f"Uploading {csv_path} to stage...")
    put_sql = f"PUT 'file://{csv_path}' @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\" AUTO_COMPRESS=TRUE"
    res = session.sql(put_sql).collect()
    print("PUT result:", res)

with Session.builder.configs(snowflake_params).create() as session:
    # Test file upload
    upload_to_stage(session, "ERD_cleaned/Dim_Customer_star.csv")


Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/ERD_cleaned/Dim_Customer_star.csv to stage...
PUT result: [Row(source='Dim_Customer_star.csv', target='Dim_Customer_star.csv.gz', source_size=7093, target_size=0, source_compression='NONE', target_compression='GZIP', status='SKIPPED', message='')]


In [19]:
with Session.builder.configs(snowflake_params).create() as session:
    list_files_sql = f"LIST @{new_database}.{raw_schema}.{final_stage}"
    files = session.sql(list_files_sql).collect()
    print("Files on stage:", files)


Files on stage: [Row(name='final_transformed_stage/Dim_Customer_star.csv.gz', size=4016, md5='9364d9ddc4b4463a4b1b382c73bb4fd7', last_modified='Sat, 17 May 2025 20:21:44 GMT'), Row(name='final_transformed_stage/Dim_Date_star.csv.gz', size=2928, md5='8538a120ab9c85cea1fa29083d84c0c2', last_modified='Sat, 17 May 2025 20:21:46 GMT'), Row(name='final_transformed_stage/Dim_Location_star.csv.gz', size=1248, md5='6101cd16021935f4f955bdd38c3e1147', last_modified='Sat, 17 May 2025 20:21:47 GMT')]


In [20]:
def remove_file_from_stage(session, filename):
    remove_sql = f"REMOVE @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{filename}"
    print(f"Removing old staged file {filename} from final transformed stage...")
    res = session.sql(remove_sql).collect()
    print("REMOVE result:", res)

with Session.builder.configs(snowflake_params).create() as session:
    # Clear old files
    remove_file_from_stage(session, "Dim_Customer_star.csv.gz")
    remove_file_from_stage(session, "Dim_Date_star.csv.gz")
    remove_file_from_stage(session, "Dim_Location_star.csv.gz")


Removing old staged file Dim_Customer_star.csv.gz from final transformed stage...
REMOVE result: [Row(name='final_transformed_stage/Dim_Customer_star.csv.gz', result='removed')]
Removing old staged file Dim_Date_star.csv.gz from final transformed stage...
REMOVE result: [Row(name='final_transformed_stage/Dim_Date_star.csv.gz', result='removed')]
Removing old staged file Dim_Location_star.csv.gz from final transformed stage...
REMOVE result: [Row(name='final_transformed_stage/Dim_Location_star.csv.gz', result='removed')]


In [21]:
with Session.builder.configs(snowflake_params).create() as session:
    upload_to_stage(session, "ERD_cleaned/Dim_Customer_star.csv")
    upload_to_stage(session, "ERD_cleaned/Dim_Date_star.csv")
    upload_to_stage(session, "ERD_cleaned/Dim_Location_star.csv")


Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/ERD_cleaned/Dim_Customer_star.csv to stage...
PUT result: [Row(source='Dim_Customer_star.csv', target='Dim_Customer_star.csv.gz', source_size=7093, target_size=4016, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/ERD_cleaned/Dim_Date_star.csv to stage...
PUT result: [Row(source='Dim_Date_star.csv', target='Dim_Date_star.csv.gz', source_size=19415, target_size=2928, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
Uploading E:/IBA_MS_DS 2026/Data WareHousing and Analysis/BI_project/ERD_cleaned/Dim_Location_star.csv to stage...
PUT result: [Row(source='Dim_Location_star.csv', target='Dim_Location_star.csv.gz', source_size=2042, target_size=1248, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]


In [22]:
with Session.builder.configs(snowflake_params).create() as session:
    list_files_sql = f"LIST @{new_database}.{raw_schema}.{final_stage}"
    files = session.sql(list_files_sql).collect()
    print("Files on stage (after cleanup):", files)


Files on stage (after cleanup): [Row(name='final_transformed_stage/Dim_Customer_star.csv.gz', size=4016, md5='f1e4f15a407bbb630667e527706f99a4', last_modified='Sat, 17 May 2025 20:33:20 GMT'), Row(name='final_transformed_stage/Dim_Date_star.csv.gz', size=2928, md5='f59015494461430eda1c688fbe5b08ec', last_modified='Sat, 17 May 2025 20:33:21 GMT'), Row(name='final_transformed_stage/Dim_Location_star.csv.gz', size=1248, md5='94849ccb733c0d1defb7f947c8e24559', last_modified='Sat, 17 May 2025 20:33:22 GMT')]


In [23]:
def truncate_star_table(session, table_name):
    truncate_sql = f'TRUNCATE TABLE "{final_schema}"."{table_name.upper()}"'
    print(f"Truncating table {final_schema}.{table_name.upper()} before loading fresh data...")
    session.sql(truncate_sql).collect()

def copy_into_star_table(session, table_name, csv_file):
    copy_sql = f'''
        COPY INTO "{final_schema}"."{table_name.upper()}"
        FROM @\"{new_database}\".\"{raw_schema}\".\"{final_stage}\"/{csv_file}.gz
        FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1)
        ON_ERROR = 'CONTINUE'
    '''
    print(f"Copying data into {final_schema}.{table_name.upper()} from final transformed stage file {csv_file}.gz ...")
    res = session.sql(copy_sql).collect()
    print("COPY INTO result:", res)

with Session.builder.configs(snowflake_params).create() as session:
    # Load Dim_Date
    truncate_star_table(session, "Dim_Date")
    copy_into_star_table(session, "Dim_Date", "Dim_Date_star.csv")

    # Load Dim_Location
    truncate_star_table(session, "Dim_Location")
    copy_into_star_table(session, "Dim_Location", "Dim_Location_star.csv")

    # Load Dim_Customer (if you have this table)
    truncate_star_table(session, "Dim_Customer")
    copy_into_star_table(session, "Dim_Customer", "Dim_Customer_star.csv")


Truncating table ERD_SCHEMA_STAR.DIM_DATE before loading fresh data...


SnowparkSQLException: (1304): 01bc6a71-0001-2859-0001-824a000a98da: 002003 (42S02): SQL compilation error:
Table 'CHINOOK_DATABASE.ERD_SCHEMA_STAR.DIM_DATE' does not exist or not authorized.