In [102]:
import pandas as pd
import time
from sqlalchemy import create_engine, text

In [103]:
db_connection = create_engine('mysql+pymysql://root:@localhost/elt_sales_db')

def run_sql(query, description):
    start_time = time.time()
    with db_connection.connect() as conn:
        conn.execute(text(query))
        conn.commit()
    end_time = time.time()
    print(f"âœ… {description} selesai dalam {end_time - start_time:.4f} detik.")

print("Siap melakukan transformasi ELT!")

Siap melakukan transformasi ELT!


In [104]:
create_table_sql = text("""
CREATE TABLE IF NOT EXISTS sales_processed (
    order_id VARCHAR(50) PRIMARY KEY,
    region VARCHAR(50),
    country VARCHAR(50),
    item_type VARCHAR(50),
    sales_channel VARCHAR(50),
    order_priority VARCHAR(10),
    order_date DATE,
    ship_date DATE,
    units_sold INT,
    unit_price DECIMAL(10,2),
    unit_cost DECIMAL(10,2),
    total_revenue DECIMAL(15,2),
    total_cost DECIMAL(15,2),
    total_profit DECIMAL(15,2)
);
""")

with db_connection.connect() as conn:
    conn.execute(create_table_sql)
    conn.commit()

print("Tabel sales_processed berhasil dibuat!")

Tabel sales_processed berhasil dibuat!


In [105]:
from sqlalchemy import text

with db_connection.connect() as conn:
    conn.execute(text("TRUNCATE TABLE sales_processed;"))
    conn.commit()

print("Tabel sales_processed dikosongkan!")


Tabel sales_processed dikosongkan!


In [106]:

insert_sql = text("""
INSERT INTO sales_processed (
    order_id, region, country, item_type, sales_channel, order_priority, 
    order_date, ship_date, units_sold, unit_price, unit_cost, 
    total_revenue, total_cost, total_profit
)
SELECT
    c.order_id,
    COALESCE(c.Region, a.Region) AS region,
    c.Country,
    c.item_type,
    c.sales_channel,
    c.order_priority,
    STR_TO_DATE(c.order_date, '%%m/%%d/%%Y'),
    STR_TO_DATE(c.ship_date, '%%m/%%d/%%Y'),
    c.units_sold,
    c.unit_price,
    c.unit_cost,
    c.total_revenue,
    c.total_cost,
    c.total_profit
FROM raw_sales_csv c
LEFT JOIN raw_sales_api a
    ON c.order_id = a.order_id;
""")


In [107]:
dedup_sql = text("""
DELETE sp1
FROM sales_processed sp1
JOIN sales_processed sp2
  ON sp1.order_id = sp2.order_id
 AND sp1.order_date > sp2.order_date;
""")

with db_connection.connect() as conn:
    conn.execute(dedup_sql)
    conn.commit()

print("Duplicate data berdasarkan order_id berhasil dihapus!")


Duplicate data berdasarkan order_id berhasil dihapus!


In [108]:
missing_sql = text("""
UPDATE sales_processed
SET
    region = COALESCE(Region, 'unknown'),
    country = COALESCE(Country, 'unknown'),
    item_type = COALESCE(item_type, 'unknown'),
    sales_channel = COALESCE(sales_channel, 'unknown'),
    order_priority = COALESCE(order_priority, 'unknown'),
    units_sold = COALESCE(units_sold, 0),
    unit_price = COALESCE(unit_price, 0),
    unit_cost = COALESCE(unit_cost, 0),
    total_revenue = COALESCE(total_revenue, 0),
    total_cost = COALESCE(total_cost, 0),
    total_profit = COALESCE(total_profit, 0);
""")

with db_connection.connect() as conn:
    conn.execute(missing_sql)
    conn.commit()

print("Missing value berhasil ditangani!")


Missing value berhasil ditangani!


In [109]:
date_check_sql = text("""
SELECT 
    COUNT(*) AS invalid_order_date
FROM sales_processed
WHERE order_date IS NULL;
""")

with db_connection.connect() as conn:
    result = conn.execute(date_check_sql)
    print("Order Date NULL count:", result.fetchone()[0])


Order Date NULL count: 0


In [110]:
from sqlalchemy import text
from sqlalchemy.exc import OperationalError

alter_feature_sql = text("""
ALTER TABLE sales_processed
ADD COLUMN cost_per_unit DECIMAL(15,2),
ADD COLUMN net_profit_ratio DECIMAL(15,4),
ADD COLUMN shipping_speed_category VARCHAR(20),
ADD COLUMN order_size_category VARCHAR(20),
ADD COLUMN margin_category VARCHAR(20);
""")

try:
    with db_connection.connect() as conn:
        conn.execute(alter_feature_sql)
        conn.commit()
    print("Kolom turunan berhasil ditambahkan!")
except OperationalError as e:
    print("Kolom kemungkinan sudah ada, lanjut proses.")


Kolom turunan berhasil ditambahkan!


In [111]:
update_cost_sql = text("""
UPDATE sales_processed
SET cost_per_unit = total_cost / NULLIF(units_sold, 0);
""")

with db_connection.connect() as conn:
    conn.execute(update_cost_sql)
    conn.commit()

print("cost_per_unit berhasil dihitung!")


cost_per_unit berhasil dihitung!


In [112]:
from sqlalchemy import text

update_margin_category_sql = text("""
UPDATE sales_processed
SET margin_category =
    CASE
        WHEN total_revenue = 0 THEN 'low'
        WHEN (total_profit / total_revenue) < 0.10 THEN 'low'
        WHEN (total_profit / total_revenue) <= 0.30 THEN 'medium'
        ELSE 'high'
    END;
""")

with db_connection.connect() as conn:
    conn.execute(update_margin_category_sql)
    conn.commit()

print("margin_category berhasil dihitung!")


margin_category berhasil dihitung!


In [113]:
update_profit_ratio_sql = text("""
UPDATE sales_processed
SET net_profit_ratio = total_profit / NULLIF(total_cost, 0);
""")

with db_connection.connect() as conn:
    conn.execute(update_profit_ratio_sql)
    conn.commit()

print("net_profit_ratio berhasil dihitung!")


net_profit_ratio berhasil dihitung!


In [114]:
update_shipping_sql = text("""
UPDATE sales_processed
SET shipping_speed_category =
    CASE
        WHEN DATEDIFF(ship_date, order_date) <= 3 THEN 'fast'
        WHEN DATEDIFF(ship_date, order_date) <= 7 THEN 'normal'
        ELSE 'slow'
    END;
""")

with db_connection.connect() as conn:
    conn.execute(update_shipping_sql)
    conn.commit()

print("shipping_speed_category berhasil dibuat!")


shipping_speed_category berhasil dibuat!


In [115]:
update_order_size_sql = text("""
UPDATE sales_processed
SET order_size_category =
    CASE
        WHEN units_sold < 50 THEN 'small'
        WHEN units_sold <= 200 THEN 'medium'
        ELSE 'large'
    END;
""")

with db_connection.connect() as conn:
    conn.execute(update_order_size_sql)
    conn.commit()

print("order_size_category berhasil dibuat!")


order_size_category berhasil dibuat!


In [116]:
query_cost_efficiency = text("""
SELECT 
    order_size_category,
    AVG(cost_per_unit) AS avg_cost_per_unit
FROM sales_processed
GROUP BY order_size_category;
""")

with db_connection.connect() as conn:
    result = conn.execute(query_cost_efficiency)
    print("ðŸ“Š Average Cost per Unit by Order Size")
    for row in result.fetchall():
        print(row)


ðŸ“Š Average Cost per Unit by Order Size


In [117]:
from sqlalchemy import text

query_margin_dist = text("""
SELECT
    margin_category,
    COUNT(*) AS total_orders,
    SUM(total_revenue) AS total_revenue,
    SUM(total_profit) AS total_profit
FROM sales_processed
GROUP BY margin_category;
""")

with db_connection.connect() as conn:
    print("ðŸ“Š Distribusi Profit Berdasarkan Margin Category")
    for row in conn.execute(query_margin_dist):
        print(row)


ðŸ“Š Distribusi Profit Berdasarkan Margin Category


In [118]:
query_cost_vs_size = text("""
SELECT
    order_size_category,
    AVG(cost_per_unit) AS avg_cost_per_unit,
    COUNT(*) AS total_orders
FROM sales_processed
GROUP BY order_size_category;
""")

with db_connection.connect() as conn:
    print("ðŸ“Š Efisiensi Biaya per Ukuran Order")
    for row in conn.execute(query_cost_vs_size):
        print(row)


ðŸ“Š Efisiensi Biaya per Ukuran Order


In [119]:
query_profit_vs_shipping = text("""
SELECT
    shipping_speed_category,
    AVG(net_profit_ratio) AS avg_net_profit_ratio,
    COUNT(*) AS total_orders
FROM sales_processed
GROUP BY shipping_speed_category;
""")

with db_connection.connect() as conn:
    print("ðŸ“Š Profitabilitas Berdasarkan Kecepatan Pengiriman")
    for row in conn.execute(query_profit_vs_shipping):
        print(row)


ðŸ“Š Profitabilitas Berdasarkan Kecepatan Pengiriman


In [120]:
query_margin_size = text("""
SELECT
    margin_category,
    order_size_category,
    COUNT(*) AS total_orders,
    SUM(total_revenue) AS total_revenue
FROM sales_processed
GROUP BY margin_category, order_size_category
ORDER BY margin_category, order_size_category;
""")

with db_connection.connect() as conn:
    print("ðŸ“Š Margin Category vs Order Size")
    for row in conn.execute(query_margin_size):
        print(row)

ðŸ“Š Margin Category vs Order Size


In [121]:
query_shipping_quality = text("""
SELECT
    shipping_speed_category,
    COUNT(*) AS total_orders,
    AVG(total_profit) AS avg_profit
FROM sales_processed
GROUP BY shipping_speed_category;
""")

with db_connection.connect() as conn:
    print("ðŸ“Š Kualitas Pengiriman vs Profit")
    for row in conn.execute(query_shipping_quality):
        print(row)


ðŸ“Š Kualitas Pengiriman vs Profit


In [122]:
from sqlalchemy import text

with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS dim_date (
        order_date DATE,
        ship_date DATE
    );
    """))
    conn.commit()

print("dim_date created")


dim_date created


In [123]:
from sqlalchemy import text

with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS dim_date (
        order_date DATE,
        ship_date DATE
    );
    """))
    conn.commit()

print("dim_date created")


dim_date created


In [124]:
with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS dim_region (
        region VARCHAR(100),
        country VARCHAR(100)
    );
    """))
    conn.commit()

print("dim_region created")


dim_region created


In [125]:
with db_connection.connect() as conn:
    conn.execute(text("""
    INSERT INTO dim_region (Region, Country)
    SELECT DISTINCT
        Region,
        Country
    FROM sales_processed;
    """))
    conn.commit()

print("dim_region populated")


dim_region populated


In [126]:
with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS dim_product (
        item_type VARCHAR(100),
        unit_price DECIMAL(15,2),
        unit_cost DECIMAL(15,2)
    );
    """))
    conn.commit()

print("dim_product created")


dim_product created


In [127]:
with db_connection.connect() as conn:
    conn.execute(text("""
    INSERT INTO dim_product (item_type, unit_price, unit_cost)
    SELECT DISTINCT
        item_type,
        unit_price,
        unit_cost
    FROM sales_processed;
    """))
    conn.commit()

print("dim_product populated")


dim_product populated


In [128]:
with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS dim_order_attr (
        shipping_speed_category VARCHAR(20),
        order_size_category VARCHAR(20),
        margin_category VARCHAR(20)
    );
    """))
    conn.commit()

print("dim_order_attr created")


dim_order_attr created


In [129]:
with db_connection.connect() as conn:
    conn.execute(text("""
    INSERT INTO dim_order_attr (
        shipping_speed_category,
        order_size_category,
        margin_category
    )
    SELECT DISTINCT
        shipping_speed_category,
        order_size_category,
        margin_category
    FROM sales_processed;
    """))
    conn.commit()

print("dim_order_attr populated")


dim_order_attr populated


In [130]:
with db_connection.connect() as conn:
    conn.execute(text("""
    CREATE TABLE IF NOT EXISTS fact_sales (
        order_id VARCHAR(50) PRIMARY KEY,
        `Unit Sold` INT,
        total_revenue DECIMAL(15,2),
        total_cost DECIMAL(15,2),
        total_profit DECIMAL(15,2)
    );
    """))
    conn.commit()

print("fact_sales created")

fact_sales created


In [131]:
from sqlalchemy import text

with db_connection.connect() as conn:
    conn.execute(text("""
    INSERT INTO fact_sales (
        order_id,
        `Unit Sold`,
        total_revenue,
        total_cost,
        total_profit
    )
    SELECT
        order_id,
        units_sold,
        total_revenue,
        total_cost,
        total_profit
    FROM sales_processed;
    """))
    conn.commit()

print("fact_sales populated")


fact_sales populated
