In [115]:
# Data
import pandas as pd
import numpy as np

# Preprocessing
## sklearn
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer

from feature_engine.encoding import RareLabelEncoder
from sklearn import set_config
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures, SplineTransformer

# Pipeline functions

In [41]:
class PandasTransformerMixin:
    def set_output(self, transformer):
        self.transformer = transformer
        return self

    def get_output(self):
        if hasattr(self, 'transformer') and self.transformer == "pandas":
            return pd.DataFrame
        else:
            return None

In [32]:
class CategoricalGrouping(BaseEstimator, TransformerMixin):
    def __init__(self, threshold=50):
        self.threshold = threshold
        self.categories = {}
    
    def fit(self, X, y=None):
        object_cols = X.select_dtypes(include='object').columns
        for col in object_cols:
            val_counts = X[col].value_counts()
            self.categories[col] = val_counts[val_counts >= self.threshold].index.tolist()
        return self
    
    def transform(self, X):
        X = X.copy()
        object_cols = X.select_dtypes(include='object').columns
        for col in object_cols:
            if col in self.categories:
                # Replace values below threshold and missing values with "other"
                X[col] = X[col].where(X[col].isin(self.categories[col]), "other")
        return X
    
    def fit_transform(self, X, y=None):
        return self.fit(X, y).transform(X)


In [33]:
class TargetEncoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.mapping = {}

    def fit(self, X, y):
        self.mapping = {}
        df = pd.concat([X, y], axis=1)
        for col in X.columns:
            target_mean = df.groupby(col)[y.name].mean()
            self.mapping[col] = target_mean
        return self

    def transform(self, X):
        X_encoded = X.copy()
        for col, target_mean in self.mapping.items():
            X_encoded[col] = X[col].map(target_mean)
        return X_encoded

In [34]:
class RemoveIdenticalColumns(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        self.columns_to_drop_ = np.where(X.nunique() == 1)[0]
        return self

    def transform(self, X):
        return X.drop(columns=X.columns[self.columns_to_drop_])

In [42]:
class RemoveWhitespace(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X.applymap(lambda x: x.strip() if isinstance(x, str) else x)

# Load and split data

In [6]:
train = pd.read_csv("data/train.csv")
test = pd.read_csv("data/test.csv")

In [64]:
feature_list = ['MSSubClass', 'MSZoning', 'LotFrontage', 'LotArea', 'Street',
       'Alley', 'LotShape', 'LandContour', 'Utilities', 'LotConfig',
       'LandSlope', 'Neighborhood', 'Condition1', 'Condition2', 'BldgType',
       'HouseStyle', 'OverallQual', 'OverallCond', 'YearBuilt', 'YearRemodAdd',
       'RoofStyle', 'RoofMatl', 'Exterior1st', 'Exterior2nd', 'MasVnrType',
       'MasVnrArea', 'ExterQual', 'ExterCond', 'Foundation', 'BsmtQual',
       'BsmtCond', 'BsmtExposure', 'BsmtFinType1', 'BsmtFinSF1',
       'BsmtFinType2', 'BsmtFinSF2', 'BsmtUnfSF', 'TotalBsmtSF', 'Heating',
       'HeatingQC', 'CentralAir', 'Electrical', '1stFlrSF', '2ndFlrSF',
       'LowQualFinSF', 'GrLivArea', 'BsmtFullBath', 'BsmtHalfBath', 'FullBath',
       'HalfBath', 'BedroomAbvGr', 'KitchenAbvGr', 'KitchenQual',
       'TotRmsAbvGrd', 'Functional', 'Fireplaces', 'FireplaceQu', 'GarageType',
       'GarageYrBlt', 'GarageFinish', 'GarageCars', 'GarageArea', 'GarageQual',
       'GarageCond', 'PavedDrive', 'WoodDeckSF', 'OpenPorchSF',
       'EnclosedPorch', '3SsnPorch', 'ScreenPorch', 'PoolArea', 'PoolQC',
       'Fence', 'MiscFeature', 'MiscVal', 'MoSold', 'YrSold', 'SaleType',
       'SaleCondition']

target = 'SalePrice'

In [10]:
X_train = train[feature_list]
y_train = train[target]

X_test = test[feature_list]

In [65]:
cat_cols = [col for col in X_train.columns if X_train[col].dtype == 'object']
num_cols = [col for col in X_train.columns if X_train[col].dtype in ('int64', 'float64')]

# Data Pipeline

In [112]:
target = True

set_config(transform_output = 'pandas')

if target:
    CategoricalEncoding = make_pipeline(SimpleImputer(strategy="most_frequent"), 
                                        RareLabelEncoder(tol=0.05, n_categories = 5), 
                                        TargetEncoder(),
                                        StandardScaler())
else:
    CategoricalEncoding = make_pipeline(SimpleImputer(strategy="most_frequent"),
                                        RareLabelEncoder(tol=0.05, n_categories = 5),
                                        OneHotEncoder(drop='first', sparse_output=False, 
                                                      handle_unknown="ignore"))


num_pipe = make_pipeline(SimpleImputer(), StandardScaler())

num_cat_pipe = ColumnTransformer(
    (
        ("categorical", CategoricalEncoding, cat_cols),
        ("numerical", num_pipe, num_cols),
    ),
    verbose_feature_names_out=False,
    remainder = "passtrough"
)

treatment_pipe = make_pipeline(num_cat_pipe, RemoveIdenticalColumns())


In [113]:
treatment_pipe.fit(X_train, y_train)

X_train_t = treatment_pipe.transform(X_train)
X_test_t = treatment_pipe.transform(X_test)



# Feature engineering

In [119]:
num_cols

['MSSubClass',
 'LotFrontage',
 'LotArea',
 'OverallQual',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'MasVnrArea',
 'BsmtFinSF1',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'TotalBsmtSF',
 '1stFlrSF',
 '2ndFlrSF',
 'LowQualFinSF',
 'GrLivArea',
 'BsmtFullBath',
 'BsmtHalfBath',
 'FullBath',
 'HalfBath',
 'BedroomAbvGr',
 'KitchenAbvGr',
 'TotRmsAbvGrd',
 'Fireplaces',
 'GarageYrBlt',
 'GarageCars',
 'GarageArea',
 'WoodDeckSF',
 'OpenPorchSF',
 'EnclosedPorch',
 '3SsnPorch',
 'ScreenPorch',
 'PoolArea',
 'MiscVal',
 'MoSold',
 'YrSold']

## Polynomial transformer

In [120]:
quadratic_cols = ['LotArea',
 'OverallQual']

cubic_cols = []

quartic_cols = []

quintic_cols = []

In [124]:
quadratic_transformer = PolynomialFeatures(2, include_bias = False)

cubic_transformer = PolynomialFeatures(3, include_bias = False)

quartic_transformer = PolynomialFeatures(4, include_bias = False)

quintic_transformer = PolynomialFeatures(5, include_bias = False)

poly_pipe = ColumnTransformer(
    (
        ("quadratic", quadratic_transformer, quadratic_cols),
        ("cubic", cubic_transformer, cubic_cols),
        ("quartic", quartic_transformer, quartic_cols),
        ("quintic", quintic_transformer, quintic_cols),
    ),
    verbose_feature_names_out=False,
    remainder = "passtrough"
)

## Splines

Add spesific features and specify degree and n_knots for that particular feature. 

In [None]:
spline_pipe = ColumnTransformer(
    (
        ("first_feature", SplineTransformer(n_knots = 3, degree = 3), []),
    ),
    verbose_feature_names_out=False,
    remainder = "passtrough"
)

In [122]:
poly_pipe.fit(X_train_t)

X_train_e = poly_pipe.transform(X_train_t)

In [123]:
X_train_e.columns

Index(['LotArea', 'OverallQual', 'LotArea^2', 'LotArea OverallQual',
       'OverallQual^2'],
      dtype='object')