# House Price Regression 🏠

In [10]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

#Custom imports
from regress.missing_values import missing_values_summarizer
from regress.styler import style_dataframe
from regress.metrics import compute_metrics
#https://www.kaggle.com/datasets/yasserh/housing-prices-dataset/data

## Read in data

In [11]:
# Load the training and test splits from CSV files under the local data/ directory.
train = pd.read_csv('data/train.csv')
test = pd.read_csv('data/test.csv')

In [None]:
# Number of columns in the raw training frame.
len(train.columns)

## Understanding the data 

In [None]:
# Summary statistics restricted to int/float columns, followed by how many
# such columns the summary contains.
# NOTE(review): a notebook cell only echoes the value of its last expression,
# so the styled table is visible only if style_dataframe renders it itself — confirm.
style_dataframe(train.describe(include=['int', 'float']))
len(train.describe(include=['int', 'float']).columns)

In [None]:
# Summary of the object-dtype columns, followed by how many such columns
# the summary contains.
# NOTE(review): a notebook cell only echoes the value of its last expression,
# so the styled table is visible only if style_dataframe renders it itself — confirm.
style_dataframe(train.describe(include=['object']))
len(train.describe(include=['object']).columns)

In [15]:
def scatter_correlation(df, x_col, y_col, title=None, size=8, line_color='red'):
    """Draw a scatter plot of two numeric columns with a fitted regression line.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding both columns.
    x_col, y_col : str
        Names of the columns plotted on the x and y axes.
    title : str, optional
        Plot title; when empty or None, "<y_col> vs <x_col>" is used.
    size : int, default 8
        Marker size forwarded to the scatter layer.
    line_color : str, default 'red'
        Colour of the regression line.

    Raises
    ------
    ValueError
        If either column is absent from ``df`` or is not of a numeric dtype.
    """
    axes_cols = (x_col, y_col)

    # Validate before touching matplotlib so that bad input never opens a figure.
    if any(col not in df.columns for col in axes_cols):
        raise ValueError(f"Columns '{x_col}' or '{y_col}' not found in DataFrame")

    if not all(pd.api.types.is_numeric_dtype(df[col]) for col in axes_cols):
        raise ValueError("Both x_col and y_col must be numeric")

    plt.figure(figsize=(10, 7))

    marker_style = {'s': size}
    line_style = {'color': line_color}
    # ci=None turns off the confidence band around the fitted line.
    sns.regplot(x=df[x_col], y=df[y_col],
                scatter_kws=marker_style, line_kws=line_style, ci=None)

    plt.xlabel(x_col)
    plt.ylabel(y_col)
    plt.title(title or f"{y_col} vs {x_col}")
    plt.show()


In [None]:
# Plot SalePrice against LotFrontage with a fitted regression line.
scatter_correlation(df=train, 
                    x_col="LotFrontage", 
                    y_col="SalePrice")

## Drop unnecessary features

In [17]:
# Separate features from the target: the feature frame loses the 'Id' and
# 'SalePrice' columns, and 'SalePrice' is kept as an independent copy for y.
X_train = train.drop(columns=['Id', 'SalePrice'])
y_train = train['SalePrice'].copy()

## Analyze missing values

In [18]:
# missing_values_summarizer returns two values; only the first is kept.
# Presumably a per-column missing-value summary (the next cell reads its
# 'missing_count' column) — confirm against regress.missing_values.
props_df, _ = missing_values_summarizer(df=X_train)

In [None]:
# Keep only the summary rows that report at least one missing value.
# A boolean mask selects those rows directly; there is no need to convert the
# mask to positions with np.where and feed them to iloc.
style_dataframe(props_df.loc[props_df['missing_count'] > 0])

## Preprocess pipeline

In [20]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import RobustScaler, OneHotEncoder
import pandas as pd
import numpy as np

def preprocess_data(X_train, X_test, X_val=None, 
                    threshold=0.7):
    """Impute, scale and one-hot encode the feature frames.

    Column selection and every transformer are fitted on ``X_train`` only and
    then applied unchanged to ``X_test`` (and ``X_val`` when given), so all
    returned frames share exactly the same columns in the same order.

    Steps:
      * numeric columns: median imputation followed by ``RobustScaler``;
      * object/category columns: most-frequent imputation followed by one-hot
        encoding with the first level dropped (``handle_unknown='ignore'``
        is set for categories not seen during fitting);
      * columns with too few non-missing training values, and columns of any
        other dtype, are dropped.

    Parameters
    ----------
    X_train : pandas.DataFrame
        Training features; the pipeline is fitted on this frame.
    X_test : pandas.DataFrame
        Test features, transformed with the fitted pipeline. Extra columns
        not selected from ``X_train`` are ignored (``remainder='drop'``).
    X_val : pandas.DataFrame, optional
        Validation features, transformed like ``X_test``.
    threshold : float, default 0.7
        Minimum fraction of non-missing values a training column needs in
        order to be kept.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(X_train_df, X_test_df)`` or, when ``X_val`` is given,
        ``(X_train_df, X_test_df, X_val_df)``. Each frame keeps the row index
        of its input.
    """
    numeric_cols = X_train.select_dtypes(include=['number']).columns.tolist()
    categorical_cols = X_train.select_dtypes(include=['object', 'category']).columns.tolist()

    # dropna(thresh=k) keeps a column only if it has at least k non-missing
    # values, so this enforces the minimum fill rate on the training data.
    min_non_na = int(threshold * len(X_train))
    kept_numeric_cols = X_train[numeric_cols].dropna(
        axis=1, thresh=min_non_na).columns.tolist()
    kept_categorical_cols = X_train[categorical_cols].dropna(
        axis=1, thresh=min_non_na).columns.tolist()

    numeric_pipeline = Pipeline([
        ('numeric_imputer', SimpleImputer(strategy='median')),
        ('scaler', RobustScaler())
    ])

    categorical_pipeline = Pipeline([
        ('categorical_imputer', SimpleImputer(strategy='most_frequent')),
        ('one_hot_encoder', OneHotEncoder(handle_unknown='ignore', 
                                          sparse_output=False, drop='first'))
    ])

    preprocessing_pipeline = ColumnTransformer([
        ('numeric_preprocess', numeric_pipeline, kept_numeric_cols),
        ('categorical_preprocess', categorical_pipeline, kept_categorical_cols)
    ], remainder='drop', 
    verbose_feature_names_out=False)

    X_train_transformed = preprocessing_pipeline.fit_transform(X_train)
    feature_names = preprocessing_pipeline.get_feature_names_out()

    def _to_frame(values, source):
        # Wrap a transformed array with the fitted feature names and the
        # source frame's row index.
        return pd.DataFrame(values, columns=feature_names, index=source.index)

    # Every output frame is built from the same fitted transformer and the
    # same feature_names, so the columns are aligned by construction and no
    # separate alignment pass is required.
    X_train_df = _to_frame(X_train_transformed, X_train)
    X_test_df = _to_frame(preprocessing_pipeline.transform(X_test), X_test)

    if X_val is None:
        return X_train_df, X_test_df

    X_val_df = _to_frame(preprocessing_pipeline.transform(X_val), X_val)
    return X_train_df, X_test_df, X_val_df


In [21]:
# Fit the preprocessing on the training features and apply it to the test
# frame; no validation frame is passed, so two frames come back.
X_train_pp, X_test_pp = preprocess_data(X_train=X_train, 
                                        X_test=test)

## Modeling

In [22]:
from sklearn.linear_model import LinearRegression, Ridge, ElasticNet
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
import os

In [23]:
# Candidate regressors keyed by a short name. The training loop below uses the
# key in progress messages and in the per-fold pickle file names.
# random_state is fixed for the two tree-based models.
models = {
    "linear_regression": LinearRegression(),
    "ridge": Ridge(alpha=1.0),
    "elastic_net": ElasticNet(alpha=0.1, 
                              l1_ratio=0.5),
    "random_forest": RandomForestRegressor(n_estimators=100, 
                                           random_state=42),
    "xgboost": XGBRegressor(n_estimators=500, 
                            max_depth=6, 
                            learning_rate=0.1, 
                            random_state=42)
}

In [24]:
import os
import pickle
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

def regression_training_loop(models, 
                             X_train, 
                             y_train, 
                             X_val=None, 
                             save_path='models', 
                             use_best_model=True, 
                             cv_folds=5,
                             scoring_metric="MSE"):
    """Cross-validate several regressors, pick the best and optionally refit it.

    Each model is fitted on every K-fold split of ``X_train``/``y_train``
    (shuffled, ``random_state=42``), scored with ``compute_metrics`` and
    pickled per fold under ``save_path``. Fold metrics are averaged per model
    and the model with the best average ``scoring_metric`` is selected
    ("R2" is maximised; "MSE", "RMSE", "MAE" and "MSLE" are minimised).

    Parameters
    ----------
    models : dict
        Mapping of name -> unfitted estimator exposing ``fit``/``predict``.
        The estimator objects are fitted in place.
    X_train : pandas.DataFrame
        Training features (indexed positionally with ``.iloc``).
    y_train : pandas.Series
        Training target (indexed positionally with ``.iloc``).
    X_val : pandas.DataFrame, optional
        If given together with ``use_best_model=True``, predictions of the
        refitted best model on this frame are returned.
    save_path : str, default 'models'
        Directory for the pickled models; created when missing.
    use_best_model : bool, default True
        Refit the selected model on the full training data and pickle it.
    cv_folds : int, default 5
        Number of K-fold splits.
    scoring_metric : str, default "MSE"
        Key of the metric used for model selection. ``compute_metrics`` is
        expected to return a mapping containing this key.

    Returns
    -------
    tuple
        ``(best_model, formatted_results, val_preds)`` when the best model is
        refitted and ``X_val`` is given; otherwise
        ``(best_model, formatted_results, best_model_filename)`` where the
        file name is ``None`` if no refit/pickle of a best model took place.

    Raises
    ------
    ValueError
        If ``scoring_metric`` is not one of the supported metric names.
    """
    minimised_metrics = ("MSE", "RMSE", "MAE", "MSLE")
    maximise = scoring_metric == "R2"
    # Fail before any training: an unsupported metric could never select a model.
    if not maximise and scoring_metric not in minimised_metrics:
        raise ValueError(
            f"Unsupported scoring_metric '{scoring_metric}'; "
            f"expected 'R2' or one of {list(minimised_metrics)}"
        )

    os.makedirs(save_path, exist_ok=True)

    results = {}
    best_model = None 
    best_score = float('-inf') if maximise else float("inf")
    # Stays None unless the best model is refitted and saved below, so the
    # final return never touches an unbound name.
    best_model_filename = None
    kf = KFold(n_splits=cv_folds, shuffle=True, random_state=42)

    for name, model in models.items():
        print(f'\nTraining {name} with {cv_folds}-Fold Cross Validation...')
        
        fold_metrics = []  
        for fold, (train_idx, val_idx) in enumerate(kf.split(X_train), 1):
            print(f"  ⏳ Training {name} - Fold {fold}/{cv_folds}...")

            X_train_cv, X_val_cv = X_train.iloc[train_idx], X_train.iloc[val_idx]
            y_train_cv, y_val_cv = y_train.iloc[train_idx], y_train.iloc[val_idx]

            # The same estimator object is refitted on every fold.
            model.fit(X_train_cv, y_train_cv)
            y_pred_cv = model.predict(X_val_cv)

            fold_metrics.append(compute_metrics(y_val_cv, y_pred_cv))

            with open(os.path.join(save_path, f"{name}-fold{fold}.sav"), 'wb') as f:
                pickle.dump(model, f)

        avg_metrics = pd.DataFrame(fold_metrics).mean().to_dict()
        results[name] = avg_metrics

        metric_val = avg_metrics[scoring_metric]
        improved = metric_val > best_score if maximise else metric_val < best_score
        if improved:
            best_score = metric_val
            best_model = model

    results_df = pd.DataFrame(results).T

    def _format_cell(x):
        # Thousands separator and two decimals for numbers; anything else untouched.
        return f"{x:,.2f}" if isinstance(x, (int, float)) else x

    # DataFrame.applymap is deprecated since pandas 2.1 in favour of
    # DataFrame.map; fall back to applymap on older pandas versions.
    elementwise = getattr(results_df, "map", None) or results_df.applymap
    formatted_results = elementwise(_format_cell)

    print("\n✅ Model Training Completed. Results:")
    print(formatted_results)

    # Explicit None check: estimator truthiness is not reliable (ensemble
    # estimators define __len__).
    if use_best_model and best_model is not None:
        print(f"\n🏆 Retraining best model ({best_model.__class__.__name__}) on full training data...")
        best_model.fit(X_train, y_train)

        best_model_filename = os.path.join(save_path, f"best_model_{best_model.__class__.__name__}.sav")
        with open(best_model_filename, 'wb') as f:
            pickle.dump(best_model, f)
        
        if X_val is not None: 
            print("\n📊 Generating predictions on validation set...")
            val_preds = best_model.predict(X_val)
            return best_model, formatted_results, val_preds

    return best_model, formatted_results, best_model_filename


In [None]:
# Cross-validate every candidate and select on mean MAE. No X_val is passed,
# so the third returned value is the path of the pickled best model.
best_model, model_results, df_best_model_fn = regression_training_loop(
    models, 
    X_train=X_train_pp,
    y_train=y_train,
    scoring_metric="MAE")


In [None]:
# Display the formatted per-model average cross-validation metrics.
style_dataframe(model_results)

## Test final model on test data

In [29]:
# Reload the best model from the file written by the training loop.
# pickle.load can execute arbitrary code embedded in the file, so only load
# artefacts produced by this notebook (or another trusted source).
with open(df_best_model_fn, 'rb') as f:
    best_model = pickle.load(f)


## Use Shapley to visualise local and global importance

In [None]:
import shap

## Global Feature Importance

In [None]:
# Work on the first 100 preprocessed test rows only.
sample_X_test = X_test_pp[:100]

In [35]:
# Build a SHAP explainer for the best model; the sampled test rows are passed
# as the second argument (presumably used as background data — confirm
# against the shap.Explainer documentation).
explainer = shap.Explainer(best_model, sample_X_test)

In [36]:
# Compute SHAP values for the sampled test rows.
shap_values = explainer(sample_X_test)

In [None]:
# Summary plot of the SHAP values over all sampled rows.
shap.summary_plot(shap_values, sample_X_test)

## Local Importance

In [None]:
# Row of the sample to explain; reused by the force plot in the next cell.
idx = 0
# Waterfall plot of the SHAP values for that single row.
shap.waterfall_plot(shap_values[idx])

In [None]:
# Force plot for the same row, rendered with matplotlib.
# NOTE(review): assumes the explainer selected for best_model exposes
# `expected_value` — confirm for the chosen model type.
shap.force_plot(explainer.expected_value, 
                shap_values[idx].values, 
                sample_X_test.iloc[idx], matplotlib=True)


## Join results on to test data

In [109]:
# Predict on the full preprocessed test set with the reloaded best model.
y_test_preds = best_model.predict(X_test_pp)

In [110]:
# Put the predictions next to the preprocessed test features.
# concat(axis=1) aligns on index labels, so the prediction Series is built on
# X_test_pp's own index; a default RangeIndex would misalign rows (and add
# NaN-filled ones) whenever the test frame is not indexed 0..n-1.
x_test_with_preds = pd.concat(
    [
        pd.Series(y_test_preds, index=X_test_pp.index, name='PredSalePrice'),
        X_test_pp
    ],
    axis=1)

In [None]:
# Preview the first rows of the predictions joined with their features.
style_dataframe(x_test_with_preds.head())