## **Mentoring Week 8 - Data Integration and ETL Pipeline** ##

### Building Data Pipeline with Python and Pyspark - Pacmann AI ###

As Data Engineers, we need to understand and assess the quality of a given dataset containing sales data. These responsibilities include:

1. **Data Profiling:** Explore the dataset to gain insights into its structure and attributes.

2. **Data Quality Check:** Assess the validity and consistency of the data. Identify any anomalies or missing values.

3. **Recommendations:** Based on your findings, provide recommendations for cleaning and improving the dataset.

### **The Dataset**

* Use Docker Compose to run the container: [repository](https://github.com/Kurikulum-Sekolah-Pacmann/data_pipeline_exercise_4)
* This dataset provides detailed information about car sales, spread across multiple sources: a database, an API, and a spreadsheet.

## **Pipeline Code**

### **~ Helper Function** ###

In [1]:
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine
import sqlalchemy
from minio import Minio
from io import BytesIO
import pandas as pd
from datetime import datetime
import logging

# Load environment variables
load_dotenv()

# Setup Logging Configuration
# Every pipeline step appends its log line to log/info_process.log.

LOG_DIR = "log"
LOG_FILE = os.path.join(LOG_DIR, "info_process.log")

# Ensure log directory exists. exist_ok=True replaces the separate
# os.path.exists() check, which could race with another process creating
# the directory between the check and the makedirs call.
os.makedirs(LOG_DIR, exist_ok=True)

# Configure logging only once: basicConfig does nothing when the root logger
# already has handlers, so re-running this cell does not duplicate handlers.
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)


# Database Connections
# Logical database name -> prefix of the environment variables describing it.
_DB_ENV_PREFIXES = {
    'source': 'SRC',
    'staging': 'STG',
    'warehouse': 'WH',
    'log': 'LOG',
}


def get_db_connection(db_type):
    """Create a SQLAlchemy engine for one of the pipeline's PostgreSQL databases.

    Connection settings are read from the environment variables
    ``<PREFIX>_POSTGRES_USER``, ``_PASSWORD``, ``_HOST``, ``_PORT`` and ``_DB``,
    where the prefix is ``SRC``, ``STG``, ``WH`` or ``LOG``.

    Args:
        db_type: One of ``'source'``, ``'staging'``, ``'warehouse'`` or ``'log'``.

    Returns:
        The engine returned by ``create_engine`` for the selected database.

    Raises:
        ValueError: If ``db_type`` is not a known database name. Previously
            the function fell through and returned ``None``, which only failed
            later at the first query.
    """
    if db_type not in _DB_ENV_PREFIXES:
        valid = ", ".join(sorted(_DB_ENV_PREFIXES))
        raise ValueError(f"Unknown db_type {db_type!r}; expected one of: {valid}")

    prefix = _DB_ENV_PREFIXES[db_type]
    user = os.getenv(f"{prefix}_POSTGRES_USER")
    password = os.getenv(f"{prefix}_POSTGRES_PASSWORD")
    host = os.getenv(f"{prefix}_POSTGRES_HOST")
    port = os.getenv(f"{prefix}_POSTGRES_PORT")
    database = os.getenv(f"{prefix}_POSTGRES_DB")

    return create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

# Logging
def etl_log(log_msg: dict):
    """Record one ETL log entry in the log database and in the log file.

    Both writes are best-effort and independent: a failure of the database
    write is reported through ``logging.error`` and a failure of the file
    write is printed, so logging never interrupts the pipeline itself.

    Args:
        log_msg: Mapping of log fields (e.g. step, component, status,
            table_name, etl_date). It becomes one row of the ``etl_log``
            table and one ``key=value | key=value`` line in the log file.
    """
    # Write log to database
    try:
        conn = get_db_connection('log')
        df_log = pd.DataFrame([log_msg])
        df_log.to_sql(name="etl_log", con=conn, if_exists="append", index=False)
    except Exception as e:
        logging.error(f"Can't save log to DB. Cause: {str(e)}")

    # Write log to file
    try:
        # Join the fields instead of appending " | " and stripping it afterwards:
        # str.rstrip(" | ") removes any trailing spaces and pipes, so it also
        # truncated values that legitimately end with one of those characters.
        log_line = " | ".join(f"{key}={value}" for key, value in log_msg.items())

        logging.info(log_line)
    except Exception as e:
        print(f"Can't save log to file. Cause: {str(e)}")

def read_etl_log(filter_params: dict) -> pd.DataFrame:
    """
    Look up the most recent etl_date recorded in the etl_log table.

    filter_params supplies the bind values for :step, :component, :status and
    :table_name (the table name is matched case-insensitively with ILIKE).

    Returns a one-row DataFrame with a latest_etl_date column, or an empty
    DataFrame when the lookup fails for any reason (the cause is printed).
    """
    try:
        log_engine = get_db_connection('log')

        latest_run_sql = sqlalchemy.text("""
            SELECT MAX(etl_date) as latest_etl_date
            FROM etl_log
            WHERE 
                step = :step AND
                component = :component AND
                status = :status AND
                table_name ILIKE :table_name
        """)

        return pd.read_sql(sql=latest_run_sql, con=log_engine, params=filter_params)

    except Exception as exc:
        print(f"Can't execute your query. Cause: {str(exc)}")
        return pd.DataFrame()

# Read SQL Query from Table
def read_sql(table_name: str) -> str:
    """
    Build an unfiltered ``SELECT *`` statement for the given table.

    The table name is inserted into the SQL text as-is, so it must come from
    trusted code rather than user input.
    """
    return "SELECT * FROM {}".format(table_name)

def read_sql_inc(table_name: str) -> str:
    """
    Build the incremental ``SELECT *`` statement for the given table.

    The statement keeps only rows whose created_at is later than the
    ``:etl_date`` bind parameter, which the caller supplies at execution time.
    The table name itself is inserted into the SQL text as-is.
    """
    template = "SELECT * FROM {} WHERE created_at > :etl_date"
    return template.format(table_name)

# Create Function handle_error to dump failure data to MiniO
def handle_error(data, bucket_name: str, table_name: str, step: str, component: str):
    """
    Upload a failed batch to MinIO as a CSV object, then print the bucket listing.

    data is serialised with its ``to_csv()`` method (index included) and stored
    under ``<step>_<component>_<table_name>_<timestamp>.csv``. The bucket is
    created first when it does not exist. The client connects to
    localhost:9000 over plain HTTP with credentials taken from the
    MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    minio_client = Minio(
        'localhost:9000',
        access_key=os.getenv('MINIO_ACCESS_KEY'),
        secret_key=os.getenv('MINIO_SECRET_KEY'),
        secure=False,
    )

    bucket_missing = not minio_client.bucket_exists(bucket_name)
    if bucket_missing:
        minio_client.make_bucket(bucket_name)

    # put_object needs a readable stream plus its exact byte length.
    payload = data.to_csv().encode('utf-8')
    minio_client.put_object(
        bucket_name=bucket_name,
        object_name=f"{step}_{component}_{table_name}_{timestamp}.csv",
        data=BytesIO(payload),
        length=len(payload),
        content_type='application/csv'
    )

    # Show everything now stored in the bucket.
    for stored in minio_client.list_objects(bucket_name, recursive=True):
        print(stored.object_name)

### **~ Data Profiling and Data Quality** ###

In [29]:
import pandas as pd
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine
from helper.utils import etl_log
from staging.extract.extract_api import extract_api
from staging.extract.extract_spreadsheet import extract_sheet
from staging.extract.extract_db import extract_database

def extract_all_sources():
    """Pull the car sales table, the US states API payload and the brand
    worksheet, and return them in one dict keyed by source."""
    # Database: car_sales table
    car_sales = extract_database(table_name="car_sales")

    # API: US states reference data
    us_states = extract_api(
        link_api="https://raw.githubusercontent.com/Kurikulum-Sekolah-Pacmann/us_states_data/refs/heads/main/us_states.json",
        list_parameter={},
        data_name="regions"
    )

    # Spreadsheet: brand_car worksheet of the sheet named by KEY_SPREADSHEET
    brand_sheet = extract_sheet(os.getenv("KEY_SPREADSHEET"), "brand_car")

    return {
        "db_car_sales": car_sales,
        "api_us_state": us_states,
        "sheet_brand_car": brand_sheet,
    }

def table_shapes(data):
    """Return the (rows, columns) shape of every DataFrame, keyed by table name."""
    shapes = {}
    for name, frame in data.items():
        shapes[name] = frame.shape
    return shapes

def column_types(data):
    """Return, per table, a mapping of column name to its dtype as a string."""
    types_by_table = {}
    for name, frame in data.items():
        column_dtypes = {}
        for column in frame.columns:
            column_dtypes[column] = str(frame[column].dtype)
        types_by_table[name] = column_dtypes
    return types_by_table

def unique_values(data):
    """List the distinct values of the categorical car-sales columns.

    Only the "db_car_sales" table is inspected; a target column that is absent
    from it is reported as an empty list. Returns an empty dict when that table
    is not part of data.
    """
    if "db_car_sales" not in data:
        return {}

    sales = data["db_car_sales"]
    distinct = {}
    for column in ('state', 'body', 'color', 'interior'):
        if column in sales.columns:
            distinct[column] = sales[column].unique().tolist()
        else:
            distinct[column] = []

    return {"db_car_sales": distinct}

def missing_value_percent(data):
    """Return, per table and column, the percentage of missing values.

    Nulls, empty strings and the '—' placeholder all count as missing; each
    percentage is rounded to two decimals.
    """
    def _percent_missing(series):
        # Placeholder strings are treated the same as real nulls.
        is_placeholder = (series == '') | (series == '—')
        is_missing = series.isnull() | is_placeholder
        return round(float(is_missing.mean() * 100), 2)

    return {
        table: {col: _percent_missing(df[col]) for col in df.columns}
        for table, df in data.items()
    }


def profile_report():
    """Profile every source and save the result as a JSON report.

    For each extracted table the report holds its shape, column dtypes, the
    distinct values of selected columns and the missing-value percentages.
    The report is written to
    profiling/output_profiling/car_sales_profiling_report.json and returned.
    """
    sources = extract_all_sources()

    report = {
        "person_in_charge": "Reza",
        "date_profiling": str(datetime.now()),
        "result": {}
    }

    shapes = table_shapes(sources)
    dtypes = column_types(sources)
    distinct = unique_values(sources)
    missing = missing_value_percent(sources)

    # unique_values only covers db_car_sales, hence the .get() fallback.
    report["result"] = {
        table: {
            "shape": shapes[table],
            "data_types": dtypes[table],
            "unique_values": distinct.get(table, {}),
            "missing_percentage": missing[table]
        }
        for table in sources
    }

    # Save JSON
    output_dir = "profiling/output_profiling"
    os.makedirs(output_dir, exist_ok=True)
    report_path = os.path.join(output_dir, "car_sales_profiling_report.json")
    with open(report_path, "w") as handle:
        json.dump(report, handle, indent=4)

    return report

# Run it
profile_report()


{'person_in_charge': 'Reza',
 'date_profiling': '2025-04-20 14:14:13.176267',
 'result': {'db_car_sales': {'shape': (30000, 17),
   'data_types': {'id_sales': 'int64',
    'year': 'int64',
    'brand_car': 'object',
    'model': 'object',
    'trim': 'object',
    'body': 'object',
    'transmission': 'object',
    'vin': 'object',
    'state': 'object',
    'condition': 'float64',
    'odometer': 'float64',
    'color': 'object',
    'interior': 'object',
    'seller': 'object',
    'mmr': 'float64',
    'sellingprice': 'float64',
    'saledate': 'object'},
   'unique_values': {'state': ['fl',
     'mo',
     'nj',
     'pa',
     'il',
     'tx',
     'ut',
     'mn',
     'ca',
     'md',
     'va',
     'ga',
     'qc',
     'ma',
     'mi',
     'tn',
     'nc',
     'oh',
     'ms',
     'co',
     'sc',
     'wi',
     'az',
     'hi',
     'ne',
     'wa',
     'ny',
     'pr',
     'nv',
     'on',
     'in',
     'la',
     'nm',
     'ab',
     'or',
     'ok',
     'ns',
  

------

### **1. STAGING** ###

### **STAGING_Extract from API** ###

In [30]:
import pandas as pd
from dotenv import load_dotenv
import requests
from datetime import datetime
from helper.utils import etl_log

def extract_api(link_api: str, list_parameter: dict, data_name: str, timeout: float = 30) -> pd.DataFrame:
    """Fetch a JSON document from an API and return one of its keys as a DataFrame.

    Args:
        link_api: URL to request with HTTP GET.
        list_parameter: Query-string parameters passed to ``requests.get``.
        data_name: Key of the JSON payload holding the list of records; it is
            also recorded as ``table_name`` in the ETL log.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on an unresponsive host.

    Returns:
        A DataFrame built from the records under ``data_name``, or an empty
        DataFrame when the request, the JSON parsing or the key lookup fails.
        Every outcome is written to the ETL log with a ``status`` field.
    """
    log_msg = {
        "step": "staging",
        "component": "extract_api",
        "table_name": data_name,
        "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }

    try:
        # Establish connection to API
        resp = requests.get(link_api, params=list_parameter, timeout=timeout)
        resp.raise_for_status()  # Raises an error for 4xx/5xx responses

        # Parse the response JSON
        raw_response = resp.json()

        # Convert the JSON data to a pandas DataFrame
        df_api = pd.DataFrame(raw_response)

        # Expand the records stored under data_name into their own DataFrame
        df_result = pd.DataFrame(df_api[data_name].tolist())

        # create success log message
        log_msg["status"] = "success"
        return df_result

    except requests.exceptions.RequestException as e:
        # create fail log message
        print(f"API request error: {e}")
        log_msg["status"] = "failed"
        log_msg["error_msg"] = str(e)
        return pd.DataFrame()

    except KeyError as e:
        # The payload was valid JSON but lacks the requested key. This used to
        # escape uncaught, leaving a log entry without any status.
        print(f"Key {e} not found in API response")
        log_msg["status"] = "failed"
        log_msg["error_msg"] = f"Key {e} not found in API response"
        return pd.DataFrame()

    except ValueError as e:
        # create fail log message
        print(f"JSON parsing error: {e}")
        log_msg["status"] = "failed"
        log_msg["error_msg"] = str(e)
        return pd.DataFrame()

    finally:
        etl_log(log_msg)

In [31]:
# list_parameter is typed as a dict; pass an empty dict (no query parameters)
# rather than an empty string, matching the call in extract_all_sources.
df_api = extract_api(link_api="https://raw.githubusercontent.com/Kurikulum-Sekolah-Pacmann/us_states_data/refs/heads/main/us_states.json", list_parameter={}, data_name="regions")

In [32]:
df_api

Unnamed: 0,id_state,code,name
0,1,al,Alabama
1,2,ak,Alaska
2,3,az,Arizona
3,4,ar,Arkansas
4,5,ca,California
...,...,...,...
63,64,pr,Puerto Rico
64,65,vi,U.S. Virgin Islands
65,66,gu,Guam
66,67,mp,Northern Mariana Islands


### **STAGING_Extract from Database** ###

In [None]:
import pandas as pd
from sqlalchemy import text 
import sqlalchemy
from helper.utils import get_db_connection, etl_log, read_etl_log, read_sql
from datetime import datetime

def extract_database(table_name: str) -> pd.DataFrame:
    """
    Extracts all rows of a table from the source database.

    The extraction is a full load: the query built by read_sql has no
    created_at filter, so the previous lookup of the last successful load date
    was never used and has been removed.

    Args:
        table_name: Name of the source table to read.

    Returns:
        The table contents, or an empty DataFrame when the extraction fails
        (the error is printed and written to the ETL log). Previously a failure
        returned None despite the pd.DataFrame annotation.
    """
    try:
        conn = get_db_connection('source')

        # SELECT * FROM <table_name>
        query = sqlalchemy.text(read_sql(table_name))
        df = pd.read_sql(sql=query, con=conn)

        # Log success
        log_msg = {
            "step": "staging",
            "component": "extract_database",
            "status": "success",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df
    except Exception as e:
        # Log failure
        log_msg = {
            "step": "staging",
            "component": "extract_database",
            "status": "failed",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        print(e)
        # Same failure contract as extract_api: an empty frame, never None.
        return pd.DataFrame()
    finally:
        etl_log(log_msg)

In [None]:
db_car

Unnamed: 0,id_sales,year,brand_car,model,trim,body,transmission,vin,state,condition,odometer,color,interior,seller,mmr,sellingprice,saledate
0,1,2014,Chevrolet,Impala Limited,LT Fleet,Sedan,automatic,2g1wb5e37e1112559,fl,4.0,21507.0,white,black,gm remarketing,13450.0,13800.0,Mon Feb 23 2015 05:00:00 GMT-0800 (PST)
1,2,2003,Dodge,Ram Pickup 1500,SLT,Quad Cab,,1d7ha18n13s152972,mo,31.0,79712.0,—,black,tdaf remarketing,6025.0,6300.0,Tue Jan 20 2015 02:30:00 GMT-0800 (PST)
2,3,2007,Pontiac,G6,GT,Convertible,automatic,1g2zh361474252178,nj,34.0,65698.0,red,black,car authority inc,7375.0,8000.0,Wed Jan 14 2015 01:30:00 GMT-0800 (PST)
3,4,2011,Toyota,Corolla,LE,Sedan,automatic,jtdbu4eexb9167571,fl,43.0,23634.0,black,beige,world omni financial corporation,10800.0,11400.0,Tue Jan 27 2015 01:30:00 GMT-0800 (PST)
4,5,2012,Lexus,ES 350,Base,Sedan,,jthbk1eg6c2495519,pa,35.0,26483.0,black,brown,meridian remarketing,22500.0,23300.0,Fri Jan 30 2015 01:00:00 GMT-0800 (PST)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
29995,29996,2012,Ford,Explorer,XLT,SUV,automatic,1fmhk8d83cga41998,mi,36.0,103016.0,white,gray,automobiles paille inc,17700.0,16800.0,Thu Jan 29 2015 01:30:00 GMT-0800 (PST)
29996,29997,2012,Volkswagen,Jetta,Base,sedan,automatic,3vw2k7aj1cm312846,fl,36.0,41092.0,—,tan,vw credit,8475.0,9800.0,Wed Jun 10 2015 02:40:00 GMT-0700 (PDT)
29997,29998,2003,Toyota,Tacoma,Base,Regular Cab,,5tenl42n03z286594,az,19.0,292925.0,white,gray,ge fleet services for itself/servicer,3225.0,2400.0,Thu May 21 2015 05:00:00 GMT-0700 (PDT)
29998,29999,2014,Chevrolet,Impala Limited,LT Fleet,Sedan,automatic,2g1wb5e37e1147702,fl,4.0,25083.0,gray,gray,gm remarketing,12900.0,12800.0,Mon Jan 26 2015 05:00:00 GMT-0800 (PST)


### **STAGING_Extract from Google Spreadsheet** ###

In [None]:
import pandas as pd
from helper.utils import etl_log
from datetime import datetime
import gspread
from google.auth.transport.requests import Request
from google.auth import load_credentials_from_file
import os

def auth_gspread():
    """
    Build an authorized gspread client.

    Credentials are loaded from the file named by the CRED_PATH environment
    variable, scoped to the Sheets feed and Drive APIs.
    """
    api_scopes = [
        'https://spreadsheets.google.com/feeds',
        'https://www.googleapis.com/auth/drive',
    ]

    # load_credentials_from_file returns a (credentials, project) pair;
    # only the credentials are needed here.
    credentials, _project = load_credentials_from_file((os.getenv('CRED_PATH')), scopes=api_scopes)

    client = gspread.authorize(credentials)
    return client

def init_key_file(key_file:str):
    """Authenticate with Google Sheets and open the spreadsheet whose key is key_file."""
    return auth_gspread().open_by_key(key_file)

def extract_sheet(key_file: str, worksheet_name: str) -> pd.DataFrame:
    """
    Extracts data from a Google Sheet.

    Args:
        key_file: Key of the spreadsheet to open.
        worksheet_name: Name of the worksheet to read; also recorded as
            table_name in the ETL log.

    Returns:
        The worksheet rows with the first row used as column names, plus a
        created_at column holding the extraction time. An empty DataFrame is
        returned when the extraction fails (the error is printed and logged);
        previously a failure returned None despite the pd.DataFrame annotation.
    """
    try:
        # init sheet
        sheet_result = init_key_file(key_file)

        worksheet_result = sheet_result.worksheet(worksheet_name)

        df_result = pd.DataFrame(worksheet_result.get_all_values())

        # set first rows as columns
        df_result.columns = df_result.iloc[0]

        # get all the rest of the values
        df_result = df_result[1:].copy()

        # Add the 'created_at' column with the current datetime
        df_result['created_at'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Log success
        log_msg = {
            "step": "staging",
            "component": "extract_spreadsheet",
            "status": "success",
            "table_name": worksheet_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df_result

    except Exception as e:
        # Log failure
        log_msg = {
            "step": "staging",
            "component": "extract_spreadsheet",
            "status": "failed",
            "table_name": worksheet_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        print(e)
        # Same failure contract as extract_api: an empty frame, never None.
        return pd.DataFrame()
    finally:
        etl_log(log_msg)

In [None]:
extract_sheet(key_file=os.getenv('KEY_SPREADSHEET'), worksheet_name="brand_car")

Unnamed: 0,brand_car_id,brand_name,created_at
1,1,Acura,2025-04-18 01:11:54
2,2,Audi,2025-04-18 01:11:54
3,3,Bentley,2025-04-18 01:11:54
4,4,BMW,2025-04-18 01:11:54
5,5,Buick,2025-04-18 01:11:54
6,6,Cadillac,2025-04-18 01:11:54
7,7,Chevrolet,2025-04-18 01:11:54
8,8,Chrysler,2025-04-18 01:11:54
9,9,Daewoo,2025-04-18 01:11:54
10,10,Dodge,2025-04-18 01:11:54


------

### **STAGING_Transform Datatype Car Sales** ###

In [63]:
import pandas as pd

def transform_datatype_car_sales(df: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms car_sales data according to the source-to-target mapping.

    Keeps only the columns loaded into staging and converts the numeric
    columns (year, condition, odometer, mmr, sellingprice) to text.

    Missing values stay missing. A plain ``astype(str)`` turns NaN into the
    literal string 'nan', which later null checks such as ``dropna`` no longer
    recognise as missing.

    Args:
        df: Raw car_sales rows; must contain every selected column.

    Returns:
        A new DataFrame with the selected columns; the input is not modified.
    """

    # Define the columns to keep
    selected_columns = [
        "id_sales", "year", "brand_car", "transmission", "state",
        "condition", "odometer", "color", "interior", "mmr", "sellingprice"
    ]

    # Columns stored as text in staging
    text_columns = ["year", "condition", "odometer", "mmr", "sellingprice"]

    def _to_text(value):
        # Keep nulls as nulls; stringify everything else.
        return None if pd.isna(value) else str(value)

    # Select only the necessary columns
    df = df[selected_columns].copy()

    # Apply transformations
    for column in text_columns:
        df[column] = df[column].map(_to_text)

    return df


In [64]:
tf_df_car_sales = transform_datatype_car_sales(df_car_sales)

In [65]:
load_staging(data=tf_df_car_sales, schema='public', table_name='car_sales', idx_name='id_sales')

### **STAGING_Load Staging** ###

In [58]:
import pandas as pd
from helper.utils import get_db_connection, etl_log, handle_error
from datetime import datetime
from pangres import upsert

def load_staging(data, schema: str, table_name: str, idx_name: str):
    """Upsert a DataFrame into a staging table and log the outcome.

    data is indexed by idx_name and upserted (existing rows updated, new rows
    inserted). Failures are not raised to the caller; they are recorded in the
    ETL log with status "failed" and the error message.
    """
    def _log_entry(status):
        # Timestamp is taken when the entry is built, i.e. after the attempt.
        return {
            "step": "staging",
            "component": "load",
            "status": status,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    try:
        engine = get_db_connection('staging')
        indexed = data.set_index(idx_name)

        upsert(
            con=engine,
            df=indexed,
            table_name=table_name,
            schema=schema,
            if_row_exists="update",
        )

        log_msg = _log_entry("success")
    except Exception as exc:
        log_msg = _log_entry("failed")
        log_msg["error_msg"] = str(exc)
        # NOTE: dumping the failed batch to object storage via handle_error
        # is currently disabled.
    finally:
        etl_log(log_msg)

### **STAGING PIPELINE** ###

In [45]:
    # Extract data from database
    df_car_sales = extract_database(table_name="car_sales")

    # Extract data from api (no query parameters, hence the empty dict)
    df_us_state = extract_api(link_api="https://raw.githubusercontent.com/Kurikulum-Sekolah-Pacmann/us_states_data/refs/heads/main/us_states.json", list_parameter={}, data_name="regions")

    # Extract data from spreadsheet
    df_car_brand = extract_sheet(key_file=os.getenv('KEY_SPREADSHEET'), worksheet_name="brand_car")

    # Transform car sales data from database.
    # The staging transform is transform_datatype_car_sales; transform_car_sales
    # is the warehouse transform and needs the brand and state lookup frames.
    tf_df_car_sales = transform_datatype_car_sales(df_car_sales)

    # Load data into staging (load_staging upserts every column it is given)
    load_staging(data=tf_df_car_sales, schema='public', table_name='car_sales', idx_name='id_sales')
    load_staging(data=df_us_state, schema='public', table_name='us_state', idx_name='id_state')
    load_staging(data=df_car_brand, schema='public', table_name='car_brand', idx_name='brand_car_id')

### **2. WAREHOUSE** ###

### **WAREHOUSE_Extract from Staging** ###

In [7]:
import pandas as pd
from sqlalchemy import text 
import sqlalchemy
from helper.utils import get_db_connection, etl_log, read_etl_log, read_sql_inc
from datetime import datetime

def extract_staging(table_name: str) -> pd.DataFrame:
    """
    Extracts data from the staging database incrementally.

    Only rows whose created_at is later than the last successful warehouse
    load of table_name (taken from the ETL log) are returned. When no such
    load is recorded, the cut-off is '1111-01-01', i.e. an initial full load.

    Args:
        table_name: Name of the staging table to read.

    Returns:
        The selected rows, or an empty DataFrame when the extraction fails
        (the error is printed and written to the ETL log). Previously a failure
        returned None despite the pd.DataFrame annotation.
    """
    try:
        conn = get_db_connection('staging')

        # Get the latest etl_date from the log table
        filter_log = {
            "step": "warehouse",
            "table_name": table_name,
            "status": "success",
            "component": "load"
        }
        latest_log = read_etl_log(filter_log)

        # MAX() over zero rows is NULL, which can surface as None, NaN or NaT
        # depending on the column type, so test with pd.isna, not "is None".
        if latest_log.empty or pd.isna(latest_log['latest_etl_date'].iloc[0]):
            etl_date = '1111-01-01'
        else:
            etl_date = latest_log['latest_etl_date'].iloc[0]

        # NOTE(review): etl_date is written with datetime.now() on the machine
        # running the pipeline; confirm created_at in staging uses the same
        # time zone, otherwise rows can be skipped or re-read.
        # SELECT * FROM <table_name> WHERE created_at > :etl_date
        query = sqlalchemy.text(read_sql_inc(table_name))
        df = pd.read_sql(sql=query, con=conn, params={"etl_date": etl_date})

        # Log success
        log_msg = {
            "step": "warehouse",
            "component": "extract",
            "status": "success",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df
    except Exception as e:
        # Log failure
        log_msg = {
            "step": "warehouse",
            "component": "extract",
            "status": "failed",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        print(e)
        # Same failure contract as extract_api: an empty frame, never None.
        return pd.DataFrame()
    finally:
        etl_log(log_msg)

In [97]:
stg_car_sales

Unnamed: 0,id_sales,year,brand_car,transmission,state,condition,odometer,color,interior,mmr,sellingprice,created_at
0,400,2009,BMW,,pa,33.0,58609.0,blue,tan,17250.0,19000.0,2025-04-17 18:40:11.879574
1,7,2012,Ford,automatic,il,32.0,39151.0,red,tan,9525.0,9700.0,2025-04-17 18:40:11.879574
2,2514,2003,Toyota,,nc,36.0,83638.0,black,grey,3550.0,4000.0,2025-04-17 18:40:11.879574
3,4469,2006,,,nj,36.0,59574.0,white,black,12800.0,8000.0,2025-04-17 18:40:11.879574
4,4957,2000,Ford,,va,,88098.0,black,unknown,1700.0,1900.0,2025-04-17 18:40:11.879574
...,...,...,...,...,...,...,...,...,...,...,...,...
29995,29996,2012,Ford,automatic,mi,36.0,103016.0,white,grey,17700.0,16800.0,2025-04-17 18:40:11.879574
29996,29997,2012,Volkswagen,automatic,fl,36.0,41092.0,unknown,tan,8475.0,9800.0,2025-04-17 18:40:11.879574
29997,29998,2003,Toyota,,az,19.0,292925.0,white,grey,3225.0,2400.0,2025-04-17 18:40:11.879574
29998,29999,2014,Chevrolet,automatic,fl,4.0,25083.0,grey,grey,12900.0,12800.0,2025-04-17 18:40:11.879574


### **WAREHOUSE_Transform Car Sales** ###

In [10]:
import pandas as pd
import numpy as np

def clean_and_merge_categories(df):
    """Normalise the 'color' and 'interior' columns of df in place.

    Values are compared in lower case against a per-column mapping that blanks
    placeholders and merges spelling variants. Values without a mapping entry
    keep their original, unmodified value. The same DataFrame object is
    returned.
    """
    category_maps = {
        # Placeholders and stray numeric codes become '', variants are merged.
        'color': {
            '—': '', '': '', '16633': '', '6388': '',
            'off-white': 'white', 'white': 'white', 'gray': 'grey'
        },
        'interior': {
            '—': '', '': '', 'off-white': 'white', 'white': 'white',
            'gray': 'grey', 'green': 'green'
        },
    }

    for column, mapping in category_maps.items():
        lowered = df[column].astype(str).str.lower()
        # Unmapped values come back as NaN from map(); restore the originals.
        df[column] = lowered.map(mapping).fillna(df[column])

    return df

def drop_invalid_values(df):
    """Return df without any row that has a missing value.

    Empty strings and the '—' placeholder are treated as missing. A final
    filter on 'condition' is kept, so the column must exist in df.
    """
    placeholders = {'': np.nan, '—': np.nan}

    # Turn placeholders into NaN, then discard every incomplete row.
    complete_rows = df.replace(placeholders).dropna()

    # Keep only rows that still have a condition value.
    has_condition = complete_rows['condition'].notna()
    return complete_rows[has_condition]

def drop_sales_by_id(df):
    """Remove the known-bad car sales rows.

    Drops the rows with id_sales 8013 or 26976, and any row whose 'state'
    holds one of two VIN-like strings instead of a state code.
    """
    bad_sale_ids = [8013, 26976]
    bad_state_values = ['3vwd17aj5fm219943', '3vwd17aj5fm297123']

    is_bad_id = df['id_sales'].isin(bad_sale_ids)
    is_bad_state = df['state'].isin(bad_state_values)

    # Keep rows that match neither exclusion list.
    return df[~(is_bad_id | is_bad_state)]

def mapping_target(df, df_car_brand, df_us_state):
    """Shape cleaned car sales rows into the warehouse car_sales layout.

    Brand names and state codes are replaced by their ids from the lookup
    frames, the text columns are converted to numbers, and columns are renamed
    and ordered to match the warehouse schema.

    Args:
        df: Cleaned staging car sales rows, including created_at.
        df_car_brand: Lookup with 'brand_name' and 'brand_car_id' columns.
        df_us_state: Lookup with 'code' and 'id_state' columns.

    Returns:
        A new DataFrame with the warehouse columns. The input frame is left
        untouched; the original added columns to it and converted it in place.
    """
    # Work on a copy so the caller's frame (often a filtered slice) is not
    # mutated and pandas does not emit SettingWithCopyWarning.
    df = df.copy()

    # Map 'brand_car' from car_sales to 'brand_car_id' from car_brand table.
    # Brands missing from the lookup become NaN.
    brand_mapping = dict(zip(df_car_brand['brand_name'], df_car_brand['brand_car_id']))
    df['brand_car_id'] = df['brand_car'].map(brand_mapping)

    # Map 'state' from car_sales to 'id_state' from us_state table.
    # State codes missing from the lookup become NaN.
    state_mapping = dict(zip(df_us_state['code'], df_us_state['id_state']))
    df['id_state'] = df['state'].map(state_mapping)

    # Convert 'year' from text to the smallest integer dtype that fits;
    # unparseable values become NaN.
    df['year'] = pd.to_numeric(df['year'], errors='coerce', downcast='integer')

    # Convert the measure columns from text to the smallest float dtype.
    for column in ('condition', 'odometer', 'mmr', 'sellingprice'):
        df[column] = pd.to_numeric(df[column], errors='coerce', downcast='float')

    # Rename columns to match the warehouse schema
    df = df.rename(columns={
        'id_sales': 'id_sales_nk',
        'sellingprice': 'selling_price'
    })

    # Required columns in warehouse order; created_at stays last because
    # load_warehouse removes it before loading.
    warehouse_columns = [
        'id_sales_nk',
        'year',
        'brand_car_id',
        'transmission',
        'id_state',
        'condition',
        'odometer',
        'color',
        'interior',
        'mmr',
        'selling_price',
        'created_at'
    ]

    # Filter columns to match the warehouse schema
    return df[warehouse_columns]

def transform_car_sales(df, df_car_brand, df_us_state):
    """Run the full warehouse transformation for car sales.

    Applies, in order: category clean-up, removal of rows with missing values,
    removal of the known-bad rows, and mapping to the warehouse layout using
    the brand and state lookup frames.
    """
    cleaning_steps = (
        clean_and_merge_categories,  # Step 1: clean and merge categories
        drop_invalid_values,         # Step 2: drop rows with invalid values
        drop_sales_by_id,            # Step 3: drop the known-bad rows
    )
    for step in cleaning_steps:
        df = step(df)

    # Step 4: mapping and transformation
    return mapping_target(df, df_car_brand, df_us_state)


### **WAREHOUSE_Load Warehouse** ###

In [6]:
import pandas as pd
from helper.utils import get_db_connection, etl_log
from datetime import datetime
from pangres import upsert

def load_warehouse(data, schema: str, table_name: str, idx_name: str):
    """Upsert transformed rows into a warehouse table and log the outcome.

    The created_at column is removed before loading, then data is indexed by
    idx_name and upserted (existing rows updated, new rows inserted).
    Failures are not raised to the caller; they are recorded in the ETL log
    with status "failed" and the error message.

    Args:
        data: Transformed DataFrame to load.
        schema: Target schema name.
        table_name: Target table name; also recorded in the ETL log.
        idx_name: Column used as the upsert key.
    """
    try:
        conn = get_db_connection('warehouse')

        # Remove created_at by name. Slicing off the last column silently
        # dropped a real data column whenever created_at was absent or not
        # last; errors='ignore' makes a missing created_at a no-op.
        data = data.drop(columns=['created_at'], errors='ignore')
        data = data.set_index(idx_name)

        # Do upsert (Update for existing data and Insert for new data)
        upsert(con=conn,
               df=data,
               table_name=table_name,
               schema=schema,
               if_row_exists="update")

        # create success log message
        log_msg = {
            "step": "warehouse",
            "component": "load",
            "status": "success",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        log_msg = {
            "step": "warehouse",
            "component": "load",
            "status": "failed",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        # NOTE: dumping the failed batch to object storage via handle_error
        # is currently disabled.

    finally:
        etl_log(log_msg)

### **WAREHOUSE PIPELINE** ###

In [None]:
from warehouse.extract.extract_stg import extract_staging
from warehouse.transform.transform_car_sales import transform_car_sales
from warehouse.load.load_wh import load_warehouse

def warehouse_pipeline():
    """Run the staging-to-warehouse pipeline for car sales.

    Extracts car sales and the two lookup tables from staging, transforms the
    sales rows to the warehouse layout, and upserts them into
    public.car_sales keyed by id_sales_nk.
    """
    # Extract data from staging
    stg_car_sales = extract_staging("car_sales")
    stg_us_state = extract_staging("us_state")
    stg_car_brand = extract_staging("car_brand")

    # Transform car sales data from staging. Pass the extracted frames:
    # the original passed the string "stg_car_sales" and omitted the brand
    # and state lookups that transform_car_sales requires.
    tf_stg_car_sales = transform_car_sales(stg_car_sales, stg_car_brand, stg_us_state)

    # Load data into warehouse
    load_warehouse(data=tf_stg_car_sales, schema='public', table_name='car_sales', idx_name='id_sales_nk')

In [126]:
missing_states = stg_car_sales[stg_car_sales['id_state'].isnull()]
print("State code yang tidak ditemukan di mapping:", missing_states['state'].unique())

State code yang tidak ditemukan di mapping: ['3vwd17aj5fm219943' '3vwd17aj5fm297123']


In [119]:
tf_stg_car_sales['id_state'].info

<bound method Series.info of 0         9.0
2        30.0
3         9.0
5        38.0
6        13.0
         ... 
29989    13.0
29991     3.0
29992    35.0
29995    22.0
29998     9.0
Name: id_state, Length: 23999, dtype: float64>

### **3. MODELLING** ###

### **MODELLING_Extract Warehouse** ###

In [20]:
import pandas as pd
from sqlalchemy import text 
import sqlalchemy
from helper.utils import get_db_connection, etl_log, read_etl_log, read_sql
from datetime import datetime

def extract_warehouse(table_name: str) -> pd.DataFrame:
    """
    Read the modelling dataset for ``table_name`` out of the warehouse database.

    The SQL text comes from ``read_sql(table_name)`` and is executed with no
    bind parameters. The latest successful modelling-load date is still looked
    up in the ETL log, but it is not passed to the query (the incremental
    ``created_at`` filter is currently commented out).

    Args:
        table_name: Name handed to ``read_sql`` and recorded in the ETL log.

    Returns:
        The query result as a DataFrame. If any step raises, the error is
        printed, a "failed" entry is logged, and the function falls through,
        returning None.
    """
    try:
        engine = get_db_connection('warehouse')

        # Look up the most recent successful modelling load for this table
        last_load = read_etl_log({
            "step": "modelling",
            "component": "load",
            "status": "success",
            "table_name": table_name
        })

        # '1111-01-01' stands for "no previous load" (initial extraction);
        # otherwise use the date of the last successful load.
        first_run = last_load.empty or last_load['latest_etl_date'][0] is None
        etl_date = '1111-01-01' if first_run else last_load['latest_etl_date'][0]

        # Query shape expected from read_sql:
        #   SELECT *
        #   FROM car_sales
        #   -- WHERE created_at > :etl_date   (incremental filter, disabled)
        # df = pd.read_sql(sql=query, con=conn, params={"etl_date": etl_date})
        statement = sqlalchemy.text(read_sql(table_name))
        df = pd.read_sql(sql=statement, con=engine, params={})

        # Record the successful extraction; written by the finally clause
        log_msg = {
            "step": "modelling",
            "component": "extract_warehouse",
            "status": "success",
            # "source": "database",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df
    except Exception as e:
        # Record the failure; the exception is printed, not re-raised
        log_msg = {
            "step": "modelling",
            "component": "extract_warehouse",
            "status": "failed",
            # "source": "database",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        print(e)
    finally:
        # Runs on both paths, so exactly one log entry is written per call
        etl_log(log_msg)

### **MODELLING_Process Preprocessing** ###

In [21]:
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder
from datetime import datetime
from helper.utils import etl_log  # Make sure this utility function is available

def process_preprocessing(df: pd.DataFrame, features: list, target: str) -> pd.DataFrame:
    """
    Encode categorical features and scale numerical features.

    Steps:
    1. Label-encode every object-dtype column among ``features``.
    2. Standard-scale every int64/float64 column among ``features``. The
       selection happens after encoding, so an encoded column is scaled too
       when its resulting dtype is int64/float64.

    The former "drop rows containing '' or 'unknown'" step is disabled, so no
    rows are removed here. ``target`` is kept in the signature for callers but
    is only referenced by that disabled step.

    Args:
        df: Input data; it is copied, so the caller's frame is not modified.
        features: Column names to encode / scale.
        target: Name of the target column (currently unused).

    Returns:
        A new DataFrame with the feature columns encoded and scaled.

    Raises:
        Exception: Any error raised while preprocessing is logged with
            status "failed" and then re-raised.
    """

    try:
        # Work on a copy so column assignments below neither mutate the
        # caller's DataFrame nor trigger chained-assignment warnings.
        df = df.copy()

        # Disabled step: drop rows containing '' or 'unknown'
        # df = df[~df[features + [target]].isin(['', 'unknown']).any(axis=1)]

        # Step 1: Encode categorical columns
        categorical_cols = df[features].select_dtypes(include='object').columns.tolist()
        for col in categorical_cols:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col])

        # Log success message after encoding categorical features
        log_msg = {
            "step": "modelling",
            "component": "preprocessing_encode_categorical",
            "status": "success",
            "table_name": "car_sales",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        etl_log(log_msg)

        # Step 2: Scale numerical columns
        numerical_cols = df[features].select_dtypes(include=['int64', 'float64']).columns.tolist()
        # StandardScaler rejects input with zero columns, so skip scaling
        # when no feature qualifies instead of failing the whole step.
        if numerical_cols:
            scaler = StandardScaler()
            df[numerical_cols] = scaler.fit_transform(df[numerical_cols])

        # Log success message after scaling numerical features
        log_msg = {
            "step": "modelling",
            "component": "preprocessing_scale_numerical",
            "status": "success",
            "table_name": "car_sales",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        etl_log(log_msg)

    except Exception as e:
        log_msg = {
            "step": "modelling",
            "component": "preprocessing_process_preprocessing",
            "status": "failed",
            "table_name": "car_sales",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        etl_log(log_msg)
        raise

    return df


### **MODELLING_Split Data** ###

In [22]:
from sklearn.model_selection import train_test_split
import pandas as pd
from datetime import datetime
from helper.utils import etl_log  # Make sure this utility function is available

def split_data(df: pd.DataFrame, features: list, target: str, test_size=0.2, random_state=42):
    """
    Split dataset into train and test sets.

    Args:
        df: Preprocessed data containing the feature and target columns.
        features: Column names used as model inputs (X).
        target: Column name used as the model output (y).
        test_size: Passed through to ``train_test_split``.
        random_state: Passed through to ``train_test_split``.

    Returns:
        The result of ``train_test_split(X, y, ...)``, i.e.
        ``X_train, X_test, y_train, y_test``.

    Raises:
        Exception: Any error (e.g. a missing column) is logged with status
            "failed" and then re-raised.
    """
    try:
        X = df[features]
        y = df[target]

        splits = train_test_split(X, y, test_size=test_size, random_state=random_state)

        # Log success only once the split has actually completed, so a failing
        # split can no longer leave both a "success" and a "failed" entry.
        log_msg = {
            "step": "modelling",
            "component": "split_data",
            "status": "success",
            "table_name": "car_sales",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        etl_log(log_msg)

        return splits

    except Exception as e:
        log_msg = {
            "step": "modelling",
            "component": "split_data",
            "status": "failed",
            "table_name": "car_sales",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        etl_log(log_msg)
        raise


In [23]:
import pandas as pd
from sqlalchemy import text 
import sqlalchemy
from helper.utils import get_db_connection, etl_log, read_etl_log, read_sql
from datetime import datetime

def extract_warehouse(table_name: str) -> pd.DataFrame:
    """
    Read the modelling dataset for ``table_name`` out of the warehouse database.

    The SQL text comes from ``read_sql(table_name)`` and is executed with no
    bind parameters. The latest successful modelling-load date is still looked
    up in the ETL log, but it is not passed to the query (the incremental
    ``created_at`` filter is currently commented out).

    Args:
        table_name: Name handed to ``read_sql`` and recorded in the ETL log.

    Returns:
        The query result as a DataFrame. If any step raises, the error is
        printed, a "failed" entry is logged, and the function falls through,
        returning None.
    """
    try:
        engine = get_db_connection('warehouse')

        # Look up the most recent successful modelling load for this table
        last_load = read_etl_log({
            "step": "modelling",
            "component": "load",
            "status": "success",
            "table_name": table_name
        })

        # '1111-01-01' stands for "no previous load" (initial extraction);
        # otherwise use the date of the last successful load.
        first_run = last_load.empty or last_load['latest_etl_date'][0] is None
        etl_date = '1111-01-01' if first_run else last_load['latest_etl_date'][0]

        # Query shape expected from read_sql:
        #   SELECT *
        #   FROM car_sales
        #   -- WHERE created_at > :etl_date   (incremental filter, disabled)
        # df = pd.read_sql(sql=query, con=conn, params={"etl_date": etl_date})
        statement = sqlalchemy.text(read_sql(table_name))
        df = pd.read_sql(sql=statement, con=engine, params={})

        # Record the successful extraction; written by the finally clause
        log_msg = {
            "step": "modelling",
            "component": "extract_warehouse",
            "status": "success",
            # "source": "database",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return df
    except Exception as e:
        # Record the failure; the exception is printed, not re-raised
        log_msg = {
            "step": "modelling",
            "component": "extract_warehouse",
            "status": "failed",
            # "source": "database",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }
        print(e)
    finally:
        # Runs on both paths, so exactly one log entry is written per call
        etl_log(log_msg)

### **MODELLING_Linear Regression Main Script** ###

In [28]:
import pandas as pd
import joblib
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from datetime import datetime
from helper.utils import etl_log
from modelling.extract.extract_warehouse import extract_warehouse
from modelling.preprocessing.preprocessing_data import process_preprocessing
from modelling.preprocessing.splitting_data import split_data
import os

# Pull settings (MinIO credentials etc.) from the .env file into the environment
load_dotenv()


def _modelling_log(component, status="success", error=None):
    """Build an ETL log entry for the car_sales modelling step, stamped with the current time."""
    entry = {
        "step": "modelling",
        "component": component,
        "status": status,
        "table_name": "car_sales",
        "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    if error is not None:
        entry["error_msg"] = str(error)
    return entry


# Pull the training data out of the warehouse
df = extract_warehouse(table_name="car_sales")

# Model inputs and the column to predict
features = ['year', 'condition', 'odometer', 'mmr']
target = 'selling_price'

try:
    # Step 1: encode/scale the features, then discard rows that still hold nulls
    df_processed = process_preprocessing(df, features, target)
    df_processed = df_processed.dropna()

    log_msg = _modelling_log("preprocess_data")
    etl_log(log_msg)

    # Step 2: hold out a test set
    X_train, X_test, y_train, y_test = split_data(df_processed, features, target)

    log_msg = _modelling_log("split_data")
    etl_log(log_msg)

    # Step 3: fit an ordinary least-squares model
    model = LinearRegression()
    model.fit(X_train, y_train)

    log_msg = _modelling_log("train_model")
    etl_log(log_msg)

    # Step 4: report error metrics on the held-out set
    y_pred = model.predict(X_test)
    for label, metric in (
        ("MAE:", mean_absolute_error),
        ("MSE:", mean_squared_error),
        ("R²:", r2_score),
    ):
        print(label, metric(y_test, y_pred))

    log_msg = _modelling_log("evaluate_model")
    etl_log(log_msg)

    # Step 5: serialise the model locally, then push the file to MinIO
    model_filename = "carprice-model.pkl"
    joblib.dump(model, model_filename)

    client = Minio(
        'localhost:9001',
        access_key=os.getenv('MINIO_ACCESS_KEY'),
        secret_key=os.getenv('MINIO_SECRET_KEY'),
        secure=False,
    )

    # Create the destination bucket on first use
    bucket_name = "car-sales-modelling"
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)

    # Upload the pickle into that bucket, using the file name as the object name
    client.fput_object(bucket_name, model_filename, model_filename)

    log_msg = _modelling_log("save_model")
    etl_log(log_msg)

except Exception as e:
    # Any failure in steps 1-5 is logged once under "linear_regression" and re-raised
    log_msg = _modelling_log("linear_regression", status="failed", error=e)
    etl_log(log_msg)
    raise


MAE: 1022.1468501632764
MSE: 2470617.4329239563
R²: 0.971674869947022
