<h1>Data Pipeline for Machine Learning Model</h1>

<h3>Build ETL Data Pipeline for Integrated Machine Learning Model</h3>

Data engineers play a crucial role in the data ecosystem: they are responsible for providing precise, accurate data to the machine learning models developed by data scientists. A data engineer's responsibilities include:

#### 1. **Data Pipeline Development**
   - Design and implement **ETL/ELT pipelines** (Extract, Transform, Load).  
   - Automate workflows using tools like **Apache Airflow, Luigi, or NiFi**.  
   - Ensure reliable and efficient data movement from sources to storage.

#### 2. **Data Storage & Management**
   - Manage **SQL & NoSQL databases** (PostgreSQL, MySQL, MongoDB, Cassandra).  
   - Work with **data warehouses** (Snowflake, BigQuery, Redshift) and **data lakes** (Hadoop, Delta Lake, S3).  
   - Optimize storage for performance and cost efficiency.

#### 3. **Data Processing & Transformation**
   - Process data in **batch (Spark, Hadoop)** and **real-time (Kafka, Flink)**.  
   - Clean, normalize, and transform raw data for analytics.  
   - Implement data quality checks and validation.  
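The data quality bullet above can be made concrete with a small report function. This is a minimal sketch, not code from this project. `run_quality_checks` is a hypothetical helper, the frame is toy data, and the three checks (row count, rows with nulls in required columns, duplicated natural keys) are illustrative choices:

```python
import pandas as pd


def run_quality_checks(df: pd.DataFrame, required: list, key: str) -> dict:
    """Hypothetical helper: return simple data quality findings for a frame."""
    return {
        "row_count": len(df),
        # rows with a null in at least one required column
        "rows_with_nulls": int(df[required].isna().any(axis=1).sum()),
        # natural-key values that appear more than once (first occurrence not counted)
        "duplicate_keys": int(df[key].duplicated().sum()),
    }


# Toy data, not the project dataset
sample = pd.DataFrame({
    "id_sales": [1, 2, 2, 3],
    "odometer": [21507.0, None, 65698.0, 23634.0],
    "mmr": [13450.0, 6025.0, 7375.0, None],
})

report = run_quality_checks(sample, required=["odometer", "mmr"], key="id_sales")
print(report)  # {'row_count': 4, 'rows_with_nulls': 2, 'duplicate_keys': 1}
```

A report like this can be logged alongside the pipeline's existing `LOAD_LOG` messages, or used to fail a run when a threshold is exceeded.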


#### 4. **Data Governance & Security**
   - Enforce **data security** (encryption, access controls).  
   - Ensure compliance with **GDPR, CCPA, HIPAA**.  
   - Maintain **metadata management & data lineage**.  

In this project, we build a data pipeline that is integrated with a machine learning model. The focus is on producing good-quality data to feed the model, rather than on developing the machine learning model itself.

#### A. Dataset
- Start the project with Docker Compose from this [repository](https://github.com/ibnufajar1994/machine-learning-pipeline).
- This dataset compiles detailed car sales records from diverse sources, including databases, APIs, and spreadsheets.


## **STAGING**

### Extract Data From Source

In [1]:
import pandas as pd
import os
from dotenv import load_dotenv
from src.utils.helper import auth_gspread
from src.utils.engine import init_engine
from src.utils.load_log import LOAD_LOG
from datetime import datetime
import requests

load_dotenv()

GS_KEY = os.getenv("GS_KEY")
CRED_PATH = os.getenv("CRED_PATH")
worksheet_name = os.getenv("WORKSHEET_NAME")
link_api = os.getenv("LINK_API")

def extract_spreadsheet():
    try:

        gc = auth_gspread()
            
        # init spreadsheet by key
        sheet_result = gc.open_by_key(GS_KEY)
                
        # read spreadsheet data
        worksheet_result = sheet_result.worksheet(worksheet_name)

        # convert it to dataframe
        df_result = pd.DataFrame(worksheet_result.get_all_values())
                
        # set first rows as headers columns
        df_result.columns = df_result.iloc[0]
                
        # get all the rest of the values
        df_result = df_result[1:].copy()

        log_msg = {
            "step" : "staging",
            "component":"extraction",
            "status": "success!",
            "table_name": "brand_car",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
        }
        
        return df_result
    
    except Exception as e:
        log_msg = {
            "step" : "staging",
            "component":"extraction",
            "status": "failed!",
            "table_name": "brand_car",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)
        
def extract_db_source(table_name: str) -> pd.DataFrame:
    try:

        src_engine = init_engine("source")

        df_data = pd.read_sql(sql = f"select * from {table_name}",
                              con = src_engine)

        log_msg = {
            "step" : "staging",
            "component":"extraction",
            "status": "success!",
            "table_name": table_name,  # log the table actually requested
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
        }

        return df_data

    except Exception as e:

        log_msg = {
            "step" : "staging",
            "component":"extraction",
            "status": "failed!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)



def extract_api():
    try:
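        # A timeout keeps a hung API from blocking the pipeline indefinitely
        response = requests.get(link_api, timeout=30)
        # Raise on 4xx/5xx so an error page is never parsed as data
        response.raise_for_status()

        data = response.json()

        # If the payload has a 'regions' key, use that list
        if 'regions' in data:
            df = pd.DataFrame(data['regions'])
        else:
            # Otherwise, build the DataFrame from the whole payload
            df = pd.DataFrame(data)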
        
        log_msg = {
            "step": "staging",
            "component": "extraction",
            "status": "success!",
            "table_name": "us_state",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

        return df
    
    except Exception as e:
        log_msg = {
            "step": "staging",
            "component": "extraction",
            "status": "failed",
            "table_name": "us_state",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)
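Each extractor above repeats the same try/except/finally logging block. On failure it also returns `None`, which only surfaces later as an unrelated error. One way to factor the block out is a decorator. This is a hypothetical sketch: `log_step` is not part of the project, and a plain list stands in for `LOAD_LOG`. Unlike the original functions, it re-raises the exception after logging:

```python
from datetime import datetime
from functools import wraps

# Stand-in for the project's LOAD_LOG: log messages are collected in this list
LOGS = []


def log_step(step: str, component: str, table_name: str, sink=LOGS.append):
    """Hypothetical decorator: wrap a pipeline step and emit one log dict per call."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            log_msg = {
                "step": step,
                "component": component,
                "table_name": table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
            try:
                result = func(*args, **kwargs)
                log_msg["status"] = "success!"
                return result
            except Exception as e:
                log_msg["status"] = "failed!"
                log_msg["error_msg"] = str(e)
                raise  # surface the failure instead of silently returning None
            finally:
                sink(log_msg)  # runs on both success and failure
        return wrapper
    return decorator


@log_step("staging", "extraction", "us_state")
def fake_extract():
    return [{"id_state": 1, "code": "al", "name": "Alabama"}]


@log_step("staging", "extraction", "car_sales")
def broken_extract():
    raise ConnectionError("source database unreachable")


rows = fake_extract()
try:
    broken_extract()
except ConnectionError:
    pass

print([m["status"] for m in LOGS])  # ['success!', 'failed!']
```

Whether to re-raise or swallow errors is a design choice. Re-raising lets an orchestrator such as Airflow mark the task as failed.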

In [2]:
df_car_sales = extract_db_source("car_sales")
df_car_sales

|  | id_sales | year | brand_car | model | trim | body | transmission | vin | state | condition | odometer | color | interior | seller | mmr | sellingprice | saledate |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2014 | Chevrolet | Impala Limited | LT Fleet | Sedan | automatic | 2g1wb5e37e1112559 | fl | 4.0 | 21507.0 | white | black | gm remarketing | 13450.0 | 13800.0 | Mon Feb 23 2015 05:00:00 GMT-0800 (PST) |
| 1 | 2 | 2003 | Dodge | Ram Pickup 1500 | SLT | Quad Cab |  | 1d7ha18n13s152972 | mo | 31.0 | 79712.0 | — | black | tdaf remarketing | 6025.0 | 6300.0 | Tue Jan 20 2015 02:30:00 GMT-0800 (PST) |
| 2 | 3 | 2007 | Pontiac | G6 | GT | Convertible | automatic | 1g2zh361474252178 | nj | 34.0 | 65698.0 | red | black | car authority inc | 7375.0 | 8000.0 | Wed Jan 14 2015 01:30:00 GMT-0800 (PST) |
| 3 | 4 | 2011 | Toyota | Corolla | LE | Sedan | automatic | jtdbu4eexb9167571 | fl | 43.0 | 23634.0 | black | beige | world omni financial corporation | 10800.0 | 11400.0 | Tue Jan 27 2015 01:30:00 GMT-0800 (PST) |
| 4 | 5 | 2012 | Lexus | ES 350 | Base | Sedan |  | jthbk1eg6c2495519 | pa | 35.0 | 26483.0 | black | brown | meridian remarketing | 22500.0 | 23300.0 | Fri Jan 30 2015 01:00:00 GMT-0800 (PST) |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 29995 | 29996 | 2012 | Ford | Explorer | XLT | SUV | automatic | 1fmhk8d83cga41998 | mi | 36.0 | 103016.0 | white | gray | automobiles paille inc | 17700.0 | 16800.0 | Thu Jan 29 2015 01:30:00 GMT-0800 (PST) |
| 29996 | 29997 | 2012 | Volkswagen | Jetta | Base | sedan | automatic | 3vw2k7aj1cm312846 | fl | 36.0 | 41092.0 | — | tan | vw credit | 8475.0 | 9800.0 | Wed Jun 10 2015 02:40:00 GMT-0700 (PDT) |
| 29997 | 29998 | 2003 | Toyota | Tacoma | Base | Regular Cab |  | 5tenl42n03z286594 | az | 19.0 | 292925.0 | white | gray | ge fleet services for itself/servicer | 3225.0 | 2400.0 | Thu May 21 2015 05:00:00 GMT-0700 (PDT) |
| 29998 | 29999 | 2014 | Chevrolet | Impala Limited | LT Fleet | Sedan | automatic | 2g1wb5e37e1147702 | fl | 4.0 | 25083.0 | gray | gray | gm remarketing | 12900.0 | 12800.0 | Mon Jan 26 2015 05:00:00 GMT-0800 (PST) |


In [3]:
df_car_brand = extract_spreadsheet()
df_car_brand.head()

|  | brand_car_id | brand_name |
|---|---|---|
| 1 | 1 | Acura |
| 2 | 2 | Audi |
| 3 | 3 | Bentley |
| 4 | 4 | BMW |
| 5 | 5 | Buick |
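`extract_spreadsheet` promotes the first row of `get_all_values()` to column headers and then slices that row off, which is why the `car_brand` index above starts at 1. A sketch with hard-coded rows standing in for the gspread call (assumed here to return a list of string lists with the header first) compares that approach with passing `columns=` directly. In both cases every value stays a string, which is one reason `brand_car_id` has to be cast later:

```python
import pandas as pd

# Assumed shape of worksheet.get_all_values(): a list of rows of strings,
# with the header in the first row
values = [
    ["brand_car_id", "brand_name"],
    ["1", "Acura"],
    ["2", "Audi"],
    ["3", "Bentley"],
]

# Approach used in extract_spreadsheet(): the header row becomes the column names
# and the remaining rows keep their positional index, which starts at 1
df_a = pd.DataFrame(values)
df_a.columns = df_a.iloc[0]
df_a = df_a[1:].copy()

# Alternative: explicit columns and a fresh 0-based index
df_b = pd.DataFrame(values[1:], columns=values[0])

print(df_a.index.tolist())  # [1, 2, 3]
print(df_b.index.tolist())  # [0, 1, 2]
print(df_b["brand_car_id"].dtype)  # object (strings, not integers)
```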


In [4]:
df_us_state = extract_api()
df_us_state.head()

|  | id_state | code | name |
|---|---|---|---|
| 0 | 1 | al | Alabama |
| 1 | 2 | ak | Alaska |
| 2 | 3 | az | Arizona |
| 3 | 4 | ar | Arkansas |
| 4 | 5 | ca | California |


#### Load data to Staging Database

In [5]:
import pandas as pd
from src.utils.load_log import LOAD_LOG
from src.utils.engine import init_engine
from datetime import datetime


def load_to_staging(data: pd.DataFrame, table_name: str) -> None:
    # Defined up front so the finally block can tell whether an engine exists
    stg_engine = None
    try:
        stg_engine = init_engine("staging")

        data = data.copy()

        # "replace" drops and recreates the staging table on every run
        data.to_sql(name = table_name,
                    con = stg_engine,
                    if_exists = "replace",
                    index = False)
        
        log_msg = {
            "step" : "staging",
            "component":"load",
            "status": "success!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
        }

    except Exception as e:
        log_msg = {
            "step" : "staging",
            "component":"load",
            "status": "failed!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)
        # init_engine() may have failed before an engine was created
        if stg_engine is not None:
            stg_engine.dispose()

In [6]:
load_to_staging(data=df_car_sales, table_name="car_sales")
load_to_staging(data=df_car_brand, table_name="car_brand")
load_to_staging(data=df_us_state, table_name="us_state")
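The staging loader calls `to_sql` with `if_exists="replace"`, while `load_to_warehouse` later uses `"append"`. The difference matters when the pipeline is re-run. Below is a small sketch that uses an in-memory SQLite connection as a stand-in for the project's `init_engine` engines, which it does not use. It shows the row counts after two identical loads in each mode:

```python
import sqlite3

import pandas as pd

con = sqlite3.connect(":memory:")  # stand-in for the staging/warehouse engines
batch = pd.DataFrame({"id_state": [1, 2], "code": ["al", "ak"]})

# Staging style: "replace" drops the table before inserting, so re-runs are idempotent
batch.to_sql("us_state_stg", con, if_exists="replace", index=False)
batch.to_sql("us_state_stg", con, if_exists="replace", index=False)

# Warehouse style: "append" inserts the same rows again on every run
batch.to_sql("us_state_dwh", con, if_exists="append", index=False)
batch.to_sql("us_state_dwh", con, if_exists="append", index=False)

stg_rows = pd.read_sql("select count(*) as n from us_state_stg", con)["n"].iloc[0]
dwh_rows = pd.read_sql("select count(*) as n from us_state_dwh", con)["n"].iloc[0]
print(stg_rows, dwh_rows)  # 2 4
con.close()
```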

## **WAREHOUSE**

#### Extract Data From Staging

In [7]:
import pandas as pd
from src.utils.engine import init_engine
from src.utils.load_log import LOAD_LOG
from datetime import datetime



def extract_db_staging(table_name: str) -> pd.DataFrame:
    try:

        src_engine = init_engine("staging")

        df_data = pd.read_sql(sql = f"select * from {table_name}",
                              con = src_engine)

        log_msg = {
            "step" : "warehouse",
            "component":"extraction",
            "status": "success!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
        }

        return df_data

    except Exception as e:

        log_msg = {
            "step" : "warehouse",
            "component":"extraction",
            "status": "failed!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)


In [8]:
df1 = extract_db_staging("car_sales")  # df1 is car_sales
df2 = extract_db_staging("car_brand")  # df2 is car_brand
df3 = extract_db_staging("us_state")   # df3 is us_state
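The extract helpers build their query with an f-string, `select * from {table_name}`. Table names are SQL identifiers and cannot be sent as bound query parameters, so a simple safeguard is to accept only known names. This sketch assumes a hypothetical `ALLOWED_TABLES` set that mirrors the three staging tables:

```python
# Hypothetical allow-list mirroring the staging tables used in this notebook
ALLOWED_TABLES = {"car_sales", "car_brand", "us_state"}


def build_select(table_name: str) -> str:
    """Return a SELECT statement only for known table names."""
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table name: {table_name!r}")
    return f"select * from {table_name}"


print(build_select("us_state"))  # select * from us_state
```

Inside `extract_db_staging`, the query string would then come from `build_select(table_name)` instead of being formatted inline.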

#### Transform Data after Extraction from Staging

In [9]:
from src.utils.load_log import LOAD_LOG
from datetime import datetime

import pandas as pd
class Transformation():


    def __init__(self, data: pd.DataFrame, table_name: str) -> None:
        self.data = data
        self.table_name = table_name
        self.columns = data.columns

    
    def drop_missing_value(self, col_name):
        try:
            if isinstance(col_name, str):
                col_name = [col_name]
            
            self.data = self.data.dropna(subset=col_name)

            # Update column information
            self.columns = self.data.columns

                    
            log_msg = {
                    "step" : "Warehouse",
                    "component":"Transformation (Drop Missing Value)",
                    "status": "success!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }
                

            return self.data
        
        except Exception as e:
            
            log_msg = {
                    "step" : "Warehouse",
                    "component":"Transformation (Drop Missing Value)",
                    "status": "Failed!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                    "error_msg": str(e)
                }

            
        finally:
            LOAD_LOG(log_msg)

    
    def drop_invalid_value(self, col_names: list, invalid_values: list):
        try:

            # Iterate over every column listed in col_names
            for col_name in col_names:
                # Drop rows holding an invalid value in this column
                self.data = self.data[~self.data[col_name].isin(invalid_values)]

            # Update column information
            self.columns = self.data.columns

            # Logging message
            log_msg = {
                    "step" : "Warehouse",
                    "component": "Transformation (Drop Invalid Value)",
                    "status": "success!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

            return self.data

        except Exception as e:
            # Logging error message
            log_msg = {
                    "step" : "Warehouse",
                    "component": "Transformation (Drop Invalid Value)",
                    "status": "Failed!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                    "error_msg": str(e)
                }

        finally:
            # Ensure that log is written regardless of success or failure
            LOAD_LOG(log_msg)


    def to_lower_case(self, col_names: list):
        try:

            # Work on an explicit copy so the assignments below don't hit a filtered view
            self.data = self.data.copy()

            # Convert every value in the given columns to lower case
            for col_name in col_names:
                self.data[col_name] = self.data[col_name].str.lower()

            # Update column information
            self.columns = self.data.columns

            # Logging message
            log_msg = {
                    "step" : "Warehouse",
                    "component": "Transformation (To Lower Case)",
                    "status": "success!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
                }

            return self.data

        except Exception as e:
            # Logging error message
            log_msg = {
                    "step" : "Warehouse",
                    "component": "Transformation (To Lower Case)",
                    "status": "Failed!",
                    "table_name": self.table_name,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                    "error_msg": str(e)
                }

        finally:
            # Ensure that log is written regardless of success or failure
            LOAD_LOG(log_msg)


    def join_data(self, df1: pd.DataFrame, df2: pd.DataFrame):
        try:
            # Keep a reference to the original frame (self.data) as df
            df = self.data
            
            # Left-join df with df1 (car_brand) on brand_car = brand_name
            merged_df = df.merge(
                df1,
                left_on="brand_car",
                right_on="brand_name",
                how="left"
            )
            
            # Left-join the result with df2 (us_state) on state = code
            final_df = merged_df.merge(
                df2,
                left_on="state",
                right_on="code",
                how="left"
            )
            
            # Update the internal DataFrame
            self.data = final_df
            
            # Update column information
            self.columns = self.data.columns
            
            # Logging message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Join Data)",
                "status": "success!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")  # Current timestamp
            }
            
            return self.data
            
        except Exception as e:
            # Logging error message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Join Data)",
                "status": "Failed!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
                "error_msg": str(e)
            }
            
        finally:
            # Ensure that log is written regardless of success or failure
            LOAD_LOG(log_msg)


    def select_merged_columns(self):
        try:
            # Columns to keep (brand_car is deliberately excluded; brand_car_id replaces it)
            selected_columns = [
                "id_sales", "year", "brand_car_id", "transmission", "id_state", "odometer", 
                "condition", "color", "interior", "mmr", "sellingprice"
            ]
            
            # Warn about any expected column that is missing
            for col in selected_columns:
                if col not in self.data.columns:
                    print(f"Warning: Column '{col}' not found in dataframe")
            
            # Keep only the columns that actually exist
            valid_columns = [col for col in selected_columns if col in self.data.columns]
            
            # Update self.data; .copy() detaches it from the merged frame so the
            # later in-place casts don't trigger SettingWithCopyWarning
            self.data = self.data[valid_columns].copy()
            
            # Update column information
            self.columns = self.data.columns
            
            
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Select Merged Columns)",
                "status": "success!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            
            return self.data
            
        except Exception as e:
            print(f"Error in select_merged_columns: {str(e)}")
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Select Merged Columns)",
                "status": "Failed!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }
            
        finally:
            LOAD_LOG(log_msg)

    def cast_columns(self):
        try:
            # Map the type labels used below to pandas dtypes
            type_mapping = {
                "integer": "int64",
                "float": "float64",
                "string": "object"
            }
            
            # Target type for each column
            data_types = {
                "id_sales": "integer",
                "year": "integer",
                "brand_car_id": "integer",
                "transmission": "string",
                "id_state": "integer",
                "condition": "float", 
                "odometer": "float",
                "color": "string",
                "interior": "string",
                "mmr": "float",
                "sellingprice": "float"
            }
            
            # Only cast columns that are present in the DataFrame
            existing_columns = [col for col in data_types.keys() if col in self.data.columns]
            
            # Cast each column
            for column in existing_columns:
                try:
                    target_type = type_mapping[data_types[column]]
                    
                    # Handle values that may fail to convert
                    if data_types[column] in ["integer", "float"]:
                        # Convert to numeric, coercing unparseable values to NaN
                        self.data[column] = pd.to_numeric(self.data[column], errors='coerce')
                    
                    # Apply the target dtype
                    self.data[column] = self.data[column].astype(target_type)
                    
                except Exception as e:
                    print(f"Error casting column {column}: {str(e)}")
            
            # Logging message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Cast Columns)",
                "status": "success!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            
            return self.data
            
        except Exception as e:
            # Logging error message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Cast Columns)",
                "status": "Failed!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }
            
        finally:
            # Ensure that log is written regardless of success or failure
            LOAD_LOG(log_msg)

    def rename_columns(self):
        try:
            # Map original column names to new column names
            column_mapping = {
                "id_sales": "id_sales_nk",
                "sellingprice": "selling_price"
            }
            
            # Keep only mappings whose source column exists in the DataFrame
            valid_columns = {k: v for k, v in column_mapping.items() if k in self.data.columns}
            
            # Rename the columns
            if valid_columns:
                self.data = self.data.rename(columns=valid_columns)
                
                # Update column information
                self.columns = self.data.columns
            
            # Logging message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Rename Columns)",
                "status": "success!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            
            return self.data
            
        except Exception as e:
            # Logging error message
            log_msg = {
                "step": "Warehouse",
                "component": "Transformation (Rename Columns)",
                "status": "Failed!",
                "table_name": self.table_name,
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }
            
        finally:
            # Ensure that log is written regardless of success or failure
            LOAD_LOG(log_msg)


        


In [10]:
df1_trans = Transformation(data=df1, table_name="car_sales")
df2_trans = Transformation(data =df2, table_name="car_brand")
df3_trans = Transformation(data=df3, table_name="us_state")

In [11]:
## Transformation of df1 (car_sales data)
missing_value_col = ["odometer", "mmr", "condition"]
invalid_value_col = ["brand_car", "model", "trim", "body", "transmission", "vin", "state", "color", "interior", "seller"]
invalid_value = ["", "—","3vwd17aj5fm219943", "3vwd17aj5fm297123"]
to_lowercase_col = ["brand_car", "model", "trim", "body", "transmission", "color", "interior", "seller"]

df1_clean = df1_trans.drop_missing_value(missing_value_col)
df1_clean = df1_trans.drop_invalid_value(invalid_value_col, invalid_value)
df1_clean = df1_trans.to_lower_case(to_lowercase_col)
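The three cleaning calls run in a deliberate order. `dropna` removes only real missing values (NaN/None), so empty strings and the em-dash placeholder seen in the `color` column survive it and need `drop_invalid_value`. The toy frame below uses made-up rows, not the project dataset. It reproduces the same sequence with plain pandas so each step can be checked in isolation:

```python
import pandas as pd

# Toy rows: one clean, one empty transmission, one em-dash color, one missing odometer
raw = pd.DataFrame({
    "brand_car": ["Chevrolet", "Dodge", "Pontiac", "Toyota"],
    "transmission": ["automatic", "", "automatic", "automatic"],
    "color": ["white", "black", "\u2014", "black"],  # "\u2014" is the em-dash placeholder
    "odometer": [21507.0, 79712.0, 65698.0, None],
})

missing_value_col = ["odometer"]
invalid_value_col = ["transmission", "color"]
invalid_value = ["", "\u2014"]

# 1. Drop rows with NaN/None in the listed columns (empty strings are NOT dropped here)
clean = raw.dropna(subset=missing_value_col)

# 2. Drop rows where a listed column holds a placeholder value
for col in invalid_value_col:
    clean = clean[~clean[col].isin(invalid_value)]

# 3. Lower-case text columns; .copy() avoids pandas' SettingWithCopyWarning
clean = clean.copy()
clean["brand_car"] = clean["brand_car"].str.lower()

print(clean["brand_car"].tolist())  # ['chevrolet']
```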

In [12]:
## Transformation of df2 (car_brand data)
to_lowercase_col2 = ["brand_name"]
df2_clean = df2_trans.to_lower_case(to_lowercase_col2)

In [13]:
## Transformation of df3 (us_state data)
to_lowercase_col3 = ["name"]
df3_clean = df3_trans.to_lower_case(to_lowercase_col3)

In [14]:
df1_clean = df1_trans.join_data(df2_clean, df3_clean)
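`join_data` uses left joins. A sale whose brand or state has no match in the lookup tables is therefore kept, with NaN in `brand_car_id` or `id_state`, and those NaNs later break the integer casts. The hedged sketch below shows one way to surface such rows before casting, using the `indicator=True` and `validate="many_to_one"` options of `DataFrame.merge`. It runs on toy data; the `tesla` row is invented to force a miss:

```python
import pandas as pd

# Toy data; the brand ids mirror the cleaned output (chevrolet -> 7, pontiac -> 38)
sales = pd.DataFrame({"id_sales": [1, 3, 99], "brand_car": ["chevrolet", "pontiac", "tesla"]})
brands = pd.DataFrame({"brand_car_id": ["7", "38"], "brand_name": ["chevrolet", "pontiac"]})

merged = sales.merge(
    brands,
    left_on="brand_car",
    right_on="brand_name",
    how="left",
    indicator=True,           # adds a _merge column: "both" or "left_only"
    validate="many_to_one",   # raises if a brand_name appears twice in the lookup
)

# Sales rows whose brand found no match in the lookup table
unmatched = merged.loc[merged["_merge"] == "left_only", "brand_car"].tolist()
print(unmatched)  # ['tesla']
```

The unmatched rows could then be logged and dropped, or routed to a quarantine table, before `cast_columns` runs.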

In [15]:
# Keep only the modelling columns
df1_clean = df1_trans.select_merged_columns()

# Cast columns to their target dtypes
df1_clean = df1_trans.cast_columns()

# Rename columns to their warehouse names
df1_clean = df1_trans.rename_columns()

df1_clean.head()

|  | id_sales_nk | year | brand_car_id | transmission | id_state | odometer | condition | color | interior | mmr | selling_price |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2014 | 7 | automatic | 9 | 21507.0 | 4.0 | white | black | 13450.0 | 13800.0 |
| 1 | 3 | 2007 | 38 | automatic | 30 | 65698.0 | 34.0 | red | black | 7375.0 | 8000.0 |
| 2 | 4 | 2011 | 48 | automatic | 9 | 23634.0 | 43.0 | black | beige | 10800.0 | 11400.0 |
| 3 | 6 | 2012 | 17 | automatic | 38 | 29050.0 | 34.0 | burgundy | tan | 12350.0 | 12700.0 |
| 4 | 7 | 2012 | 13 | automatic | 13 | 39151.0 | 32.0 | red | tan | 9525.0 | 9700.0 |
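`cast_columns` converts numeric columns with `pd.to_numeric(errors="coerce")` and then calls `astype("int64")`. If any value became NaN, for example a `brand_car_id` left empty by an unmatched join, that cast raises. The inner `except` only prints the error, so the column is silently left as the float produced by `pd.to_numeric`. The minimal sketch below reproduces the failure and shows one alternative, pandas' nullable `Int64` dtype. Whether nullable integers suit the warehouse schema is an assumption to check:

```python
import pandas as pd

# e.g. brand_car_id after a left join where one brand had no match
s = pd.Series(["7", "38", None])

numeric = pd.to_numeric(s, errors="coerce")  # float64: 7.0, 38.0, NaN

try:
    numeric.astype("int64")  # plain int64 cannot represent NaN
    cast_ok = True
except (ValueError, TypeError):
    cast_ok = False

# Nullable integer dtype keeps the missing value as <NA>
nullable = numeric.astype("Int64")

print(cast_ok, nullable.isna().tolist())  # False [False, False, True]
```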


#### Load Data into Warehouse Database

In [16]:
import pandas as pd
from src.utils.load_log import LOAD_LOG
from src.utils.engine import init_engine
from datetime import datetime


def load_to_warehouse(data: pd.DataFrame, table_name: str) -> None:
    # Defined up front so the finally block can tell whether an engine exists
    dwh_engine = None
    try:
        dwh_engine = init_engine("warehouse")

        data = data.copy()

        # "append" adds rows to the existing warehouse table on every run
        data.to_sql(name = table_name,
                    con = dwh_engine,
                    if_exists = "append",
                    index = False)
        
        log_msg = {
            "step" : "warehouse",
            "component":"load",
            "status": "success!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
        }

    except Exception as e:
        log_msg = {
            "step" : "warehouse",
            "component":"load",
            "status": "failed!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),  # Current timestamp
            "error_msg": str(e)
        }

    finally:
        LOAD_LOG(log_msg)
        # init_engine() may have failed before an engine was created
        if dwh_engine is not None:
            dwh_engine.dispose()

In [17]:
load_to_warehouse(data=df1_clean, table_name="car_sales")
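`load_to_warehouse` uses `if_exists="append"`, so running the pipeline twice writes every sale twice. One hedged way to make the load incremental is to skip rows whose natural key `id_sales_nk` is already in the warehouse. The sketch below uses an in-memory SQLite database as a stand-in for the warehouse engine. For large tables, a unique constraint plus a database-side upsert (for example PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING`) is usually preferable to reading every key into pandas:

```python
import sqlite3

import pandas as pd

con = sqlite3.connect(":memory:")  # stand-in for the warehouse engine

# Rows already loaded by an earlier run (toy values)
pd.DataFrame({"id_sales_nk": [1, 3], "selling_price": [13800.0, 8000.0]}).to_sql(
    "car_sales", con, if_exists="replace", index=False
)

# New batch that overlaps the existing rows on id_sales_nk = 3
batch = pd.DataFrame({"id_sales_nk": [3, 4], "selling_price": [8000.0, 11400.0]})

# Anti-join: keep only keys that are not yet in the warehouse
existing = pd.read_sql("select id_sales_nk from car_sales", con)["id_sales_nk"]
new_rows = batch[~batch["id_sales_nk"].isin(existing)]
new_rows.to_sql("car_sales", con, if_exists="append", index=False)

total = pd.read_sql("select count(*) as n from car_sales", con)["n"].iloc[0]
print(total)  # 3
con.close()
```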

## **MACHINE LEARNING MODEL**

#### Extract Clean Data from Warehouse

In [1]:
import pandas as pd
from src.utils.engine import init_engine
from src.utils.load_log import LOAD_LOG
from datetime import datetime

def extract_warehouse(table_name: str) -> pd.DataFrame:
    try:
        # Initialize database engine
        src_engine = init_engine("warehouse")

        # Extract data from SQL table
        df_data = pd.read_sql(sql=f"select * from {table_name}",
                              con=src_engine)

        # Drop the load-timestamp column by name rather than by position,
        # so a change in column order cannot remove a real feature
        df_data = df_data.drop(columns=["created_at"], errors="ignore")

        # Log success message
        log_msg = {
            "step": "modelling",
            "component": "extraction",
            "status": "success!",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }

        return df_data

    except Exception as e:
        # Log failure message in case of an error
        log_msg = {
            "step": "modelling",
            "component": "extraction",
            "status": "failed",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_msg": str(e)
        }

    finally:
        # Save the log
        LOAD_LOG(log_msg)


In [2]:
df = extract_warehouse(table_name="car_sales")
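In `CarPriceModel`, `scale_features()` fits the `StandardScaler` on every row before `split_data()` runs. Test rows therefore contribute to the scaling statistics, which is a mild form of data leakage. Below is a minimal sketch of the usual remedy, assuming a scikit-learn `Pipeline` is acceptable here: split first, then let the pipeline fit the scaler on the training rows only. The data is synthetic and the hyperparameters are illustrative:

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-ins for X_encoded and selling_price
rng = np.random.default_rng(42)
X = rng.normal(size=(200, 3))
y = 3 * X[:, 0] + rng.normal(scale=0.1, size=200)

# Split BEFORE any fitting so the test rows stay unseen
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

pipe = Pipeline([
    ("scale", StandardScaler()),
    ("model", RandomForestRegressor(n_estimators=50, random_state=42)),
])
pipe.fit(X_train, y_train)          # scaler statistics come from the training split only
score = pipe.score(X_test, y_test)  # R^2 on the held-out rows
print(round(score, 3))
```

Random forests split on per-feature thresholds, so standardization typically leaves their predictions essentially unchanged, and the practical impact here is small. The ordering matters more if the model is later swapped for a scale-sensitive one, such as a linear or k-NN regressor.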

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import GridSearchCV
import numpy as np
from datetime import datetime
import joblib
import os
from minio import Minio
from src.utils.load_log import LOAD_LOG

class CarPriceModel:
    def __init__(self, data: pd.DataFrame) -> None:
        """
        Initialize the car price prediction model
        
        Parameters:
        -----------
        data : pd.DataFrame
            DataFrame containing car data
        """
        self.data = data
        self.features = ['odometer_log', 'condition', 'car_age', 'brand_car_id', 'transmission', 'color', 'mmr']
        self.target = 'selling_price'
        self.model = None
        self.scaler = StandardScaler()
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.X_encoded = None
        self.X_scaled = None
        self.y = None
        
    def feature_engineering(self):
        """
        Create new features needed for the model
        """       
        try:


            # Calculate the car age based on the reference year 2015
            self.data['car_age'] = 2015 - self.data['year']
            
            # Log transformation for the odometer
            self.data['odometer_log'] = np.log1p(self.data['odometer'])

            # Log success message
            log_msg = {
                "step": "modelling",
                "component": "Feature Engineering",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            
            return self
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Feature Engineering",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)
    
    def prepare_data(self):
        """
        Prepare data for modeling, including encoding categorical features
        """
        try:
            # One-hot encoding for categorical variables
            self.X_encoded = pd.get_dummies(self.data[self.features], drop_first=True)
            
            # Target is 'selling_price'
            self.y = self.data[self.target]
            # Log success message
            log_msg = {
                "step": "modelling",
                "component": "Prepare Data",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
            
            return self
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Prepare Data",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)
    
    def scale_features(self):
        """
        Normalize features using StandardScaler
        """
        try:

            self.X_scaled = self.scaler.fit_transform(self.X_encoded)

            log_msg = {
                "step": "modelling",
                "component": "Scale Features",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return self
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Scale Features",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)
    
    def split_data(self, test_size=0.2, random_state=42):
        """
        Split data into training and testing sets
        
        Parameters:
        -----------
        test_size : float, default=0.2
            Proportion of dataset to be used for testing
        random_state : int, default=42
            Random state for reproducibility
        """
        try:
            self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
                self.X_scaled, self.y, test_size=test_size, random_state=random_state
            )

            log_msg = {
                "step": "modelling",
                "component": "Splitting Data",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return self
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Splitting Data",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)

    
    def train_model(self, n_estimators=100, random_state=42):
        """
        Train the RandomForestRegressor model
        
        Parameters:
        -----------
        n_estimators : int, default=100
            Number of trees in the random forest
        random_state : int, default=42
            Random state for reproducibility
        """
        try:
            self.model = RandomForestRegressor(n_estimators=n_estimators, random_state=random_state)
            self.model.fit(self.X_train, self.y_train)

            log_msg = {
                "step": "modelling",
                "component": "Train Model",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return self

        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Train Model",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)


    def evaluate_model(self):
        """
        Evaluate the model using test data
        
        Returns:
        --------
        dict
            Dictionary containing model evaluation metrics (MSE and R²)
        """
        try:
            y_pred = self.model.predict(self.X_test)

            mse = mean_squared_error(self.y_test, y_pred)
            r2 = r2_score(self.y_test, y_pred)

            metrics = {
                'mean_squared_error': mse,
                'r2_score': r2
            }

            # Record success only after the metrics have actually been computed
            log_msg = {
                "step": "modelling",
                "component": "Evaluate Model",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return metrics
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Evaluate Model",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg)
    
    def tune_hyperparameters(self, param_grid=None):
        """
        Perform hyperparameter tuning using GridSearchCV
        
        Parameters:
        -----------
        param_grid : dict, default=None
            Grid of parameters for tuning
        
        Returns:
        --------
        dict
            Best parameters and best score
        """
        try:
            if param_grid is None:
                param_grid = {
                    'n_estimators': [50, 100, 200],
                    'max_depth': [None, 10, 20, 30],
                    'min_samples_split': [2, 5, 10],
                    'min_samples_leaf': [1, 2, 4]
                }
            
            grid_search = GridSearchCV(
                RandomForestRegressor(random_state=42),
                param_grid=param_grid,
                cv=5,
                scoring='neg_mean_squared_error',
                n_jobs=-1
            )
            
            grid_search.fit(self.X_train, self.y_train)
            
            self.model = grid_search.best_estimator_

            log_msg = {
                "step": "modelling",
                "component": "Tune Hyperparameter",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return {
                'best_params': grid_search.best_params_,
                'best_score': -grid_search.best_score_  # Negated because scoring is neg_mean_squared_error
            }

        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Tune Hyperparameter",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg) 

    def predict(self, X):
        """
        Make predictions with the trained model
        
        Parameters:
        -----------
        X : array-like
            Features for prediction
        
        Returns:
        --------
        array
            Predicted car price values
        """
        try:
            # Ensure the given features are consistent with the features used for training
            if isinstance(X, pd.DataFrame):
                # If X is a DataFrame, perform feature engineering
                X_copy = X.copy()
                X_copy['car_age'] = 2015 - X_copy['year']
                X_copy['odometer_log'] = np.log1p(X_copy['odometer'])
                
                # One-hot encoding
                X_encoded = pd.get_dummies(X_copy[self.features], drop_first=True)
                
                # Ensure all columns used during training are present
                missing_cols = set(self.X_encoded.columns) - set(X_encoded.columns)
                for col in missing_cols:
                    X_encoded[col] = 0
                X_encoded = X_encoded[self.X_encoded.columns]
                
                # Scaling
                X_scaled = self.scaler.transform(X_encoded)
            else:
                # If X is already an array, directly transform
                X_scaled = self.scaler.transform(X)

            log_msg = {
                "step": "modelling",
                "component": "Predict",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return self.model.predict(X_scaled)
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Predict",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

        finally:
            # Save the log
            LOAD_LOG(log_msg) 
    
    def store_model(self, model_filename="car_price_model.pkl", bucket_name="car-sales-modelling", 
                    minio_host="localhost:9001", minio_access_key=None, minio_secret_key=None):
        """
        Save the machine learning model to MinIO object storage
        
        Parameters:
        -----------
        model_filename : str, default="car_price_model.pkl"
            File name for saving the model
        bucket_name : str, default="car-sales-modelling"
            Bucket name in MinIO
        minio_host : str, default="localhost:9001"
            MinIO server host and port
        minio_access_key : str, default=None
            Access key for MinIO authentication (if None, will be taken from environment variables)
        minio_secret_key : str, default=None
            Secret key for MinIO authentication (if None, will be taken from environment variables)
            
        Returns:
        --------
        bool
            True if the model is successfully saved to MinIO, False if failed
        """
        if self.model is None:
            print("The model is not trained. Please train the model first with the train_model() method.")
            return False
        
        # Get credentials from environment variables if not provided.
        # This check runs before the try block so the early return does not
        # reach the finally clause while log_msg is still undefined.
        if minio_access_key is None:
            minio_access_key = os.getenv('MINIO_ACCESS_KEY')
        if minio_secret_key is None:
            minio_secret_key = os.getenv('MINIO_SECRET_KEY')

        if minio_access_key is None or minio_secret_key is None:
            print("Error: MINIO_ACCESS_KEY and MINIO_SECRET_KEY must be provided")
            return False

        try:
            # Save model to a local file first
            print(f"Saving model to local file: {model_filename}")
            joblib.dump(self.model, model_filename)
            
            # Also save the scaler since it's required for prediction
            scaler_filename = f"scaler_{model_filename}"
            joblib.dump(self.scaler, scaler_filename)
            
            # Save the column information for one-hot encoding
            columns_filename = f"columns_{model_filename}"
            joblib.dump(self.X_encoded.columns, columns_filename)
            
            # Initialize MinIO client
            print(f"Connecting to MinIO server: {minio_host}")
            client = Minio(
                minio_host,
                access_key=minio_access_key,
                secret_key=minio_secret_key,
                secure=False  # Set to True if using HTTPS
            )
            
            # Check and create the bucket if not exists
            if not client.bucket_exists(bucket_name):
                print(f"Bucket '{bucket_name}' not found. Creating new bucket.")
                client.make_bucket(bucket_name)
                
            # Date for versioning
            current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
            model_version_filename = f"{current_date}_{model_filename}"
            
            # Upload model to MinIO
            print(f"Uploading model to bucket '{bucket_name}' with file name '{model_version_filename}'")
            client.fput_object(bucket_name, model_version_filename, model_filename)
            
            # Upload scaler to MinIO
            scaler_version_filename = f"{current_date}_scaler_{model_filename}"
            client.fput_object(bucket_name, scaler_version_filename, scaler_filename)
            
            # Upload column information to MinIO
            columns_version_filename = f"{current_date}_columns_{model_filename}"
            client.fput_object(bucket_name, columns_version_filename, columns_filename)
            
            # Remove local files after upload
            os.remove(model_filename)
            os.remove(scaler_filename)
            os.remove(columns_filename)
            
            print("Model, scaler, and column information successfully saved to MinIO.")

            log_msg = {
                "step": "modelling",
                "component": "Store Model",
                "status": "success!",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

            return True
        
        
        except Exception as e:
            # Log failure message in case of an error
            log_msg = {
                "step": "modelling",
                "component": "Store Model",
                "status": "failed",
                "table_name": "car_price",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "error_msg": str(e)
            }

            print(f"Error saving model to MinIO: {str(e)}")
            return False

        finally:
            # Save the log
            LOAD_LOG(log_msg) 
    
    def run_pipeline(self, tune=False, store=False):
        """
        Run the entire modeling pipeline from start to finish
        
        Parameters:
        -----------
        tune : bool, default=False
            Whether to perform hyperparameter tuning
        store : bool, default=False
            Whether to save the model to MinIO after training
            
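        Returns:
        --------
        dict or None
            Model evaluation metrics, or None if tuning or evaluation failed
        """
        self.feature_engineering()
        self.prepare_data()
        self.scale_features()
        self.split_data()

        if tune:
            tuning_results = self.tune_hyperparameters()
            if tuning_results is None:
                # tune_hyperparameters() logs the error and returns None instead of raising
                print("Hyperparameter tuning failed; see the ETL log for details.")
                return None
            print(f"Best parameters: {tuning_results['best_params']}")
            print(f"Best MSE score: {tuning_results['best_score']}")
        else:
            self.train_model()

        metrics = self.evaluate_model()
        if metrics is None:
            # evaluate_model() logs the error and returns None instead of raising
            print("Model evaluation failed; see the ETL log for details.")
            return None
        print(f"Mean Squared Error: {metrics['mean_squared_error']}")
        print(f"R-squared: {metrics['r2_score']}")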
        
        if store:
            self.store_model()
        
        return metrics
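
Every method in the class repeats the same try/except/finally block to build a log record and hand it to `LOAD_LOG`. The sketch below shows one possible way to factor that boilerplate into a decorator. `log_step`, `TinyModel`, and the list-backed `LOAD_LOG` stub are hypothetical stand-ins, not the project's actual helpers. Unlike the class above, the sketch re-raises the exception after logging, so a failed step cannot silently return `None` to the next one.

```python
import functools
from datetime import datetime

# Hypothetical stand-in for the project's LOAD_LOG helper: keeps records in memory
LOG_RECORDS = []


def LOAD_LOG(log_msg):
    LOG_RECORDS.append(log_msg)


def log_step(component, step="modelling", table_name="car_price"):
    """Wrap a pipeline method so a success or failure record is always logged."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            log_msg = {"step": step, "component": component, "table_name": table_name}
            try:
                result = func(*args, **kwargs)
                log_msg["status"] = "success!"
                return result
            except Exception as e:
                log_msg["status"] = "failed"
                log_msg["error_msg"] = str(e)
                # Re-raise so the caller sees the failure instead of a silent None
                raise
            finally:
                # Runs on both paths, exactly like the finally blocks in the class
                log_msg["etl_date"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                LOAD_LOG(log_msg)
        return wrapper
    return decorator


class TinyModel:
    """Hypothetical class showing how the decorator replaces the repeated blocks."""

    @log_step("Scale Features")
    def scale_features(self):
        return self

    @log_step("Evaluate Model")
    def evaluate_model(self):
        raise ValueError("model is not trained")


model = TinyModel()
model.scale_features()
try:
    model.evaluate_model()
except ValueError:
    pass

for record in LOG_RECORDS:
    print(record["component"], record["status"])
# Scale Features success!
# Evaluate Model failed
```

Whether to re-raise or swallow the exception is a design choice; re-raising lets an orchestrator such as Airflow mark the task as failed.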


In [3]:
df_model = CarPriceModel(data=df)
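
`run_pipeline(tune=True)` replaces `train_model()` with `tune_hyperparameters()`. `GridSearchCV` tries every combination in the grid under 5-fold cross-validation, so the cost grows multiplicatively with each parameter list. The short calculation below copies the default grid from `tune_hyperparameters()` and counts the fits. `count_grid_fits` is an illustrative helper, not part of the project.

```python
from itertools import product
from math import prod

# Default grid copied from tune_hyperparameters()
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
}


def count_grid_fits(grid, cv=5):
    """Return (number of candidate settings, number of cross-validation fits)."""
    n_candidates = prod(len(values) for values in grid.values())
    return n_candidates, n_candidates * cv


n_candidates, n_fits = count_grid_fits(param_grid, cv=5)
print(f"{n_candidates} candidates x 5 folds = {n_fits} fits")  # 108 candidates x 5 folds = 540 fits

# Enumerating the candidates explicitly gives the same count
assert len(list(product(*param_grid.values()))) == n_candidates
```

With the default `refit=True`, `GridSearchCV` also refits the best candidate once on the full training set. That is why the plain `train_model()` path is the cheaper default.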

In [4]:
metrics = df_model.run_pipeline(store=True)



Mean Squared Error: 2710093.717584965
R-squared: 0.9718879951976879
Saving model to local file: car_price_model.pkl
Connecting to MinIO server: localhost:9001
Bucket 'car-sales-modelling' not found. Creating new bucket.
Uploading model to bucket 'car-sales-modelling' with file name '20250424_205128_car_price_model.pkl'
Model, scaler, and column information successfully saved to MinIO.
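
`store_model()` prefixes each uploaded object with a `%Y%m%d_%H%M%S` timestamp, as in `20250424_205128_car_price_model.pkl` above. That format is zero-padded and ordered from year down to second, so sorting the names as plain strings also sorts them chronologically, and the newest model can be picked with `max()`. The sketch below reproduces the naming scheme and the lookup using only the standard library. `versioned_names` and `latest_version` are hypothetical helpers. In practice the list of names would come from listing the bucket, for example with the MinIO client's `list_objects()`.

```python
from datetime import datetime

STAMP_FORMAT = "%Y%m%d_%H%M%S"
STAMP_LEN = 15  # len("20250424_205128")


def versioned_names(model_filename="car_price_model.pkl", when=None):
    """Build the three object names store_model() uploads for one run."""
    stamp = (when if when is not None else datetime.now()).strftime(STAMP_FORMAT)
    return {
        "model": f"{stamp}_{model_filename}",
        "scaler": f"{stamp}_scaler_{model_filename}",
        "columns": f"{stamp}_columns_{model_filename}",
    }


def _is_model_object(name, model_filename):
    """True if name looks like '<YYYYmmdd_HHMMSS>_<model_filename>'."""
    if name[STAMP_LEN:] != f"_{model_filename}":
        return False  # also rejects the scaler_ and columns_ objects
    try:
        datetime.strptime(name[:STAMP_LEN], STAMP_FORMAT)
    except ValueError:
        return False
    return True


def latest_version(object_names, model_filename="car_price_model.pkl"):
    """Return the newest model object name, or None if there is none."""
    candidates = [n for n in object_names if _is_model_object(n, model_filename)]
    # Fixed-width timestamps make string order equal time order
    return max(candidates, default=None)


names = list(versioned_names(when=datetime(2025, 4, 24, 20, 51, 28)).values())
names += list(versioned_names(when=datetime(2025, 4, 25, 9, 0, 0)).values())
print(latest_version(names))  # 20250425_090000_car_price_model.pkl
```

The matching scaler and column files share the same prefix, so once the newest model name is known, the other two artifacts can be derived from it.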
