In [None]:
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('seaborn-v0_8')  # make plots a bit nicer by default

# Generating data

In [None]:
def generate_linear_data(d_feature: int, n_samples: int, w_cov: np.ndarray = None, noise_scale: float = 0.0) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Sample a single linear-regression problem y = Xw + e.

    The weight vector is drawn from a zero-mean Gaussian with covariance `w_cov`,
    the features are i.i.d. standard normal, and the noise is i.i.d. Gaussian.

    Args:
        d_feature: Dimension of feature space.
        n_samples: Number of samples to generate.
        w_cov: Covariance matrix used to draw w. If None, the identity is used.
        noise_scale: Standard deviation of the additive Gaussian noise.

    Returns:
        X: Feature matrix of shape (n_samples, d_feature).
        y: Target vector of shape (n_samples,).
        w: True weight vector of shape (d_feature,).
    """
    covariance = np.eye(d_feature) if w_cov is None else w_cov

    # The draw order (weights, features, noise) is what a seeded run depends on.
    true_w = np.random.multivariate_normal(mean=np.zeros(d_feature), cov=covariance)
    features = np.random.randn(n_samples, d_feature)
    clean_targets = features @ true_w
    noise = np.random.normal(scale=noise_scale, size=(n_samples,))
    targets = (clean_targets + noise).ravel()

    return features, targets, true_w


def generate_mixed_linear_data(d_feature: int, n_samples_per_w: int, n_ws: int, w_cov: np.ndarray = None, noise_scale: float = 0.0) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Sample several linear-regression problems y = Xw + e and stack them.

    Rows are grouped by weight vector: the k-th consecutive group of
    `n_samples_per_w` rows of X is labelled with the k-th row of W.

    Args:
        d_feature: Dimension of feature space.
        n_samples_per_w: Number of samples to generate for each w.
        n_ws: Number of different w vectors to use.
        w_cov: Covariance matrix used to draw each w. If None, the identity is used.
        noise_scale: Standard deviation of the additive Gaussian noise.

    Returns:
        X: Feature matrix of shape (n_ws * n_samples_per_w, d_feature).
        y: Target vector of shape (n_ws * n_samples_per_w,).
        W: Matrix of true weight vectors of shape (n_ws, d_feature).
    """
    covariance = np.eye(d_feature) if w_cov is None else w_cov

    weights = np.random.multivariate_normal(mean=np.zeros(d_feature), cov=covariance, size=n_ws)
    features = np.random.randn(n_ws * n_samples_per_w, d_feature)

    # Label each contiguous group of rows with its own weight vector.
    per_group_targets = []
    for k in range(n_ws):
        start = k * n_samples_per_w
        stop = start + n_samples_per_w
        per_group_targets.append(features[start:stop] @ weights[k])
    targets = np.concatenate(per_group_targets)

    targets += np.random.normal(scale=noise_scale, size=targets.shape)

    return features, targets, weights


def generate_specified_linear_data(n_samples_per_w: int, w_stars: np.ndarray, noise_scale: float = 0.0) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Sample data y = Xw + e using caller-supplied weight vectors.

    Rows are grouped by weight vector: the k-th consecutive group of
    `n_samples_per_w` rows of X is labelled with `w_stars[k]`.

    Args:
        n_samples_per_w: Number of samples to generate for each w.
        w_stars: Array of weight vectors, shape (n_ws, d_feature).
        noise_scale: Standard deviation of the additive Gaussian noise.

    Returns:
        X: Feature matrix of shape (n_ws * n_samples_per_w, d_feature).
        y: Target vector of shape (n_ws * n_samples_per_w,).
        W: The `w_stars` array that was passed in (returned as-is, not copied).
    """
    n_ws, d_feature = w_stars.shape
    features = np.random.randn(n_ws * n_samples_per_w, d_feature)

    per_group_targets = []
    for k in range(n_ws):
        start = k * n_samples_per_w
        rows = features[start:start + n_samples_per_w]
        per_group_targets.append(rows @ w_stars[k])
    targets = np.concatenate(per_group_targets)

    targets += np.random.normal(scale=noise_scale, size=targets.shape)

    return features, targets, w_stars


def generate_std_basis_data(d_feature: int, n_samples: int, w: np.ndarray = None) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build a noiseless dataset y = Xw whose inputs are the first standard basis vectors.

    Since row i of X is e_i, the targets are simply the first `n_samples`
    coordinates of w.

    Args:
        d_feature: Dimension of feature space.
        n_samples: Number of samples to generate (must be <= d_feature).
        w: True weight vector. If None, it is drawn from a standard normal.

    Returns:
        X: Feature matrix of shape (n_samples, d_feature).
        y: Target vector of shape (n_samples,).
        w: True weight vector of shape (d_feature,).
    """
    assert n_samples <= d_feature, "n_samples must be <= d_feature for standard basis vectors"

    true_w = np.random.randn(d_feature) if w is None else w

    # A rectangular identity is exactly the first n_samples rows of the d x d identity.
    basis_rows = np.eye(n_samples, d_feature)
    targets = (basis_rows @ true_w).ravel()

    return basis_rows, targets, true_w


def generate_mixed_std_basis_data(d_feature: int, n_samples_per_w: int, n_ws: int, w_cov: np.ndarray = None, noise_scale: float = 0.0) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build standard-basis datasets y = Xw + e for several weight vectors and stack them.

    Every weight vector sees the same inputs e_1, ..., e_{n_samples_per_w}, so the
    noiseless targets of group k are the leading coordinates of W[k].

    Args:
        d_feature: Dimension of feature space.
        n_samples_per_w: Number of samples to generate for each w (must be <= d_feature).
        n_ws: Number of different w vectors to use.
        w_cov: Covariance matrix used to draw each w. If None, the identity is used.
        noise_scale: Standard deviation of the additive Gaussian noise.

    Returns:
        X: Feature matrix of shape (n_ws * n_samples_per_w, d_feature).
        y: Target vector of shape (n_ws * n_samples_per_w,).
        W: Matrix of true weight vectors of shape (n_ws, d_feature).
    """
    assert n_samples_per_w <= d_feature, "n_samples_per_w must be <= d_feature for standard basis vectors"

    covariance = np.eye(d_feature) if w_cov is None else w_cov

    weights = np.random.multivariate_normal(mean=np.zeros(d_feature), cov=covariance, size=n_ws)

    # Repeat the same block of basis vectors once per weight vector.
    basis_block = np.eye(d_feature)[:n_samples_per_w]
    features = np.tile(basis_block, (n_ws, 1))

    # e_i @ w == w[i], so no matrix product is needed; concatenate copies the
    # slices, which keeps the in-place noise addition from touching `weights`.
    targets = np.concatenate([row[:n_samples_per_w] for row in weights])
    targets += np.random.normal(scale=noise_scale, size=targets.shape)

    return features, targets, weights

# $Q$ parameterization: $Q$ is not constrained to be PSD or symmetric

## Basic training loop ✅

In [None]:
def mirror_descent_step(w: np.ndarray, Q: np.ndarray, lr: float, x: np.ndarray, y: float) -> np.ndarray:
    """
    Take one mirror-descent step on the squared error of a single example.

    Computes w - 2 * lr * (<w, x> - y) * Q x.
    [ 7.1: this function isn't actually used right now, since the potential update computes this manually ]

    Args:
        w: Current weight vector (d_feature,).
        Q: Potential matrix (d_feature, d_feature).
        lr: Learning rate.
        x: Feature vector (d_feature,).
        y: Target value.

    Returns:
        Updated weight vector (d_feature,).
    """
    residual = np.inner(w, x) - y
    preconditioned_x = Q @ x
    return w - 2 * lr * residual * preconditioned_x


def crossval(w: np.ndarray, Q: np.ndarray, lr: float, X: np.ndarray, y: np.ndarray,
             ignore_diag=True) -> float:
    """
    Leave-one-out style cross-validation loss for a single mirror-descent step.

    For every training example (x_i, y_i):
    1. take one mirror-descent step from w on (x_i, y_i);
    2. evaluate the resulting model on every example (x_j, y_j).

    The residual of the pair is
        L_ij = (x_j . w - y_j) - 2 * lr * (x_i . w - y_i) * x_j^T Q x_i,
    and the returned value is the mean of L_ij^2 / 2 over the selected pairs.

    Tested against the non-vectorized version, performs identically.

    Args:
        w: Initial weight vector (d_feature,).
        Q: Potential matrix (d_feature, d_feature).
        lr: Learning rate of the inner mirror-descent step.
        X: Feature matrix (n_samples, d_feature).
        y: Target vector (n_samples,).
        ignore_diag: If True, pairs with i == j are excluded from the average;
            if False, all n_samples**2 pairs are averaged.

    Returns:
        Average of L_ij^2 / 2 over the selected (i, j) pairs.
    """
    n_samples = X.shape[0]

    residuals = X @ w - y  # (n_samples,)
    # gram[i, j] = x_i^T Q^T x_j = x_j^T Q x_i
    gram = X @ Q.T @ X.T  # (n_samples, n_samples)

    # Row index i = example trained on, column index j = example evaluated on.
    pair_residuals = residuals[np.newaxis, :] - 2 * lr * residuals[:, np.newaxis] * gram
    squared = np.square(pair_residuals)

    if ignore_diag:
        np.fill_diagonal(squared, 0)
        n_pairs = n_samples * (n_samples - 1)
    else:
        n_pairs = n_samples ** 2

    return np.sum(squared) / (2 * n_pairs)


def potential_update(w: np.ndarray, Q: np.ndarray, outer_lr: float, inner_lr: float, X: np.ndarray, y: np.ndarray,
                     ignore_diag=True) -> np.ndarray:
    """
    Take one gradient step on Q with respect to the cross-validation loss.

    With z_i = x_i . w - y_i and L_ij = z_j - 2 * inner_lr * z_i * x_j^T Q x_i,
    the gradient of mean(L_ij^2 / 2) with respect to Q is
        (-2 * inner_lr / n_pairs) * sum_ij L_ij * z_i * x_j x_i^T.

    Args:
        w: Current weight vector (d_feature,).
        Q: Current potential matrix (d_feature, d_feature).
        outer_lr: Learning rate for updating Q.
        inner_lr: Learning rate for the inner mirror descent step (η in the formula).
        X: Feature matrix (n_samples, d_feature).
        y: Target vector (n_samples,).
        ignore_diag: If True, pairs with i == j are left out of the gradient
            and of the normalisation; if False, all pairs are used.

    Returns:
        Updated potential matrix Q (d_feature, d_feature). The input Q is not modified.
    """
    n_points = X.shape[0]

    residuals = X @ w - y  # z, (n_samples,)
    gram = X @ Q.T @ X.T  # gram[i, j] = x_j^T Q x_i, (n_samples, n_samples)
    pair_residuals = residuals[np.newaxis, :] - 2 * inner_lr * residuals[:, np.newaxis] * gram  # L_{ij}

    if ignore_diag:
        # Zeroing L_ii removes the i == j terms from the einsum sum below.
        np.fill_diagonal(pair_residuals, 0)
        n_pairs = n_points * (n_points - 1)
    else:
        n_pairs = n_points ** 2

    # Single einsum for sum_ij L_ij * z_i * (x_j x_i^T):
    #   'ij' indexes L, 'i' indexes z,
    #   'jk,il->kl' forms the outer product of row j with row i and sums over i, j.
    gradient = np.einsum('ij,i,jk,il->kl', pair_residuals, residuals, X, X)
    gradient *= -2 * inner_lr / n_pairs

    return Q - outer_lr * gradient

In [None]:
def train_potential(d, n, inner_lr, outer_lr, w0, Q0, n_iters,
                    w_cov=None, noise_scale=0):
    """
    Basic training loop.
    Generates linear data and runs potential_update for n_iters iterations.

    At iteration i the cross-validation loss of the current Q is recorded
    together with that Q, and only then is Q updated. Hence `Qs[i]` is the matrix
    whose loss is `crossvals[i]`, and `Qs[0]` is a copy of `Q0`.

    Args:
        d: Dimension of feature space.
        n: Number of samples to generate.
        inner_lr: Learning rate of the inner mirror-descent step.
        outer_lr: Learning rate for updating Q.
        w0: Initial weight vector (d,), held fixed throughout.
        Q0: Initial potential matrix (d, d). Not modified.
        n_iters: Number of outer iterations.
        w_cov: Covariance used to draw the true w (identity if None).
        noise_scale: Standard deviation of the label noise.

    Returns crossvals, Qs, X, y, w_star.
    """
    X, y, w_star = generate_linear_data(d, n, w_cov, noise_scale)

    crossvals = np.zeros(n_iters)
    Qs = np.zeros((n_iters, d, d))
    Q = Q0.copy()
    for i in range(n_iters):
        # crossval takes a single learning rate: the inner (mirror-descent) one.
        crossvals[i] = crossval(w0, Q, inner_lr, X, y, ignore_diag=True)
        Qs[i] = Q  # assignment into Qs copies the values
        # NOTE(review): the loss is monitored without the diagonal while the update
        # includes it, mirroring the standalone experiment cell — confirm intended.
        Q = potential_update(w0, Q, outer_lr, inner_lr, X, y, ignore_diag=False)

    return crossvals, Qs, X, y, w_star

### Checking correctness of `crossval` and `potential_update` ✅

In [None]:
# test crossval and potential_update against unvectorized versions

def random_experiment_setup():
    """
    Draw a random problem instance for the correctness checks below.

    Returns:
        d_feature, n_samples, lr, X, y, w_star, w, Q, where (X, y, w_star) come
        from generate_linear_data with a random noise level, and w / Q are a
        random starting weight vector and a random (unconstrained) potential matrix.
    """
    # Problem size, step size, then noise level; the draw order is kept fixed
    # so that seeded runs stay reproducible.
    n_features = np.random.randint(5, 15)
    n_points = np.random.randint(5, 100)
    step_size = np.random.uniform(0.01, 1)
    sigma = np.random.uniform(0, 1)

    features, targets, true_w = generate_linear_data(n_features, n_points, noise_scale=sigma)

    init_w = np.random.randn(n_features)
    init_Q = np.random.randn(n_features, n_features)

    return n_features, n_points, step_size, features, targets, true_w, init_w, init_Q

n_tests = 1

# ------------------------ Testing L_ij -----------------------------
# Checks the broadcasting used for L_ij against an entry-by-entry computation.
for _ in range(n_tests):
    d, n, lr, X, y, w_star, w, Q = random_experiment_setup()
    z = w @ X.T - y
    M = X @ Q.T @ X.T
    L = (z - 2 * lr * z[:, np.newaxis] * M)  # L[i,j] = L_{ij}

    # L is (n, n), indexed by sample pairs, so iterate over samples, not features.
    for i in range(n):
        for j in range(n):
            manual_calc = z[j] - 2 * lr * z[i] * X[j].T @ Q @ X[i]
            assert np.isclose(L[i,j], manual_calc), f"{i,j} {L[i,j], manual_calc}"
print("L_ij passed test.")


# ------------------------ Testing crossval -----------------------------
def crossval_nonvec(w, Q, lr, xs, ys):
    """
    Loop-based reference implementation of the cross-validation loss.

    For each (x_i, y_i) in zip(xs, ys):
    1. "Train" a model with a single step of mirror descent on (x_i, y_i)
    2. Evaluate it on the rest of the dataset
    Return the average of (squared error / 2) over all i, j with i != j.

    Args:
        w: Initial weights as a column vector (d_feature, 1).
        Q: Potential matrix (d_feature, d_feature).
        lr: Learning rate of the mirror-descent step.
        xs: List of feature column vectors, each (d_feature, 1).
        ys: List of scalar targets.
    """
    n_points = len(xs)
    examples = list(zip(xs, ys))

    total = 0
    for i, (x_train, y_train) in enumerate(examples):
        train_err = w.T @ x_train - y_train
        for j, (x_eval, y_eval) in enumerate(examples):
            if i != j:
                # prediction of the one-step model on x_eval, minus its target
                eval_err = w.T @ x_eval - 2 * lr * train_err * (x_eval.T @ Q @ x_train) - y_eval
                total += (eval_err ** 2).item()

    return total / (2 * n_points * (n_points - 1))

# Compare the vectorized `crossval` with the loop-based `crossval_nonvec`
# on randomly generated problem instances.
for _ in range(n_tests):
    # all parameters are randomized on every run
    d_feature, n_samples, lr, X, y, w_star, w, Q = random_experiment_setup()

    # Prepare inputs for crossval_nonvec
    xs = [X[i, :].reshape(-1, 1) for i in range(n_samples)]  # list of (d_feature, 1) arrays
    ys = y.tolist()  # list of floats

    # The reference implementation works with column vectors, hence w[:, np.newaxis].
    result_nonvec = crossval_nonvec(w[:, np.newaxis], Q, lr, xs, ys)
    result_vec = crossval(w, Q, lr, X, y)  # default ignore_diag=True matches the i != j loop

    assert np.isclose(result_nonvec, result_vec) 
print("crossval passed test.")


# ------------------------ Testing potential_update -----------------------------
def potential_update_nonvec(w, Q, outer_lr, inner_lr, xs, ys):
    """
    Loop-based reference implementation of one gradient step on Q.

    Accumulates the derivative of the cross-validation loss (as implemented in
    `crossval` above) with respect to the matrix Q over all pairs i != j.

    Args:
        w: Initial weight vector.
        Q: Current potential matrix (d_feature, d_feature).
        outer_lr: Learning rate for updating Q.
        inner_lr: Learning rate of the inner mirror-descent step.
        xs: List of feature column vectors, each (d_feature, 1).
        ys: List of scalar targets.

    Returns the updated matrix.
    """
    n_points = len(xs)

    grad_sum = 0
    for i, (x_train, y_train) in enumerate(zip(xs, ys)):
        train_err = (w.T @ x_train - y_train).item()
        for j, (x_eval, y_eval) in enumerate(zip(xs, ys)):
            if i == j:
                continue
            # L_ij: error on x_eval after one step on (x_train, y_train)
            pair_err = (w.T @ x_eval - 2 * inner_lr * train_err * (x_eval.T @ Q @ x_train) - y_eval).item()
            grad_sum += pair_err * train_err * (x_eval @ x_train.T)

    gradient = -2 * inner_lr * grad_sum / (n_points * (n_points - 1))
    return Q - outer_lr * gradient

# Compare the vectorized `potential_update` with the loop-based
# `potential_update_nonvec` on randomly generated problem instances.
for _ in range(n_tests):
    # The learning rate returned by the setup helper is used as the inner rate.
    d_feature, n_samples, inner_lr, X, y, w_star, w, Q = random_experiment_setup()
    outer_lr = np.random.uniform(0.01, 1)

    xs = [X[i, :].reshape(-1, 1) for i in range(n_samples)]  # list of (d_feature, 1) arrays
    ys = y.tolist()  # list of floats

    result_nonvec = potential_update_nonvec(w, Q, outer_lr, inner_lr, xs, ys)
    result_vec = potential_update(w, Q, outer_lr, inner_lr, X, y)  # default ignore_diag=True matches the i != j loop
    assert np.allclose(result_nonvec, result_vec) 
print("potential_update passed test.")
    


In [None]:
# Scratch work: how do you vectorize "sum of outer products of rows of X"

# i.e. sum_{i,j} np.outer(x_i, x_j)
# Each step below rewrites the previous one and asserts that the result is unchanged.
d, n = 5, 10
X = np.random.randn(n,d)

# sum of outer products of rows, written as a loop
# (the loop adds outer(x_j, x_i); over all ordered pairs this is the same total
#  as summing outer(x_i, x_j))
sopr = np.zeros((d,d))
for i in range(n):
    for j in range(n):
        sopr += np.outer(X[j], X[i])

# turn the loop into a nested list comprehension: (n,d) -> (n, n, d, d), and then sum over first two dims
expand = np.array( [[ np.outer(X[i], X[j]) for j in range(n)] for i in range(n)] )
sopr2 = expand.sum((0,1))
assert np.allclose(sopr, sopr2)

# turn into einsum: same expand step first, then sum over first two indices
# einsum_expand[i, j, k, l] = X[i, k] * X[j, l], i.e. outer(x_i, x_j)[k, l]
einsum_expand = np.einsum('ik,jl->ijkl', X, X)
sopr3 = np.einsum('ik,jl->ijkl', X, X).sum((0,1))
assert np.allclose(einsum_expand, expand)
assert np.allclose(sopr, sopr3)

# finally, turn into an even simpler einsum
# (dropping i and j from the output makes einsum sum over them directly)
sopr4 = np.einsum('ik,jl->kl', X, X)
assert np.allclose(sopr, sopr4)

### Known example: noiseless case, standard basis data ✅

In [None]:
def optimal_non_pd_potential_std_basis(w0: np.ndarray, w_star: np.ndarray, lr: float = 0.5) -> np.ndarray:
    """
    Closed-form Q that drives every pairwise error L_ij to zero on standard basis data.

    The entries are Q[i, j] = (w0[i] - w_star[i]) / (w0[j] - w_star[j]) / (2 * lr).

    Args:
        w0: Initial weight vector (d_feature,).
        w_star: True weight vector (d_feature,).
        lr: Learning rate.

    Returns:
        Optimal potential matrix Q (d_feature, d_feature).
    """
    n_dims = w0.shape[0]

    # Per-coordinate gap between the initial and the true weights, as a flat vector.
    gap = np.reshape(w0, n_dims) - np.reshape(w_star, -1)[:n_dims]

    # ratios[i, j] = gap[i] / gap[j], built by broadcasting a column against a row.
    ratios = np.asarray(gap[:, np.newaxis] / gap[np.newaxis, :], dtype=np.float64)

    return ratios / (2 * lr)

In [None]:
# Experiment: learn Q by gradient descent on noiseless standard-basis data and
# compare against the closed-form solution Q_star.
d_feature, n_samples = 5, 5
inner_lr, outer_lr = 1, 0.4
n_potential_iterations = 3000

X, y, w_star = generate_std_basis_data(d_feature, n_samples)
w0 = np.ones((d_feature,))
Q0 = np.random.randn(d_feature, d_feature)

# Closed-form target that the learned Q is compared against.
Q_star = optimal_non_pd_potential_std_basis(w0, w_star, inner_lr)

Q = Q0.copy()
crossvals = np.zeros(n_potential_iterations)
Qs = np.zeros((n_potential_iterations, d_feature, d_feature))
for i in range(n_potential_iterations):
    # Record the loss and the iterate before updating, so Qs[i] matches crossvals[i].
    crossvals[i] = crossval(w0, Q, inner_lr, X, y)
    Qs[i] = Q
    Q = potential_update(w0, Q, outer_lr, inner_lr, X, y, ignore_diag=False)

# Plotting results
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(crossvals)
plt.title("Crossval loss")
plt.yscale('log')
plt.xlabel("Iterations")
plt.ylabel("Loss")

plt.subplot(1, 2, 2)
# Frobenius distance of every iterate to Q_star (norm taken over the last two axes).
Q_dists = np.linalg.norm(Qs - Q_star, axis=(1, 2))
plt.plot(Q_dists)
# Raw string: "\s" is not a valid escape sequence in a regular string literal.
plt.title(r"$d(Q, Q_\star)$")
plt.yscale('log')
plt.xlabel("Iterations")
plt.ylabel("Distance")

plt.tight_layout()
plt.show()

## Graphs for single $w$

## Graphs for mixed $w$ / recovering covariance

# $UU^\top$ parameterization

## Basic training loop

## Graphs for single $w$

## Graphs for mixed $w$ / recovering covariance