In [None]:
# %pip install duckdb pandas numpy kagglehub

In [1]:
import duckdb
import pandas as pd
from datetime import datetime
import pickle

# ETL Pipeline

## Load Olist Dataset from Kaggle

In [3]:
# Skip this cell's work when the pickle cache already exists.

import os

if not os.path.exists('dataset_dict.pkl'):
    # Imported lazily so that a cached run does not need kagglehub installed.
    import kagglehub as kh

    path = kh.dataset_download("olistbr/brazilian-ecommerce")

    print("Path to dataset files:", path)

    # Key used in the cached dictionary -> CSV file inside the downloaded dataset.
    csv_files = {
        'customers': 'olist_customers_dataset.csv',
        'geo': 'olist_geolocation_dataset.csv',
        'items': 'olist_order_items_dataset.csv',
        'payments': 'olist_order_payments_dataset.csv',
        'reviews': 'olist_order_reviews_dataset.csv',
        'orders': 'olist_orders_dataset.csv',
        'products': 'olist_products_dataset.csv',
        'sellers': 'olist_sellers_dataset.csv',
        'category': 'product_category_name_translation.csv',
    }

    dataset_dict = {
        name: pd.read_csv(os.path.join(path, filename))
        for name, filename in csv_files.items()
    }

    # Write to a temporary file first and move it into place afterwards, so an
    # interrupted dump never leaves a truncated file that the existence check
    # above would later accept as a valid cache.
    tmp_cache_path = 'dataset_dict.pkl.tmp'
    with open(tmp_cache_path, 'wb') as f:
        pickle.dump(dataset_dict, f)
    os.replace(tmp_cache_path, 'dataset_dict.pkl')

## Load Olist Dataset from Cache (Pickle)

In [2]:
# Read the cached dataset dictionary back from the pickle file.
with open('dataset_dict.pkl', 'rb') as cache_file:
    data_from_pkl = pickle.loads(cache_file.read())

# Show which tables the cache contains.
print(data_from_pkl.keys())

dict_keys(['customers', 'geo', 'items', 'payments', 'reviews', 'orders', 'products', 'sellers', 'category'])


## Pipeline Class

In [3]:
class ETLpipeline:
    """ETL pipeline that loads the Olist dataset into a DuckDB star schema.

    The pipeline has four stages, each recorded in the ``etl_log`` table:

    1. ``initialize_warehouse_schema`` - (re)create the dimension/fact tables.
    2. ``extract`` - copy the source DataFrames into ``stg_*`` tables.
    3. ``transform`` - build ``stg_dim_*`` / ``stg_fact_*`` tables with pandas.
    4. ``load`` - insert the transformed staging tables into the warehouse.

    Args:
        db: Path of the DuckDB database file to open (read-write).
    """

    # Warehouse tables in an order that is safe to drop (referencing tables first).
    _DROP_ORDER = (
        "dim_payments", "fact_order_items", "fact_orders", "dim_products",
        "dim_sellers", "dim_customers", "dim_date", "dim_category",
    )
    # Tables whose row counts are reported by each stage.
    _EXTRACT_TABLES = (
        "stg_customers", "stg_sellers", "stg_products", "stg_payments",
        "stg_orders", "stg_items", "stg_category",
    )
    _TRANSFORM_TABLES = (
        "stg_dim_customers", "stg_dim_sellers", "stg_dim_products",
        "stg_dim_payments", "stg_fact_orders", "stg_fact_order_items",
    )
    _WAREHOUSE_TABLES = (
        "dim_customers", "dim_sellers", "dim_products",
        "dim_payments", "fact_orders", "fact_order_items",
    )

    def __init__(self, db="brazilian_ecommerce.db"):
        self.conn = duckdb.connect(database=db, read_only=False)
        # Set to True once the warehouse schema has been created by this instance.
        self.initialized = False
        self.log_table_setup()

    def _count_rows(self, tables):
        """Return the total number of rows across the given table names."""
        return sum(
            self.conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
            for table in tables
        )

    @staticmethod
    def _move_column_after(frame, column, anchor):
        """Return ``frame`` with ``column`` placed immediately after ``anchor``."""
        ordered = [name for name in frame.columns if name != column]
        ordered.insert(ordered.index(anchor) + 1, column)
        return frame[ordered]

    def log_table_setup(self):
        """Create the log table used to track ETL runs, if it does not exist."""
        self.conn.execute("""
        CREATE TABLE IF NOT EXISTS etl_log (
            log_id INTEGER PRIMARY KEY,
            process_name VARCHAR,
            start_time TIMESTAMP,
            end_time TIMESTAMP,
            records_processed INTEGER,
            status VARCHAR,
            message VARCHAR
        )
        """)

    def log_process_start(self, process_name):
        """Record the start of an ETL stage and return its new ``log_id``.

        The id is the current maximum ``log_id`` plus one (1 for an empty table)
        and the row is inserted with status ``'RUNNING'``.
        """
        log_id = self.conn.execute("SELECT COALESCE(MAX(log_id), 0) + 1 FROM etl_log").fetchone()[0]
        self.conn.execute("""
        INSERT INTO etl_log (log_id, process_name, start_time, status)
        VALUES (?, ?, CURRENT_TIMESTAMP, 'RUNNING')
        """, [log_id, process_name])
        return log_id

    def log_process_end(self, log_id, records=0, status='SUCCESS', message=None):
        """Record the end of an ETL stage.

        Args:
            log_id: Id returned by ``log_process_start``.
            records: Number of records the stage processed.
            status: Final status stored in the log row.
            message: Optional free-text message (the error text on failure).
        """
        self.conn.execute("""
        UPDATE etl_log
        SET end_time = CURRENT_TIMESTAMP,
            records_processed = ?,
            status = ?,
            message = ?
        WHERE log_id = ?
        """, [records, status, message, log_id])

    def initialize_warehouse_schema(self):
        """Drop and recreate the warehouse tables (once per instance).

        Does nothing when this instance has already initialized the schema.
        Any exception is written to ``etl_log`` with status ``'ERROR'`` and
        re-raised.
        """
        if self.initialized:
            return

        log_id = self.log_process_start("initialize_warehouse_schema")
        try:
            for table in self._DROP_ORDER:
                self.conn.execute(f"DROP TABLE IF EXISTS {table}")

            self.conn.execute("""
            CREATE OR REPLACE TABLE dim_date AS
            WITH date_range AS (
                SELECT unnest(generate_series('2016-09-01'::DATE, '2020-05-01'::DATE, INTERVAL '1 day')) as date
            )
            SELECT
                (EXTRACT(YEAR FROM date) * 10000 + EXTRACT(MONTH FROM date) * 100 + EXTRACT(DAY FROM date))::INTEGER AS date_id,
                date as date_value,
                EXTRACT(DAY FROM date) AS day,
                EXTRACT(MONTH FROM date) AS month,
                strftime(date, '%B') AS month_name,
                EXTRACT(YEAR FROM date) AS year,
                strftime(date, '%A') AS day_name,
                EXTRACT(DOW FROM date) AS day_of_week,
                EXTRACT(QUARTER FROM date) AS quarter
            FROM date_range;
            """)

            # Add primary key to dim_date table
            self.conn.execute("ALTER TABLE dim_date ADD PRIMARY KEY (date_id);")

            self.conn.execute("""
            -- Tabel Dimensi
            CREATE OR REPLACE TABLE dim_customers (
                customer_id VARCHAR(50) PRIMARY KEY,
                customer_unique_id VARCHAR(50) NOT NULL,
                customer_city VARCHAR(100),
                customer_state VARCHAR(50),
                customer_zip_code_prefix VARCHAR(20)
            );

            CREATE OR REPLACE TABLE dim_sellers (
                seller_id VARCHAR(50) PRIMARY KEY,
                seller_zip_code_prefix VARCHAR(20),
                seller_city VARCHAR(100),
                seller_state VARCHAR(50)
            );

            -- scd type 2
            CREATE OR REPLACE TABLE dim_products (
                produk_key INT PRIMARY KEY,
                product_id VARCHAR(50) UNIQUE NOT NULL,
                product_category_name VARCHAR(100),
                product_category_name_english VARCHAR(100),
                product_weight_g DECIMAL(10,2),
                product_length_cm DECIMAL(10,2),
                product_height_cm DECIMAL(10,2),
                product_width_cm DECIMAL(10,2),
                effective_date DATE NOT NULL,
                expiration_date DATE,
                current_flag BOOLEAN DEFAULT TRUE
            );

            -- Tabel Fakta Pemesanan
            CREATE OR REPLACE TABLE fact_orders (
                order_id VARCHAR(50) PRIMARY KEY,
                customer_id VARCHAR(50) NOT NULL,
                date_id INT NOT NULL,
                order_status VARCHAR(50),
                order_purchase_timestamp DATETIME NOT NULL,
                order_approved_at DATETIME,
                order_delivered_customer_date DATETIME,
                order_estimated_delivery_date DATETIME,
                sales_amount DECIMAL(10,2),
                FOREIGN KEY (customer_id) REFERENCES dim_customers(customer_id),
                FOREIGN KEY (date_id) REFERENCES dim_date(date_id)
            );

            -- Tabel Fakta Item Pemesanan
            CREATE OR REPLACE TABLE fact_order_items (
                order_id VARCHAR(50) NOT NULL,
                order_item_id VARCHAR(50) NOT NULL,
                product_id VARCHAR(50) NOT NULL,
                seller_id VARCHAR(50) NOT NULL,
                date_id INT NOT NULL,
                shipping_limit_date DATETIME,
                price DECIMAL(10,2),
                freight_value DECIMAL(10,2),
                PRIMARY KEY (order_id, order_item_id),
                FOREIGN KEY (order_id) REFERENCES fact_orders(order_id),
                FOREIGN KEY (seller_id) REFERENCES dim_sellers(seller_id),
                FOREIGN KEY (product_id) REFERENCES dim_products(product_id),
                FOREIGN KEY (date_id) REFERENCES dim_date(date_id)
            );

            -- DuckDB belum mendukung ALTER TABLE ADD FOREIGN KEY
            -- Sehingga dimensi payments harus setelah fact_orders karena foreign key
            CREATE OR REPLACE TABLE dim_payments (
                payment_id INT PRIMARY KEY,
                order_id VARCHAR(50) NOT NULL,
                payment_sequential INT,
                payment_type VARCHAR(50),
                payment_installments INT,
                payment_value DECIMAL(10,2),
                FOREIGN KEY (order_id) REFERENCES fact_orders(order_id)
            );
            """)

            self.initialized = True
            self.log_process_end(log_id, status='SUCCESS', message='Data warehouse schema initialized')

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise


    def extract(self, data=None):
        """Copy the source DataFrames into DuckDB ``stg_*`` staging tables.

        Args:
            data: Mapping with the keys ``'customers'``, ``'sellers'``,
                ``'products'``, ``'category'``, ``'payments'``, ``'orders'`` and
                ``'items'``, each holding a DataFrame. When omitted, the
                module-level ``data_from_pkl`` dictionary is used.

        Returns:
            True on success.

        Raises:
            Exception: Any failure is logged with status ``'ERROR'`` and re-raised.
        """
        log_id = self.log_process_start("extract")

        try:
            source = data_from_pkl if data is None else data

            # Looked up inside the try block so that a missing key is logged as
            # an ERROR instead of leaving the log row in status RUNNING.
            customers_df = source['customers']
            sellers_df = source['sellers']
            products_df = source['products']
            category_df = source['category']
            payments_df = source['payments']
            orders_df = source['orders']
            items_df = source['items']

            # The SQL refers to the local DataFrame variables above by name.
            self.conn.execute("""
                CREATE OR REPLACE TABLE stg_customers AS SELECT * FROM customers_df;
                CREATE OR REPLACE TABLE stg_sellers AS SELECT * FROM sellers_df;
                CREATE OR REPLACE TABLE stg_products AS SELECT * FROM products_df;
                CREATE OR REPLACE TABLE stg_payments AS SELECT * FROM payments_df;
                CREATE OR REPLACE TABLE stg_orders AS SELECT * FROM orders_df;
                CREATE OR REPLACE TABLE stg_items AS SELECT * FROM items_df;
                CREATE OR REPLACE TABLE stg_category AS SELECT * FROM category_df;
            """)

            total_records = self._count_rows(self._EXTRACT_TABLES)
            self.log_process_end(log_id, records=total_records, status='SUCCESS')
            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def transform(self):
        """Build the ``stg_dim_*`` and ``stg_fact_*`` tables from the raw staging tables.

        ``load`` inserts these tables positionally (``INSERT ... SELECT *``), so
        the column order produced here must match the warehouse DDL in
        ``initialize_warehouse_schema``.

        Returns:
            True on success.

        Raises:
            Exception: Any failure is logged with status ``'ERROR'`` and re-raised.
        """
        log_id = self.log_process_start("transform")

        try:
            # Customers dimension
            stg_customers = self.conn.execute("SELECT * FROM stg_customers").fetchdf()
            stg_customers = stg_customers.drop_duplicates(subset=['customer_id'])
            stg_customers_sorted = stg_customers[['customer_id', 'customer_unique_id', 'customer_city', 'customer_state', 'customer_zip_code_prefix']]
            self.conn.execute("CREATE OR REPLACE TABLE stg_dim_customers AS SELECT * FROM stg_customers_sorted")

            # Sellers dimension. The order below mirrors the dim_sellers DDL
            # (id, zip prefix, city, state); any other order would put city,
            # state and zip prefix into the wrong warehouse columns on load.
            stg_sellers = self.conn.execute("SELECT * FROM stg_sellers").fetchdf()
            stg_sellers = stg_sellers.drop_duplicates(subset=['seller_id'])
            stg_sellers_sorted = stg_sellers[['seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state']]
            self.conn.execute("CREATE OR REPLACE TABLE stg_dim_sellers AS SELECT * FROM stg_sellers_sorted")

            # Products dimension
            stg_products = self.conn.execute("SELECT * FROM stg_products").fetchdf()
            stg_products = (
                stg_products
                .drop_duplicates(subset=['product_id'])
                .drop(columns=['product_name_lenght', 'product_description_lenght', 'product_photos_qty'])
            )

            # Missing physical measurements are replaced by the column mean.
            for column in ['product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']:
                stg_products[column] = stg_products[column].fillna(stg_products[column].mean())

            stg_products['product_key'] = range(1, len(stg_products) + 1)
            stg_products['effective_date'] = datetime.now().date()
            stg_products['expiration_date'] = None
            stg_products['current_flag'] = True

            # Attach the English category name; categories without a
            # translation row are left as missing values.
            stg_category = self.conn.execute("SELECT * FROM stg_category").fetchdf()
            category_lookup = dict(zip(stg_category['product_category_name'],
                                       stg_category['product_category_name_english']))
            stg_products['product_category_name_english'] = stg_products['product_category_name'].map(category_lookup)

            stg_products_sorted = stg_products[['product_key', 'product_id',
                                                'product_category_name', 'product_category_name_english',
                                                'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm',
                                                'effective_date', 'expiration_date', 'current_flag']]
            self.conn.execute("CREATE OR REPLACE TABLE stg_dim_products AS SELECT * FROM stg_products_sorted")

            # Payments dimension
            stg_payments = self.conn.execute("SELECT * FROM stg_payments").fetchdf()
            stg_payments['payment_id'] = range(1, len(stg_payments) + 1)
            stg_payments_sorted = stg_payments[['payment_id', 'order_id', 'payment_sequential', 'payment_type', 'payment_installments', 'payment_value']]
            self.conn.execute("CREATE OR REPLACE TABLE stg_dim_payments AS SELECT * FROM stg_payments_sorted")

            # Order items are fetched once and used for both fact tables.
            stg_order_items = self.conn.execute("SELECT * FROM stg_items").fetchdf()
            sales_amount_df = (
                stg_order_items
                .groupby('order_id')['price'].sum()
                .reset_index()
                .rename(columns={'price': 'sales_amount'})
            )

            # Orders fact: date_id is the purchase date as a YYYYMMDD integer,
            # sales_amount is the sum of item prices per order.
            stg_orders = self.conn.execute("SELECT * FROM stg_orders").fetchdf()
            stg_orders = stg_orders.drop(columns=['order_delivered_carrier_date'])
            stg_orders['date_id'] = pd.to_datetime(stg_orders['order_purchase_timestamp']).dt.strftime('%Y%m%d').astype(int)
            stg_orders_sorted = pd.merge(
                self._move_column_after(stg_orders, 'date_id', 'customer_id'),
                sales_amount_df, on='order_id', how='left')
            self.conn.execute("CREATE OR REPLACE TABLE stg_fact_orders AS SELECT * FROM stg_orders_sorted")

            # Order items fact: date_id is the shipping limit date as YYYYMMDD.
            stg_order_items['date_id'] = pd.to_datetime(stg_order_items['shipping_limit_date']).dt.strftime('%Y%m%d').astype(int)
            stg_order_items_sorted = self._move_column_after(stg_order_items, 'date_id', 'seller_id')
            self.conn.execute("CREATE OR REPLACE TABLE stg_fact_order_items AS SELECT * FROM stg_order_items_sorted")

            total_records = self._count_rows(self._TRANSFORM_TABLES)
            self.log_process_end(log_id, records=total_records, status='SUCCESS')

            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def load(self):
        """Insert the transformed staging tables into the warehouse tables.

        Referenced tables are loaded before the tables that reference them
        (dimensions, then ``fact_orders``, then ``fact_order_items`` and
        ``dim_payments``).

        Returns:
            True on success.

        Raises:
            Exception: Any failure is logged with status ``'ERROR'`` and re-raised.
        """
        log_id = self.log_process_start("load")
        try:
            # Positional inserts: each staging table must have the same column
            # order as the target table's DDL.
            self.conn.execute("INSERT INTO dim_customers SELECT * FROM stg_dim_customers")
            self.conn.execute("INSERT INTO dim_sellers SELECT * FROM stg_dim_sellers")
            self.conn.execute("INSERT INTO dim_products SELECT * FROM stg_dim_products")
            self.conn.execute("INSERT INTO fact_orders SELECT * FROM stg_fact_orders")
            self.conn.execute("INSERT INTO fact_order_items SELECT * FROM stg_fact_order_items")
            self.conn.execute("INSERT INTO dim_payments SELECT * FROM stg_dim_payments")

            total_records = self._count_rows(self._WAREHOUSE_TABLES)
            self.log_process_end(log_id, records=total_records, status='SUCCESS')

            return True

        except Exception as e:
            self.log_process_end(log_id, status='ERROR', message=str(e))
            raise

    def run_pipeline(self, data=None):
        """Run schema initialization, extract, transform and load in order.

        Args:
            data: Optional source mapping forwarded to ``extract``.

        NOTE(review): on a second call with the same instance the schema step is
        skipped (``self.initialized`` is True), so ``load`` inserts into tables
        that already hold rows - presumably a primary-key violation; confirm
        before re-running without creating a new instance.
        """
        self.initialize_warehouse_schema()
        self.extract(data)
        self.transform()
        self.load()

    def get_etl_log(self, limit=10):
        """Return the most recent ``etl_log`` rows as a DataFrame, newest first.

        Args:
            limit: Maximum number of rows to return; must be convertible to int.

        Raises:
            ValueError: If ``limit`` cannot be converted to an integer.
        """
        # Coerce to int so that only a number is ever interpolated into the SQL.
        limit = int(limit)
        return self.conn.execute(f"""
            SELECT
                log_id,
                process_name,
                start_time,
                end_time,
                EXTRACT(EPOCH FROM (end_time - start_time)) as duration_seconds,
                records_processed,
                status,
                message
            FROM etl_log
            ORDER BY log_id DESC
            LIMIT {limit}
        """).fetchdf()
    
    def close_connection(self):
        """Close the DuckDB connection; failures are reported, not raised."""
        try:
            if hasattr(self, 'conn'):
                self.conn.close()
                print("Koneksi DuckDB ditutup.")
        except Exception as exc:
            # Best-effort close: report the problem instead of hiding it.
            print(f"Gagal menutup koneksi DuckDB: {exc}")

# Build the pipeline instance used by the cells below; the constructor opens
# brazilian_ecommerce.db (its default path) and makes sure etl_log exists.
etl = ETLpipeline()

## Run ETL Pipeline

In [4]:
# Run schema setup, extract, transform and load in sequence.
etl.run_pipeline()
# Display the latest log rows (get_etl_log defaults to the 10 newest entries).
etl.get_etl_log()

Unnamed: 0,log_id,process_name,start_time,end_time,duration_seconds,records_processed,status,message
0,8,load,2025-04-03 21:08:34.528,2025-04-03 21:08:46.449,11.921,451464,SUCCESS,
1,7,transform,2025-04-03 21:08:27.595,2025-04-03 21:08:34.370,6.775,451464,SUCCESS,
2,6,extract,2025-04-03 21:08:23.424,2025-04-03 21:08:27.588,4.164,451535,SUCCESS,
3,5,initialize_warehouse_schema,2025-04-03 21:08:23.268,2025-04-03 21:08:23.409,0.141,0,SUCCESS,Data warehouse schema initialized
4,4,load,2025-04-03 20:59:12.496,2025-04-03 20:59:25.911,13.415,451464,SUCCESS,
5,3,transform,2025-04-03 20:59:04.654,2025-04-03 20:59:12.350,7.696,451464,SUCCESS,
6,2,extract,2025-04-03 20:59:01.791,2025-04-03 20:59:04.640,2.849,451535,SUCCESS,
7,1,initialize_warehouse_schema,2025-04-03 20:59:01.687,2025-04-03 20:59:01.784,0.097,0,SUCCESS,Data warehouse schema initialized


In [5]:
# Close the pipeline's DuckDB connection before the file is reopened below.
etl.close_connection()

Koneksi DuckDB ditutup.


In [7]:
def get_data():
    """Return every row of ``fact_orders`` from the warehouse as a DataFrame.

    Opens its own connection to ``brazilian_ecommerce.db`` and always closes
    it, even when the query raises.
    """
    conn = duckdb.connect("brazilian_ecommerce.db")
    try:
        return conn.execute("SELECT * FROM fact_orders").fetchdf()
    finally:
        # Release the database file even if the query fails.
        conn.close()

get_data()

Unnamed: 0,order_id,customer_id,date_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_customer_date,order_estimated_delivery_date,sales_amount
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,20171002,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-10 21:25:13,2017-10-18,29.99
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,20180724,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-08-07 15:27:45,2018-08-13,118.70
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,20180808,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-17 18:06:29,2018-09-04,159.90
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,20171118,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-12-02 00:28:42,2017-12-15,45.00
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,20180213,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-16 18:17:02,2018-02-26,19.90
...,...,...,...,...,...,...,...,...,...
99436,9c5dedf39a927c1b2549525ed64a053c,39bd1228ee8140590ac3aca26f2dfe00,20170309,delivered,2017-03-09 09:54:05,2017-03-09 09:54:05,2017-03-17 15:08:01,2017-03-28,72.00
99437,63943bddc261676b46f01ca7ac2f7bd8,1fca14ff2861355f6e5f14306ff977a7,20180206,delivered,2018-02-06 12:58:58,2018-02-06 13:10:37,2018-02-28 17:37:56,2018-03-02,174.90
99438,83c1379a015df1e13d02aae0204711ab,1aa71eb042121263aafbe80c1b562c9c,20170827,delivered,2017-08-27 14:46:43,2017-08-27 15:04:16,2017-09-21 11:24:17,2017-09-27,205.99
99439,11c177c8e97725db2631073c19f07b62,b331b74b18dc79bcdf6532d51e1637c1,20180108,delivered,2018-01-08 21:28:27,2018-01-08 21:36:21,2018-01-25 23:32:54,2018-02-15,359.98
