In [1]:
# Enable IPython's autoreload extension; mode 2 reloads imported modules before
# each cell executes, so edits to local modules are picked up without a kernel restart.
%load_ext autoreload
%autoreload 2

In [2]:
import pathlib
import pandas as pd
from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import accuracy_score
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MaxAbsScaler, TargetEncoder
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.ensemble import VotingClassifier, RandomForestClassifier, HistGradientBoostingClassifier, AdaBoostClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier

from src.utils.processor.dataframe_selector import DataFrameSelector

In [None]:
def update_dict(clfs: dict, clf_name: str, perf: dict, acc: float, metric: str) -> dict:
    """Keep, per classifier name, the result with the highest value of `metric`.

    Args:
        clfs: Mapping of classifier name -> stored result dict; modified in place.
        clf_name: Key under which the result is tracked.
        perf: Result dict of the new candidate (e.g. estimator and scores).
        acc: Fallback score of the candidate, used only when `perf` does not
            contain `metric`.
        metric: Key of the score used to rank results.

    Returns:
        The same `clfs` mapping, for convenient re-assignment by the caller.
    """
    if clf_name not in clfs:
        clfs[clf_name] = perf
        return clfs

    # Rank on the same metric on both sides: prefer the candidate's own value of
    # `metric`; comparing a stored f1 against a new accuracy would be meaningless.
    candidate_score = perf.get(metric, acc)
    stored_score = clfs[clf_name].get(metric)

    # A stored entry without the metric cannot be ranked, so the candidate wins.
    if stored_score is None or stored_score < candidate_score:
        clfs[clf_name].update(perf)
    return clfs

## Data preparation

### Load data and remove duplicates

In [None]:
PROJECT_NAME = 'housing_price_clf'

DATA_DIR = pathlib.Path('.') / 'data' / PROJECT_NAME
DATA_PATH = [*DATA_DIR.glob('train.csv')]

# Columns removed before modelling.
# NOTE(review): index_col=0 turns the first CSV column into the index while "Id"
# is also dropped as a regular column — confirm the file contains both.
dropped_columns = ["Id", "YearBuilt", "YearRemodAdd", "GarageYrBlt", "MoSold", "YrSold"]

# load data, drop the unused columns, then drop rows that are duplicated afterwards
house_df = (
    pd.read_csv(DATA_PATH[0], index_col=0)
    .drop(columns=dropped_columns)
    .drop_duplicates()
)
house_df.info()

### Create train and test data
The goal is to predict whether a house is expensive (1) or not expensive (0); the `Expensive` column is therefore used as the target variable `y`.

We hold out 20% of the data as the test set.

In [None]:
# check data distribution
# number of rows per class of the target column
target_counts = house_df['Expensive'].value_counts()
target_counts

From the value counts we conclude that the data is imbalanced, so we use stratified sampling when splitting the data into train and test sets.

In [None]:
# set data and label
# Separate label and features without mutating house_df: `pop` would remove the
# column in place, so re-running this cell (or the value-count cell) would fail.
labels = house_df['Expensive'].copy()
features = house_df.drop(columns='Expensive')

In [None]:
# set test data size
# fraction of the data held out for testing
test_size = .2
# fixed seed so the split (and every score derived from it) is reproducible
split_seed = 42

# split data into train and test, preserving the class ratio in both parts
X, X_test, y, y_test = train_test_split(
    features,
    labels,
    test_size=test_size,
    stratify=labels,
    random_state=split_seed,
)

## 1. Preprocessing pipeline

In [None]:
# make num_pipeline
# numerical columns: everything that is not object/category typed
num_cols = X.select_dtypes(exclude=['object', 'category']).columns

# select the numerical columns, then fill missing values from nearest neighbours
num_steps = [
    ('selector', DataFrameSelector(feature_names=num_cols)),
    ('imputer', KNNImputer()),
]
num_pipe = Pipeline(steps=num_steps)

In [None]:
# make categorical pipeline
# categorical columns: object/category typed
cat_cols = X.select_dtypes(include=['object', 'category']).columns

# select the categorical columns, replace missing values with the constant
# 'N_A', then one-hot encode (categories unseen during fit are ignored)
cat_pipe = Pipeline([
    ('selector', DataFrameSelector(feature_names=cat_cols)),
    ('imputer', SimpleImputer(strategy='constant', fill_value='N_A')),
    ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
])

In [None]:
# build preprocessor pipe
# route columns to the matching sub-pipeline based on their dtype
column_routes = [
    ('num_pipe', num_pipe, make_column_selector(dtype_include='number')),
    ('target_pipe', cat_pipe, make_column_selector(dtype_include=['object', 'category'])),
]
preprocessor = ColumnTransformer(transformers=column_routes)
preprocessor

## 2. Modelling
We'll train several models, test different settings and keep the best-performing model, i.e. the model with the highest F1-score. We use the F1-score here because our data is imbalanced towards "not expensive" (85.1%), although, as this is a binary classification problem, accuracy could be used as well.

In [None]:
# initialize the parameter dict
# search space for the KNN imputer inside the numerical preprocessing pipeline
prep_param_grid = dict([
    ("input__preprocessor__num_pipe__imputer__n_neighbors", range(2, 10)),
    ("input__preprocessor__num_pipe__imputer__weights", ['uniform', 'distance']),
])

In [None]:
# create dict of scalers we want to test
# Scalers to test. Each non-empty entry holds the scaler instance under 'scl'
# and its search space under 'param_grid' (the key the search loop reads).
# Keys must be unique: a repeated key silently overwrites the earlier entry.
scalers = {
    'no_scaler': {},
    'standard_scaler': {
        'scl': StandardScaler(),
        'param_grid': {
            "input__scaler__with_mean": [True, False],
            "input__scaler__with_std": [True, False],
        }
    },
    'max_abs_scaler': {
        'scl': MaxAbsScaler(),
        'param_grid': {}
    }
}

In [None]:
# create dict of classifiers we want to test
# Search spaces shared by several classifiers, defined once and copied per entry.
learning_rates = [step / 1000 for step in range(100, 1, -1)]
tree_param_grid = {
    'clf__max_depth': range(2, 100, 2),
    'clf__min_samples_split': range(2, 20),
    "clf__min_samples_leaf": range(3, 12, 2),
    'clf__criterion': ['gini', 'entropy'],
}

# Classifiers to test: name -> estimator ('clf') and its search space ('param_grid').
classifiers = {
    'decision_tree': {
        'clf': DecisionTreeClassifier(),
        'param_grid': dict(tree_param_grid),
    },
    'linear': {
        'clf': SGDClassifier(),
        'param_grid': {'clf__loss': ['hinge', 'log_loss', 'modified_huber']},
    },
    'random_forest': {
        'clf': RandomForestClassifier(),
        'param_grid': dict(tree_param_grid),
    },
    'regression': {
        'clf': LogisticRegression(max_iter=10000),
        'param_grid': {
            'clf__solver': ['liblinear'],
            'clf__penalty': ['l1', 'l2'],
        },
    },
    'hist_gradient_booster': {
        'clf': HistGradientBoostingClassifier(),
        'param_grid': {'clf__learning_rate': list(learning_rates)},
    },
    'gradient_booster': {
        'clf': GradientBoostingClassifier(),
        'param_grid': {
            'clf__learning_rate': list(learning_rates),
            'clf__n_estimators': range(100, 1000, 100),
        },
    },
    'ada_booster': {
        'clf': AdaBoostClassifier(),
        'param_grid': {'clf__learning_rate': list(learning_rates)},
    },
    'xg_boost': {
        'clf': XGBClassifier(objective='binary:logistic', tree_method='hist', device='cpu'),
        'param_grid': {
            "clf__n_estimators": range(100, 1000, 100),
            "clf__learning_rate": list(learning_rates),
            "clf__max_depth": range(2, 14, 2),
            "clf__min_child_weight": range(1, 8, 2),
        },
    },
}

In [None]:
# initialize a dict to store the best classifiers of each type
# classifier name -> best result found so far for that classifier type
best_classifiers: dict = dict()

In [None]:
# look for best performing setting for each classifier
# For every classifier / scaler combination, run a randomized hyper-parameter
# search and keep, per classifier, the combination with the best CV f1-score.
for clf_name, clf_params in tqdm(classifiers.items()):
    for scaler, scl_param in tqdm(scalers.items(), desc=clf_name):
        # build input pipeline once; the scaler step is added only when configured
        input_steps = [('preprocessor', preprocessor)]
        if scl_param:
            input_steps.append(('scaler', scl_param.get('scl', None)))
        input_pipe = Pipeline(input_steps)

        # build clf pipeline
        clf_pipeline = Pipeline([
            ('input', input_pipe),
            ('clf', clf_params.get('clf', None)),
        ])

        # merge preprocessing, scaler and classifier search spaces (skipping empty ones)
        param_grid = {}
        for d in [prep_param_grid, scl_param.get('param_grid'), clf_params.get('param_grid')]:
            if d:
                param_grid.update(d)

        # randomized search over the merged space, scored by cross-validated f1
        grid_search = RandomizedSearchCV(
            estimator=clf_pipeline,
            param_distributions=param_grid,
            cv=5,
            scoring='f1',
            n_jobs=-1,
            verbose=0,
            n_iter=500
        )
        grid_search.fit(X, y)

        # test-set accuracy is recorded for reporting only
        acc = accuracy_score(y_test, grid_search.predict(X_test))
        perf = {
            'clf': grid_search.best_estimator_,
            'acc': acc,
            'f1': grid_search.best_score_,
            'clf_pipeline': clf_pipeline,
        }
        # Rank on the f1 score itself: passing the accuracy here would compare
        # a stored f1 against a new accuracy.
        best_classifiers = update_dict(
            best_classifiers, clf_name, perf, grid_search.best_score_, metric='f1'
        )

## 3. Model evaluation

In [None]:
# print the stored accuracy and f1-score of the best result per classifier
for name, result in best_classifiers.items():
    acc_value = result.get("acc")
    f1_value = result.get("f1")
    print(f'{name}:\n - acc-score: {acc_value};\n - f1-score: {f1_value}')

In [None]:
# combine the best estimator of every classifier type in a majority-vote ensemble
voting_members = [(name, entry.get('clf')) for name, entry in best_classifiers.items()]
voting_clf = VotingClassifier(estimators=voting_members, voting='hard')
voting_clf.fit(X, y)

In [None]:
# accuracy of the voting ensemble on the held-out test split
test_accuracy = accuracy_score(y_test, voting_clf.predict(X_test))
print(f'Prediction accuracy on the test data is: {test_accuracy}')

## 4. Submission

In [None]:
# NOTE(review): the training data was read from data/<PROJECT_NAME>/, while this
# looks directly in data/ — confirm the location of test.csv.
DATA_DIR = pathlib.Path('.') / 'data'
DATA_PATH = [*DATA_DIR.glob('test.csv')]

# load the submission data and split off the identifier column
test_df = pd.read_csv(DATA_PATH[0])
test_id = test_df.pop('Id')
test_features = test_df

# predict with the voting ensemble and write one row per Id
predictions = voting_clf.predict(test_features)
test_result = pd.DataFrame({'Id': test_id, 'Expensive': predictions})

test_result.to_csv('test_result.csv', index=False)