In [None]:
%load_ext autoreload
%autoreload 2

%cd ../

In [None]:
from collections.abc import Collection
from typing import Any, Self

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import scipy.stats

from portfolio import synthetic

In [None]:
class GMM:
    def __init__(self, means, covs, ps: Collection[float], x_slice=None, y_slice=None):
        self.K = len(ps)
        self.d = means[0].shape[0]
        self.x_slice = x_slice
        self.y_slice = y_slice

        assert len(means) == self.K
        assert len(covs) == self.K
        for k in range(self.K):
            assert means[k].shape[0] == self.d
            assert covs[k].shape == (self.d, self.d)

        self.ps = np.asarray(ps)
        self.dists = [
            scipy.stats.multivariate_normal(mean=mean, cov=cov)
            for mean, cov in zip(means, covs)
        ]
        self.rng = np.random.default_rng()

        if x_slice is not None and y_slice is not None:
            self.dists_x = [
                scipy.stats.multivariate_normal(
                    mean=mean[x_slice], cov=cov[x_slice, x_slice])
                for mean, cov in zip(means, covs)
            ]
            self.dists_y = [
                scipy.stats.multivariate_normal(
                    mean=mean[y_slice], cov=cov[y_slice, y_slice])
                for mean, cov in zip(means, covs)
            ]

    def sample(self, size: int = 1) -> np.ndarray:
        if size == 1:
            k = self.rng.choice(len(self.ps), p=self.ps)
            return self.dists[k].rvs()
        else:
            comps = self.rng.choice(len(self.ps), p=self.ps, size=size)
            return np.stack([self.dists[k].rvs() for k in comps])

    def pdf(self, z: np.ndarray) -> np.ndarray:
        """
        Args:
            z: (d,) or (n, d) array of points to evaluate the pdf at
        """
        return sum(p * d.pdf(z) for p, d in zip(self.ps, self.dists))

    def cond_on_x(self, x: np.ndarray) -> Self:
        """Return the conditional distribution p(y|x).

        The conditional distribution itself is a GMM:
        p(y|x) = sum_k p(y,k|x) = sum_k [p(y|x,k) p(k|x)]
        - p(k|x) is the weights for the k-th component
            p(k|x) ~ p(x|k) p(k)
        - Each component p(y|x,k) is a Gaussian
        """
        # calculate p(k|x)
        ps_cond = np.array([d_x.pdf(x) for d_x in self.dists_x]) * self.ps
        ps_cond /= ps_cond.sum()

        # calculate p(y|x,k)
        means_cond = [
            self.dists_y[k].mean
            + self.dists[k].cov[self.y_slice, self.x_slice]
            @ np.linalg.solve(self.dists_x[k].cov, x - self.dists_x[k].mean)
            for k in range(self.K)
        ]
        covs_cond = [
            self.dists_y[k].cov
            - self.dists[k].cov[self.y_slice, self.x_slice]
            @ np.linalg.solve(self.dists_x[k].cov, self.dists[k].cov[self.x_slice, self.y_slice])
            for k in range(self.K)
        ]
        return GMM(means_cond, covs_cond, ps_cond)

In [None]:
ALPHA = 0.9
PHI = 0.7

# ALPHA = 0.1
# PHI = 0.1

Y1, Y2 = np.meshgrid(np.linspace(-6, 11, 100), np.linspace(-5, 6, 100))

def plot_gmm_cond(
    ax: plt.Axes, x: np.ndarray,
    alpha: float = ALPHA, phi: float = PHI
) -> None:
    gmm = GMM(
        *synthetic.create_gmm(alpha=alpha, phi=phi),
        x_slice=slice(2, None), y_slice=slice(0, 2)
    )
    gmm_cond = gmm.cond_on_x(x)

    # Option 1: contour plot the PDF
    pdf = gmm_cond.pdf(np.stack([Y1, Y2], axis=-1))
    ax.contourf(Y1, Y2, pdf, levels=100, cmap=plt.cm.magma_r)

    # Option 2: draw samples, then use seaborn KDE plot
    # samples = gmm_cond.sample(size=5000)
    # sns.kdeplot(x=samples[:, 0], y=samples[:, 1], fill=True, ax=ax)

Search over alphas & phis to match Chenreddy paper

In [None]:
alphas = [0.01, 0.05, 0.1, 0.5, 0.9, 1, 5, 10]
phis = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7,0.8,0.9]

fig, axs = plt.subplots(len(alphas), len(phis), figsize=(20, 20), tight_layout=True)

for r, alpha in enumerate(alphas):
    for c, phi in enumerate(phis):
        ax = axs[r, c]

        # x = np.array([2.5, -0.2])
        # x = np.array([-2.6, 0.5])
        x = np.array([2.7, 1.9])
        plot_gmm_cond(ax, x, alpha=alpha, phi=phi)

        if r == 0 or c == 0:
            ax.set_title(f"alpha={alpha}, phi={phi}")

Plot a random selection of conditional distributions from the test set

In [None]:
loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=0, alpha=ALPHA, phi=PHI)
ds = loaders['test'].dataset
np.random.seed(2)
idxs = np.sort(np.random.choice(len(ds), size=100, replace=False))

fig, axs = plt.subplots(10, 10, figsize=(20, 20), tight_layout=True)
for i, ax in zip(idxs, axs.flat):
    x = ds[i][0].numpy()
    y = ds[i][1].numpy() * y_std + y_mean
    plot_gmm_cond(ax, x, alpha=ALPHA, phi=PHI)
    ax.scatter(y[0], y[1], color='green', s=20)
    ax.set(title=f'index {i}')

## Plot sublevel set of PICNN

In [None]:
import torch

In [None]:
def plot_picnn_contour(
    ckpt_path: str, hidden_dim: int, alpha: float, seed: int, x: np.ndarray, label: str, ax: plt.Axes,
    color: Any, y: np.ndarray | None = None
) -> None:
    from models.picnn import PICNN, conformal_q
    from portfolio.problems import PortfolioProblemPICNN

    model = PICNN(input_dim=2, y_dim=2, hidden_dim=hidden_dim, n_layers=2, y_in_output_layer=False)
    model.load_state_dict(torch.load(ckpt_path, weights_only=True))

    loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=seed, alpha=ALPHA, phi=PHI)
    prob = PortfolioProblemPICNN(N=2, L=2, d=hidden_dim, y_mean=y_mean, y_std=y_std)

    q = conformal_q(model, loaders['calib'], alpha=alpha).item()

    thetas = np.linspace(0, 2*np.pi, num=100, endpoint=False)
    cs = np.stack([np.sin(thetas), np.cos(thetas)], axis=1)
    ys = [
        prob.solve_primal_max(x, model, q=q, c=c)
        for c in cs
    ]
    ys.append(ys[0])  # close the polygon
    ys = np.stack(ys)

    ys = ys * y_std + y_mean
    ax.plot(ys[:, 0], ys[:, 1], label=label, ls='--', lw=2, color=color)

    if y is not None:
        prob.solve(x, model, q=q)
        task_loss = prob.task_loss_np(y, is_standardized=True)

        z = prob.primal_vars['z'].value
        # ax.plot([0, 3*z[0]], [0, 3*z[1]], label=r'$z^\star$', color=color)
        ax.arrow(0, 0, 3*z[0], 3*z[1], width=0.15, label=r'$z^\star$', color=color)
        # ax.quiver(0, 0, z[0], z[1], scale=5, width=0.015, label=r'$z^\star$', color=color)
    else:
        task_loss = None

    return task_loss

Plot sublevel set at a specific `x`

In [None]:
fig, ax = plt.subplots(1, 1, tight_layout=True)

x = np.array([2.7, 1.9])
plot_gmm_cond(ax, x)

plot_picnn_contour(
    ckpt_path='out/portfolio_syn_picnn_d128/e2e_finetune_a0.10_L2_d128_lr0.005_reg0.001_seed0.pt',
    hidden_dim=128, alpha=0.1, seed=0, x=x, label='e2e', ax=ax,
    color=plt.cm.tab10.colors[0]
)

plot_picnn_contour(
    ckpt_path='out/portfolio_syn_picnn_d128/eto_L2_d128_lr0.01_reg0.001_seed0.pt',
    hidden_dim=128, alpha=0.1, seed=0, x=x, label='eto', ax=ax,
    color=plt.cm.tab10.colors[1]
)

ax.legend()
plt.show()

Plot PICNN sublevel sets for different alphas at a test set `x`

In [None]:
fig, axs = plt.subplots(1, 4, figsize=(12, 3), tight_layout=True)

seed = 2
loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=seed, alpha=ALPHA, phi=PHI)
ds = loaders['test'].dataset

#  192, 280, 931
# 305 is inverted egg

x = ds[305][0].numpy()
y = ds[305][1].numpy()

y_unstandardized = y * y_std + y_mean

for ax, alpha in zip(axs, (0.01, 0.05, 0.1, 0.2)):

    plot_gmm_cond(ax, x, alpha=ALPHA, phi=PHI)

    eto_task_loss = plot_picnn_contour(
        ckpt_path='out/portfolio_syn_picnn_d128/eto_L2_d128_lr0.01_reg0.001_seed0.pt',
        hidden_dim=128, alpha=alpha, seed=seed, x=x, y=y, label='eto', ax=ax,
        color=plt.cm.tab10.colors[1]
    )

    e2e_task_loss = plot_picnn_contour(
        ckpt_path=f'out/portfolio_syn_picnn_d128/e2e_finetune_a{alpha:.2f}_L2_d128_lr0.005_reg0.001_seed0.pt',
        hidden_dim=128, alpha=alpha, seed=seed, x=x, y=y, label='e2e', ax=ax,
        color=plt.cm.tab10.colors[0]
    )

    # un-standardize the true y before plotting
    ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')

    ax.legend()
    ax.set(title=f'alpha {alpha}')

plt.show()

## Plot box uncertainty from quantile regressor

In [None]:
import torch

from matplotlib.legend_handler import HandlerPatch
import matplotlib.patches as mpatches

def make_legend_arrow(legend, orig_handle,
                      xdescent, ydescent,
                      width, height, fontsize):
    p = mpatches.FancyArrow(0, 0.5*height, width, 0, length_includes_head=True, head_width=0.75*height )
    return p

In [None]:
def plot_box(
    ckpt_path: str, alpha: float, seed: int, x: np.ndarray, label: str, ax: plt.Axes,
    color: Any, y: np.ndarray | None = None
) -> None:
    from models.quantile import QuantileRegressor, conformal_q
    from portfolio.problems import PortfolioProblemBox

    model = QuantileRegressor(input_dim=2, y_dim=2)
    model.load_state_dict(torch.load(ckpt_path, weights_only=True))
    model.eval()

    loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=seed, alpha=ALPHA, phi=PHI)
    q = conformal_q(model, loaders['calib'], y_dim=2, alpha=alpha).item()

    with torch.no_grad():
        x_batch = torch.from_numpy(x[None]).to(dtype=torch.float32)
        pred = model(x_batch)[0].numpy()

    pred_lo = pred[:2] - q
    pred_hi = pred[2:] + q

    pred_lo_unstd = pred_lo * y_std + y_mean
    pred_hi_unstd = pred_hi * y_std + y_mean

    box_points = np.array([
        pred_lo_unstd,
        [pred_lo_unstd[0], pred_hi_unstd[1]],
        pred_hi_unstd,
        [pred_hi_unstd[0], pred_lo_unstd[1]],
        pred_lo_unstd
    ])
    ax.plot(box_points[:, 0], box_points[:, 1], lw=2, ls='--', label=label, color=color)

    if y is not None:
        prob = PortfolioProblemBox(N=2, y_mean=y_mean, y_std=y_std)
        prob.solve(pred_lo, pred_hi)
        task_loss = prob.task_loss_np(y, is_standardized=True)

        # label = f'{label}, task loss {task_loss:.2g}'
        z = prob.primal_vars['z'].value
        # ax.plot([0, 3*z[0]], [0, 3*z[1]], label=r'$z^\star$', color=color)
        ax.arrow(0, 0, 3*z[0], 3*z[1], width=0.15, label=r'$z^\star$', color=color)
        # ax.quiver(0, 0, z[0], z[1], scale=5, width=0.015, label=r'$z^\star$', color=color)
    else:
        task_loss = None
    
    return task_loss

In [None]:
fig, ax = plt.subplots(1, 1, tight_layout=True)

loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=0, alpha=ALPHA, phi=PHI)
ds = loaders['test'].dataset

#  192, 280, 931
# 305 is inverted egg

x = ds[192][0].numpy()
y = ds[192][1].numpy()

plot_gmm_cond(ax, x)

# SAVED_LR = {0.01: 10**(-1.5), 0.05: 10**(-1.5), 0.1: 1e-2, 0.2: 10**(-2.5)}
# SAVED_L2REG = {0.01: 1e-4, 0.05: 1e-4, 0.1: 1e-3, 0.2: 1e-3}

plot_box(
    ckpt_path='out/portfolio_syn_quantile/eto_a0.10_lr0.01_reg0.001_seed0.pt',
    alpha=0.1, seed=0, x=x, y=y, label='e2e', ax=ax, color=plt.cm.tab10.colors[0]
)

plot_box(
    ckpt_path='out/portfolio_syn_quantile/e2e_finetune_a0.10_lr0.0001_reg0.001_seed0.pt',
    alpha=0.1, seed=0, x=x, y=y, label='eto', ax=ax, color=plt.cm.tab10.colors[1]
)

# un-standardize the true y before plotting
y_unstandardized = y * y_std + y_mean
ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')

ax.legend(handler_map={mpatches.FancyArrow: HandlerPatch(patch_func=make_legend_arrow)})
plt.show()

## Plot ellipse uncertainty from Gaussian regressor

In [None]:
from matplotlib.patches import Ellipse

In [None]:
def gaussian_to_ellipse_params(cov, q):
    eigvals, eigvecs = np.linalg.eigh(cov)

    # Calculate the angle of rotation (in degrees)
    # angle = np.degrees(np.arctan2(eigvecs[1, 0], eigvecs[0, 0]))
    angle = np.degrees(np.arctan2(eigvecs[1, 1], eigvecs[0, 1]))
    
    # Calculate the width and height
    width = 2 * np.sqrt(q * eigvals[0])
    height = 2 * np.sqrt(q * eigvals[1])
    return width, height, angle

In [None]:
def plot_ellipse(
    ckpt_path: str, alpha: float, seed: int, x: np.ndarray, label: str, ax: plt.Axes,
    color: Any, y: np.ndarray | None = None
) -> None:
    from models.gaussian import GaussianRegressor, conformal_q
    from portfolio.problems import PortfolioProblemEllipsoid

    model = GaussianRegressor(input_dim=2, y_dim=2)
    model.load_state_dict(torch.load(ckpt_path, weights_only=True))
    model.eval()

    loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=seed, alpha=ALPHA, phi=PHI)
    q = conformal_q(model, loaders['calib'], alpha=alpha).item()

    with torch.no_grad():
        x_batch = torch.from_numpy(x[None]).to(dtype=torch.float32)
        loc, scale_tril = model(x_batch)
        loc = loc[0].numpy()
        scale_tril = scale_tril[0].numpy()

    if y is not None:
        prob = PortfolioProblemEllipsoid(N=2, y_mean=y_mean, y_std=y_std)
        prob.solve(loc, scale_tril * np.sqrt(q))
        task_loss = prob.task_loss_np(y, is_standardized=True)

        label = f'{label}, task loss {task_loss:.2g}'
        z = prob.primal_vars['z'].value
        # ax.plot([0, 3*z[0]], [0, 3*z[1]], label=r'$z^\star$', ls=':', color=color)
        ax.arrow(0, 0, 3*z[0], 3*z[1], width=0.15, label=r'$z^\star$', color=color)
        # ax.quiver(0, 0, z[0], z[1], scale=5, width=0.015, label=r'$z^\star$', color=color)
    else:
        task_loss = None

    # {yhat  :   (yhat - loc).T @ inv(cov) @ (yhat - loc) <= q}
    # yhat = (y - y_mean) / y_std = Sinv @ (y - y_mean)
    # {y :    (Sinv @ (y - y_mean) - loc).T @ inv(cov) @ (Sinv @ (y - y_mean) - loc) <= q}
    # {y :    (y - y_mean - S @ loc).T @ Sinv @ inv(cov) @ Sinv @ (y - y_mean - S @ loc) <= q}
    # ellipse with center = (y_mean + S @ loc) and cov = Sinv @ inv(cov) @ Sinv

    cov = scale_tril @ scale_tril.T
    Sinv = np.diag(1. / y_std)
    width, height, angle = gaussian_to_ellipse_params(Sinv @ np.linalg.pinv(cov) @ Sinv, q)
    mean = y_mean + loc * y_std

    ellipse = Ellipse(mean, width, height, angle=angle, ls='--', lw=2, fill=False, color=color)
    ax.add_artist(ellipse)
    return task_loss

In [None]:
fig, ax = plt.subplots(1, 1, tight_layout=True)

loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=0, alpha=ALPHA, phi=PHI)
ds = loaders['test'].dataset

#  192, 280, 931
# 305 is inverted egg

x = ds[192][0].numpy()
y = ds[192][1].numpy()

plot_gmm_cond(ax, x)

# SAVED_LR = {0.01: 10**(-1.5), 0.05: 10**(-1.5), 0.1: 1e-2, 0.2: 10**(-2.5)}
# SAVED_L2REG = {0.01: 1e-4, 0.05: 1e-4, 0.1: 1e-3, 0.2: 1e-3}

plot_ellipse(
    ckpt_path='out/portfolio_syn_gaussian/eto_lr0.000316_reg0.01_seed0.pt',
    alpha=0.01, seed=0, x=x, y=y, label='eto', ax=ax, color=plt.cm.tab10.colors[0]
)

# plot_box(
#     ckpt_path='out/portfolio_syn_quantile/e2e_finetune_a0.10_lr0.0001_reg0.001_seed0.pt',
#     alpha=0.1, seed=0, x=x, y=y, label='eto', ax=ax, color=plt.cm.tab10.colors[1]
# )

# un-standardize the true y before plotting
y_unstandardized = y * y_std + y_mean
ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')

ax.legend()
plt.show()

## Plot box, ellipse, and PICNN in 1 fig

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(9, 3), sharey=True, tight_layout=True)

seed = 2
loaders, (y_mean, y_std) = synthetic.get_loaders(batch_size=256, seed=seed, alpha=ALPHA, phi=PHI)
ds = loaders['test'].dataset

#  192, 280, 931
# 305 is inverted egg

x = ds[192][0].numpy()
y = ds[192][1].numpy()
y_unstandardized = y * y_std + y_mean

# box uncertainty
ax = axs[0]
plot_gmm_cond(ax, x)
eto_task_loss = plot_box(
    ckpt_path='out/portfolio_syn_quantile/eto_a0.10_lr0.01_reg0.001_seed0.pt',
    alpha=0.1, seed=seed, x=x, y=y, label='ETO', ax=ax, color=plt.cm.tab10.colors[0]
)
e2e_task_loss = plot_box(
    ckpt_path='out/portfolio_syn_quantile/e2e_finetune_a0.10_lr0.0001_reg0.001_seed0.pt',
    alpha=0.1, seed=seed, x=x, y=y, label='E2E', ax=ax, color=plt.cm.tab10.colors[1]
)
ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')
ax.set(title=f'Box\nETO task loss {eto_task_loss:.2f}\nE2E task loss {e2e_task_loss:.2f}')

# ellipse uncertainty
ax = axs[1]
plot_gmm_cond(ax, x)
eto_task_loss = plot_ellipse(
    ckpt_path='out/portfolio_syn_gaussian/eto_lr0.000316_reg0.01_seed0.pt',
    alpha=0.1, seed=seed, x=x, y=y, label='ETO', ax=ax, color=plt.cm.tab10.colors[0]
)
e2e_task_loss = plot_ellipse(
    ckpt_path='out/portfolio_syn_gaussian/e2e_finetune_a0.01_lr0.001_reg0.01_seed0.pt',
    alpha=0.1, seed=seed, x=x, y=y, label='E2E', ax=ax, color=plt.cm.tab10.colors[1]
)
ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')
ax.set(title=f'Ellipse\nETO task loss {eto_task_loss:.2f}\nE2E task loss {e2e_task_loss:.2f}')

ax = axs[2]
plot_gmm_cond(ax, x)
alpha = 0.1
eto_task_loss = plot_picnn_contour(
    ckpt_path='out/portfolio_syn_picnn_d128/eto_L2_d128_lr0.01_reg0.001_seed0.pt',
    hidden_dim=128, alpha=alpha, seed=seed, x=x, y=y, label='ETO', ax=ax,
    color=plt.cm.tab10.colors[1]
)
e2e_task_loss = plot_picnn_contour(
    ckpt_path=f'out/portfolio_syn_picnn_d128/e2e_finetune_a{alpha:.2f}_L2_d128_lr0.005_reg0.001_seed0.pt',
    hidden_dim=128, alpha=alpha, seed=seed, x=x, y=y, label='E2E', ax=ax,
    color=plt.cm.tab10.colors[0]
)
ax.scatter(y_unstandardized[0], y_unstandardized[1], color='green', s=20, label='true y')
ax.set(title=f'PICNN\nETO task loss {eto_task_loss:.2f}\nE2E task loss {e2e_task_loss:.2f}')

ax.legend(
    handler_map={mpatches.FancyArrow: HandlerPatch(patch_func=make_legend_arrow)},
    bbox_to_anchor=(1.05, 1), loc='upper left'
)
plt.show()