### **Load Library**

In [1]:
import pandas as pd
import sqlalchemy as sa

from config import oltp_conn_string, warehouse_conn_string, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts

In [2]:
import sqlalchemy
sqlalchemy.__version__ 

'1.4.36'

### **Function ETL**

In [3]:
def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        for ddl in ddl_statements.values():
            conn.execute(ddl)
            
def extract_data(table_name):
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string)
    query = f"SELECT * FROM {oltp_tables[table_name]}"
    df = pd.read_sql(query, engine)
    print(f'Extract Data {oltp_tables[table_name]} Success')
    return df

def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    columns = dimension_columns.get(target_table)
    if columns:
        df = df[columns]
    print(f'Transform Data {target_table} Success')
    return df

def transform_fact_orders():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

    df_orders = dataframes['orders']
    df_orders = df_orders.merge(dataframes['users'], on='user_id')
    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)
    df_orders = df_orders.merge(dataframes['products'], on='product_id')
    df_orders = df_orders.merge(dataframes['product_category'], on='product_category_id')
    df_orders = df_orders.merge(dataframes['order_items'], on='order_item_id')
    
    fact_orders_columns = dimension_columns.get('fact_orders')
    return df_orders[fact_orders_columns]

def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        # Cek kunci unique
        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)
        
        # Deduplikasi data
        df = deduplicate_data(df, existing_data, unique_key)
        
        # Masukkan data baru
        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
        print(f'Load Data {table_name} Success')
        
def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    existing_keys = existing_data[unique_key].tolist()
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    return unique_rows

def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    if table_name == 'dim_user':
        return 'user_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    elif table_name == 'dim_product_category':
        return 'product_category_id'
    elif table_name == 'dim_products':
        return 'product_id'
    elif table_name == 'fact_order_items':
        return 'order_item_id'
    # Tambahkan kondisi lain jika ada tabel lain
    else:
        raise ValueError("Table name not recognized.")

### **Function Data Mart**

In [4]:
def create_and_insert_dm_sales():
    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts['dim_sales'])

        # Insert data into dm_sales table
        conn.execute(ddl_marts['insert_dm_sales'])
    print(f'Data Mart Has Create Success')

### **Function Run**

In [5]:
def etl_process():
    """Run the entire ETL process."""
    # Create tables
    create_tables()

    # Process dimension tables
    for dim_table, target_table in warehouse_tables.items():
        if dim_table != 'fact_orders':
            source_table = dim_table
            df = extract_data(source_table)
            transformed_df = transform_data(df, dim_table)
            load_data(transformed_df, target_table)
        else:
            # Process fact table
            df_fact_orders = transform_fact_orders()
            load_data(df_fact_orders, target_table)

    # proses mart table
    create_and_insert_dm_sales()

# **Run ETL**

In [6]:
#script running all ETL
etl_process()

Extract Data tb_users Success
Transform Data users Success
Load Data dim_user Success
Extract Data tb_payments Success
Transform Data payments Success
Load Data dim_payment Success
Extract Data tb_shippers Success
Transform Data shippers Success
Load Data dim_shipper Success
Extract Data tb_ratings Success
Transform Data ratings Success
Load Data dim_rating Success
Extract Data tb_vouchers Success
Transform Data vouchers Success
Load Data dim_voucher Success
Extract Data tb_orders Success
Transform Data orders Success
Load Data fact_orders Success
Extract Data tb_product_category Success
Transform Data product_category Success
Load Data dim_product_category Success
Extract Data tb_products Success
Transform Data products Success
Load Data dim_products Success
Extract Data tb_order_items Success
Transform Data order_items Success
Load Data fact_order_items Success
Data Mart Has Create Success


### **Run Testing**

In [7]:
create_tables()

source_table = 'users'
df = extract_data(source_table)
df

Extract Data tb_users Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [8]:
transformed_df = transform_data(df, 'dim_user')
transformed_df

Transform Data dim_user Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [9]:
load_data(transformed_df, 'dim_user')

Load Data dim_user Success


### **Script Upload Google Sheets**

In [10]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

with open('digitalskola_key.json','rb') as file:
    key = json.load(file)
    
scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

###tambahkan email googledigitalskola@digitalskola-368401.iam.gserviceaccount.com ke dalam google sheet anda#

In [11]:
def fetch_data_from_dwh(query):
     # Membuat koneksi ke database
    engine = sa.create_engine(warehouse_conn_string)
    
    # Membuat hasil query menjadi Datafrmae
    df = pd.read_sql(query, engine)
    
    return df

df_mart = fetch_data_from_dwh("""SELECT * FROM dm_sales;""")
df_mart

Unnamed: 0,order_id,order_date,user_id,user_name,payment_type,shipper_name,voucher_name,product_category,region,order_price,order_discount,order_total,rating_level,rating_status
0,1110004,2022-03-06,100102,Eva Susanti,Wallet,JNE Express,,Electronic,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1800000,0,1800000,5,High Impact
1,1110006,2022-05-09,100103,Dana Pradana,Debit,Sicepat Express,,Fashion,"Jl. Pahlawan, Surabaya, Jawa Timur",500000,30000,470000,4,Medium High Impact
2,1110012,2022-07-30,100110,Anita Friska,Debit,JNE Express,Body Soap Promo,Fashion,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",250000,15000,235000,4,Medium High Impact
3,1110006,2022-05-09,100103,Dana Pradana,Debit,Sicepat Express,,Electronic,"Jl. Pahlawan, Surabaya, Jawa Timur",4000000,1000000,3000000,4,Medium High Impact
4,1110005,2022-04-28,100105,Dodo Andriano,Debit,Sicepat Express,New User,Electronic,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",4000000,1000000,3000000,1,Very Low Impact
5,1110009,2022-06-23,100103,Dana Pradana,Credit,Lazada Express,,Electronic,"Jl. Pahlawan, Surabaya, Jawa Timur",2000000,0,2000000,5,High Impact
6,1110001,2022-01-20,100101,Budi Gunawan,Debit,JNE Express,New User,Fashion,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",250000,15000,235000,3,Medium Impact
7,1110011,2022-07-21,100110,Anita Friska,Wallet,Sicepat Express,,Fashion,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",550000,15000,535000,5,High Impact
8,1110002,2022-01-29,100102,Eva Susanti,Debit,JNE Express,New User,Fashion,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",500000,30000,470000,3,Medium Impact
9,1110007,2022-05-21,100106,Caca Kumala,Debit,JNE Express,,Fashion,"Jl. Bunga Raya, Kota Tanggerang, Banten",250000,15000,235000,5,High Impact


In [12]:
# ganti dengan nama google sheets anda
sheet = client.open('Datamart')

# ganti sesuai dengan nama sheet didalam google sheets anda
# siapkan nama kolom pada sheet di google sheet anda

export = sheet.worksheet('Sheet1')
export.update([df_mart.columns.values.tolist()] + df_mart.astype(str).values.tolist())

{'spreadsheetId': '1CVmN6bnPKPccJj090rxMPQBmkCMZrNlMKjen-tBzSTU',
 'updatedRange': 'Sheet1!A1:N18',
 'updatedRows': 18,
 'updatedColumns': 14,
 'updatedCells': 252}