### Load Libraries

In [1]:
import pandas as pd
import sqlalchemy as sa

from config import oltp_conn_string, warehouse_conn_string, oltp_tables, dimension_columns, ddl_statements

In [2]:
# SQLAlchemy engines for the OLTP source database and the data warehouse.
# Despite the `conn_` prefix, `sa.create_engine` returns Engine objects, not open connections.
conn_oltp = sa.create_engine(oltp_conn_string)
conn_dwh = sa.create_engine(warehouse_conn_string)

### Create Table

In [3]:
def create_tables():
    """Create the data warehouse tables if they do not exist yet.

    Runs every DDL statement found in ``ddl_statements`` against the database
    addressed by ``warehouse_conn_string``, committing after each statement so
    that tables created before a failing statement are kept.

    NOTE(review): the "if they do not exist" guarantee depends on the DDL text
    in ``config`` (e.g. ``CREATE TABLE IF NOT EXISTS``) -- confirm there.
    """
    engine = sa.create_engine(warehouse_conn_string)
    try:
        with engine.connect() as conn:
            for ddl in ddl_statements.values():
                conn.execute(sa.text(ddl))
                conn.commit()
    finally:
        # The engine is created per call, so release its connection pool
        # explicitly instead of waiting for garbage collection.
        engine.dispose()

### Extract Data

In [4]:
def extract_data(table_name):
    """Extract all rows of one table from the OLTP database.

    Parameters
    ----------
    table_name : str
        Name of the source table. It is interpolated directly into the SQL
        text, so it must be a trusted identifier (e.g. a value from
        ``config``), never user input.

    Returns
    -------
    pandas.DataFrame
        The result of ``SELECT * FROM <table_name>``.
    """
    engine = sa.create_engine(oltp_conn_string)
    query = f"SELECT * FROM {table_name}"
    try:
        df = pd.read_sql(query, engine)
    finally:
        # The engine is created per call, so release its connection pool
        # explicitly instead of waiting for garbage collection.
        engine.dispose()
    print(f'Extract Data {table_name} Success')
    return df

In [5]:
# Pull every OLTP source table into its own DataFrame (same order as the names below).
df_orders, df_user, df_payment, df_shipper, df_rating, df_voucher = [
    extract_data(source_table)
    for source_table in (
        "tb_orders",
        "tb_user",
        "tb_payment",
        "tb_shipper",
        "tb_rating",
        "tb_voucher",
    )
]

Extract Data tb_orders Success
Extract Data tb_user Success
Extract Data tb_payment Success
Extract Data tb_shipper Success
Extract Data tb_rating Success
Extract Data tb_voucher Success


### Transform Data

In [6]:
def transform_data(df, target_table):
    """Project an extracted frame onto the columns configured for a table.

    The column list is looked up in ``dimension_columns`` under
    ``target_table``. When there is no entry (or an empty one) the input
    frame is returned unchanged.

    NOTE(review): a missing mapping passes every column through silently and
    still prints "Success" -- confirm the keys used by callers match the keys
    in ``dimension_columns``.

    Parameters
    ----------
    df : pandas.DataFrame
        Extracted source data.
    target_table : str
        Key used to look up the column list.

    Returns
    -------
    pandas.DataFrame
        ``df`` restricted to the configured columns, or ``df`` itself.
    """
    wanted = dimension_columns.get(target_table)
    projected = df[wanted] if wanted else df
    print(f'Transform Data {target_table} Success')
    return projected

In [7]:
# Apply the configured column projection to each extracted frame.
tdf_orders, tdf_user, tdf_payment, tdf_shipper, tdf_rating, tdf_voucher = [
    transform_data(frame, table_key)
    for frame, table_key in (
        (df_orders, "tb_orders"),
        (df_user, "tb_user"),
        (df_payment, "tb_payment"),
        (df_shipper, "tb_shipper"),
        (df_rating, "tb_rating"),
        (df_voucher, "tb_voucher"),
    )
]

Transform Data tb_orders Success
Transform Data tb_user Success
Transform Data tb_payment Success
Transform Data tb_shipper Success
Transform Data tb_rating Success
Transform Data tb_voucher Success


### Transform Fact Table

In [9]:
def transform_fact_orders():
    """Build the ``fact_orders`` frame by joining orders to their related tables.

    Every table listed in ``oltp_tables`` is extracted again via
    ``extract_data``. ``tb_orders`` is then joined, in order, to user,
    payment, shipper and rating with inner joins and to voucher with a left
    join, so orders without a voucher are kept while orders lacking a match
    in any of the other four tables are dropped.

    NOTE(review): the ``user_id_x`` rename presumably exists because a joined
    table also carries a ``user_id`` column -- confirm against the schema.

    Returns
    -------
    pandas.DataFrame
        The joined data restricted to ``dimension_columns['fact_orders']``.
    """
    tables = {name: extract_data(name) for name in oltp_tables.values()}

    # (table to join, join key, join type) -- applied top to bottom.
    join_plan = (
        ('tb_user', 'user_id', 'inner'),
        ('tb_payment', 'payment_id', 'inner'),
        ('tb_shipper', 'shipper_id', 'inner'),
        ('tb_rating', 'rating_id', 'inner'),
        ('tb_voucher', 'voucher_id', 'left'),
    )

    fact = tables['tb_orders']
    for other_table, key, join_type in join_plan:
        fact = fact.merge(tables[other_table], how=join_type, on=key)

    # Restore the orders' own user_id if a merge suffixed it.
    fact = fact.rename(columns={'user_id_x': 'user_id'})

    return fact[dimension_columns.get('fact_orders')]

In [11]:
# Build the fact table; transform_fact_orders() re-extracts every table in oltp_tables itself.
fact_orders_df = transform_fact_orders()

Extract Data tb_user Success
Extract Data tb_payment Success
Extract Data tb_shipper Success
Extract Data tb_rating Success
Extract Data tb_voucher Success
Extract Data tb_orders Success


### Load Data

In [12]:
def load_data(df, table_name):
    """Append a transformed frame to a table in the data warehouse.

    Rows are inserted with multi-row INSERT statements inside one explicit
    transaction, so either the whole frame is committed or nothing is.

    NOTE(review): ``if_exists='append'`` means re-running the notebook inserts
    the same rows again -- confirm the target tables tolerate that.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to load; the index is not written.
    table_name : str
        Name of the target table in the warehouse.
    """
    engine = sa.create_engine(warehouse_conn_string)
    try:
        # engine.begin() commits on success and rolls back on error, instead
        # of relying on pandas to open and commit a transaction implicitly.
        with engine.begin() as conn:
            # Insert the new data
            df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
    finally:
        # The engine is created per call, so release its connection pool
        # explicitly instead of waiting for garbage collection.
        engine.dispose()
    # Report success only after the transaction has been committed.
    print(f'Load Data {table_name} Success')

In [13]:
# Load each dimension from its transformed frame (tdf_*), then the fact table.
load_data(tdf_user, "dim_user")
# Use the transformed payment frame, consistent with the other dimensions
# (the raw extract df_payment was being loaded here).
load_data(tdf_payment, "dim_payment")
load_data(tdf_shipper, "dim_shipper")
load_data(tdf_rating, "dim_rating")
load_data(tdf_voucher, "dim_voucher")
load_data(fact_orders_df, "fact_orders")

Load Data dim_user Success
Load Data dim_payment Success
Load Data dim_shipper Success
Load Data dim_rating Success
Load Data dim_voucher Success
Load Data fact_orders Success
