### Importing Libraries

In [92]:
# importing libraries

import os # for handling files
import pandas as pd # for data cleaning 

from dotenv import load_dotenv #



import sqlalchemy as sal # for connecting to sql database

### Cleaning and Filtering data 

In [93]:
load_dotenv(override=True) 
folder_path = os.getenv("FOLDER_PATH")

cleaned_tables = {}

# Loop through all CSV files in folder and remove duplicates or blanks rows(IF EXISTS!!)
for file in os.listdir(folder_path):
    if file.endswith(".csv"):
        table_name = file.replace(".csv", "")
        df = pd.read_csv(os.path.join(folder_path, file))
        original_rows = len(df)
        
        # Remove rows where all columns are NaN, if exists
        df = df.dropna(how="all")
        
        # Remove row duplicates, if exists
        df_cleaned = df.drop_duplicates()
        
        print(f"{table_name}: Original rows = {original_rows}, After cleaning = {len(df_cleaned)}")
        
        # Save cleaned DataFrame in dictionary
        cleaned_tables[table_name] = df_cleaned

dim_customer: Original rows = 107776, After cleaning = 107776
dim_delivery_partner: Original rows = 15000, After cleaning = 15000
dim_menu_item: Original rows = 342671, After cleaning = 342671
dim_restaurant: Original rows = 19995, After cleaning = 19995
fact_delivery_performance: Original rows = 149166, After cleaning = 149166
fact_orders: Original rows = 149166, After cleaning = 149166
fact_order_items: Original rows = 342994, After cleaning = 342994
fact_ratings: Original rows = 68842, After cleaning = 68825


### Converting the  format "order_timestamp" from fact_orders table and "review_timestamp" from the fact_ratings to datatime

In [94]:
# timestamps are in "str" format

print(type(cleaned_tables["fact_orders"]["order_timestamp"][1]))
print(type(cleaned_tables["fact_ratings"]["review_timestamp"][1]))

<class 'str'>
<class 'str'>


In [95]:
cleaned_tables["fact_orders"]["order_timestamp"] = pd.to_datetime(
    cleaned_tables["fact_orders"]["order_timestamp"], format="%Y-%m-%d %H:%M:%S"
)

cleaned_tables["fact_ratings"]["review_timestamp"] = pd.to_datetime(
    cleaned_tables["fact_ratings"]["review_timestamp"], format="%d-%m-%Y %H:%M"
).dt.floor("min")


In [96]:
# converted to datetime format

cleaned_tables["fact_ratings"]["review_timestamp"].sample()
cleaned_tables["fact_orders"]["order_timestamp"].sample()

147257   2025-09-24 19:52:00
Name: order_timestamp, dtype: datetime64[ns]

### Connecting to MS SQL server and creating Database and Schema 

In [97]:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

load_dotenv(override=True) 

engine_url = os.getenv("DB_ENGINE_URL")  # master DB URL
schema_name = "quick_bite_schema"
database_name = "quick_bite_database"

# Connect to master to create database if not exists
engine_master = create_engine(engine_url)

# Check if Schema exists, if not then make it 
with engine_master.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    try:
        # Ensure database exists
        result = conn.execute(text(f"SELECT * FROM sys.databases WHERE name='{database_name}'"))
        if result.fetchone(): 
            print(f"Database '{database_name}' already exists.")
        else:
            conn.execute(text(f"CREATE DATABASE {database_name};"))
            print(f"Database '{database_name}' created successfully.")
    except SQLAlchemyError as e:
        print(f"Failed to create database '{database_name}': {e}")

# Connect to the actual database "quick_bite_database"
engine_db = create_engine(
    f"mssql+pyodbc://localhost\\SQLEXPRESS/{database_name}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
)

# Check if Schema exists, if not then make it 
with engine_db.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    try:
        result = conn.execute(text(f"SELECT * FROM sys.schemas WHERE name='{schema_name}'"))
        if result.fetchone():
            print(f"Schema '{schema_name}' already exists.")
        else:
            conn.execute(text(f"EXEC('CREATE SCHEMA {schema_name}')"))
            print(f"Schema '{schema_name}' created successfully.")
    except SQLAlchemyError as e:
        print(f"Failed to create schema '{schema_name}': {e}")

Database 'quick_bite_database' created successfully.
Schema 'quick_bite_schema' created successfully.


### Load data to SQL server

In [98]:
for table_name, df in cleaned_tables.items():
    df.to_sql(
        name=table_name,
        con=engine_db,
        schema=schema_name,
        if_exists='append',  # Append new data if table exists
        index=False
    )
    print(f"{table_name} pushed to SQL Server schema '{schema_name}'.")

print("ETL Process Completed Successfully!")

dim_customer pushed to SQL Server schema 'quick_bite_schema'.
dim_delivery_partner pushed to SQL Server schema 'quick_bite_schema'.
dim_menu_item pushed to SQL Server schema 'quick_bite_schema'.
dim_restaurant pushed to SQL Server schema 'quick_bite_schema'.
fact_delivery_performance pushed to SQL Server schema 'quick_bite_schema'.
fact_orders pushed to SQL Server schema 'quick_bite_schema'.
fact_order_items pushed to SQL Server schema 'quick_bite_schema'.
fact_ratings pushed to SQL Server schema 'quick_bite_schema'.
ETL Process Completed Successfully!


### Optional step, if by mistakenly user ran the script multiple times and then wants to clear the duplicates

In [99]:
'''

for table_name in cleaned_tables.keys():
    # Fetch the table back from SQL
    df_db = pd.read_sql_table(table_name, con=engine_db, schema=schema_name)
    
    # Remove rows where all columns are NaN
    df_db_cleaned = df_db.dropna(how="all")
    
    # Remove exact duplicate rows (all columns identical)
    df_db_cleaned = df_db_cleaned.drop_duplicates()
    
    # Replace the table in SQL with cleaned data
    df_db_cleaned.to_sql(
        name=table_name,
        con=engine_db,
        schema=schema_name,
        if_exists='replace',  # Replace the existing table
        index=False
    )
    
    print(f"Table '{table_name}' cleaned and replaced in SQL (NaNs & exact duplicates removed).")


'''

'\n\nfor table_name in cleaned_tables.keys():\n    # Fetch the table back from SQL\n    df_db = pd.read_sql_table(table_name, con=engine_db, schema=schema_name)\n    \n    # Remove rows where all columns are NaN\n    df_db_cleaned = df_db.dropna(how="all")\n    \n    # Remove exact duplicate rows (all columns identical)\n    df_db_cleaned = df_db_cleaned.drop_duplicates()\n    \n    # Replace the table in SQL with cleaned data\n    df_db_cleaned.to_sql(\n        name=table_name,\n        con=engine_db,\n        schema=schema_name,\n        if_exists=\'replace\',  # Replace the existing table\n        index=False\n    )\n    \n    print(f"Table \'{table_name}\' cleaned and replaced in SQL (NaNs & exact duplicates removed).")\n\n\n'