In [None]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from scipy.linalg import svd
from skimage.metrics import structural_similarity as ssim
import plotly.graph_objects as go
from plotly.subplots import make_subplots


In [None]:
def plot_digits(X, title):
    """Small helper function to plot 100 digits."""
    fig, axs = plt.subplots(nrows=10, ncols=10, figsize=(8, 8))
    for img, ax in zip(X, axs.ravel()):
        ax.imshow(img.reshape((16, 16)), cmap="Greys")
        ax.axis("off")
    fig.suptitle(title, fontsize=24)

def SSIM_Batch(X, X_true):
    m, _ = X.shape
    ssim_val = 0
    for i in range(m):
        ns = X[i].reshape((16, 16))
        gt = X_true[i].reshape((16, 16))

        ssim_val += ssim(ns, gt, data_range=1.)

    return ssim_val / m


In [None]:
X, y = fetch_openml(data_id=41082, as_frame=False, return_X_y=True)
X = MinMaxScaler().fit_transform(X)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, random_state=0, train_size=1_000, test_size=100
)

rng = np.random.RandomState(0)
noise = rng.normal(scale=0.25, size=X_test.shape)
X_test_noisy = X_test + noise

noise = rng.normal(scale=0.25, size=X_train.shape)
X_train_noisy = X_train + noise

In [None]:
plot_digits(X_test, "Uncorrupted test images")
plot_digits(
    X_test_noisy, f"Noisy test images\nMSE: {np.mean((X_test - X_test_noisy) ** 2):.2f}"
)

# PCA - Principal Component Analysis

Given a matrix of data $X$, we will perform PCA to reduce the dimension of $X$ and then transform back in hopes that we remove the noise, which is usually related to the lower valued singular values, while still keeping the relevant information usually related to the higher eigenvalues. We do this by first making $X$ have zero mean, transforming it to a lower dimension, and then transforming it back to the original dimension only keeping the largest *Principal Components* which are the singular values of normalized $X$.

1. First, normalize $X$ with the column mean of $X$, denoted as $\mu_X$, to get $\hat{X}$:

$$\hat{X} = X - \mu_X$$


2. Perform the SVD of $\hat{X}$:
$$\hat{X} = U\Sigma W^T$$

3. Transform $\hat{X}$ to $Z_k$, where $Z_k$ is the lower dimensional representation of $\hat{X}$ using $k$ of the principal components (right singular vectors) of $\hat{X}$:
$$Z_k = \hat{X}W_k$$

4. We can transform back to the original dimension while loosing lower level information to get $X_k$:
$$X_k = Z_kW_k^T$$

We can see the error of $X$ against $X_k$ using the MSE score defined as $||e_k||^2_2$, where $e_k = X - X_k$ and note that as $k$ increases, $e_k$ will decrease.

**NOTE** The sign corresponding to the vectors in $U$ and $W$ can be different depending on implementation, but will cancel out when multiplied to get $X$ back.

## Singular Value Decomposition

In [None]:
col_train_means = np.mean(X_train_noisy, axis=0)
col_test_means = np.mean(X_test_noisy, axis=0)

X_train_hat = X_train_noisy - col_train_means
X_test_hat = X_test_noisy - col_test_means

U_train, s_train, W_train = svd(X_train_hat, full_matrices=True)
U_test, s_test, W_test = svd(X_test_hat, full_matrices=True)

S_train = np.zeros_like(X_train_hat)
S_train[:len(s_train), :len(s_train)] = np.diag(s_train)

S_test = np.zeros_like(X_test_hat)
S_test[:len(s_test), :len(s_test)] = np.diag(s_test)

print('X_train_hat == U_train S_train W_train: ', np.allclose(X_train_hat, np.matmul(np.matmul(U_train, S_train), W_train)))
print('X_test_hat == U_test S_test W_test: \t', np.allclose(X_test_hat, np.matmul(np.matmul(U_test, S_test), W_test)))

## PCA - All Singular Values 

In [None]:
pca = PCA(n_components=256)
pca.fit(X_train_noisy)

Z_train_noisy = pca.transform(X_train_noisy)

print('Z_train_noisy == X_train_hat W_train: \t\t', 
      np.allclose(abs(Z_train_noisy), abs(np.matmul(X_train_hat, W_train.T))))
print('Z_train_noisy == X_train_hat pca.components_.T: ', 
      np.allclose(Z_train_noisy, np.matmul(X_train_hat, pca.components_.T)))
print('Z_train_noisy == U_train S_train: \t\t', 
      np.allclose(abs(Z_train_noisy), abs(np.matmul(U_train, S_train))))


In [None]:
Z = np.matmul(X_train_hat, W_train.T)
X_train_back = pca.inverse_transform(Z_train_noisy)

print('X_train_noisy == X_train_back: \t\t\t\t', 
      np.allclose(X_train_noisy, X_train_back))
print('X_train_noisy == (X_train_hat W_train.T) W_train: \t', 
      np.allclose(X_train_hat, np.matmul(Z, W_train)))
print('X_train_noisy == Z_train_noisy, pca.components_: \t', 
      np.allclose(X_train_hat, np.matmul(Z_train_noisy, pca.components_)))


### Test

In [None]:
Z_test_noisy = pca.transform(X_test_noisy)
X_test_pca = pca.inverse_transform(Z_test_noisy)

print('Z_test_noisy == (X_test_noisy - col_train_means) pca.components_.T: ', 
      np.allclose(Z_test_noisy, np.dot(X_test_noisy - col_train_means, pca.components_.T)))

print()
print('MSE Error: ', np.mean((X_test_pca - X_test) ** 2))

## PCA - 32 Components

In [None]:
k = 32
pca = PCA(n_components=k)

pca.fit(X_train_noisy)

Z_test_noisy = pca.transform(X_test_noisy)
X_test_pca = pca.inverse_transform(Z_test_noisy)
print('MSE Error: ', np.mean((X_test_pca - X_test) ** 2))

In [None]:
plot_digits(X_test, "Uncorrupted test images")
plot_digits(
    X_test_noisy, f"Noisy test images\nMSE: {np.mean((X_test - X_test_noisy) ** 2):.3f}"
)
plot_digits(
    X_test_pca,
    f"PCA reconstruction, k = {k}, \nMSE: {np.mean((X_test - X_test_pca) ** 2):.3f}".format(k=k),
)

## Find the best $k$ value for this denoising method

In [None]:
k_vals = np.arange(1, 256 + 1, 1)

In [None]:
MSE = []
SSIM_array = []
MSE_max = 1e4
MSE_index = -1
for index, k in enumerate(k_vals):
    pca = PCA(n_components=k)
    pca.fit(X_train_noisy)
    
    Z_test_noisy = pca.transform(X_test_noisy)
    X_test_pca = pca.inverse_transform(Z_test_noisy)
    MSE_val = np.mean((X_test_pca - X_test) ** 2)
    SSIM_val = SSIM_Batch(X_test, X_test_pca)
    MSE.append(MSE_val)
    SSIM_array.append(SSIM_val)

    if MSE_max > MSE_val:
        MSE_max = MSE_val
        MSE_index = index

In [None]:
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=k_vals, y=MSE, name='MSE'), secondary_y=False)
# fig.add_trace(go.Scatter(x=k_vals, y=SSIM_array, name='SSIM'), secondary_y=True)

fig.update_layout(
    xaxis_title='Principal Components',
    yaxis_title='Loss',
    width=1000,  # Set width of the graph
    height=400  # Set height of the graph
)
fig.show()

In [None]:
k = k_vals[MSE_index]
pca = PCA(n_components=k)

pca.fit(X_train_noisy)

Z_test_noisy = pca.transform(X_test_noisy)
X_test_pca = pca.inverse_transform(Z_test_noisy)
print('MSE Error: ', np.mean((X_test_pca - X_test) ** 2))

In [None]:
def plot_digits_plotly(X):
    """Small helper function to plot 64 digits using plotly."""
    # Create subplot grid
    fig = make_subplots(rows=8, cols=8)
    
    # Add each image as a heatmap
    for idx, img in enumerate(X[:64]):
        row = idx // 8 + 1
        col = idx % 8 + 1
        
        # Reshape image and create heatmap
        img_reshaped = img.reshape((16, 16))
        fig.add_trace(
            go.Heatmap(z=img_reshaped, 
                      colorscale='Greys',
                      showscale=False),
            row=row, col=col
        )
        
        # Remove axes for each subplot
        fig.update_xaxes(showticklabels=False, showgrid=False, row=row, col=col)
        fig.update_yaxes(autorange="reversed", showticklabels=False, showgrid=False, row=row, col=col)
    
    # Update layout
    fig.update_layout(
        width=800,
        height=800,
        showlegend=False,
        margin=dict(t=0, l=0, r=0, b=0)
    )
    
    return fig

In [None]:
plot_digits_plotly(X_test)


In [None]:
plot_digits_plotly(X_test_noisy)

In [None]:
plot_digits_plotly(X_test_pca)

In [None]:
SSIM_Batch(X_test_pca, X_test)