<a href="https://colab.research.google.com/github/ariahosseini/TradML/blob/main/MachineLearningUtils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [46]:
# libs
import os, sys, warnings, itertools
import pandas as pd
import numpy as np
from tqdm import tqdm
from scipy import sparse
from scipy.stats import linregress
# sklearn
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# vis
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from IPython.core.display import HTML

In [47]:
def summarize_columns(df):
    """Build a per-column overview of a DataFrame.

    Args:
        df: The DataFrame to summarize.

    Returns:
        A DataFrame with one row per column of ``df`` and the columns
        ``col_name``, ``dtypes``, ``missing`` (null count),
        ``missing_percent`` (null share in percent, rounded to one decimal),
        ``uniques`` (``nunique`` per column) and ``first_value`` /
        ``second_value`` / ``third_value`` (the values found in the first
        three rows). Sample-value columns are filled with NaN when ``df``
        has fewer rows than the sample position.
    """
    num_rows = len(df)
    # Built from explicit arrays so a named columns index cannot change the
    # resulting column labels.
    summary = pd.DataFrame({"col_name": df.columns, "dtypes": df.dtypes.values})
    summary["missing"] = df.isnull().sum().values
    summary["missing_percent"] = (summary["missing"] * 100 / num_rows).round(1)
    summary["uniques"] = df.nunique().values
    sample_labels = ("first_value", "second_value", "third_value")
    for position, label in enumerate(sample_labels):
        # Short frames do not have all three sample rows; use NaN instead of
        # letting iloc raise IndexError.
        summary[label] = df.iloc[position].values if position < num_rows else np.nan
    return summary

In [48]:
def reduce_memory_usage(df, category = False):
    """Downcast numeric columns of ``df`` in place to the narrowest fitting dtype.

    Signed integers, unsigned integers and floats are each narrowed within
    their own family (e.g. int64 -> int8, uint64 -> uint8, float64 ->
    float16) when the column's min and max fit the target range. Columns
    are never widened. Bool, datetime, timedelta, complex and extension
    dtypes are left untouched.

    NOTE: narrowing floats (especially to float16) reduces precision; the
    range check only guarantees that values do not overflow.

    Args:
        df: DataFrame to modify in place.
        category: When True, object/string columns are converted to the
            pandas ``category`` dtype.

    Returns:
        None. Memory usage before and after is printed.
    """
    # Narrower candidates per numpy dtype kind, tried smallest first.
    candidates = {
        "i": (np.int8, np.int16, np.int32),
        "u": (np.uint8, np.uint16, np.uint32),
        "f": (np.float16, np.float32),
    }
    start_mem = df.memory_usage().sum() / (1024**2)
    print("Memory usage of dataframe is {:.2f} MB!".format(start_mem))
    for col in df.columns:
        col_type = df[col].dtype
        if isinstance(col_type, np.dtype) and col_type.kind in candidates:
            c_min = df[col].min()
            c_max = df[col].max()
            limits_of = np.finfo if col_type.kind == "f" else np.iinfo
            for target in candidates[col_type.kind]:
                if np.dtype(target).itemsize >= col_type.itemsize:
                    # Only ever shrink a column, never widen it.
                    break
                limits = limits_of(target)
                # Inclusive bounds: the extreme values are representable.
                # NaN min/max (empty or all-NaN column) fails both
                # comparisons, so such columns are left as they are.
                if limits.min <= c_min and c_max <= limits.max:
                    df[col] = df[col].astype(target)
                    break
        elif category and (col_type == object or pd.api.types.is_string_dtype(col_type)):
            df[col] = df[col].astype("category")
    end_mem = df.memory_usage().sum() / (1024**2)
    print("Memory usage after optimization is {:.2f} MB!".format(end_mem))
    if start_mem > 0:
        print("Decreased by {:.1f}%".format(100 * (start_mem-end_mem) / start_mem))

In [49]:
def display_df(df, message = " "):
    """Print a quick overview of ``df``.

    Shows the caller-supplied ``message``, the row/column counts, the head
    of the frame (through the notebook ``display`` helper), the output of
    ``df.info()`` and, when any nulls exist, the null count of each column
    that has at least one.
    """
    n_rows, n_cols = df.shape
    print("Dataframe: {}".format(message))
    print(f"num_rows = {n_rows:,} \nnum_cols = {n_cols:,}")
    display(df.head())
    print("Info:")
    df.info()

    # Per-column null counts, computed once and reused for both the check
    # and the report.
    null_counts = df.isnull().sum()
    columns_with_nulls = null_counts[null_counts != 0]
    if len(columns_with_nulls) == 0:
        return
    print("Number of null data points:")
    print(columns_with_nulls)

In [50]:
def plot_variables(df, vars_to_plot, cts_vars, num_cols=2, hist_num_bins=20):
    """Plot one panel per variable: histogram or value-count bar chart.

    Args:
        df: DataFrame holding the variables.
        vars_to_plot: Column names to plot, one subplot each.
        cts_vars: Column names drawn as histograms; every other entry of
            ``vars_to_plot`` is drawn as a bar chart of its value counts.
        num_cols: Number of subplot columns in the grid.
        hist_num_bins: Number of bins used for histograms.

    Raises:
        ValueError: If ``vars_to_plot`` is empty.
    """
    vars_to_plot = list(vars_to_plot)
    if not vars_to_plot:
        raise ValueError("vars_to_plot must contain at least one column name.")

    num_rows = (len(vars_to_plot) + num_cols - 1) // num_cols
    # squeeze=False always yields a 2-D array of Axes, so flatten() also
    # works for a 1x1 grid (where subplots would otherwise return one Axes).
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5), squeeze=False)
    axes = axes.flatten()

    for ax, var in zip(axes, vars_to_plot):
        if var in cts_vars:
            ax.hist(df[var], bins=hist_num_bins)
            # Rotate the labels of this panel; plt.xticks would act on
            # whichever axes is current instead.
            ax.tick_params(axis='x', labelrotation=45)
            ax.set_title(f"{var} Histogram")
        else:
            df[var].value_counts().plot(kind="bar", ax=ax, title=f"{var} Counts")

    # Hide the unused cells of the grid.
    for unused_ax in axes[len(vars_to_plot):]:
        unused_ax.axis('off')

    plt.tight_layout()
    plt.show()

In [51]:
def scatter_plot_vars(df, vars_to_plot, num_cols=2):
    """Scatter each variable against the DataFrame index, one panel each.

    Args:
        df: DataFrame holding the variables.
        vars_to_plot: Column names to plot on the x axis.
        num_cols: Number of subplot columns in the grid.

    Raises:
        ValueError: If ``vars_to_plot`` is empty.
    """
    vars_to_plot = list(vars_to_plot)
    if not vars_to_plot:
        raise ValueError("vars_to_plot must contain at least one column name.")

    num_rows = (len(vars_to_plot) + num_cols - 1) // num_cols
    # squeeze=False keeps axes 2-D so flatten() also works for a 1x1 grid.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5), squeeze=False)
    axes = axes.flatten()

    # Materialize the index as a column once. Naming the axis first
    # guarantees the column is called 'index' even when the index has a name.
    plot_df = df.rename_axis("index").reset_index()

    for ax, var in zip(axes, vars_to_plot):
        sns.scatterplot(x=var, y='index', data=plot_df, ax=ax)
        ax.set_title(f'Distribution of {var}')
        ax.set_xlabel(var)
        ax.set_ylabel('Index')
        ax.tick_params(axis='x', labelrotation=45)

    # Hide the unused cells of the grid.
    for unused_ax in axes[len(vars_to_plot):]:
        unused_ax.axis('off')

    plt.tight_layout()
    plt.show()

In [52]:
def plot_regs(df, cts_vars, response, num_cols=3, dot_size=10, line_width=3):
    """Draw a regression scatter plot of ``response`` against each variable.

    Each panel shows a seaborn ``regplot`` whose legend entry is the
    least-squares line fitted with ``scipy.stats.linregress``.

    Args:
        df: DataFrame holding the variables and the response.
        cts_vars: Column names used as the x variable, one subplot each.
        response: Column name used as the y variable in every panel.
        num_cols: Number of subplot columns in the grid.
        dot_size: Marker size passed to the scatter layer.
        line_width: Line width of the regression line.

    Raises:
        ValueError: If ``cts_vars`` is empty.
    """
    cts_vars = list(cts_vars)
    if not cts_vars:
        raise ValueError("cts_vars must contain at least one column name.")

    num_rows = (len(cts_vars) + num_cols - 1) // num_cols
    # squeeze=False keeps axes 2-D so flatten() also works for a 1x1 grid.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5), squeeze=False)
    axes = axes.flatten()

    for ax, var in zip(axes, cts_vars):
        slope, intercept, r_value, p_value, std_error = linregress(df[var], df[response])
        # The explicit sign on the intercept avoids labels such as "x+-1.0".
        sns.regplot(x=df[var], y=df[response], ax=ax,
                   scatter_kws={"s": dot_size},
                   line_kws={"linewidth": line_width},
                   label="y={0:.1f}x{1:+.1f}".format(slope, intercept)).legend(loc="best")

    # Hide the unused cells of the grid.
    for unused_ax in axes[len(cts_vars):]:
        unused_ax.axis('off')

    fig.suptitle("Regression Scatter Plots for {}".format(response), fontsize=16)
    # Reserve the top 5% of the figure for the suptitle; a plain
    # tight_layout() would reclaim that space and overlap the title.
    fig.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()

In [53]:
def plot_clfs(df, feature_vars, response, num_cols=3, dot_size=10):
    """Fit and plot a single-feature logistic regression for each feature.

    For every feature a pipeline (imputation + one-hot encoding or scaling,
    followed by ``LogisticRegression``) is fitted on a 70/30 train/test
    split. Categorical features are shown as a count plot split by the
    response; numeric features are shown as a scatter plot with the
    predicted probability curve, labelled with test accuracy and AUC.

    A feature is treated as categorical when it is non-numeric or has
    fewer than 10 distinct values.

    Args:
        df: DataFrame holding the features and the response.
        feature_vars: Column names to model, one subplot each.
        response: Column name of the classification target.
        num_cols: Number of subplot columns in the grid.
        dot_size: Marker size for the scatter plots.

    Raises:
        ValueError: If ``feature_vars`` is empty.
    """
    feature_vars = list(feature_vars)
    if not feature_vars:
        raise ValueError("feature_vars must contain at least one column name.")

    num_rows = (len(feature_vars) + num_cols - 1) // num_cols
    # squeeze=False keeps axes 2-D so flatten() also works for a 1x1 grid.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 5, num_rows * 5), squeeze=False)
    axes = axes.flatten()

    y = df[response]

    for i, var in tqdm(enumerate(feature_vars), total=len(feature_vars), desc="Processing features"):
        ax = axes[i]
        X = df[[var]]

        # Decide once per feature; the same flag drives preprocessing and plotting.
        is_categorical = (not pd.api.types.is_numeric_dtype(df[var])) or len(df[var].unique()) < 10

        if is_categorical:
            feature_steps = Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='most_frequent')),
                # Categories that appear only in the test split are encoded
                # as all zeros instead of aborting the whole figure.
                ('onehot', OneHotEncoder(drop='first', handle_unknown='ignore'))
            ])
            preprocessor = ColumnTransformer(transformers=[('cat', feature_steps, [0])],
                                             remainder='passthrough')
        else:
            feature_steps = Pipeline(steps=[
                ('imputer', SimpleImputer(strategy='mean')),
                ('scaler', StandardScaler())
            ])
            preprocessor = ColumnTransformer(transformers=[('num', feature_steps, [0])],
                                             remainder='passthrough')

        pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                                   ('classifier', LogisticRegression())])

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)

        # The number of probability columns is set by the classes seen in
        # training, so choose binary vs. multiclass AUC from the fitted model.
        classes = pipeline.named_steps['classifier'].classes_
        test_proba = pipeline.predict_proba(X_test)
        if len(classes) == 2:
            auc = roc_auc_score(y_test, test_proba[:, 1])
        else:
            auc = roc_auc_score(y_test, test_proba, multi_class='ovr', average='macro', labels=classes)

        if is_categorical:
            sns.countplot(x=X_train[var], hue=y_train, ax=ax, palette='coolwarm')
        else:
            sns.scatterplot(x=X_train[var], y=y_train, hue=y_train, ax=ax, s=dot_size, palette='coolwarm')

            # Evaluate the curve on a DataFrame with the training column
            # name so the pipeline receives the same input layout it was
            # fitted on.
            grid = pd.DataFrame({var: np.linspace(X_train[var].min(), X_train[var].max(), 100)})
            y_vals = pipeline.predict_proba(grid)[:, 1]
            ax.plot(grid[var].to_numpy(), y_vals, color='black', linewidth=2,
                    label=f'Acc: {accuracy:.2f}, AUC: {auc:.2f}')

        ax.set_title(f"Logistic Regression for {var}")
        ax.legend(loc='best')

    # Hide the unused cells of the grid.
    for unused_ax in axes[len(feature_vars):]:
        unused_ax.axis('off')

    fig.suptitle(f"Logistic Regression with Each Feature Separately ({response})", fontsize=16)
    # Reserve the top 5% of the figure for the suptitle; a plain
    # tight_layout() would reclaim that space and overlap the title.
    fig.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()

In [54]:
def display_side_by_side(dfs: list, captions: list, table_spacing=5):
    """Render several DataFrames next to each other in a notebook cell.

    Args:
        dfs: DataFrames to render, left to right.
        captions: One caption per DataFrame, in the same order.
        table_spacing: Number of non-breaking spaces inserted after each table.

    Raises:
        ValueError: If ``dfs`` and ``captions`` differ in length.
    """
    if len(dfs) != len(captions):
        raise ValueError("The number of DataFrames and captions must be equal!")

    # Imported here so the function does not depend on the notebook injecting
    # a global ``display``.
    from IPython.display import HTML, display

    spacer = table_spacing * "\xa0"
    output = "".join(
        df.style.set_table_attributes("style='display:inline'").set_caption(caption)._repr_html_() + spacer
        for caption, df in zip(captions, dfs)
    )
    display(HTML(output))

In [55]:
def one_hot_encode(df, ohe, var_list, drop_original=True, sparse_matrix=False, handle_unknown='ignore'):
    """Append one-hot encoded columns produced by a fitted encoder to ``df``.

    Args:
        df: Source DataFrame; it is not modified.
        ohe: Fitted encoder exposing ``transform`` and
            ``get_feature_names_out`` (e.g. scikit-learn's OneHotEncoder).
        var_list: Columns of ``df`` passed to ``ohe.transform``.
        drop_original: When True, the columns in ``var_list`` are removed
            from the result.
        sparse_matrix: When True, the encoded columns are stored as pandas
            sparse columns; otherwise as dense columns.
        handle_unknown: Unused; kept for backward compatibility. Unknown
            category handling is whatever ``ohe`` was configured with.

    Returns:
        A new DataFrame with a fresh 0..n-1 index holding the columns of
        ``df`` followed by the encoded columns.
    """
    encoded = ohe.transform(df[var_list])
    feature_names = ohe.get_feature_names_out()

    if sparse_matrix:
        # pd.concat cannot take a raw scipy matrix, so wrap it in a
        # sparse-backed DataFrame first.
        encoded_df = pd.DataFrame.sparse.from_spmatrix(sparse.csr_matrix(encoded), columns=feature_names)
    else:
        # The encoder may return a scipy sparse matrix depending on how it
        # was configured; densify so the DataFrame constructor gets a 2-D array.
        dense = encoded.toarray() if sparse.issparse(encoded) else np.asarray(encoded)
        encoded_df = pd.DataFrame(data=dense, columns=feature_names)

    # Both frames use a 0..n-1 index so the column-wise concat aligns by position.
    result = pd.concat([df.reset_index(drop=True), encoded_df], axis=1)
    if drop_original:
        result = result.drop(columns=var_list)
    return result

In [56]:
def plot_conf_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """
    Print a status line and plot the confusion matrix as an annotated image.

    Args:
        cm: Square confusion matrix with rows plotted as true labels and
            columns as predicted labels.
        classes: Tick labels, one per row/column of ``cm``.
        normalize: When True, each row is divided by its sum before
            plotting. Rows whose sum is zero are shown as zeros.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the image.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Divide only where the row has samples; an empty row would yield
        # NaN cells and a NaN colour threshold.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout after the axis labels exist so they are fitted too.
    plt.tight_layout()
    plt.show()