In [102]:
from dotenv import load_dotenv # for load from env 
import os
import pandas as pd
from datetime import datetime 
import re # regex module
import logging

from minio import Minio 
from io  import BytesIO

from sqlalchemy import create_engine 
import sqlalchemy 
from pangres import upsert 

In [103]:
# Load variables from the local ".env" file so the os.getenv calls below can read them.
load_dotenv(".env")

# Database connection settings. os.getenv returns None for any variable that is not set.
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")
# Schema names read from the environment.
DB_SCHEMA_STG = os.getenv("DB_SCHEMA_STG")
DB_SCHEMA_LOG = os.getenv("DB_SCHEMA_LOG")
DB_SCHEMA_DWH = os.getenv("DB_SCHEMA_DWH")
# Prefix that read_sql prepends verbatim to "<name>.sql" when opening query files.
MODEL_PATH = os.getenv("MODEL_PATH")   

# MinIO access credentials, also read from the environment.
ACCESS_KEY_MINIO = os.getenv("ACCESS_KEY_MINIO")
SECRET_KEY_MINIO = os.getenv("SECRET_KEY_MINIO")

#### Read SQL

In [104]:
def read_sql(table_name) :
    """Return the full text of the SQL file ``<MODEL_PATH><table_name>.sql``.

    ``MODEL_PATH`` is prepended by plain string concatenation, so it must
    already end with a path separator.

    Args:
        table_name: Base name of the ``.sql`` file, without the extension.

    Returns:
        The file content as a single string.
    """
    sql_path = f"{MODEL_PATH}{table_name}.sql"
    with open(sql_path, "r") as sql_file:
        return sql_file.read()

#### ETL Log

In [105]:
def etl_log(log_msg: dict) :
    """Append one ETL log record to the ``etl_log`` table in schema ``log``.

    Args:
        log_msg: Mapping of column name to value. It is turned into a
            single-row DataFrame and appended with ``DataFrame.to_sql``.

    Raises:
        Exception: Any failure while connecting or writing is logged with
            its traceback and then re-raised.
    """
    engine = None
    try :
        # create the database engine
        engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")

        # one dict -> one-row dataframe
        df_log = pd.DataFrame([log_msg])

        # append the row to the log table
        with engine.connect() as connection :
            df_log.to_sql(
                name = "etl_log",
                con = connection,
                schema = "log",
                if_exists = "append",
                index = False
            )
    except Exception as e :
        logging.error(f"Cant save your log message. Error: {e}", exc_info=True)
        raise
    finally :
        # The engine is built per call, so release its connection pool
        # explicitly instead of relying on garbage collection.
        if engine is not None :
            engine.dispose()



#### Read ETL Log

In [106]:
def read_etl_log(filter_params: dict) :
    """Run the query stored in ``log.sql`` and return its result as a DataFrame.

    Args:
        filter_params: Bind parameters for the query. They are passed to
            ``pd.read_sql`` wrapped in a one-element tuple.
            NOTE(review): the expected keys depend on ``log.sql``, which is
            not visible here — confirm against that file.

    Returns:
        DataFrame with the rows produced by the query.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    engine = None
    try :
        # create the database engine
        engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")

        # load the query text from <MODEL_PATH>log.sql
        query = sqlalchemy.text(read_sql("log"))

        # execute query with pd.read_sql
        with engine.connect() as connection :
            df = pd.read_sql(
                sql = query,
                con = connection,
                params = (filter_params,)
            )
        # return extracted data
        return df
    except Exception as  e :
        logging.error(f"Cant execute your query. Error: {e}", exc_info=True)
        raise
    finally :
        # The engine is built per call, so release its connection pool
        # explicitly instead of relying on garbage collection.
        if engine is not None :
            engine.dispose()

#### Extract Staging 

In [107]:
def extract_staging(table_name:str, schema_name:str) -> pd.DataFrame :
    """Read the rows of ``schema_name.table_name`` created after the last logged load.

    The cut-off comes from ``read_etl_log`` (column ``max`` of its first
    row). When that value is missing (``None``, ``NaT``) or the log query
    returns no rows, ``'1990-01-01'`` is used instead.
    NOTE(review): presumably ``log.sql`` selects the latest ``etl_date``
    aliased as ``max`` — confirm against that file.

    One log record (status ``success`` or ``failed``) is written through
    ``etl_log`` on every call.

    Args:
        table_name: Table to read; also used to filter the ETL log.
        schema_name: Schema that holds the table.

    Returns:
        DataFrame with the selected rows.

    Raises:
        Exception: Any failure is logged and re-raised after the failed log
            record has been written.
    """
    engine = None
    try :
        # create the database engine
        engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")

        # look up the previous successful warehouse load of this table
        filter_log = {
            "step_name" : "warehouse",
            "table_name" : table_name,
            "status" : "success",
            "process" : "load"
        }
        last_load = read_etl_log(filter_log)

        # No previous load -> start from 1990-01-01; otherwise use the logged date.
        # pd.isna covers both None and NaT, and the length check covers an empty result.
        latest = last_load['max'].iloc[0] if len(last_load) else None
        etl_date = '1990-01-01' if pd.isna(latest) else latest

        # select every column of the table where created_at > etl_date
        query = f"SELECT * FROM {schema_name}.{table_name} WHERE created_at > %s::timestamp"

        # execute the query with pd.read_sql
        with engine.connect() as connection :
            df = pd.read_sql(
                sql=query,
                con=connection,
                params=(etl_date,)
            )
        log_msg = {
            "step"  : "warehouse",
            "process" : "extraction",
            "status" : "success",
            "source" :  "database",
            "table_name" : table_name,
            "etl_date" : datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df

    except Exception as  e :
        log_msg = {
            "step"  : "warehouse",
            "process" : "extraction",
            "status" : "failed",
            "source" :  "database",
            "table_name" : table_name,
            "etl_date" : datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg" : str(e)
        }
        logging.error(f"Cant execute your query. Error: {e}", exc_info=True)
        raise
    finally :
        try :
            etl_log(log_msg)
        finally :
            # The engine is built per call; release its pool even if logging fails.
            if engine is not None :
                engine.dispose()

#### Handle Error MINIO

In [108]:
# create function to handle error data and upload it to minio
def handle_error(data, bucket_name: str, table_name: str, process: str) :
    """Upload ``data`` as a CSV object to a MinIO bucket.

    The client connects to ``localhost:9000`` without TLS, creates the
    bucket when it does not exist, uploads ``data.to_csv()`` under the name
    ``<process>_<table_name>_<timestamp>.csv`` and then logs every object
    name in the bucket at INFO level.

    Args:
        data: Object exposing ``to_csv()`` (a DataFrame in the callers shown
            in this notebook).
        bucket_name: Target bucket.
        table_name: Used in the object name.
        process: Used in the object name.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    current_date = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    try :
        client = Minio('localhost:9000',
                       access_key=ACCESS_KEY_MINIO,
                       secret_key=SECRET_KEY_MINIO,
                       secure=False)

        # make sure the target bucket is there
        bucket_missing = not client.bucket_exists(bucket_name)
        if bucket_missing :
            client.make_bucket(bucket_name)

        # serialise the frame to UTF-8 encoded CSV
        payload = data.to_csv().encode('utf-8')
        stream = BytesIO(payload)
        object_name = f"{process}_{table_name}_{current_date}.csv"

        # store the CSV in the bucket
        client.put_object(
            bucket_name=bucket_name,
            object_name=object_name,
            data=stream,
            length=len(payload),
            content_type='application/csv'
        )

        # report what the bucket now contains
        for stored in client.list_objects(bucket_name, recursive=True) :
            logging.info(f"Object in bucket: {stored.object_name}")
    except Exception as upload_error :
        logging.error(f"Failed to upload error data to Minio. Error: {upload_error}", exc_info=True)
        raise


#### Load Warehouse 

In [133]:
# create function for load data to data warehouse
def load_warehouse(data, schema:str, table_name:str, idx_name:str, source):
    """Upsert ``data`` into ``schema.table_name`` and log the outcome.

    ``idx_name`` becomes the DataFrame index before the upsert; existing
    rows are updated (``if_row_exists="update"``).

    On failure the exception is NOT re-raised: the frame as it stood when
    the error occurred is passed to ``handle_error`` and a ``failed`` record
    is logged. An exception raised by ``handle_error`` itself does propagate.

    Args:
        data: DataFrame to load.
        schema: Target schema.
        table_name: Target table.
        idx_name: Column to use as the index for the upsert.
        source: Value stored in the ``source`` field of the log record.

    Returns:
        None.
    """
    engine = None
    try :
        # create the database engine
        engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")

        # set data index or primary key
        data = data.set_index(idx_name)

        # do upsert ( insert for non existing data, update for existing data )
        upsert(
            df = data,
            con = engine,
            table_name = table_name,
            schema = schema,
            if_row_exists = "update"
        )
        log_msg = {
            "step" : "warehouse",
            "process":"load",
            "status": "success",
            "source": source,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
    except Exception as e :
        log_msg = {
            "step" : "warehouse",
            "process":"load",
            "status": "failed",
            "source": source,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg": str(e)

        }
        handle_error(data, bucket_name="error-dellstore", table_name=table_name, process="load")
    finally :
        try :
            etl_log(log_msg)
        finally :
            # The engine is built per call; release its pool even if logging fails.
            if engine is not None :
                engine.dispose()


#### Extract Target

In [110]:
# create function to get data from data warehouse to obtain value of foreign key
def extract_target(table_name:str) :
    """Return every row of ``public.<table_name>`` as a DataFrame.

    The transform functions in this notebook use the result as a lookup
    table for id columns.

    Args:
        table_name: Table in the ``public`` schema to read.

    Returns:
        DataFrame with all columns and rows of the table.
    """
    engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")
    try :
        query = f"SELECT * FROM public.{table_name}"
        with engine.connect() as connection :
            df = pd.read_sql(
                sql = query,
                con = connection
            )
        return df
    finally :
        # The engine is built per call, so release its connection pool
        # explicitly instead of relying on garbage collection.
        engine.dispose()

## Transform

#### Transform Categories

In [111]:
## this function is for transformation table category from staging to data warehouse
## - rename column from category to category_nk
## - rename column from categoryname to category_name

def transform_categories(data: pd.DataFrame, table_name:str) -> pd.DataFrame :
    """Shape staging category rows for the warehouse.

    Renames ``category`` -> ``category_nk`` and ``categoryname`` ->
    ``category_name``, keeps one row per ``category_nk`` and drops
    ``created_at``. One log record is written through ``etl_log`` on every
    call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure the exception is not re-raised:
        the frame as it stood when the error occurred goes to
        ``handle_error`` and the function returns ``None``.
    """
    process = "transformation"

    def _log_entry(status, **extra) :
        # Build one etl_log record, stamped at the moment it is built.
        entry = {
            "step" : "warehouse",
            "process": process,
            "status": status,
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        entry.update(extra)
        return entry

    frame = data
    try :
        # staging names -> warehouse names
        frame = frame.rename(columns={'category' : 'category_nk',
                                      'categoryname' : 'category_name'})

        # one row per category_nk
        frame = frame.drop_duplicates(subset='category_nk')

        # created_at is not kept
        frame = frame.drop(columns=['created_at'])

        log_msg = _log_entry("success")
        return frame

    except Exception as exc :
        log_msg = _log_entry("failed", error_msg=str(exc))
        handle_error(data = frame,
                     bucket_name = 'error-dellstore',
                     table_name = table_name,
                     process = process)
    finally :
        etl_log(log_msg)


#### Transform Customer

In [112]:
## this function transforms the customer table from staging for the data warehouse
## - rename column from customerid to customer_nk
## - rename column from creditcardtype to credit_card_type
## - rename column from creditcard to credit_card
## - rename column from creditcardexpiration to credit_card_expiration
## - rename column from firstname to first_name 
## - rename column from lastname to last_name

def transform_customer(data: pd.DataFrame, table_name:str) -> pd.DataFrame :
    """Shape staging customer rows for the warehouse.

    Renames the staging columns, keeps one row per ``customer_nk``, masks
    every digit of ``credit_card`` except those in the last four characters
    and drops ``created_at``. One log record is written through ``etl_log``
    on every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure the exception is not re-raised:
        the frame as it stood when the error occurred goes to
        ``handle_error`` and the function returns ``None``.
    """
    process = "transformation"

    def _mask_card(value) :
        # Replace digits with 'X' everywhere but the last four characters.
        # Missing values pass through untouched so one null card does not
        # fail the whole batch; non-string values are masked via str().
        if pd.isna(value) :
            return value
        text = str(value)
        return re.sub(r'\d', 'X', text[:-4]) + text[-4:]

    try :
        data = data.rename(columns={'customerid': 'customer_nk',
                            'firstname': 'first_name',
                            'lastname': 'last_name',
                            'creditcardtype': 'credit_card_type',
                            'creditcard': 'credit_card',
                            'creditcardexpiration': 'credit_card_expiration'})

        # remove duplicates based on customer_nk
        data = data.drop_duplicates(subset='customer_nk')

        # mask the credit card number; assign returns a new frame, which
        # avoids SettingWithCopyWarning on the de-duplicated slice
        data = data.assign(credit_card=data['credit_card'].apply(_mask_card))

        # drop column created_at
        data = data.drop(columns=['created_at'])

        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "success",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        return data

    except Exception as e :
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg" : str(e)
        }
        handle_error(data = data,
                     bucket_name = 'error-dellstore',
                     table_name = table_name,
                     process = process)
    finally :
        etl_log(log_msg)


#### Transform product

In [113]:
## this function transforms the products table from staging for the data warehouse
## - rename column from prod_id to product_nk
## - rename column from category to category_nk
## - lookup category_id from the categories table based on category_nk
def transform_product(data: pd.DataFrame, table_name:str) -> pd.DataFrame :
    """Shape staging product rows for the warehouse.

    Renames ``prod_id`` -> ``product_nk`` and ``category`` ->
    ``category_nk``, keeps one row per ``product_nk``, resolves
    ``category_id`` from the ``categories`` table and drops ``created_at``
    and ``category_nk``. One log record is written through ``etl_log`` on
    every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a ``category_nk`` with
        no match in ``categories``) the exception is not re-raised: the
        frame as it stood when the error occurred goes to ``handle_error``
        and the function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series :
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any() :
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try :
        # rename column product
        data = data.rename(columns= {
            "prod_id" : "product_nk",
            "category" : "category_nk"
        })

        # remove duplicate based on product_nk
        data = data.drop_duplicates(subset='product_nk')

        # extract data from the 'categories' table
        categories = extract_target('categories')

        # lookup 'category_id' from categories table based on 'category_nk'
        data = data.assign(
            category_id=_lookup(data['category_nk'], categories, 'category_nk', 'category_id')
        )

        # drop columns that are not kept
        data = data.drop(columns=['created_at','category_nk'])

        log_msg = {
            "step" : "warehouse",
            "process" : process,
            "status" : "success",
            "source" : "staging",
            "table_name"  : table_name,
            "etl_date" : datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }

        return data

    except Exception as e :
        log_msg = {
            "step" : "warehouse",
            "process" : process,
            "status" : "failed",
            "source" : "staging",
            "table_name"  : table_name,
            "etl_date" : datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg" : str(e)
        }
        handle_error(data = data,
                     bucket_name = 'error-dellstore',
                     table_name = table_name,
                     process = process)
    finally :
        etl_log(log_msg)

#### Transform Inventory

In [188]:
## this function transforms the inventory table from staging for the data warehouse
## - rename column quan_in_stock to quantity_stock
## - rename column prod_id to product_nk
## - lookup product_id from the products table based on product_nk

def transform_inventory(data: pd.DataFrame, table_name: str) -> pd.DataFrame :
    """Shape staging inventory rows for the warehouse.

    Renames ``quan_in_stock`` -> ``quantity_stock`` and ``prod_id`` ->
    ``product_nk``, keeps one row per ``product_nk``, resolves
    ``product_id`` from the ``products`` table and drops ``created_at``.
    One log record is written through ``etl_log`` on every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a ``product_nk`` with
        no match in ``products``) the exception is not re-raised: the frame
        as it stood when the error occurred goes to ``handle_error`` and the
        function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series :
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any() :
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try :
        # rename column inventory
        data = data.rename(columns={
            "quan_in_stock" : "quantity_stock",
            "prod_id" : "product_nk"
        })

        # remove duplicate based on product_nk
        data = data.drop_duplicates(subset='product_nk')

        # extract data from the 'products' table
        products = extract_target('products')

        # lookup 'product_id' from products table based on 'product_nk'
        data = data.assign(
            product_id=_lookup(data['product_nk'], products, 'product_nk', 'product_id')
        )

        # drop column created_at
        data = data.drop(columns=['created_at'])

        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "success",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
        }
        return data
    except Exception as e :
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg" : str(e)
        }
        handle_error(data = data,
                     bucket_name = 'error-dellstore',
                     table_name = table_name,
                     process = process)
    finally :
        etl_log(log_msg)



#### Transform Orders 

In [115]:
## this function transforms the orders table from staging for the data warehouse
## - rename column orderid to order_nk 
## - rename column customerid to customer_nk and get customer_id values from the customers table by matching customer_nk 
## - rename orderdate to order_date 
## - rename netamount to net_amount 
## - rename totalamount to total_amount 

def transform_orders(data: pd.DataFrame, table_name: str) -> pd.DataFrame :
    """Shape staging order rows for the warehouse.

    Renames the staging columns, resolves ``customer_id`` from the
    ``customers`` table through ``customer_nk`` and drops ``created_at`` and
    ``customer_nk``. One log record is written through ``etl_log`` on every
    call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a ``customer_nk`` with
        no match in ``customers``) the exception is not re-raised: the frame
        as it stood when the error occurred goes to ``handle_error`` and the
        function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series :
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any() :
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try :
        # rename column
        data = data.rename(columns = {
            "orderid" : "order_nk",
            "orderdate" : "order_date",
            "netamount" : "net_amount",
            "totalamount" : "total_amount",
            "customerid" : "customer_nk"
        })

        # extract table customer
        customers = extract_target("customers")

        # lookup customer_id from customer table based on customer_nk
        data = data.assign(
            customer_id=_lookup(data['customer_nk'], customers, 'customer_nk', 'customer_id')
        )

        # drop created_at and customer_nk (customer_id identifies the customer from here on)
        data = data.drop(columns = ['created_at', 'customer_nk'])

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
        return data

    except Exception as e :
        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "failed",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg" : str(e)
                }
        handle_error(
            data=data,
            bucket_name='error-dellstore',
            table_name = table_name,
            process=process
        )

    finally :
        etl_log(log_msg)


#### Transform Orderline

In [237]:
## this function transforms the orderlines table from staging for the data warehouse
## - rename column orderlineid to orderline_nk
## - rename column orderid to order_nk and lookup order_id from the orders table based on order_nk
## - rename column prod_id to product_nk and lookup product_id from the products table based on product_nk
## - rename column orderdate to order_date 

def transform_orderlines(data: pd.DataFrame, table_name: str) -> pd.DataFrame :
    """Shape staging orderline rows for the warehouse.

    Renames the staging columns, resolves ``order_id`` from ``orders`` and
    ``product_id`` from ``products``, then drops ``created_at``,
    ``order_nk`` and ``product_nk``. One log record is written through
    ``etl_log`` on every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a key with no match in
        its lookup table) the exception is not re-raised: the frame as it
        stood when the error occurred goes to ``handle_error`` and the
        function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series :
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any() :
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try :
        # rename column
        data = data.rename(columns= {
            "orderlineid" : "orderline_nk",
            "orderid" : "order_nk",
            "prod_id" : "product_nk",
            "orderdate"  :"order_date"
        })

        # extract data from orders table
        orders = extract_target("orders")

        # lookup 'order_id'
        data = data.assign(order_id=_lookup(data['order_nk'], orders, 'order_nk', 'order_id'))

        # extract data from products table
        products = extract_target("products")

        # lookup product id
        data = data.assign(
            product_id=_lookup(data['product_nk'], products, 'product_nk', 'product_id')
        )

        # drop unnecessary columns
        data = data.drop(columns = ['created_at', 'order_nk', 'product_nk'])

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data

    except Exception as e :
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }

        handle_error(
            data= data,
            bucket_name= 'error-dellstore',
            table_name = table_name,
            process= process
        )

    finally :
        etl_log(log_msg)


#### Transform customer_orders_history

Target:
- Table customers
- Table product
- Table orders
- Table orderlines

In [117]:
def transform_order_hist_cust(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """Extract the customer columns from the customer order history rows.

    Renames the ``customer_*`` columns, drops every column that is not in
    the keep-list and keeps one row per ``customer_nk``. One log record is
    written through ``etl_log`` on every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure the exception is printed and not
        re-raised: the frame as it stood when the error occurred goes to
        ``handle_error`` and the function returns ``None``.
    """
    process = "transformation"

    # staging column -> warehouse column
    renames = {
        'customer_id': 'customer_nk',
        'customer_firstname': 'first_name',
        'customer_lastname': 'last_name',
        'customer_address1': 'address1',
        'customer_address2': 'address2',
        'customer_city': 'city',
        'customer_state': 'state',
        'customer_zip': 'zip',
        'customer_country': 'country',
        'customer_region': 'region',
        'customer_email': 'email',
        'customer_phone': 'phone',
        'customer_creditcardtype': 'credit_card_type',
        'customer_creditcard': 'credit_card',
        'customer_creditcardexpiration': 'credit_card_expiration',
        'customer_username': 'username',
        'customer_password': 'password',
        'customer_age': 'age',
        'customer_income': 'income',
        'customer_gender': 'gender'
    }

    columns_to_keep = (
        'customer_nk', 'customer_id', 'first_name', 'last_name',
        'address1', 'address2', 'city', 'state', 'zip',
        'country', 'region', 'email', 'phone',
        'credit_card_type', 'credit_card', 'credit_card_expiration',
        'username', 'password', 'age', 'income', 'gender'
    )

    def _log_entry(status, **extra):
        # Build one etl_log record, stamped at the moment it is built.
        entry = {
            "step" : "warehouse",
            "process": process,
            "status": status,
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        entry.update(extra)
        return entry

    frame = data
    try:
        frame = frame.rename(columns=renames)

        # everything outside the keep-list goes
        unwanted = [name for name in frame.columns if name not in columns_to_keep]
        frame = frame.drop(columns=unwanted)

        # one row per customer_nk
        frame = frame.drop_duplicates(subset='customer_nk')

        log_msg = _log_entry("success")
        return frame
    except Exception as exc:
        log_msg = _log_entry("failed", error_msg=str(exc))
        print(exc)
        handle_error(data = frame, bucket_name='error-dellstore', table_name= table_name, process=process)
    finally:
        # Save the log message
        etl_log(log_msg)



In [249]:
def transform_order_hist_prod(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """Extract the product columns from the customer order history rows.

    Renames the ``product_*`` columns, keeps one row per ``product_nk``,
    resolves ``category_id`` from the ``categories`` table and returns only
    the product columns. One log record is written through ``etl_log`` on
    every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a ``category_nk`` with
        no match in ``categories``) the exception is printed and not
        re-raised; the frame is handed to ``handle_error`` on a best-effort
        basis and the function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series:
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any():
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try:
        # rename column for products
        data = data.rename(columns={
            'product_id': 'product_nk',
            'product_category': 'category_nk',
            'product_title': 'title',
            'product_actor': 'actor',
            'product_price': 'price',
            'product_special': 'special',
            'product_common_prod_id': 'common_prod_id'
        })

        # Deduplication based on product_nk
        data = data.drop_duplicates(subset='product_nk')

        # Extract data from the `categories` table
        categories = extract_target('categories')

        # Lookup `category_id` from `categories` table based on `category_nk`
        data = data.assign(
            category_id=_lookup(data['category_nk'], categories, 'category_nk', 'category_id')
        )

        # Get relevant columns
        data = data[['product_nk', 'category_id', 'title', 'actor', 'price', 'special', 'common_prod_id']]

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data
    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }
        print(e)
        # Handling error: save data to Object Storage (best effort)
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name, process=process)
        except Exception as e:
            print(e)
    finally:
        # Save the log message
        etl_log(log_msg)



In [250]:
def transform_order_hist_order(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """Extract the order columns from the customer order history rows.

    Renames the ``order_*`` columns, keeps one row per ``order_nk``,
    resolves ``customer_id`` from the ``customers`` table and returns only
    the order columns. One log record is written through ``etl_log`` on
    every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a ``customer_nk`` with
        no match in ``customers``) the exception is printed and not
        re-raised; the frame is handed to ``handle_error`` on a best-effort
        basis and the function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series:
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any():
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try:
        # rename column for orders
        data = data.rename(columns={
                    'order_id': 'order_nk',
                    'order_customerid': 'customer_nk',
                    'order_date': 'order_date',
                    'order_netamount': 'net_amount',
                    'order_tax': 'tax',
                    'order_totalamount': 'total_amount'
                })

        # Deduplication based on order_nk
        data = data.drop_duplicates(subset='order_nk')

        # Extract data from the `customers` table
        customer = extract_target('customers')

        # Lookup `customer_id` from `customers` table based on `customer_nk`
        data = data.assign(
            customer_id=_lookup(data['customer_nk'], customer, 'customer_nk', 'customer_id')
        )

        # Get relevant columns
        data = data[['order_nk', 'customer_id', 'order_date', 'net_amount', 'tax', 'total_amount']]

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data
    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }
        print(e)
        # Handling error: save data to Object Storage (best effort)
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name, process=process)
        except Exception as e:
            print(e)
    finally:
        # Save the log message
        etl_log(log_msg)



In [251]:
def transform_order_hist_orderline(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """Extract the orderline columns from the customer order history rows.

    Drops the incoming ``order_date`` column, renames the orderline columns
    (``orderline_orderdate`` becomes the new ``order_date``), removes
    duplicate rows, resolves ``order_id`` from ``orders`` and ``product_id``
    from ``products`` and returns only the orderline columns. One log record
    is written through ``etl_log`` on every call.

    Args:
        data: Staging rows.
        table_name: Name stored in the log record and passed to
            ``handle_error``.

    Returns:
        The transformed frame. On failure (including a key with no match in
        its lookup table) the exception is printed and not re-raised; the
        frame is handed to ``handle_error`` on a best-effort basis and the
        function returns ``None``.
    """
    process = "transformation"

    def _lookup(keys: pd.Series, table: pd.DataFrame, key_col: str, id_col: str) -> pd.Series:
        # Map each key to table[id_col] through table[key_col] with a single
        # hashed lookup. The first matching row wins, and unmatched keys
        # raise a LookupError that names them.
        ids = table.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        found = keys.map(ids)
        unmatched = found.isna()
        if unmatched.any():
            raise LookupError(
                f"{key_col} values not found in lookup table: {keys[unmatched].unique().tolist()}"
            )
        return found

    try:
        # drop column order_date (replaced by orderline_orderdate below)
        data = data.drop(columns=['order_date'])

        # rename column for orderlines
        data = data.rename(columns={
            'orderline_id': 'orderline_nk',
            'order_id': 'order_nk',
            'product_id': 'product_nk',
            'orderline_quantity': 'quantity',
            'orderline_orderdate': 'order_date'
        })

        # Deduplication based on orderline_nk, order_nk, product_nk and quantity
        data = data.drop_duplicates(subset=['orderline_nk','order_nk','product_nk','quantity'])

        # Extract data from the `orders` table
        orders = extract_target('orders')

        # Lookup `order_id` from `orders` table based on `order_nk`
        data = data.assign(order_id=_lookup(data['order_nk'], orders, 'order_nk', 'order_id'))

        # Extract data from the `products` table
        products = extract_target('products')

        # Lookup `product_id` from `products` table based on `product_nk`
        data = data.assign(
            product_id=_lookup(data['product_nk'], products, 'product_nk', 'product_id')
        )

        # Get relevant columns
        data = data[['orderline_nk', 'order_id', 'product_id', 'quantity', 'order_date']]

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data
    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }
        print(e)
        # Handling error: save data to Object Storage (best effort)
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name, process=process)
        except Exception as e:
            print(e)
    finally:
        # Save the log message
        etl_log(log_msg)



#### Transform Customer History 

In [118]:
def transform_cust_hist(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """
    Replace the natural keys of staged `cust_hist` rows with warehouse ids.

    The staging columns `customerid`, `orderid` and `prod_id` are renamed to
    `customer_nk`, `order_nk` and `product_nk`. Each key is then looked up in
    the frame returned by `extract_target` for `customers`, `orders` and
    `products` to fill `customer_id`, `order_id` and `product_id`, after which
    the three `*_nk` columns are dropped.

    Parameters
    ----------
    data : pd.DataFrame
        Staged rows containing `customerid`, `orderid` and `prod_id`.
    table_name : str
        Table name written to the ETL log and passed to `handle_error`.

    Returns
    -------
    pd.DataFrame
        The transformed frame. When any step fails (including a key with no
        match in its target table) the failure is logged, `data` is passed to
        `handle_error`, and None is returned instead.

    Notes
    -----
    A log record is written through `etl_log` on both the success and the
    failure path.
    """
    process = "transformation"

    def _resolve_ids(keys: pd.Series, target: pd.DataFrame, key_col: str, id_col: str) -> pd.Series:
        # Build the key -> id mapping once instead of scanning `target` for
        # every row. Keeping the first duplicate mirrors taking `.values[0]`.
        id_by_key = target.drop_duplicates(subset=key_col).set_index(key_col)[id_col]
        unmatched = ~keys.isin(id_by_key.index)
        if unmatched.any():
            sample = keys[unmatched].unique()[:5].tolist()
            raise LookupError(
                f"{int(unmatched.sum())} row(s) have a '{key_col}' with no match "
                f"in the target table, e.g. {sample}"
            )
        return keys.map(id_by_key)

    try:
        # rename the staging columns to their natural-key names
        data = data.rename(columns={'customerid':'customer_nk',
                                    'prod_id':'product_nk',
                                    'orderid':'order_nk'})

        # Lookup `customer_id` from the `customers` table based on `customer_nk`
        customers = extract_target('customers')
        data['customer_id'] = _resolve_ids(data['customer_nk'], customers, 'customer_nk', 'customer_id')

        # Lookup `order_id` from the `orders` table based on `order_nk`
        orders = extract_target('orders')
        data['order_id'] = _resolve_ids(data['order_nk'], orders, 'order_nk', 'order_id')

        # Lookup `product_id` from the `products` table based on `product_nk`
        products = extract_target('products')
        data['product_id'] = _resolve_ids(data['product_nk'], products, 'product_nk', 'product_id')

        # drop the natural keys now that the ids are resolved
        data = data.drop(columns=['customer_nk','order_nk','product_nk'])

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data
    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }
        # Handling error: save data to Object Storage. Guarded so that a
        # storage failure does not replace the original transformation error.
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name, process=process)
        except Exception as handler_error:
            logging.error(f"Cant store failed data for {table_name}. Error: {handler_error}", exc_info=True)
    finally:
        # Save the log message
        etl_log(log_msg)



#### Transform order status analytics

In [119]:
def transform_order_status_analytic(data: pd.DataFrame, table_name: str) -> pd.DataFrame:
    """
    Attach the warehouse `order_id` to staged `order_status_analytic` rows.

    The staging column `orderid` is renamed to `order_nk`, looked up in the
    frame returned by `extract_target('orders')` to fill `order_id`, and the
    `created_at` column is dropped.

    Parameters
    ----------
    data : pd.DataFrame
        Staged rows containing `orderid` and `created_at`.
    table_name : str
        Table name written to the ETL log and passed to `handle_error`.

    Returns
    -------
    pd.DataFrame
        The transformed frame. When any step fails (including an `order_nk`
        with no match in `orders`) the failure is logged, `data` is passed to
        `handle_error`, and None is returned instead.

    Notes
    -----
    A log record is written through `etl_log` on both the success and the
    failure path.
    """
    process = "transformation"
    try:
        # rename the staging column to its natural-key name
        data = data.rename(columns={'orderid':'order_nk'})

        # Extract data from the `orders` table
        orders = extract_target('orders')

        # Build the order_nk -> order_id mapping once instead of scanning
        # `orders` for every row; keeping the first duplicate mirrors `.values[0]`.
        order_id_by_nk = orders.drop_duplicates(subset='order_nk').set_index('order_nk')['order_id']
        unmatched = ~data['order_nk'].isin(order_id_by_nk.index)
        if unmatched.any():
            sample = data.loc[unmatched, 'order_nk'].unique()[:5].tolist()
            raise LookupError(
                f"{int(unmatched.sum())} row(s) have an 'order_nk' with no match "
                f"in the orders table, e.g. {sample}"
            )
        data['order_id'] = data['order_nk'].map(order_id_by_nk)

        # drop unnecessary columns
        data = data.drop(columns='created_at')

        log_msg = {
                "step" : "warehouse",
                "process": process,
                "status": "success",
                "source": "staging",
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

        return data
    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "process": process,
            "status": "failed",
            "source": "staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp,
            "error_msg": str(e)
            }
        # Handling error: save data to Object Storage. Guarded so that a
        # storage failure does not replace the original transformation error.
        try:
            handle_error(data = data, bucket_name='error-dellstore', table_name= table_name, process=process)
        except Exception as handler_error:
            logging.error(f"Cant store failed data for {table_name}. Error: {handler_error}", exc_info=True)
    finally:
        # Save the log message
        etl_log(log_msg)



## Validation

#### Table Customer 

- Validasi format email
- Check nomor hp memiliki 10 digit
- Check expired CC pake format YYYY/MM

In [120]:
def validate_email_format(email) :
    """
    Return True when `email` is an address at yahoo.com, hotmail.com or gmail.com.

    The local part may contain word characters, dots and hyphens. Anything
    that is not a string (None, NaN, numbers) is reported as invalid instead
    of raising, so one missing value cannot abort validation of a whole column.
    """
    if not isinstance(email, str):
        return False
    # fullmatch (rather than match + `$`) also rejects a trailing newline.
    return re.fullmatch(r"[\w\.-]+@(yahoo\.com|hotmail\.com|gmail\.com)", email) is not None


In [121]:
def validate_phone_format(phone) :
    """
    Return True when `phone` is a string of exactly ten ASCII digits.

    Non-string values (None, NaN, numbers) are reported as invalid instead of
    raising, so one missing value cannot abort validation of a whole column.
    """
    if not isinstance(phone, str):
        return False
    # `[0-9]` instead of `\d` keeps non-ASCII digit characters out;
    # fullmatch also rejects a trailing newline that `$` would tolerate.
    return re.fullmatch(r"[0-9]{10}", phone) is not None

In [122]:
def validate_credit_card_expiration_format(expiration_date) :
    """
    Return True when `expiration_date` is a `YYYY/MM` string with a month of 01-12.

    Non-string values (None, NaN, numbers) are reported as invalid instead of
    raising, so one missing value cannot abort validation of a whole column.
    """
    if not isinstance(expiration_date, str):
        return False
    # Four ASCII digits, a slash, then a real calendar month; fullmatch also
    # rejects a trailing newline that `$` would tolerate.
    return re.fullmatch(r"[0-9]{4}/(0[1-9]|1[0-2])", expiration_date) is not None

#### Table Product

- Check harga apakah masih mencakup range 0 - 100

In [123]:
def validate_price_range(price, min_price=0, max_price=100) :
    """
    Return True when `min_price <= price <= max_price` (both bounds inclusive).

    The defaults keep the original 0-100 range. None is reported as invalid
    instead of raising; NaN compares False against both bounds and is
    therefore invalid as well.
    """
    if price is None:
        return False
    return bool(min_price <= price <= max_price)

#### Table Orders dan Orderline

table order :
- Check apakah kolom net_amount, tax dan total_amount memiliki nilai tidak negatif (>= 0)

table orderline :
- Check apakah kolom quantity memiliki nilai tidak negatif (>= 0)

In [124]:
def validate_positive_values(value) :
    """
    Return True when `value` is greater than or equal to zero.

    Despite the name, zero is accepted. None is reported as invalid instead
    of raising; NaN compares False and is therefore invalid as well.
    """
    if value is None:
        return False
    return bool(value >= 0)

#### Table Order Status Analytic

- check status apakah partial, fulfilled atau backordered

In [125]:
def validate_order_status(status) :
    """Return True when `status` is exactly 'partial', 'fulfilled' or 'backordered'."""
    allowed_statuses = ('partial', 'fulfilled', 'backordered')
    return status in allowed_statuses

#### Validation Function

In [175]:
def validation_data(data: pd.DataFrame, table_name: str, validation_functions: dict) -> "tuple[pd.DataFrame, pd.DataFrame]" :
    """
    Split `data` into rows that pass every validator and rows that fail one.

    Parameters
    ----------
    data : pd.DataFrame
        Frame to validate.
    table_name : str
        Table name written to the ETL log.
    validation_functions : dict
        Maps a column name of `data` to a callable that is applied to every
        value of that column and returns a truthy value for valid entries.
        With an empty dict every row counts as valid.

    Returns
    -------
    tuple of (pd.DataFrame, pd.DataFrame)
        `(valid_rows, invalid_rows)`; a row is valid only when all validators
        accept it.

    Raises
    ------
    Exception
        Any error raised while validating (for example a missing column) is
        recorded in the ETL log with status "failed" and then re-raised, so
        callers see the real cause instead of failing to unpack None.
    """
    try :
        # one boolean column per validated field
        report_data = {f'validate_{name}' : data[name].apply(func) for name, func in validation_functions.items()}

        # summarize status data by all conditions
        if report_data :
            all_valid = pd.DataFrame(report_data).all(axis=1).astype(bool)
        else :
            # no validators: nothing can fail, so every row is valid
            all_valid = pd.Series(True, index=data.index)

        # filter out valid rows (all_valid = True)
        valid_data_df = data[all_valid]

        # filter out invalid rows (all_valid = False)
        invalid_data_df = data[~all_valid]

        # create success log
        log_msg = {
            "step" : "warehouse",
            "process" : "validation",
            "status" : "success",
            "source" : "staging",
            "table_name" : table_name, 
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return valid_data_df, invalid_data_df
    except Exception as e:
        # create fail log msg
        log_msg = {
            "step" : "warehouse",
            "process" : "validation",
            "status" : "failed",
            "source" : "staging",
            "table_name" : table_name, 
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg" : str(e)
        }
        # propagate: every caller unpacks two frames, so returning None here
        # would only surface later as an unrelated unpacking error
        raise

    finally :
        etl_log(log_msg)


## Pipeline Warehouse

#### Categories

In [127]:
## test the validation function
# data = pd.DataFrame({
#     "customerid" : [11,12,13,14,15], 
#     "firstname" : ['Becky', 'Raymond', 'Melanie', 'Heather', 'Heather'],
#     "email" : ["beckycochran@yahoo.com","raymondyang@yahoo.com","melaniewade@yahoo.com","heathercruz@hotmail.com","heatherburgess@hotmail.com"],
#     "phone" : ["2415449050","1896033667","3029418206","3748672054","3354132892"],
#     "credit_card_expiration" : ["2010/03","2011/10","2009/11","2011/07","2008/05"]
# })

# print(data)
# validation_cust, invalid_cust = validation_data(data=data, table_name='customers', validation_functions = {"email": validate_email_format,
#                                                                                             "phone" : validate_phone_format,
#                                                                                             "credit_card_expiration" : validate_credit_card_expiration_format})

In [130]:
# Extract the `categories` table from the staging schema.
df_category = extract_staging(
    table_name="categories",
    schema_name=DB_SCHEMA_STG,
)

In [131]:
# Run the categories transformation on the staged rows.
category_tf = transform_categories(
    data=df_category,
    table_name="categories",
)

In [134]:
# Load the transformed categories into `public.categories` (idx_name: `category_nk`).
load_warehouse(
    data=category_tf,
    schema="public",
    table_name="categories",
    idx_name="category_nk",
    source="staging",
)

#### Customer

In [136]:
# Extract the `customers` table from the staging schema.
df_customers = extract_staging(
    table_name='customers',
    schema_name=DB_SCHEMA_STG,
)

In [137]:
# Run the customer transformation on the staged rows.
customer_tf = transform_customer(
    data=df_customers,
    table_name="customers",
)

In [None]:
# Validate customers: email, phone and credit-card expiration format.
valid_cust, invalid_cust = validation_data(
    data=customer_tf,
    table_name="customers",
    validation_functions={
        "email": validate_email_format,
        "phone": validate_phone_format,
        "credit_card_expiration": validate_credit_card_expiration_format,
    },
)

In [178]:
# Load the customers that passed validation into `public.customers`.
load_warehouse(
    data=valid_cust,
    schema="public",
    table_name="customers",
    idx_name="customer_nk",
    source="staging",
)

In [179]:
# Send customers that failed validation to the error bucket.
if not invalid_cust.empty:
    handle_error(
        data=invalid_cust,
        bucket_name="error-dellstore",
        table_name="customers",
        process="validation",
    )

#### Product

In [180]:
# Extract the `products` table from the staging schema.
df_product = extract_staging(
    table_name='products',
    schema_name=DB_SCHEMA_STG,
)

In [181]:
# Run the product transformation on the staged rows.
product_tf = transform_product(
    data=df_product,
    table_name="products",
)

In [182]:
# Validate products: price must fall inside the accepted range.
valid_prod, invalid_prod = validation_data(
    data=product_tf,
    table_name="products",
    validation_functions={"price": validate_price_range},
)

In [183]:
# Load the products that passed validation into `public.products`.
load_warehouse(
    data=valid_prod,
    schema="public",
    table_name="products",
    idx_name="product_nk",
    source="staging",
)

# Send products that failed validation to the error bucket.
if not invalid_prod.empty:
    handle_error(
        data=invalid_prod,
        bucket_name="error-dellstore",
        table_name="products",
        process="validation",
    )

#### Inventory

In [184]:
# Extract the `inventory` table from the staging schema.
df_inventory = extract_staging(
    table_name="inventory",
    schema_name=DB_SCHEMA_STG,
)


In [189]:
# Run the inventory transformation on the staged rows.
inventory_tf = transform_inventory(data=df_inventory, table_name="inventory")

In [191]:
# Load the transformed inventory into `public.inventory` (idx_name: `product_nk`).
load_warehouse(
    data=inventory_tf,
    schema="public",
    table_name="inventory",
    idx_name="product_nk",
    source="staging",
)

#### Orders

In [192]:
# Extract the `orders` table from the staging schema.
df_orders = extract_staging(
    table_name="orders",
    schema_name=DB_SCHEMA_STG,
)

In [193]:
# Run the orders transformation on the staged rows.
orders_tf = transform_orders(
    data=df_orders,
    table_name="orders",
)

In [194]:
# Validate orders: the three amount columns must not be negative.
valid_orders, invalid_orders = validation_data(
    data=orders_tf,
    table_name="orders",
    validation_functions={
        "net_amount": validate_positive_values,
        "tax": validate_positive_values,
        "total_amount": validate_positive_values,
    },
)

In [195]:
# Load the orders that passed validation into `public.orders`.
load_warehouse(
    data=valid_orders,
    schema="public",
    table_name="orders",
    idx_name="order_nk",
    source="staging",
)

In [196]:
# Send orders that failed validation to the error bucket.
if not invalid_orders.empty:
    handle_error(
        data=invalid_orders,
        bucket_name="error-dellstore",
        table_name="orders",
        process="validation",
    )

#### Orderlines

In [227]:
# Extract the `orderlines` table from the staging schema.
df_orderlines = extract_staging(
    table_name="orderlines",
    schema_name=DB_SCHEMA_STG,
)

In [212]:
# Debug aid: inspect the extracted `orderlines` staging frame.
print(df_orderlines)

       orderlineid  orderid  prod_id  quantity   orderdate  \
0                1     2001     9702         3  2004-03-04   
1                2     2001     3782         3  2004-03-04   
2                3     2001     2714         2  2004-03-04   
3                4     2001     4178         3  2004-03-04   
4                5     2001        4         2  2004-03-04   
...            ...      ...      ...       ...         ...   
50261            5    12000     7670         2  2004-12-15   
50262            6    12000     1054         1  2004-12-15   
50263            7    12000     1717         1  2004-12-15   
50264            8    12000     1807         3  2004-12-15   
50265            9    12000     3576         3  2004-12-15   

                      created_at  
0     2026-01-19 12:59:03.253602  
1     2026-01-19 12:59:03.253602  
2     2026-01-19 12:59:03.253602  
3     2026-01-19 12:59:03.253602  
4     2026-01-19 12:59:03.253602  
...                          ...  
50261 2026

In [238]:
# Run the orderlines transformation on the staged rows.
orderlines_tf = transform_orderlines(
    data=df_orderlines,
    table_name="orderlines",
)

In [240]:
# Validate orderlines: quantity must not be negative.
valid_orderlines, invalid_orderlines = validation_data(
    data=orderlines_tf,
    table_name="orderlines",
    validation_functions={"quantity": validate_positive_values},
)

In [241]:
# Load the orderlines that passed validation into `public.orderlines`.
load_warehouse(
    data=valid_orderlines,
    schema="public",
    table_name="orderlines",
    idx_name=['orderline_nk', 'order_id', 'product_id', 'quantity'],
    source="staging",
)

In [242]:
# Send orderlines that failed validation to the error bucket.
# (Previously this stored `invalid_cust`, i.e. the rejected customers, under
# the orderlines name, so the rejected orderlines themselves were lost.)
if not invalid_orderlines.empty:
    handle_error(
        data=invalid_orderlines,
        bucket_name="error-dellstore",
        table_name="orderlines",
        process="validation",
    )

#### Customer Orders History

In [None]:
# Extract the `customer_orders_history` table from the staging schema.
df_order_hist = extract_staging(
    table_name="customer_orders_history",
    schema_name=DB_SCHEMA_STG,
)

In [None]:
# Run the customer transformation on the `customer_orders_history` rows.
cust_order_hist_tf = transform_order_hist_cust(
    data=df_order_hist,
    table_name="customer_orders_history",
)

In [246]:
# Validate the history customers: email, phone and credit-card expiration format.
valid_cust_order_hist, invalid_cust_order_hist = validation_data(
    data=cust_order_hist_tf,
    table_name="customer_orders_history",
    validation_functions={
        "email": validate_email_format,
        "phone": validate_phone_format,
        "credit_card_expiration": validate_credit_card_expiration_format,
    },
)

In [247]:
# Load the history customers that passed validation into `public.customers`.
load_warehouse(data=valid_cust_order_hist, 
               schema="public", 
               table_name = "customers",
               idx_name=['customer_nk'],
               source="staging"
               )

# Send history customers that failed validation to the error bucket, as is
# done for every other validated table; they were previously discarded.
# NOTE(review): table_name mirrors the load target above — confirm the naming.
if not invalid_cust_order_hist.empty:
    handle_error(data=invalid_cust_order_hist, bucket_name="error-dellstore",
                 table_name="customers", process="validation")

In [252]:
# Run the product transformation on the `customer_orders_history` rows.
prod_order_hist_tf = transform_order_hist_prod(
    data=df_order_hist,
    table_name="customer_orders_history",
)

In [253]:
# Validate the history products: price must fall inside the accepted range.
valid_order_hist_prod, invalid_order_hist_prod = validation_data(
    data=prod_order_hist_tf, table_name="products", validation_functions={"price": validate_price_range}
)

In [256]:
# Debug aid: inspect the history products that passed validation.
print(valid_order_hist_prod)

       product_nk                           category_id               title  \
0               3  eca34eae-0461-4be1-a517-6b2237243559  ACADEMY ADAPTATION   
1            8910  ba727e59-3780-4c93-8227-7ef9ad761695   ALABAMA TREATMENT   
2             155  66c43dc2-a217-43b5-a334-3a25a9a4f268   ACADEMY CLEOPATRA   
3            1471  92443c20-6cd0-4816-8ea7-9eee0dab9646          ACE ISLAND   
4            4014  92443c20-6cd0-4816-8ea7-9eee0dab9646       AFRICAN ALICE   
...           ...                                   ...                 ...   
10068        5220  abf303a9-b082-4172-84c5-0f687df7ffd7          AGENT DEER   
10070        6166  66c43dc2-a217-43b5-a334-3a25a9a4f268      AIRPLANE COLOR   
10076         644  179a94b6-0602-4388-ab22-b547173e273f       ACADEMY OSCAR   
10080        8330  92443c20-6cd0-4816-8ea7-9eee0dab9646   ALABAMA FORRESTER   
10082        9346  5ee0a3f3-9057-4553-9315-12de10139ad4      ALADDIN GALAXY   

               actor  price  special  common_prod_i

In [254]:
# Load the valid history products, then send the rejected ones to the error bucket.
load_warehouse(
    data=valid_order_hist_prod,
    schema="public",
    table_name="products",
    idx_name=["product_nk"],
    source="staging",
)
if not invalid_order_hist_prod.empty:
    handle_error(
        data=invalid_order_hist_prod,
        bucket_name='error-dellstore',
        table_name="products",
        process='validation',
    )


In [258]:
# Orders derived from `customer_orders_history`: transform, validate, load, quarantine.
order_hist_tf = transform_order_hist_order(
    data=df_order_hist,
    table_name="customer_orders_history",
)
valid_order_hist, invalid_order_hist = validation_data(
    data=order_hist_tf,
    table_name="orders",
    validation_functions={
        "net_amount": validate_positive_values,
        "tax": validate_positive_values,
        "total_amount": validate_positive_values,
    },
)
load_warehouse(
    data=valid_order_hist,
    schema="public",
    table_name="orders",
    idx_name=["order_nk"],
    source="staging",
)
if not invalid_order_hist.empty:
    handle_error(
        data=invalid_order_hist,
        bucket_name='error-dellstore',
        table_name="orders",
        process='validation',
    )

# Orderlines derived from `customer_orders_history`: same four steps.
orderline_hist_tf = transform_order_hist_orderline(
    data=df_order_hist,
    table_name="customer_orders_history",
)
valid_orderline_hist, invalid_orderline_hist = validation_data(
    data=orderline_hist_tf,
    table_name="orderlines",
    validation_functions={"quantity": validate_positive_values},
)
load_warehouse(
    data=valid_orderline_hist,
    schema="public",
    table_name="orderlines",
    idx_name=["orderline_nk", "order_id", "product_id", "quantity"],
    source="staging",
)
if not invalid_orderline_hist.empty:
    handle_error(
        data=invalid_orderline_hist,
        bucket_name='error-dellstore',
        table_name="orderlines",
        process='validation',
    )

In [260]:
# Customer history: extract from staging, transform, then load (no validation step).
df_cust_hist = extract_staging(
    table_name="cust_hist",
    schema_name=DB_SCHEMA_STG,
)
cust_hist_tf = transform_cust_hist(
    data=df_cust_hist,
    table_name="cust_hist",
)
load_warehouse(
    data=cust_hist_tf,
    schema="public",
    table_name="cust_hist",
    idx_name=["customer_id", "order_id", "product_id"],
    source="staging",
)

In [262]:
# Order status analytic: extract, transform, validate the status, load, quarantine.
df_order_analytic = extract_staging(
    table_name="order_status_analytic",
    schema_name=DB_SCHEMA_STG,
)
order_analytic_tf = transform_order_status_analytic(
    data=df_order_analytic,
    table_name="order_status_analytic",
)
valid_orders_analytic, invalid_orders_analytic = validation_data(
    data=order_analytic_tf,
    table_name="order_status_analytic",
    validation_functions={"status": validate_order_status},
)
load_warehouse(
    data=valid_orders_analytic,
    schema="public",
    table_name="order_status_analytic",
    idx_name="order_id",
    source="staging",
)
if not invalid_orders_analytic.empty:
    handle_error(
        data=invalid_orders_analytic,
        bucket_name='error-dellstore',
        table_name="order_status_analytic",
        process='validation',
    )