When preprocessing data and building a model in a Machine Learning workflow, it is fundamental that every dataset is processed in the same way every time. To this end, pipelines play a key role: they guarantee the same sequence of steps for every dataset we import and use.

A pipeline can be defined as a sequence of objects that act on a set of data. The actions can include:

- apply transformations
- impute missing values (mean, median, zero, etc.)
- create a new feature
- fit a model
- predict on unseen data

The purpose of this is to validate the ML process. Indeed, whenever a part of the process changes, it is really useful to understand whether that change improved the performance or not. This could be the creation of a new feature or imputing missing values with the mean value of that feature rather than with zero. 

A trustworthy evaluation of the process is needed to get all these insights; otherwise you will not be able to judge whether a specific change improves or worsens your model's performance. Defining a pipeline is fundamental to this end, as repeating all those actions manually could easily lead to errors along the ML workflow.
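
As a minimal sketch of the idea (the toy arrays and the step names `imputer`, `scaler` and `model` are made up for illustration and are not part of the workflow below), a scikit-learn `Pipeline` chains imputation, scaling and a model so that exactly the same steps are applied at fit time and at prediction time:

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy data, invented for this sketch: two features, one missing value
X_train = np.array([[1.0, 10.0], [2.0, np.nan], [3.0, 30.0], [4.0, 40.0]])
y_train = np.array([1.0, 2.0, 3.0, 4.0])
X_new = np.array([[5.0, np.nan]])

toy_pipe = Pipeline([
    ("imputer", SimpleImputer(strategy="mean")),  # fill missing values
    ("scaler", StandardScaler()),                 # rescale the features
    ("model", LinearRegression()),                # fit a model
])

# fit learns the imputation values, the scaling parameters and the model
toy_pipe.fit(X_train, y_train)

# predict reuses what was learned; nothing is re-fitted on the new data
toy_pred = toy_pipe.predict(X_new)
imputer_means = toy_pipe.named_steps["imputer"].statistics_
```

The missing value in `X_new` is filled with the mean learned from `X_train`, which is the behaviour we want to reproduce with our own DataFrame-friendly steps.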

First, let's import all the needed libraries and the dataset to be used:

In [None]:
import pandas as pd
import numpy as np

from sklearn.model_selection import StratifiedShuffleSplit

from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

df_train_filepath = r'.\dataset\train.csv'
df_train = pd.read_csv(df_train_filepath)

df_test_filepath = r'.\dataset\test.csv'
df_test = pd.read_csv(df_test_filepath)

df_train.head()

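def make_test(train, test_size, random_state, strat_feat=None):
    if strat_feat:
        split = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
        # split() yields positional indices, so select the rows with iloc
        for train_index, test_index in split.split(train, train[strat_feat]):
            train_set = train.iloc[train_index]
            test_set = train.iloc[test_index]
    else:
        # no stratification requested: fall back to a plain random split
        train_set, test_set = train_test_split(train, test_size=test_size, random_state=random_state)

    return train_set, test_set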

# Split the dataset while maintaining the proportion of 'Neighborhood'

train_set, test_set = make_test(df_train, 
                                test_size=0.2, random_state=654, 
                                strat_feat='Neighborhood')

print("\nTraining Set:\n", train_set)
print("\nTest Set:\n", test_set)

pd.set_option('display.max_columns', 500)
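
To see what the stratification buys us, here is a small sketch on a made-up DataFrame (the `group` column plays the role of `Neighborhood`; the data is invented). It assumes nothing beyond `StratifiedShuffleSplit` itself:

```python
import pandas as pd
from sklearn.model_selection import StratifiedShuffleSplit

# Toy frame, invented for this sketch: two equally frequent groups
toy = pd.DataFrame({"group": ["a"] * 10 + ["b"] * 10, "value": range(20)})

splitter = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=0)
# split() returns positional indices, hence iloc below
train_idx, test_idx = next(splitter.split(toy, toy["group"]))

toy_train = toy.iloc[train_idx]
toy_test = toy.iloc[test_idx]

# Share of each group in the two subsets
train_share = toy_train["group"].value_counts(normalize=True)
test_share = toy_test["group"].value_counts(normalize=True)
```

Both subsets keep the same proportion of each group as the original data, which is what we ask for with `strat_feat='Neighborhood'`.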

Let's define some classes that are simple wrappers around existing scikit-learn classes. We do this because, for example, SimpleImputer returns a NumPy array, while we want to keep a DataFrame throughout our ML workflow. To this end, custom classes can be defined as follows:

In [None]:
class general_cleaner(BaseEstimator, TransformerMixin):
    '''
    This class applies what we know from the documentation.
    It cleans some known missing values.
    It flags the missing values.

    This process is supposed to happen as first step of any pipeline
    '''
        
    def fit(self, X, y=None):
        return self
    
    def transform(self, X, y=None):
        #LotFrontage
        X.loc[X.LotFrontage.isnull(), 'LotFrontage'] = 0
        #Alley
        X.loc[X.Alley.isnull(), 'Alley'] = "NoAlley"
        #MSSubClass
        X['MSSubClass'] = X['MSSubClass'].astype(str)
        #MissingBasement
        fil = ((X.BsmtQual.isnull()) & (X.BsmtCond.isnull()) & (X.BsmtExposure.isnull()) &
              (X.BsmtFinType1.isnull()) & (X.BsmtFinType2.isnull()))
        fil1 = ((X.BsmtQual.notnull()) | (X.BsmtCond.notnull()) | (X.BsmtExposure.notnull()) |
              (X.BsmtFinType1.notnull()) | (X.BsmtFinType2.notnull()))
        X.loc[fil1, 'MisBsm'] = 0
        X.loc[fil, 'MisBsm'] = 1 # made explicit for safety
        #BsmtQual
        X.loc[fil, 'BsmtQual'] = "NoBsmt" #missing basement
        #BsmtCond
        X.loc[fil, 'BsmtCond'] = "NoBsmt" #missing basement
        #BsmtExposure
        X.loc[fil, 'BsmtExposure'] = "NoBsmt" #missing basement
        #BsmtFinType1
        X.loc[fil, 'BsmtFinType1'] = "NoBsmt" #missing basement
        #BsmtFinType2
        X.loc[fil, 'BsmtFinType2'] = "NoBsmt" #missing basement
        #BsmtFinSF1
        X.loc[fil, 'BsmtFinSF1'] = 0 # No bsmt
        #BsmtFinSF2
        X.loc[fil, 'BsmtFinSF2'] = 0 # No bsmt
        #BsmtUnfSF
        X.loc[fil, 'BsmtUnfSF'] = 0 # No bsmt
        #TotalBsmtSF
        X.loc[fil, 'TotalBsmtSF'] = 0 # No bsmt
        #BsmtFullBath
        X.loc[fil, 'BsmtFullBath'] = 0 # No bsmt
        #BsmtHalfBath
        X.loc[fil, 'BsmtHalfBath'] = 0 # No bsmt
        #FireplaceQu
        X.loc[(X.Fireplaces == 0) & (X.FireplaceQu.isnull()), 'FireplaceQu'] = "NoFire" #missing
        #MisGarage
        fil = ((X.GarageYrBlt.isnull()) & (X.GarageType.isnull()) & (X.GarageFinish.isnull()) &
              (X.GarageQual.isnull()) & (X.GarageCond.isnull()))
        fil1 = ((X.GarageYrBlt.notnull()) | (X.GarageType.notnull()) | (X.GarageFinish.notnull()) |
              (X.GarageQual.notnull()) | (X.GarageCond.notnull()))
        X.loc[fil1, 'MisGarage'] = 0
        X.loc[fil, 'MisGarage'] = 1
        #GarageYrBlt
        X.loc[X.GarageYrBlt > 2200, 'GarageYrBlt'] = 2007 #correct mistake
        X.loc[fil, 'GarageYrBlt'] = 0
        #GarageType
        X.loc[fil, 'GarageType'] = "NoGrg" #missing garage
        #GarageFinish
        X.loc[fil, 'GarageFinish'] = "NoGrg" #missing
        #GarageQual
        X.loc[fil, 'GarageQual'] = "NoGrg" #missing
        #GarageCond
        X.loc[fil, 'GarageCond'] = "NoGrg" #missing
        #Fence
        X.loc[X.Fence.isnull(), 'Fence'] = "NoFence" #missing fence
        #Pool
        fil = ((X.PoolArea == 0) & (X.PoolQC.isnull()))
        X.loc[fil, 'PoolQC'] = 'NoPool' 
        
        del X['Id']
        del X['MiscFeature']
        del X['MSSubClass']
        del X['Neighborhood']  # this should be useful
        del X['Condition1']
        del X['Condition2']
        del X['ExterCond']  # maybe ordinal
        del X['Exterior1st']
        del X['Exterior2nd']
        del X['Functional']
        del X['Heating']
        del X['PoolQC']
        del X['RoofMatl']
        del X['RoofStyle']
        del X['SaleCondition']
        del X['SaleType']
        del X['Utilities']
        del X['BsmtCond']
        del X['Electrical']
        del X['Foundation']
        del X['Street']
        del X['Fence']
        del X['LandSlope']
        
        return X

class df_imputer(BaseEstimator, TransformerMixin):
    '''
    Just a wrapper for the SimpleImputer that keeps the dataframe structure
    '''
    def __init__(self, strategy='mean'):
        self.strategy = strategy
        self.imp = None
        self.statistics_ = None

    def fit(self, X, y=None):
        self.imp = SimpleImputer(strategy=self.strategy)
        self.imp.fit(X)
        self.statistics_ = pd.Series(self.imp.statistics_, index=X.columns)
        return self

    def transform(self, X):
        # X is supposed to be a DataFrame
        Ximp = self.imp.transform(X)
        Xfilled = pd.DataFrame(Ximp, index=X.index, columns=X.columns)
        return Xfilled
    
    
class df_scaler(BaseEstimator, TransformerMixin):
    '''
    Wrapper of StandardScaler or RobustScaler
    '''
    def __init__(self, method='standard'):
        self.scl = None
        self.scale_ = None
        self.method = method
        if self.method == 'standard':
            self.mean_ = None
        elif self.method == 'robust':
            self.center_ = None
        self.columns = None  # this is useful when it is the last step of a pipeline before the model

    def fit(self, X, y=None):
        if self.method == 'standard':
            self.scl = StandardScaler()
            self.scl.fit(X) # fit will learn the parameters needed to scale the data
            self.mean_ = pd.Series(self.scl.mean_, index=X.columns)
        elif self.method == 'robust':
            self.scl = RobustScaler()
            self.scl.fit(X) # fit will learn the parameters needed to scale the data
            self.center_ = pd.Series(self.scl.center_, index=X.columns)
        self.scale_ = pd.Series(self.scl.scale_, index=X.columns)
        return self

    def transform(self, X):
        # assumes X is a DataFrame, in such a way we applied the transform (scaling) while keeping the DataFrame
        #print(f"The previous saved columns during fit were: {self.scl.feature_names_in_}")
        #print(f"Now the columns used are: {X.columns}")
        # Compare the columns of the DataFrame with feature_names_in_
        #are_columns_equal = X.columns.equals(pd.Index(self.scl.feature_names_in_))
        #print("Columns match with feature_names_in_:", are_columns_equal)
        X = X.loc[:, self.scl.feature_names_in_]
        Xscl = self.scl.transform(X)
        
        #print(self.scl.feature_names_in_)
        Xscaled = pd.DataFrame(Xscl, index=X.index, columns=X.columns)
        self.columns = X.columns
        return Xscaled

    def get_feature_names(self):
        return list(self.columns)  # this is going to be useful when coupled with FeatureUnion
    

class dummify(BaseEstimator, TransformerMixin):
    '''
    Wrapper for pd.get_dummies, used to one-hot encode the categorical features.
    The different values of each feature become new features. No need to fit anything.
    '''
    def __init__(self, drop_first=False, match_cols=True):
        self.drop_first = drop_first
        self.columns = []  # useful to well behave with FeatureUnion
        self.match_cols = match_cols

    def fit(self, X, y=None):
        self.columns = []  # for safety, when we refit we want new columns
        return self
    
    def match_columns(self, X):
        miss_train = list(set(X.columns) - set(self.columns))
        miss_test = list(set(self.columns) - set(X.columns))
        
        err = 0
        
        if len(miss_test) > 0:
            for col in miss_test:
                X[col] = 0  # insert a column for the missing dummy
                err += 1
        if len(miss_train) > 0:
            for col in miss_train:
                del X[col]  # delete the column of the extra dummy
                err += 1
                
        if err > 0:
            # A test set may contain fewer (or different) dummies than the train set:
            # missing ones are added as all-zero columns and extra ones are dropped,
            # so that the pipeline does not break.
            warnings.warn('The dummies in this set do not match the ones in the train set, we corrected the issue.',
                          UserWarning)
        return X
    
        
    def transform(self, X):
        X = pd.get_dummies(X, drop_first=self.drop_first)
        if (len(self.columns) > 0): 
            if self.match_cols:
                X = self.match_columns(X)
            self.columns = X.columns
        else:
            self.columns = X.columns
        return X
    
    def get_features_name(self):
        return self.columns


The dummify class one-hot encodes categorical features. Let's test it:

In [None]:
tmp = train_set[['HouseStyle']].copy()
dummifier = dummify()
tmp = dummifier.transform(tmp)
tmp.sample(5)

Notice that the categorical variable HouseStyle previously held different values (1.5Fin, 1.5Unf, 1Story, etc.), which have now been one-hot encoded into dummy variables.

In most ML workflows, the first transformer to be applied will likely be the imputer. The imputation method differs between numerical and categorical variables. Thus, the following class first splits the numerical and categorical variables into two subsets.

In [None]:
class feat_sel(BaseEstimator, TransformerMixin):
    '''
    This transformer selects either numerical or categorical features.
    In this way we can build separate pipelines for separate data types.
    '''
    def __init__(self, dtype='numeric'):
        self.dtype = dtype

    def fit( self, X, y=None ):
        return self 

    def transform(self, X, y=None):
        if self.dtype == 'numeric':
            num_cols = X.columns[X.dtypes != object].tolist()
            return X[num_cols]
        elif self.dtype == 'category':
            cat_cols = X.columns[X.dtypes == object].tolist()
            return X[cat_cols]

To test it:

In [None]:
tmp = train_set.copy()
selector = feat_sel()  # it is numeric by default
tmp = selector.transform(tmp)  # no need to fit: this transformer is stateless
tmp.head()
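
As a side note, pandas offers `select_dtypes` for the same kind of split. The sketch below uses a made-up three-column DataFrame and is only meant to illustrate the idea, not to replace the `feat_sel` class above:

```python
import pandas as pd

# Toy frame, invented for this sketch
toy = pd.DataFrame({
    "LotArea": [8450, 9600, 11250],
    "OverallQual": [7, 6, 7],
    "HouseStyle": ["2Story", "1Story", "2Story"],
})

# Numeric columns on one side, everything else on the other
numeric_part = toy.select_dtypes(include="number")
other_part = toy.select_dtypes(exclude="number")
```

Selecting by `"number"` rather than comparing against `object` does not depend on how strings are stored, which may matter across pandas versions.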

When working with categorical features, some categories may be so rare that they appear in the train set but not in the test set (or vice versa), so one-hot encoding yields a different number of columns for the two sets. To take care of this issue, our dummify class adds the missing columns so that train and test sets have matching dimensions. Let's try it out:

In [None]:
tmp = train_set[['RoofMatl']].copy()
dummifier = dummify()
dummifier.fit_transform(tmp).sum()

However, looking at the value counts of the test set, we can see that not all of those categories are present:

In [None]:
test_set.RoofMatl.value_counts()

Dummifying this categorical variable on the test set alone would create just 4 columns, one for each category present. However, the custom dummifier takes care of that in the following way:

In [None]:
tmp = test_set[['RoofMatl']].copy()
dummifier.transform(tmp).sum()
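
The same alignment can be sketched with plain pandas. This is an alternative to the `match_columns` method above, not what the class does internally: it relies on `DataFrame.reindex`, and the two small frames are invented for the example.

```python
import pandas as pd

# Toy data, invented: the test set misses two categories and has an unseen one
train_cat = pd.DataFrame({"RoofMatl": ["CompShg", "Tar&Grv", "WdShngl", "CompShg"]})
test_cat = pd.DataFrame({"RoofMatl": ["CompShg", "Metal"]})

train_dummies = pd.get_dummies(train_cat)
test_dummies = pd.get_dummies(test_cat)

# Keep exactly the train columns: missing dummies are filled with 0,
# dummies never seen during training are dropped
aligned = test_dummies.reindex(columns=train_dummies.columns, fill_value=0)
```

After the alignment the test set has the same columns, in the same order, as the train set.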

Now we are ready to create our customized pipeline to manage numerical features. Ideally, we want it to:
- clean the data
- impute missing values with the mean, median, etc.
- apply transformations to some features
- create new features
- scale the data

First let's create a transformer for the numeric variables:

In [None]:
class tr_numeric(BaseEstimator, TransformerMixin):
    def __init__(self, SF_room=True):
        self.columns = []  # useful to well behave with FeatureUnion
        self.SF_room = SF_room
        

    def fit(self, X, y=None):
        return self
    

    def remove_skew(self, X, column):
        X[column] = np.log1p(X[column])
        return X


    def SF_per_room(self, X):
        if self.SF_room:
            X['sf_per_room'] = X['GrLivArea'] / X['TotRmsAbvGrd']
        return X
    

    def transform(self, X, y=None):
        for col in ['GrLivArea', '1stFlrSF', 'LotArea']: # they can also be inputs
            X = self.remove_skew(X, col)

        X = self.SF_per_room(X)
        
        self.columns = X.columns 
        return X
    

    def get_features_name(self):  # again, it will be useful later
        return self.columns

Please note that this transformer takes a parameter that determines whether or not to create a new feature. It is this kind of parameter that can be tuned with a GridSearch (more on this later).

Creating a new feature in that way would be impossible if the previous steps were not returning a DataFrame. There is naturally an alternative that includes specifying the index of the columns you want to use, but I find this approach way more user-friendly and robust.
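
To make the two operations concrete, here is a small sketch on invented numbers. Note that `tr_numeric` above computes the ratio after the log transform, while this sketch uses the raw values only to keep the numbers readable:

```python
import numpy as np
import pandas as pd

# Toy data, invented: one very large house makes the area right-skewed
toy = pd.DataFrame({
    "GrLivArea": [800.0, 1000.0, 1200.0, 1500.0, 5000.0],
    "TotRmsAbvGrd": [4, 5, 6, 7, 10],
})

skew_before = toy["GrLivArea"].skew()

# New feature built by referring to the columns by name
toy["sf_per_room"] = toy["GrLivArea"] / toy["TotRmsAbvGrd"]

# log1p compresses the large values and reduces the skew
toy["GrLivArea_log"] = np.log1p(toy["GrLivArea"])
skew_after = toy["GrLivArea_log"].skew()
```

Referring to `GrLivArea` and `TotRmsAbvGrd` by name only works because the previous steps hand over a DataFrame.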

A pipeline for numeric features would then look like this

In [None]:
numeric_pipe = Pipeline([('fs', feat_sel(dtype='numeric')),  # select only the numeric features
                         ('imputer', df_imputer(strategy='median')),  # impute the missing values with the median of each column
                         ('transf', tr_numeric(SF_room=True)),  # remove skew and create a new feature
                         ('scl', df_scaler(method='standard'))])  # scale the data

full_pipe = Pipeline([('gen_cl', general_cleaner()), ('num_pipe', numeric_pipe)])  # put the cleaner on top because we like it clean

In other words, with the use of the sklearn Pipeline, we want to sequentially apply the transformations in the given list. The list is made of tuples, the first element is a label for that step, and the second element is the transformation (or the model, or another pipeline). The name is useful to identify every parameter of the Pipeline, as we will see later.

This pipeline, given the training data, acts as follows

In [None]:
tmp = train_set.copy()
tmp = full_pipe.fit_transform(tmp)
tmp.head()

In [None]:
tmp.info()

As we wanted, the data flowed through the pipeline, getting cleaned, transformed, and rescaled. Moreover, we still have a nice DataFrame structure.

The power of this pipeline becomes evident when we want to apply the same processing to the validation set

In [None]:
tmp = test_set.copy()  # not ready to work on those sets yet
tmp = full_pipe.transform(tmp)  # the fit already happened with the training set, we don't want to fit again
tmp.head()

Along the pipeline, the fit method is used by the scaler to learn its parameters (mean and standard deviation for StandardScaler, median and interquartile range for RobustScaler), while the transform method applies the transformation to the dataset. Whenever we scale the data used for training, these parameters must be computed on the training set only, never on the test set. Computing the mean on the whole dataset would leak information about the test data into the training data, which is not what we want: the test data must stay unseen and be used only for inference. This is why we do not fit again when applying the pipeline to the test set.
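
A tiny numeric sketch of the difference (the arrays are invented): the scaler fitted on the training data stores the training mean, while a mean computed on train and test together would be pulled towards the test value.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

# Toy data, invented for this sketch
X_train = np.array([[1.0], [2.0], [3.0]])
X_test = np.array([[10.0]])

# Correct: the statistics come from the training data only
scaler = StandardScaler().fit(X_train)
X_test_scaled = scaler.transform(X_test)  # transform only, no second fit

train_mean = scaler.mean_[0]

# What we would get by (wrongly) including the test data in the fit
leaky_mean = np.vstack([X_train, X_test]).mean()
```

Here `train_mean` is 2.0 while `leaky_mean` is 4.0: the second value already "knows" about the test point.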

The parameters of the full pipeline look like this:

In [None]:
full_pipe.get_params()

For categorical features, instead, we define the following transformers:

In [None]:
class make_ordinal(BaseEstimator, TransformerMixin):
    '''
    Transforms ordinal features in order to have them as numeric (preserving the order)
    If unsure about converting or not a feature (maybe making dummies is better), make use of
    extra_cols and include_extra
    '''
    def __init__(self, cols, extra_cols=None, include_extra=True):
        self.cols = cols
        self.extra_cols = extra_cols
        self.mapping = {'Po':1, 'Fa': 2, 'TA': 3, 'Gd': 4, 'Ex': 5}
        self.include_extra = include_extra
    

    def fit(self, X, y=None):
        return self
    

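    def transform(self, X, y=None):
        cols = list(self.cols)  # work on a copy so repeated calls do not grow self.cols
        if self.extra_cols:
            if self.include_extra:
                cols += list(self.extra_cols)
            else:
                for col in self.extra_cols:
                    del X[col]

        for col in cols:
            # values outside the mapping (e.g. "NoBsmt") become 0;
            # assigning with X[col] lets the column take a numeric dtype
            X[col] = X[col].map(self.mapping).fillna(0)
        return X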

    
class recode_cat(BaseEstimator, TransformerMixin):        
    '''
    Recodes some categorical variables according to the insights gained from the
    data exploration phase. Not presented in this notebook
    '''
    def fit(self, X, y=None):
        return self
    
    
    def tr_GrgType(self, data):
        data['GarageType'] = data['GarageType'].map({'Basment': 'Attchd',
                                                  'CarPort': 'Detchd', 
                                                  '2Types': 'Attchd' }).fillna(data['GarageType'])
        return data
    
    
    def tr_LotShape(self, data):
        fil = (data.LotShape != 'Reg')
        data['LotShape'] = 1
        data.loc[fil, 'LotShape'] = 0
        return data
    
    
    def tr_LandCont(self, data):
        fil = (data.LandContour == 'HLS') | (data.LandContour == 'Low')
        data['LandContour'] = 0
        data.loc[fil, 'LandContour'] = 1
        return data
    
    
    def tr_LandSlope(self, data):
        fil = (data.LandSlope != 'Gtl')
        data['LandSlope'] = 0
        data.loc[fil, 'LandSlope'] = 1
        return data
    
    
    def tr_MSZoning(self, data):
        data['MSZoning'] = data['MSZoning'].map({'RH': 'RM', # medium and high density
                                                 'C (all)': 'RM', # commercial and medium density
                                                 'FV': 'RM'}).fillna(data['MSZoning'])
        return data
    
    
    def tr_Alley(self, data):
        fil = (data.Alley != 'NoAlley')
        data['Alley'] = 0
        data.loc[fil, 'Alley'] = 1
        return data
    
    
    def tr_LotConfig(self, data):
        data['LotConfig'] = data['LotConfig'].map({'FR3': 'Corner', # corners have 2 or 3 free sides
                                                   'FR2': 'Corner'}).fillna(data['LotConfig'])
        return data
    
    
    def tr_BldgType(self, data):
        data['BldgType'] = data['BldgType'].map({'Twnhs' : 'TwnhsE',
                                                 '2fmCon': 'Duplex'}).fillna(data['BldgType'])
        return data
    
    
    def tr_MasVnrType(self, data):
        data['MasVnrType'] = data['MasVnrType'].map({'BrkCmn': 'BrkFace'}).fillna(data['MasVnrType'])
        return data


    def tr_HouseStyle(self, data):
        data['HouseStyle'] = data['HouseStyle'].map({'1.5Fin': '1.5Unf', 
                                                         '2.5Fin': '2Story', 
                                                         '2.5Unf': '2Story', 
                                                         'SLvl': 'SFoyer'}).fillna(data['HouseStyle'])
        return data
    
    
    def transform(self, X, y=None):
        X = self.tr_GrgType(X)
        X = self.tr_LotShape(X)
        X = self.tr_LotConfig(X)
        X = self.tr_MSZoning(X)
        X = self.tr_Alley(X)
        X = self.tr_LandCont(X)
        X = self.tr_BldgType(X)
        X = self.tr_MasVnrType(X)
        X = self.tr_HouseStyle(X)
        return X

The pipeline for categorical features will then be

In [None]:
cat_pipe = Pipeline([('fs', feat_sel(dtype='category')),
                     ('imputer', df_imputer(strategy='most_frequent')), 
                     ('ord', make_ordinal(['BsmtQual', 'KitchenQual','GarageQual',
                                           'GarageCond', 'ExterQual', 'HeatingQC'])), 
                     ('recode', recode_cat()), 
                     ('dummies', dummify())])

full_pipe = Pipeline([('gen_cl', general_cleaner()), ('cat_pipe', cat_pipe)])

tmp = train_set.copy()
tmp = full_pipe.fit_transform(tmp)
tmp.head()

And there we have it: some categories converted into numeric features, others first recoded and then dummified. This dataset is ready for a model and, as before, this pipeline is ready for the validation set as well

In [None]:
tmp = test_set.copy()
tmp = full_pipe.transform(tmp)
tmp.head()

In [None]:
full_pipe.get_params()
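
The two tricks used by `make_ordinal` and `recode_cat` can be seen in isolation on two invented Series. This is only a sketch of the `map` plus `fillna` pattern, not a replacement for the classes above:

```python
import pandas as pd

# Ordinal encoding: ratings become numbers, anything else becomes 0
quality_map = {"Po": 1, "Fa": 2, "TA": 3, "Gd": 4, "Ex": 5}
garage_qual = pd.Series(["Gd", "TA", "Ex", "NoGrg"], name="GarageQual")
garage_qual_num = garage_qual.map(quality_map).fillna(0)

# Recoding: rare categories are merged, the others are left untouched
garage_type = pd.Series(["Attchd", "Basment", "CarPort", "Detchd"], name="GarageType")
recode_map = {"Basment": "Attchd", "CarPort": "Detchd"}
garage_type_recoded = garage_type.map(recode_map).fillna(garage_type)
```

In the first case the unmapped value is replaced by 0, in the second case by the original category, which is why the same pattern serves both purposes.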

Now we are ready to put everything together. We have a pipeline for numeric features and one for categorical ones; now we want a complete pipeline for the entire dataset.

Sklearn again helps us with FeatureUnion which, sadly, again compromises the DataFrame structure we are very fond of. By now, we are confident enough to create our own version of it.

In [None]:
class FeatureUnion_df(TransformerMixin, BaseEstimator):
    '''
    Wrapper of FeatureUnion but returning a Dataframe, 
    the column order follows the concatenation done by FeatureUnion

    transformer_list: list of Pipelines

    '''
    def __init__(self, transformer_list, n_jobs=None, transformer_weights=None, verbose=False, **kwargs):
        self.transformer_list = transformer_list
        self.n_jobs = n_jobs
        self.transformer_weights = transformer_weights
        self.verbose = verbose  # these are necessary to work inside of GridSearch or similar
        self.kwargs = kwargs  # Capture extra arguments
        self.feat_un = FeatureUnion(self.transformer_list, 
                                    n_jobs=self.n_jobs, 
                                    transformer_weights=self.transformer_weights, 
                                    verbose=self.verbose,
                                    **self.kwargs) # to fix additional key words passed by grid search to FeatureUnion
        
    def fit(self, X, y=None):
        self.feat_un.fit(X)
        return self

    def transform(self, X, y=None):
        X_tr = self.feat_un.transform(X)
        columns = []
        
        for trsnf in self.transformer_list:
            cols = trsnf[1].steps[-1][1].get_features_name()  # getting the features name from the last step of each pipeline
            columns += list(cols)

        X_tr = pd.DataFrame(X_tr, index=X.index, columns=columns)
        
        return X_tr

    def get_params(self, deep=True):  # necessary to well behave in GridSearch
        return self.feat_un.get_params(deep=deep)

I hope it is now evident why I kept implementing a get_features_name method in the previous classes. It was all for this moment.
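
To see the problem the wrapper solves, here is a sketch with a plain `FeatureUnion` on a made-up DataFrame. The step names `keep_a` and `log_b` and the column name `b_log` are invented, and the example assumes scikit-learn's default output configuration:

```python
import numpy as np
import pandas as pd
from sklearn.pipeline import FeatureUnion
from sklearn.preprocessing import FunctionTransformer

# Toy frame, invented, with a non-trivial index
toy = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]}, index=[10, 11])

union = FeatureUnion([
    ("keep_a", FunctionTransformer(lambda df: df[["a"]])),
    ("log_b", FunctionTransformer(lambda df: np.log1p(df[["b"]]))),
])

# The result is a plain NumPy array: index and column names are lost
raw = union.fit_transform(toy)

# Rebuilding the DataFrame requires knowing the names each branch produced,
# in the order in which FeatureUnion concatenated them
names = ["a", "b_log"]
restored = pd.DataFrame(raw, index=toy.index, columns=names)
```

In `FeatureUnion_df` the list of names is not typed by hand but collected from the last step of each branch.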

The complete pipeline will then be

In [None]:
numeric_pipe = Pipeline([('fs', feat_sel('numeric')),
                         ('imputer', df_imputer(strategy='median')),
                         ('transf', tr_numeric())])

cat_pipe = Pipeline([('fs', feat_sel('category')),
                     ('imputer', df_imputer(strategy='most_frequent')), 
                     ('ord', make_ordinal(['BsmtQual', 'KitchenQual','GarageQual',
                                           'GarageCond', 'ExterQual', 'HeatingQC'])), 
                     ('recode', recode_cat()), 
                     ('dummies', dummify())])

processing_pipe = FeatureUnion_df(transformer_list=[('cat_pipe', cat_pipe),
                                                 ('num_pipe', numeric_pipe)], )

full_pipe = Pipeline([('gen_cl', general_cleaner()), 
                      ('processing', processing_pipe), 
                      ('scaler', df_scaler())])  # the scaler is here to have also the ordinal features scaled

tmp = train_set.copy()  # fit on the training split only, so the test set stays unseen
tmp = full_pipe.fit_transform(tmp)
tmp.head()

The order of the columns in our DataFrame object changed because FeatureUnion concatenated the results of each transformer.

Again, we can now apply the pipeline to the test set

In [None]:
tmp = test_set.copy()
tmp = full_pipe.transform(tmp)
tmp.head()

This time, the parameters are a bit more complex

In [None]:
full_pipe.get_params()

Having set up everything as we did, it is not difficult to tune our pipeline with GridSearch. We will put a simple model at the end of the pipeline just for the fun of it and tune both the hyperparameters of this model and the parameters of the pipeline.

We thus make use of GridSearch to pick the best model configuration by varying several parameters, namely:

- whether or not we create the new feature describing the square feet per room
- whether we impute the numerical missing values with the mean or the median
- whether we drop one dummy or not
- the regularization parameter of the Lasso regression

Thanks to the fact that we have a pipeline, we can easily explore all these configurations without worrying too much about information leakage or repeating the same steps over and over

In [None]:
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold, GridSearchCV


folds = KFold(5, shuffle=True, random_state=541)

df_train['Target'] = np.log1p(df_train.SalePrice)

del df_train['SalePrice']

# Split the dataset while maintaining the proportion of 'Neighborhood'
train_set, test_set = make_test(df_train, 
                                test_size=0.2, random_state=654, 
                                strat_feat='Neighborhood')

y = train_set['Target'].copy()
del train_set['Target']

y_test = test_set['Target']
del test_set['Target']


def grid_search(data, target, estimator, param_grid, scoring, cv):
    
    grid = GridSearchCV(estimator=estimator, param_grid=param_grid, 
                        cv=cv, scoring=scoring, n_jobs=-1, return_train_score=False)
    
    pd.options.mode.chained_assignment = None  # this is because the gridsearch throws a lot of pointless warnings
    tmp = data.copy()
    grid = grid.fit(tmp, target)
    pd.options.mode.chained_assignment = 'warn'
    
    result = pd.DataFrame(grid.cv_results_).sort_values(by='mean_test_score', 
                                                        ascending=False).reset_index()
    
    del result['params']
    times = [col for col in result.columns if col.endswith('_time')]
    params = [col for col in result.columns if col.startswith('param_')]
    
    result = result[params + ['mean_test_score', 'std_test_score'] + times]
    
    return result, grid.best_params_

The grid search (here wrapped in a utility function just to have better-looking results) looks like this

In [None]:
lasso_pipe = Pipeline([('gen_cl', general_cleaner()),
                       ('processing', processing_pipe),
                       ('scl', df_scaler()), 
                       ('lasso', Lasso(alpha=0.01))])

res, bp = grid_search(train_set, y, lasso_pipe, 
            param_grid={'processing__num_pipe__transf__SF_room': [True, False],  # here it is important to specify the values to try for the grid search algorithm
                        'processing__num_pipe__imputer__strategy': ['mean', 'median'],
                        'processing__cat_pipe__dummies__drop_first': [True, False],
                        'lasso__alpha': [0.1, 0.01, 0.001]},
            cv=folds, scoring='neg_mean_squared_error')

res

And the best parameters are

In [None]:
bp

Please note how we refer to a specific parameter by calling every step of the pipeline by its name and concatenating those names by the double underscore.
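
The naming rule can be checked on a much smaller nested pipeline. The step names `prep`, `imputer` and `lasso` below are chosen just for this sketch:

```python
from sklearn.impute import SimpleImputer
from sklearn.linear_model import Lasso
from sklearn.pipeline import Pipeline

inner = Pipeline([("imputer", SimpleImputer(strategy="mean"))])
outer = Pipeline([("prep", inner), ("lasso", Lasso(alpha=0.01))])

# <outer step>__<inner step>__<parameter>, joined by double underscores
outer.set_params(prep__imputer__strategy="median", lasso__alpha=0.1)

# get_params() lists every name that can be used in a param_grid
params = outer.get_params()
```

The keys of `params` are exactly the strings accepted in the `param_grid` of a grid search.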

We see from the GridSearch that, if we ignore the fact that these models are very simple and not high-performing, the best parameter configurations score very similarly. One may want to make sure that the model is really the best possible one and/or that it predicts reasonable prices.

Thanks to all that effort in preserving the feature names and making sure that everything happens inside of the pipeline, this model will fit pretty much in any validation approach you might want to adopt.

For example, I might be interested in seeing how the model performs in a 5-fold cross-validation setting, how far off the predictions are, whether I am missing something in the data, and which features are the most important. With a few helper functions, we are going to do all of it.

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

def cv_score(df_train, y_train, kfolds, pipeline):
    oof = np.zeros(len(df_train))
    train = df_train.copy()
    
    for train_index, test_index in kfolds.split(train.values):
            
        trn_data = train.iloc[train_index][:]
        val_data = train.iloc[test_index][:]
        
        trn_target = y_train.iloc[train_index].values.ravel()
        val_target = y_train.iloc[test_index].values.ravel()
        
        pipeline.fit(trn_data, trn_target)

        oof[test_index] = pipeline.predict(val_data).ravel()
            
    return oof


def get_coef(pipe):
    imp = pipe.steps[-1][1].coef_.tolist()
    feats = pipe.steps[-2][1].get_feature_names()  # again, this is why we implemented that method
    result = pd.DataFrame({'feat':feats,'score':imp})
    result = result.sort_values(by=['score'],ascending=False)
    return result

def _plot_diagonal(ax):
    xmin, xmax = ax.get_xlim()
    ymin, ymax = ax.get_ylim()
    low = min(xmin, xmax)
    high = max(xmin, xmax)
    scl = (high - low) / 100
    
    line = pd.DataFrame({'x': np.arange(low, high ,scl), # small hack for a diagonal line
                         'y': np.arange(low, high ,scl)})
    ax.plot(line.x, line.y, color='black', linestyle='--')
    
    return ax


def plot_predictions(data, true_label, pred_label, feature=None, hue=None, legend=False):
    
    tmp = data.copy()
    tmp['Prediction'] = pred_label
    tmp['True Label'] = true_label
    tmp['Residual'] = tmp['True Label'] - tmp['Prediction']
    
    diag = False
    alpha = 0.7
    label = ''
    
    fig, ax = plt.subplots(1,2, figsize=(15,6))
    
    if feature is None:
        feature = 'True Label'
        diag = True
    else:
        legend = 'full'
        sns.scatterplot(x=feature, y='True Label', data=tmp, ax=ax[0], label='True',
                         hue=hue, legend=legend, alpha=alpha)
        label = 'Predicted'
        alpha = 0.4

    sns.scatterplot(x=feature, y='Prediction', data=tmp, ax=ax[0], label=label,
                         hue=hue, legend=legend, alpha=alpha)
    if diag:
        ax[0] = _plot_diagonal(ax[0])
    
    sns.scatterplot(x=feature, y='Residual', data=tmp, ax=ax[1], 
                    hue=hue, legend=legend, alpha=0.7)
    ax[1].axhline(y=0, color='r', linestyle='--')
    
    ax[0].set_title(f'{feature} vs Predictions')
    ax[1].set_title(f'{feature} vs Residuals')

lasso_oof = cv_score(train_set, y, folds, lasso_pipe)

lasso_oof[:10]
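
For readers who want to cross-check a helper like `cv_score`, here is a minimal sketch comparing a manual out-of-fold loop with scikit-learn's `cross_val_predict`. It runs on synthetic data, and the names `demo_pipe`, `demo_folds`, `X`, and `y_demo` are made up for illustration; whether `cross_val_predict` works unchanged with the custom DataFrame transformers of this notebook is an assumption I have not tested.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold, cross_val_predict
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for train_set / y (not the house-price data)
X, y_demo = make_regression(n_samples=200, n_features=5, noise=10.0, random_state=0)

demo_pipe = Pipeline([('scl', StandardScaler()), ('lasso', Lasso(alpha=0.01))])
demo_folds = KFold(n_splits=5, shuffle=True, random_state=0)

# Manual loop, mirroring cv_score: fit on four folds, predict the held-out fold
oof_manual = np.zeros(len(X))
for train_index, test_index in demo_folds.split(X):
    demo_pipe.fit(X[train_index], y_demo[train_index])
    oof_manual[test_index] = demo_pipe.predict(X[test_index])

# Built-in equivalent: it clones the pipeline and refits it for every fold
oof_builtin = cross_val_predict(demo_pipe, X, y_demo, cv=demo_folds)

print(np.allclose(oof_manual, oof_builtin))
```

The manual version has the advantage of leaving room for extra logic inside the loop, which is probably why a hand-written helper is handy here.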

We have our predictions, and we can now look at the coefficients of our regression:

In [None]:
get_coef(lasso_pipe)

From the coefficients we can appreciate that GrLivArea and OverallQual have a higher impact on the SalePrice than the other features.
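
Since `get_coef` sorts by the signed coefficient, a feature with a strong negative effect ends up at the bottom of the table. A small sketch of ranking by magnitude instead, on a hypothetical coefficient table (the feature names and values below are invented; only the `feat` and `score` columns come from `get_coef`):

```python
import pandas as pd

# Hypothetical coefficient table with the same columns get_coef returns
coefs = pd.DataFrame({'feat': ['feat_a', 'feat_b', 'feat_c', 'feat_d'],
                      'score': [0.12, -0.30, 0.00, 0.05]})

# Rank by magnitude so strong negative effects are not pushed to the bottom
coefs['abs_score'] = coefs['score'].abs()
ranked = coefs.sort_values(by='abs_score', ascending=False).reset_index(drop=True)

# Coefficients shrunk exactly to zero mark features that Lasso dropped
dropped = ranked.loc[ranked['score'] == 0, 'feat'].tolist()

print(ranked)
print('Dropped by Lasso:', dropped)
```

Comparing magnitudes like this is only meaningful because the features are scaled before they reach the Lasso step.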

Now we want to see if the predictions are too far off or if there is something odd in the residual plot (I suggest reading about residual plots, as they are very useful tools for diagnosing what is wrong in your model):

In [None]:
plot_predictions(train_set, y, lasso_oof)

There is a big outlier in our predictions and a visible pattern in the residual plot; both would require further investigation.
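
Besides looking at the plot, an outlier like this can be located numerically by ranking the absolute residuals. The sketch below uses synthetic data with one planted bad prediction; `true_demo` and `pred_demo` are illustrative stand-ins for `y` and `lasso_oof`.

```python
import numpy as np
import pandas as pd

# Synthetic stand-ins for y and lasso_oof (log-scale targets and predictions)
rng = np.random.default_rng(0)
true_demo = pd.Series(rng.normal(12.0, 0.4, size=100))
pred_demo = true_demo.to_numpy() + rng.normal(0.0, 0.05, size=100)
pred_demo[42] += 1.0  # plant one badly over-predicted observation

residuals = true_demo - pred_demo  # same sign convention as plot_predictions

# The rows with the largest absolute residuals are the first candidates to inspect
worst = residuals.abs().sort_values(ascending=False).head(5)
print(worst)
print('Largest miss at index:', residuals.abs().idxmax())
```

A negative residual here means the model predicted a higher value than the true one.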

We can also plot the residuals against the most important features, for example:

In [None]:
plot_predictions(train_set, y, lasso_oof, feature='GrLivArea')

And there we find that the prediction that was so far off from the real value was indeed for a house too cheap for its size (to be fair, this is one of the outliers that everybody knows about, and they are described in the official documentation of the dataset).

So far, we have used the default parameters of our pipeline, but thanks to our GridSearch we know that there is a better configuration. Let's see if something changes.

In [None]:
numeric_pipe = Pipeline([('fs', feat_sel('numeric')),
                         ('imputer', df_imputer(strategy='mean')),  # tuned above
                         ('transf', tr_numeric(SF_room=True))])  # tuned above


cat_pipe = Pipeline([('fs', feat_sel('category')),
                     ('imputer', df_imputer(strategy='most_frequent')), 
                     ('ord', make_ordinal(['BsmtQual', 'KitchenQual','GarageQual',
                                           'GarageCond', 'ExterQual', 'HeatingQC'])), 
                     ('recode', recode_cat()), 
                     ('dummies', dummify(drop_first=True))])  # tuned above


processing_pipe = FeatureUnion_df(transformer_list=[('cat_pipe', cat_pipe),
                                                    ('num_pipe', numeric_pipe)])

lasso_pipe = Pipeline([('gen_cl', general_cleaner()),
                       ('processing', processing_pipe),
                       ('scl', df_scaler()),
                       ('lasso', Lasso(alpha=0.01))])  # tuned above

lasso_oof = cv_score(train_set, y, folds, lasso_pipe)

get_coef(lasso_pipe)

In [None]:
plot_predictions(train_set, y, lasso_oof)

In [None]:
plot_predictions(train_set, y, lasso_oof, feature='GrLivArea')

The coefficients are a bit different, but we did not solve much. This was expected, since we did not change much from the defaults.

We can make further use of the fact that we are working with a pipeline by applying it directly to the test set to see if the behavior changes.

In [None]:
lasso_pred = lasso_pipe.predict(test_set)

plot_predictions(test_set, y_test, lasso_pred)

In [None]:
plot_predictions(test_set, y_test, lasso_pred, feature='GrLivArea')
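
One thing to keep in mind, assuming nothing else refits `lasso_pipe` between the call to `cv_score` and the call to `predict`: a manual loop like the one in `cv_score` leaves the pipeline fitted on the training rows of the last fold only. The sketch below, on synthetic data with made-up names (`demo_pipe`, `X`, `y_demo`), shows the difference and one possible way to refit on all the training rows before predicting on held-out data.

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import Lasso
from sklearn.model_selection import KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the training data
X, y_demo = make_regression(n_samples=100, n_features=4, noise=5.0, random_state=1)
demo_pipe = Pipeline([('scl', StandardScaler()), ('lasso', Lasso(alpha=0.01))])

# After a manual CV loop the pipeline holds the fit from the last fold only
for train_index, _ in KFold(n_splits=5).split(X):
    demo_pipe.fit(X[train_index], y_demo[train_index])
coef_last_fold = demo_pipe.named_steps['lasso'].coef_.copy()

# Refitting on all the training rows gives different coefficients
demo_pipe.fit(X, y_demo)
coef_full = demo_pipe.named_steps['lasso'].coef_.copy()

print(np.abs(coef_full - coef_last_fold).max())
```

In the notebook this would amount to calling `lasso_pipe.fit(train_set, y)` before `lasso_pipe.predict(test_set)`, if using all the training rows is what you want.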

For the fans of numeric metrics:

In [None]:
print('Score in 5-fold cv')
print(f'\tRMSE: {round(np.sqrt(mean_squared_error(y, lasso_oof)), 5)}')
print(f'\tMAE: {round(mean_absolute_error(np.expm1(y), np.expm1(lasso_oof)), 2)} dollars')
print('Score on holdout test')
print(f'\tRMSE: {round(np.sqrt(mean_squared_error(y_test, lasso_pred)), 5)}')
print(f'\tMAE: {round(mean_absolute_error(np.expm1(y_test), np.expm1(lasso_pred)), 2)} dollars')
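
The `np.expm1` calls above suggest that the target was transformed with `np.log1p` earlier in the notebook; that is an assumption on my side, as the transformation is not shown in this section. Under that assumption, a tiny sketch with invented prices shows why the RMSE is computed on the log scale while the MAE is converted back to dollars:

```python
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Hypothetical sale prices in dollars and their log1p-transformed versions
price_true = np.array([100000.0, 150000.0, 200000.0])
price_pred = np.array([110000.0, 140000.0, 210000.0])
log_true, log_pred = np.log1p(price_true), np.log1p(price_pred)

# RMSE on the log scale behaves like a relative error, independent of price level
rmse_log = np.sqrt(mean_squared_error(log_true, log_pred))

# expm1 inverts log1p, so this MAE is expressed in dollars again
mae_dollars = mean_absolute_error(np.expm1(log_true), np.expm1(log_pred))

print(round(rmse_log, 5), round(mae_dollars, 2))
```

Every invented prediction is off by exactly 10,000 dollars, so the MAE is the same for all three houses, while the log-scale error is larger for the cheaper ones.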

In [None]:
sub = df_test[['Id']].copy()

predictions = lasso_pipe.predict(df_test)

And again, it takes no effort to make our pipeline work on new data. This is important for two reasons:

- we can put all our effort into making the model better rather than fighting with messy code
- we are virtually ready to send our model to our client, and it is ready to use
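
As an illustration of the second point, a fitted pipeline can be saved to disk and loaded back as a single object. This is a minimal sketch using `joblib` on a synthetic pipeline; the file name and the names `demo_pipe` and `restored_pipe` are hypothetical, and this is only one possible way of handing a model over.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.datasets import make_regression
from sklearn.linear_model import Lasso
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the training data
X, y_demo = make_regression(n_samples=100, n_features=4, noise=5.0, random_state=2)
demo_pipe = Pipeline([('scl', StandardScaler()), ('lasso', Lasso(alpha=0.01))])
demo_pipe.fit(X, y_demo)

with tempfile.TemporaryDirectory() as tmp_dir:
    model_path = os.path.join(tmp_dir, 'demo_pipe.joblib')  # hypothetical file name
    joblib.dump(demo_pipe, model_path)  # preprocessing and model travel together
    restored_pipe = joblib.load(model_path)

# The restored pipeline reproduces the predictions of the original one
same = np.allclose(demo_pipe.predict(X), restored_pipe.predict(X))
print(same)
```

With custom transformers like the ones in this notebook, the classes have to be importable in the environment where the file is loaded, so in practice they would need to live in a module that ships with the model.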