# ml_helpers


> This module contains several Python functions for running a quick ML prototype on your processed dataset. 

In [None]:
#| default_exp ml_helpers

In [None]:
#| hide
%matplotlib inline
%reload_ext autoreload
%autoreload 2

In [None]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *

In [None]:
#| export
from __future__ import annotations
from that_ml_library.utils import *
from that_ml_library.chart_plotting import plot_permutation_importances,plot_confusion_matrix,plot_residuals,plot_feature_importances
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, cross_validate, train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier,AdaBoostClassifier
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor,AdaBoostRegressor
from sklearn.metrics import f1_score,accuracy_score,classification_report,log_loss
import numpy as np
import pandas as pd
import statsmodels.api as sm
import re

In [None]:
#| export
def run_logistic_regression(X_trn:pd.DataFrame, # Training dataframe
                            y_trn:pd.Series|np.ndarray, # Training label
                            multi_class='multinomial', # sklearn's log reg multiclass option
                            solver='newton-cg', # sklearn's log reg solver option
                            penalty=None, # sklearn's log reg penalty option
                            max_iter=10000 # sklearn's log reg max iteration option
                           ):
    "Fit sklearn's `LogisticRegression` on the training data, print its coefficients, log loss and classification report (all computed on that same data), then return the fitted model"
    clf = LogisticRegression(penalty=penalty,
                             solver=solver,
                             multi_class=multi_class,
                             max_iter=max_iter,
                             random_state=0)
    clf.fit(X_trn, y_trn)

    # Both kinds of prediction are made on the data the model was fitted on
    label_preds = clf.predict(X_trn)
    proba_preds = clf.predict_proba(X_trn)

    divider = '-'*100
    print(divider)
    print('Intercept: \n', clf.intercept_)
    print('Coefficients: \n', clf.coef_)
    # Exponentiated coefficients, printed next to the raw ones
    print('Coefficients exp :\n',np.exp(clf.coef_))

    print(divider)
    print('Log loss: ',log_loss(y_trn,proba_preds))
    print(divider)
    print(classification_report(y_trn,label_preds))
    return clf

In [None]:
show_doc(run_logistic_regression)

---

[source](https://github.com/anhquan0412/that-ml-library/blob/main/that_ml_library/ml_helpers.py#L24){target="_blank" style="float:right; font-size:smaller"}

### run_logistic_regression

>      run_logistic_regression (X_trn:pandas.core.frame.DataFrame,
>                               y_trn:pandas.core.series.Series|numpy.ndarray,
>                               multi_class='multinomial', solver='newton-cg',
>                               penalty=None, max_iter=10000)

Perform Sklearn logistic regression, then print coefficients and classification report

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| X_trn | pd.DataFrame |  | Training dataframe |
| y_trn | pd.Series \| np.ndarray |  | Training label |
| multi_class | str | multinomial | sklearn's log reg multiclass option |
| solver | str | newton-cg | sklearn's log reg solver option |
| penalty | NoneType | None | sklearn's log reg penalty option |
| max_iter | int | 10000 | sklearn's log reg max iteration option |

In [None]:
#| export
def run_multinomial_statmodel(X_trn:pd.DataFrame, # Training dataframe
                              y_trn:pd.Series|np.ndarray, # Training label
                              add_constant=True # To add a constant column to X_trn
                             ):
    "Perform multinomial logit from statsmodels, then print results and classification report (on the training data). Returns the `MNLogit` model object"
    if add_constant:
        X_trn = sm.add_constant(X_trn)
    logit_model = sm.MNLogit(y_trn,X_trn)
    result = logit_model.fit()
    print(result.summary())

    # One probability column per outcome class, predicted on the training data
    prob_preds = logit_model.predict(params = result.params.values)

    # `argmax` yields column positions, not label values. Map them back to the
    # actual labels so the report is correct when labels are not exactly 0..K-1.
    # NOTE(review): assumes statsmodels orders outcome columns by sorted label
    # value (true for plain numeric/string labels) -- confirm for Categorical input.
    pred_idxs = np.argmax(prob_preds,axis=1)
    classes = np.unique(y_trn)
    if len(classes)==np.shape(prob_preds)[1]:
        pred_labels = classes[pred_idxs]
    else:
        # Cannot match labels to columns; keep the raw column positions
        pred_labels = pred_idxs

    print('-'*100)
    print('Log loss: ',log_loss(y_trn,prob_preds))
    print('-'*100)
    print(classification_report(y_trn,pred_labels))
    return logit_model

In [None]:
show_doc(run_multinomial_statmodel)

---

[source](https://github.com/anhquan0412/that-ml-library/blob/main/that_ml_library/ml_helpers.py#L48){target="_blank" style="float:right; font-size:smaller"}

### run_multinomial_statmodel

>      run_multinomial_statmodel (X_trn:pandas.core.frame.DataFrame,
>                                 y_trn:pandas.core.series.Series|numpy.ndarray,
>                                 add_constant=True)

Perform multinomial logit from statsmodels, then print results and classification report

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| X_trn | pd.DataFrame |  | Training dataframe |
| y_trn | pd.Series \| np.ndarray |  | Training label |
| add_constant | bool | True | To add a constant column to X_trn |

In [None]:
#| export
def run_sklearn_model(model_name:str, # sklearn's Machine Learning model to try. Currently support DecisionTree,AdaBoost,RandomForest
                      model_params:dict, # A dictionary containing model's hyperparameters
                      X_trn:pd.DataFrame, # Training dataframe
                      y_trn:pd.Series|np.ndarray, # Training label
                      is_regression=False, # To use regression model or classification model
                      class_names:list=None, # List of names associated with the labels (same order); e.g. ['no','yes']. For classification only
                      test_split=None, # Test set split. If float: random split. If list of list: indices of train and test set. If None: skip splitting
                      metric_funcs={}, # Dictionary of metric functions: {metric_name:metric_func}
                      seed=42, # Random seed
                      plot_fea_imp=True # Whether to plot sklearn's feature importances. Set to False to skip the plot
                    ):
    """Fit a DecisionTree / AdaBoost / RandomForest model, print train (and optional test) metrics, plot diagnostics and return the fitted model.

    Prints 'Unsupported model' and returns None for any other `model_name`.
    Raises ValueError when `test_split` is neither None, a float, nor a list of two index lists.
    """
    np.random.seed(seed)

    def _take_rows(labels,positions):
        # Positional selection, so labels stay aligned with `X_trn.iloc[...]`
        # even when a Series carries a non-default index
        if isinstance(labels,(pd.Series,pd.DataFrame)):
            return labels.iloc[positions]
        return labels[positions]

    has_test = test_split is not None
    if not has_test:
        pass
    elif isinstance(test_split,float):
        X_trn,X_test,y_trn,y_test = train_test_split(X_trn,y_trn,test_size=test_split,random_state=seed)
    elif (isinstance(test_split,list) and len(test_split)==2
          and isinstance(test_split[0],list) and isinstance(test_split[1],list)):
        trn_idxs,test_idxs = test_split
        X_test = X_trn.iloc[test_idxs].copy()
        y_test = _take_rows(y_trn,test_idxs)
        y_trn = _take_rows(y_trn,trn_idxs)
        X_trn = X_trn.iloc[trn_idxs]
    else:
        # Fail before fitting instead of hitting a NameError on X_test later
        raise ValueError('test_split must be None, a float, or a list of two index lists: [train_indices, test_indices]')

    if model_name=='DecisionTree':
        if is_regression:
            _model = DecisionTreeRegressor(random_state=seed,**model_params)
        else:
            _model = DecisionTreeClassifier(random_state=seed,**model_params)
    elif model_name=='AdaBoost':
        # Keys such as 'base_estimator__max_depth' configure the inner tree;
        # every other key configures the AdaBoost ensemble itself
        dt_params={k.split('__')[1]:v for k,v in model_params.items() if 'base_estimator' in k}
        abc_params={k:v for k,v in model_params.items() if 'base_estimator' not in k}
        print(f'Decision Tree params: {dt_params}')
        print(f'AdaBoost params: {abc_params}')
        if is_regression:
            dt = DecisionTreeRegressor(random_state=seed,**dt_params)
            # AdaBoostRegressor has no `algorithm` argument; passing one raises TypeError
            _model = AdaBoostRegressor(base_estimator=dt,random_state=seed,**abc_params)
        else:
            dt = DecisionTreeClassifier(random_state=seed,**dt_params)
            # NOTE(review): `tune_sklearn_model` builds its classifier with algorithm='SAMME',
            # this one uses sklearn's default -- confirm whether they should match
            _model = AdaBoostClassifier(base_estimator=dt,random_state=seed,**abc_params)
    elif model_name=='RandomForest':
        if is_regression:
            _model = RandomForestRegressor(random_state=seed,**model_params)
        else:
            _model = RandomForestClassifier(random_state=seed,**model_params)
    else:
        print('Unsupported model')
        return

    _model = _model.fit(X_trn,y_trn)
    pred_trn = _model.predict(X_trn)

    # For regression:
    if is_regression:
        print('-'*30 + ' Train set ' + '-'*30)
        for k,v in metric_funcs.items():
            print(f'{k}: {v(y_trn,pred_trn)}')

        if has_test:
            print('-'*30 + ' Test set ' + '-'*30)
            pred_test = _model.predict(X_test)
            for k,v in metric_funcs.items():
                print(f'{k}: {v(y_test,pred_test)}')

            # plot residual plot
            plot_residuals(_model,X_trn,y_trn,X_test,y_test,
                           is_fit=True,
                           qqplot=True)
        else:
            plot_residuals(_model,X_trn,y_trn,
                           is_fit=True,
                           qqplot=True)

    # For classification (`metric_funcs` is not used on this path)
    else:
        prob_trn = _model.predict_proba(X_trn)
        print('-'*30 + ' Train set ' + '-'*30)
        print(f'Log loss: {log_loss(y_trn,prob_trn)}')
        print(classification_report(y_trn, pred_trn, target_names=class_names))
        if has_test:
            print('-'*30 + ' Test set ' + '-'*30)
            pred_test = _model.predict(X_test)
            prob_test = _model.predict_proba(X_test)
            print(f'Log loss: {log_loss(y_test,prob_test)}')
            print(classification_report(y_test, pred_test, target_names=class_names))
            print('-'*100)
            # Share of each class among the true test labels vs. among the predictions
            dist_df = pd.DataFrame({'Class': class_names,
                                    'True Distribution':pd.Series(y_test).value_counts(normalize=True).sort_index(),
                                    'Prediction Distribution':pd.Series(pred_test).value_counts(normalize=True).sort_index()}
                                  )
            print(dist_df)
            plot_confusion_matrix(y_test,pred_test,class_names)

    if plot_fea_imp:
        plot_feature_importances(_model.feature_importances_,X_trn.columns.values)

    return _model

In [None]:
show_doc(run_sklearn_model)

---

[source](https://github.com/anhquan0412/that-ml-library/blob/main/that_ml_library/ml_helpers.py#L67){target="_blank" style="float:right; font-size:smaller"}

### run_sklearn_model

>      run_sklearn_model (model_name:str, model_params:dict,
>                         X_trn:pandas.core.frame.DataFrame,
>                         y_trn:pandas.core.series.Series|numpy.ndarray,
>                         is_regression=False, class_names:list=None,
>                         test_split=None, metric_funcs={}, seed=42,
>                         plot_fea_imp=True)

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| model_name | str |  | sklearn's Machine Learning model to try. Currently support DecisionTree,AdaBoost,RandomForest |
| model_params | dict |  | A dictionary containing model's hyperparameters |
| X_trn | pd.DataFrame |  | Training dataframe |
| y_trn | pd.Series \| np.ndarray |  | Training label |
| is_regression | bool | False | To use regression model or classification model |
| class_names | list | None | List of names associated with the labels (same order); e.g. ['no','yes']. For classification only |
| test_split | NoneType | None | Test set split. If float: random split. If list of list: indices of train and test set. If None: skip splitting |
| metric_funcs | dict | {} | Dictionary of metric functions: {metric_name:metric_func} |
| seed | int | 42 | Random seed |
| plot_fea_imp | bool | True | Whether to plot sklearn's feature importances. Set to False to skip the plot |

In [None]:
#| export
def tune_sklearn_model(model_name:str, # sklearn's Machine Learning model to try. Currently support DecisionTree,AdaBoost,RandomForest,
                      param_grid:dict, # Dictionary with parameters names (str) as keys and lists of parameter settings to try as values
                      X_trn:pd.DataFrame, # Training dataframe
                      y_trn:pd.Series|np.ndarray, # Training label
                      is_regression=False, # Is it a regression problem, or classification?
                      custom_cv=5, # sklearn's cross-validation splitting strategy
                      random_cv_iter=None, # Number of parameter settings that are sampled. Use this if you want to do RandomizedSearchCV
                      scoring=None, # Metric
                      seed=42, # Random seed
                      rank_show=10, # Number of ranks to show (descending order)
                      show_split_scores=True, # To show both train and test split scores
                     ):
    "Run sklearn's Grid Search (or Randomized Search when `random_cv_iter` is set) for the chosen model over `param_grid`, print a summary, and return the search's `cv_results_`"
    # The tree flavour is shared by the DecisionTree and AdaBoost options
    tree_cls = DecisionTreeRegressor if is_regression else DecisionTreeClassifier

    if model_name=='DecisionTree':
        _model = tree_cls(random_state=seed)
    elif model_name=='AdaBoost':
        weak_learner = tree_cls(random_state=seed)
        if is_regression:
            _model = AdaBoostRegressor(base_estimator=weak_learner,random_state=seed)
        else:
            _model = AdaBoostClassifier(base_estimator=weak_learner,random_state=seed,algorithm='SAMME')
    elif model_name=='RandomForest':
        forest_cls = RandomForestRegressor if is_regression else RandomForestClassifier
        _model = forest_cls(random_state=seed)
    else:
        print('Unsupported model')
        return

    metrics = val2list(scoring)
    cv_results,default_cv = do_param_search(X_trn,y_trn,_model,param_grid,
                                            cv=custom_cv,
                                            scoring=metrics,
                                            random_cv_iter=random_cv_iter,
                                            seed=seed)
    # Only the first metric is summarised
    show_both_cv(cv_results,default_cv,metrics[0],rank_show,show_split_scores)
    return cv_results

In [None]:
show_doc(tune_sklearn_model)

---

[source](https://github.com/anhquan0412/that-ml-library/blob/main/that_ml_library/ml_helpers.py#L166){target="_blank" style="float:right; font-size:smaller"}

### tune_sklearn_model

>      tune_sklearn_model (model_name:str, param_grid:dict,
>                          X_trn:pandas.core.frame.DataFrame,
>                          y_trn:pandas.core.series.Series|numpy.ndarray,
>                          is_regression=False, custom_cv=5,
>                          random_cv_iter=None, scoring=None, seed=42,
>                          rank_show=10, show_split_scores=True)

Perform either Sklearn's Grid Search or Randomized Search (based on random_cv_iter) of the model using param_grid

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| model_name | str |  | sklearn's Machine Learning model to try. Currently support DecisionTree,AdaBoost,RandomForest, |
| param_grid | dict |  | Dictionary with parameters names (str) as keys and lists of parameter settings to try as values |
| X_trn | pd.DataFrame |  | Training dataframe |
| y_trn | pd.Series \| np.ndarray |  | Training label |
| is_regression | bool | False | Is it a regression problem, or classification? |
| custom_cv | int | 5 | sklearn's cross-validation splitting strategy |
| random_cv_iter | NoneType | None | Number of parameter settings that are sampled. Use this if you want to do RandomizedSearchCV |
| scoring | NoneType | None | Metric |
| seed | int | 42 | Random seed |
| rank_show | int | 10 | Number of ranks to show (descending order) |
| show_split_scores | bool | True | To show both train and test split scores |

In [None]:
#| export
def do_param_search(
    X_train,y_train,
    estimator,
    param_grid,
    random_cv_iter=None,
    include_default=True,
    cv=None,
    scoring=None,
    seed=42

):
    "Run a randomized search (when `random_cv_iter` is truthy) or a grid search over `param_grid`; return `(cv_results_, default_cv)`, where `default_cv` is the `cross_validate` result of the untouched estimator, or None when `include_default` is False"
    scoring = val2list(scoring)

    # Settings common to both search flavours. refit=False: only scores are wanted.
    shared_kwargs = dict(scoring=scoring,
                         n_jobs=-1,
                         cv=cv,
                         verbose=1,
                         return_train_score=True,
                         refit=False)
    if random_cv_iter:
        searcher = RandomizedSearchCV(estimator=estimator,
                                      param_distributions=param_grid,
                                      n_iter=random_cv_iter,
                                      random_state=seed,
                                      **shared_kwargs)
    else:
        searcher = GridSearchCV(estimator,param_grid,**shared_kwargs)
    searcher.fit(X_train,y_train)

    # Baseline: cross-validate the estimator as passed in, without any search
    default_cv = None
    if include_default:
        default_cv = cross_validate(estimator,X_train,y_train,
                                    scoring=scoring,
                                    cv=cv,
                                    n_jobs=-1,
                                    verbose=1,
                                    return_train_score=True)
    return searcher.cv_results_,default_cv
        

def summarize_cv_results(search_cv,scoring,top_n=10,show_split_scores=False):
    """Print the `top_n` best-ranked parameter settings for the metric `scoring`.

    `search_cv` is a `cv_results_`-style dict holding, for that metric, the keys
    `params`, `mean/std_train_*`, `mean/std_test_*`, `rank_test_*` and the
    per-split `split{i}_train_*` / `split{i}_test_*` columns.
    With `show_split_scores=True` the per-split scores are printed as well.
    """
    # Count only the split columns of THIS metric, with any number of digits:
    # other metrics' columns and folds >= 10 must not skew the count
    split_key = re.compile(rf'split\d+_test_{re.escape(f"{scoring}")}')
    num_split = sum(1 for c in search_cv.keys() if split_key.fullmatch(c))

    summary_cols = ['params',
                    f'mean_train_{scoring}',f'std_train_{scoring}',
                    f'mean_test_{scoring}',f'std_test_{scoring}',
                    f'rank_test_{scoring}']
    train_cols = [f'split{i}_train_{scoring}' for i in range(num_split)]
    test_cols = [f'split{i}_test_{scoring}' for i in range(num_split)]

    ranked = pd.DataFrame(search_cv).sort_values(f'rank_test_{scoring}')
    n_summary = len(summary_cols)
    for rec in ranked[summary_cols+train_cols+test_cols].values[:top_n]:
        params,trn_mean,trn_std,test_mean,test_std,rank = rec[:n_summary]
        trn_scores = rec[n_summary:n_summary+num_split]
        # Explicit start offset: a negative slice would return the whole record when num_split is 0
        test_scores = rec[n_summary+num_split:]
        print('-'*10)
        print(f'Rank {rank}')
        print(f'Params: {params}')
        if show_split_scores:
            print(f'Train scores: {[round(i,2) for i in trn_scores]}')
        print(f'Mean train score: {trn_mean:.3f} +- {trn_std:.3f}')
        if show_split_scores:
            print(f'Test scores:  {[round(i,2) for i in test_scores]}')
        print(f'Mean test score: {test_mean:.3f} +- {test_std:.3f}')

def summarize_default_cv(default_cv,s):
    "Print mean +- std of the train and test scores stored in `default_cv` under the keys `train_{s}` and `test_{s}`"
    print('-'*10)
    print("Default Params")
    for split in ('train','test'):
        scores = default_cv[f'{split}_{s}']
        avg,spread = round(scores.mean(),3),round(scores.std(),3)
        print(f"Mean {split} score: {avg} +- {spread}")

def show_both_cv(search_cv,default_cv,scoring,top_n=10,show_split_scores=False):
    "Print the ranked search summary for `scoring`, followed by the default-parameter baseline for the same metric"
    summarize_cv_results(search_cv,scoring,
                         top_n=top_n,
                         show_split_scores=show_split_scores)
    summarize_default_cv(default_cv,s=scoring)

    
def get_adaboost_info(dt_params,ada_params,X,y,seed=42):
    "Fit an AdaBoost classifier over a decision tree, print each fitted tree with its depth, weight and error, and return the fitted ensemble"
    weak_learner = DecisionTreeClassifier(random_state=seed,**dt_params)
    booster = AdaBoostClassifier(base_estimator=weak_learner,random_state=seed,**ada_params)
    booster.fit(X,y)
    # Walk the fitted trees together with their boosting weight and error
    per_tree = zip(booster.estimators_,booster.estimator_weights_,booster.estimator_errors_)
    for t,weight,error in per_tree:
        print(f'{t}\n\tTree depth: {t.tree_.max_depth}, Weight: {weight}, Error: {error}')
    return booster

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()