This notebook is a cleaned up version of https://www.kaggle.com/code/abhivij/housing-price-prediction-part-2-exploratory 

# References
- sklearn pipeline : https://www.kaggle.com/code/alexisbcook/pipelines
- ordinal categorical features : https://www.kaggle.com/code/ryanholbrook/feature-engineering-for-house-prices

# Import libraries

In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
from pandas.api.types import CategoricalDtype

from xgboost import XGBRegressor

from sklearn.model_selection import KFold, cross_val_score
from sklearn.feature_selection import mutual_info_regression

from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline

from sklearn.preprocessing import StandardScaler, OneHotEncoder

from functools import reduce

from category_encoders import MEstimateEncoder, cat_boost

from sklearn.compose import ColumnTransformer

import optuna
import time

# Global variables

In [2]:
SEED = 0

# Load data and preprocess function

In [3]:
def load_and_preprocess_data(train_data = True):
    if train_data:
        print("Train data")
        X = pd.read_csv('/kaggle/input/house-prices-advanced-regression-techniques/train.csv', index_col='Id')
        X.dropna(axis=0, subset=['SalePrice'], inplace=True)
        y = X.SalePrice
        X.drop(['SalePrice'], axis=1, inplace=True)
    else:
        print("Test data")
        X = pd.read_csv('/kaggle/input/house-prices-advanced-regression-techniques/test.csv', index_col='Id')
        y = None
    print("Loaded data")
    print(X.shape)
    
    X = encode(X)
    X = impute(X)
    
    return (X, y)

def encode(df):
    # Nominal categories
    for name in features_nom:
        df[name] = df[name].astype("category")
        # Add a None category for missing values
        if "None" not in df[name].cat.categories:
            df[name] = df[name].cat.add_categories("None")
    # Ordinal categories
    for name, levels in ordered_levels.items():
        df[name] = df[name].astype(CategoricalDtype(levels,
                                                    ordered=True))
    return df

def impute(df):
    for name in df.select_dtypes("number"):
        df[name] = df[name].fillna(0)
    for name in df.select_dtypes("category"):
        df[name] = df[name].fillna("None")
    return df

# Categorical features - special handling
Ref : https://www.kaggle.com/code/ryanholbrook/feature-engineering-for-house-prices

In [4]:
# The nominative (unordered) categorical features
features_nom = ["MSSubClass", "MSZoning", "Street", "Alley", "LandContour", "LotConfig", "Neighborhood", 
                "Condition1", "Condition2", "BldgType", "HouseStyle", "RoofStyle", "RoofMatl", "Exterior1st", "Exterior2nd", 
                "MasVnrType", "Foundation", "Heating", "CentralAir", "GarageType", "MiscFeature", "SaleType", "SaleCondition"]


# The ordinal (ordered) categorical features 
five_levels = ["Po", "Fa", "TA", "Gd", "Ex"]
ten_levels = list(range(1, 11))

ordered_levels = {
    "OverallQual": ten_levels,
    "OverallCond": ten_levels,
    "ExterQual": five_levels,
    "ExterCond": five_levels,
    "BsmtQual": five_levels,
    "BsmtCond": five_levels,
    "HeatingQC": five_levels,
    "KitchenQual": five_levels,
    "FireplaceQu": five_levels,
    "GarageQual": five_levels,
    "GarageCond": five_levels,
    "PoolQC": five_levels,
    "LotShape": ["IR3", "IR2", "IR1", "Reg"],
    "LandSlope": ["Sev", "Mod", "Gtl"],
    "BsmtExposure": ["No", "Mn", "Av", "Gd"],
    "BsmtFinType1": ["Unf", "LwQ", "Rec", "BLQ", "ALQ", "GLQ"],
    "BsmtFinType2": ["Unf", "LwQ", "Rec", "BLQ", "ALQ", "GLQ"],
    "Functional": ["Sal", "Sev", "Maj1", "Maj2", "Mod", "Min2", "Min1", "Typ"],
    "GarageFinish": ["Unf", "RFn", "Fin"],
    "PavedDrive": ["N", "P", "Y"],
    "Utilities": ["ELO", "NoSeWa", "NoSewr", "AllPub"],
    "CentralAir": ["N", "Y"],
    "Electrical": ["Mix", "FuseP", "FuseF", "FuseA", "SBrkr"],
    "Fence": ["MnWw", "GdWo", "MnPrv", "GdPrv"],
}

ordered_levels = {key: ["None"] + value for key, value in
                  ordered_levels.items()}
ordered_levels.keys()

dict_keys(['OverallQual', 'OverallCond', 'ExterQual', 'ExterCond', 'BsmtQual', 'BsmtCond', 'HeatingQC', 'KitchenQual', 'FireplaceQu', 'GarageQual', 'GarageCond', 'PoolQC', 'LotShape', 'LandSlope', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinType2', 'Functional', 'GarageFinish', 'PavedDrive', 'Utilities', 'CentralAir', 'Electrical', 'Fence'])

# Append features

In [5]:
ms_subclass_mapping = {
    20: "1-STORY 1946 & NEWER ALL STYLES",
    30: "1-STORY 1945 & OLDER",
    40: "1-STORY W/FINISHED ATTIC ALL AGES",
    45: "1-1/2 STORY - UNFINISHED ALL AGES",
    50: "1-1/2 STORY FINISHED ALL AGES",
    60: "2-STORY 1946 & NEWER",
    70: "2-STORY 1945 & OLDER",
    75: "2-1/2 STORY ALL AGES",
    80: "SPLIT OR MULTI-LEVEL",
    85: "SPLIT FOYER",
    90: "DUPLEX - ALL STYLES AND AGES",
    120: "1-STORY PUD (Planned Unit Development) - 1946 & NEWER",
    150: "1-1/2 STORY PUD - ALL AGES",
    160: "2-STORY PUD - 1946 & NEWER",
    180: "PUD - MULTILEVEL - INCL SPLIT LEV/FOYER",
    190: "2 FAMILY CONVERSION - ALL STYLES AND AGES"
}

ms_class_mapping = {
    "1-STORY 1946 & NEWER ALL STYLES": "1-Story",
    "1-STORY 1945 & OLDER": "1-Story",
    "1-STORY W/FINISHED ATTIC ALL AGES": "1-Story",
    "1-STORY PUD (Planned Unit Development) - 1946 & NEWER": "1-Story",
    "1-1/2 STORY - UNFINISHED ALL AGES": "1-1/2 Story",
    "1-1/2 STORY FINISHED ALL AGES": "1-1/2 Story",
    "1-1/2 STORY PUD - ALL AGES": "1-1/2 Story",
    "2-STORY 1946 & NEWER": "2-Story",
    "2-STORY 1945 & OLDER": "2-Story",
    "2-STORY PUD - 1946 & NEWER": "2-Story",
    "SPLIT OR MULTI-LEVEL": "Split-Level",
    "SPLIT FOYER": "Split-Level",
    "PUD - MULTILEVEL - INCL SPLIT LEV/FOYER": "Split-Level",
    "DUPLEX - ALL STYLES AND AGES": "Multi-Family/Duplex",
    "2 FAMILY CONVERSION - ALL STYLES AND AGES": "Multi-Family/Duplex",
    "2-1/2 STORY ALL AGES": "2-1/2 Story",
}

In [6]:
def append_features(df):
    df = df.copy()

    #The commented features below ended up decreasing the overall score
    
    df["LivLotRatio"] = df.GrLivArea / df.LotArea
    # df["Spaciousness"] = (df['1stFlrSF'] + df['2ndFlrSF']) / df.TotRmsAbvGrd
    # df["Spaciousness"] = df.GrLivArea / df.TotRmsAbvGrd

    # bldg_dummies = pd.get_dummies(df.BldgType, prefix="Bldg")
    # df = df.join(bldg_dummies.mul(df.GrLivArea, axis=0))
    
    # df["PorchTypes"] = df[["WoodDeckSF", "OpenPorchSF", "EnclosedPorch", "3SsnPorch", "ScreenPorch"]].gt(0.0).sum(axis=1)

    # df["TotalOutsideSF"] = df.WoodDeckSF + df.OpenPorchSF + df.EnclosedPorch + df["3SsnPorch"] + df.ScreenPorch

    df["MSClass"] = (X["MSSubClass"].map(ms_subclass_mapping)
                                    .map(ms_class_mapping)
                                    .astype('category')
                                    .cat.add_categories("None")
                                    .fillna("None"))
    df["IsPUD"] = (X["MSSubClass"].map(ms_subclass_mapping)
                                  .str.contains('PUD')
                                  .astype('category')
                                  .cat.add_categories("None")
                                  .fillna("None"))
    # df.drop(columns = "MSSubClass", inplace = True)

    # df["MedNhbdArea"] = df.groupby("Neighborhood")["GrLivArea"].transform("median")

    # #PCA inspired as specified in https://www.kaggle.com/code/ryanholbrook/feature-engineering-for-house-prices
    # df["Feature1"] = df.GrLivArea + df.TotalBsmtSF
    # df["Feature2"] = df.YearRemodAdd * df.TotalBsmtSF
    
    return df

# Load data and process

In [7]:
X, y = load_and_preprocess_data()
X_test, _ = load_and_preprocess_data(train_data = False)

print("removing less important features")
features_to_drop = ['MoSold', 'MiscVal', 'Utilities']
X.drop(columns = features_to_drop, inplace = True)
X_test.drop(columns = features_to_drop, inplace = True)
print(X.shape)
print(X_test.shape)

print("appending features")
X = append_features(X)
print(X.shape)
X_test = append_features(X_test)
print(X_test.shape)

def remove_columns_from_list(orig_list, to_remove):
    return [f for f in orig_list if f not in to_remove]
    
ordinal_categorical_cols = remove_columns_from_list(ordered_levels.keys(), features_to_drop)

Train data
Loaded data
(1460, 79)
Test data
Loaded data
(1459, 79)
removing less important features
(1460, 76)
(1459, 76)
appending features
(1460, 79)
(1459, 79)


# Append Cluster information as training features

In [8]:
class AppendKMeans(BaseEstimator, TransformerMixin):
    def __init__(self, cluster_columns, n_clusters=20, return_cluster=True, return_distances=False):
        self.cluster_columns = cluster_columns
        self.n_clusters = n_clusters
        self.return_cluster = return_cluster
        self.return_distances = return_distances

    def fit(self, X, y=None):
        X = X.copy()
        for colname in X.select_dtypes(["category"]):
            X[colname] = X[colname].cat.codes
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X[self.cluster_columns])  # Scale features
        self.kmeans = KMeans(n_clusters=self.n_clusters, n_init=10, random_state=SEED)
        self.kmeans.fit(X_scaled)  # Fit K-Means on scaled features
        return self

    def transform(self, X):
        result = X.copy()
        X = X.copy()
        for colname in X.select_dtypes(["category"]):
            X[colname] = X[colname].cat.codes
        X_scaled = self.scaler.transform(X[self.cluster_columns])  # Apply same scaling as training
        if self.return_cluster:
            result["Cluster"] = self.kmeans.predict(X_scaled)  # Get cluster
        if self.return_distances:
            cluster_distances = self.kmeans.transform(X_scaled)
            cluster_distances = pd.DataFrame(
                    cluster_distances, columns=[f"distance_centroid_{i}" for i in range(cluster_distances.shape[1])]
            )
            cluster_distances.set_index(X.index, inplace = True)
            result = result.join(cluster_distances)
        return result

# Append PCA

In [9]:
class AppendPCA(BaseEstimator, TransformerMixin):
    def __init__(self, pca_columns, n_components=2, pca_col_prefix="PCA"):
        self.pca_columns = pca_columns
        self.n_components = n_components
        self.pca_col_prefix = pca_col_prefix

    def fit(self, X, y=None):
        X = X.copy()
        for colname in X.select_dtypes(["category"]):
            X[colname] = X[colname].cat.codes
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X[self.pca_columns])  # Scale features
        self.pca = PCA(n_components=self.n_components, random_state=SEED)
        self.pca.fit(X_scaled)  # Fit PCA on scaled features
        return self

    def transform(self, X):
        result = X.copy()
        X = X.copy()
        for colname in X.select_dtypes(["category"]):
            X[colname] = X[colname].cat.codes
        X_scaled = self.scaler.transform(X[self.pca_columns])  # Apply same scaling as training
        pca_components = self.pca.transform(X_scaled)  # Apply PCA
        # print(self.pca.explained_variance_ratio_)
        # print(np.cumsum(self.pca.explained_variance_ratio_))
        pca_components = pd.DataFrame(
                    pca_components, columns=[f"{self.pca_col_prefix}_{i}" for i in range(pca_components.shape[1])]
        )
        pca_components.set_index(X.index, inplace = True)
        result = result.join(pca_components)
        return result

# Target Encoding

In [10]:
class CrossFoldEncoder(BaseEstimator, TransformerMixin):
    
    #encoder_other_params should be a dict of argument_name and value
    # This is done to ensure it works properly within Pipeline
    # Not passing it as kwargs, because Pipeline uses sklearn.base.clone() and clone does not retain kwargs
    def __init__(self, cols, encoder, encoder_other_params):
        self.cols = cols
        self.encoder = encoder
        self.cv = KFold(n_splits=5)
        self.encoder_other_params = encoder_other_params  
        
    # Fit an encoder on one split and transform the feature on the
    # other. Iterating over the splits in all folds gives a complete
    # transformation. We also now have one trained encoder on each
    # fold.
    def fit(self, X, y):
        self.fitted_encoders_ = []
        X_encoded = []
        for idx_encode, _ in self.cv.split(X):
            fitted_encoder = self.encoder(cols=self.cols, **self.encoder_other_params)
            fitted_encoder.fit(
                X.iloc[idx_encode, :], y.iloc[idx_encode],
            )
            self.fitted_encoders_.append(fitted_encoder)
        return self

    # To transform the data, average the encodings learned from
    # each fold.
    def transform(self, X):
        X_encoded_list = []
        for fitted_encoder in self.fitted_encoders_:
            X_encoded = fitted_encoder.transform(X)
            X_encoded_list.append(X_encoded[self.cols])
        X_encoded = reduce(
            lambda x, y: x.add(y, fill_value=0), X_encoded_list
        ) / len(X_encoded_list)
        X_encoded.columns = [name + "_encoded" for name in X_encoded.columns]
        #drop columns for which target encoding has been created and join with target encodings
        return X.drop(columns=self.cols).join(X_encoded)   

# Training pipeline

Lets define a Transformer to convert categorical columns to their codes

In [11]:
class OrdinalEncoder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        result = X.copy()
        for col in result.columns:
            result[col] = result[col].cat.codes
        return result

In [12]:
categorical_cols = [cname for cname in X.columns if
                    X[cname].dtype == "category"]

numerical_cols = [cname for cname in X.columns if 
                X[cname].dtype in ['int64', 'float64']]

small_cat_categorical_cols = [cname for cname in categorical_cols if
                             X[cname].nunique() < 10 and cname not in ordinal_categorical_cols]
large_cat_categorical_cols = [cname for cname in categorical_cols if
                             X[cname].nunique() >= 10 and cname not in ordinal_categorical_cols]

print(len(ordinal_categorical_cols))
print(len(small_cat_categorical_cols))
print(len(large_cat_categorical_cols))
print(len(categorical_cols))  
print(len(numerical_cols))

23
20
4
47
32


In [13]:
numerical_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])
ord_categorical_transformer = Pipeline(steps=[
    ('catcode', OrdinalEncoder())
    # ,
    # ('scaler', StandardScaler())
])
small_categorical_transformer = Pipeline(steps=[
    # ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ('small_cat_catcode', OrdinalEncoder())
    # ,
    # ('scaler', StandardScaler())
])


pipeline = Pipeline([
    ('append_pca', AppendPCA(X.columns, n_components = 5)),  
    ('append_target_encoder', CrossFoldEncoder(cols=large_cat_categorical_cols, 
                                        encoder=MEstimateEncoder, 
                                        encoder_other_params={"m":10.0})),
    ('encoder_scaler', ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, [col + "_encoded" for col in large_cat_categorical_cols]+[f"PCA_{i}" for i in range(5)]),
            ('ord_cat', ord_categorical_transformer, ordinal_categorical_cols),
            ('small_cat', small_categorical_transformer, small_cat_categorical_cols)
        ],
        remainder="passthrough")
    ),
    ('model', XGBRegressor())         
])

In [14]:
def score_dataset(X, y, model=XGBRegressor()):
    # Metric for Housing competition is RMSLE (Root Mean Squared Log Error)
    log_y = np.log(y)
    score = cross_val_score(
        model, X, log_y, cv=5, scoring="neg_root_mean_squared_error",
    )
    score = -1 * score.mean()
    return score

# Optimize hyperparameters

In [15]:
# def objective(trial):  
    
#     params = {
#         'model__random_state':       SEED,
#         'model__n_estimators':       trial.suggest_int('model__n_estimators', 200, 1000, step = 50),
#         'model__learning_rate':      trial.suggest_float('model__learning_rate', 1e-4, 0.5, log=True),
#         'model__max_depth':          trial.suggest_int('model__max_depth', 4, 8),
#         'model__min_child_weight':   trial.suggest_int('model__min_child_weight', 0, 10),
#         'model__lambda':             trial.suggest_float('model__lambda', 0, 10.0),
#         'model__alpha':              trial.suggest_float('model__alpha', 0, 10.0),
#         'model__subsample':          trial.suggest_float('model__subsample', 0.4, 1.0),
#         'model__colsample_bytree':   trial.suggest_float('model__colsample_bytree', 0.4, 1.0),
#         'model__colsample_bylevel':  trial.suggest_float('model__colsample_bylevel', 0.4, 1.0),
#         'model__colsample_bynode':   trial.suggest_float('model__colsample_bynode', 0.4, 1.0)
#     }
    
#     pipeline = Pipeline([
#         ('append_pca', AppendPCA(X.columns, n_components = 5)),  
#         ('append_target_encoder', CrossFoldEncoder(cols=large_cat_categorical_cols, 
#                                             encoder=MEstimateEncoder, 
#                                             encoder_other_params={"m":10.0})),
#         ('encoder_scaler', ColumnTransformer(
#             transformers=[
#                 ('num', numerical_transformer, [col + "_encoded" for col in large_cat_categorical_cols]+[f"PCA_{i}" for i in range(5)]),
#                 ('ord_cat', ord_categorical_transformer, ordinal_categorical_cols),
#                 ('small_cat', small_categorical_transformer, small_cat_categorical_cols)
#             ],
#             remainder="passthrough")
#         ),
#         ('model', XGBRegressor())         
#     ])
#     pipeline.set_params(**params)

#     val_score = score_dataset(X, y, pipeline)
#     return val_score

# start_time = time.time()
# study = optuna.create_study(direction = 'minimize')
# study.optimize(objective, n_trials = 100)
# end_time = time.time()
# elapsed_time = end_time - start_time
# print(f"XGB tuning took {elapsed_time:.2f} seconds.")
# print(elapsed_time)

# print(study.best_params)
# print(study.best_value)
# print(study.best_trial)

In [16]:
#Best params
best_params = {'model__n_estimators': 1000, 'model__learning_rate': 0.0397655074882934, 
               'model__max_depth': 5, 'model__min_child_weight': 10, 'model__lambda': 4.136351431737015, 
               'model__alpha': 0.1695476600984575, 'model__subsample': 0.4628671044982048, 'model__colsample_bytree': 0.5990823984487135, 
               'model__colsample_bylevel': 0.8507190370379901, 'model__colsample_bynode': 0.5575670205606194}
#0.11995546098890655

pipeline.set_params(**best_params)

# CV Score

In [20]:
# score_dataset(X, y, pipeline)
# # 0.12792979790024622
# public score : 0.13787

# # 0.12984893358925625 - after modifying Utilities, LotShape categories
# public score : 0.13794


# 0.11995546098890655  with best Optuna hyperparams
# public score : 0.12103

# Train on full data and obtain test predictions

In [18]:
#retrain on full data and obtain test predictions using best model hyperparameter values
pipeline.fit(X, np.log(y))

# Preprocessing of validation data, get predictions
pred = np.exp(pipeline.predict(X_test))

print(pred[:10])

[126233.984 162617.17  179080.66  195618.72  179640.84  181887.75
 175557.3   167952.25  171820.72  128997.9  ]


In [19]:
# Save test predictions to file
output = pd.DataFrame({'Id': X_test.index,
                       'SalePrice': pred})
output.to_csv('submission.csv', index=False)
print('saved output file')

saved output file
