In [1]:
import pandas as pd
import numpy as np
import os

from enum import Enum
from typing import NamedTuple
import functools

from sklearn.linear_model import HuberRegressor, LinearRegression, Ridge, Lasso
from sklearn.model_selection import train_test_split
from scipy.stats import zscore
from scipy.linalg import inv

# Functions & Tools

Functions that compute some model evaluation metrics:

In [2]:
# Mean Absolute Error (MAE)
def mae(y, y_pred):
    return np.mean(np.abs(np.subtract(y, y_pred)))

# Mean Squared Error (MSE)
def mse(y, y_pred):
    return np.mean(np.square(np.subtract(y, y_pred)))

# Root Mean Squared Error (RMSE)
def rmse(y, y_pred):
    return np.sqrt(mse(y, y_pred))

# Residual Sum of Squares (RSS)
def rss(y, y_pred):
    return np.sum(np.square(np.subtract(y, y_pred)))

# Total Sum of Squares
def tss(y):
    return rss(y, np.mean(y))

# R2 Score
def r2score(y, y_pred):
    return 1 - rss(y, y_pred) / tss(y)

Define the inverse function of the logarithmic in base 10:

In [3]:
# np.log10 inverse function
log10_inverse_fn = functools.partial(np.power, 10)

# Classes, Enums and NamedTuple to build models

### VariableType Enum

There are four types of variable: continous, discrete, ordinal and nominal.  
We represent them with a `VariableType` Enum. 

In [4]:
# Variable Type
class VariableType(Enum):
    CONTINUOUS = 1
    DISCRETE = 2
    ORDINAL = 3
    NOMINAL = 4

### Variable NamedTuple

A variable/feature/predictor can be represented by four properties: name, variable type, a mapping/encoding and a transformation function.  
Mapping/encoding is required for ordinal and nominal variables.  
Transformation function can be used to transform a continous variable (example: logarithmic transformation of the Lot Area)

We define a `Variable` **immutable** *structure* with the help of NamedTuple (immutable and more friendly than tuple).

In [5]:
# Variable NameTuple (We want an immutable structure more friendly than tuples)
class Variable(NamedTuple):
    name: str
    type: VariableType
    mapping: dict = None
    fn: fn = None
    
    # return a new variable with a different name
    def set_name(self, new_name):
        return Variable(new_name, self.type, self.mapping, self.fn)
    
    # return a new variable with a different type
    def set_type(self, new_type):
        return Variable(self.name, new_type, self.mapping, self,fn)
    
    # return a new variable with a different transformation function
    def set_function(self, new_fn):
        return Variable(self.name, self.type, self.mapping, new_fn)
    
    # return a new variable with a different mapping (merged or overriden)
    def set_mapping(self, new_mapping, merge=True):
        if merge:
            new_mapping = {**self.mapping, **new_mapping}
        return Variable(self.name, self.type, new_mapping, self.fn)
    
    # return True if the mapping allow to map all the data 
    def check_mapping(self, serie):
        for key in serie.unique():
            if key not in self.mapping:
                return False
        return True

### ModelDataFrameBuilder

This class is the class that do the job of preparing the data according to our wish:

1. Creating a `ModelDataFrameBuilder` object
2. Call its `prepare()` method by providing :
    * the source dataframe (will not be altered by the preparation process)
    * the list of variables (as Variable objects) to prepare
3. Get the prepared data by calling the `data()` method

**Note:**  
All transformations applied during the preparation have no side effects on our source data. We can prepare different data set with different variables and mapping rules without risk to encounter re-entrance issues.

The class also provide some post processing preparation methods:

* `create_interaction_with_nominal_variable()` that allow to multiply the dummy variables by the value of a continuous variable  
(I tried different things like creating an interaction between the lot area and the neighborhood variable)
* `merge_dummy_variables()` to merge the `condition 1 & 2` & `exterior 1st & 2nd` dummy variables
* `normalize()` that allow to standardize/normalize the predictors (can pass an exclusion list if needed and the method used z-score|center)

In [6]:
class ModelDataFrameBuilder:
    def __init__(self):
        self.model_data = pd.DataFrame()
        self.prepared_variables = []
                
    def data(self, exclude=[]):
        return self.model_data.drop(columns=exclude)
        
    def prepare(self, data, variables):
        for var in variables:
            self.prepared_variables.append(var.name)
            self.model_data = pd.concat([self.model_data, self._prepare_variable(var, data)], axis=1)
            
    def create_interaction_with_nominal_variable(self, nominal_var, interaction_var, drop=True):
        # Nominal variables have been one hot encoded (we need the list of the dummy variable names)
        dummy_vars = [var for var in self.model_data.columns if nominal_var in var]
        for var in dummy_vars:
            self.model_data[var] = self.model_data[var] * self.model_data[interaction_var]
        if drop:
            self.model_data.drop(columns=[interaction_var], inplace=True)
     
    def merge_dummy_variables(self, variable1, variable2):
        dummy_vars_1 = [var for var in self.model_data.columns if variable1 in var]
        dummy_vars_2 = [var for var in self.model_data.columns if variable2 in var]
        
        for var in dummy_vars_2:
            suffix = var.split('_')[1]
            dummy = variable1 + '_' + suffix
            self.model_data[dummy] = self.model_data.loc[:, [dummy, var]].max(axis=1)
            self.model_data.drop(columns=[var], inplace=True)
                
    def normalize(self, exclude=[], method='z-score'):
        for var in self.model_data.columns:
            if var in exclude:
                continue
            if method == 'z-score':
                self.model_data[var] = (self.model_data[var] - self.model_data[var].mean()) / self.model_data[var].std()
            else:
                self.model_data[var] = (self.model_data[var] - self.model_data[var].min()) / (self.model_data[var].max() - self.model_data[var].min())
            
    def normalize_include(self, include=[]):
        for var_name in include:
            dummy_vars = [var for var in self.model_data.columns if var_name in var]
            for var in dummy_vars:
                self.model_data[var] = (self.model_data[var] - self.model_data[var].mean()) / self.model_data[var].std()
        
    def _prepare_variable(self, variable, data):
        if variable.type == VariableType.CONTINUOUS:
            return self._prepare_continous_variable(variable, data)
        elif variable.type == VariableType.DISCRETE:
            return self._prepare_discrete_variable(variable, data) 
        elif variable.type == VariableType.ORDINAL:
            return self._prepare_ordinal_variable(variable, data) 
        elif variable.type == VariableType.NOMINAL:
            return self._prepare_nominal_variable(variable, data)
        else:
            print('Unknown Variable Type Error: ', variable.type) 
            
    def _prepare_continous_variable(self, variable, data):
        if variable.fn:
            return variable.fn(data[variable.name])
        return data[variable.name]
    
    def _prepare_discrete_variable(self, variable, data):
        if variable.fn:
            return variable.fn(data[variable.name])
        return data[variable.name]
    
    def _prepare_ordinal_variable(self, variable, data):
        if variable.mapping:
            if variable.check_mapping(data[variable.name]):
                return data[variable.name].map(variable.mapping)
            else:
                raise Exception("MappingError: Unable to encode the variable: {}".format(variable))
        return data[variable.name]
            
    def _prepare_nominal_variable(self, variable, data):
        serie = data[variable.name]
        if variable.mapping:
            if variable.check_mapping(serie):
                serie = serie.map(variable.mapping)
            else:
                raise Exception("MappingError: Unable to encode the variable: {}".format(variable))
        return pd.get_dummies(serie, prefix=variable.name)

### ModelAgent Class

The `ModelAgent` class is responsible to fit and evaluate our model. It manages automatically transformed target variables and compute the different evaluation metrics for the target and transformed target variables if we pass it the transformation inverse function.

Using this class is easy:

1. Instantiate a `ModelAgent` object by passing it:
    * A model object like `HuberRegressor`, `LinearRegression`, `Ridge` and `Lasso` object
    * The optional split parameters (default to test size 50% and random state 0)

2. Call its `fit_and_evaluate()` method, passing:
    * The prepared data returned by the `ModelDataFrameBuilder` object (`data()` method)
    * The name of the target variable (as String)
    * The transformation inverse function if the target variable has been transformed
    
3. Get the results attribute that is a dictionary containing the different vectors and matrices and the evaluation scores train/test x target/target transformed

In [7]:
class ModelAgent:
    def __init__(self, model, split_params={'test_size': 0.5, 'random_state': 0}):
        self.model = model
        self.split_params = split_params
        self.coef_labels = None
        self.results = None
        
    def fit_and_evaluate(self, data, target, inverse_fn=None):
        
        # prepare the predictors matrix and the target vector
        X, y = self._prepare_matrix_vector(data, target)
        
        # split the train/test data sets
        X_tr, X_te, y_tr, y_te = train_test_split(X, y, **self.split_params)
        
        # fit and evaluate the model
        self._fit_and_evaluate(X_tr, X_te, y_tr, y_te, inverse_fn)
     
    def fit_and_evaluate_without_residuals(self, is_residual_filter, inverse_fn = None):
        X_tr = self.results['data']['X_tr']
        X_te = self.results['data']['X_te']
        if inverse_fn:
            # not elegant but to avoid re-intrance issue with the permutation of transformed target variable
            y_tr = self.results['data']['y_transformed_tr']
            y_te = self.results['data']['y_transformed_te']
        else:
            y_tr = self.results['data']['y_tr']
            y_te = self.results['data']['y_te']
        
        X_tr_without_residuals = X_tr[~is_residual_filter, :]
        y_tr_without_residuals = y_tr[~is_residual_filter]
        self._fit_and_evaluate(X_tr_without_residuals, X_te, y_tr_without_residuals, y_te, inverse_fn)
    
    def predict(self, data, inverse_fn=None):
        X = data.values
        if inverse_fn:
            return inverse_fn(self.model.predict(X))
        else:
            return self.model.predict(X)
    
    def coefs(self):
        return pd.DataFrame({'label': self.coef_labels, 'coef': self.model.coef_ }) \
                     .sort_values(['coef'], ascending=True) \
                     .reset_index(drop=True)
    
    def print_coefs(self):
        justify = self.coef_labels.map(len).max() + 1
        
        print("\n\nCoefficients:")
        print("-------------")
        print("{}: {: .4e}".format("Intercept".ljust(justify), self.model.intercept_))
        for col, coef in zip(self.coef_labels, self.model.coef_):
            print("{}: {: .4e}".format(col.ljust(justify), coef))
            
    def plot_coefs(self, figsize=(10, 8), offset=0.002):
        df_coefs = self.coefs()
    
        plt.figure(figsize=figsize)
        plt.barh(np.arange(0, df_coefs.shape[0]), df_coefs['coef'], tick_label=df_coefs['label'])
        for index, value in enumerate(df_coefs['coef']):
            if value > 0:
                text_offset = offset * -1
            else:
                text_offset = 0.001
            v = "{: .4e}".format(value)
            plt.text(x=text_offset, y=index, s=v, va='center', color=blue, fontweight='bold')
        plt.xlabel('Coefficients')
        plt.show()
        
    def residuals_analysis(self, plot=False):
        X_tr = self.results['data']['X_tr']
        y_tr = self.results['data']['y_tr']
        y_pred_tr = self.results['data']['y_pred_tr']
        rss_tr = self.results['target']['train']['RSS']

        # compute training residual standard error (RSE)
        n_predictors = X_tr.shape[1]
        rse = np.sqrt(rss_tr / (y_pred_tr.shape[0] - n_predictors - 1))

        # compute high leverage statistics
        H = np.matmul(X_tr, np.matmul(inv(np.matmul(X_tr.T, X_tr)), X_tr.T))  # H = X * (XT * X)^-1 * XT
        leverage = np.diag(H) # leverage for the ith data point is the hii (diagonal factor) in the hat matrix
        self.results['data']['leverage_log10'] = np.log10(leverage)
        
        # compute the studentized residual
        studentized_residual_tr = (y_tr - y_pred_tr) / (rse * np.sqrt(1 - leverage))
        self.results['data']['studentized_residual_tr'] = studentized_residual_tr
        
        if plot:
            # plot the studentized residual in function of the fitted values and logarithm of the leverage score
            fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(16, 6), sharey=False)
            ax[0].scatter(y_pred_tr, y_tr, s=5, color=blue)
            ax[0].plot(y_pred_tr, y_pred_tr, color=red)
            ax[0].set_xlabel('fitted value')
            ax[0].set_ylabel('real value')
            ax[1].scatter(y_pred_tr, studentized_residual_tr, s=5, color=blue)
            ax[1].set_xlabel('fitted value')
            ax[1].set_ylabel('studentized residual')
            ax[2].scatter(np.log10(leverage), studentized_residual_tr, s=5, color=blue)
            ax[2].set_xlabel('$\log10{(leverage)}$')
            plt.suptitle('Residual Analysis (train set)')
            plt.show()
        
    def _prepare_matrix_vector(self, data, target):
        self.coef_labels = data.drop(columns=[target]).columns
        y = data[target].values
        X = data.drop(columns=[target]).values
        return (X, y)
    
    def _fit_and_evaluate(self, X_tr, X_te, y_tr, y_te, inverse_fn):
        # fit the model
        self.model.fit(X_tr, y_tr)
        
        # compute train/test predictions
        y_pred_tr = self.model.predict(X_tr)
        y_pred_te = self.model.predict(X_te) 
        
        # The target variable could have been transformed
        # Here, we apply the inverse transformation
        # WARNING: for naming clarity, we permutte the variable names
        y_transformed_pred_tr = y_pred_tr  # permutation
        y_transformed_pred_te = y_pred_te  # permutation
        if inverse_fn:
            y_pred_tr = inverse_fn(y_transformed_pred_tr) 
            y_pred_te = inverse_fn(y_transformed_pred_te) 
        
        # target values 
        y_transformed_tr = y_tr  # permutation
        y_transformed_te = y_te  # permutation 
        if inverse_fn:
            y_tr = inverse_fn(y_transformed_tr)
            y_te = inverse_fn(y_transformed_te)
        
        # evaluate the model
        self._evaluate(X_tr, X_te,
                       y_tr, y_te, 
                       y_pred_tr, y_pred_te,
                       y_transformed_tr, y_transformed_te,
                       y_transformed_pred_tr, y_transformed_pred_te)
                
    def _evaluate(self, 
                  X_tr, X_te,
                  y_tr, y_te, 
                  y_pred_tr, y_pred_te,
                  y_transformed_tr, y_transformed_te,
                  y_transformed_pred_tr, y_transformed_pred_te):
        
        self.results = {
            'data': {
                'X_tr': X_tr,
                'X_te': X_te,
                'y_tr': y_tr,
                'y_pred_tr': y_pred_tr,
                'y_te': y_te,
                'y_pred_te': y_pred_te,
                'y_transformed_tr': y_transformed_tr,
                'y_transformed_te': y_transformed_te,
                'y_transformed_pred_tr': y_transformed_pred_tr,
                'y_transformed_pred_te': y_transformed_pred_te
            },
            'target': {
                'train': {
                    'RSS' : rss(y_tr, y_pred_tr),    
                    'TSS' : tss(y_tr),
                    'R2'  : r2score(y_tr, y_pred_tr),
                    'MSE' : mse(y_tr, y_pred_tr),
                    'RMSE': rmse(y_tr, y_pred_tr),
                    'MAE' : mae(y_tr, y_pred_tr),
                    'MSE Baseline': mse(y_tr, np.mean(y_tr)),
                    'MAE Baseline': mae(y_tr, np.median(y_tr))
                },
                'test': {
                    'RSS' : rss(y_te, y_pred_te),    
                    'TSS' : tss(y_te),
                    'R2'  : r2score(y_te, y_pred_te),
                    'MSE' : mse(y_te, y_pred_te),
                    'RMSE': rmse(y_te, y_pred_te),
                    'MAE' : mae(y_te, y_pred_te),
                    'MSE Baseline': mse(y_te, np.mean(y_tr)),
                    'MAE Baseline': mae(y_te, np.median(y_tr))
                }
            },
            'transformed_target': {
                'train': {
                    'RSS' : rss(y_transformed_tr, y_transformed_pred_tr),    
                    'TSS' : tss(y_transformed_tr),
                    'R2'  : r2score(y_transformed_tr, y_transformed_pred_tr),
                    'MSE' : mse(y_transformed_tr, y_transformed_pred_tr),
                    'RMSE': rmse(y_transformed_tr, y_transformed_pred_tr),
                    'MAE' : mae(y_transformed_tr, y_transformed_pred_tr),
                    'MSE Baseline': mse(y_transformed_tr, np.mean(y_transformed_tr)),
                    'MAE Baseline': mae(y_transformed_tr, np.median(y_transformed_tr))    
                },
                'test': {
                    'RSS' : rss(y_transformed_te, y_transformed_pred_te),    
                    'TSS' : tss(y_transformed_te),
                    'R2'  : r2score(y_transformed_te, y_transformed_pred_te),
                    'MSE' : mse(y_transformed_te, y_transformed_pred_te),
                    'RMSE': rmse(y_transformed_te, y_transformed_pred_te),
                    'MAE' : mae(y_transformed_te, y_transformed_pred_te),
                    'MSE Baseline': mse(y_transformed_te, np.mean(y_transformed_tr)),
                    'MAE Baseline': mae(y_transformed_te, np.median(y_transformed_tr))
                }
            }
        }

In [8]:
# Util method to print model evaluation as stated in the tasks
def print_scores(agent):
    print('\nTrain Set Scores:')
    print('-----------------')
    print('RMSLE Baseline :', np.round(np.sqrt(agent.results['transformed_target']['train']['MSE Baseline']), 6))
    print('MAE Baseline :', np.round(agent.results['target']['train']['MAE Baseline'], 2))
    print('')
    print('RMSLE :', np.round(agent.results['transformed_target']['train']['RMSE'], 6))
    print('MAE   :',np.round(agent.results['target']['train']['MAE'], 2))
    print('R2    :', np.round(agent.results['target']['train']['R2'], 5))
    print('')
    print('Test Set Scores:')
    print('----------------')
    print('RMSLE Baseline :', np.round(np.sqrt(agent.results['transformed_target']['test']['MSE Baseline']), 6))
    print('MAE Baseline :', np.round(agent.results['target']['test']['MAE Baseline'], 2))
    print('')
    print('RMSLE :', np.round(agent.results['transformed_target']['test']['RMSE'], 6))
    print('MAE   :', np.round(agent.results['target']['test']['MAE'], 2))
    print('R2    :', np.round(agent.results['target']['test']['R2'], 5))

### GridSearch Class

The `GridSearch` class is responsible to find the best hypermater for Ridge and Lasso Regularization. It allow also to plot the validation curves.

Using this class is easy:

1. Instantiate a GridSearch object by passing it:
    * The prepared data
    * The target variable name
    * The inverse function if the target variable has been transformed
    
2. Call the `run()` method and pass:
    * The list of hyper parameters to try
    * The method 'Ridge'|'Lasso'
    * The max_iter parameter if required

3. Plot the validation curves by calling the `plot_validation_curves()` method
4. Get the best hyper parameter by calling the `best_alpha()` method

In [None]:
class GridSearch:
    
    def __init__(self, data, target, inverse_fn=None):
        self.df = data
        self.target = target
        self.inverse_fn = inverse_fn
        self.train_scores = []
        self.test_scores = []
        
    def run(self, alphas, method='Ridge', max_iter=None):
        self.alphas = alphas
        for alpha in self.alphas:
            if method == 'Ridge':
                model = Ridge(alpha, max_iter=max_iter)
            else:
                model = Lasso(alpha, max_iter=max_iter)
                
            ma = ModelAgent(model)
            ma.fit_and_evaluate(self.df, self.target, self.inverse_fn)
            
            # Scores
            if method == 'Ridge':
                if self.inverse_fn:
                    score_tr = ma.results['transformed_target']['train']['MSE']
                    score_te = ma.results['transformed_target']['test']['MSE']
                else:
                    score_tr = ma.results['target']['train']['MSE']
                    score_te = ma.results['target']['test']['MSE']
            else:
                score_tr = ma.results['target']['train']['MAE']
                score_te = ma.results['target']['test']['MAE']
            
            self.train_scores.append(score_tr)
            self.test_scores.append(score_te)
            
    def plot_validation_curves(self):
        plt.semilogx(self.alphas, self.train_scores, label='train curve')
        plt.semilogx(self.alphas, self.test_scores, label='test curve')
        plt.legend()
        plt.show()
        
    def best_alpha(self):
        idx = np.argmin(self.test_scores)
        return self.alphas[idx]