# Homework: Few-Shot Learning via Auxiliary Labels


Import necessary Python packages.


In [None]:
import numpy as np
import sklearn
from sklearn.decomposition import PCA
from matplotlib import pyplot as plt
from sklearn.linear_model import Ridge
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error


Let's first generate the data. Note that we generate it only once, so that the different methods in later parts are compared fairly.


In [None]:
# Seed NumPy's global generator; all random draws below go through np.random.
np.random.seed(0)

# Number of points for training and validation
N_train, N_val = 1000, 5000

# Dimensions of the true latent variable and of the irrelevant features
dim_t, dim_f = 2, 20

# Dimension of the data X
dim_d = 200

# Noise levels
sigma_n = 1e-3                   # std of the noise added to X
sigma_z = 1                      # std of the noise added to the auxiliary labels Z
sigma_y = 1                      # std of the noise added to the output y
sigma_f = np.sqrt(0.5 * 0.75)    # std of the irrelevant features F

# Hyperparameter for the ridge regression
ridge_lambda = 0.05

def generate_T(N):
    """Sample N two-dimensional latent points lying on a randomly rotated ellipse.

    Angles are drawn uniformly from [0, 2*pi); each point is
    (0.5*sin(angle), cos(angle)) multiplied by a 2x2 matrix from orth_basis.

    Returns an array of shape (N, 2).
    """
    angles = np.random.uniform(low=0.0, high=2*np.pi, size=N)
    ellipse = np.vstack((0.5 * np.sin(angles), np.cos(angles))).T
    # The angles are drawn before orth_basis consumes any random numbers.
    mixing = orth_basis(2, 2)
    return ellipse @ mixing
    # Disabled alternative kept from the original:
    #     return np.sign(np.random.rand(N, dim_t)) * 0.2


def generate_f(N, dim_f=2):
    """Draw an (N, dim_f) matrix of zero-mean Gaussian features with std sigma_f."""
    return np.random.normal(loc=0, scale=sigma_f, size=(N, dim_f))

def orth_basis(dim, dim_t):
    """Build a random orthogonal dim x dim matrix and return its first dim_t columns.

    The matrix is accumulated as a product of Householder reflections, each
    generated from a Gaussian vector drawn from NumPy's global random state,
    followed by a per-row sign correction.

    Returns an array of shape (dim, dim_t).
    """
    rng = np.random
    signs = np.ones((dim,))
    basis = np.eye(dim)
    for step in range(1, dim):
        size = dim - step + 1
        v = rng.normal(size=(size,))
        signs[step - 1] = np.sign(v[0])
        # The norm is taken before v[0] is shifted; the normalisation in the
        # reflector below uses the shifted vector.
        v[0] -= signs[step - 1] * np.sqrt((v * v).sum())
        # Householder transformation acting on the trailing `size` coordinates
        reflector = np.eye(size) - 2. * np.outer(v, v) / (v * v).sum()
        embedded = np.eye(dim)
        embedded[step - 1:, step - 1:] = reflector
        basis = np.dot(basis, embedded)
    # Fix the last sign such that the determinant is 1
    signs[-1] = (-1) ** (1 - (dim % 2)) * signs.prod()
    # Scale row i by signs[i]; same as np.dot(np.diag(signs), basis)
    basis = (signs * basis.T).T
    return basis[:, :dim_t]


# Total number of samples; the train/validation split happens at the end.
n_total = N_train + N_val

# Latent variable T, then the irrelevant features F
T = generate_T(n_total)
F = generate_f(n_total, dim_f)

# Data X: [T, F] mapped into dim_d dimensions through the columns of V, plus noise.
# The noise is drawn before V so the order of random draws is unchanged.
noise = np.random.normal(0, sigma_n, (n_total, dim_d))
V = orth_basis(dim_d, dim_t + dim_f)
X = np.hstack((T, F)) @ V.T + noise

# Output y depends on the latent variable T only
theta = np.random.rand(dim_t, 1)
y = T @ theta + np.random.normal(0, sigma_y, (n_total, 1))

# Standardise X with a scaler fitted on all rows (train and validation together)
scaler = StandardScaler()
X_w = scaler.fit(X).transform(X)

# Split: the first N_train rows are for training, the last N_val rows for validation
X_train, y_train, T_train = X_w[:N_train, :], y[:N_train, :], T[:N_train, :]
X_val, y_val = X_w[-N_val:, :], y[-N_val:, :]


Now let's generate auxiliary labels.


In [None]:
def generate_Z(dim_k):
    """Create dim_k auxiliary labels for the training points.

    A random (dim_t, dim_k) matrix q maps T_train to the clean labels;
    the noisy labels add Gaussian noise with std sigma_z.

    Returns (q, Z_train, Z_clean), where Z_train is the noisy version.
    """
    q = np.random.randn(dim_t, dim_k)
    Z_clean = T_train @ q
    label_noise = np.random.normal(0, sigma_z, (N_train, dim_k))
    Z_train = Z_clean + label_noise
    return q, Z_train, Z_clean


Below are functions for the different methods. You will need to implement the missing parts for the truncated SVD and for solving for $\theta$.


In [None]:
def tSVD(X, Z):
    """Exercise stub for the truncated SVD of part (b); not implemented yet.

    As written, the body is only `pass`, so the function returns None until
    the section between the start/end markers is filled in.

    The caller in this file passes (X_train, Z) and uses the result U as
    `X_train @ U` and `U @ theta`, so U is presumably a matrix with one row
    per column of X -- TODO confirm the expected shape against part (b).
    """
    pass
    ## TODO: implement the truncated SVD as part (b)
    ## You are allowed to use np.linalg packages if necessary
    ### start 1 ###

    ### end 1 ###

def fit_PCA(X, dim_t):
    """Fit a PCA with dim_t components on X and project X onto them.

    Returns:
        (T_hat, components): the projected data returned by fit_transform
        and the fitted model's `components_` attribute.
    """
    pca = PCA(n_components=dim_t)
    # fit_transform fits the model itself, so no separate fit() call is needed.
    T_hat = pca.fit_transform(X)
    return T_hat, pca.components_

def solve_theta(T_hat, y):
    """Exercise stub for estimating theta from T_hat and y (part (c)); not implemented yet.

    As written, the body is only `pass`, so the function returns None until
    the section between the start/end markers is filled in.

    Callers in this file left-multiply the result by a projection matrix
    (e.g. `U @ theta_hat`) and feed the product to loss_eval, so the result
    presumably has one row per column of T_hat -- TODO confirm.
    NOTE(review): `Ridge` is imported and `ridge_lambda` is defined in this
    file but neither is used elsewhere; presumably intended for this function.
    """
    pass
    ## TODO: implement the function to solve \theta from T and y
    ## You can refer to part (c) if necessary
    ### start 2 ###

    ### end 2 ###

def random_project_data(X, dim_t):
    """Project the columns of X onto dim_t directions taken from orth_basis.

    Returns X multiplied by a (X.shape[1], dim_t) matrix from orth_basis.
    """
    n_features = X.shape[1]
    projection = orth_basis(n_features, dim_t)
    return X @ projection

def loss_eval(w):
    """Return the validation mean squared error of the linear predictor X_val @ w.

    Args:
        w: weight matrix mapping the columns of X_val to predictions of y_val.
    """
    predictions = X_val @ w
    # mean_squared_error's signature is (y_true, y_pred): targets go first.
    return mean_squared_error(y_val, predictions)


Now let's run the code!


In [None]:
def vis_latent(L, X_hat, L_hat, L_rand, L_pca):
    """Scatter the first two columns of five point sets in a row of five panels.

    Panels follow the order given in the figure title: L, X_hat, L_hat,
    L_pca, L_rand. Note that L_pca is drawn before L_rand although L_rand
    comes first in the argument list.
    """
    fig, axes = plt.subplots(1, 5)
    fig.set_size_inches(26.5, 6.5)
    fig.suptitle('Latent Variable: Clean Ground-Truth (1st) vs Noisy Ground-Truth (2nd) vs SVD Recovered (3rd) vs PCA Recovered (4th) vs Random Projection (5th)', fontsize=25)
    panels = (L, X_hat, L_hat, L_pca, L_rand)
    for axis, points in zip(axes, panels):
        axis.plot(points[:, 0], points[:, 1], ".")

def gen_aux_data_and_fit_model(N, sigma_n, dim_d=50, dim_t_local=dim_t, dim_k=100, VIS=False):
    """Draw auxiliary labels, fit every method on the training set and score it.

    Args:
        N, sigma_n, dim_d: accepted for the callers' convenience; the body
            does not read them (it works on the module-level training data).
        dim_t_local: number of components for the random and PCA projections.
        dim_k: number of auxiliary labels to generate.
        VIS: when true, plot the latent spaces with vis_latent.

    Returns:
        Validation MSE, in this order, for: the PCA baseline, the truncated SVD
        on noisy auxiliary labels, the true basis V[:, :dim_t], and the
        truncated SVD on clean auxiliary labels.
    """
    def project_and_fit(basis):
        # Project the training data, solve for theta there, and map the
        # solution back to a weight vector over the columns of X.
        latent = X_train @ basis
        return latent, basis @ solve_theta(latent, y_train)

    _, aux_noisy, aux_clean = generate_Z(dim_k)

    # Random projection (only used for the visualisation)
    T_rand = random_project_data(X_train, dim_t_local)

    # PCA baseline: theta is solved on the PCA scores returned by fit_PCA
    T_pca, components = fit_PCA(X_train, dim_t_local)
    pca_basis = components.T
    weight_baseline = pca_basis @ solve_theta(T_pca, y_train)

    # Truncated SVD with the noisy auxiliary labels
    T_hat, weight_ours = project_and_fit(tSVD(X_train, aux_noisy))

    # Truncated SVD with the clean auxiliary labels
    _, weight_clean_ours = project_and_fit(tSVD(X_train, aux_clean))

    # Perfect baseline: the first dim_t columns of V (global dim_t, not dim_t_local)
    T_best, weight_best = project_and_fit(V[:, :dim_t])

    if VIS:
        vis_latent(T, T_best, T_hat, T_rand, T_pca)

    weights = (weight_baseline, weight_ours, weight_best, weight_clean_ours)
    return tuple(loss_eval(weight) for weight in weights)


How does truncated SVD recover the latent variable $T$? Let's visualize the latent space.

**Make sure you include this plot in your solution.**


In [None]:
# One run with 2 projection components and 40 auxiliary labels, plots enabled.
# The four losses are unpacked and discarded so the cell prints nothing.
_, _, _, _ = gen_aux_data_and_fit_model(
    N_train,
    sigma_n,
    dim_d,
    dim_t_local=2,
    dim_k=40,
    VIS=True,
)


Let's change the dimension of the auxiliary label $Z$ and observe the performance of the different methods. Run this a few times so that you can get a sense of the variability of this approach as well.

**Make sure you include this plot in your solution.**


In [None]:
# Sweep the number of auxiliary labels k = 1..39 and record each method's loss.
dim_ks = np.arange(1, 40, 1)
diff_baselines, diff_ourss, diff_bests, diff_clean_ourss = [], [], [], []

for dim_k in dim_ks:
    diff_baseline, diff_ours, diff_best, diff_clean_ours = gen_aux_data_and_fit_model(
        N_train, sigma_n, dim_d, dim_t_local=5, dim_k=dim_k, VIS=False
    )
    diff_baselines += [diff_baseline]
    diff_ourss += [diff_ours]
    diff_bests += [diff_best]
    diff_clean_ourss += [diff_clean_ours]


In [None]:
# (losses, extra positional plot arguments, legend label), in drawing order.
curves = [
    (diff_baselines, ('-.',), "PCA baseline"),
    (diff_bests, ('--.',), "best (access to V_{ux})"),
    (diff_clean_ourss, ('--.',), "baseline (access to clean aux labels)"),
    (diff_ourss, (), "ours"),
]
for losses, fmt_args, label in curves:
    plt.plot(dim_ks, losses, *fmt_args, label=label)

plt.legend()
plt.title("Coefficient Reconstruction Difference vs # Dimension of Z")
plt.xlabel("$k$")
plt.ylabel("MSE Loss (Validation)")
plt.xticks(np.arange(1, 40, 2))
plt.show()
