In [60]:
import pandas as pd
import os
import polars as pl

## Extract

In [67]:
def union_data(folder):
    customers = []
    coupons = []
    login_attempts = []
    order_items = []
    orders = []
    product_category = []
    products = []
    suppliers = []

    for filename in os.listdir(folder):
        path_file = os.path.join(folder, filename)
        if filename.startswith("customer") and filename.endswith(".csv"):
            df = pd.read_csv(path_file)
            customers.append(df)
        elif filename.startswith("coupons") and filename.endswith(".json"):
            df = pd.read_json(path_file)
            coupons.append(df)
        elif filename.startswith("login_attempts") and filename.endswith(".json"):
            df = pd.read_json(path_file)
            login_attempts.append(df)
        elif filename.startswith("order_item") and filename.endswith(".avro"):
            df = pl.read_avro(path_file)
            df = df.to_pandas()
            order_items.append(df)
        elif filename.startswith("order") and filename.endswith(".parquet"):
            df = pd.read_parquet(path_file)
            orders.append(df)
        elif filename.startswith("product_category") and filename.endswith(".xls"):
            df = pd.read_excel(path_file)
            product_category.append(df)
        elif filename.startswith("product") and filename.endswith(".xls"):
            df = pd.read_excel(path_file)
            products.append(df)
        elif filename.startswith("supplier") and filename.endswith(".xls"):
            df = pd.read_excel(path_file)
            suppliers.append(df)
        else:
            print(f"file {filename} not found")
    
    return {
    "customers": customers,
    "coupons": coupons,
    "login_attempts": login_attempts,
    "order_items": order_items,
    "orders": orders,
    "product_category": product_category,
    "products": products,
    "suppliers": suppliers
    }

union_data = union_data("data")
customers = union_data["customers"]
coupons = union_data["coupons"]
login_attempts = union_data["login_attempts"]
order_items = union_data["order_items"]
orders = union_data["orders"]
product_category = union_data["product_category"]
products = union_data["products"]
suppliers = union_data["suppliers"]

## Transform

In [None]:
# customers = pd.concat(customers)
# customers.drop_duplicates(inplace=True)
# customers.dropna(inplace=True)
# if 'Unnamed: 0' in customers.columns:
#     customers.drop(columns='Unnamed: 0', inplace=True)

# coupons = pd.concat(coupons)
# coupons.drop_duplicates(inplace=True)
# coupons.dropna(inplace=True)
# if 'Unnamed: 0' in coupons.columns:
#     coupons.drop(columns='Unnamed: 0', inplace=True)

# login_atttempts = pd.concat(login_atttempts)
# login_atttempts.drop_duplicates(inplace=True)
# login_atttempts.dropna(inplace=True)
# if 'Unnamed: 0' in login_atttempts.columns:
#     login_atttempts.drop(columns='Unnamed: 0', inplace=True)

# order_items = pd.concat(order_items)
# order_items.drop_duplicates(inplace=True)
# order_items.dropna(inplace=True)
# order_items['amount'] = order_items['amount'].astype(int)
# if 'Unnamed: 0' in order_items.columns:
#     order_items.drop(columns='Unnamed: 0', inplace=True)

# orders = pd.concat(orders)
# orders.drop_duplicates(inplace=True)
# orders.dropna(inplace=True)
# if 'Unnamed: 0' in orders.columns:
#     orders.drop(columns='Unnamed: 0', inplace=True)

# product_category = pd.concat(product_category)
# product_category.drop_duplicates(inplace=True)
# product_category.dropna(inplace=True)
# if 'Unnamed: 0' in product_category.columns:
#     product_category.drop(columns='Unnamed: 0', inplace=True)

# products = pd.concat(products)
# products.drop_duplicates(inplace=True)
# products.dropna(inplace=True)
# products.drop(columns='Unnamed: 0', inplace=True)
# if 'Unnamed: 0' in products.columns:
#     products.drop(columns='Unnamed: 0', inplace=True)

# suppilers = pd.concat(suppliers)
# suppilers.drop_duplicates(inplace=True)
# suppilers.dropna(inplace=True)
# if 'Unnamed: 0' in suppilers.columns:
#     suppilers.drop(columns='Unnamed: 0', inplace=True)

In [68]:
def clean_dataframe(df):
    df = pd.concat(df)
    df.drop_duplicates(inplace=True)
    df.dropna(inplace=True)
    if 'Unnamed: 0' in df.columns:
        df.drop(columns='Unnamed: 0', inplace=True)
    return df

customers = clean_dataframe(customers)
coupons = clean_dataframe(coupons)
login_attempts = clean_dataframe(login_attempts)

order_items = clean_dataframe(order_items)
order_items['coupon_id'] = order_items['coupon_id'].fillna(0).astype(int)

orders = clean_dataframe(orders)
product_category = clean_dataframe(product_category)
products = clean_dataframe(products)
suppliers = clean_dataframe(suppliers)

In [75]:
# customers.head()
# coupons.head()
login_attempts.head()
# orders.head()
# order_items.head()
# product_category.head()
# products.head()
# suppliers.shape

Unnamed: 0,id,customer_id,login_successful,attempted_at
0,0,7090,True,2023-02-12 23:24:23
1,1,6752,True,2023-02-12 23:24:23
2,2,6220,True,2023-08-20 13:06:26
3,3,8017,True,2023-08-20 13:06:26
4,4,2404,True,2023-08-01 06:14:06


## Load to Data Warehouse

In [52]:
import psycopg2

conn = psycopg2.connect(
    dbname="data_penjualan",
    user="postgres",
    password="yanns21_",
    host="127.0.0.1",
    port="5432"
)
cur = conn.cursor()

In [76]:
create_customers_sql = """
    CREATE TABLE IF NOT EXISTS customers (
        customer_id INT PRIMARY KEY,
        first_name VARCHAR(50),
        last_name VARCHAR(50),
        gender VARCHAR(50),
        address TEXT,
        zip_code VARCHAR(50)
    );
"""

create_coupons_sql = """
    CREATE TABLE IF NOT EXISTS coupons (
        coupon_id INT PRIMARY KEY,
        dsicont_percent INT
    );
"""

create_login_attempts_sql = """
    CREATE TABLE IF NOT EXISTS login_attempts (
        login_attempts_id INT PRIMARY KEY,
        customer_id INT,
        login_status BOOLEAN,
        attempted_at TIMESTAMP
        
    );
"""

create_order_items_sql = """
    CREATE TABLE IF NOT EXISTS order_items (
        order_item_id INT PRIMARY KEY,
        order_id INT,
        product_id INT,
        amount INT,
        coupon_id INT
    );
"""

create_orders_sql = """
    CREATE TABLE IF NOT EXISTS orders (
        order_id INT PRIMARY KEY,
        customer_id INT,
        order_status VARCHAR(24),
        created_at DATE
    );
"""

create_product_category_sql = """
    CREATE TABLE IF NOT EXISTS product_category (
        product_category_id INT PRIMARY KEY,
        category_name VARCHAR(50)
    );
"""

create_products_sql = """
    CREATE TABLE IF NOT EXISTS products (
        product_id INT PRIMARY KEY,
        product_name VARCHAR(255),
        price FLOAT,
        category_id INT,
        supplier_id INT
    );
"""

create_suppliers_sql = """
    CREATE TABLE IF NOT EXISTS suppliers (
        supplier_id INT PRIMARY KEY,
        supplier_name VARCHAR(50),
        supplier_country VARCHAR(24)
    );
"""

table_list = [
    create_customers_sql, 
    create_coupons_sql, 
    create_login_attempts_sql, 
    create_order_items_sql, 
    create_orders_sql,
    create_product_category_sql,
    create_products_sql,
    create_suppliers_sql
    ]


for i, query in enumerate(table_list, start=1):
    try:
        cur.execute(query)
        conn.commit()
        print(f"✅ Query #{i} berhasil dieksekusi")
    except Exception as e:
        conn.rollback()  # Bersihkan transaksi gagal
        print(f"❌ Error in query #{i}:\n→ {e}")


✅ Query #1 berhasil dieksekusi
✅ Query #2 berhasil dieksekusi
✅ Query #3 berhasil dieksekusi
✅ Query #4 berhasil dieksekusi
✅ Query #5 berhasil dieksekusi
✅ Query #6 berhasil dieksekusi
✅ Query #7 berhasil dieksekusi
✅ Query #8 berhasil dieksekusi


## Insert Data

In [None]:
dataframes = {
    # "customers": customers,
    # "coupons": coupons,
    # "login_attempts": login_attempts,
    # "order_items": order_items,
    # "orders": orders,
    # "product_category": product_category,
    # "products": products,
    # "suppliers": suppliers
}

def insert_multiple_dataframes_to_postgres(df_dict, conn):
    from io import StringIO

    for table_name, df in df_dict.items():
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        try:
            with conn.cursor() as cur:
                cur.copy_expert(
                    sql=f"COPY {table_name} FROM STDIN WITH CSV",
                    file=buffer
                )
            conn.commit()
            print(f"✅ Inserted into '{table_name}' ({len(df)} rows)")
        except Exception as e:
            conn.rollback()
            print(f"❌ Failed to insert into '{table_name}':\n→ {e}")

insert_multiple_dataframes_to_postgres(dataframes, conn)

✅ Inserted into 'login_attempts' (1000000 rows)
