In [None]:
!pip install pandas sqlalchemy psycopg2-binary

In [None]:
import os
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text


In [None]:
# Directory that holds the three source CSV files. Set the DATA_DIR environment
# variable to point somewhere else; the fallback is the original download folder,
# so the notebook behaves as before when the variable is not set.
DATA_DIR = Path(
    os.environ.get("DATA_DIR", r"C:\Users\personal\Downloads\Data Engineer Dataset 2")
)

# Load each raw CSV into its own DataFrame.
menu_item = pd.read_csv(DATA_DIR / "menu_items.csv")
order_item = pd.read_csv(DATA_DIR / "order_item.csv")
orders = pd.read_csv(DATA_DIR / "orders.csv")

In [None]:
# Print the column/dtype/non-null summary of every freshly loaded frame,
# in the same order they were read.
for frame in (menu_item, order_item, orders):
    frame.info()

In [None]:
# Step 2: Connect to PostgreSQL
# The connection URL is read from the DATABASE_URL environment variable so that
# real credentials never need to be written into the notebook. The placeholder
# URL below is only a fallback and has to be overridden to reach a real database.
engine = create_engine(
    os.environ.get(
        "DATABASE_URL",
        "postgresql+psycopg2://your_user:your_password@localhost:5432/your_db",
    )
)

In [None]:
# Step 3: Create schema and staging tables explicitly
# CREATE statements for every staging table, keyed by the bare table name.
# Keeping them in one mapping lets a single loop drop and recreate each table,
# so no statement is overwritten before it has been executed.
STAGING_TABLE_DDL = {
    "raw_menu_item": """
    CREATE TABLE staging.raw_menu_item (
        item_id TEXT NOT NULL,
        item_name TEXT NOT NULL,
        category TEXT,
        description TEXT
    );
    """,
    "raw_orders": """
    CREATE TABLE staging.raw_orders (
        order_id TEXT NOT NULL,
        customer_id TEXT NOT NULL,
        order_date DATE,
        total_amount NUMERIC
    );
    """,
    "raw_order_item": """
    CREATE TABLE staging.raw_order_item (
        order_id TEXT NOT NULL,
        item_id TEXT NOT NULL,
        quantity INT,
        unit_price INT
    );
    """,
}

with engine.connect() as conn:
    # Create schema if not exists
    conn.execute(text("CREATE SCHEMA IF NOT EXISTS staging;"))

    for table_name, create_table_sql in STAGING_TABLE_DDL.items():
        # Drop first so the cell can be re-run from a clean slate, then create.
        # Table names come from the fixed mapping above, never from user input.
        conn.execute(text(f"DROP TABLE IF EXISTS staging.{table_name};"))
        conn.execute(text(create_table_sql))

    # Persist the schema and all three tables.
    conn.commit()

In [None]:
# Write raw data to the staging tables.
# The schema must be given through `schema=`: a dotted name such as
# "staging.raw_menu_item" is taken as one literal table name and ends up in the
# default schema, where the later transformation query cannot find it.
# NOTE: if_exists="replace" drops any existing table of that name and recreates
# it from the DataFrame's own columns/dtypes, so a re-run never duplicates rows.
menu_item.to_sql("raw_menu_item", engine, schema="staging", if_exists="replace", index=False)
order_item.to_sql("raw_order_item", engine, schema="staging", if_exists="replace", index=False)
orders.to_sql("raw_orders", engine, schema="staging", if_exists="replace", index=False)

In [None]:
# --- Step 4: Define the transformation
# Join the three staging tables into one flat table: one row per order line,
# carrying the menu-item attributes and the order header fields alongside it.
# Every column is qualified with its table alias so the query stays unambiguous
# even if two staging tables share a column name.

transformation_sql = """
CREATE TABLE IF NOT EXISTS transformed_table AS
SELECT
    o.order_id AS order_id,
    m.item_id AS item_id,
    m.item_name,
    m.category,
    m.description,
    oi.quantity,
    oi.unit_price,
    o.customer_id,
    o.order_date,
    o.total_amount
FROM staging.raw_menu_item m
JOIN staging.raw_order_item oi
ON m.item_id = oi.item_id
JOIN staging.raw_orders o
ON o.order_id = oi.order_id
"""

# --- Step 5: Execute the transformation and write the final table
with engine.connect() as conn:
    # Reset first so a re-run rebuilds the table instead of keeping stale rows.
    conn.execute(text("DROP TABLE IF EXISTS transformed_table"))
    conn.execute(text(transformation_sql))
    conn.commit()