In [None]:
"""
Code for CS51: Correlation and Regression
"""

import warnings
import itertools

import pandas as pd
import numpy as np
import seaborn as sns
import scipy.stats as stats
import matplotlib.pyplot as plt
import statsmodels.api as statsmodels

from pandas import DataFrame, Series
from typing import Optional
from datetime import datetime

# This is to suppress warnings from the statsmodels package. It can be
# removed if you want to see the warnings.
warnings.filterwarnings("ignore")


In [None]:
# This is the URL for the dataset.
URL = (
    "https://course-resources.minerva.edu/uploaded_files/mu/00294342-2873/diabetes.csv"
)

# Studied predictor variables
X = "SkinThickness"

# Arbitrarily chosen predictor variables
Xs = ["SkinThickness", "Glucose", "BloodPressure"]

# Response variable
Y = "BMI"


In [None]:
def preprocess_df(df: DataFrame) -> DataFrame:
    """
    Preprocesses the DataFrame by removing rows with 0 values for the columns
    of interest.
    """

    # Remove rows with 0 values for the columns of interest.
    df = df[(df[X] != 0) & (df[Y] != 0)]
    for x in Xs:
        df = df[df[x] != 0]

    return df


In [None]:
def get_figure_filename(name: str) -> str:
    """Returns a filename for a figure based on the current time and the given name."""

    now = datetime.now()
    return f"figures/{now.strftime('%Y%m%d%H%M%S')}-{name}.png"


In [None]:
df = preprocess_df(pd.read_csv(URL))


In [None]:
def get_summary(df: DataFrame, X: str, Y: str) -> DataFrame:
    """Returns a summary of the DataFrame for the given X and Y variables."""

    summary = df[[X, Y]].describe().T[["mean", "std", "min", "max"]]
    summary["mode"] = df[[X, Y]].mode().T
    summary["median"] = df[[X, Y]].median().T
    summary["range"] = summary["max"] - summary["min"]
    summary = summary[["mean", "std", "mode", "median", "min", "max", "range"]]
    summary = summary.T
    return summary


In [None]:
summary = get_summary(df, X, Y)
print(summary)


In [None]:
def plot_distribution(x: Series, name: Optional[str] = None):
    """Plots (and saves) a distribution plot for the given Series."""

    sns.displot(x, kde=True)

    # Plot the mean, median, and mode as vertical lines.
    plt.axvline(x.mean(), color="red", label="mean", linestyle="--", linewidth=2)
    plt.axvline(x.median(), color="green", label="median", linestyle="--", linewidth=2)
    plt.axvline(x.mode()[0], color="blue", label="mode", linestyle="--", linewidth=2)

    plt.title(f"{x.name} distribution")
    plt.legend()
    plt.savefig(get_figure_filename(name or "distribution"), dpi=600)
    plt.show()


In [None]:
plot_distribution(df[X], name="skin-thickness-histogram")
plot_distribution(df[Y], name="bmi-histogram")


In [None]:
# This was adapted from the previous assignment.

BINS = 20
Y_THRESHOLD = 30

control = df[df[Y] < Y_THRESHOLD]
treatment = df[df[Y] >= Y_THRESHOLD]

control_range = control[X].max() - control[X].min()
treatment_range = treatment[X].max() - treatment[X].min()
max_range = max(control_range, treatment_range)
control_bins = round(control_range / max_range * BINS)
treatment_bins = round(treatment_range / max_range * BINS)

sns.histplot(
    control[X],
    label=f"{Y} < {Y_THRESHOLD}",
    bins=control_bins,
    kde=True,
    # ax=ax,
)

plt.axvline(
    control[X].mean(),
    color="red",
    label=f"mean({Y} < {Y_THRESHOLD})",
    linestyle="--",
    linewidth=2,
)

sns.histplot(
    treatment[X],
    label=f"{Y} >= {Y_THRESHOLD}",
    bins=treatment_bins,
    kde=True,
    # ax=ax,
)

plt.axvline(
    treatment[X].mean(),
    color="green",
    label=f"mean({Y} >= {Y_THRESHOLD})",
    linestyle="--",
    linewidth=2,
)

plt.title(f"{control[X].name} distribution")
plt.legend()

# plt.savefig("figure3.png", dpi=600)
plt.show()


In [None]:
def plot_correlation_matrix(df: DataFrame, name: Optional[str] = None):
    """Plots (and saves) a correlation matrix for the given DataFrame."""

    sns.heatmap(
        df.corr().abs(),
        annot=True,
        cmap="YlGnBu",
        fmt=".2f", # format the numbers to 2 decimal places
        cbar=False, # remove the color bar
        square=True, # make the cells square (looks pretty :))
    )

    plt.savefig(get_figure_filename(name or "correlation-matrix"), dpi=600)
    plt.show()


In [None]:
plot_correlation_matrix(df, name="correlation-matrix")


In [None]:
def plot_scatter(x: Series, y: Series, name: Optional[str] = None):
    """Plots (and saves) a scatter plot for the given Series."""

    fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(12, 4))

    ax1.grid(True)
    ax1.minorticks_on()
    ax1.grid(which="minor", linestyle=":", linewidth="0.25", color="black")
    ax1.set_title(f"{x.name} vs {y.name}")
    sns.scatterplot(x=x, y=y, ax=ax1)

    ax2.grid(True)
    ax2.minorticks_on()
    ax2.grid(which="minor", linestyle=":", linewidth="0.25", color="black")
    ax2.set_title(f"Kernel density estimation (levels=4)")
    sns.scatterplot(x=x, y=y, ax=ax2)

    # `levels` for kernel density estimation was chosen arbitrarily.
    sns.kdeplot(x=x, y=y, ax=ax2, levels=4, color=".2")

    plt.savefig(get_figure_filename(name or "scatter"), dpi=600)
    plt.show()


In [None]:
plot_scatter(df[X], df[Y], name="scatter")


In [None]:
def plot_scatter_matrix(df: DataFrame, name: Optional[str] = None):
    """Plots (and saves) a scatter matrix for the given DataFrame."""

    plot = sns.pairplot(df)
    plot.map_lower(sns.kdeplot, levels=4, color=".2")

    plt.savefig(get_figure_filename(name or "scatter-matrix"), dpi=600)
    plt.show()


In [None]:
plot_scatter_matrix(df[[*Xs, Y]], name="pair-scatter")
# plot_scatter_matrix(df, name="pair-scatter")


In [None]:
def simple_regression(
    x: Series,
    y: Series,
    plot: bool = False,
    plot_name: Optional[str] = None,
    summary: bool = False,
) -> tuple[float, float, float]:
    """Performs a simple regression on the given Series and returns the R^2, b_0, and b_1."""

    _x = statsmodels.add_constant(x)
    model = statsmodels.OLS(y, _x)
    results = model.fit()

    # Print the summary if requested
    if summary:
        print(results.summary())

    residuals = results.resid
    r_squared = results.rsquared
    b_0, b_1 = results.params

    # Plot the results if requested
    if plot:
        fig, (ax1, ax2, ax3) = plt.subplots(ncols=3, figsize=(18, 4))

        # ax1: Make a QQ-plot for the residuals
        statsmodels.qqplot(residuals, fit=True, line="45", ax=ax1)

        # ax2: Make a residual plot
        sns.residplot(x=x, y=y, ax=ax2)
        ax2.set(
            ylabel="Residuals",
            xlabel="Fitted values",
        )

        # ax3: Make a histogram of the residuals
        sns.histplot(residuals, kde=True, ax=ax3)
        ax3.set(xlabel="Residuals")

        plt.savefig(get_figure_filename(plot_name or "simple-regression"), dpi=600)
        plt.show()

        # Make a regression plot
        sns.regplot(
            x=x,
            y=y,
            marker="+",
        )
        plt.title(f"Simple linear regression: {x.name} vs {y.name}")

        plt.savefig(
            get_figure_filename(plot_name or "simple-regression-regplot"), dpi=600
        )
        plt.show()

    return r_squared, b_0, b_1


In [None]:
r, _ = stats.pearsonr(df[X], df[Y])
r_squared, b_0, b_1 = simple_regression(
    df[X], df[Y], plot=True, summary=True, plot_name="simple-regression"
)

print(f"{r=} {r_squared=}, b_0={b_0}, b_1={b_1}")


In [None]:
def multiple_regression(
    x,
    y: Series,
    plot: bool = False,
    plot_name: Optional[str] = None,
    summary: bool = False,
) -> tuple[float, float, list[float]]:
    """Performs a multiple regression on the given Series and returns the adjusted R^2, b_0, and b_1."""

    x = statsmodels.add_constant(x)
    model = statsmodels.OLS(y, x)
    results = model.fit()

    # Print the summary if requested
    if summary:
        print(results.summary())

    residuals = results.resid
    r_squared_adj = results.rsquared_adj
    b_0, *b = results.params

    y_hat = results.predict()

    # Plot the results if requested
    if plot:
        fig, (ax1, ax2, ax3) = plt.subplots(ncols=3, figsize=(18, 4))

        # ax1: Make a QQ-plot for the residuals
        statsmodels.qqplot(residuals, fit=True, line="45", ax=ax1)

        # ax2: Make a residual plot
        sns.residplot(x=y_hat, y=residuals, ax=ax2)
        ax2.set(
            ylabel="Residuals",
            xlabel="Fitted values",
        )

        # ax3: Make a histogram of the residuals
        sns.histplot(residuals, kde=True, ax=ax3)
        ax3.set(xlabel="Residuals")

        plt.savefig(get_figure_filename(plot_name or "multiple-regression"), dpi=600)
        plt.show()

        # Make a regression plot using the predicted values and the actual values, given
        # the high-dimensional nature of the data.
        sns.regplot(
            x=y_hat,
            y=y,
            marker="+",
        )
        plt.xlabel(f"predicted {y.name}")
        plt.show()

    return r_squared_adj, b_0, b


In [None]:
def forward_selection(
    df: DataFrame,
    xs: list[str],
    y: str,
) -> tuple[list[str], float, float, list[float]]:
    """Performs a forward selection on the given DataFrame and returns the best R^2, b_0, and b_1."""

    best_r_squared_adj = 0
    best_xs = None
    best_b_0 = None
    best_b = None

    for i in range(1, len(xs) + 1):
        # Get all combinations of length i
        for _xs in itertools.combinations(xs, i):
            r_squared_adj, b_0, b = multiple_regression(df[list(_xs)], df[y])

            # If the R^2 is better than the best R^2, update the best R^2 and the best xs
            if r_squared_adj > best_r_squared_adj:
                best_r_squared_adj = r_squared_adj
                best_xs = _xs
                best_b_0 = b_0
                best_b = b

    return best_xs, best_r_squared_adj, best_b_0, best_b  # type: ignore


In [None]:
columns: list[str] = list(df.columns)  # type: ignore
columns.remove(Y)

xs, r_squared, b_0, b = forward_selection(df, columns, Y)
xs = list(xs)

print(f"{xs=}", f"{r_squared=:.2f}")

# print(f"\\hat{{y}} = {b_0:.2f}", end="")
# for i, x in enumerate(xs):
#     print(f" + {b[i]:.2f} \\times {x}", end="")

r_squared, b_0, b = multiple_regression(
    df[xs], df[Y], plot=True, summary=True, plot_name="multiple-regression"
)


In [None]:
def standard_error(x: Series, y: Series, r_squared: float, ddof=1) -> float:
    """Calculates the standard error of the regression."""

    n = len(x)
    s_x = x.std(ddof=ddof)
    s_y = y.std(ddof=ddof)
    return (s_y / s_x) * np.sqrt((1 - r_squared) / (n - 2))


In [None]:
def p_value(x: Series, y: Series, beta_1: float, tails: int = 2) -> float:
    """Calculates the p-value of the regression."""

    # Use the previously defined functions to calculate the p-value
    r_squared, b_0, b_1 = simple_regression(x, y)

    # Calculate the standard error of the regression
    SE_b_1 = standard_error(x, y, r_squared)

    # Calculate the t-score
    t = (b_1 - beta_1) / SE_b_1

    # Calculate the p-value using the survival function of the t-distribution.
    # Note that the degrees of freedom is n - 2, where n is the number of observations,
    # because we have two parameters (b_0 and b_1).
    p: float = stats.t.sf(np.abs(t), len(x) - 2) * tails  # type: ignore

    return p


In [None]:
SE_b_1 = standard_error(df[X], df[Y], r_squared)
p = p_value(
    df[X],
    df[Y],
    0,
    tails=1,
)
print(f"{SE_b_1=} {p=}")


In [None]:
def confidence_interval(
    x: Series, y: Series, alpha: float = 0.05
) -> tuple[float, float]:
    """Calculates the confidence interval of the regression."""

    # Use the previously defined functions to calculate the confidence interval
    r_squared, b_0, b_1 = simple_regression(x, y)

    # Calculate the standard error of the regression
    SE_b_1 = standard_error(x, y, r_squared)

    # Calculate the t-score. Note that the degrees of freedom is n - 2, where n is the number of observations,
    # because we have two parameters (b_0 and b_1).
    t: float = stats.t.ppf(1 - alpha / 2, len(x) - 2)  # type: ignore

    lower = b_1 - t * SE_b_1
    upper = b_1 + t * SE_b_1

    return lower, upper


In [None]:
ALPHA = 0.01
lower, upper = confidence_interval(df[X], df[Y], alpha=ALPHA)
print(f"{(1-ALPHA)*100:.2f}% [{lower=}, {upper=}]")
