### **Load Library**

In [5]:
import pandas as pd
import sqlalchemy as sa

# from config import oltp_conn_string, warehouse_conn_string, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts
from config_local import oltp_conn_string_local, warehouse_conn_string_local, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


### **Function ETL**

In [8]:
def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    engine = sa.create_engine(warehouse_conn_string_local)
    with engine.connect() as conn:
        for ddl in ddl_statements.values():
            conn.execute(ddl)
            
def extract_data(table_name):
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string_local)
    query = f"SELECT * FROM {oltp_tables[table_name]}"
    df = pd.read_sql(query, engine)
    print(f'Extract Data {oltp_tables[table_name]} Success')
    return df

def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    columns = dimension_columns.get(target_table)
    if columns:
        df = df[columns]
    print(f'Transform Data {target_table} Success')
    return df

def transform_fact_orders():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}
    for table_name, df in dataframes.items():
        print(f"Columns in {table_name}: {df.columns.tolist()}")
    # Join order_items with orders to bring in user_id and other columns
    df_order_items = dataframes['order_items']
    df_orders = dataframes['orders']
    
    # Exclude rows with missing product_id
    df_order_items = df_order_items[df_order_items['product_id'].notnull()]
    df_fact_orders = df_order_items.merge(df_orders, on='order_id')

    # Merge with other dimension tables
    df_fact_orders = df_fact_orders.merge(dataframes['users'], on='user_id')
    df_fact_orders = df_fact_orders.merge(dataframes['products'], on='product_id')
    df_fact_orders = df_fact_orders.merge(dataframes['product_category'], on='product_category_id')
    df_fact_orders = df_fact_orders.merge(dataframes['payments'], on='payment_id')
    df_fact_orders = df_fact_orders.merge(dataframes['shippers'], on='shipper_id')
    df_fact_orders = df_fact_orders.merge(dataframes['ratings'], on='rating_id')
    df_fact_orders = df_fact_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')

    df_fact_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)
    
    fact_orders_columns = dimension_columns.get('fact_orders')
    return df_fact_orders[fact_orders_columns]


# def load_data(df, table_name):
#     """Load the transformed data into the target table in the data warehouse."""
#     engine = sa.create_engine(warehouse_conn_string_local)
    
#     with engine.connect() as conn:
#         # Cek kunci unique
#         unique_key = get_unique_key(table_name) 
#         # Misalnya user_id untuk tabel dim_user
#         existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)
#         #print(existing_data)
#         # Deduplikasi data
#         df = deduplicate_data(df, existing_data, unique_key)
#         print(df.shape)
#         # Masukkan data baru
#         df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
#         print(df.shape)
#         print(f'Load Data {table_name} Success')

def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    engine = sa.create_engine(warehouse_conn_string_local)
    # Cek kunci unique
    unique_key = get_unique_key(table_name) 
    # Misalnya user_id untuk tabel dim_user
    existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", engine)
    #print(existing_data)
    # Deduplikasi data
    df = deduplicate_data(df, existing_data, unique_key)
    print(df.shape)
    # Masukkan data baru
    df.to_sql(table_name, engine, index=False, if_exists='append', method='multi')
    print(df.shape)
    print(f'Load Data {table_name} Success')
        
def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    #print(new_data)
    existing_keys = existing_data[unique_key].tolist()
    #print(existing_keys)
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    #print(unique_rows)
    return unique_rows

def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    if table_name == 'dim_user':
        return 'user_id'
    if table_name == 'dim_product':
        return 'product_id'
    if table_name == "dim_product_category":
        return 'product_category_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    # Tambahkan kondisi lain jika ada tabel lain
    else:
        raise ValueError("Table name not recognized.")

### **Function Data Mart**

In [2]:
def create_and_insert_dm_sales():
    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string_local)
    with engine.connect() as conn:
        # Create dm_sales table
        conn.execute(ddl_marts['dim_sales'])

        # Insert data into dm_sales table
        conn.execute(ddl_marts['insert_dm_sales'])
    print(f'Data Mart Has Create Success')

### **Function Run**

In [3]:
def etl_process():
    """Run the entire ETL process."""
    # Create tables
    #create_tables()

    # Process dimension tables
    for dim_table, target_table in warehouse_tables.items():
        #if dim_table != 'fact_orders':
        if dim_table != 'orders':
            print(dim_table)
            source_table = dim_table
            df = extract_data(source_table)
            transformed_df = transform_data(df, dim_table)
            load_data(transformed_df, target_table)
        else:
            # Process fact table
            df_fact_orders = transform_fact_orders()
            load_data(df_fact_orders, target_table)

    # proses mart table
    create_and_insert_dm_sales()

# **Run ETL**

In [9]:
#script running all ETL
etl_process()

users
Extract Data tb_users Success
Transform Data users Success
(0, 7)
(0, 7)
Load Data dim_user Success
product_category
Extract Data tb_product_category Success
Transform Data product_category Success
(0, 2)
(0, 2)
Load Data dim_product_category Success
products
Extract Data tb_products Success
Transform Data products Success
(0, 6)
(0, 6)
Load Data dim_product Success
payments
Extract Data tb_payments Success
Transform Data payments Success
(0, 3)
(0, 3)
Load Data dim_payment Success
shippers
Extract Data tb_shippers Success
Transform Data shippers Success
(0, 2)
(0, 2)
Load Data dim_shipper Success
ratings
Extract Data tb_ratings Success
Transform Data ratings Success
(0, 3)
(0, 3)
Load Data dim_rating Success
vouchers
Extract Data tb_vouchers Success
Transform Data vouchers Success
(0, 5)
(0, 5)
Load Data dim_voucher Success
Extract Data tb_users Success
Extract Data tb_product_category Success
Extract Data tb_products Success
Extract Data tb_payments Success
Extract Data tb_ship

IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "product_id" of relation "fact_orders" violates not-null constraint
DETAIL:  Failing row contains (1110001, 2022-01-20, 100101, null, 1202, 60002001, 250000, 15000, 41000101, 230000, 800010003).

[SQL: INSERT INTO fact_orders (order_id, order_date, user_id, payment_id, shipper_id, order_price, order_discount, voucher_id, order_total, rating_id) VALUES (%(order_id__0)s, %(order_date__0)s, %(user_id__0)s, %(payment_id__0)s, %(shipper_id__0)s, %(order ... 4144 characters truncated ... der_price__21)s, %(order_discount__21)s, %(voucher_id__21)s, %(order_total__21)s, %(rating_id__21)s)]
[parameters: {'voucher_id__0': 41000101.0, 'order_total__0': 230000, 'shipper_id__0': 60002001, 'order_date__0': datetime.date(2022, 1, 20), 'order_discount__0': 15000, 'order_id__0': 1110001, 'payment_id__0': 1202, 'order_price__0': 250000, 'user_id__0': 100101, 'rating_id__0': 800010003, 'voucher_id__1': 41000102.0, 'order_total__1': 575000, 'shipper_id__1': 60002001, 'order_date__1': datetime.date(2022, 1, 29), 'order_discount__1': 40000, 'order_id__1': 1110002, 'payment_id__1': 1202, 'order_price__1': 620000, 'user_id__1': 100102, 'rating_id__1': 800010003, 'voucher_id__2': 41000102.0, 'order_total__2': 575000, 'shipper_id__2': 60002001, 'order_date__2': datetime.date(2022, 1, 29), 'order_discount__2': 40000, 'order_id__2': 1110002, 'payment_id__2': 1202, 'order_price__2': 620000, 'user_id__2': 100102, 'rating_id__2': 800010003, 'voucher_id__3': 41000103.0, 'order_total__3': 4995000, 'shipper_id__3': 60002001, 'order_date__3': datetime.date(2022, 2, 13), 'order_discount__3': 1000000, 'order_id__3': 1110003, 'payment_id__3': 1204, 'order_price__3': 6000000, 'user_id__3': 100103, 'rating_id__3': 800010001, 'voucher_id__4': 41000103.0, 'order_total__4': 4995000, 'shipper_id__4': 60002001, 'order_date__4': datetime.date(2022, 2, 13), 'order_discount__4': 1000000, 'order_id__4': 1110003, 'payment_id__4': 1204, 'order_price__4': 6000000, 'user_id__4': 100103, 'rating_id__4': 800010001 ... 120 parameters truncated ... 'voucher_id__17': None, 'order_total__17': 1005000, 'shipper_id__17': 60002003, 'order_date__17': datetime.date(2022, 7, 1), 'order_discount__17': 45000, 'order_id__17': 1110010, 'payment_id__17': 1204, 'order_price__17': 1050000, 'user_id__17': 100102, 'rating_id__17': 800010002, 'voucher_id__18': None, 'order_total__18': 535000, 'shipper_id__18': 60002002, 'order_date__18': datetime.date(2022, 7, 21), 'order_discount__18': 15000, 'order_id__18': 1110011, 'payment_id__18': 1203, 'order_price__18': 550000, 'user_id__18': 100110, 'rating_id__18': 800010005, 'voucher_id__19': None, 'order_total__19': 535000, 'shipper_id__19': 60002002, 'order_date__19': datetime.date(2022, 7, 21), 'order_discount__19': 15000, 'order_id__19': 1110011, 'payment_id__19': 1203, 'order_price__19': 550000, 'user_id__19': 100110, 'rating_id__19': 800010005, 'voucher_id__20': 41000115.0, 'order_total__20': 445000, 'shipper_id__20': 60002001, 'order_date__20': datetime.date(2022, 7, 30), 'order_discount__20': 35000, 'order_id__20': 1110012, 'payment_id__20': 1202, 'order_price__20': 490000, 'user_id__20': 100110, 'rating_id__20': 800010004, 'voucher_id__21': 41000115.0, 'order_total__21': 445000, 'shipper_id__21': 60002001, 'order_date__21': datetime.date(2022, 7, 30), 'order_discount__21': 35000, 'order_id__21': 1110012, 'payment_id__21': 1202, 'order_price__21': 490000, 'user_id__21': 100110, 'rating_id__21': 800010004}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

### **Run Testing**

In [23]:
#create_tables()

source_table = 'product_category'
df = extract_data(source_table)
df

Extract Data tb_product_category Success


Unnamed: 0,product_category_id,product_category_name
0,320001001,Fashion
1,320001002,Electronic
2,320001003,Health & Beauty


In [52]:
transformed_df = transform_data(df, 'dim_user')
transformed_df

Transform Data dim_user Success


Unnamed: 0,user_id,user_first_name,user_last_name,user_gender,user_address,user_birthday,user_join
0,100101,Budi,Gunawan,Male,"Jl. Pondok Indah No.1, Kecamatan Pondok Labu, ...",1998-09-12,2022-01-13
1,100102,Eva,Susanti,Female,"Jl. Timur Raya No. 13, Kramat Jaya, Jakarta Ti...",1997-02-16,2022-01-29
2,100103,Dana,Pradana,Male,"Jl. Pahlawan, Surabaya, Jawa Timur",1999-07-19,2022-02-11
3,100104,Rahmat,Hidayat,Male,"Jl. Amil Abas, Jakarta Timur, DKI Jakarta",2000-02-14,2022-03-22
4,100105,Dodo,Andriano,Male,"Jl. Pakuan Selatan No. 177, Magelang, Jawa Tengah",2000-09-06,2022-04-03
5,100106,Caca,Kumala,Female,"Jl. Bunga Raya, Kota Tanggerang, Banten",1998-11-05,2022-05-20
6,100107,Andi,Kurniawan,Male,"Jl. Mawar Indah No. 25, Jakarta Barat, DKI Jak...",2001-03-14,2022-05-24
7,100108,Fanny,Utami,Female,"Jl. Kilometer Panjang No. 210, Jakarta Utara, ...",2002-01-27,2022-06-02
8,100109,Gagah,Prakasa,Male,"Jl. Timur Asri No. 10, Denpasar, Bali",2001-08-05,2022-07-14
9,100110,Anita,Friska,Female,"Jl. Tembung Raya, Kota Medan Timur, Sumatera U...",2000-11-04,2022-07-21


In [59]:
load_data(transformed_df, 'dim_user')

(10, 7)
(10, 7)
Load Data dim_user Success


In [56]:

# with engine.connect() as conn:
#     # Cek kunci unique
#     unique_key = get_unique_key("dim_user") 
#     #print(unique_key)
#     # Misalnya user_id untuk tabel dim_user
#     existing_data = pd.read_sql(f"SELECT {unique_key} FROM dim_user", conn)
#     # Deduplikasi data
#     df = deduplicate_data(df, existing_data, unique_key)
print(df.shape)
engine = sa.create_engine(warehouse_conn_string_local)
df.to_sql("dim_user",engine , if_exists='append', index=False)

(10, 7)


10

### **Script Upload Google Sheets**

In [23]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

with open('digitalskola_key.json','rb') as file:
    key = json.load(file)
    
scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

###tambahkan email googledigitalskola@digitalskola-368401.iam.gserviceaccount.com ke dalam google sheet anda#

In [34]:
def fetch_data_from_dwh(query):
     # Membuat koneksi ke database
    engine = sa.create_engine(warehouse_conn_string)
    
    # Membuat hasil query menjadi Datafrmae
    df = pd.read_sql(query, engine)
    
    return df

df_mart = fetch_data_from_dwh("""SELECT * FROM dm_sales;""")
df_mart

Unnamed: 0,order_id,order_date,user_id,user_name,payment_type,shipper_name,order_price,order_discount,voucher_name,order_total
0,1110001,2022-01-20,100101,Budi Gunawan,Debit,JNE Express,250000,15000,New User,230000
1,1110002,2022-01-29,100102,Eva Susanti,Debit,JNE Express,620000,40000,New User,575000
2,1110003,2022-02-13,100103,Dana Pradana,Credit,JNE Express,6000000,1000000,New User,4995000
3,1110005,2022-04-28,100105,Dodo Andriano,Debit,Sicepat Express,4000000,1000000,New User,2995000
4,1110008,2022-06-02,100108,Fanny Utami,Credit,Sicepat Express,2000000,0,New User,1995000
5,1110012,2022-07-30,100110,Anita Friska,Debit,JNE Express,490000,35000,Body Soap Promo,445000


In [41]:
# ganti dengan nama google sheets anda
sheet = client.open('Contoh Source Data')

# ganti sesuai dengan nama sheet didalam google sheets anda
# siapkan nama kolom pada sheet di google sheet anda

export = sheet.worksheet('Sheet3')
export.update([df_mart.columns.values.tolist()] + df_mart.astype(str).values.tolist())

{'spreadsheetId': '163IyMV2W_SR_vYg9IOPYcmQwOtVxSfFQzhPPb9RjBA0',
 'updatedRange': 'Sheet3!A1:J7',
 'updatedRows': 7,
 'updatedColumns': 10,
 'updatedCells': 70}