# Library / Packages

In [None]:
# basic
import pandas as pd
import numpy as np
from scipy.stats import mstats
import warnings

# data preparation
from sklearn.impute import SimpleImputer 
from sklearn.base import BaseEstimator, TransformerMixin 
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder, StandardScaler, FunctionTransformer
from sklearn.pipeline import Pipeline 
from sklearn.compose import ColumnTransformer 

# data modeling
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LinearRegression, Ridge, Lasso

# data scoring


# data tuning

# visualization
import matplotlib.pyplot as plt

# Format

In [None]:
def lab_round(x, pos): 
    if abs(x) >= 1e9: 
        return f'{x/1e9}B'
    
    elif abs(x) >= 1e6:
        return f'{x/1e6}M'
    
    elif abs(x) >= 1e3:
        return f'{x/1e3}K'
    
    else:
        return f'{x}'
    
def val_round(x):
    if abs(x) >= 1e9:
        return f'{x/1e9:.2f} B'
    
    elif abs(x) >= 1e6:
        return f'{x/1e6:.2f} M'
    
    elif abs(x) >= 1e3:
        return f'{x/1e3:.2f} K'
    
    else:
        return f'{x:.2f}'

In [None]:
# Menonaktifkan traceback pada peringatan
warnings.simplefilter("ignore", category = UserWarning)

In [None]:
class OutlierRemover(BaseEstimator, TransformerMixin):
    def __init__(self, factor: float = 1.5):
        self.factor = factor
        self.bounds_ = None

    def fit(self, X: pd.DataFrame | np.ndarray, y=None) -> "OutlierRemover":
        X = self._convert_to_dataframe(X)
        Q1 = X.quantile(0.25)
        Q3 = X.quantile(0.75)
        IQR = Q3 - Q1
        self.bounds_ = {
            col: (Q1[col] - self.factor * IQR[col], Q3[col] + self.factor * IQR[col])
            for col in X.columns
        }
        for col in X.columns:
            if IQR[col] == 0:
                warnings.warn(f"Column '{col}' has zero IQR; no outlier removal applied.")
        return self

    def transform(self, X: pd.DataFrame | np.ndarray) -> np.ndarray:
        if self.bounds_ is None:
            raise ValueError("This OutlierRemover instance is not fitted yet. Call 'fit' first.")
        X = self._convert_to_dataframe(X).copy()
        for col, (low, high) in self.bounds_.items():
            X[col] = X[col].clip(lower=low, upper=high)
        return X.to_numpy()

    def get_feature_names_out(self, input_features=None) -> list:
        if input_features is None:
            return [f"feature_{i}" for i in range(len(self.bounds_))]
        return input_features

    @staticmethod
    def _convert_to_dataframe(X: pd.DataFrame | np.ndarray) -> pd.DataFrame:
        if isinstance(X, np.ndarray):
            return pd.DataFrame(X, columns=[f"feature_{i}" for i in range(X.shape[1])])
        elif isinstance(X, pd.DataFrame):
            return X
        else:
            raise ValueError(
                "Input must be a pandas.DataFrame or numpy.ndarray, got "
                f"{type(X).__name__}"
            )

In [None]:
# Fungsi untuk konversi tipe data
def convert_object_columns_to_numeric(df):
    for col in df.select_dtypes(include = ['object']).columns:  
        try:
            # Cek apakah semua nilai bisa dikonversi ke float
            df[col] = pd.to_numeric(df[col], errors='raise')
            
            # Jika bisa, ubah ke int jika semua nilai adalah bilangan bulat
            if all(df[col] % 1 == 0):  # Cek apakah semua nilai adalah bilangan bulat
                df[col] = df[col].astype(int)

        except ValueError:
            pass  # Jika ada nilai non-angka, biarkan tetap object
        
    return df

# Dataset

In [None]:
# Memuat data train dan test
train_df = pd.read_csv('../dataset/train.csv')
test_df = pd.read_csv('../dataset/test.csv')

In [None]:
# show all column
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

## Train Dataset

In [None]:
train_df.info()

In [None]:
# drop column
train_df = train_df.drop('Id', axis = 1)

# convert object if all numeric
train_df = convert_object_columns_to_numeric(train_df)

# check duplicate general data
print(f'Total General Duplicated: {train_df.duplicated().sum()} \n')
train_df.info()

In [None]:
# null column
null_numeric = []
null_obj = []

# 
null_columns = train_df.columns[train_df.isnull().sum() > 0]

for col in null_columns:
    if train_df[col].dtype in ['int', 'float']:
        null_numeric.append(col)
        
    elif train_df[col].dtype == 'object':
        null_obj.append(col)

# 
print("Null Numeric:", null_numeric)
print("Null String:", null_obj)

In [None]:
# 
num_cols = []
obj_cols = []

for col in train_df:
    if train_df[col].dtype in ['int', 'float']:
        num_cols.append(col)
        
    elif train_df[col].dtype == 'object':
        obj_cols.append(col)

# 
print("Numeric Cols:", num_cols)
print("String Cols:", obj_cols)

In [None]:
# Original columns
train_original = train_df.columns

# Numeric Pipeline
numerical_pipeline = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy = "mean")), 
    ("outlier_removal", OutlierRemover(factor = 1.5))
])

# String Pipeline
categorical_pipeline = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy = "most_frequent"))
])

# ColumnTransformer untuk menggabungkan proses imputasi
prep_stage_1 = ColumnTransformer(
    transformers = [
        ("num", numerical_pipeline, num_cols), 
        ("cat", categorical_pipeline, obj_cols), 
    ], 
    remainder = "drop", 
    verbose_feature_names_out = True)

In [None]:
# Transform data menggunakan fit_transform pada tahap 1
train_df = prep_stage_1.fit_transform(train_df)

# Columns After: ubah kembali ke DataFrame dengan kolom dari prep_stage_1
train_df = pd.DataFrame(train_df, columns = prep_stage_1.get_feature_names_out())

# Hilangkan prefix (misalnya, "num__", "cat__", "out__")
clean_columns = [col.split("__", 1)[-1] for col in train_df.columns]
train_df.columns = clean_columns

In [None]:
# Menampilkan total null pada setiap kolom
null_columns = train_df.isnull().sum()[train_df.isnull().sum() > 0]
print(null_columns)
train_df.info()

In [None]:
train_df = convert_object_columns_to_numeric(train_df)
train_df.info()

In [None]:
# rows before filtering
print(f'Total Rows: {len(train_df)}')

# Filter kolom numerik
num_cols = train_df.select_dtypes(include = ["number"]).columns

# Pipeline untuk outlier remover hanya pada kolom numerik
outlier_pipeline = Pipeline(steps=[
    ("outlier_removal", OutlierRemover(factor=1.5))
])

# Transformasi data hanya pada kolom numerik
train_df[num_cols] = outlier_pipeline.fit_transform(train_df[num_cols])

# Output jumlah baris setelah transformasi
print(f'Total Rows: {len(train_df)}')

In [None]:
# Daftar kolom untuk label encoding (kolom ordinal)
encoding_set = {'OverallQual', 'OverallCond', 'ExterQual', 'ExterCond', 
                'BsmtQual', 'BsmtCond', 'HeatingQC', 'KitchenQual', 
                'FireplaceQu', 'GarageQual', 'GarageCond'}

# Inisialisasi list untuk menyimpan kolom yang telah dikelompokkan
train_ordinal_cols = []
train_one_hot_cols = []
train_numeric_cols = []

# Mengelompokkan kolom berdasarkan tipe data
for col in train_df.columns:
    if train_df[col].dtype in ['int', 'float']:
        train_numeric_cols.append(col)

    elif train_df[col].dtype == 'object':
        if col in encoding_set:
            train_ordinal_cols.append(col)

        else:
            train_one_hot_cols.append(col)

# Menampilkan hasil
print("Ordinal Encoding Columns:", train_ordinal_cols)
print("One-Hot Encoding Columns:", train_one_hot_cols)
print("Numeric Columns:", train_numeric_cols)

## Test Dataset

In [None]:
test_df.info()

In [None]:
# drop column
test_df = test_df.drop('Id', axis = 1)

# convert object if all numeric
test_df = convert_object_columns_to_numeric(test_df)

# check duplicate general data
print(f'Total General Duplicated: {test_df.duplicated().sum()} \n')
test_df.info()

In [None]:
# null column
null_numeric = []
null_obj = []

# 
null_columns = test_df.columns[test_df.isnull().sum() > 0]

for col in null_columns:
    if test_df[col].dtype in ['int', 'float']:
        null_numeric.append(col)
        
    elif test_df[col].dtype == 'object':
        null_obj.append(col)

# 
print("Null Numeric:", null_numeric)
print("Null String:", null_obj)

In [None]:
# 
num_cols = []
obj_cols = []


for col in test_df:
    if test_df[col].dtype in ['int', 'float']:
        num_cols.append(col)
        
    elif test_df[col].dtype == 'object':
        obj_cols.append(col)

# 
print("Numeric Cols:", num_cols)
print("String Cols:", obj_cols)

In [None]:
# Original columns
test_original = test_df.columns

# Numeric Pipeline
numerical_pipeline = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy = "mean"))
])

# String Pipeline
categorical_pipeline = Pipeline(steps = [
    ("imputer", SimpleImputer(strategy = "most_frequent"))
])

# ColumnTransformer untuk menggabungkan proses imputasi
prep_stage_1 = ColumnTransformer(
    transformers = [
        ("num", numerical_pipeline, null_numeric), 
        ("out", OutlierRemover(factor = 1.5), num_cols),
        ("cat", categorical_pipeline, null_obj), 
    ], remainder = "passthrough")

In [None]:
# Transform data menggunakan fit_transform pada tahap 1
test_df = prep_stage_1.fit_transform(test_df)

# implement original column
test_df = pd.DataFrame(test_df, columns = test_original)

# Menampilkan total null pada setiap kolom
null_columns = test_df.isnull().sum()[test_df.isnull().sum() > 0]
print(null_columns)

In [None]:
# Daftar kolom untuk label encoding (kolom ordinal)
encoding_set = {'OverallQual', 'OverallCond', 'ExterQual', 'ExterCond', 
                'BsmtQual', 'BsmtCond', 'HeatingQC', 'KitchenQual', 
                'FireplaceQu', 'GarageQual', 'GarageCond'}

# Inisialisasi list untuk menyimpan kolom yang telah dikelompokkan
test_ordinal_cols = []
test_one_hot_cols = []
test_numeric_cols = []

# Mengelompokkan kolom berdasarkan tipe data
for col in train_df.columns:
    if train_df[col].dtype in ['int', 'float']:
        test_numeric_cols.append(col)

    elif train_df[col].dtype == 'object':
        if col in encoding_set:
            test_ordinal_cols.append(col)

        else:
            test_one_hot_cols.append(col)

# Menampilkan hasil
print("Ordinal Encoding Columns:", test_ordinal_cols)
print("One-Hot Encoding Columns:", test_one_hot_cols)
print("Numeric Columns:", test_numeric_cols)

## Split Set

In [None]:
# Identifikasi kolom-kolom yang ada di train dan test
ordinal_encoding_cols = list(set(train_ordinal_cols) & set(test_ordinal_cols))
one_hot_encoding_cols = list(set(train_ordinal_cols) & set(test_ordinal_cols))
numeric_cols = list(set(train_ordinal_cols) & set(test_ordinal_cols))

In [None]:
# Memisahkan kolom target dari data
target_col = 'SalePrice'

# Memastikan kolom target ada di dalam DataFrame sebelum mencoba memisahkannya
if target_col in train_df.columns:
    X_train = train_df.drop(columns = [target_col])
    y_train = train_df[target_col]

else:
    X_train = train_df  
    y_train = None  

if target_col in test_df.columns:
    X_test = test_df.drop(columns = [target_col])
    
else:
    X_test = test_df  

In [None]:
# transform
numerical_transformer = StandardScaler()
categorical_transformer = OneHotEncoder(handle_unknown = 'ignore', sparse_output = False)
ordinal_transformer = OrdinalEncoder(handle_unknown = 'use_encoded_value', unknown_value = -1)

prep_stage_2 = ColumnTransformer(
    transformers = [
        ("num", numerical_transformer, numeric_cols), 
        ("cat", categorical_transformer, one_hot_encoding_cols), 
        ("ord", ordinal_transformer, ordinal_encoding_cols)
    ], remainder = "passthrough")

In [None]:
# Transform data menggunakan fit_transform pada tahap 1
train_check = prep_stage_2.fit_transform(train_df)

# implement original column
train_check = pd.DataFrame(train_df, columns = train_original)

# Menampilkan total null pada setiap kolom
null_columns = train_check.isnull().sum()[train_check.isnull().sum() > 0]
print(f'Train Stage 2 Check: {null_columns}')

In [None]:
# Transform data menggunakan fit_transform pada tahap 1
test_check = prep_stage_2.fit_transform(test_df)

# implement original column
test_check = pd.DataFrame(test_df, columns = test_original)

# Menampilkan total null pada setiap kolom
null_columns = test_check.isnull().sum()[test_check.isnull().sum() > 0]
print(f'Test Stage 2 Check: {null_columns}')

# Modeling

In [None]:
# Membuat pipeline yang menggabungkan preprocessing dengan model
model_pipeline = Pipeline(steps = [
    ('preprocessor', prep_stage_2),
    ('regressor', LinearRegression())
])