In [1]:
import seaborn as sns
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype

In [2]:
path= '../../datasets/main_data/bank-additional-full.csv'
full_bank = pd.read_csv(path, sep=';')

#### Module Data.py codes

In [3]:
%%writefile data.py
# %%writefile ../scripts/data.py


import seaborn as sns
import matplotlib.pyplot as plt
import os
import numpy as  np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from sklearn.ensemble import IsolationForest
import os
from sklearn.preprocessing import RobustScaler, StandardScaler

def load_data(path="", sep=",", cols_to_drop=[]):
    data = pd.read_csv(path, sep)
    for col in cols_to_drop:
        data.drop(col, axis=1, inplace=True)
    return data

def check_outliers(data, show_plot=False, save_img=os.getcwd()+'/outliers.png'):
    
    """
    This functions checks for columns with outlers using the IQR method
    
    It accespts as argmuent a dataset. 
    show_plot can be set to True to output pairplots of outlier columns    
    """
    
    outliers = [] 
    Q1 = data.quantile(0.25)  
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    num_data = data.select_dtypes(include='number')
    result = dict ((((num_data < (Q1 - 1.5 * IQR)) | (num_data > (Q3 + 1.5 * IQR)))==True).any())
    #data[(data[col] >= high)|(data[col] <= low)].index
    index = data[(num_data < Q1 - 1.5 * IQR) | (num_data > Q3 + 1.5 * IQR)].index
    for k,v in result.items():
        if v == True:  
            outliers.append(k)
    if show_plot:
        pair_plot = sns.pairplot(data[outliers]);
        print(f'{result},\n\n Visualization of outlier columns')
        plt.savefig(fname=save_img, format='png')
        return pair_plot
    else:
        return data.loc[index, outliers]
    
    

def treat_outliers(data, type='median_replace'):
    
    """
    This treat outliers using any ofthses 3 methods as specified by user
    
        1. median_replace -  median replacement
        
        2. quant_floor - quantile flooring
        
        3. trim - trimming 
        
        4. log_transform - log transformations
    
    The methods are some of the commont statistical methods in treating outler
    columns
    
    By default treatment type is set to median replacement

    """
    
    if type == "median_replace":
        
        for col in data.columns.tolist():
            if is_numeric_dtype(data[col]):
                median = (data[col].quantile(0.50))
                print(median)
                q1 = data[col].quantile(0.25)
                q3 = data[col].quantile(0.75)
                iqr = q3 - q1
                high = int(q3 + 1.5 * iqr) 
                low = int(q1 - 1.5 * iqr)
                print(high, low, iqr)
                print(col)
                data[col] = np.where(data[col] > high, median, data[col])
                data[col] = np.where(data[col] > high, median, data[col])        
    
    if type == "quant_floor":
        
        for col in data.columns.tolist():
            if is_numeric_dtype(data[col]):
                q_10 = data[col].quantile(0.5)
                q_90 = data[col].quantile(0.95)
                data[col] =  data[col] = np.where(data[col] < q_10, q_10 , data[col])
                data[col] =  data[col] = np.where(data[col] > q_90, q_90 , data[col])
            
    if type == "trim":
        
        for col in data.columns.tolist():
            low = .05
            high = .95
            quant_df = data.quantile([low, high])
            for name in list(data.columns):
                if is_numeric_dtype(data[name]):
                    data = data[(data[name] >= quant_df.loc[low, name]) 
                        & (data[name] <= quant_df.loc[high, name])]
            
    if type == "log_transform":  
        for col in data.columns.tolist():
            if is_numeric_dtype(data[col]):
                data[col] = data[col].map(lambda i: np.log(i) if i > 0 else 0)
                
    if type == "isf": 
        iso = IsolationForest(contamination=0.1)
        yhat = iso.fit_predict(data.select_dtypes(exclude='object'))
        #select all rows that are not outliers
        mask = yhat != -1 
        data = data[mask]
        

    return data 


def scale_data(data,scaler=RobustScaler()):
    
    """
    Specify scaler type, scaler type must have fit_transform as a method
    
    """
    data_scaled = scaler.fit_transform(data)
    return data_scaled

Overwriting data.py


#### Module Plot.py codes

In [4]:
%%writefile plot.py 
import seaborn as sns
import matplotlib.pyplot as plt
import os
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
import os

def plot_univariate (data, x=None, y=None, color='r',save=False,
                title='New Chart', chart_type='hist', xlabel='', ylabel='',
                    save_to=os.getcwd(), log_normalise=False):
    
    
    """
    Make a univariate plot of any of these selcted types:
    
    1. bar - barchart
    
    2. hist - Histogram
    
    3. pie - Piechart
    
    4. count - Countplot
    
    
    """
    
    plt.subplots(figsize=(10,7))
    plt.title(title, fontsize=18)
    plt.xlabel(xlabel, fontsize=15)
    plt.ylabel(ylabel, fontsize=15)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)
    
    
    if chart_type == 'hist':
        if log_normalise:
            data = np.log(data)
        plot = sns.distplot(a=data, color=color)
        if save:
            plt.savefig(fname=save_to+f'/{title}.png', format='png')
        
    return plot

def plot_bivariate(data, x=None, y=None, hue=None, 
                  color='r',save=False,
                title='New Chart', chart_type='hist',
                   xlabel='', ylabel='',
                    save_to=os.getcwd(), img_name = " ", 
                   palette={'use':False, "size":1}, log_normalise=False,
                  kind_joint_plot = 'scatter', kind_pair_plot="scatter", figsize=(10,7)):
    
    """
    Make a bivariate plot of any of the selcted types:
    
    1. bar - barchart
    
    2. scatter  - scatter plot
    
    3. cat  - catplot
    
    4. count - countplot
    
    5 joint - jointplot 
    
    6  pair - pairplot
    
    7  corr - corr_plot
    
    When calling joint_plot:
        
        kind_joint_plot is default to `scatter`
        other types include "reg", "reside", "kde", "hex"
        
    When calling pair_plot:
        
        kind_pair_plot is default to `scatter`
        other types include 'reg'
    """
    def plt_tweaks():
        plt.subplots(figsize= figsize)
        plt.title(title, fontsize=18)
        plt.xlabel(xlabel, fontsize=15)
        plt.ylabel(ylabel, fontsize=15)
        plt.xticks(fontsize=12)
        plt.yticks(fontsize=12)
    
    
    # define helper functions
    
    def use_palette():
        palettes = []
#        palette_to_use=[]
        if palette['use'] == True:
            palette_to_use = [palettes[i] for i in range(palette['size'])]
            
            return palette_to_use

    def log_norm():
        if log_normalise and y != None:
            y = np.log(y)
        elif log_normalise and y == None:
            data = np.log(data)
            
    def save_image():
        if save:
            if img_name != " ":
                plt.savefig(fname=save_to+"/"+img_name+'.png', format='png')
            else:
                plt.savefig(fname=save_to+f'/{title}.png', format='png')
                
        
    # make plots
    
    if chart_type == "joint":
        log_norm()
        plot = sns.jointplot(x=x, y=y, data=data,
                            height=6, ratio=5, space=0.2, kind=kind_joint_plot)
        
        save_image()
        
    if chart_type == "pair":
       # try:
        log_norm()
        if palette['use'] == True:
            palette_to_use = use_palette()
            plot = sns.pairplot(data, palette=palette_to_use, 
                            kind= kind_pair_plot,height=3, aspect=1, hue=hue)
        else:
             plot = sns.pairplot(data, 
                            kind= kind_pair_plot,height=2.5, aspect=1, hue=hue, )
        save_image()
        
    if chart_type  == "corr":
        plt_tweaks()
        corr_data = data.corr()
        corr_plot = sns.heatmap(corr_data,annot=True, fmt='.2g', center=0) 

def plot_univariate (data, x=None, y=None, color='r',save=False,
                title='New Chart', chart_type='hist', xlabel='', ylabel='',
                    save_to=os.getcwd(), log_normalise=False):
    
    
    """
    Make a univariate plot of any of these selcted types:
    
    1. bar - barchart
    
    2. hist - Histogram
    
    3. pie - Piechart 
    
    4. count - Countplot
    
    
    """
    
    plt.subplots(figsize=(10,7))
    plt.title(title, fontsize=18)
    plt.xlabel(xlabel, fontsize=15)
    plt.ylabel(ylabel, fontsize=15)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)
    
    
    if chart_type == 'hist':
        if log_normalise:
            data = np.log(data)
        plot = sns.distplot(a=data, color=color)
        if save:
            plt.savefig(fname=save_to+f'/{title}.png', format='png')
        
    return plot

Overwriting plot.py


#### Module Model.py Codes

In [11]:
from sklearn.model_selection import KFold, StratifiedKFold, GridSearchCV

In [None]:
GridSearchCV()

In [10]:
%%writefile model.py

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegressionCV, LogisticRegression
from sklearn.model_selection import StratifiedKFold
from imblearn.over_sampling import SMOTE, _random_over_sampler
from sklearn.preprocessing import OneHotEncoder
from imblearn.pipeline import Pipeline as ImbPipe
from sklearn.model_selection import KFold, StratifiedKFold, GridSearchCV


from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score, precision_score, recall_score
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve
from sklearn.model_selection import cross_val_predict, cross_val_score, cross_validate

def plot_pca_components(data):
    pca = PCA().fit(data)
    plt.plot(np.cumsum(pca.explained_variance_ratio_))
    plt.xlabel('number of components')
    plt.ylabel('cumulative explained variance');
    
def check_imbalance(data,label='', x=0.7, y=30000):
    plt.subplots(figsize=(10,8))
    data[label].value_counts().plot(kind='bar')
    text = f'Class Imbalance Count:\n\n{data[label].value_counts().to_dict()}'
    plt.text(x=x, y=y, s = text ,  fontsize=15)
    
def encode (data):
    ohe = OneHotEncoder(sparse=False, handle_unknown='ignore', )
    to_encode = data.select_dtypes(exclude='number')
    if data.shape[1] > 1:
        #ohe = MultiLabelBinarizer()
        data.drop(to_encode.columns.tolist(), axis=1, inplace = True)
        features_cat_encode = pd.DataFrame(ohe.fit_transform(to_encode))
        data = data.merge(features_cat_encode, left_index=True, right_index=True)
        #print(ohe.classes_) 
    else:
        data = pd.DataFrame(ohe.fit_transform(to_encode))
        print(ohe.categories_) 
    return data 

 

def x_y_split(data, x=None, y=None, type_="single", test_size=.10):
    
    """
    Single type divides into just x and y
    Double type divides into train and test for each of x and y
    """
    
    X, y = data.drop(columns=y, axis=1), data[y]
    
    if type_ == "single":
        
        return X, y
    
    if type == "double":
        X_train, X_test, y_train, y_test = train_test_split(X, y,
                                               test_size=test_size, random_state=123)
        
        return X_train, X_test, y_train, y_test
    
    
    
def model_pipeline(X_train=None, y_train=None, X_test=None, pca=PCA(), 
                   cv=StratifiedKFold(), imb_sample=SMOTE(random_state=123),
                  model=LogisticRegressionCV()):
    
    """
    Trains a model for an imbalanced class using the specified estimator
    The training is done in K-folds or its nuances as specified folds 
    applying the specified sampling strategy
    """
    
    model = ImbPipe([('imb_sample', imb_sample), ('pca', pca), ('model', model)])
    model.fit(X_train, y_train) 
    y_hat = model.predict(X_test) 
    return model, y_hat
    
    
def gridSearch(model,hyper_params={},cv=StratifiedKFold(), x_train=None, y_train=None):
    
    """
    Performs GridSeach of the best hyperparmaters for the passed model
    """
    
    search = GridSearchCV(model=model, param_grid = hyper_params, n_jobs=-1, cv=cv)
    search.fit(X=x_train, y=y_train)
    print("Best parameter (CV score=%0.3f):\n" % search.best_score_)
    print(search.best_params_)
    print(search.score) 
    return search


def plot_grid_search(search_obj, pca_obj, X_train):
    
    """
    Prints the best (optimised) hyperparmatersfor the grid search object
    and plots the optimised pca components
    """
    
    print("Best parameter (CV score=%0.3f):\n" % search.best_score_)
    print("Best Params:",search.best_params_)
    pca.fit(X_train_scaled)

    fig, (ax0, ax1) = plt.subplots(nrows=2, sharex=True, figsize=(8, 8))
    ax0.plot(np.arange(1, pca.n_components_ + 1),
             pca.explained_variance_ratio_, '+', linewidth=2)
    ax0.set_ylabel('PCA explained variance ratio')

    ax0.axvline(search.best_estimator_.named_steps['pca'].n_components,
                linestyle=':', label='n_components chosen')
    ax0.legend(prop=dict(size=12))

    # For each number of components, find the best classifier results
    results = pd.DataFrame(search.cv_results_)
    components_col = 'param_pca__n_components'
    best_clfs = results.groupby(components_col).apply(
        lambda g: g.nlargest(1, 'mean_test_score'))

    best_clfs.plot(x=components_col, y='mean_test_score', yerr='std_test_score',
                   legend=False, ax=ax1)
    ax1.set_ylabel('Classification accuracy (val)')
    ax1.set_xlabel('n_components')

    plt.xlim(-1, 70)

    plt.tight_layout()
    plt.show() 
    

class metrics ():
    
    def __init__(self, y_test, y_hat):
        pass
        self.y_test = y_test
        self.y_hat =  y_hat
        
    
    def class_report(self):
        
        full_report = classification_report(self.y_test, self.y_hat)
        
        print(full_report)
        
    def conf_matrix(self):
        
        conf_matrix = confusion_matrix(self.y_test, self.y_hat)
        
        conf_matrix_df = pd.DataFrame(conf_matrix, columns=['Actual_+ve', 'Actual_-ve'],
                               index=['predicted_+ve', 'predicted_-ve'])
        
        return conf_matrix_df
    
    def accuracy_score(self):
        return  accuracy_score(self.y_test, self.y_hat)
    
    def classification_error(self):
        
        return 1 - accuracy_score() 
        
    def specif_sensitiv(self):
        
        """
        Sensitivity: When the actual value is positive, how often is the prediction correct?
        
        Specificity: When the actual value is negative, how often is the prediction correct?
        """
        
        conf_matrix = confusion_matrix(self.y_test, self.y_hat)
        
        TP = conf_matrix[1, 1]
        TN = conf_matrix[0, 0]
        FP = conf_matrix[0, 1]
        FN = conf_matrix[1, 0]
        
        sensitivity = TP / float(FN + TP)
        specificity = TN / (TN + FP)
        
        sensitiv_specific_table = pd.DataFrame([[sensitivity, specificity]],
                                               columns=['sensitivity', 'specificity'])
        
        return sensitiv_specific_table

Overwriting model.py


In [6]:
%%writefile main.py
# %%writefile ../scripts/data.py

import pandas as pd
import numpy as np
%matplotlib inline 
import matplotlib.pyplot as plt
import seaborn as sns 
from pandas.api.types import is_numeric_dtype 

from plot import plot_univariate, plot_bivariate
from data import check_outliers, treat_outliers, scale_data, load_data
from model import check_imbalance, plot_pca_components, encode, x_y_split

from sklearn.preprocessing import StandardScaler,RobustScaler, MinMaxScaler
from sklearn.preprocessing import OneHotEncoder, MultiLabelBinarizer
from imblearn.over_sampling import SMOTE, _random_over_sampler
from sklearn.decomposition import PCA
from sklearn.feature_selection import f_classif, from_model, SelectKBest,chi2, mutual_info_classif
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest # outlier detection and re,oval
from collections import Counter

from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier
from sklearn.model_selection import KFold, StratifiedKFold, GridSearchCV
import xgboost
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score, precision_score, recall_score
from sklearn.metrics import roc_curve, roc_auc_score, precision_recall_curve
from sklearn.model_selection import cross_val_predict, cross_val_score, cross_validate

from sklearn.ensemble import ExtraTreesClassifier, RandomForestClassifier
from sklearn.model_selection import KFold, StratifiedKFold, GridSearchCV
import xgboost
from sklearn.linear_model import LogisticRegression, LogisticRegressionCV
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion
from imblearn.pipeline import Pipeline as ImbPipe
import joblib


if __name__ == "__main__":
    

Overwriting main.py
