### **Load Library**

In [1]:
import pandas as pd
import sqlalchemy as sa

from config import oltp_conn_string, warehouse_conn_string, oltp_tables, warehouse_tables, dimension_columns, ddl_statements, ddl_marts

### **Function ETL**

In [2]:
def create_tables():
    """Create tables in the data warehouse if they do not exist."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        for ddl in ddl_statements.values():
            conn.execute(ddl)
            
def extract_data(table_name):
    """Extract data from a table in the OLTP database."""
    engine = sa.create_engine(oltp_conn_string)
    query = f"SELECT * FROM {oltp_tables[table_name]}"
    df = pd.read_sql(sa.text(query), engine.connect())
    print(f'Extract Data {oltp_tables[table_name]} Success')
    return df

def transform_data(df, target_table):
    """Transform the extracted data to match the schema of the target dimension table."""
    columns = dimension_columns.get(target_table)
    if columns:
        df = df[columns]
    print(f'Transform Data {target_table} Success')
    return df

def transform_fact_orders():
    """Transform data for the fact_orders table."""
    dataframes = {table: extract_data(table) for table in oltp_tables.keys()}

    df_orders = dataframes['orders']
    df_orders = df_orders.merge(dataframes['users'], on='user_id')
    df_orders = df_orders.merge(dataframes['payments'], on='payment_id')
    df_orders = df_orders.merge(dataframes['shippers'], on='shipper_id')
    df_orders = df_orders.merge(dataframes['ratings'], on='rating_id')
    df_orders = df_orders.merge(dataframes['vouchers'], how='left', on='voucher_id')
    df_orders.rename(columns={'user_id_x': 'user_id'}, inplace=True)
    
    fact_orders_columns = dimension_columns.get('fact_orders')
    return df_orders[fact_orders_columns]


def load_data(df, table_name):
    """Load the transformed data into the target table in the data warehouse."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        # Cek kunci unique
        unique_key = get_unique_key(table_name)  # Misalnya user_id untuk tabel dim_user
        existing_data = pd.read_sql(f"SELECT {unique_key} FROM {table_name}", conn)
        
        # Deduplikasi data
        df = deduplicate_data(df, existing_data, unique_key)
        
        # Masukkan data baru
        df.to_sql(table_name, conn, index=False, if_exists='append', method='multi')
        print(f'Load Data {table_name} Success')
        
def deduplicate_data(new_data, existing_data, unique_key):
    """Remove duplicates from new data based on existing data."""
    existing_keys = existing_data[unique_key].tolist()
    unique_rows = new_data[~new_data[unique_key].isin(existing_keys)]
    return unique_rows

def get_unique_key(table_name):
    """Retrieve the unique key of the table."""
    if table_name == 'dim_user':
        return 'user_id'
    elif table_name == 'dim_payment':
        return 'payment_id'
    elif table_name == 'dim_shipper':
        return 'shipper_id'
    elif table_name == 'dim_rating':
        return 'rating_id'
    elif table_name == 'dim_voucher':
        return 'voucher_id'
    elif table_name == 'dim_product':
        return 'product_id'
    elif table_name == 'dim_product_category':
        return 'product_category_id'
    elif table_name == 'fact_orders':
        return 'order_id'
    elif table_name == 'fact_order_items':
        return 'order_item_id'
    else:
        raise ValueError("Table name not recognized.")

### **Function Data Mart**

In [3]:
def create_and_insert():
    """Create dm_sales table and insert data into it."""
    engine = sa.create_engine(warehouse_conn_string)
    with engine.connect() as conn:
        for key, value in ddl_marts.items():
            conn.execute(sa.text(ddl_marts[key]))
            print(f"Create and Inserting data table for {key}")
            #conn.commit()  # Commit changes after executing statements
            print(f'Data Mart Has Create Success')
    print(f'Data Mart Has Create Success')

### **Function Run**

In [4]:
def etl_process():
    """Run the entire ETL process."""
    # Create tables
    create_tables()

    # Process dimension tables
    for dim_table, target_table in warehouse_tables.items():
        print(dim_table, target_table)
        if dim_table != 'fact_orders':
            source_table = dim_table
            df = extract_data(source_table)
            transformed_df = transform_data(df, dim_table)
            load_data(transformed_df, target_table)
        else:
            # Process fact table
            df_fact_orders = transform_fact_orders()
            load_data(df_fact_orders, target_table)

    # proses mart table
    create_and_insert()

# **Run ETL**

In [5]:
#script running all ETL
etl_process()

users dim_user
Extract Data tb_users Success
Transform Data users Success
Load Data dim_user Success
payments dim_payment
Extract Data tb_payments Success
Transform Data payments Success
Load Data dim_payment Success
shippers dim_shipper
Extract Data tb_shippers Success
Transform Data shippers Success
Load Data dim_shipper Success
ratings dim_rating
Extract Data tb_ratings Success
Transform Data ratings Success
Load Data dim_rating Success
vouchers dim_voucher
Extract Data tb_vouchers Success
Transform Data vouchers Success
Load Data dim_voucher Success
products dim_product
Extract Data tb_products Success
Transform Data products Success
Load Data dim_product Success
product_category dim_product_category
Extract Data tb_product_category Success
Transform Data product_category Success
Load Data dim_product_category Success
orders fact_orders
Extract Data tb_orders Success
Transform Data orders Success
Load Data fact_orders Success
order_items fact_order_items
Extract Data tb_order_items

### **Run Testing**

In [6]:
create_tables()

source_table = 'ratings'
df = extract_data(source_table)
df

Extract Data tb_ratings Success


Unnamed: 0,rating_id,rating_level,rating_status
0,800010001,1,Very Low Impact
1,800010002,2,Low Impact
2,800010003,3,Medium Impact
3,800010004,4,Medium High Impact
4,800010005,5,High Impact


In [7]:
transformed_df = transform_data(df, 'dim_rating')
transformed_df

Transform Data dim_rating Success


Unnamed: 0,rating_id,rating_level,rating_status
0,800010001,1,Very Low Impact
1,800010002,2,Low Impact
2,800010003,3,Medium Impact
3,800010004,4,Medium High Impact
4,800010005,5,High Impact


In [8]:
load_data(transformed_df, 'dim_rating')

Load Data dim_rating Success


### **Script Upload Google Sheets**

In [9]:
import json
import gspread
from oauth2client.service_account import ServiceAccountCredentials

with open('digitalskola_key.json','rb') as file:
    key = json.load(file)
    
scope = ['https://www.googleapis.com/auth/drive','https://spreadsheets.google.com/feeds']
creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)
client = gspread.authorize(creds)

###tambahkan email googledigitalskola@digitalskola-368401.iam.gserviceaccount.com ke dalam google sheet anda#

In [10]:
def fetch_data_from_dwh(query):
     # Membuat koneksi ke database
    engine = sa.create_engine(warehouse_conn_string)
    
    # Membuat hasil query menjadi Datafrmae
    df = pd.read_sql(query, engine)
    
    return df

df_mart = fetch_data_from_dwh("""SELECT * FROM dm_sales_per_category""")
df_mart

Unnamed: 0,product_category_id,category_name,total_sales
0,320001001,Fashion,18
1,320001002,Electronic,7
2,320001003,Health & Beauty,5


In [11]:
# ganti dengan nama google sheets anda
sheet = client.open('Contoh Source Data')

# ganti sesuai dengan nama sheet didalam google sheets anda
# siapkan nama kolom pada sheet di google sheet anda

export = sheet.worksheet('Sheet3')
export.update([df_mart.columns.values.tolist()] + df_mart.astype(str).values.tolist())

{'spreadsheetId': '163IyMV2W_SR_vYg9IOPYcmQwOtVxSfFQzhPPb9RjBA0',
 'updatedRange': 'Sheet3!A1:C4',
 'updatedRows': 4,
 'updatedColumns': 3,
 'updatedCells': 12}