In [1]:
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
import statsmodels.formula.api as smf
import time
import warnings

from scipy.stats import pearsonr, shapiro, levene
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import ConfusionMatrixDisplay, classification_report, f1_score, mean_squared_error, r2_score, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split, validation_curve, learning_curve
from sklearn.neighbors import KNeighborsClassifier
from sklearn import neighbors
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from statsmodels.stats.outliers_influence import variance_inflation_factor
from xgboost import XGBClassifier

# Silence warnings for the rest of the session. No category is passed, so the
# "ignore" filter applies to every warning class, including deprecation and
# convergence warnings raised by the libraries imported above.
warnings.simplefilter(action="ignore")

# Apply seaborn's default theme to all figures created after this point
sns.set()

In [2]:
def data_scaler(df, columns=None, method="standard", scaler_all=False):
    """
    Scale the specified columns of the dataframe using the chosen method.

    The input dataframe is left untouched. In the returned dataframe the
    unscaled columns come first, followed by the scaled columns.

    Parameters:
    df (DataFrame): Input dataframe.
    columns (list or str): Column(s) to be scaled. A single column name may be
        given as a plain string. Ignored when scaler_all is True.
    method (str): Scaling method ("standard", "minmax", "robust", or "log").
        "log" applies np.log1p element-wise.
    scaler_all (bool): If True, scale all columns in the dataframe.
    Returns:
    DataFrame: Scaled dataframe.
    Raises:
    ValueError: If neither columns nor scaler_all is provided, or if method
        is not one of the supported values.
    """
    if columns is None and not scaler_all:
        raise ValueError("Specify the columns to scale or enable the 'scaler_all' option to scale all columns.")

    # Validate the method before doing any work so a typo fails fast.
    if method not in ("standard", "minmax", "robust", "log"):
        raise ValueError("Invalid scaling method. Possible values are 'standard', 'minmax', 'robust', and 'log'.")

    if scaler_all:
        columns = df.columns.tolist()
    elif isinstance(columns, str):
        # A bare string would select a 1-D Series, which the scikit-learn
        # scalers reject; wrap it so a 2-D selection is always used.
        columns = [columns]

    if isinstance(columns, list) and not columns:
        # Nothing to scale: return an independent copy instead of letting a
        # scaler fail on a zero-column selection.
        return df.copy()

    if method == "log":
        df_scaled = df[columns].apply(np.log1p)
    else:
        # The scaler classes are looked up only in the branch that needs them.
        if method == "standard":
            scaler = StandardScaler()
        elif method == "minmax":
            scaler = MinMaxScaler()
        else:
            scaler = RobustScaler()
        scaled_data = scaler.fit_transform(df[columns])
        # Rebuild a dataframe so the original index and column labels survive.
        df_scaled = pd.DataFrame(scaled_data, columns=columns, index=df.index)

    return pd.concat([df.drop(columns, axis=1), df_scaled], axis=1)

In [3]:
def corr_heatmap(data, annot=True):
    """
    Plot a heatmap of Pearson correlation.

    Only numeric and boolean columns take part in the correlation; other
    columns (strings, dates, ...) are left out instead of making the
    correlation computation fail.

    Parameters:
    data (DataFrame): Input dataframe.
    annot (bool): If True, display values on the heatmap.
    Returns:
    None
    Raises:
    ValueError: If the dataframe has no numeric or boolean column.
    """
    # Select the columns explicitly: recent pandas versions no longer drop
    # non-numeric columns silently inside DataFrame.corr().
    numeric_data = data.select_dtypes(include=["number", "bool"])
    if numeric_data.shape[1] == 0:
        raise ValueError("No numeric columns available to compute correlations.")

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.set_title("Pearson Correlation Heatmap")
    sns.heatmap(numeric_data.corr(), annot=annot, cmap="coolwarm", ax=ax)
    plt.show()

In [4]:
def backward_selected(data, response, threshold=0.05):
    """
    Perform backward feature selection using OLS regression.

    Starting from all numeric columns except the response, the model is
    refitted repeatedly and the predictor with the highest p-value is removed
    until every remaining p-value is at or below ``threshold``. If every
    predictor is eliminated (or none exists), an intercept-only model is
    fitted and returned.

    Parameters:
        data (DataFrame): Input dataframe containing both response and predictor variables.
        response (str): Name of the response variable.
        threshold (float): Largest p-value a predictor may have and still be
            kept. Defaults to 0.05.

    Returns:
        model (OLS): Final OLS model after backward feature selection.
    """
    # Keep the dataframe's column order: iterating a set of strings can change
    # between interpreter runs, which made the printed formula unstable.
    remaining = [col for col in data._get_numeric_data().columns if col != response]

    while remaining:
        formula = f"{response} ~ {' + '.join(remaining)} + 1"
        print("--")
        print(formula)
        model = smf.ols(formula, data).fit()

        # Drop the intercept by name instead of by position, and ignore
        # undefined (NaN) p-values so the comparison below stays well defined.
        score = model.pvalues.drop("Intercept", errors="ignore").dropna()

        # idxmax yields exactly one label even when several predictors tie for
        # the highest p-value.
        worst = score.idxmax() if not score.empty else None
        if worst is not None and score[worst] > threshold:
            print(f"Remove {worst} (p-value: {round(score[worst], 3)})")
            remaining.remove(worst)
            print("")
            continue

        print("Final model selected!")
        print("")
        break
    else:
        # No predictor is left (or there was none to start with): the selected
        # model is the intercept-only one, not the last fit that still
        # contained a rejected predictor.
        formula = f"{response} ~ 1"
        print("--")
        print(formula)
        model = smf.ols(formula, data).fit()
        print("Final model selected!")
        print("")

    print(model.summary())
    return model

In [5]:
def backward_selected_logistic(data, response, threshold=0.05):
    """
    Perform backward feature selection using logistic regression.

    Starting from all numeric columns except the response, a logit model is
    refitted repeatedly and the predictor with the highest p-value is removed
    until every remaining p-value is at or below ``threshold``. If every
    predictor is eliminated (or none exists), an intercept-only model is
    fitted and returned.

    Parameters:
        data (DataFrame): Input dataframe containing both response and predictor variables.
        response (str): Name of the response variable.
        threshold (float): Largest p-value a predictor may have and still be
            kept. Defaults to 0.05.

    Returns:
        model: Final fitted logit model after backward feature selection.
    """
    # Keep the dataframe's column order: iterating a set of strings can change
    # between interpreter runs, which made the printed formula unstable.
    remaining = [col for col in data._get_numeric_data().columns if col != response]

    while remaining:
        formula = "{} ~ {} + 1".format(response, " + ".join(remaining))
        print("--")
        print(formula)

        # Fit logistic regression model
        model = smf.logit(formula, data).fit()

        # Drop the intercept by name instead of by position, and ignore
        # undefined (NaN) p-values so the comparison below stays well defined.
        score = model.pvalues.drop("Intercept", errors="ignore").dropna()

        # idxmax yields exactly one label even when several predictors tie for
        # the highest p-value.
        worst = score.idxmax() if not score.empty else None
        if worst is not None and score[worst] > threshold:
            print("Remove", worst, "(p-value:", round(score[worst], 3), ")")
            remaining.remove(worst)
            print("")
            continue

        print("Final model selected!")
        print("")
        break
    else:
        # No predictor is left (or there was none to start with): the selected
        # model is the intercept-only one, not the last fit that still
        # contained a rejected predictor.
        formula = "{} ~ 1".format(response)
        print("--")
        print(formula)
        model = smf.logit(formula, data).fit()
        print("Final model selected!")
        print("")

    print(model.summary())

    return model

In [6]:
def kmeans_elbow(X_train, cluster_max=10, random_state=None):
    """
    Use the elbow method to find the optimal number of clusters.

    KMeans is fitted for every k from 1 to cluster_max and the inertia curve
    is plotted. The elbow is taken as the k whose inertia lies furthest below
    the straight line joining the first and last points of the curve. With
    fewer than three candidate values no elbow can be located, so cluster_max
    itself is returned.

    Parameters:
    - X_train (array-like): Training data.
    - cluster_max (int): Maximum number of clusters to consider.
    - random_state (int or None): Seed forwarded to KMeans. Pass an integer
      to make the result reproducible; None keeps KMeans' default behaviour.

    Returns:
    - int: Optimal number of clusters.

    Raises:
    - ValueError: If cluster_max is smaller than 1.
    """
    if cluster_max < 1:
        raise ValueError("cluster_max must be at least 1.")

    k_list = range(1, cluster_max + 1)
    inertia = []
    for k in k_list:
        kmeans = KMeans(n_clusters=k, random_state=random_state)
        kmeans.fit(X_train)
        inertia.append(kmeans.inertia_)

    # Plot the elbow curve
    plt.plot(k_list, inertia, marker='o')
    plt.xlabel('Number of Clusters')
    plt.ylabel('Inertia')
    plt.title('Elbow Method for Optimal Number of Clusters')
    plt.show()

    # Find the optimal number of clusters using the elbow point.
    # Picking the largest single drop in inertia almost always selects the
    # step from k=1 to k=2; the gap to the chord instead locates the point
    # where the curve bends.
    if len(inertia) < 3:
        optimal_n_clusters = cluster_max
    else:
        ks = np.arange(1, cluster_max + 1, dtype=float)
        values = np.asarray(inertia, dtype=float)
        slope = (values[-1] - values[0]) / (ks[-1] - ks[0])
        chord = values[0] + slope * (ks - ks[0])
        optimal_n_clusters = int(ks[np.argmax(chord - values)])
    print("Optimal number of clusters:", optimal_n_clusters)

    return optimal_n_clusters

In [7]:
def plot_confusion_matrix(y_true, y_pred, model_names):
    """
    Draw the confusion matrix of a set of predictions.

    Parameters:
    y_true (array-like): Ground-truth labels.
    y_pred (array-like): Predicted labels.
    model_names (list): Labels shown on the axes of the matrix (passed as
        display_labels to ConfusionMatrixDisplay).
    Returns:
    None
    """
    display = ConfusionMatrixDisplay(
        confusion_matrix=confusion_matrix(y_true, y_pred),
        display_labels=model_names,
    )
    display.plot(cmap=plt.cm.Blues)
    # Grid lines would cut through the matrix cells, so switch them off.
    plt.grid(False)
    plt.show()

In [8]:
def plot_learning_curve(train_sizes, train_scores, test_scores, title, ylabel="F1 Score"):
    """
    Plot mean training and cross-validation scores against training set size.

    Each curve is drawn with a shaded band of one standard deviation around
    the mean.

    Parameters:
    train_sizes (array-like): Training set sizes, one per row of the score arrays.
    train_scores (array-like): 2-D training scores; averaged along axis 1.
    test_scores (array-like): 2-D cross-validation scores; averaged along axis 1.
    title (str): Figure title.
    ylabel (str): Label of the y axis. Defaults to "F1 Score"; pass another
        name when the scores come from a different metric.
    Returns:
    None
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.set_title(title)
    ax.set_xlabel("Training Set Size")
    ax.set_ylabel(ylabel)

    curves = (
        (train_scores, "Training score", "blue"),
        (test_scores, "Cross-validation score", "red"),
    )
    for scores, label, color in curves:
        mean = np.mean(scores, axis=1)
        std = np.std(scores, axis=1)
        ax.plot(train_sizes, mean, label=label, color=color)
        ax.fill_between(train_sizes, mean - std, mean + std, alpha=0.2, color=color)

    ax.legend(loc="best")
    # Request the grid explicitly: a bare grid() call toggles the current
    # state, which switches the grid off when the active style already
    # enables it.
    ax.grid(True)
    plt.show()

In [9]:
def plot_validation_curve(param_range, train_scores, test_scores, param_name, title, ylabel="F1 Score"):
    """
    Plot mean training and cross-validation scores against a hyperparameter.

    Each curve is drawn with a shaded band of one standard deviation around
    the mean.

    Parameters:
    param_range (array-like): Hyperparameter values, one per row of the score arrays.
    train_scores (array-like): 2-D training scores; averaged along axis 1.
    test_scores (array-like): 2-D cross-validation scores; averaged along axis 1.
    param_name (str): Label of the x axis.
    title (str): Figure title.
    ylabel (str): Label of the y axis. Defaults to "F1 Score"; pass another
        name when the scores come from a different metric.
    Returns:
    None
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.set_title(title)
    ax.set_xlabel(param_name)
    ax.set_ylabel(ylabel)

    curves = (
        (train_scores, "Training score", "blue"),
        (test_scores, "Cross-validation score", "red"),
    )
    for scores, label, color in curves:
        mean = np.mean(scores, axis=1)
        std = np.std(scores, axis=1)
        ax.plot(param_range, mean, label=label, color=color)
        ax.fill_between(param_range, mean - std, mean + std, alpha=0.2, color=color)

    ax.legend(loc="best")
    # Request the grid explicitly: a bare grid() call toggles the current
    # state, which switches the grid off when the active style already
    # enables it.
    ax.grid(True)
    plt.show()