In [1]:
!pip install seaborn matplotlib

In [4]:
from operator import methodcaller

import polars as pl
import numpy as np
import renkon.api as rk

import seaborn as sns
import matplotlib.pyplot as plt


def plot_linreg(m, c, *, ax):
    """ Plot y = mx + c """
    ax = ax or plt.gca()
    x = np.array(ax.get_xlim())
    y = m * x + c
    ax.plot(x, y, color="green", linestyle=":")

%load_ext autoreload
%autoreload 2

In [5]:
cereals_df = pl.read_csv("../etc/samples/cereals.csv", separator=";", skip_rows_after_header=1).with_columns(
    carbs=pl.col("carbo") + pl.col("sugars"),
)
cereals_df

In [4]:
cereals_df.write_csv("../etc/samples/cereals-cleaned.csv", separator=",")

In [6]:
# Data Inspection

fig, axs = plt.subplots(1, 4, figsize=(16, 3))
sns.histplot(ax=axs[0], data=cereals_df, x="calories", kde=True);
sns.histplot(ax=axs[1], data=cereals_df, x="carbs", kde=True);
sns.histplot(ax=axs[2], data=cereals_df, x="protein", kde=True);
sns.histplot(ax=axs[3], data=cereals_df, x="fat", kde=True);
fig.tight_layout()

fig, axs = plt.subplots(1, 3, figsize=(12, 4))
sns.scatterplot(ax=axs[0], data=cereals_df, x="carbs", y="calories")
sns.scatterplot(ax=axs[1], data=cereals_df, x="protein", y="calories")
sns.scatterplot(ax=axs[2], data=cereals_df, x="fat", y="calories")
fig.tight_layout()

In [7]:
CALORIES_PER = {
    "carbs": 4,
    "protein": 4,
    "fat": 9,
}

cals_df = cereals_df.select(
    pl.col("name"),
    pl.col("calories").alias("cals_from_label"),
    pl.concat_list("carbs", "protein", "fat").apply(
        methodcaller("dot", CALORIES_PER.values())
    ).alias("cals_from_macros")
)
cals_df

In [8]:
model, fit = rk.stats.linear_fit(data=cereals_df, y="calories", x=["carbs", "protein", "fat"])
fit.params

In [9]:
fig, ax = plt.subplots(figsize=(6, 3))
ax.set_xlim(0, 200)
ax.set_ylim(0, 200)
ax.plot([0, 200], [0, 200], color="green", linestyle=":")
sns.scatterplot(ax=ax, data=cals_df, x="cals_from_label", y="cals_from_macros")
fig.tight_layout()

# Definitions

In [11]:
import functools as ft
# Draw a parametrized sigmoid:

dist = lambda a, b: np.abs(a - b)

delta = 0.001

# y-values of sigmoids asymptotes:
y_max = 1
y_min = 0

# "Amplitude" of the sigmoid:
amp = dist(y_max, y_min)

# x-values where the sigmoid is at 0, reaches y_max - delta, and y_min + delta:
x_lo = 0
x_0 = 10
x_hi = 2 * (x_0 - x_lo)

# "Wavelength" of the sigmoid:
wlen = dist(x_hi, x_lo)

k = (np.log(amp/delta - 1) - np.log(amp/(amp - delta) -1)) / wlen

x = np.linspace(-100, 100, 1000)
y = 1 / (1 + np.exp(k*(x - x_0)))
y = dist(y_min, y_max) * y + y_min

# ---

fig, ax = plt.subplots(figsize=(8, 6))

ax.set_xlim(np.min(x), np.max(x))
ax.set_ylim(y_min - delta, y_max + delta)

ax.plot(x, y, label="|y_max - y_min| / (1 + exp(x) + y_min")

# Draw lines for y_max and y_min:
ax.hlines(y_max, np.min(x), np.max(x), color="red", linestyle=":", label="y_max")
ax.hlines(y_min, np.min(x), np.max(x), color="red", linestyle=":", label="y_min")

# Draw vertical lines for x_lo and x_hi:
ax.vlines(x_lo, np.min(y), np.max(y), color="green", linestyle=":", label=f"x_lo s.t. f(x_lo) = y_max - {delta}")
# ax.vlines(x_hi, np.min(y), np.max(y), color="green", linestyle=":", label=f"x_hi s.t. f(x_hi) = y_min + {delta}")

# Draw vertical line for x_0:
ax.vlines(x_0, np.min(y), np.max(y), color="orange", linestyle=":", label=f"inlier cutoff")

# Zero lines for x and y:
ax.hlines(0, np.min(x), np.max(x), color="black", alpha=0.1)
ax.vlines(0, np.min(y), np.max(y), color="black", alpha=0.1)

# Draw legend with solid background.
ax.legend(frameon=True, facecolor="white", framealpha=1.0)
ax.set_xlim(x_lo - 5, max(x) / 2)
ax.xaxis.set_major_locator(plt.MultipleLocator(10))

ax.set_xlabel("Residual")
ax.set_ylabel("Conformity Score")
ax.set_title("Conformity Score of Residuals (parametrized by δ, x_lo, x_0, y_min, and y_max)")

fig.tight_layout()

In [118]:
np.log((y_max - (y_max - delta)) - 1)

In [107]:
dist(-1, 1)

In [None]:
def conformity(
        errors: np.ndarray,
        
)

In [7]:
def linear_conformity(
        res: np.ndarray,
        thresh: float,
        thresh_val: float = 0.8,
        k: float = 1.0
) -> np.ndarray:
    """
    @param res: Residuals
    @param thresh: Threshold
    @param thresh_val: Conformity at the threshold value.
    @param k: Slope of the sigmoid (tangent to the midpoint).
    """

    # Calculate the midpoint of the sigmoid based on the threshold, f(threshold) and k.
    x0 = thresh + (1 / k) * np.log((thresh_val / (1 - thresh_val)))

    # Calculate the sigmoid.
    return 1 / (1 + np.exp(k * (np.abs(res) - x0)))


def linear_aberrance(
        res: np.ndarray,
        thresh: float,
        thresh_val: float = 0.8,
        k: float = 1.0
) -> np.ndarray:
    """
    @param res: Residuals
    @param thresh: Threshold
    @param thresh_val: Conformity at the threshold value.
    @param k: Slope of the sigmoid (tangent to the midpoint).
    """
    return 1 - linear_conformity(res, thresh, thresh_val, k)

In [80]:
thresh = 5
k = 0.5
thresh_conformity = 0.1

x = np.linspace(0, 20, 100)
y = linear_conformity(x, thresh, thresh_conformity, k)

fig, ax = plt.subplots(figsize=(6, 3))
ax.plot(x, y)
ax.vlines(thresh, 0, 1, color="red", linestyle=":")
ax.hlines(thresh_conformity, 0, 20, color="green", linestyle=":")
ax.set_xlabel("Residual")
ax.set_ylabel("Conformity")

# Model Testing

In [9]:
# Calculate IDR of calories
cal_idr = cereals_df.select(pl.col("calories")).quantile(0.90) - cereals_df.select(pl.col("calories")).quantile(0.10)
cal_idr

In [10]:
cereals_df.select(pl.col("calories")).quantile(0.25)

In [11]:
cal_iqr = pl.col("calories").quantile(0.75) - pl.col("calories").quantile(0.25)
cal_idr = pl.col("calories").quantile(0.9) - pl.col("calories").quantile(0.1)
cal_mad = (pl.col("calories") - pl.col("calories").median()).abs().median()

In [59]:
results_df = (cereals_df.select(
    pl.col("carbs", "protein", "fat"),
    expected=pl.col("calories"),
    predicted=fit.predict().alias("calories_pred"),
    err=fit.errors().abs().alias("err"),
    # MAD of calories
    threshold=cal_idr
).with_columns(
    is_outlier=pl.col("err") > pl.col("threshold")
)
.with_columns(
    macro_sum=pl.concat_list("carbs", "protein", "fat").apply(
        methodcaller("dot", CALORIES_PER.values())
    )
))
results_df;

In [60]:
fig, ax = plt.subplots(figsize=(5, 5))
ax.set_xlim(0, 200)
ax.set_ylim(0, 200)
# todo: plot the trend line properly! 
ax.plot([0, 200], [0, 200], color="green", linestyle=":")
sns.scatterplot(ax=ax, data=results_df, x="expected", y="predicted", hue="is_outlier")
fig.tight_layout()

In [14]:
results_df.with_row_count().filter(pl.col("is_outlier")).sort("err", descending=True)

# RANSAC (Reference)

In [15]:
from sklearn.linear_model import RANSACRegressor

In [30]:
ransac = RANSACRegressor(random_state=42)
ransac.fit(cereals_df.select(pl.col("carbs", "protein", "fat")), cereals_df.select(pl.col("calories")))
ransac.estimator_.coef_

In [53]:
fig, axs = plt.subplots(2, figsize=(5, 5))
ax.set_xlim(0, 200)
ax.set_ylim(0, 200)
ax.plot([0, 200], [0, 200], color="green", linestyle=":")
sns.scatterplot(ax=ax, data=results_df.with_columns(
    sk_predicted=pl.lit(ransac.predict(cereals_df.select(pl.col("carbs", "protein", "fat"))).reshape(-1))
), x="expected", y="sk_predicted", hue="is_outlier")
fig.tight_layout()